json: Move from lib to include/openvswitch.
[cascardo/ovs.git] / ovsdb / replication.c
1 /*
2  * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
3  * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <config.h>
19
20 #include "replication.h"
21
22 #include "condition.h"
23 #include "openvswitch/json.h"
24 #include "jsonrpc.h"
25 #include "ovsdb.h"
26 #include "ovsdb-error.h"
27 #include "query.h"
28 #include "row.h"
29 #include "stream.h"
30 #include "sset.h"
31 #include "svec.h"
32 #include "table.h"
33 #include "transaction.h"
34
35 static char *remote_ovsdb_server;
36 static struct jsonrpc *rpc;
37 static struct sset monitored_tables = SSET_INITIALIZER(&monitored_tables);
38 static struct sset tables_blacklist = SSET_INITIALIZER(&tables_blacklist);
39 static bool reset_dbs = true;
40
41 static struct jsonrpc *open_jsonrpc(const char *server);
42 static struct ovsdb_error *check_jsonrpc_error(int error,
43                                                struct jsonrpc_msg **reply_);
44 static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs);
45 static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc,
46                                          const char *database);
47
48 static void send_monitor_requests(struct shash *all_dbs);
49 static void add_monitored_table(struct ovsdb_table_schema *table,
50                                 struct json *monitor_requests);
51
52 static void get_initial_db_state(const struct db *database);
53 static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn);
54 static struct ovsdb_error *reset_databases(struct shash *all_dbs);
55
56 static void check_for_notifications(struct shash *all_dbs);
57 static void process_notification(struct json *table_updates,
58                                  struct ovsdb *database);
59 static struct ovsdb_error *process_table_update(struct json *table_update,
60                                                 const char *table_name,
61                                                 struct ovsdb *database,
62                                                 struct ovsdb_txn *txn);
63
64 static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
65                                           const char *uuid,
66                                           struct ovsdb_table *table,
67                                           struct json *new);
68 static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
69                                           const char *uuid,
70                                           struct ovsdb_table *table);
71 static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
72                                           const char *uuid,
73                                           struct ovsdb_table *table,
74                                           struct json *new);
75 \f
76 void
77 replication_init(void)
78 {
79     sset_init(&monitored_tables);
80     sset_init(&tables_blacklist);
81     reset_dbs = true;
82 }
83
84 void
85 replication_run(struct shash *all_dbs)
86 {
87     if (sset_is_empty(&monitored_tables) && remote_ovsdb_server) {
88         /* Reset local databases. */
89         if (reset_dbs) {
90             struct ovsdb_error *error = reset_databases(all_dbs);
91             if (!error) {
92                 reset_dbs = false;
93             }
94             /* In case of success reseting the databases,
95              * return in order to notify monitors. */
96             return;
97         }
98
99         /* Open JSON-RPC. */
100         jsonrpc_close(rpc);
101         rpc = open_jsonrpc(remote_ovsdb_server);
102         if (!rpc) {
103             return;
104         }
105
106         /* Send monitor requests. */
107         send_monitor_requests(all_dbs);
108     }
109     if (!sset_is_empty(&monitored_tables)) {
110         check_for_notifications(all_dbs);
111     }
112 }
113
114 void
115 set_remote_ovsdb_server(const char *remote_server)
116 {
117     remote_ovsdb_server = nullable_xstrdup(remote_server);
118 }
119
120 const char *
121 get_remote_ovsdb_server(void)
122 {
123     return remote_ovsdb_server;
124 }
125
126 void
127 set_tables_blacklist(const char *blacklist)
128 {
129     replication_init();
130     if (blacklist) {
131         sset_from_delimited_string(&tables_blacklist, blacklist, ",");
132     }
133 }
134
135 struct sset
136 get_tables_blacklist(void)
137 {
138     return tables_blacklist;
139 }
140
141 void
142 disconnect_remote_server(void)
143 {
144     jsonrpc_close(rpc);
145     sset_destroy(&monitored_tables);
146     sset_destroy(&tables_blacklist);
147
148     if (remote_ovsdb_server) {
149         free(remote_ovsdb_server);
150         remote_ovsdb_server = NULL;
151     }
152 }
153
154 const struct db *
155 find_db(const struct shash *all_dbs, const char *db_name)
156 {
157     struct shash_node *node;
158
159     SHASH_FOR_EACH (node, all_dbs) {
160         struct db *db = node->data;
161         if (!strcmp(db->db->schema->name, db_name)) {
162             return db;
163         }
164     }
165
166     return NULL;
167 }
168 \f
169 static struct ovsdb_error *
170 reset_databases(struct shash *all_dbs)
171 {
172     struct shash_node *db_node;
173     struct ovsdb_error *error = NULL;
174
175     SHASH_FOR_EACH (db_node, all_dbs) {
176         struct db *db = db_node->data;
177         struct ovsdb_txn *txn = ovsdb_txn_create(db->db);
178         reset_database(db->db, txn);
179         error = ovsdb_txn_commit(txn, false);
180     }
181
182     return error;
183 }
184
185 static void
186 reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
187 {
188     struct shash_node *table_node;
189
190     SHASH_FOR_EACH (table_node, &db->tables) {
191         struct ovsdb_table *table = table_node->data;
192         struct ovsdb_row *row;
193
194         /* Do not reset if table is blacklisted. */
195         char *blacklist_item = xasprintf(
196             "%s%s%s", db->schema->name, ":", table_node->name);
197         if (!sset_contains(&tables_blacklist, blacklist_item)) {
198             HMAP_FOR_EACH (row, hmap_node, &table->rows) {
199                 ovsdb_txn_row_delete(txn, row);
200             }
201         }
202         free(blacklist_item);
203     }
204 }
205
206 static struct jsonrpc *
207 open_jsonrpc(const char *server)
208 {
209     struct stream *stream;
210     int error;
211
212     error = jsonrpc_stream_open(server, &stream, DSCP_DEFAULT);
213
214     return error ? NULL : jsonrpc_open(stream);
215 }
216
217 static struct ovsdb_error *
218 check_jsonrpc_error(int error, struct jsonrpc_msg **reply_)
219 {
220     struct jsonrpc_msg *reply = *reply_;
221
222     if (error) {
223         return ovsdb_error("transaction failed",
224                            "transaction returned error %d",
225                            error);
226     }
227
228     if (reply->error) {
229         return ovsdb_error("transaction failed",
230                            "transaction returned error: %s",
231                            json_to_string(reply->error, 0));
232     }
233     return NULL;
234 }
235
236 static void
237 fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
238 {
239     struct jsonrpc_msg *request, *reply;
240     struct ovsdb_error *error;
241     size_t i;
242
243     request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
244                                      NULL);
245
246     error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
247                                 &reply);
248     if (error) {
249         ovsdb_error_assert(error);
250         return;
251     }
252
253     if (reply->result->type != JSON_ARRAY) {
254         ovsdb_error_assert(ovsdb_error("list-dbs failed",
255                                        "list_dbs response is not array"));
256         return;
257     }
258
259     for (i = 0; i < reply->result->u.array.n; i++) {
260         const struct json *name = reply->result->u.array.elems[i];
261
262         if (name->type != JSON_STRING) {
263             ovsdb_error_assert(ovsdb_error(
264                                    "list_dbs failed",
265                                    "list_dbs response %"PRIuSIZE" is not string",
266                                    i));
267         }
268         svec_add(dbs, name->u.string);
269     }
270     jsonrpc_msg_destroy(reply);
271     svec_sort(dbs);
272 }
273
274 static struct ovsdb_schema *
275 fetch_schema(struct jsonrpc *rpc, const char *database)
276 {
277     struct jsonrpc_msg *request, *reply;
278     struct ovsdb_schema *schema;
279     struct ovsdb_error *error;
280
281     request = jsonrpc_create_request("get_schema",
282                                      json_array_create_1(
283                                          json_string_create(database)),
284                                      NULL);
285     error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
286                                 &reply);
287     if (error) {
288         jsonrpc_msg_destroy(reply);
289         ovsdb_error_assert(error);
290         return NULL;
291     }
292
293     error = ovsdb_schema_from_json(reply->result, &schema);
294     if (error) {
295         jsonrpc_msg_destroy(reply);
296         ovsdb_error_assert(error);
297         return NULL;
298     }
299     jsonrpc_msg_destroy(reply);
300
301     return schema;
302 }
303
304 static void
305 send_monitor_requests(struct shash *all_dbs)
306 {
307     const char *db_name;
308     struct svec dbs;
309     size_t i;
310
311     svec_init(&dbs);
312     fetch_dbs(rpc, &dbs);
313     SVEC_FOR_EACH (i, db_name, &dbs) {
314         const struct db *database = find_db(all_dbs, db_name);
315
316         if (database) {
317             struct ovsdb_schema *local_schema, *remote_schema;
318
319             local_schema = database->db->schema;
320             remote_schema = fetch_schema(rpc, db_name);
321             if (ovsdb_schema_equal(local_schema, remote_schema)) {
322                 struct jsonrpc_msg *request;
323                 struct json *monitor, *monitor_request;
324
325                 monitor_request = json_object_create();
326                 size_t n = shash_count(&local_schema->tables);
327                 const struct shash_node **nodes = shash_sort(
328                     &local_schema->tables);
329
330                 for (int j = 0; j < n; j++) {
331                     struct ovsdb_table_schema *table = nodes[j]->data;
332
333                     /* Check if table is not blacklisted. */
334                     char *blacklist_item = xasprintf(
335                         "%s%s%s", db_name, ":", table->name);
336                     if (!sset_contains(&tables_blacklist, blacklist_item)) {
337                         add_monitored_table(table, monitor_request);
338                     }
339                     free(blacklist_item);
340                 }
341                 free(nodes);
342
343                 /* Send monitor request. */
344                 monitor = json_array_create_3(
345                     json_string_create(db_name),
346                     json_string_create(db_name),
347                     monitor_request);
348                 request = jsonrpc_create_request("monitor", monitor, NULL);
349                 jsonrpc_send(rpc, request);
350                 get_initial_db_state(database);
351             }
352             ovsdb_schema_destroy(remote_schema);
353         }
354     }
355     svec_destroy(&dbs);
356 }
357
358 static void
359 get_initial_db_state(const struct db *database)
360 {
361     struct jsonrpc_msg *msg;
362
363     jsonrpc_recv_block(rpc, &msg);
364
365     if (msg->type == JSONRPC_REPLY) {
366         process_notification(msg->result, database->db);
367     }
368 }
369
370 static void
371 add_monitored_table(struct ovsdb_table_schema *table,
372                     struct json *monitor_request)
373 {
374     struct json *monitor_request_array;
375
376     sset_add(&monitored_tables, table->name);
377
378     monitor_request_array = json_array_create_empty();
379     json_array_add(monitor_request_array, json_object_create());
380
381     json_object_put(monitor_request, table->name, monitor_request_array);
382 }
383 \f
384 static void
385 check_for_notifications(struct shash *all_dbs)
386 {
387     struct jsonrpc_msg *msg;
388     int error;
389
390     error = jsonrpc_recv(rpc, &msg);
391     if (error == EAGAIN) {
392         return;
393     } else if (error) {
394         rpc = open_jsonrpc(remote_ovsdb_server);
395         if (!rpc) {
396             /* Remote server went down. */
397             disconnect_remote_server();
398         }
399         jsonrpc_msg_destroy(msg);
400         return;
401     }
402     if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
403         jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
404                                                msg->id));
405     } else if (msg->type == JSONRPC_NOTIFY
406                && !strcmp(msg->method, "update")) {
407         struct json *params = msg->params;
408         if (params->type == JSON_ARRAY
409             && params->u.array.n == 2) {
410             char *db_name = params->u.array.elems[0]->u.string;
411             const struct db *database = find_db(all_dbs, db_name);
412             if (database) {
413                 process_notification(params->u.array.elems[1], database->db);
414             }
415         }
416     }
417     jsonrpc_msg_destroy(msg);
418     jsonrpc_run(rpc);
419 }
420
421 static void
422 process_notification(struct json *table_updates, struct ovsdb *database)
423 {
424     struct ovsdb_error *error;
425     struct ovsdb_txn *txn;
426
427     if (table_updates->type != JSON_OBJECT) {
428         sset_clear(&monitored_tables);
429         return;
430     }
431
432     txn = ovsdb_txn_create(database);
433     error = NULL;
434
435     /* Process each table update. */
436     struct shash_node *node;
437     SHASH_FOR_EACH (node, json_object(table_updates)) {
438         struct json *table_update = node->data;
439         if (table_update) {
440             error = process_table_update(table_update, node->name, database, txn);
441             if (error) {
442                 break;
443             }
444         }
445     }
446
447     if (!error){
448         /* Commit transaction. */
449         error = ovsdb_txn_commit(txn, false);
450         if (error) {
451             ovsdb_error_assert(error);
452             sset_clear(&monitored_tables);
453         }
454     } else {
455         ovsdb_txn_abort(txn);
456         ovsdb_error_assert(error);
457         sset_clear(&monitored_tables);
458     }
459
460     ovsdb_error_destroy(error);
461 }
462
463 static struct ovsdb_error *
464 process_table_update(struct json *table_update, const char *table_name,
465                      struct ovsdb *database, struct ovsdb_txn *txn)
466 {
467     struct shash_node *node;
468     struct ovsdb_table *table;
469     struct ovsdb_error *error;
470
471     if (table_update->type != JSON_OBJECT) {
472         return ovsdb_error("Not a JSON object",
473                            "<table-update> for table is not object");
474     }
475
476     table = ovsdb_get_table(database, table_name);
477     error = NULL;
478
479     SHASH_FOR_EACH (node, json_object(table_update)) {
480         struct json *row_update = node->data;
481         struct json *old, *new;
482
483         if (row_update->type != JSON_OBJECT) {
484             error = ovsdb_error("NOt a JSON object",
485                                 "<row-update> is not object");
486             break;
487         }
488         old = shash_find_data(json_object(row_update), "old");
489         new = shash_find_data(json_object(row_update), "new");
490
491         if (!old) {
492             error = execute_insert(txn, node->name, table, new);
493         } else{
494             if (!new) {
495                 error = execute_delete(txn, node->name, table);
496             } else {
497                 error = execute_update(txn, node->name, table, new);
498             }
499         }
500     }
501     return error;
502 }
503 \f
504 static struct ovsdb_error *
505 execute_insert(struct ovsdb_txn *txn, const char *uuid,
506                struct ovsdb_table *table, struct json *json_row)
507 {
508     struct ovsdb_row *row = NULL;
509     struct uuid row_uuid;
510     struct ovsdb_error *error;
511
512     row = ovsdb_row_create(table);
513     error = ovsdb_row_from_json(row, json_row, NULL, NULL);
514     if (!error) {
515         /* Add UUID to row. */
516         uuid_from_string(&row_uuid, uuid);
517         *ovsdb_row_get_uuid_rw(row) = row_uuid;
518         ovsdb_txn_row_insert(txn, row);
519     } else {
520         ovsdb_row_destroy(row);
521     }
522
523     return error;
524 }
525
526 struct delete_row_cbdata {
527     size_t n_matches;
528     const struct ovsdb_table *table;
529     struct ovsdb_txn *txn;
530 };
531
532 static bool
533 delete_row_cb(const struct ovsdb_row *row, void *dr_)
534 {
535     struct delete_row_cbdata *dr = dr_;
536
537     dr->n_matches++;
538     ovsdb_txn_row_delete(dr->txn, row);
539
540     return true;
541 }
542
543 static struct ovsdb_error *
544 execute_delete(struct ovsdb_txn *txn, const char *uuid,
545                struct ovsdb_table *table)
546 {
547     const struct json *where;
548     struct ovsdb_error *error;
549     struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER(&condition);
550     char where_string[UUID_LEN+29];
551
552     if (!table) {
553         return OVSDB_BUG("null table");
554     }
555
556     snprintf(where_string, sizeof where_string, "%s%s%s",
557              "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
558
559     where = json_from_string(where_string);
560     error = ovsdb_condition_from_json(table->schema, where, NULL, &condition);
561     if (!error) {
562         struct delete_row_cbdata dr;
563
564         dr.n_matches = 0;
565         dr.table = table;
566         dr.txn = txn;
567         ovsdb_query(table, &condition, delete_row_cb, &dr);
568     }
569
570     ovsdb_condition_destroy(&condition);
571     return error;
572 }
573
574 struct update_row_cbdata {
575     size_t n_matches;
576     struct ovsdb_txn *txn;
577     const struct ovsdb_row *row;
578     const struct ovsdb_column_set *columns;
579 };
580
581 static bool
582 update_row_cb(const struct ovsdb_row *row, void *ur_)
583 {
584     struct update_row_cbdata *ur = ur_;
585
586     ur->n_matches++;
587     if (!ovsdb_row_equal_columns(row, ur->row, ur->columns)) {
588         ovsdb_row_update_columns(ovsdb_txn_row_modify(ur->txn, row),
589                                  ur->row, ur->columns);
590     }
591
592     return true;
593 }
594
595 static struct ovsdb_error *
596 execute_update(struct ovsdb_txn *txn, const char *uuid,
597                struct ovsdb_table *table, struct json *json_row)
598 {
599     struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
600     struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER(&condition);
601     struct update_row_cbdata ur;
602     struct ovsdb_row *row;
603     struct ovsdb_error *error;
604     const struct json *where;
605     char where_string[UUID_LEN+29];
606
607     snprintf(where_string, sizeof where_string, "%s%s%s",
608              "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
609     where = json_from_string(where_string);
610
611     row = ovsdb_row_create(table);
612     error = ovsdb_row_from_json(row, json_row, NULL, &columns);
613     if (!error) {
614         error = ovsdb_condition_from_json(table->schema, where, NULL,
615                                           &condition);
616     }
617     if (!error) {
618         ur.n_matches = 0;
619         ur.txn = txn;
620         ur.row = row;
621         ur.columns = &columns;
622         ovsdb_query(table, &condition, update_row_cb, &ur);
623     }
624
625     ovsdb_row_destroy(row);
626     ovsdb_column_set_destroy(&columns);
627     ovsdb_condition_destroy(&condition);
628
629     return error;
630 }
631
632 void
633 replication_usage(void)
634 {
635     printf("\n\
636 Syncing options:\n\
637   --sync-from=SERVER      sync DATABASE from remote SERVER\n\
638   --sync-exclude-tables=DB:TABLE,...\n\
639                           exclude the TABLE in DB from syncing\n");
640 }