2 * Copyright (c) 2015 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 #include "dynamic-string.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
34 #include "transaction.h"
35 #include "jsonrpc-server.h"
37 #include "openvswitch/vlog.h"
/* Declared ahead of its definition so that ovsdb_monitor_cast() and
 * ovsdb_monitor_create() can take its address. */
40 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
44 * ovsdb_monitor keeps track of the ovsdb changes.
47 /* A collection of tables being monitored. */
48 struct ovsdb_monitor {
/* Embedded replica handle; ovsdb_monitor_cast() maps a pointer to this
 * member back to the enclosing ovsdb_monitor with CONTAINER_OF(). */
49 struct ovsdb_replica replica;
/* Keyed by table schema name (see ovsdb_monitor_add_table()). */
50 struct shash tables; /* Holds "struct ovsdb_monitor_table"s. */
/* Front-end users of this monitor; removing the last one destroys the
 * monitor (see ovsdb_monitor_remove_jsonrpc_monitor()). */
51 struct ovs_list jsonrpc_monitors; /* Contains "jsonrpc_monitor_node"s. */
/* Set to 0 by ovsdb_monitor_create(). */
53 uint64_t n_transactions; /* Count number of committed transactions. */
/* List element that records one JSON-RPC front-end monitor as a user of an
 * ovsdb_monitor; linked into that monitor's 'jsonrpc_monitors' list. */
56 struct jsonrpc_monitor_node {
/* The front-end monitor this node stands for; matched by pointer in
 * ovsdb_monitor_remove_jsonrpc_monitor(). */
57 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
61 /* A particular column being monitored. */
62 struct ovsdb_monitor_column {
/* Schema column; its 'index' selects the field within a row and its 'type'
 * is used to clone, compare and destroy the monitored datum. */
63 const struct ovsdb_column *column;
/* OJMS_* bits naming the kinds of change reported for this column; tested
 * with '&' in ovsdb_monitor_compose_row_update(). */
64 enum ovsdb_monitor_selection select;
67 /* A row that has changed in a monitored table. */
68 struct ovsdb_monitor_row {
69 struct hmap_node hmap_node; /* In ovsdb_monitor_changes.rows, hashed by 'uuid'. */
70 struct uuid uuid; /* UUID of row that changed. */
/* 'old' and 'new' are arrays holding one datum per monitored column, in
 * mt->columns[] order, as built by clone_monitor_row_data(). */
71 struct ovsdb_datum *old; /* Old data, NULL for an inserted row. */
72 struct ovsdb_datum *new; /* New data, NULL for a deleted row. */
75 /* Contains 'struct ovsdb_monitor_row's for rows that have been
76 * updated but not yet flushed to all the jsonrpc connections.
78 * 'n_refs' represents the number of jsonrpc connections that have
79 * not received updates. Generating the update for the last jsonrpc
80 * connection will also destroy the whole "struct ovsdb_monitor_changes"
83 * 'transaction' stores the first update's transaction id.
85 struct ovsdb_monitor_changes {
/* Passed to ovsdb_monitor_row_destroy() when the tracked rows are freed, so
 * that the row data is released with the right column set. */
86 struct ovsdb_monitor_table *mt;
/* Inserted into the owning table's 'changes' map under hash_uint64() of the
 * transaction number (see ovsdb_monitor_table_add_changes()). */
90 struct hmap_node hmap_node; /* Element in ovsdb_monitor_tables' changes
94 /* A particular table being monitored. */
95 struct ovsdb_monitor_table {
/* Schema-level table being monitored; its schema name labels this table's
 * section of a composed update. */
96 const struct ovsdb_table *table;
98 /* This is the union (bitwise-OR) of the 'select' values in all of the
99 * members of 'columns' below. ovsdb_monitor_table_add_select() can OR in
 * further bits directly. */
100 enum ovsdb_monitor_selection select;
102 /* Columns being monitored. */
/* Grown with x2nrealloc() in ovsdb_monitor_add_column() and sorted in place
 * by ovsdb_monitor_table_check_duplicates(). */
103 struct ovsdb_monitor_column *columns;
106 /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
/* Forward declarations for static helpers that are used before their
 * definitions later in this file. */
110 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
111 static void ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
113 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
114 struct ovsdb_monitor_table *mt, uint64_t unflushed);
115 static void ovsdb_monitor_changes_destroy(
116 struct ovsdb_monitor_changes *changes);
117 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
/* qsort() comparator for 'struct ovsdb_monitor_column' elements.  Orders them
 * by the address of the schema column each one points to, so that entries
 * referring to the same column become adjacent after sorting. */
121 compare_ovsdb_monitor_column(const void *a_, const void *b_)
123 const struct ovsdb_monitor_column *a = a_;
124 const struct ovsdb_monitor_column *b = b_;
/* Yields -1, 0 or 1: the trailing comparison itself evaluates to 0 or 1. */
126 return a->column < b->column ? -1 : a->column > b->column;
/* Converts 'replica' back into the ovsdb_monitor that embeds it.  Asserts
 * that 'replica' carries this file's replica class before converting. */
129 static struct ovsdb_monitor *
130 ovsdb_monitor_cast(struct ovsdb_replica *replica)
132 ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
133 return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
136 /* Finds and returns the ovsdb_monitor_row in 'changes->rows' for the
137 * given 'uuid', or NULL if there is no such row. */
138 static struct ovsdb_monitor_row *
139 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
140 const struct uuid *uuid)
142 struct ovsdb_monitor_row *row;
/* Candidates are selected by uuid_hash(uuid); the full UUID comparison
 * below rejects rows that merely share the hash. */
144 HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
146 if (uuid_equals(uuid, &row->uuid)) {
153 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
154 * copies of the data in 'row' drawn from the columns represented by
155 * mt->columns[]. Returns the array.
157 * If 'row' is NULL, returns NULL. */
158 static struct ovsdb_datum *
159 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
160 const struct ovsdb_row *row)
162 struct ovsdb_datum *data;
/* One datum per monitored column, in mt->columns[] order. */
169 data = xmalloc(mt->n_columns * sizeof *data);
170 for (i = 0; i < mt->n_columns; i++) {
171 const struct ovsdb_column *c = mt->columns[i].column;
/* The schema column's index locates the source datum inside 'row'. */
172 const struct ovsdb_datum *src = &row->fields[c->index];
173 struct ovsdb_datum *dst = &data[i];
174 const struct ovsdb_type *type = &c->type;
176 ovsdb_datum_clone(dst, src, type);
181 /* Replaces the mt->n_columns ovsdb_datums in data[] by copies of the data
182 * in 'row' drawn from the columns represented by mt->columns[]. */
184 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
185 const struct ovsdb_row *row,
186 struct ovsdb_datum *data)
190 for (i = 0; i < mt->n_columns; i++) {
191 const struct ovsdb_column *c = mt->columns[i].column;
192 const struct ovsdb_datum *src = &row->fields[c->index];
193 struct ovsdb_datum *dst = &data[i];
194 const struct ovsdb_type *type = &c->type;
/* Only re-clone a datum whose stored copy differs from the row's current
 * value; unchanged datums are left untouched. */
196 if (!ovsdb_datum_equals(src, dst, type)) {
197 ovsdb_datum_destroy(dst, type);
198 ovsdb_datum_clone(dst, src, type);
203 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
204 * from mt->columns[], plus 'data' itself. */
206 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
207 struct ovsdb_datum *data)
212 for (i = 0; i < mt->n_columns; i++) {
213 const struct ovsdb_column *c = mt->columns[i].column;
/* Each datum is destroyed with the type of its own schema column. */
215 ovsdb_datum_destroy(&data[i], &c->type);
221 /* Frees 'row', which must have been created from 'mt'. */
223 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
224 struct ovsdb_monitor_row *row)
/* Release both per-column data arrays; 'mt' supplies the column types. */
227 free_monitor_row_data(mt, row->old);
228 free_monitor_row_data(mt, row->new);
/* Registers 'jsonrpc_monitor' as a user of 'dbmon' by appending a newly
 * allocated jsonrpc_monitor_node to 'dbmon->jsonrpc_monitors'. */
234 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
235 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
237 struct jsonrpc_monitor_node *jm;
239 jm = xzalloc(sizeof *jm);
240 jm->jsonrpc_monitor = jsonrpc_monitor;
241 list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
/* Allocates a zero-initialized ovsdb_monitor, registers it as a replica of
 * 'db' using this file's replica class, and records 'jsonrpc_monitor' as its
 * first user.  The new monitor starts with an empty table map and a
 * transaction count of 0. */
244 struct ovsdb_monitor *
245 ovsdb_monitor_create(struct ovsdb *db,
246 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
248 struct ovsdb_monitor *dbmon;
250 dbmon = xzalloc(sizeof *dbmon);
252 ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
253 ovsdb_add_replica(db, &dbmon->replica);
/* The user list must be initialized before the first user is appended
 * below. */
254 list_init(&dbmon->jsonrpc_monitors);
256 dbmon->n_transactions = 0;
257 shash_init(&dbmon->tables);
259 ovsdb_monitor_add_jsonrpc_monitor(dbmon, jsonrpc_monitor);
/* Creates a zero-initialized ovsdb_monitor_table, stores it in 'm->tables'
 * under the schema name of 'table', and initializes its 'changes' map. */
264 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
265 const struct ovsdb_table *table)
267 struct ovsdb_monitor_table *mt;
269 mt = xzalloc(sizeof *mt);
271 shash_add(&m->tables, table->schema->name, mt);
272 hmap_init(&mt->changes);
/* Appends a column slot to the monitored-table entry for 'table' in 'dbmon'
 * and ORs 'select' into the table-wide selection.
 *
 * '*allocated_columns' is the caller-maintained capacity of the table's
 * columns array; x2nrealloc() updates it when the array has to grow.
 *
 * NOTE(review): the lookup result is used without a visible NULL check, so
 * 'table' presumably must already have been added with
 * ovsdb_monitor_add_table() -- confirm against callers. */
276 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
277 const struct ovsdb_table *table,
278 const struct ovsdb_column *column,
279 enum ovsdb_monitor_selection select,
280 size_t *allocated_columns)
282 struct ovsdb_monitor_table *mt;
283 struct ovsdb_monitor_column *c;
285 mt = shash_find_data(&dbmon->tables, table->schema->name);
/* Grow the array when every allocated slot is already in use. */
287 if (mt->n_columns >= *allocated_columns) {
288 mt->columns = x2nrealloc(mt->columns, allocated_columns,
289 sizeof *mt->columns);
292 mt->select |= select;
/* Claim the next free slot and bump the column count. */
293 c = &mt->columns[mt->n_columns++];
298 /* Check for duplicated column names. Return the first
299 * duplicated column's name if found. Otherwise return
301 const char * OVS_WARN_UNUSED_RESULT
302 ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
303 const struct ovsdb_table *table)
305 struct ovsdb_monitor_table *mt;
308 mt = shash_find_data(&m->tables, table->schema->name);
311 /* Check for duplicate columns. */
/* Sorting by column pointer reorders mt->columns[] in place and makes
 * entries for the same column adjacent, so comparing each element with its
 * predecessor is enough to find a duplicate. */
312 qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
313 compare_ovsdb_monitor_column);
314 for (i = 1; i < mt->n_columns; i++) {
315 if (mt->columns[i].column == mt->columns[i - 1].column) {
316 return mt->columns[i].column->name;
/* Creates a zero-initialized change set for transaction number 'next_txn',
 * gives it an empty row map, and inserts it into 'mt->changes' hashed by
 * hash_uint64(next_txn). */
325 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
328 struct ovsdb_monitor_changes *changes;
330 changes = xzalloc(sizeof *changes);
332 changes->transaction = next_txn;
335 hmap_init(&changes->rows);
336 hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
/* Looks up the change set in 'mt->changes' whose 'transaction' field equals
 * 'transaction'.
 *
 * NOTE(review): the not-found result is not visible in these lines --
 * presumably NULL; confirm. */
339 static struct ovsdb_monitor_changes *
340 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
341 uint64_t transaction)
343 struct ovsdb_monitor_changes *changes;
/* Must match the hash used on insertion in
 * ovsdb_monitor_table_add_changes(). */
344 size_t hash = hash_uint64(transaction);
346 HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
347 if (changes->transaction == transaction) {
355 /* Stop currently tracking changes to table 'mt' since 'transaction'.
357 * Return 'true' if the 'transaction' is being tracked. 'false' otherwise. */
359 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
360 uint64_t transaction)
362 struct ovsdb_monitor_changes *changes =
363 ovsdb_monitor_table_find_changes(mt, transaction);
365 ovs_assert(changes->transaction == transaction);
/* Drop one reference; when the count reaches zero the change set is
 * unlinked from the table and destroyed. */
366 if (--changes->n_refs == 0) {
367 hmap_remove(&mt->changes, &changes->hmap_node);
368 ovsdb_monitor_changes_destroy(changes);
373 /* Start tracking changes to table 'mt' beginning from 'transaction', inclusive.
376 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
377 uint64_t transaction)
379 struct ovsdb_monitor_changes *changes;
/* Look for an existing change set for 'transaction' first. */
381 changes = ovsdb_monitor_table_find_changes(mt, transaction);
/* NOTE(review): the branch between the lookup and this call is not visible
 * here -- presumably a new change set is added only when the lookup found
 * none; confirm. */
385 ovsdb_monitor_table_add_changes(mt, transaction);
/* Removes and destroys every row tracked in 'changes', then destroys the
 * row map itself.  Rows are freed using the column set of 'changes->mt'. */
390 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
392 struct ovsdb_monitor_row *row, *next;
/* The _SAFE iterator is used because each row is unlinked and freed while
 * the map is being walked. */
394 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
395 hmap_remove(&changes->rows, &row->hmap_node);
396 ovsdb_monitor_row_destroy(changes->mt, row);
398 hmap_destroy(&changes->rows);
402 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
403 * 'mt', or NULL if no row update should be sent.
405 * The caller should specify 'initial' as true if the returned JSON is going to
406 * be used as part of the initial reply to a "monitor" request, false if it is
407 * going to be used as part of an "update" notification.
409 * 'changed' must be a scratch buffer for internal use that is at least
410 * bitmap_n_bytes(mt->n_columns) bytes long. */
412 ovsdb_monitor_compose_row_update(
413 const struct ovsdb_monitor_table *mt,
414 const struct ovsdb_monitor_row *row,
415 bool initial, unsigned long int *changed)
417 enum ovsdb_monitor_selection type;
418 struct json *old_json, *new_json;
419 struct json *row_json;
/* Classify the update: 'initial' takes precedence; otherwise a row with no
 * old data is an insert and a row with no new data is a delete. */
422 type = (initial ? OJMS_INITIAL
423 : !row->old ? OJMS_INSERT
424 : !row->new ? OJMS_DELETE
/* Skip the row when the table does not select this kind of change. */
426 if (!(mt->select & type)) {
430 if (type == OJMS_MODIFY) {
/* Build a bitmap with one bit per monitored column, set where the old and
 * new values differ. */
434 memset(changed, 0, bitmap_n_bytes(mt->n_columns));
435 for (i = 0; i < mt->n_columns; i++) {
436 const struct ovsdb_column *c = mt->columns[i].column;
437 if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
438 bitmap_set1(changed, i);
443 /* No actual changes: presumably a row changed and then
444 * changed back later. */
/* The "old" member is present for deletes and modifies, the "new" member
 * for initial rows, inserts and modifies. */
449 row_json = json_object_create();
450 old_json = new_json = NULL;
451 if (type & (OJMS_DELETE | OJMS_MODIFY)) {
452 old_json = json_object_create();
453 json_object_put(row_json, "old", old_json);
455 if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
456 new_json = json_object_create();
457 json_object_put(row_json, "new", new_json);
/* Per column: honor the column's own selection, report the old value for
 * deletes and for modified columns, and the new value for initial, insert
 * and modify updates. */
459 for (i = 0; i < mt->n_columns; i++) {
460 const struct ovsdb_monitor_column *c = &mt->columns[i];
462 if (!(type & c->select)) {
463 /* We don't care about this type of change for this
464 * particular column (but we will care about it for some
469 if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
470 || type == OJMS_DELETE) {
471 json_object_put(old_json, c->column->name,
472 ovsdb_datum_to_json(&row->old[i],
475 if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
476 json_object_put(new_json, c->column->name,
477 ovsdb_datum_to_json(&row->new[i],
485 /* Constructs and returns JSON for a <table-updates> object (as described in
486 * RFC 7047) for all the outstanding changes within 'dbmon', and deletes all
487 * the outstanding changes from 'dbmon'. Returns NULL if no update needs to
490 * The caller should specify 'initial' as true if the returned JSON is going to
491 * be used as part of the initial reply to a "monitor" request, false if it is
492 * going to be used as part of an "update" notification.
494 * 'unflushed' should point to the ID of the first transaction that has
495 * not yet been sent. The update contains the changes in
496 * ['unflushed', dbmon->n_transactions]. Before the function returns, this
497 * value will be updated to dbmon->n_transactions + 1, ready for the next
500 ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
501 bool initial, uint64_t *unflushed)
503 struct shash_node *node;
504 unsigned long int *changed;
/* 'prev_txn' names the change set to drain now; 'next_txn' names the set
 * that will collect changes committed after this call. */
507 uint64_t prev_txn = *unflushed;
508 uint64_t next_txn = dbmon->n_transactions + 1;
/* Size the scratch bitmap for the table with the most monitored columns so
 * that one buffer serves every table. */
511 SHASH_FOR_EACH (node, &dbmon->tables) {
512 struct ovsdb_monitor_table *mt = node->data;
514 max_columns = MAX(max_columns, mt->n_columns);
516 changed = xmalloc(bitmap_n_bytes(max_columns));
519 SHASH_FOR_EACH (node, &dbmon->tables) {
520 struct ovsdb_monitor_table *mt = node->data;
521 struct ovsdb_monitor_row *row, *next;
522 struct ovsdb_monitor_changes *changes;
523 struct json *table_json = NULL;
525 changes = ovsdb_monitor_table_find_changes(mt, prev_txn);
/* NOTE(review): the condition guarding this call is not visible here --
 * presumably it handles a table with no change set for 'prev_txn';
 * confirm. */
527 ovsdb_monitor_table_track_changes(mt, next_txn);
531 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
532 struct json *row_json;
534 row_json = ovsdb_monitor_compose_row_update(
535 mt, row, initial, changed);
537 char uuid[UUID_LEN + 1];
539 /* Create JSON object for transaction overall. */
541 json = json_object_create();
544 /* Create JSON object for transaction on this table. */
546 table_json = json_object_create();
547 json_object_put(json, mt->table->schema->name, table_json);
550 /* Add JSON row to JSON table. */
551 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
552 json_object_put(table_json, uuid, row_json);
/* The tracked row is unlinked and freed after it has been considered. */
555 hmap_remove(&changes->rows, &row->hmap_node);
556 ovsdb_monitor_row_destroy(mt, row);
/* Give up the reference on the drained change set and start tracking from
 * 'next_txn'. */
559 ovsdb_monitor_table_untrack_changes(mt, prev_txn);
560 ovsdb_monitor_table_track_changes(mt, next_txn);
/* Tell the caller where its next update starts. */
563 *unflushed = next_txn;
/* Reports whether 'dbmon' has counted a transaction numbered
 * 'next_transaction' or later, i.e. whether 'next_transaction' is not past
 * 'dbmon->n_transactions'.  Asserts that 'next_transaction' is at most one
 * past that count. */
569 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
570 uint64_t next_transaction)
572 ovs_assert(next_transaction <= dbmon->n_transactions + 1);
573 return (next_transaction <= dbmon->n_transactions);
/* ORs 'select' into the table-wide selection of the monitored-table entry
 * that 'dbmon' holds for 'table'.  The entry is looked up by the table's
 * schema name and used without a visible NULL check. */
577 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
578 const struct ovsdb_table *table,
579 enum ovsdb_monitor_selection select)
581 struct ovsdb_monitor_table * mt;
583 mt = shash_find_data(&dbmon->tables, table->schema->name);
584 mt->select |= select;
/* Context handed to ovsdb_monitor_change_cb() through its 'aux_' argument. */
587 struct ovsdb_monitor_aux {
/* Monitor whose table map is consulted for each changed row. */
588 const struct ovsdb_monitor *monitor;
/* Most recently resolved monitored table; the callback refreshes it only
 * when a row belongs to a different table. */
589 struct ovsdb_monitor_table *mt;
/* Prepares 'aux' for a series of ovsdb_monitor_change_cb() calls made on
 * behalf of monitor 'm'. */
593 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
594 const struct ovsdb_monitor *m)
/* Folds one row change into the change set 'changes' of table 'mt'.  'old'
 * and 'new' are the row before and after the change; the UUID is taken from
 * 'new' when it is non-NULL and from 'old' otherwise. */
601 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
602 const struct ovsdb_row *new,
603 const struct ovsdb_monitor_table *mt,
604 struct ovsdb_monitor_changes *changes)
606 const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
607 struct ovsdb_monitor_row *change;
/* At most one tracked entry exists per row UUID within a change set. */
609 change = ovsdb_monitor_changes_row_find(changes, uuid);
/* New entry: snapshot both versions of the monitored columns.
 * NOTE(review): the condition selecting this path is not visible here --
 * presumably taken when no entry was found; confirm. */
611 change = xzalloc(sizeof *change);
612 hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
613 change->uuid = *uuid;
614 change->old = clone_monitor_row_data(mt, old);
615 change->new = clone_monitor_row_data(mt, new);
/* Existing entry: 'old' is left as first recorded; only the 'new' side is
 * refreshed or released. */
618 update_monitor_row_data(mt, new, change->new);
620 free_monitor_row_data(mt, change->new);
624 /* This row was added then deleted. Forget about it. */
625 hmap_remove(&changes->rows, &change->hmap_node);
/* Per-row change callback, used with ovsdb_txn_for_each_change() and called
 * directly by ovsdb_monitor_get_initial().  Resolves the monitored table for
 * the row and applies the change to every change set currently tracked for
 * that table.  'aux_' must point to a 'struct ovsdb_monitor_aux'. */
633 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
634 const struct ovsdb_row *new,
635 const unsigned long int *changed OVS_UNUSED,
638 struct ovsdb_monitor_aux *aux = aux_;
639 const struct ovsdb_monitor *m = aux->monitor;
/* Either 'old' or 'new' may be NULL; take the table from whichever exists. */
640 struct ovsdb_table *table = new ? new->table : old->table;
641 struct ovsdb_monitor_table *mt;
642 struct ovsdb_monitor_changes *changes;
/* Repeat the name lookup only when this row's table differs from the one
 * cached in 'aux'. */
644 if (!aux->mt || table != aux->mt->table) {
645 aux->mt = shash_find_data(&m->tables, table->schema->name);
647 /* We don't care about rows in this table at all. Tell the caller
654 HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
655 ovsdb_monitor_changes_update(old, new, mt, changes);
/* Seeds the change sets of 'dbmon' with the current database contents: for
 * every monitored table that selects OJMS_INITIAL, each existing row is fed
 * through ovsdb_monitor_change_cb() as an insertion (old == NULL). */
661 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
663 struct ovsdb_monitor_aux aux;
664 struct shash_node *node;
666 ovsdb_monitor_init_aux(&aux, dbmon);
667 SHASH_FOR_EACH (node, &dbmon->tables) {
668 struct ovsdb_monitor_table *mt = node->data;
670 if (mt->select & OJMS_INITIAL) {
671 struct ovsdb_row *row;
/* The callback only records rows into existing change sets, so create
 * one for transaction 0 when the table has none yet. */
673 if (hmap_is_empty(&mt->changes)) {
674 ovsdb_monitor_table_add_changes(mt, 0);
677 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
678 ovsdb_monitor_change_cb(NULL, row, NULL, &aux);
/* Unregisters 'jsonrpc_monitor' as a user of 'dbmon'.  If that leaves
 * 'dbmon' without users, 'dbmon' itself is destroyed, so callers must not
 * use 'dbmon' afterwards.  'jsonrpc_monitor' is expected to be on the
 * list. */
685 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
686 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
688 struct jsonrpc_monitor_node *jm;
690 /* Find and remove the jsonrpc monitor from the list. */
691 LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
692 if (jm->jsonrpc_monitor == jsonrpc_monitor) {
693 list_remove(&jm->node);
696 /* Destroy ovsdb monitor if this is the last user. */
697 if (list_is_empty(&dbmon->jsonrpc_monitors)) {
698 ovsdb_monitor_destroy(dbmon);
705 /* Should never reach here. jsonrpc_monitor should be on the list. */
/* Tears down 'dbmon': unlinks its replica node from the list it is on,
 * destroys every tracked change set of every monitored table, and finally
 * destroys the table map. */
710 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
712 struct shash_node *node;
714 list_remove(&dbmon->replica.node);
716 SHASH_FOR_EACH (node, &dbmon->tables) {
717 struct ovsdb_monitor_table *mt = node->data;
718 struct ovsdb_monitor_changes *changes, *next;
/* Change sets are destroyed here regardless of their reference counts. */
720 HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
721 hmap_remove(&mt->changes, &changes->hmap_node);
722 ovsdb_monitor_changes_destroy(changes);
727 shash_destroy(&dbmon->tables);
/* Listed first in ovsdb_jsonrpc_replica_class.  Recovers the monitor from
 * 'replica' and feeds every row change in 'txn' through
 * ovsdb_monitor_change_cb().  'durable' is unused. */
731 static struct ovsdb_error *
732 ovsdb_monitor_commit(struct ovsdb_replica *replica,
733 const struct ovsdb_txn *txn,
734 bool durable OVS_UNUSED)
736 struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
737 struct ovsdb_monitor_aux aux;
739 ovsdb_monitor_init_aux(&aux, m);
740 ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
/* Listed second in ovsdb_jsonrpc_replica_class.  Destroys every front-end
 * monitor attached to the ovsdb_monitor that embeds 'replica'. */
747 ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
749 struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
750 struct jsonrpc_monitor_node *jm, *next;
752 /* Delete all front end monitors. Removing the last front
753 * end monitor will also destroy the corresponding 'ovsdb_monitor',
754 * so 'dbmon' must not be touched once the list has been emptied. */
755 LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
756 ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
/* Replica class shared by every ovsdb_monitor in this file; installed by
 * ovsdb_monitor_create() and checked by ovsdb_monitor_cast().  Members are
 * given positionally. */
760 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
761 ovsdb_monitor_commit,
762 ovsdb_monitor_destroy_callback,