2 * Copyright (c) 2015 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 #include "dynamic-string.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
34 #include "transaction.h"
35 #include "jsonrpc-server.h"
37 #include "openvswitch/vlog.h"
/* Forward declaration of the replica class used by ovsdb_monitor_cast() and
 * ovsdb_monitor_create(); its initializer appears later in this file. */
40 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
44 * ovsdb_monitor keeps track of the ovsdb changes.
47 /* A collection of tables being monitored. */
48 struct ovsdb_monitor {
/* Embedded replica: ovsdb_monitor_create() registers it with the database
 * and ovsdb_monitor_cast() recovers the enclosing monitor from it. */
49 struct ovsdb_replica replica;
50 struct shash tables; /* Holds "struct ovsdb_monitor_table"s. */
51 struct ovs_list jsonrpc_monitors; /* Contains "jsonrpc_monitor_node"s. */
53 uint64_t n_transactions; /* Count number of committed transactions. */
/* List element recording one ovsdb_jsonrpc_monitor that uses an
 * ovsdb_monitor; nodes are linked into ovsdb_monitor.jsonrpc_monitors. */
56 struct jsonrpc_monitor_node {
57 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
61 /* A particular column being monitored. */
62 struct ovsdb_monitor_column {
63 const struct ovsdb_column *column;
/* OJMS_* bits selecting which kinds of row change are reported for this
 * column; tested against the update type when a row update is composed. */
64 enum ovsdb_monitor_selection select;
67 /* A row that has changed in a monitored table. */
68 struct ovsdb_monitor_row {
69 struct hmap_node hmap_node; /* In ovsdb_monitor_changes.rows. */
70 struct uuid uuid; /* UUID of row that changed. */
/* 'old' and 'new' are arrays holding one datum per monitored column, as
 * built by clone_monitor_row_data(). */
71 struct ovsdb_datum *old; /* Old data, NULL for an inserted row. */
72 struct ovsdb_datum *new; /* New data, NULL for a deleted row. */
75 /* Contains 'struct ovsdb_monitor_row's for rows that have been
76 * updated but not yet flushed to all the jsonrpc connections.
78 * 'n_refs' represents the number of jsonrpc connections that have
79 * not received updates. Generating the update for the last jsonrpc
80 * connection will also destroy the whole "struct ovsdb_monitor_changes"
83 * object. 'transaction' stores the first update's transaction id. */
85 struct ovsdb_monitor_changes {
/* Table these changes belong to; it is handed to
 * ovsdb_monitor_row_destroy() when the rows are freed. */
86 struct ovsdb_monitor_table *mt;
90 struct hmap_node hmap_node; /* Element in ovsdb_monitor_tables' changes
94 /* A particular table being monitored. */
95 struct ovsdb_monitor_table {
96 const struct ovsdb_table *table;
98 /* This is the union (bitwise-OR) of the 'select' values in all of the
99 * members of 'columns' below. */
/* ovsdb_monitor_table_add_select() may OR further bits into it. */
100 enum ovsdb_monitor_selection select;
102 /* Columns being monitored. */
/* The array is grown by ovsdb_monitor_add_column() and sorted in place by
 * ovsdb_monitor_table_check_duplicates(). */
103 struct ovsdb_monitor_column *columns;
106 /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
/* Forward declarations of static helpers that are defined later in this
 * file. */
110 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
111 static void ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
113 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
114 struct ovsdb_monitor_table *mt, uint64_t unflushed);
115 static void ovsdb_monitor_changes_destroy(
116 struct ovsdb_monitor_changes *changes);
117 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
/* qsort() comparator for "struct ovsdb_monitor_column": orders elements by
 * the address of the ovsdb_column they refer to and returns -1, 0, or 1.
 * Entries that refer to the same column compare equal, so sorting places
 * duplicates next to each other. */
121 compare_ovsdb_monitor_column(const void *a_, const void *b_)
123 const struct ovsdb_monitor_column *a = a_;
124 const struct ovsdb_monitor_column *b = b_;
126 return a->column < b->column ? -1 : a->column > b->column;
/* Converts 'replica' back into the ovsdb_monitor that embeds it.  Asserts
 * that 'replica' belongs to ovsdb_jsonrpc_replica_class before converting. */
129 static struct ovsdb_monitor *
130 ovsdb_monitor_cast(struct ovsdb_replica *replica)
132 ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
133 return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
136 /* Finds and returns the ovsdb_monitor_row in 'changes->rows' for the
137 * given 'uuid', or NULL if there is no such row. */
138 static struct ovsdb_monitor_row *
139 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
140 const struct uuid *uuid)
142 struct ovsdb_monitor_row *row;
/* Visit only rows whose hash matches uuid_hash(uuid); uuid_equals() then
 * rules out hash collisions. */
144 HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
146 if (uuid_equals(uuid, &row->uuid)) {
153 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
154 * copies of the data in 'row' drawn from the columns represented by
155 * mt->columns[]. Returns the array.
157 * If 'row' is NULL, returns NULL. */
/* The returned array is the kind of array that free_monitor_row_data()
 * releases. */
158 static struct ovsdb_datum *
159 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
160 const struct ovsdb_row *row)
162 struct ovsdb_datum *data;
/* One datum per monitored column, in mt->columns[] order. */
169 data = xmalloc(mt->n_columns * sizeof *data);
170 for (i = 0; i < mt->n_columns; i++) {
171 const struct ovsdb_column *c = mt->columns[i].column;
/* The source datum is located through the column's index in the row. */
172 const struct ovsdb_datum *src = &row->fields[c->index];
173 struct ovsdb_datum *dst = &data[i];
174 const struct ovsdb_type *type = &c->type;
176 ovsdb_datum_clone(dst, src, type);
181 /* Replaces the mt->n_columns ovsdb_datums in data[] by copies of the data
182 * in 'row' drawn from the columns represented by mt->columns[]. */
184 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
185 const struct ovsdb_row *row,
186 struct ovsdb_datum *data)
190 for (i = 0; i < mt->n_columns; i++) {
191 const struct ovsdb_column *c = mt->columns[i].column;
192 const struct ovsdb_datum *src = &row->fields[c->index];
193 struct ovsdb_datum *dst = &data[i];
194 const struct ovsdb_type *type = &c->type;
/* Datums that already match are left untouched; only differing ones are
 * destroyed and re-cloned. */
196 if (!ovsdb_datum_equals(src, dst, type)) {
197 ovsdb_datum_destroy(dst, type);
198 ovsdb_datum_clone(dst, src, type);
203 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
204 * from mt->columns[], plus 'data' itself. */
/* ovsdb_monitor_row_destroy() calls this on a row's 'old' and 'new'
 * arrays. */
206 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
207 struct ovsdb_datum *data)
212 for (i = 0; i < mt->n_columns; i++) {
213 const struct ovsdb_column *c = mt->columns[i].column;
215 ovsdb_datum_destroy(&data[i], &c->type);
221 /* Frees 'row', which must have been created from 'mt'. */
/* Both the 'old' and 'new' datum arrays are released; 'mt' supplies the
 * column count and types needed to destroy them. */
223 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
224 struct ovsdb_monitor_row *row)
227 free_monitor_row_data(mt, row->old);
228 free_monitor_row_data(mt, row->new);
/* Creates a new ovsdb_monitor for 'db' with 'jsonrpc_monitor' as its first
 * user.  The monitor is registered with 'db' as a replica and starts with an
 * empty table set and a transaction count of zero. */
233 struct ovsdb_monitor *
234 ovsdb_monitor_create(struct ovsdb *db,
235 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
237 struct ovsdb_monitor *dbmon;
238 struct jsonrpc_monitor_node *jm;
240 dbmon = xzalloc(sizeof *dbmon);
/* Hook the monitor into 'db' as a replica of the monitor replica class. */
242 ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
243 ovsdb_add_replica(db, &dbmon->replica);
244 list_init(&dbmon->jsonrpc_monitors);
246 dbmon->n_transactions = 0;
247 shash_init(&dbmon->tables);
/* Record 'jsonrpc_monitor' on the list of users of this monitor. */
249 jm = xzalloc(sizeof *jm);
250 jm->jsonrpc_monitor = jsonrpc_monitor;
251 list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
/* Adds 'table' to the set of tables watched by 'm': allocates a zeroed
 * ovsdb_monitor_table, stores it in 'm->tables' keyed by the table's schema
 * name, and initializes its 'changes' map. */
257 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
258 const struct ovsdb_table *table)
260 struct ovsdb_monitor_table *mt;
262 mt = xzalloc(sizeof *mt);
264 shash_add(&m->tables, table->schema->name, mt);
265 hmap_init(&mt->changes);
/* Claims the next slot in the 'columns' array of the entry for 'table' in
 * 'dbmon' and ORs 'select' into the table-wide selection mask.  The stores
 * into the new slot are not visible in this listing; presumably they record
 * 'column' and 'select' -- confirm against the full source.
 *
 * '*allocated_columns' is the current capacity of the 'columns' array; it
 * is passed to x2nrealloc() when the array is full.
 *
 * The table entry is looked up by schema name, so 'table' is expected to
 * have been added already (see ovsdb_monitor_add_table()). */
269 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
270 const struct ovsdb_table *table,
271 const struct ovsdb_column *column,
272 enum ovsdb_monitor_selection select,
273 size_t *allocated_columns)
275 struct ovsdb_monitor_table *mt;
276 struct ovsdb_monitor_column *c;
278 mt = shash_find_data(&dbmon->tables, table->schema->name);
/* Grow the array when every allocated slot is in use. */
280 if (mt->n_columns >= *allocated_columns) {
281 mt->columns = x2nrealloc(mt->columns, allocated_columns,
282 sizeof *mt->columns);
285 mt->select |= select;
286 c = &mt->columns[mt->n_columns++];
291 /* Check for duplicated column names. Return the first
292 * duplicated column's name if found. Otherwise return NULL. */
/* NOTE: this sorts mt->columns in place (by column address, see
 * compare_ovsdb_monitor_column()) so that duplicate entries become adjacent
 * before the pairwise scan below.  The returned name points into the
 * column's own data; nothing is allocated here. */
294 const char * OVS_WARN_UNUSED_RESULT
295 ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
296 const struct ovsdb_table *table)
298 struct ovsdb_monitor_table *mt;
301 mt = shash_find_data(&m->tables, table->schema->name);
304 /* Check for duplicate columns. */
305 qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
306 compare_ovsdb_monitor_column);
/* After sorting, a duplicate shows up as two neighbours that refer to the
 * same ovsdb_column. */
307 for (i = 1; i < mt->n_columns; i++) {
308 if (mt->columns[i].column == mt->columns[i - 1].column) {
309 return mt->columns[i].column->name;
/* Creates a new ovsdb_monitor_changes with an empty row map whose
 * 'transaction' is 'next_txn', and inserts it into 'mt->changes' hashed by
 * that transaction number. */
318 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
321 struct ovsdb_monitor_changes *changes;
323 changes = xzalloc(sizeof *changes);
325 changes->transaction = next_txn;
328 hmap_init(&changes->rows);
329 hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
/* Looks up the ovsdb_monitor_changes in 'mt->changes' whose 'transaction'
 * equals 'transaction'.  Only entries whose hash matches
 * hash_uint64(transaction) are examined. */
332 static struct ovsdb_monitor_changes *
333 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
334 uint64_t transaction)
336 struct ovsdb_monitor_changes *changes;
337 size_t hash = hash_uint64(transaction);
339 HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
340 if (changes->transaction == transaction) {
348 /* Stop currently tracking changes to table 'mt' since 'transaction'.
350 * Return 'true' if the 'transaction' is being tracked. 'false' otherwise. */
352 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
353 uint64_t transaction)
355 struct ovsdb_monitor_changes *changes =
356 ovsdb_monitor_table_find_changes(mt, transaction);
358 ovs_assert(changes->transaction == transaction);
/* Drop one reference; when the last reference goes away the changes are
 * unlinked from 'mt->changes' and destroyed. */
359 if (--changes->n_refs == 0) {
360 hmap_remove(&mt->changes, &changes->hmap_node);
361 ovsdb_monitor_changes_destroy(changes);
366 /* Start tracking changes to table 'mt' beginning from 'transaction', inclusive. */
/* Looks up the changes for 'transaction' in 'mt' and calls
 * ovsdb_monitor_table_add_changes() to create them.  The condition guarding
 * that call is not visible in this listing; presumably it runs only when no
 * changes exist yet for 'transaction' -- confirm against the full source. */
369 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
370 uint64_t transaction)
372 struct ovsdb_monitor_changes *changes;
374 changes = ovsdb_monitor_table_find_changes(mt, transaction);
378 ovsdb_monitor_table_add_changes(mt, transaction);
/* Destroys every row held in 'changes->rows' and then the row map itself.
 * This function does not unlink 'changes' from its table's 'changes' map;
 * both callers visible in this file do that before calling it. */
383 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
385 struct ovsdb_monitor_row *row, *next;
/* The _SAFE iterator is needed because each row is removed while
 * iterating. */
387 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
388 hmap_remove(&changes->rows, &row->hmap_node);
389 ovsdb_monitor_row_destroy(changes->mt, row);
391 hmap_destroy(&changes->rows);
395 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
396 * 'mt', or NULL if no row update should be sent.
398 * The caller should specify 'initial' as true if the returned JSON is going to
399 * be used as part of the initial reply to a "monitor" request, false if it is
400 * going to be used as part of an "update" notification.
402 * 'changed' must be a scratch buffer for internal use that is at least
403 * bitmap_n_bytes(mt->n_columns) bytes long. */
405 ovsdb_monitor_compose_row_update(
406 const struct ovsdb_monitor_table *mt,
407 const struct ovsdb_monitor_row *row,
408 bool initial, unsigned long int *changed)
410 enum ovsdb_monitor_selection type;
411 struct json *old_json, *new_json;
412 struct json *row_json;
/* Classify the update: an initial dump when 'initial' is set, an insert
 * when there is no old data, a delete when there is no new data.  The final
 * alternative of this expression is not visible in this listing. */
415 type = (initial ? OJMS_INITIAL
416 : !row->old ? OJMS_INSERT
417 : !row->new ? OJMS_DELETE
/* The table did not select this kind of update. */
419 if (!(mt->select & type)) {
423 if (type == OJMS_MODIFY) {
/* Build a bitmap of the columns whose old and new values differ. */
427 memset(changed, 0, bitmap_n_bytes(mt->n_columns));
428 for (i = 0; i < mt->n_columns; i++) {
429 const struct ovsdb_column *c = mt->columns[i].column;
430 if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
431 bitmap_set1(changed, i);
436 /* No actual changes: presumably a row changed and then
437 * changed back later. */
/* An "old" object is attached for deletes and modifies, a "new" object for
 * initial, insert and modify updates. */
442 row_json = json_object_create();
443 old_json = new_json = NULL;
444 if (type & (OJMS_DELETE | OJMS_MODIFY)) {
445 old_json = json_object_create();
446 json_object_put(row_json, "old", old_json);
448 if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
449 new_json = json_object_create();
450 json_object_put(row_json, "new", new_json);
/* Fill in the per-column values.  For a modify, "old" only receives the
 * columns flagged in 'changed'; for a delete it receives every selected
 * column. */
452 for (i = 0; i < mt->n_columns; i++) {
453 const struct ovsdb_monitor_column *c = &mt->columns[i];
455 if (!(type & c->select)) {
456 /* We don't care about this type of change for this
457 * particular column (but we will care about it for some
462 if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
463 || type == OJMS_DELETE) {
464 json_object_put(old_json, c->column->name,
465 ovsdb_datum_to_json(&row->old[i],
468 if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
469 json_object_put(new_json, c->column->name,
470 ovsdb_datum_to_json(&row->new[i],
478 /* Constructs and returns JSON for a <table-updates> object (as described in
479 * RFC 7047) for all the outstanding changes within 'monitor', and deletes all
480 * the outstanding changes from 'monitor'. Returns NULL if no update needs to
483 * The caller should specify 'initial' as true if the returned JSON is going to
484 * be used as part of the initial reply to a "monitor" request, false if it is
485 * going to be used as part of an "update" notification.
487 * 'unflushed' should point to the ID of the first transaction whose
488 * changes have not yet been sent. The update contains changes in
489 * ['unflushed', dbmon->n_transactions]. Before the function returns, this
490 * value will be updated to dbmon->n_transactions + 1, ready for the next update. */
493 ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
494 bool initial, uint64_t *unflushed)
496 struct shash_node *node;
497 unsigned long int *changed;
/* 'prev_txn' identifies the changes to report; 'next_txn' is the transaction
 * from which tracking resumes afterwards. */
500 uint64_t prev_txn = *unflushed;
501 uint64_t next_txn = dbmon->n_transactions + 1;
/* Size the scratch bitmap for the widest monitored table. */
504 SHASH_FOR_EACH (node, &dbmon->tables) {
505 struct ovsdb_monitor_table *mt = node->data;
507 max_columns = MAX(max_columns, mt->n_columns);
509 changed = xmalloc(bitmap_n_bytes(max_columns));
512 SHASH_FOR_EACH (node, &dbmon->tables) {
513 struct ovsdb_monitor_table *mt = node->data;
514 struct ovsdb_monitor_row *row, *next;
515 struct ovsdb_monitor_changes *changes;
516 struct json *table_json = NULL;
/* The changes recorded under 'prev_txn' are the ones to be reported. */
518 changes = ovsdb_monitor_table_find_changes(mt, prev_txn);
520 ovsdb_monitor_table_track_changes(mt, next_txn);
524 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
525 struct json *row_json;
527 row_json = ovsdb_monitor_compose_row_update(
528 mt, row, initial, changed);
530 char uuid[UUID_LEN + 1];
532 /* Create JSON object for transaction overall. */
534 json = json_object_create();
537 /* Create JSON object for transaction on this table. */
539 table_json = json_object_create();
540 json_object_put(json, mt->table->schema->name, table_json);
543 /* Add JSON row to JSON table. */
544 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
545 json_object_put(table_json, uuid, row_json);
/* Remove the row from 'changes' and free it. */
548 hmap_remove(&changes->rows, &row->hmap_node);
549 ovsdb_monitor_row_destroy(mt, row);
/* Release the hold on 'prev_txn' and start tracking from 'next_txn'. */
552 ovsdb_monitor_table_untrack_changes(mt, prev_txn);
553 ovsdb_monitor_table_track_changes(mt, next_txn);
/* Advance the caller's cursor past everything just composed. */
556 *unflushed = next_txn;
/* Returns true when 'next_transaction' is no greater than the number of
 * transactions 'dbmon' has counted, i.e. there is at least one counted
 * transaction at or after 'next_transaction'.  Asserts that
 * 'next_transaction' is at most one past that count. */
562 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
563 uint64_t next_transaction)
565 ovs_assert(next_transaction <= dbmon->n_transactions + 1);
566 return (next_transaction <= dbmon->n_transactions);
/* ORs 'select' into the table-wide selection mask of the entry for 'table'
 * in 'dbmon'.  The entry is looked up by schema name and dereferenced
 * directly, so 'table' must already have been added to 'dbmon'. */
570 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
571 const struct ovsdb_table *table,
572 enum ovsdb_monitor_selection select)
574 struct ovsdb_monitor_table * mt;
576 mt = shash_find_data(&dbmon->tables, table->schema->name);
577 mt->select |= select;
/* State carried across ovsdb_monitor_change_cb() invocations: the monitor
 * being updated and the most recently looked-up table entry, which the
 * callback reuses while consecutive rows come from the same table. */
580 struct ovsdb_monitor_aux {
581 const struct ovsdb_monitor *monitor;
582 struct ovsdb_monitor_table *mt;
/* Initializes 'aux' for use with monitor 'm'.  (The body is not visible in
 * this listing.) */
586 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
587 const struct ovsdb_monitor *m)
/* Folds one row change into 'changes'.  'old' or 'new' may be NULL; the
 * row's UUID is taken from 'new' when present, otherwise from 'old'.
 *
 * The first change recorded for a UUID creates an ovsdb_monitor_row holding
 * copies of both versions.  Later changes to the same row refresh or free
 * its 'new' data, and a row that turns out to have been added and then
 * deleted is removed from 'changes' again.  The conditions selecting
 * between these cases are not visible in this listing. */
594 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
595 const struct ovsdb_row *new,
596 const struct ovsdb_monitor_table *mt,
597 struct ovsdb_monitor_changes *changes)
599 const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
600 struct ovsdb_monitor_row *change;
602 change = ovsdb_monitor_changes_row_find(changes, uuid);
/* New entry: remember the row's UUID and snapshot both versions. */
604 change = xzalloc(sizeof *change);
605 hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
606 change->uuid = *uuid;
607 change->old = clone_monitor_row_data(mt, old);
608 change->new = clone_monitor_row_data(mt, new);
611 update_monitor_row_data(mt, new, change->new);
613 free_monitor_row_data(mt, change->new);
617 /* This row was added then deleted. Forget about it. */
618 hmap_remove(&changes->rows, &change->hmap_node);
/* Per-row change callback, used with ovsdb_txn_for_each_change() and called
 * directly by ovsdb_monitor_get_initial().  Applies the change 'old' ->
 * 'new' to every outstanding ovsdb_monitor_changes of the monitor's entry
 * for the affected table.  'aux_' is a "struct ovsdb_monitor_aux". */
626 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
627 const struct ovsdb_row *new,
628 const unsigned long int *changed OVS_UNUSED,
631 struct ovsdb_monitor_aux *aux = aux_;
632 const struct ovsdb_monitor *m = aux->monitor;
633 struct ovsdb_table *table = new ? new->table : old->table;
634 struct ovsdb_monitor_table *mt;
635 struct ovsdb_monitor_changes *changes;
/* Re-resolve the table entry only when this row's table differs from the
 * one cached in 'aux'. */
637 if (!aux->mt || table != aux->mt->table) {
638 aux->mt = shash_find_data(&m->tables, table->schema->name);
640 /* We don't care about rows in this table at all. Tell the caller
647 HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
648 ovsdb_monitor_changes_update(old, new, mt, changes);
/* Seeds 'dbmon' with the current contents of every table that selected
 * OJMS_INITIAL, by feeding each existing row through
 * ovsdb_monitor_change_cb() as an insertion (old == NULL). */
654 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
656 struct ovsdb_monitor_aux aux;
657 struct shash_node *node;
659 ovsdb_monitor_init_aux(&aux, dbmon);
660 SHASH_FOR_EACH (node, &dbmon->tables) {
661 struct ovsdb_monitor_table *mt = node->data;
663 if (mt->select & OJMS_INITIAL) {
664 struct ovsdb_row *row;
/* Make sure a changes object (for transaction 0) exists to receive the
 * rows. */
666 if (hmap_is_empty(&mt->changes)) {
667 ovsdb_monitor_table_add_changes(mt, 0);
670 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
671 ovsdb_monitor_change_cb(NULL, row, NULL, &aux);
/* Detaches 'jsonrpc_monitor' from 'dbmon'.  When that leaves 'dbmon' with
 * no users, 'dbmon' is destroyed via ovsdb_monitor_destroy(). */
678 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
679 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
681 struct jsonrpc_monitor_node *jm;
683 /* Find and remove the jsonrpc monitor from the list. */
684 LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
685 if (jm->jsonrpc_monitor == jsonrpc_monitor) {
686 list_remove(&jm->node);
689 /* Destroy ovsdb monitor if this is the last user. */
690 if (list_is_empty(&dbmon->jsonrpc_monitors)) {
691 ovsdb_monitor_destroy(dbmon);
698 /* Should never reach here. jsonrpc_monitor should be on the list. */
/* Tears down 'dbmon': unlinks its replica node, then for every monitored
 * table removes and destroys all outstanding changes, and finally destroys
 * the table map. */
703 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
705 struct shash_node *node;
/* Detach from the list that the replica node is linked on. */
707 list_remove(&dbmon->replica.node);
709 SHASH_FOR_EACH (node, &dbmon->tables) {
710 struct ovsdb_monitor_table *mt = node->data;
711 struct ovsdb_monitor_changes *changes, *next;
713 HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
714 hmap_remove(&mt->changes, &changes->hmap_node);
715 ovsdb_monitor_changes_destroy(changes);
720 shash_destroy(&dbmon->tables);
/* First member of ovsdb_jsonrpc_replica_class.  Feeds every row change in
 * 'txn' through ovsdb_monitor_change_cb() so that the outstanding changes of
 * the monitor embedding 'replica' are updated.  'durable' is unused. */
724 static struct ovsdb_error *
725 ovsdb_monitor_commit(struct ovsdb_replica *replica,
726 const struct ovsdb_txn *txn,
727 bool durable OVS_UNUSED)
729 struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
730 struct ovsdb_monitor_aux aux;
732 ovsdb_monitor_init_aux(&aux, m);
733 ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
/* Second member of ovsdb_jsonrpc_replica_class.  Destroys every jsonrpc
 * monitor attached to the ovsdb_monitor that embeds 'replica'. */
740 ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
742 struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
743 struct jsonrpc_monitor_node *jm, *next;
745 /* Delete all front end monitors. Removing the last front
746 * end monitor will also destroy the corresponding 'ovsdb_monitor'.
747 * ovsdb monitor will also be destroyed. */
/* NOTE(review): the _SAFE iterator suggests that destroying a jsonrpc
 * monitor unlinks its node from this list -- confirm in
 * ovsdb_jsonrpc_monitor_destroy(). */
748 LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
749 ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
/* Replica class shared by all ovsdb_monitors; ovsdb_monitor_cast() checks
 * for it.  The members are given positionally. */
753 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
754 ovsdb_monitor_commit,
755 ovsdb_monitor_destroy_callback,