2 * Copyright (c) 2015 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 #include "openvswitch/dynamic-string.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
30 #include "condition.h"
36 #include "transaction.h"
37 #include "jsonrpc-server.h"
39 #include "openvswitch/vlog.h"
41 VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
43 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
44 static struct hmap ovsdb_monitors = HMAP_INITIALIZER(&ovsdb_monitors);
46 /* Keep state of session's conditions */
47 struct ovsdb_monitor_session_condition {
50 struct shash tables; /* Contains
51 * "struct ovsdb_monitor_table_condition *"s. */
54 /* Monitored table session's conditions */
55 struct ovsdb_monitor_table_condition {
56 const struct ovsdb_table *table;
57 struct ovsdb_monitor_table *mt;
58 struct ovsdb_condition old_condition;
59 struct ovsdb_condition new_condition;
64 * ovsdb_monitor keep track of the ovsdb changes.
67 /* A collection of tables being monitored. */
68 struct ovsdb_monitor {
69 struct ovsdb_replica replica;
70 struct shash tables; /* Holds "struct ovsdb_monitor_table"s. */
71 struct ovs_list jsonrpc_monitors; /* Contains "jsonrpc_monitor_node"s. */
73 uint64_t n_transactions; /* Count number of committed transactions. */
74 struct hmap_node hmap_node; /* Elements within ovsdb_monitors. */
75 struct hmap json_cache; /* Contains "ovsdb_monitor_json_cache_node"s.*/
78 /* A json object of updates between 'from_txn' and 'dbmon->n_transactions'
80 struct ovsdb_monitor_json_cache_node {
81 struct hmap_node hmap_node; /* Elements in json cache. */
82 enum ovsdb_monitor_version version;
84 struct json *json; /* Null, or a cloned of json */
87 struct jsonrpc_monitor_node {
88 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
92 /* A particular column being monitored. */
93 struct ovsdb_monitor_column {
94 const struct ovsdb_column *column;
95 enum ovsdb_monitor_selection select;
99 /* A row that has changed in a monitored table. */
100 struct ovsdb_monitor_row {
101 struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
102 struct uuid uuid; /* UUID of row that changed. */
103 struct ovsdb_datum *old; /* Old data, NULL for an inserted row. */
104 struct ovsdb_datum *new; /* New data, NULL for a deleted row. */
107 /* Contains 'struct ovsdb_monitor_row's for rows that have been
108 * updated but not yet flushed to all the jsonrpc connection.
 * 'n_refs' represents the number of jsonrpc connections that have
 * not received updates. Generating the update for the last jsonrpc
 * connection will also destroy the whole "struct ovsdb_monitor_changes"
115 * 'transaction' stores the first update's transaction id.
117 struct ovsdb_monitor_changes {
118 struct ovsdb_monitor_table *mt;
121 uint64_t transaction;
122 struct hmap_node hmap_node; /* Element in ovsdb_monitor_tables' changes
126 /* A particular table being monitored. */
127 struct ovsdb_monitor_table {
128 const struct ovsdb_table *table;
130 /* This is the union (bitwise-OR) of the 'select' values in all of the
131 * members of 'columns' below. */
132 enum ovsdb_monitor_selection select;
134 /* Columns being monitored. */
135 struct ovsdb_monitor_column *columns;
137 size_t n_monitored_columns;
138 size_t allocated_columns;
    /* Columns in ovsdb_monitor_row have different indexes than in
141 * ovsdb_row. This field maps between column->index to the index in the
142 * ovsdb_monitor_row. It is used for condition evaluation. */
143 unsigned int *columns_index_map;
145 /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
149 typedef struct json *
150 (*compose_row_update_cb_func)
151 (const struct ovsdb_monitor_table *mt,
152 const struct ovsdb_monitor_session_condition * condition,
153 const struct ovsdb_monitor_row *row,
154 bool initial, unsigned long int *changed);
156 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
157 static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
158 struct ovsdb_monitor_table *mt, uint64_t next_txn);
159 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
160 struct ovsdb_monitor_table *mt, uint64_t unflushed);
161 static void ovsdb_monitor_changes_destroy(
162 struct ovsdb_monitor_changes *changes);
163 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
167 json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
171 hash = hash_uint64(version);
172 hash = hash_uint64_basis(from_txn, hash);
177 static struct ovsdb_monitor_json_cache_node *
178 ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
179 enum ovsdb_monitor_version version,
182 struct ovsdb_monitor_json_cache_node *node;
183 uint32_t hash = json_cache_hash(version, from_txn);
185 HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
186 if (node->from_txn == from_txn && node->version == version) {
195 ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
196 enum ovsdb_monitor_version version,
197 uint64_t from_txn, struct json *json)
199 struct ovsdb_monitor_json_cache_node *node;
200 uint32_t hash = json_cache_hash(version, from_txn);
202 node = xmalloc(sizeof *node);
204 node->version = version;
205 node->from_txn = from_txn;
206 node->json = json ? json_clone(json) : NULL;
208 hmap_insert(&dbmon->json_cache, &node->hmap_node, hash);
212 ovsdb_monitor_json_cache_flush(struct ovsdb_monitor *dbmon)
214 struct ovsdb_monitor_json_cache_node *node;
216 HMAP_FOR_EACH_POP(node, hmap_node, &dbmon->json_cache) {
217 json_destroy(node->json);
223 compare_ovsdb_monitor_column(const void *a_, const void *b_)
225 const struct ovsdb_monitor_column *a = a_;
226 const struct ovsdb_monitor_column *b = b_;
228 /* put all monitored columns at the begining */
229 if (a->monitored != b->monitored) {
230 return a->monitored ? -1 : 1;
233 return a->column < b->column ? -1 : a->column > b->column;
236 static struct ovsdb_monitor *
237 ovsdb_monitor_cast(struct ovsdb_replica *replica)
239 ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
240 return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
243 /* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
244 * given 'uuid', or NULL if there is no such row. */
245 static struct ovsdb_monitor_row *
246 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
247 const struct uuid *uuid)
249 struct ovsdb_monitor_row *row;
251 HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
253 if (uuid_equals(uuid, &row->uuid)) {
260 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
261 * copies of the data in 'row' drawn from the columns represented by
262 * mt->columns[]. Returns the array.
264 * If 'row' is NULL, returns NULL. */
265 static struct ovsdb_datum *
266 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
267 const struct ovsdb_row *row)
269 struct ovsdb_datum *data;
276 data = xmalloc(mt->n_columns * sizeof *data);
277 for (i = 0; i < mt->n_columns; i++) {
278 const struct ovsdb_column *c = mt->columns[i].column;
279 const struct ovsdb_datum *src = &row->fields[c->index];
280 struct ovsdb_datum *dst = &data[i];
281 const struct ovsdb_type *type = &c->type;
283 ovsdb_datum_clone(dst, src, type);
288 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
289 * in 'row' drawn from the columns represented by mt->columns[]. */
291 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
292 const struct ovsdb_row *row,
293 struct ovsdb_datum *data)
297 for (i = 0; i < mt->n_columns; i++) {
298 const struct ovsdb_column *c = mt->columns[i].column;
299 const struct ovsdb_datum *src = &row->fields[c->index];
300 struct ovsdb_datum *dst = &data[i];
301 const struct ovsdb_type *type = &c->type;
303 if (!ovsdb_datum_equals(src, dst, type)) {
304 ovsdb_datum_destroy(dst, type);
305 ovsdb_datum_clone(dst, src, type);
310 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
311 * from mt->columns[], plus 'data' itself. */
313 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
314 struct ovsdb_datum *data)
319 for (i = 0; i < mt->n_columns; i++) {
320 const struct ovsdb_column *c = mt->columns[i].column;
322 ovsdb_datum_destroy(&data[i], &c->type);
328 /* Frees 'row', which must have been created from 'mt'. */
330 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
331 struct ovsdb_monitor_row *row)
334 free_monitor_row_data(mt, row->old);
335 free_monitor_row_data(mt, row->new);
341 ovsdb_monitor_columns_sort(struct ovsdb_monitor *dbmon)
344 struct shash_node *node;
346 SHASH_FOR_EACH (node, &dbmon->tables) {
347 struct ovsdb_monitor_table *mt = node->data;
349 qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
350 compare_ovsdb_monitor_column);
351 for (i = 0; i < mt->n_columns; i++) {
352 /* re-set index map due to sort */
353 mt->columns_index_map[mt->columns[i].column->index] = i;
359 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
360 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
362 struct jsonrpc_monitor_node *jm;
364 jm = xzalloc(sizeof *jm);
365 jm->jsonrpc_monitor = jsonrpc_monitor;
366 ovs_list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
369 struct ovsdb_monitor *
370 ovsdb_monitor_create(struct ovsdb *db,
371 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
373 struct ovsdb_monitor *dbmon;
375 dbmon = xzalloc(sizeof *dbmon);
377 ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
378 ovsdb_add_replica(db, &dbmon->replica);
379 ovs_list_init(&dbmon->jsonrpc_monitors);
381 dbmon->n_transactions = 0;
382 shash_init(&dbmon->tables);
383 hmap_node_nullify(&dbmon->hmap_node);
384 hmap_init(&dbmon->json_cache);
386 ovsdb_monitor_add_jsonrpc_monitor(dbmon, jsonrpc_monitor);
391 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
392 const struct ovsdb_table *table)
394 struct ovsdb_monitor_table *mt;
396 size_t n_columns = shash_count(&table->schema->columns);
398 mt = xzalloc(sizeof *mt);
400 shash_add(&m->tables, table->schema->name, mt);
401 hmap_init(&mt->changes);
402 mt->columns_index_map =
403 xmalloc(sizeof *mt->columns_index_map * n_columns);
404 for (i = 0; i < n_columns; i++) {
405 mt->columns_index_map[i] = -1;
410 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
411 const struct ovsdb_table *table,
412 const struct ovsdb_column *column,
413 enum ovsdb_monitor_selection select,
416 struct ovsdb_monitor_table *mt;
417 struct ovsdb_monitor_column *c;
419 mt = shash_find_data(&dbmon->tables, table->schema->name);
421 /* Check for column duplication. Return duplicated column name. */
422 if (mt->columns_index_map[column->index] != -1) {
426 if (mt->n_columns >= mt->allocated_columns) {
427 mt->columns = x2nrealloc(mt->columns, &mt->allocated_columns,
428 sizeof *mt->columns);
431 mt->select |= select;
432 mt->columns_index_map[column->index] = mt->n_columns;
433 c = &mt->columns[mt->n_columns++];
436 c->monitored = monitored;
438 mt->n_monitored_columns++;
445 ovsdb_monitor_condition_add_columns(struct ovsdb_monitor *dbmon,
446 const struct ovsdb_table *table,
447 struct ovsdb_condition *condition)
451 const struct ovsdb_column **columns =
452 ovsdb_condition_get_columns(condition, &n_columns);
454 for (i = 0; i < n_columns; i++) {
455 ovsdb_monitor_add_column(dbmon, table, columns[i],
462 /* Bind this session's condition to ovsdb_monitor */
464 ovsdb_monitor_condition_bind(struct ovsdb_monitor *dbmon,
465 struct ovsdb_monitor_session_condition *cond)
467 struct shash_node *node;
469 SHASH_FOR_EACH(node, &cond->tables) {
470 struct ovsdb_monitor_table_condition *mtc = node->data;
471 struct ovsdb_monitor_table *mt =
472 shash_find_data(&dbmon->tables, mtc->table->schema->name);
475 ovsdb_monitor_condition_add_columns(dbmon, mtc->table,
476 &mtc->new_condition);
480 static struct ovsdb_monitor_changes *
481 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
484 struct ovsdb_monitor_changes *changes;
486 changes = xzalloc(sizeof *changes);
488 changes->transaction = next_txn;
491 hmap_init(&changes->rows);
492 hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
497 static struct ovsdb_monitor_changes *
498 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
499 uint64_t transaction)
501 struct ovsdb_monitor_changes *changes;
502 size_t hash = hash_uint64(transaction);
504 HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
505 if (changes->transaction == transaction) {
513 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
515 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
516 uint64_t transaction)
518 struct ovsdb_monitor_changes *changes =
519 ovsdb_monitor_table_find_changes(mt, transaction);
521 if (--changes->n_refs == 0) {
522 hmap_remove(&mt->changes, &changes->hmap_node);
523 ovsdb_monitor_changes_destroy(changes);
/* Starts tracking changes to table 'mt', beginning from 'transaction' inclusive.
531 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
532 uint64_t transaction)
534 struct ovsdb_monitor_changes *changes;
536 changes = ovsdb_monitor_table_find_changes(mt, transaction);
540 ovsdb_monitor_table_add_changes(mt, transaction);
545 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
547 struct ovsdb_monitor_row *row, *next;
549 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
550 hmap_remove(&changes->rows, &row->hmap_node);
551 ovsdb_monitor_row_destroy(changes->mt, row);
553 hmap_destroy(&changes->rows);
557 static enum ovsdb_monitor_selection
558 ovsdb_monitor_row_update_type(bool initial, const bool old, const bool new)
560 return initial ? OJMS_INITIAL
566 /* Set conditional monitoring mode only if we have non-empty condition in one
567 * of the tables at least */
569 ovsdb_monitor_session_condition_set_mode(
570 struct ovsdb_monitor_session_condition *cond)
572 cond->conditional = shash_count(&cond->tables) !=
576 /* Returnes an empty allocated session's condition state holder */
577 struct ovsdb_monitor_session_condition *
578 ovsdb_monitor_session_condition_create(void)
580 struct ovsdb_monitor_session_condition *condition =
581 xzalloc(sizeof *condition);
583 condition->conditional = false;
584 shash_init(&condition->tables);
589 ovsdb_monitor_session_condition_destroy(
590 struct ovsdb_monitor_session_condition *condition)
592 struct shash_node *node, *next;
598 SHASH_FOR_EACH_SAFE (node, next, &condition->tables) {
599 struct ovsdb_monitor_table_condition *mtc = node->data;
601 ovsdb_condition_destroy(&mtc->new_condition);
602 ovsdb_condition_destroy(&mtc->old_condition);
603 shash_delete(&condition->tables, node);
610 ovsdb_monitor_table_condition_create(
611 struct ovsdb_monitor_session_condition *condition,
612 const struct ovsdb_table *table,
613 const struct json *json_cnd)
615 struct ovsdb_monitor_table_condition *mtc;
616 struct ovsdb_error *error;
618 mtc = xzalloc(sizeof *mtc);
620 ovsdb_condition_init(&mtc->old_condition);
621 ovsdb_condition_init(&mtc->new_condition);
624 error = ovsdb_condition_from_json(table->schema,
627 &mtc->old_condition);
634 shash_add(&condition->tables, table->schema->name, mtc);
635 /* On session startup old == new condition */
636 ovsdb_condition_clone(&mtc->new_condition, &mtc->old_condition);
637 if (ovsdb_condition_is_true(&mtc->old_condition)) {
638 condition->n_true_cnd++;
639 ovsdb_monitor_session_condition_set_mode(condition);
646 ovsdb_monitor_get_table_conditions(
647 const struct ovsdb_monitor_table *mt,
648 const struct ovsdb_monitor_session_condition *condition,
649 struct ovsdb_condition **old_condition,
650 struct ovsdb_condition **new_condition)
656 struct ovsdb_monitor_table_condition *mtc =
657 shash_find_data(&condition->tables, mt->table->schema->name);
662 *old_condition = &mtc->old_condition;
663 *new_condition = &mtc->new_condition;
668 static enum ovsdb_monitor_selection
669 ovsdb_monitor_row_update_type_condition(
670 const struct ovsdb_monitor_table *mt,
671 const struct ovsdb_monitor_session_condition *condition,
673 const struct ovsdb_datum *old,
674 const struct ovsdb_datum *new)
676 struct ovsdb_condition *old_condition, *new_condition;
677 enum ovsdb_monitor_selection type =
678 ovsdb_monitor_row_update_type(initial, old, new);
680 if (ovsdb_monitor_get_table_conditions(mt,
684 bool old_cond = !old ? false
685 : ovsdb_condition_empty_or_match_any(old,
687 mt->columns_index_map);
688 bool new_cond = !new ? false
689 : ovsdb_condition_empty_or_match_any(new,
691 mt->columns_index_map);
693 if (!old_cond && !new_cond) {
705 type = !old_cond ? OJMS_INSERT : !new_cond
706 ? OJMS_DELETE : OJMS_MODIFY;
721 ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
722 const struct ovsdb_monitor_row *row,
723 enum ovsdb_monitor_selection type,
724 unsigned long int *changed)
726 if (!(mt->select & type)) {
730 if (type == OJMS_MODIFY) {
734 memset(changed, 0, bitmap_n_bytes(mt->n_columns));
735 for (i = 0; i < mt->n_columns; i++) {
736 const struct ovsdb_column *c = mt->columns[i].column;
737 if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
738 bitmap_set1(changed, i);
743 /* No actual changes: presumably a row changed and then
744 * changed back later. */
752 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
753 * 'mt', or NULL if no row update should be sent.
755 * The caller should specify 'initial' as true if the returned JSON is going to
756 * be used as part of the initial reply to a "monitor" request, false if it is
757 * going to be used as part of an "update" notification.
759 * 'changed' must be a scratch buffer for internal use that is at least
760 * bitmap_n_bytes(mt->n_columns) bytes long. */
762 ovsdb_monitor_compose_row_update(
763 const struct ovsdb_monitor_table *mt,
764 const struct ovsdb_monitor_session_condition *condition OVS_UNUSED,
765 const struct ovsdb_monitor_row *row,
766 bool initial, unsigned long int *changed)
768 enum ovsdb_monitor_selection type;
769 struct json *old_json, *new_json;
770 struct json *row_json;
773 type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
774 if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
778 row_json = json_object_create();
779 old_json = new_json = NULL;
780 if (type & (OJMS_DELETE | OJMS_MODIFY)) {
781 old_json = json_object_create();
782 json_object_put(row_json, "old", old_json);
784 if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
785 new_json = json_object_create();
786 json_object_put(row_json, "new", new_json);
788 for (i = 0; i < mt->n_monitored_columns; i++) {
789 const struct ovsdb_monitor_column *c = &mt->columns[i];
791 if (!c->monitored || !(type & c->select)) {
792 /* We don't care about this type of change for this
793 * particular column (but we will care about it for some
798 if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
799 || type == OJMS_DELETE) {
800 json_object_put(old_json, c->column->name,
801 ovsdb_datum_to_json(&row->old[i],
804 if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
805 json_object_put(new_json, c->column->name,
806 ovsdb_datum_to_json(&row->new[i],
/* Returns JSON for a <row-update2> (as described in ovsdb-server(1) manpage)
 * for 'row' within 'mt', or NULL if no row update should be sent.
817 * The caller should specify 'initial' as true if the returned JSON is
818 * going to be used as part of the initial reply to a "monitor2" request,
819 * false if it is going to be used as part of an "update2" notification.
821 * 'changed' must be a scratch buffer for internal use that is at least
822 * bitmap_n_bytes(mt->n_columns) bytes long. */
824 ovsdb_monitor_compose_row_update2(
825 const struct ovsdb_monitor_table *mt,
826 const struct ovsdb_monitor_session_condition *condition,
827 const struct ovsdb_monitor_row *row,
828 bool initial, unsigned long int *changed)
830 enum ovsdb_monitor_selection type;
831 struct json *row_update2, *diff_json;
834 type = ovsdb_monitor_row_update_type_condition(mt, condition, initial,
836 if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
840 row_update2 = json_object_create();
841 if (type == OJMS_DELETE) {
842 json_object_put(row_update2, "delete", json_null_create());
844 diff_json = json_object_create();
847 for (i = 0; i < mt->n_monitored_columns; i++) {
848 const struct ovsdb_monitor_column *c = &mt->columns[i];
850 if (!c->monitored || !(type & c->select)) {
851 /* We don't care about this type of change for this
852 * particular column (but we will care about it for some
857 if (type == OJMS_MODIFY) {
858 struct ovsdb_datum diff;
860 if (!bitmap_is_set(changed, i)) {
864 ovsdb_datum_diff(&diff ,&row->old[i], &row->new[i],
866 json_object_put(diff_json, c->column->name,
867 ovsdb_datum_to_json(&diff, &c->column->type));
868 ovsdb_datum_destroy(&diff, &c->column->type);
870 if (!ovsdb_datum_is_default(&row->new[i], &c->column->type)) {
871 json_object_put(diff_json, c->column->name,
872 ovsdb_datum_to_json(&row->new[i],
878 op = type == OJMS_INITIAL ? "initial"
879 : type == OJMS_MODIFY ? "modify" : "insert";
880 json_object_put(row_update2, op, diff_json);
887 ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
889 struct shash_node *node;
890 size_t max_columns = 0;
892 SHASH_FOR_EACH (node, &dbmon->tables) {
893 struct ovsdb_monitor_table *mt = node->data;
895 max_columns = MAX(max_columns, mt->n_columns);
901 /* Constructs and returns JSON for a <table-updates> object (as described in
902 * RFC 7047) for all the outstanding changes within 'monitor', starting from
905 ovsdb_monitor_compose_update(
906 struct ovsdb_monitor *dbmon,
907 bool initial, uint64_t transaction,
908 const struct ovsdb_monitor_session_condition *condition,
909 compose_row_update_cb_func row_update)
911 struct shash_node *node;
913 size_t max_columns = ovsdb_monitor_max_columns(dbmon);
914 unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
917 SHASH_FOR_EACH (node, &dbmon->tables) {
918 struct ovsdb_monitor_table *mt = node->data;
919 struct ovsdb_monitor_row *row, *next;
920 struct ovsdb_monitor_changes *changes;
921 struct json *table_json = NULL;
923 changes = ovsdb_monitor_table_find_changes(mt, transaction);
928 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
929 struct json *row_json;
931 row_json = (*row_update)(mt, condition, row, initial, changed);
933 char uuid[UUID_LEN + 1];
935 /* Create JSON object for transaction overall. */
937 json = json_object_create();
940 /* Create JSON object for transaction on this table. */
942 table_json = json_object_create();
943 json_object_put(json, mt->table->schema->name, table_json);
946 /* Add JSON row to JSON table. */
947 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
948 json_object_put(table_json, uuid, row_json);
957 /* Returns JSON for a <table-updates> object (as described in RFC 7047)
958 * for all the outstanding changes within 'monitor' that starts from
959 * '*unflushed' transaction id.
961 * The caller should specify 'initial' as true if the returned JSON is going to
962 * be used as part of the initial reply to a "monitor" request, false if it is
963 * going to be used as part of an "update" notification. */
965 ovsdb_monitor_get_update(
966 struct ovsdb_monitor *dbmon,
967 bool initial, uint64_t *unflushed_,
968 const struct ovsdb_monitor_session_condition *condition,
969 enum ovsdb_monitor_version version)
971 struct ovsdb_monitor_json_cache_node *cache_node = NULL;
972 struct shash_node *node;
974 const uint64_t unflushed = *unflushed_;
975 const uint64_t next_unflushed = dbmon->n_transactions + 1;
977 /* Return a clone of cached json if one exists. Otherwise,
978 * generate a new one and add it to the cache. */
979 if (!condition || !condition->conditional) {
980 cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
984 json = cache_node->json ? json_clone(cache_node->json) : NULL;
986 if (version == OVSDB_MONITOR_V1) {
988 ovsdb_monitor_compose_update(dbmon, initial, unflushed,
990 ovsdb_monitor_compose_row_update);
992 ovs_assert(version == OVSDB_MONITOR_V2);
994 ovsdb_monitor_compose_update(dbmon, initial, unflushed,
996 ovsdb_monitor_compose_row_update2);
998 if (!condition || !condition->conditional) {
999 ovsdb_monitor_json_cache_insert(dbmon, version, unflushed, json);
1003 /* Maintain transaction id of 'changes'. */
1004 SHASH_FOR_EACH (node, &dbmon->tables) {
1005 struct ovsdb_monitor_table *mt = node->data;
1007 ovsdb_monitor_table_untrack_changes(mt, unflushed);
1008 ovsdb_monitor_table_track_changes(mt, next_unflushed);
1010 *unflushed_ = next_unflushed;
1016 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
1017 uint64_t next_transaction)
1019 ovs_assert(next_transaction <= dbmon->n_transactions + 1);
1020 return (next_transaction <= dbmon->n_transactions);
1024 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
1025 const struct ovsdb_table *table,
1026 enum ovsdb_monitor_selection select)
1028 struct ovsdb_monitor_table * mt;
1030 mt = shash_find_data(&dbmon->tables, table->schema->name);
1031 mt->select |= select;
1035 * If a row's change type (insert, delete or modify) matches that of
1036 * the monitor, they should be sent to the monitor's clients as updates.
 * Of course, the monitor should also update its internal state with this change.
1039 * When a change type does not require client side update, the monitor
1040 * may still need to keep track of certain changes in order to generate
1041 * correct future updates. For example, the monitor internal state should
1042 * be updated whenever a new row is inserted, in order to generate the
 * correct initial state, regardless of whether an insert change type is being
1046 * On the other hand, if a transaction only contains changes to columns
1047 * that are not monitored, this transaction can be safely ignored by the
1050 * Thus, the order of the declaration is important:
1051 * 'OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE' always implies
1052 * 'OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE', but not vice versa. */
1053 enum ovsdb_monitor_changes_efficacy {
1054 OVSDB_CHANGES_NO_EFFECT, /* Monitor does not care about this
1056 OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE, /* Monitor internal updates. */
1057 OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE, /* Client needs to be updated. */
1060 struct ovsdb_monitor_aux {
1061 const struct ovsdb_monitor *monitor;
1062 struct ovsdb_monitor_table *mt;
1063 enum ovsdb_monitor_changes_efficacy efficacy;
1067 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
1068 const struct ovsdb_monitor *m)
1072 aux->efficacy = OVSDB_CHANGES_NO_EFFECT;
1076 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
1077 const struct ovsdb_row *new,
1078 const struct ovsdb_monitor_table *mt,
1079 struct ovsdb_monitor_changes *changes)
1081 const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
1082 struct ovsdb_monitor_row *change;
1084 change = ovsdb_monitor_changes_row_find(changes, uuid);
1086 change = xzalloc(sizeof *change);
1087 hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
1088 change->uuid = *uuid;
1089 change->old = clone_monitor_row_data(mt, old);
1090 change->new = clone_monitor_row_data(mt, new);
1093 update_monitor_row_data(mt, new, change->new);
1095 free_monitor_row_data(mt, change->new);
1099 /* This row was added then deleted. Forget about it. */
1100 hmap_remove(&changes->rows, &change->hmap_node);
1108 ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
1109 const unsigned long int *changed)
1113 for (i = 0; i < mt->n_columns; i++) {
1114 size_t column_index = mt->columns[i].column->index;
1116 if (bitmap_is_set(changed, column_index)) {
1124 /* Return the efficacy of a row's change to a monitor table.
1126 * Please see the block comment above 'ovsdb_monitor_changes_efficacy'
1127 * definition form more information. */
1128 static enum ovsdb_monitor_changes_efficacy
1129 ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type,
1130 const struct ovsdb_monitor_table *mt,
1131 const unsigned long int *changed)
1133 if (type == OJMS_MODIFY &&
1134 !ovsdb_monitor_columns_changed(mt, changed)) {
1135 return OVSDB_CHANGES_NO_EFFECT;
1138 if (type == OJMS_MODIFY) {
1139 /* Condition might turn a modify operation to insert or delete */
1140 type |= OJMS_INSERT | OJMS_DELETE;
1143 return (mt->select & type)
1144 ? OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
1145 : OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
1149 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
1150 const struct ovsdb_row *new,
1151 const unsigned long int *changed,
1154 struct ovsdb_monitor_aux *aux = aux_;
1155 const struct ovsdb_monitor *m = aux->monitor;
1156 struct ovsdb_table *table = new ? new->table : old->table;
1157 struct ovsdb_monitor_table *mt;
1158 struct ovsdb_monitor_changes *changes;
1160 if (!aux->mt || table != aux->mt->table) {
1161 aux->mt = shash_find_data(&m->tables, table->schema->name);
1163 /* We don't care about rows in this table at all. Tell the caller
1170 enum ovsdb_monitor_selection type =
1171 ovsdb_monitor_row_update_type(false, old, new);
1172 enum ovsdb_monitor_changes_efficacy efficacy =
1173 ovsdb_monitor_changes_classify(type, mt, changed);
1175 HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
1176 if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
1177 ovsdb_monitor_changes_update(old, new, mt, changes);
1180 if (aux->efficacy < efficacy) {
1181 aux->efficacy = efficacy;
1188 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
1190 struct ovsdb_monitor_aux aux;
1191 struct shash_node *node;
1193 ovsdb_monitor_init_aux(&aux, dbmon);
1194 SHASH_FOR_EACH (node, &dbmon->tables) {
1195 struct ovsdb_monitor_table *mt = node->data;
1197 if (mt->select & OJMS_INITIAL) {
1198 struct ovsdb_row *row;
1199 struct ovsdb_monitor_changes *changes;
1201 changes = ovsdb_monitor_table_find_changes(mt, 0);
1203 changes = ovsdb_monitor_table_add_changes(mt, 0);
1204 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1205 ovsdb_monitor_changes_update(NULL, row, mt, changes);
1215 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
1216 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
1219 struct jsonrpc_monitor_node *jm;
1221 if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
1222 ovsdb_monitor_destroy(dbmon);
1226 /* Find and remove the jsonrpc monitor from the list. */
1227 LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
1228 if (jm->jsonrpc_monitor == jsonrpc_monitor) {
1229 /* Release the tracked changes. */
1230 struct shash_node *node;
1231 SHASH_FOR_EACH (node, &dbmon->tables) {
1232 struct ovsdb_monitor_table *mt = node->data;
1233 ovsdb_monitor_table_untrack_changes(mt, unflushed);
1235 ovs_list_remove(&jm->node);
1238 /* Destroy ovsdb monitor if this is the last user. */
1239 if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
1240 ovsdb_monitor_destroy(dbmon);
1247 /* Should never reach here. jsonrpc_monitor should be on the list. */
/* Compares the monitoring configuration of tables 'a' and 'b'.
 *
 * First asserts that every column of 'b' is a monitored column
 * (b->n_columns == b->n_monitored_columns).  The tables are then compared on
 * their 'table' pointer, their 'select' mask and their number of monitored
 * columns, and finally column by column on the 'column' pointer and the
 * per-column 'select' mask.
 *
 * The column comparison is positional, so both column arrays must already be
 * sorted in the same order.
 *
 * NOTE(review): presumably returns false at the first mismatch and true when
 * everything matches -- confirm against the return statements. */
1252 ovsdb_monitor_table_equal(const struct ovsdb_monitor_table *a,
1253 const struct ovsdb_monitor_table *b)
1257 ovs_assert(b->n_columns == b->n_monitored_columns);
/* Cheap whole-table checks before walking the columns. */
1259 if ((a->table != b->table) ||
1260 (a->select != b->select) ||
1261 (a->n_monitored_columns != b->n_monitored_columns)) {
1265 /* Compare only monitored columns that must be sorted already */
1266 for (i = 0; i < a->n_monitored_columns; i++) {
1267 if ((a->columns[i].column != b->columns[i].column) ||
1268 (a->columns[i].select != b->columns[i].select)) {
/* Compares the sets of monitored tables of monitors 'a' and 'b'.
 *
 * The table counts of the two monitors are compared first.  Then, for every
 * table of 'a', the table registered under the same name in 'b' is looked up
 * and the pair is compared with ovsdb_monitor_table_equal().
 *
 * NOTE(review): presumably returns false on a count mismatch, on a table
 * missing from 'b' or on the first unequal pair, and true otherwise --
 * confirm against the return statements. */
1276 ovsdb_monitor_equal(const struct ovsdb_monitor *a,
1277 const struct ovsdb_monitor *b)
1279 struct shash_node *node;
1281 if (shash_count(&a->tables) != shash_count(&b->tables)) {
1285 SHASH_FOR_EACH(node, &a->tables) {
1286 const struct ovsdb_monitor_table *mta = node->data;
1287 const struct ovsdb_monitor_table *mtb;
/* Tables are matched by the name they are registered under. */
1289 mtb = shash_find_data(&b->tables, node->name);
1294 if (!ovsdb_monitor_table_equal(mta, mtb)) {
/* Folds the monitoring configuration of 'dbmon' into the hash 'basis'.
 *
 * The tables are visited in the order of the array produced by shash_sort().
 * For each table the hash absorbs the 'table' pointer, the 'select' mask and
 * 'n_columns', followed by the 'column' pointer and 'select' mask of each of
 * its 'n_columns' columns.
 *
 * NOTE(review): the hash covers 'n_columns' columns while
 * ovsdb_monitor_table_equal() compares 'n_monitored_columns'; confirm that
 * the two counts agree for every monitor that gets hashed.
 * NOTE(review): presumably shash_sort() returns a newly allocated array that
 * must be freed before the accumulated 'basis' is returned -- confirm. */
1303 ovsdb_monitor_hash(const struct ovsdb_monitor *dbmon, size_t basis)
1305 const struct shash_node **nodes;
1308 nodes = shash_sort(&dbmon->tables);
1309 n = shash_count(&dbmon->tables);
1311 for (i = 0; i < n; i++) {
1312 struct ovsdb_monitor_table *mt = nodes[i]->data;
/* Per-table part of the hash. */
1314 basis = hash_pointer(mt->table, basis);
1315 basis = hash_3words(mt->select, mt->n_columns, basis);
/* Per-column part of the hash, in array order. */
1317 for (j = 0; j < mt->n_columns; j++) {
1318 basis = hash_pointer(mt->columns[j].column, basis);
1319 basis = hash_2words(mt->columns[j].select, basis);
/* Registers 'new_dbmon' in the file-wide 'ovsdb_monitors' map.
 *
 * Asserts that exactly one jsonrpc monitor is attached to 'new_dbmon', sorts
 * its columns with ovsdb_monitor_columns_sort(), and hashes it with
 * ovsdb_monitor_hash() using basis 0.  The map entries with the same hash are
 * compared against 'new_dbmon' with ovsdb_monitor_equal(); 'new_dbmon' is
 * inserted into the map under that hash.
 *
 * NOTE(review): presumably an existing equal monitor is returned instead of
 * inserting 'new_dbmon', and 'new_dbmon' is returned only after insertion --
 * confirm against the return statements. */
1327 struct ovsdb_monitor *
1328 ovsdb_monitor_add(struct ovsdb_monitor *new_dbmon)
1330 struct ovsdb_monitor *dbmon;
1333 /* New_dbmon should be associated with only one jsonrpc
1335 ovs_assert(ovs_list_is_singleton(&new_dbmon->jsonrpc_monitors));
1337 ovsdb_monitor_columns_sort(new_dbmon);
1339 hash = ovsdb_monitor_hash(new_dbmon, 0);
1340 HMAP_FOR_EACH_WITH_HASH(dbmon, hmap_node, hash, &ovsdb_monitors) {
1341 if (ovsdb_monitor_equal(dbmon, new_dbmon)) {
1346 hmap_insert(&ovsdb_monitors, &new_dbmon->hmap_node, hash);
/* Tears down 'dbmon' and the state it owns.
 *
 * In order: unlinks 'dbmon->replica.node' from the list it is on, removes
 * 'dbmon' from 'ovsdb_monitors' if its hmap node is in use, flushes and
 * destroys the JSON cache, and then, for every monitored table, destroys each
 * tracked change set, destroys the table's 'changes' map and frees its
 * 'columns_index_map'.  Finally the 'tables' shash is destroyed.
 *
 * NOTE(review): confirm that each table object, its column array and 'dbmon'
 * itself are also freed. */
1351 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
1353 struct shash_node *node;
1355 ovs_list_remove(&dbmon->replica.node);
/* Only a monitor that was actually inserted into the global map is removed
 * from it. */
1357 if (!hmap_node_is_null(&dbmon->hmap_node)) {
1358 hmap_remove(&ovsdb_monitors, &dbmon->hmap_node);
1361 ovsdb_monitor_json_cache_flush(dbmon);
1362 hmap_destroy(&dbmon->json_cache);
1364 SHASH_FOR_EACH (node, &dbmon->tables) {
1365 struct ovsdb_monitor_table *mt = node->data;
1366 struct ovsdb_monitor_changes *changes, *next;
/* The _SAFE iteration is needed because each change set is removed from the
 * map while the map is being walked. */
1368 HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
1369 hmap_remove(&mt->changes, &changes->hmap_node);
1370 ovsdb_monitor_changes_destroy(changes);
1372 hmap_destroy(&mt->changes);
1374 free(mt->columns_index_map);
1377 shash_destroy(&dbmon->tables);
/* Commit callback of the monitor replica: records the changes of 'txn' in
 * the monitor that owns 'replica'.
 *
 * The monitor's 'n_transactions' counter is incremented first, then
 * ovsdb_monitor_change_cb() is run over every change of 'txn' with a freshly
 * initialized aux.  The resulting 'aux.efficacy' decides the follow-up:
 *
 *   - OVSDB_CHANGES_NO_EFFECT: the increment of 'n_transactions' is undone.
 *   - OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE: the JSON cache is flushed.
 *
 * 'durable' is not used.
 *
 * NOTE(review): no action is visible for
 * OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE, and the cases presumably do not fall
 * through into each other -- confirm.
 * NOTE(review): presumably always returns NULL (no error) -- confirm. */
1381 static struct ovsdb_error *
1382 ovsdb_monitor_commit(struct ovsdb_replica *replica,
1383 const struct ovsdb_txn *txn,
1384 bool durable OVS_UNUSED)
1386 struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
1387 struct ovsdb_monitor_aux aux;
1389 ovsdb_monitor_init_aux(&aux, m);
1390 /* Update ovsdb_monitor's transaction number for
1391 * each transaction, before calling ovsdb_monitor_change_cb(). */
1392 m->n_transactions++;
1393 ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
1395 switch(aux.efficacy) {
1396 case OVSDB_CHANGES_NO_EFFECT:
1397 /* The transaction is ignored by the monitor.
1398 * Roll back the 'n_transactions' as if the transaction
1399 * has never happened. */
1400 m->n_transactions--;
1402 case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE:
1405 case OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE:
1406 ovsdb_monitor_json_cache_flush(m);
/* Destroy callback of the monitor replica.
 *
 * Resolves 'replica' to its monitor and calls
 * ovsdb_jsonrpc_monitor_destroy() on every jsonrpc monitor on the monitor's
 * list.  The _SAFE iteration is used because list nodes may go away while the
 * list is being walked. */
1414 ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
1416 struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
1417 struct jsonrpc_monitor_node *jm, *next;
1419 /* Delete all front end monitors. Removing the last front
1420 * end monitor will also destroy the corresponding 'ovsdb_monitor'.
1421 * ovsdb monitor will also be destroyed. */
1422 LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
1423 ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
1427 /* Add some memory usage statistics for monitors into 'usage', for use with
1428 * memory_report().
 *
 * "monitors" is set to the number of entries in 'ovsdb_monitors'.
 * "json-caches" is increased, once per monitor, by the number of entries in
 * that monitor's JSON cache. */
1430 ovsdb_monitor_get_memory_usage(struct simap *usage)
1432 struct ovsdb_monitor *dbmon;
1433 simap_put(usage, "monitors", hmap_count(&ovsdb_monitors));
1435 HMAP_FOR_EACH(dbmon, hmap_node, &ovsdb_monitors) {
1436 simap_increase(usage, "json-caches", hmap_count(&dbmon->json_cache));
/* Replica class used by monitors.  The initializer is positional and lists
 * ovsdb_monitor_commit() followed by ovsdb_monitor_destroy_callback().
 *
 * NOTE(review): presumably these fill the commit and destroy hooks of
 * 'struct ovsdb_replica_class', in that order -- confirm against the struct
 * definition. */
1440 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1441 ovsdb_monitor_commit,
1442 ovsdb_monitor_destroy_callback,