2 * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
3 * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 #include "replication.h"
22 #include "condition.h"
26 #include "ovsdb-error.h"
33 #include "transaction.h"
/* Name of the remote server to replicate from.  Assigned by
 * set_remote_ovsdb_server(); freed and reset to NULL by
 * disconnect_remote_server(). */
35 static char *remote_ovsdb_server;
/* JSON-RPC connection to the remote server, obtained from open_jsonrpc(). */
36 static struct jsonrpc *rpc;
/* Names of the tables registered by add_monitored_table().
 * replication_run() tests whether this set is empty to choose between
 * starting a sync and polling for notifications; process_notification()
 * clears it in several places. */
37 static struct sset monitored_tables = SSET_INITIALIZER(&monitored_tables);
/* Entries of the form "DB:TABLE", filled by set_tables_blacklist().  Tables
 * found here are skipped both when resetting the local databases and when
 * building monitor requests. */
38 static struct sset tables_blacklist = SSET_INITIALIZER(&tables_blacklist);
/* Starts out true.  NOTE(review): presumably gates the database reset in
 * replication_run() -- confirm against the places that read and write it. */
39 static bool reset_dbs = true;
/* Forward declarations of functions defined later in this file. */
41 void replication_init(void);
/* JSON-RPC helpers. */
42 static struct jsonrpc *open_jsonrpc(const char *server);
43 static struct ovsdb_error *check_jsonrpc_error(int error,
44 struct jsonrpc_msg **reply_);
45 static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs);
46 static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc,
47 const char *database);
/* Monitor request construction. */
49 static void send_monitor_requests(struct shash *all_dbs);
50 static void add_monitored_table(struct ovsdb_table_schema *table,
51 struct json *monitor_requests);
/* Initial synchronization and local database reset. */
53 static void get_initial_db_state(const struct db *database);
54 static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn);
55 static struct ovsdb_error *reset_databases(struct shash *all_dbs);
/* Handling of messages received from the remote server. */
57 static void check_for_notifications(struct shash *all_dbs);
58 static void process_notification(struct json *table_updates,
59 struct ovsdb *database);
60 static struct ovsdb_error *process_table_update(struct json *table_update,
61 const char *table_name,
62 struct ovsdb *database,
63 struct ovsdb_txn *txn);
/* Per-row operations applied within a transaction. */
65 static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
67 struct ovsdb_table *table,
69 static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
71 struct ovsdb_table *table);
72 static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
74 struct ovsdb_table *table,
/* Initializes the module's two string sets, 'monitored_tables' and
 * 'tables_blacklist', with sset_init().
 *
 * NOTE(review): both sets are also statically initialized with
 * SSET_INITIALIZER; if this runs while they hold entries, the entries are
 * presumably not released -- confirm against sset_init(). */
78 replication_init(void)
80 sset_init(&monitored_tables);
81 sset_init(&tables_blacklist);
/* Runs one iteration of replication over the databases in 'all_dbs'.
 *
 * While no table is being monitored and a remote server is configured, the
 * steps taken are: reset the local databases, open the JSON-RPC connection
 * to the remote server, and send the monitor requests.  Once tables are
 * being monitored, incoming messages are handled by
 * check_for_notifications(). */
86 replication_run(struct shash *all_dbs)
/* An empty 'monitored_tables' means no sync is currently established. */
88 if (sset_is_empty(&monitored_tables) && remote_ovsdb_server) {
89 /* Reset local databases. */
91 struct ovsdb_error *error = reset_databases(all_dbs);
95 /* In case of success resetting the databases,
96 * return in order to notify monitors. */
102 rpc = open_jsonrpc(remote_ovsdb_server);
107 /* Send monitor requests. */
108 send_monitor_requests(all_dbs);
/* send_monitor_requests() registers tables through add_monitored_table(),
 * so emptiness is tested again here rather than reusing the result above. */
110 if (!sset_is_empty(&monitored_tables)) {
111 check_for_notifications(all_dbs);
116 set_remote_ovsdb_server(const char *remote_server)
118 remote_ovsdb_server = nullable_xstrdup(remote_server);
/* Records the tables that must be excluded from replication.
 *
 * 'blacklist' is handed to sset_from_delimited_string() with "," as the
 * delimiter and the result is stored in 'tables_blacklist'.  Other code in
 * this file looks entries up in the form "DB:TABLE". */
122 set_tables_blacklist(const char *blacklist)
126 sset_from_delimited_string(&tables_blacklist, blacklist, ",");
/* Tears down the replication state: destroys both string sets and releases
 * the remote server name. */
131 disconnect_remote_server(void)
134 sset_destroy(&monitored_tables);
135 sset_destroy(&tables_blacklist);
/* Clearing the pointer makes the "remote server configured" test in
 * replication_run() fail from now on. */
137 if (remote_ovsdb_server) {
138 free(remote_ovsdb_server);
139 remote_ovsdb_server = NULL;
/* Searches 'all_dbs' for the database whose schema name equals 'db_name'.
 * Every value stored in 'all_dbs' is treated as a 'struct db'. */
144 find_db(const struct shash *all_dbs, const char *db_name)
146 struct shash_node *node;
148 SHASH_FOR_EACH (node, all_dbs) {
149 struct db *db = node->data;
/* The comparison uses the schema's name, not the shash key. */
150 if (!strcmp(db->db->schema->name, db_name)) {
158 static struct ovsdb_error *
159 reset_databases(struct shash *all_dbs)
161 struct shash_node *db_node;
162 struct ovsdb_error *error = NULL;
164 SHASH_FOR_EACH (db_node, all_dbs) {
165 struct db *db = db_node->data;
166 struct ovsdb_txn *txn = ovsdb_txn_create(db->db);
167 reset_database(db->db, txn);
168 error = ovsdb_txn_commit(txn, false);
175 reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
177 struct shash_node *table_node;
179 SHASH_FOR_EACH (table_node, &db->tables) {
180 struct ovsdb_table *table = table_node->data;
181 struct ovsdb_row *row;
183 /* Do not reset if table is blacklisted. */
184 char *blacklist_item = xasprintf(
185 "%s%s%s", db->schema->name, ":", table_node->name);
186 if (!sset_contains(&tables_blacklist, blacklist_item)) {
187 HMAP_FOR_EACH (row, hmap_node, &table->rows) {
188 ovsdb_txn_row_delete(txn, row);
191 free(blacklist_item);
195 static struct jsonrpc *
196 open_jsonrpc(const char *server)
198 struct stream *stream;
201 error = jsonrpc_stream_open(server, &stream, DSCP_DEFAULT);
203 return error ? NULL : jsonrpc_open(stream);
/* Converts the outcome of a JSON-RPC transaction into an ovsdb_error.
 *
 * 'error' is the status value of the transaction (the call sites in this
 * file pass the result of jsonrpc_transact_block()) and '*reply_' is the
 * reply message.  Two kinds of "transaction failed" errors can be produced:
 * one reporting the numeric 'error', the other reporting the reply's JSON
 * 'error' member rendered as text. */
206 static struct ovsdb_error *
207 check_jsonrpc_error(int error, struct jsonrpc_msg **reply_)
209 struct jsonrpc_msg *reply = *reply_;
212 return ovsdb_error("transaction failed",
213 "transaction returned error %d",
/* NOTE(review): the string produced by json_to_string() is passed straight
 * to ovsdb_error() and not kept anywhere; if json_to_string() returns
 * heap-allocated memory it is leaked here -- confirm. */
218 return ovsdb_error("transaction failed",
219 "transaction returned error: %s",
220 json_to_string(reply->error, 0));
/* Sends a "list_dbs" request over 'rpc' and appends every database name
 * found in the reply to 'dbs'.
 *
 * The reply's result is expected to be a JSON array whose elements are all
 * strings; violations, like transaction failures, are reported through
 * ovsdb_error_assert(). */
226 fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
228 struct jsonrpc_msg *request, *reply;
229 struct ovsdb_error *error;
/* The request carries an empty parameter array. */
232 request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
/* Blocking round trip; check_jsonrpc_error() turns its outcome into an
 * ovsdb_error. */
235 error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
238 ovsdb_error_assert(error);
/* Validate the shape of the result before indexing into it. */
242 if (reply->result->type != JSON_ARRAY) {
243 ovsdb_error_assert(ovsdb_error("list-dbs failed",
244 "list_dbs response is not array"));
248 for (i = 0; i < reply->result->u.array.n; i++) {
249 const struct json *name = reply->result->u.array.elems[i];
251 if (name->type != JSON_STRING) {
252 ovsdb_error_assert(ovsdb_error(
254 "list_dbs response %"PRIuSIZE" is not string",
257 svec_add(dbs, name->u.string);
259 jsonrpc_msg_destroy(reply);
/* Requests the schema of 'database' from the server behind 'rpc' with a
 * "get_schema" request and parses the reply with ovsdb_schema_from_json().
 *
 * Errors from the transaction and from schema parsing are reported through
 * ovsdb_error_assert(); the reply message is destroyed on every path shown
 * here.
 *
 * NOTE(review): send_monitor_requests() passes the returned schema directly
 * to ovsdb_schema_equal(); confirm what this function returns on failure and
 * whether that caller can cope with it. */
263 static struct ovsdb_schema *
264 fetch_schema(struct jsonrpc *rpc, const char *database)
266 struct jsonrpc_msg *request, *reply;
267 struct ovsdb_schema *schema;
268 struct ovsdb_error *error;
270 request = jsonrpc_create_request("get_schema",
272 json_string_create(database)),
/* Blocking round trip; check_jsonrpc_error() turns its outcome into an
 * ovsdb_error. */
274 error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
277 jsonrpc_msg_destroy(reply);
278 ovsdb_error_assert(error);
/* Build the schema object from the JSON result of the reply. */
282 error = ovsdb_schema_from_json(reply->result, &schema);
284 jsonrpc_msg_destroy(reply);
285 ovsdb_error_assert(error);
288 jsonrpc_msg_destroy(reply);
/* Sets up monitoring of the remote databases.
 *
 * The database names are obtained from the remote server with fetch_dbs().
 * For each name that find_db() resolves in 'all_dbs', the remote schema is
 * fetched and compared with the local one; when they are equal, a "monitor"
 * request covering every non-blacklisted table is sent and the initial
 * contents are read with get_initial_db_state(). */
294 send_monitor_requests(struct shash *all_dbs)
301 fetch_dbs(rpc, &dbs);
302 SVEC_FOR_EACH (i, db_name, &dbs) {
303 const struct db *database = find_db(all_dbs, db_name);
306 struct ovsdb_schema *local_schema, *remote_schema;
308 local_schema = database->db->schema;
/* NOTE(review): 'remote_schema' is used on the next statement without a
 * check; confirm that fetch_schema() cannot hand back an unusable value. */
309 remote_schema = fetch_schema(rpc, db_name);
310 if (ovsdb_schema_equal(local_schema, remote_schema)) {
311 struct jsonrpc_msg *request;
312 struct json *monitor, *monitor_request;
/* JSON object that receives one member per monitored table. */
314 monitor_request = json_object_create();
315 size_t n = shash_count(&local_schema->tables);
/* Visit the tables in the order produced by shash_sort(). */
316 const struct shash_node **nodes = shash_sort(
317 &local_schema->tables);
/* NOTE(review): 'j' is an int while 'n' is a size_t, so this comparison
 * mixes signed and unsigned operands. */
319 for (int j = 0; j < n; j++) {
320 struct ovsdb_table_schema *table = nodes[j]->data;
322 /* Check if table is not blacklisted. */
323 char *blacklist_item = xasprintf(
324 "%s%s%s", db_name, ":", table->name);
325 if (!sset_contains(&tables_blacklist, blacklist_item)) {
326 add_monitored_table(table, monitor_request);
328 free(blacklist_item);
332 /* Send monitor request. */
/* The database name is given twice.  The second copy presumably serves as
 * the monitor identifier, which check_for_notifications() reads back as the
 * database name from "update" notifications -- confirm against the OVSDB
 * protocol specification. */
333 monitor = json_array_create_3(
334 json_string_create(db_name),
335 json_string_create(db_name),
337 request = jsonrpc_create_request("monitor", monitor, NULL);
/* NOTE(review): the result of jsonrpc_send() is not examined. */
338 jsonrpc_send(rpc, request);
339 get_initial_db_state(database);
341 ovsdb_schema_destroy(remote_schema);
348 get_initial_db_state(const struct db *database)
350 struct jsonrpc_msg *msg;
352 jsonrpc_recv_block(rpc, &msg);
354 if (msg->type == JSONRPC_REPLY) {
355 process_notification(msg->result, database->db);
360 add_monitored_table(struct ovsdb_table_schema *table,
361 struct json *monitor_request)
363 struct json *monitor_request_array;
365 sset_add(&monitored_tables, table->name);
367 monitor_request_array = json_array_create_empty();
368 json_array_add(monitor_request_array, json_object_create());
370 json_object_put(monitor_request, table->name, monitor_request_array);
/* Receives at most one message from 'rpc' and dispatches it.
 *
 * "echo" requests are answered with a reply carrying a copy of the request's
 * parameters.  "update" notifications whose parameters form a two-element
 * array are applied to the local database found with find_db(): element 0
 * names the database and element 1 carries the table updates.  On a receive
 * failure the connection is reopened, and the replication state is torn down
 * with disconnect_remote_server() when the remote server is gone. */
374 check_for_notifications(struct shash *all_dbs)
376 struct jsonrpc_msg *msg;
379 error = jsonrpc_recv(rpc, &msg);
/* EAGAIN presumably means that no complete message is available yet. */
380 if (error == EAGAIN) {
383 rpc = open_jsonrpc(remote_ovsdb_server);
385 /* Remote server went down. */
386 disconnect_remote_server();
388 jsonrpc_msg_destroy(msg);
391 if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
392 jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
394 } else if (msg->type == JSONRPC_NOTIFY
395 && !strcmp(msg->method, "update")) {
396 struct json *params = msg->params;
397 if (params->type == JSON_ARRAY
398 && params->u.array.n == 2) {
/* NOTE(review): element 0 is read as a string without first checking that
 * its type is JSON_STRING. */
399 char *db_name = params->u.array.elems[0]->u.string;
400 const struct db *database = find_db(all_dbs, db_name);
402 process_notification(params->u.array.elems[1], database->db);
/* The message, including the JSON handed to process_notification(), is
 * released here. */
406 jsonrpc_msg_destroy(msg);
/* Applies 'table_updates' to 'database' within a single transaction.
 *
 * 'table_updates' must be a JSON object; each member is named after a table
 * and is passed to process_table_update().  The transaction is then either
 * committed or aborted.  'monitored_tables' is cleared on the paths below;
 * an empty set makes replication_run() take its "start a sync" branch
 * again. */
411 process_notification(struct json *table_updates, struct ovsdb *database)
413 struct ovsdb_error *error;
414 struct ovsdb_txn *txn;
/* Anything other than a JSON object cannot be a set of table updates. */
416 if (table_updates->type != JSON_OBJECT) {
417 sset_clear(&monitored_tables);
421 txn = ovsdb_txn_create(database);
424 /* Process each table update. */
425 struct shash_node *node;
426 SHASH_FOR_EACH (node, json_object(table_updates)) {
427 struct json *table_update = node->data;
/* 'node->name' is the name of the table being updated. */
429 error = process_table_update(table_update, node->name, database, txn);
437 /* Commit transaction. */
438 error = ovsdb_txn_commit(txn, false);
440 ovsdb_error_assert(error);
441 sset_clear(&monitored_tables);
444 ovsdb_txn_abort(txn);
445 ovsdb_error_assert(error);
446 sset_clear(&monitored_tables);
449 ovsdb_error_destroy(error);
/* Applies one table update for the table named 'table_name' of 'database'
 * within 'txn'.
 *
 * 'table_update' must be a JSON object.  Each member's name is passed on as
 * the row's UUID string and its value, which must itself be a JSON object,
 * is the row update.  The "old" and "new" members of the row update select
 * among execute_insert(), execute_delete() and execute_update() --
 * presumably according to which of the two is present; confirm against the
 * OVSDB protocol's description of update notifications. */
452 static struct ovsdb_error *
453 process_table_update(struct json *table_update, const char *table_name,
454 struct ovsdb *database, struct ovsdb_txn *txn)
456 struct shash_node *node;
457 struct ovsdb_table *table;
458 struct ovsdb_error *error;
460 if (table_update->type != JSON_OBJECT) {
461 return ovsdb_error("Not a JSON object",
462 "<table-update> for table is not object");
465 table = ovsdb_get_table(database, table_name);
468 SHASH_FOR_EACH (node, json_object(table_update)) {
469 struct json *row_update = node->data;
470 struct json *old, *new;
472 if (row_update->type != JSON_OBJECT) {
473 error = ovsdb_error("NOt a JSON object",
474 "<row-update> is not object");
/* Either lookup yields NULL when the member is absent (assumed from the use
 * of shash_find_data() -- confirm). */
477 old = shash_find_data(json_object(row_update), "old");
478 new = shash_find_data(json_object(row_update), "new");
/* 'node->name' is the UUID string of the affected row. */
481 error = execute_insert(txn, node->name, table, new);
484 error = execute_delete(txn, node->name, table);
486 error = execute_update(txn, node->name, table, new);
/* Inserts a row into 'table' as part of 'txn'.
 *
 * A new row is created for 'table' and filled from 'json_row'; its UUID is
 * set from the string 'uuid' before it is added to the transaction.  The
 * row is destroyed on another path shown below, presumably when filling it
 * from JSON fails -- confirm. */
493 static struct ovsdb_error *
494 execute_insert(struct ovsdb_txn *txn, const char *uuid,
495 struct ovsdb_table *table, struct json *json_row)
497 struct ovsdb_row *row = NULL;
498 struct uuid row_uuid;
499 struct ovsdb_error *error;
501 row = ovsdb_row_create(table);
502 error = ovsdb_row_from_json(row, json_row, NULL, NULL);
504 /* Add UUID to row. */
/* NOTE(review): the result of uuid_from_string() is not examined, so a
 * malformed 'uuid' goes unnoticed here. */
505 uuid_from_string(&row_uuid, uuid);
506 *ovsdb_row_get_uuid_rw(row) = row_uuid;
507 ovsdb_txn_row_insert(txn, row);
509 ovsdb_row_destroy(row);
/* Auxiliary data handed to delete_row_cb() through ovsdb_query(). */
515 struct delete_row_cbdata {
/* Table being queried. */
517 const struct ovsdb_table *table;
/* Transaction in which matching rows are deleted. */
518 struct ovsdb_txn *txn;
/* Callback passed to ovsdb_query() by execute_delete(): schedules the
 * deletion of 'row' in the transaction carried by 'dr_', which points to a
 * 'struct delete_row_cbdata'. */
522 delete_row_cb(const struct ovsdb_row *row, void *dr_)
524 struct delete_row_cbdata *dr = dr_;
527 ovsdb_txn_row_delete(dr->txn, row);
/* Deletes from 'table', as part of 'txn', the row whose "_uuid" column
 * equals 'uuid'.
 *
 * The row is located by building a JSON "where" clause of the form
 * [["_uuid","==",["uuid","<uuid>"]]], turning it into a condition and
 * running ovsdb_query() with delete_row_cb() as the callback.  A NULL
 * 'table' is reported as an OVSDB_BUG error.
 *
 * NOTE(review): 'where' is created by json_from_string() and no matching
 * json_destroy() call appears below -- possible leak; confirm. */
532 static struct ovsdb_error *
533 execute_delete(struct ovsdb_txn *txn, const char *uuid,
534 struct ovsdb_table *table)
536 const struct json *where;
537 struct ovsdb_error *error;
538 struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER;
/* 29 = 24 characters of fixed prefix + 4 of fixed suffix + the terminating
 * NUL of the clause formatted below. */
539 char where_string[UUID_LEN+29];
542 return OVSDB_BUG("null table");
545 snprintf(where_string, sizeof where_string, "%s%s%s",
546 "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
548 where = json_from_string(where_string);
549 error = ovsdb_condition_from_json(table->schema, where, NULL, &condition);
551 struct delete_row_cbdata dr;
/* delete_row_cb() is invoked with '&dr' as its auxiliary argument. */
556 ovsdb_query(table, &condition, delete_row_cb, &dr);
559 ovsdb_condition_destroy(&condition);
/* Auxiliary data handed to update_row_cb() through ovsdb_query(). */
563 struct update_row_cbdata {
/* Transaction in which matching rows are modified. */
565 struct ovsdb_txn *txn;
/* Row holding the new column values. */
566 const struct ovsdb_row *row;
/* Columns of 'row' to compare and copy. */
567 const struct ovsdb_column_set *columns;
/* Callback passed to ovsdb_query() by execute_update().  'ur_' points to a
 * 'struct update_row_cbdata'.  When 'row' differs from the template row in
 * any of the selected columns, those columns are copied from the template
 * into a modifiable version of 'row' obtained from the transaction. */
571 update_row_cb(const struct ovsdb_row *row, void *ur_)
573 struct update_row_cbdata *ur = ur_;
/* Rows that already hold the new values are left untouched. */
576 if (!ovsdb_row_equal_columns(row, ur->row, ur->columns)) {
577 ovsdb_row_update_columns(ovsdb_txn_row_modify(ur->txn, row),
578 ur->row, ur->columns);
/* Updates, as part of 'txn', the row of 'table' whose "_uuid" column equals
 * 'uuid' with the column values found in 'json_row'.
 *
 * A temporary row is filled from 'json_row', with the columns it supplies
 * recorded in 'columns'.  The target row is located with a JSON "where"
 * clause of the form [["_uuid","==",["uuid","<uuid>"]]] and modified through
 * update_row_cb().  The temporary row, the column set and the condition are
 * all destroyed before returning.
 *
 * NOTE(review): 'where' is created by json_from_string() and no matching
 * json_destroy() call appears below -- possible leak; confirm. */
584 static struct ovsdb_error *
585 execute_update(struct ovsdb_txn *txn, const char *uuid,
586 struct ovsdb_table *table, struct json *json_row)
588 struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
589 struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER;
590 struct update_row_cbdata ur;
591 struct ovsdb_row *row;
592 struct ovsdb_error *error;
593 const struct json *where;
/* 29 = 24 characters of fixed prefix + 4 of fixed suffix + the terminating
 * NUL of the clause formatted below. */
594 char where_string[UUID_LEN+29];
596 snprintf(where_string, sizeof where_string, "%s%s%s",
597 "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
598 where = json_from_string(where_string);
/* Template row carrying the new values. */
600 row = ovsdb_row_create(table);
601 error = ovsdb_row_from_json(row, json_row, NULL, &columns);
603 error = ovsdb_condition_from_json(table->schema, where, NULL,
610 ur.columns = &columns;
/* update_row_cb() is invoked with '&ur' as its auxiliary argument. */
611 ovsdb_query(table, &condition, update_row_cb, &ur);
614 ovsdb_row_destroy(row);
615 ovsdb_column_set_destroy(&columns);
616 ovsdb_condition_destroy(&condition);
/* Help text for the replication command-line options: --sync-from, which
 * names the remote server to sync from, and --sync-exclude-tables, which
 * lists "DB:TABLE" pairs to leave out of syncing. */
622 replication_usage(void)
626 --sync-from=SERVER sync DATABASE from remote SERVER\n\
627 --sync-exclude-tables=DB:TABLE,...\n\
628 exclude the TABLE in DB from syncing\n");