2 * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
3 * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 #include "replication.h"
22 #include "condition.h"
23 #include "openvswitch/json.h"
26 #include "ovsdb-error.h"
33 #include "transaction.h"
/* Address of the remote OVSDB server to replicate from.  Holds a private copy
 * made by set_remote_ovsdb_server(); freed and reset to NULL by
 * disconnect_remote_server(). */
35 static char *remote_ovsdb_server;
/* JSON-RPC session to the remote server, as returned by open_jsonrpc(). */
36 static struct jsonrpc *rpc;
/* Names of the tables included in monitor requests; filled by
 * add_monitored_table().  replication_run() treats an empty set as "not
 * connected yet" and a non-empty set as "poll for notifications". */
37 static struct sset monitored_tables = SSET_INITIALIZER(&monitored_tables);
/* Tables excluded from replication, looked up with "DB:TABLE" keys. */
38 static struct sset tables_blacklist = SSET_INITIALIZER(&tables_blacklist);
/* NOTE(review): presumably records whether the local databases still have to
 * be reset before syncing; the code reading it is not visible -- confirm. */
39 static bool reset_dbs = true;
/* Forward declarations of the file-local helpers. */

/* JSON-RPC connection and request helpers. */
41 static struct jsonrpc *open_jsonrpc(const char *server);
42 static struct ovsdb_error *check_jsonrpc_error(int error,
43 struct jsonrpc_msg **reply_);
44 static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs);
45 static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc,
46 const char *database);
/* Construction and sending of "monitor" requests. */
48 static void send_monitor_requests(struct shash *all_dbs);
49 static void add_monitored_table(struct ovsdb_table_schema *table,
50 struct json *monitor_requests);
/* Initial state retrieval and local database reset. */
52 static void get_initial_db_state(const struct db *database);
53 static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn);
54 static struct ovsdb_error *reset_databases(struct shash *all_dbs);
/* Processing of messages received from the remote server. */
56 static void check_for_notifications(struct shash *all_dbs);
57 static void process_notification(struct json *table_updates,
58 struct ovsdb *database);
59 static struct ovsdb_error *process_table_update(struct json *table_update,
60 const char *table_name,
61 struct ovsdb *database,
62 struct ovsdb_txn *txn);
/* Row-level operations applied within a transaction. */
64 static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
66 struct ovsdb_table *table,
68 static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
70 struct ovsdb_table *table);
71 static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
73 struct ovsdb_table *table,
/* Initializes the two file-level string sets used by replication: the set of
 * monitored tables and the table blacklist. */
77 replication_init(void)
79 sset_init(&monitored_tables);
80 sset_init(&tables_blacklist);
/* Drives one iteration of replication over the databases in 'all_dbs'.
 *
 * While no table is monitored and a remote server is configured, this may
 * reset the local databases (returning early when the reset succeeds, per the
 * inline comment below), opens a JSON-RPC session to the remote and sends the
 * monitor requests.  Whenever at least one table is monitored, it then checks
 * for pending notifications from the remote. */
85 replication_run(struct shash *all_dbs)
87 if (sset_is_empty(&monitored_tables) && remote_ovsdb_server) {
88 /* Reset local databases. */
90 struct ovsdb_error *error = reset_databases(all_dbs);
94 /* In case of success resetting the databases,
95 * return in order to notify monitors. */
/* Establish the JSON-RPC session with the configured remote. */
101 rpc = open_jsonrpc(remote_ovsdb_server);
106 /* Send monitor requests. */
107 send_monitor_requests(all_dbs);
/* A non-empty set means add_monitored_table() has registered tables. */
109 if (!sset_is_empty(&monitored_tables)) {
110 check_for_notifications(all_dbs);
/* Records 'remote_server' as the server to replicate from, storing a private
 * copy made with nullable_xstrdup().
 *
 * NOTE(review): no free of a previously stored string is visible here, so
 * calling this more than once looks like it leaks the old copy -- confirm. */
115 set_remote_ovsdb_server(const char *remote_server)
117 remote_ovsdb_server = nullable_xstrdup(remote_server);
/* Returns the stored remote server string itself (not a copy).  The value is
 * NULL once disconnect_remote_server() has cleared it. */
121 get_remote_ovsdb_server(void)
123 return remote_ovsdb_server;
/* Fills the table blacklist from 'blacklist', a comma-delimited string.
 * Elsewhere in this file the entries are matched against "DB:TABLE" keys. */
127 set_tables_blacklist(const char *blacklist)
131 sset_from_delimited_string(&tables_blacklist, blacklist, ",");
/* Returns the current table blacklist set. */
136 get_tables_blacklist(void)
138 return tables_blacklist;
/* Tears down replication state: destroys the monitored-table and blacklist
 * sets and releases the stored remote server string. */
142 disconnect_remote_server(void)
145 sset_destroy(&monitored_tables);
146 sset_destroy(&tables_blacklist);
/* Leave the pointer NULL so replication_run() stops trying to connect. */
148 if (remote_ovsdb_server) {
149 free(remote_ovsdb_server);
150 remote_ovsdb_server = NULL;
/* Searches 'all_dbs' for the database whose schema name equals 'db_name'. */
155 find_db(const struct shash *all_dbs, const char *db_name)
157 struct shash_node *node;
/* Match on the schema's name; the shash key itself is not consulted. */
159 SHASH_FOR_EACH (node, all_dbs) {
160 struct db *db = node->data;
161 if (!strcmp(db->db->schema->name, db_name)) {
/* Empties every database in 'all_dbs': for each one, a transaction is created,
 * reset_database() queues the row deletions in it, and the transaction is
 * committed.
 *
 * NOTE(review): 'error' is reassigned on every iteration, so a failure for one
 * database appears to be overwritten by the result for the next -- confirm. */
169 static struct ovsdb_error *
170 reset_databases(struct shash *all_dbs)
172 struct shash_node *db_node;
173 struct ovsdb_error *error = NULL;
175 SHASH_FOR_EACH (db_node, all_dbs) {
176 struct db *db = db_node->data;
177 struct ovsdb_txn *txn = ovsdb_txn_create(db->db);
178 reset_database(db->db, txn);
179 error = ovsdb_txn_commit(txn, false);
/* Queues, in 'txn', the deletion of every row of every table of 'db', except
 * for tables listed in the blacklist.  Nothing is committed here. */
186 reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
188 struct shash_node *table_node;
190 SHASH_FOR_EACH (table_node, &db->tables) {
191 struct ovsdb_table *table = table_node->data;
192 struct ovsdb_row *row;
194 /* Do not reset if table is blacklisted. */
/* The blacklist key has the form "<schema name>:<table name>". */
195 char *blacklist_item = xasprintf(
196 "%s%s%s", db->schema->name, ":", table_node->name);
197 if (!sset_contains(&tables_blacklist, blacklist_item)) {
198 HMAP_FOR_EACH (row, hmap_node, &table->rows) {
199 ovsdb_txn_row_delete(txn, row);
/* The key was heap-allocated by xasprintf(). */
202 free(blacklist_item);
/* Opens a JSON-RPC session to 'server' over a stream opened with DSCP_DEFAULT.
 * Returns the new session, or NULL if opening the stream reported an error. */
206 static struct jsonrpc *
207 open_jsonrpc(const char *server)
209 struct stream *stream;
212 error = jsonrpc_stream_open(server, &stream, DSCP_DEFAULT);
214 return error ? NULL : jsonrpc_open(stream);
/* Converts the outcome of a JSON-RPC transaction into an ovsdb_error.
 *
 * 'error' is the status code of the transaction and '*reply_' the reply
 * message.  Two failure reports are built here, both tagged "transaction
 * failed": one carrying the numeric 'error', the other carrying the reply's
 * 'error' member rendered as JSON text.
 *
 * NOTE(review): the string from json_to_string() is passed straight to
 * ovsdb_error(); if that string is heap-allocated it is never freed here --
 * confirm. */
217 static struct ovsdb_error *
218 check_jsonrpc_error(int error, struct jsonrpc_msg **reply_)
220 struct jsonrpc_msg *reply = *reply_;
223 return ovsdb_error("transaction failed",
224 "transaction returned error %d",
229 return ovsdb_error("transaction failed",
230 "transaction returned error: %s",
231 json_to_string(reply->error, 0));
/* Asks the remote server behind 'rpc' for its database names with a
 * "list_dbs" request and appends each name to 'dbs'.
 *
 * Failures are passed to ovsdb_error_assert(): a failed transaction, a result
 * that is not a JSON array, or an array element that is not a string. */
237 fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
239 struct jsonrpc_msg *request, *reply;
240 struct ovsdb_error *error;
/* Issue the request and wait for its reply via jsonrpc_transact_block(). */
243 request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
246 error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
249 ovsdb_error_assert(error);
/* The result must be an array of strings. */
253 if (reply->result->type != JSON_ARRAY) {
254 ovsdb_error_assert(ovsdb_error("list-dbs failed",
255 "list_dbs response is not array"));
259 for (i = 0; i < reply->result->u.array.n; i++) {
260 const struct json *name = reply->result->u.array.elems[i];
262 if (name->type != JSON_STRING) {
263 ovsdb_error_assert(ovsdb_error(
265 "list_dbs response %"PRIuSIZE" is not string",
268 svec_add(dbs, name->u.string);
270 jsonrpc_msg_destroy(reply);
/* Retrieves the schema of 'database' from the remote server behind 'rpc' with
 * a "get_schema" request and parses the result with ovsdb_schema_from_json().
 *
 * On the visible failure paths (failed transaction, unparsable schema) the
 * reply is destroyed and the error is passed to ovsdb_error_assert().  The
 * reply is also destroyed once the schema has been parsed. */
274 static struct ovsdb_schema *
275 fetch_schema(struct jsonrpc *rpc, const char *database)
277 struct jsonrpc_msg *request, *reply;
278 struct ovsdb_schema *schema;
279 struct ovsdb_error *error;
281 request = jsonrpc_create_request("get_schema",
283 json_string_create(database)),
285 error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
288 jsonrpc_msg_destroy(reply);
289 ovsdb_error_assert(error);
293 error = ovsdb_schema_from_json(reply->result, &schema);
295 jsonrpc_msg_destroy(reply);
296 ovsdb_error_assert(error);
299 jsonrpc_msg_destroy(reply);
/* Sends one "monitor" request per database reported by the remote server.
 *
 * For each remote database name, the local database of the same name is
 * looked up in 'all_dbs' and the remote schema is fetched.  Only when the
 * local and remote schemas are equal is a request built: it lists every local
 * table that is not blacklisted, visiting tables in the order produced by
 * shash_sort().  After sending the request, the initial contents are read
 * with get_initial_db_state().  The fetched remote schema is destroyed. */
305 send_monitor_requests(struct shash *all_dbs)
312 fetch_dbs(rpc, &dbs);
313 SVEC_FOR_EACH (i, db_name, &dbs) {
314 const struct db *database = find_db(all_dbs, db_name);
317 struct ovsdb_schema *local_schema, *remote_schema;
319 local_schema = database->db->schema;
320 remote_schema = fetch_schema(rpc, db_name);
/* Replication is attempted only for databases with identical schemas. */
321 if (ovsdb_schema_equal(local_schema, remote_schema)) {
322 struct jsonrpc_msg *request;
323 struct json *monitor, *monitor_request;
/* Object mapping each monitored table name to its request array. */
325 monitor_request = json_object_create();
326 size_t n = shash_count(&local_schema->tables);
327 const struct shash_node **nodes = shash_sort(
328 &local_schema->tables);
330 for (int j = 0; j < n; j++) {
331 struct ovsdb_table_schema *table = nodes[j]->data;
333 /* Check if table is not blacklisted. */
334 char *blacklist_item = xasprintf(
335 "%s%s%s", db_name, ":", table->name);
336 if (!sset_contains(&tables_blacklist, blacklist_item)) {
337 add_monitored_table(table, monitor_request);
339 free(blacklist_item);
343 /* Send monitor request. */
/* The database name is used for both of the first two array elements. */
344 monitor = json_array_create_3(
345 json_string_create(db_name),
346 json_string_create(db_name),
348 request = jsonrpc_create_request("monitor", monitor, NULL);
349 jsonrpc_send(rpc, request);
350 get_initial_db_state(database);
352 ovsdb_schema_destroy(remote_schema);
/* Waits for the next message on the remote connection and, if it is a reply,
 * applies its result to 'database' through process_notification().
 *
 * NOTE(review): the return value of jsonrpc_recv_block() is not checked
 * before 'msg' is dereferenced -- confirm 'msg' is always valid here. */
359 get_initial_db_state(const struct db *database)
361 struct jsonrpc_msg *msg;
363 jsonrpc_recv_block(rpc, &msg);
365 if (msg->type == JSONRPC_REPLY) {
366 process_notification(msg->result, database->db);
/* Registers 'table' as monitored and adds it to 'monitor_request'.
 *
 * The table name is added to the file-level monitored_tables set, and
 * 'monitor_request' gains a member keyed by the table name whose value is an
 * array holding a single empty JSON object. */
371 add_monitored_table(struct ovsdb_table_schema *table,
372 struct json *monitor_request)
374 struct json *monitor_request_array;
376 sset_add(&monitored_tables, table->name);
378 monitor_request_array = json_array_create_empty();
379 json_array_add(monitor_request_array, json_object_create());
381 json_object_put(monitor_request, table->name, monitor_request_array);
/* Receives a message from the remote server with jsonrpc_recv() and handles
 * it.
 *
 * An "echo" request is answered with a reply built from a clone of its
 * parameters.  An "update" notification whose parameters are a two-element
 * array is applied to a local database: element 0 is read as the database
 * name and element 1 is handed to process_notification().  The message is
 * destroyed afterwards.
 *
 * The failure handling visible here reopens the connection or calls
 * disconnect_remote_server(); the conditions selecting between them are only
 * partly visible, so verify them against the complete function. */
385 check_for_notifications(struct shash *all_dbs)
387 struct jsonrpc_msg *msg;
390 error = jsonrpc_recv(rpc, &msg);
391 if (error == EAGAIN) {
394 rpc = open_jsonrpc(remote_ovsdb_server);
396 /* Remote server went down. */
397 disconnect_remote_server();
399 jsonrpc_msg_destroy(msg);
402 if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
403 jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
405 } else if (msg->type == JSONRPC_NOTIFY
406 && !strcmp(msg->method, "update")) {
407 struct json *params = msg->params;
408 if (params->type == JSON_ARRAY
409 && params->u.array.n == 2) {
/* Element 0 names the database, element 1 carries the table updates. */
410 char *db_name = params->u.array.elems[0]->u.string;
411 const struct db *database = find_db(all_dbs, db_name);
413 process_notification(params->u.array.elems[1], database->db);
417 jsonrpc_msg_destroy(msg);
/* Applies 'table_updates' to 'database' within a single transaction.
 *
 * 'table_updates' must be a JSON object; otherwise monitored_tables is
 * cleared, which makes replication_run() go through its connect path again.
 * Each member, keyed by table name, is handed to process_table_update().  The
 * transaction is then either committed or aborted.  On both visible failure
 * paths the error goes to ovsdb_error_assert() and monitored_tables is
 * cleared. */
422 process_notification(struct json *table_updates, struct ovsdb *database)
424 struct ovsdb_error *error;
425 struct ovsdb_txn *txn;
427 if (table_updates->type != JSON_OBJECT) {
428 sset_clear(&monitored_tables);
432 txn = ovsdb_txn_create(database);
435 /* Process each table update. */
436 struct shash_node *node;
437 SHASH_FOR_EACH (node, json_object(table_updates)) {
438 struct json *table_update = node->data;
440 error = process_table_update(table_update, node->name, database, txn);
448 /* Commit transaction. */
449 error = ovsdb_txn_commit(txn, false);
451 ovsdb_error_assert(error);
452 sset_clear(&monitored_tables);
455 ovsdb_txn_abort(txn);
456 ovsdb_error_assert(error);
457 sset_clear(&monitored_tables);
460 ovsdb_error_destroy(error);
/* Applies 'table_update', the set of row changes for table 'table_name' of
 * 'database', to transaction 'txn'.
 *
 * 'table_update' must be a JSON object, otherwise an error is returned.  Each
 * member is a row update keyed by the row's UUID string and must itself be a
 * JSON object.  Its "old" and "new" members are looked up and the row is then
 * inserted, deleted or updated.
 *
 * NOTE(review): presumably insert when only "new" is present, delete when
 * only "old" is present, update otherwise; the selecting conditions are not
 * visible -- confirm. */
463 static struct ovsdb_error *
464 process_table_update(struct json *table_update, const char *table_name,
465 struct ovsdb *database, struct ovsdb_txn *txn)
467 struct shash_node *node;
468 struct ovsdb_table *table;
469 struct ovsdb_error *error;
471 if (table_update->type != JSON_OBJECT) {
472 return ovsdb_error("Not a JSON object",
473 "<table-update> for table is not object");
476 table = ovsdb_get_table(database, table_name);
479 SHASH_FOR_EACH (node, json_object(table_update)) {
480 struct json *row_update = node->data;
481 struct json *old, *new;
483 if (row_update->type != JSON_OBJECT) {
/* NOTE(review): the tag below is spelled "NOt", unlike the "Not a JSON
 * object" tag used above; likely a typo. */
484 error = ovsdb_error("NOt a JSON object",
485 "<row-update> is not object");
488 old = shash_find_data(json_object(row_update), "old");
489 new = shash_find_data(json_object(row_update), "new");
/* node->name is the row's UUID in string form. */
492 error = execute_insert(txn, node->name, table, new);
495 error = execute_delete(txn, node->name, table);
497 error = execute_update(txn, node->name, table, new);
/* Inserts a new row into 'table' as part of 'txn'.
 *
 * The row is created for 'table', filled from 'json_row', given the UUID
 * parsed from the string 'uuid', and inserted into the transaction.  The row
 * is destroyed on the path that does not insert it.
 *
 * NOTE(review): the result of uuid_from_string() is ignored, so a malformed
 * 'uuid' goes undetected here -- confirm callers always pass valid UUIDs. */
504 static struct ovsdb_error *
505 execute_insert(struct ovsdb_txn *txn, const char *uuid,
506 struct ovsdb_table *table, struct json *json_row)
508 struct ovsdb_row *row = NULL;
509 struct uuid row_uuid;
510 struct ovsdb_error *error;
512 row = ovsdb_row_create(table);
513 error = ovsdb_row_from_json(row, json_row, NULL, NULL);
515 /* Add UUID to row. */
516 uuid_from_string(&row_uuid, uuid);
517 *ovsdb_row_get_uuid_rw(row) = row_uuid;
518 ovsdb_txn_row_insert(txn, row);
520 ovsdb_row_destroy(row);
/* Context passed to delete_row_cb() through ovsdb_query(). */
526 struct delete_row_cbdata {
/* Table being queried. */
528 const struct ovsdb_table *table;
/* Transaction in which matching rows are deleted. */
529 struct ovsdb_txn *txn;
/* Callback used with ovsdb_query(): deletes the matched 'row' in the
 * transaction carried by 'dr_', which points to a struct delete_row_cbdata. */
533 delete_row_cb(const struct ovsdb_row *row, void *dr_)
535 struct delete_row_cbdata *dr = dr_;
538 ovsdb_txn_row_delete(dr->txn, row);
/* Deletes from 'table', as part of 'txn', the row whose UUID is given by the
 * string 'uuid'.
 *
 * A condition of the form [["_uuid","==",["uuid","<uuid>"]]] is built as JSON
 * text, parsed into an ovsdb_condition and run with ovsdb_query(), which
 * calls delete_row_cb() for each matching row.  A "null table" bug error is
 * reported on one of the early paths.  The condition is destroyed at the
 * end. */
543 static struct ovsdb_error *
544 execute_delete(struct ovsdb_txn *txn, const char *uuid,
545 struct ovsdb_table *table)
547 const struct json *where;
548 struct ovsdb_error *error;
549 struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER(&condition);
/* Room for the UUID text, the 28 fixed characters of the clause and the
 * terminating NUL. */
550 char where_string[UUID_LEN+29];
553 return OVSDB_BUG("null table");
556 snprintf(where_string, sizeof where_string, "%s%s%s",
557 "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
559 where = json_from_string(where_string);
560 error = ovsdb_condition_from_json(table->schema, where, NULL, &condition);
562 struct delete_row_cbdata dr;
567 ovsdb_query(table, &condition, delete_row_cb, &dr);
570 ovsdb_condition_destroy(&condition);
/* Context passed to update_row_cb() through ovsdb_query(). */
574 struct update_row_cbdata {
/* Transaction in which matching rows are modified. */
576 struct ovsdb_txn *txn;
/* Row holding the new column values. */
577 const struct ovsdb_row *row;
/* Columns to compare and copy from 'row'. */
578 const struct ovsdb_column_set *columns;
/* Callback used with ovsdb_query(): brings the matched 'row' in line with the
 * new values carried by 'ur_', which points to a struct update_row_cbdata.
 * The row is modified in the transaction only when it differs from the new
 * row in at least one of the selected columns. */
582 update_row_cb(const struct ovsdb_row *row, void *ur_)
584 struct update_row_cbdata *ur = ur_;
587 if (!ovsdb_row_equal_columns(row, ur->row, ur->columns)) {
588 ovsdb_row_update_columns(ovsdb_txn_row_modify(ur->txn, row),
589 ur->row, ur->columns);
/* Updates, as part of 'txn', the row of 'table' whose UUID is given by the
 * string 'uuid', using the column values in 'json_row'.
 *
 * A temporary row is created and filled from 'json_row'; the columns present
 * are collected in 'columns'.  A condition of the form
 * [["_uuid","==",["uuid","<uuid>"]]] selects the target row, and
 * ovsdb_query() calls update_row_cb() for each match.  The temporary row, the
 * column set and the condition are destroyed at the end. */
595 static struct ovsdb_error *
596 execute_update(struct ovsdb_txn *txn, const char *uuid,
597 struct ovsdb_table *table, struct json *json_row)
599 struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
600 struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER(&condition);
601 struct update_row_cbdata ur;
602 struct ovsdb_row *row;
603 struct ovsdb_error *error;
604 const struct json *where;
/* Room for the UUID text, the 28 fixed characters of the clause and the
 * terminating NUL. */
605 char where_string[UUID_LEN+29];
607 snprintf(where_string, sizeof where_string, "%s%s%s",
608 "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
609 where = json_from_string(where_string);
/* Temporary row that only carries the new column values. */
611 row = ovsdb_row_create(table);
612 error = ovsdb_row_from_json(row, json_row, NULL, &columns);
614 error = ovsdb_condition_from_json(table->schema, where, NULL,
621 ur.columns = &columns;
622 ovsdb_query(table, &condition, update_row_cb, &ur);
625 ovsdb_row_destroy(row);
626 ovsdb_column_set_destroy(&columns);
627 ovsdb_condition_destroy(&condition);
/* Usage text for the replication command-line options: --sync-from and
 * --sync-exclude-tables.
 *
 * NOTE(review): presumably this prints the text below; the call that emits it
 * is not visible -- confirm. */
633 replication_usage(void)
637 --sync-from=SERVER sync DATABASE from remote SERVER\n\
638 --sync-exclude-tables=DB:TABLE,...\n\
639 exclude the TABLE in DB from syncing\n");