f734ae2ac624eebcee7c0b7ef666d15485a638af
[cascardo/ovs.git] / ovsdb / replication.c
1 /*
2  * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
3  * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <config.h>
19
20 #include "replication.h"
21
22 #include "condition.h"
23 #include "json.h"
24 #include "jsonrpc.h"
25 #include "ovsdb.h"
26 #include "ovsdb-error.h"
27 #include "query.h"
28 #include "row.h"
29 #include "stream.h"
30 #include "sset.h"
31 #include "svec.h"
32 #include "table.h"
33 #include "transaction.h"
34
35 static char *remote_ovsdb_server;
36 static struct jsonrpc *rpc;
37 static struct sset monitored_tables = SSET_INITIALIZER(&monitored_tables);
38 static struct sset tables_blacklist = SSET_INITIALIZER(&tables_blacklist);
39 static bool reset_dbs = true;
40
41 void replication_init(void);
42 static struct jsonrpc *open_jsonrpc(const char *server);
43 static struct ovsdb_error *check_jsonrpc_error(int error,
44                                                struct jsonrpc_msg **reply_);
45 static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs);
46 static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc,
47                                          const char *database);
48
49 static void send_monitor_requests(struct shash *all_dbs);
50 static void add_monitored_table(struct ovsdb_table_schema *table,
51                                 struct json *monitor_requests);
52
53 static void get_initial_db_state(const struct db *database);
54 static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn);
55 static struct ovsdb_error *reset_databases(struct shash *all_dbs);
56
57 static void check_for_notifications(struct shash *all_dbs);
58 static void process_notification(struct json *table_updates,
59                                  struct ovsdb *database);
60 static struct ovsdb_error *process_table_update(struct json *table_update,
61                                                 const char *table_name,
62                                                 struct ovsdb *database,
63                                                 struct ovsdb_txn *txn);
64
65 static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
66                                           const char *uuid,
67                                           struct ovsdb_table *table,
68                                           struct json *new);
69 static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
70                                           const char *uuid,
71                                           struct ovsdb_table *table);
72 static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
73                                           const char *uuid,
74                                           struct ovsdb_table *table,
75                                           struct json *new);
76 \f
77 void
78 replication_init(void)
79 {
80     sset_init(&monitored_tables);
81     sset_init(&tables_blacklist);
82     reset_dbs = true;
83 }
84
85 void
86 replication_run(struct shash *all_dbs)
87 {
88     if (sset_is_empty(&monitored_tables) && remote_ovsdb_server) {
89         /* Reset local databases. */
90         if (reset_dbs) {
91             struct ovsdb_error *error = reset_databases(all_dbs);
92             if (!error) {
93                 reset_dbs = false;
94             }
95             /* In case of success reseting the databases,
96              * return in order to notify monitors. */
97             return;
98         }
99
100         /* Open JSON-RPC. */
101         jsonrpc_close(rpc);
102         rpc = open_jsonrpc(remote_ovsdb_server);
103         if (!rpc) {
104             return;
105         }
106
107         /* Send monitor requests. */
108         send_monitor_requests(all_dbs);
109     }
110     if (!sset_is_empty(&monitored_tables)) {
111         check_for_notifications(all_dbs);
112     }
113 }
114
115 void
116 set_remote_ovsdb_server(const char *remote_server)
117 {
118     remote_ovsdb_server = nullable_xstrdup(remote_server);
119 }
120
121 void
122 set_tables_blacklist(const char *blacklist)
123 {
124     replication_init();
125     if (blacklist) {
126         sset_from_delimited_string(&tables_blacklist, blacklist, ",");
127     }
128 }
129
130 void
131 disconnect_remote_server(void)
132 {
133     jsonrpc_close(rpc);
134     sset_destroy(&monitored_tables);
135     sset_destroy(&tables_blacklist);
136
137     if (remote_ovsdb_server) {
138         free(remote_ovsdb_server);
139         remote_ovsdb_server = NULL;
140     }
141 }
142
143 const struct db *
144 find_db(const struct shash *all_dbs, const char *db_name)
145 {
146     struct shash_node *node;
147
148     SHASH_FOR_EACH (node, all_dbs) {
149         struct db *db = node->data;
150         if (!strcmp(db->db->schema->name, db_name)) {
151             return db;
152         }
153     }
154
155     return NULL;
156 }
157 \f
158 static struct ovsdb_error *
159 reset_databases(struct shash *all_dbs)
160 {
161     struct shash_node *db_node;
162     struct ovsdb_error *error = NULL;
163
164     SHASH_FOR_EACH (db_node, all_dbs) {
165         struct db *db = db_node->data;
166         struct ovsdb_txn *txn = ovsdb_txn_create(db->db);
167         reset_database(db->db, txn);
168         error = ovsdb_txn_commit(txn, false);
169     }
170
171     return error;
172 }
173
174 static void
175 reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
176 {
177     struct shash_node *table_node;
178
179     SHASH_FOR_EACH (table_node, &db->tables) {
180         struct ovsdb_table *table = table_node->data;
181         struct ovsdb_row *row;
182
183         /* Do not reset if table is blacklisted. */
184         char *blacklist_item = xasprintf(
185             "%s%s%s", db->schema->name, ":", table_node->name);
186         if (!sset_contains(&tables_blacklist, blacklist_item)) {
187             HMAP_FOR_EACH (row, hmap_node, &table->rows) {
188                 ovsdb_txn_row_delete(txn, row);
189             }
190         }
191         free(blacklist_item);
192     }
193 }
194
195 static struct jsonrpc *
196 open_jsonrpc(const char *server)
197 {
198     struct stream *stream;
199     int error;
200
201     error = jsonrpc_stream_open(server, &stream, DSCP_DEFAULT);
202
203     return error ? NULL : jsonrpc_open(stream);
204 }
205
206 static struct ovsdb_error *
207 check_jsonrpc_error(int error, struct jsonrpc_msg **reply_)
208 {
209     struct jsonrpc_msg *reply = *reply_;
210
211     if (error) {
212         return ovsdb_error("transaction failed",
213                            "transaction returned error %d",
214                            error);
215     }
216
217     if (reply->error) {
218         return ovsdb_error("transaction failed",
219                            "transaction returned error: %s",
220                            json_to_string(reply->error, 0));
221     }
222     return NULL;
223 }
224
225 static void
226 fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
227 {
228     struct jsonrpc_msg *request, *reply;
229     struct ovsdb_error *error;
230     size_t i;
231
232     request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
233                                      NULL);
234
235     error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
236                                 &reply);
237     if (error) {
238         ovsdb_error_assert(error);
239         return;
240     }
241
242     if (reply->result->type != JSON_ARRAY) {
243         ovsdb_error_assert(ovsdb_error("list-dbs failed",
244                                        "list_dbs response is not array"));
245         return;
246     }
247
248     for (i = 0; i < reply->result->u.array.n; i++) {
249         const struct json *name = reply->result->u.array.elems[i];
250
251         if (name->type != JSON_STRING) {
252             ovsdb_error_assert(ovsdb_error(
253                                    "list_dbs failed",
254                                    "list_dbs response %"PRIuSIZE" is not string",
255                                    i));
256         }
257         svec_add(dbs, name->u.string);
258     }
259     jsonrpc_msg_destroy(reply);
260     svec_sort(dbs);
261 }
262
263 static struct ovsdb_schema *
264 fetch_schema(struct jsonrpc *rpc, const char *database)
265 {
266     struct jsonrpc_msg *request, *reply;
267     struct ovsdb_schema *schema;
268     struct ovsdb_error *error;
269
270     request = jsonrpc_create_request("get_schema",
271                                      json_array_create_1(
272                                          json_string_create(database)),
273                                      NULL);
274     error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
275                                 &reply);
276     if (error) {
277         jsonrpc_msg_destroy(reply);
278         ovsdb_error_assert(error);
279         return NULL;
280     }
281
282     error = ovsdb_schema_from_json(reply->result, &schema);
283     if (error) {
284         jsonrpc_msg_destroy(reply);
285         ovsdb_error_assert(error);
286         return NULL;
287     }
288     jsonrpc_msg_destroy(reply);
289
290     return schema;
291 }
292
293 static void
294 send_monitor_requests(struct shash *all_dbs)
295 {
296     const char *db_name;
297     struct svec dbs;
298     size_t i;
299
300     svec_init(&dbs);
301     fetch_dbs(rpc, &dbs);
302     SVEC_FOR_EACH (i, db_name, &dbs) {
303         const struct db *database = find_db(all_dbs, db_name);
304
305         if (database) {
306             struct ovsdb_schema *local_schema, *remote_schema;
307
308             local_schema = database->db->schema;
309             remote_schema = fetch_schema(rpc, db_name);
310             if (ovsdb_schema_equal(local_schema, remote_schema)) {
311                 struct jsonrpc_msg *request;
312                 struct json *monitor, *monitor_request;
313
314                 monitor_request = json_object_create();
315                 size_t n = shash_count(&local_schema->tables);
316                 const struct shash_node **nodes = shash_sort(
317                     &local_schema->tables);
318
319                 for (int j = 0; j < n; j++) {
320                     struct ovsdb_table_schema *table = nodes[j]->data;
321
322                     /* Check if table is not blacklisted. */
323                     char *blacklist_item = xasprintf(
324                         "%s%s%s", db_name, ":", table->name);
325                     if (!sset_contains(&tables_blacklist, blacklist_item)) {
326                         add_monitored_table(table, monitor_request);
327                     }
328                     free(blacklist_item);
329                 }
330                 free(nodes);
331
332                 /* Send monitor request. */
333                 monitor = json_array_create_3(
334                     json_string_create(db_name),
335                     json_string_create(db_name),
336                     monitor_request);
337                 request = jsonrpc_create_request("monitor", monitor, NULL);
338                 jsonrpc_send(rpc, request);
339                 get_initial_db_state(database);
340             }
341             ovsdb_schema_destroy(remote_schema);
342         }
343     }
344     svec_destroy(&dbs);
345 }
346
347 static void
348 get_initial_db_state(const struct db *database)
349 {
350     struct jsonrpc_msg *msg;
351
352     jsonrpc_recv_block(rpc, &msg);
353
354     if (msg->type == JSONRPC_REPLY) {
355         process_notification(msg->result, database->db);
356     }
357 }
358
359 static void
360 add_monitored_table(struct ovsdb_table_schema *table,
361                     struct json *monitor_request)
362 {
363     struct json *monitor_request_array;
364
365     sset_add(&monitored_tables, table->name);
366
367     monitor_request_array = json_array_create_empty();
368     json_array_add(monitor_request_array, json_object_create());
369
370     json_object_put(monitor_request, table->name, monitor_request_array);
371 }
372 \f
373 static void
374 check_for_notifications(struct shash *all_dbs)
375 {
376     struct jsonrpc_msg *msg;
377     int error;
378
379     error = jsonrpc_recv(rpc, &msg);
380     if (error == EAGAIN) {
381         return;
382     } else if (error) {
383         rpc = open_jsonrpc(remote_ovsdb_server);
384         if (!rpc) {
385             /* Remote server went down. */
386             disconnect_remote_server();
387         }
388         jsonrpc_msg_destroy(msg);
389         return;
390     }
391     if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
392         jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
393                                                msg->id));
394     } else if (msg->type == JSONRPC_NOTIFY
395                && !strcmp(msg->method, "update")) {
396         struct json *params = msg->params;
397         if (params->type == JSON_ARRAY
398             && params->u.array.n == 2) {
399             char *db_name = params->u.array.elems[0]->u.string;
400             const struct db *database = find_db(all_dbs, db_name);
401             if (database) {
402                 process_notification(params->u.array.elems[1], database->db);
403             }
404         }
405     }
406     jsonrpc_msg_destroy(msg);
407     jsonrpc_run(rpc);
408 }
409
410 static void
411 process_notification(struct json *table_updates, struct ovsdb *database)
412 {
413     struct ovsdb_error *error;
414     struct ovsdb_txn *txn;
415
416     if (table_updates->type != JSON_OBJECT) {
417         sset_clear(&monitored_tables);
418         return;
419     }
420
421     txn = ovsdb_txn_create(database);
422     error = NULL;
423
424     /* Process each table update. */
425     struct shash_node *node;
426     SHASH_FOR_EACH (node, json_object(table_updates)) {
427         struct json *table_update = node->data;
428         if (table_update) {
429             error = process_table_update(table_update, node->name, database, txn);
430             if (error) {
431                 break;
432             }
433         }
434     }
435
436     if (!error){
437         /* Commit transaction. */
438         error = ovsdb_txn_commit(txn, false);
439         if (error) {
440             ovsdb_error_assert(error);
441             sset_clear(&monitored_tables);
442         }
443     } else {
444         ovsdb_txn_abort(txn);
445         ovsdb_error_assert(error);
446         sset_clear(&monitored_tables);
447     }
448
449     ovsdb_error_destroy(error);
450 }
451
452 static struct ovsdb_error *
453 process_table_update(struct json *table_update, const char *table_name,
454                      struct ovsdb *database, struct ovsdb_txn *txn)
455 {
456     struct shash_node *node;
457     struct ovsdb_table *table;
458     struct ovsdb_error *error;
459
460     if (table_update->type != JSON_OBJECT) {
461         return ovsdb_error("Not a JSON object",
462                            "<table-update> for table is not object");
463     }
464
465     table = ovsdb_get_table(database, table_name);
466     error = NULL;
467
468     SHASH_FOR_EACH (node, json_object(table_update)) {
469         struct json *row_update = node->data;
470         struct json *old, *new;
471
472         if (row_update->type != JSON_OBJECT) {
473             error = ovsdb_error("NOt a JSON object",
474                                 "<row-update> is not object");
475             break;
476         }
477         old = shash_find_data(json_object(row_update), "old");
478         new = shash_find_data(json_object(row_update), "new");
479
480         if (!old) {
481             error = execute_insert(txn, node->name, table, new);
482         } else{
483             if (!new) {
484                 error = execute_delete(txn, node->name, table);
485             } else {
486                 error = execute_update(txn, node->name, table, new);
487             }
488         }
489     }
490     return error;
491 }
492 \f
493 static struct ovsdb_error *
494 execute_insert(struct ovsdb_txn *txn, const char *uuid,
495                struct ovsdb_table *table, struct json *json_row)
496 {
497     struct ovsdb_row *row = NULL;
498     struct uuid row_uuid;
499     struct ovsdb_error *error;
500
501     row = ovsdb_row_create(table);
502     error = ovsdb_row_from_json(row, json_row, NULL, NULL);
503     if (!error) {
504         /* Add UUID to row. */
505         uuid_from_string(&row_uuid, uuid);
506         *ovsdb_row_get_uuid_rw(row) = row_uuid;
507         ovsdb_txn_row_insert(txn, row);
508     } else {
509         ovsdb_row_destroy(row);
510     }
511
512     return error;
513 }
514
515 struct delete_row_cbdata {
516     size_t n_matches;
517     const struct ovsdb_table *table;
518     struct ovsdb_txn *txn;
519 };
520
521 static bool
522 delete_row_cb(const struct ovsdb_row *row, void *dr_)
523 {
524     struct delete_row_cbdata *dr = dr_;
525
526     dr->n_matches++;
527     ovsdb_txn_row_delete(dr->txn, row);
528
529     return true;
530 }
531
532 static struct ovsdb_error *
533 execute_delete(struct ovsdb_txn *txn, const char *uuid,
534                struct ovsdb_table *table)
535 {
536     const struct json *where;
537     struct ovsdb_error *error;
538     struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER(&condition);
539     char where_string[UUID_LEN+29];
540
541     if (!table) {
542         return OVSDB_BUG("null table");
543     }
544
545     snprintf(where_string, sizeof where_string, "%s%s%s",
546              "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
547
548     where = json_from_string(where_string);
549     error = ovsdb_condition_from_json(table->schema, where, NULL, &condition);
550     if (!error) {
551         struct delete_row_cbdata dr;
552
553         dr.n_matches = 0;
554         dr.table = table;
555         dr.txn = txn;
556         ovsdb_query(table, &condition, delete_row_cb, &dr);
557     }
558
559     ovsdb_condition_destroy(&condition);
560     return error;
561 }
562
563 struct update_row_cbdata {
564     size_t n_matches;
565     struct ovsdb_txn *txn;
566     const struct ovsdb_row *row;
567     const struct ovsdb_column_set *columns;
568 };
569
570 static bool
571 update_row_cb(const struct ovsdb_row *row, void *ur_)
572 {
573     struct update_row_cbdata *ur = ur_;
574
575     ur->n_matches++;
576     if (!ovsdb_row_equal_columns(row, ur->row, ur->columns)) {
577         ovsdb_row_update_columns(ovsdb_txn_row_modify(ur->txn, row),
578                                  ur->row, ur->columns);
579     }
580
581     return true;
582 }
583
584 static struct ovsdb_error *
585 execute_update(struct ovsdb_txn *txn, const char *uuid,
586                struct ovsdb_table *table, struct json *json_row)
587 {
588     struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
589     struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER(&condition);
590     struct update_row_cbdata ur;
591     struct ovsdb_row *row;
592     struct ovsdb_error *error;
593     const struct json *where;
594     char where_string[UUID_LEN+29];
595
596     snprintf(where_string, sizeof where_string, "%s%s%s",
597              "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
598     where = json_from_string(where_string);
599
600     row = ovsdb_row_create(table);
601     error = ovsdb_row_from_json(row, json_row, NULL, &columns);
602     if (!error) {
603         error = ovsdb_condition_from_json(table->schema, where, NULL,
604                                           &condition);
605     }
606     if (!error) {
607         ur.n_matches = 0;
608         ur.txn = txn;
609         ur.row = row;
610         ur.columns = &columns;
611         ovsdb_query(table, &condition, update_row_cb, &ur);
612     }
613
614     ovsdb_row_destroy(row);
615     ovsdb_column_set_destroy(&columns);
616     ovsdb_condition_destroy(&condition);
617
618     return error;
619 }
620
621 void
622 replication_usage(void)
623 {
624     printf("\n\
625 Syncing options:\n\
626   --sync-from=SERVER      sync DATABASE from remote SERVER\n\
627   --sync-exclude-tables=DB:TABLE,...\n\
628                           exclude the TABLE in DB from syncing\n");
629 }