ovsdb: rename variables in ovsdb_monitor_get_update()
[cascardo/ovs.git] / ovsdb / monitor.c
1 /*
2  * Copyright (c) 2015 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include <errno.h>
20
21 #include "bitmap.h"
22 #include "column.h"
23 #include "dynamic-string.h"
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "row.h"
30 #include "simap.h"
31 #include "hash.h"
32 #include "table.h"
33 #include "hash.h"
34 #include "timeval.h"
35 #include "transaction.h"
36 #include "jsonrpc-server.h"
37 #include "monitor.h"
38 #include "openvswitch/vlog.h"
39
40
41 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
42 static struct hmap ovsdb_monitors = HMAP_INITIALIZER(&ovsdb_monitors);
43
44 /*  Backend monitor.
45  *
46  *  ovsdb_monitor keep track of the ovsdb changes.
47  */
48
49 /* A collection of tables being monitored. */
50 struct ovsdb_monitor {
51     struct ovsdb_replica replica;
52     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
53     struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
54     struct ovsdb *db;
55     uint64_t n_transactions;      /* Count number of committed transactions. */
56     struct hmap_node hmap_node;   /* Elements within ovsdb_monitors.  */
57     struct hmap json_cache;       /* Contains "ovsdb_monitor_json_cache_node"s.*/
58 };
59
60 /* A json object of updates between 'from_txn' and 'dbmon->n_transactions'
61  * inclusive.  */
62 struct ovsdb_monitor_json_cache_node {
63     struct hmap_node hmap_node;   /* Elements in json cache. */
64     enum ovsdb_monitor_version version;
65     uint64_t from_txn;
66     struct json *json;            /* Null, or a cloned of json */
67 };
68
69 struct jsonrpc_monitor_node {
70     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
71     struct ovs_list node;
72 };
73
74 /* A particular column being monitored. */
75 struct ovsdb_monitor_column {
76     const struct ovsdb_column *column;
77     enum ovsdb_monitor_selection select;
78 };
79
80 /* A row that has changed in a monitored table. */
81 struct ovsdb_monitor_row {
82     struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
83     struct uuid uuid;           /* UUID of row that changed. */
84     struct ovsdb_datum *old;    /* Old data, NULL for an inserted row. */
85     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
86 };
87
88 /* Contains 'struct ovsdb_monitor_row's for rows that have been
89  * updated but not yet flushed to all the jsonrpc connection.
90  *
91  * 'n_refs' represent the number of jsonrpc connections that have
92  * not received updates. Generate the update for the last jsonprc
93  * connection will also destroy the whole "struct ovsdb_monitor_changes"
94  * object.
95  *
96  * 'transaction' stores the first update's transaction id.
97  * */
98 struct ovsdb_monitor_changes {
99     struct ovsdb_monitor_table *mt;
100     struct hmap rows;
101     int n_refs;
102     uint64_t transaction;
103     struct hmap_node hmap_node;  /* Element in ovsdb_monitor_tables' changes
104                                     hmap.  */
105 };
106
107 /* A particular table being monitored. */
108 struct ovsdb_monitor_table {
109     const struct ovsdb_table *table;
110
111     /* This is the union (bitwise-OR) of the 'select' values in all of the
112      * members of 'columns' below. */
113     enum ovsdb_monitor_selection select;
114
115     /* Columns being monitored. */
116     struct ovsdb_monitor_column *columns;
117     size_t n_columns;
118
119     /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
120     struct hmap changes;
121 };
122
123 typedef struct json *
124 (*compose_row_update_cb_func)(const struct ovsdb_monitor_table *mt,
125                               const struct ovsdb_monitor_row *row,
126                               bool initial, unsigned long int *changed);
127
128 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
129 static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
130     struct ovsdb_monitor_table *mt, uint64_t next_txn);
131 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
132     struct ovsdb_monitor_table *mt, uint64_t unflushed);
133 static void ovsdb_monitor_changes_destroy(
134                                   struct ovsdb_monitor_changes *changes);
135 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
136                                   uint64_t unflushed);
137
138 static uint32_t
139 json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
140 {
141     uint32_t hash;
142
143     hash = hash_uint64(version);
144     hash = hash_uint64_basis(from_txn, hash);
145
146     return hash;
147 }
148
149 static struct ovsdb_monitor_json_cache_node *
150 ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
151                                 enum ovsdb_monitor_version version,
152                                 uint64_t from_txn)
153 {
154     struct ovsdb_monitor_json_cache_node *node;
155     uint32_t hash = json_cache_hash(version, from_txn);
156
157     HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
158         if (node->from_txn == from_txn && node->version == version) {
159             return node;
160         }
161     }
162
163     return NULL;
164 }
165
166 static void
167 ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
168                                 enum ovsdb_monitor_version version,
169                                 uint64_t from_txn, struct json *json)
170 {
171     struct ovsdb_monitor_json_cache_node *node;
172     uint32_t hash = json_cache_hash(version, from_txn);
173
174     node = xmalloc(sizeof *node);
175
176     node->version = version;
177     node->from_txn = from_txn;
178     node->json = json ? json_clone(json) : NULL;
179
180     hmap_insert(&dbmon->json_cache, &node->hmap_node, hash);
181 }
182
183 static void
184 ovsdb_monitor_json_cache_flush(struct ovsdb_monitor *dbmon)
185 {
186     struct ovsdb_monitor_json_cache_node *node, *next;
187
188     HMAP_FOR_EACH_SAFE(node, next, hmap_node, &dbmon->json_cache) {
189         hmap_remove(&dbmon->json_cache, &node->hmap_node);
190         json_destroy(node->json);
191         free(node);
192     }
193 }
194
195 static int
196 compare_ovsdb_monitor_column(const void *a_, const void *b_)
197 {
198     const struct ovsdb_monitor_column *a = a_;
199     const struct ovsdb_monitor_column *b = b_;
200
201     return a->column < b->column ? -1 : a->column > b->column;
202 }
203
204 static struct ovsdb_monitor *
205 ovsdb_monitor_cast(struct ovsdb_replica *replica)
206 {
207     ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
208     return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
209 }
210
211 /* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
212  * given 'uuid', or NULL if there is no such row. */
213 static struct ovsdb_monitor_row *
214 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
215                                const struct uuid *uuid)
216 {
217     struct ovsdb_monitor_row *row;
218
219     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
220                              &changes->rows) {
221         if (uuid_equals(uuid, &row->uuid)) {
222             return row;
223         }
224     }
225     return NULL;
226 }
227
228 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
229  * copies of the data in 'row' drawn from the columns represented by
230  * mt->columns[].  Returns the array.
231  *
232  * If 'row' is NULL, returns NULL. */
233 static struct ovsdb_datum *
234 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
235                        const struct ovsdb_row *row)
236 {
237     struct ovsdb_datum *data;
238     size_t i;
239
240     if (!row) {
241         return NULL;
242     }
243
244     data = xmalloc(mt->n_columns * sizeof *data);
245     for (i = 0; i < mt->n_columns; i++) {
246         const struct ovsdb_column *c = mt->columns[i].column;
247         const struct ovsdb_datum *src = &row->fields[c->index];
248         struct ovsdb_datum *dst = &data[i];
249         const struct ovsdb_type *type = &c->type;
250
251         ovsdb_datum_clone(dst, src, type);
252     }
253     return data;
254 }
255
256 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
257  * in 'row' drawn from the columns represented by mt->columns[]. */
258 static void
259 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
260                         const struct ovsdb_row *row,
261                         struct ovsdb_datum *data)
262 {
263     size_t i;
264
265     for (i = 0; i < mt->n_columns; i++) {
266         const struct ovsdb_column *c = mt->columns[i].column;
267         const struct ovsdb_datum *src = &row->fields[c->index];
268         struct ovsdb_datum *dst = &data[i];
269         const struct ovsdb_type *type = &c->type;
270
271         if (!ovsdb_datum_equals(src, dst, type)) {
272             ovsdb_datum_destroy(dst, type);
273             ovsdb_datum_clone(dst, src, type);
274         }
275     }
276 }
277
278 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
279  * from mt->columns[], plus 'data' itself. */
280 static void
281 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
282                       struct ovsdb_datum *data)
283 {
284     if (data) {
285         size_t i;
286
287         for (i = 0; i < mt->n_columns; i++) {
288             const struct ovsdb_column *c = mt->columns[i].column;
289
290             ovsdb_datum_destroy(&data[i], &c->type);
291         }
292         free(data);
293     }
294 }
295
296 /* Frees 'row', which must have been created from 'mt'. */
297 static void
298 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
299                           struct ovsdb_monitor_row *row)
300 {
301     if (row) {
302         free_monitor_row_data(mt, row->old);
303         free_monitor_row_data(mt, row->new);
304         free(row);
305     }
306 }
307
308 void
309 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
310                                   struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
311 {
312     struct jsonrpc_monitor_node *jm;
313
314     jm = xzalloc(sizeof *jm);
315     jm->jsonrpc_monitor = jsonrpc_monitor;
316     list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
317 }
318
319 struct ovsdb_monitor *
320 ovsdb_monitor_create(struct ovsdb *db,
321                      struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
322 {
323     struct ovsdb_monitor *dbmon;
324
325     dbmon = xzalloc(sizeof *dbmon);
326
327     ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
328     ovsdb_add_replica(db, &dbmon->replica);
329     list_init(&dbmon->jsonrpc_monitors);
330     dbmon->db = db;
331     dbmon->n_transactions = 0;
332     shash_init(&dbmon->tables);
333     hmap_node_nullify(&dbmon->hmap_node);
334     hmap_init(&dbmon->json_cache);
335
336     ovsdb_monitor_add_jsonrpc_monitor(dbmon, jsonrpc_monitor);
337     return dbmon;
338 }
339
340 void
341 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
342                         const struct ovsdb_table *table)
343 {
344     struct ovsdb_monitor_table *mt;
345
346     mt = xzalloc(sizeof *mt);
347     mt->table = table;
348     shash_add(&m->tables, table->schema->name, mt);
349     hmap_init(&mt->changes);
350 }
351
352 void
353 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
354                          const struct ovsdb_table *table,
355                          const struct ovsdb_column *column,
356                          enum ovsdb_monitor_selection select,
357                          size_t *allocated_columns)
358 {
359     struct ovsdb_monitor_table *mt;
360     struct ovsdb_monitor_column *c;
361
362     mt = shash_find_data(&dbmon->tables, table->schema->name);
363
364     if (mt->n_columns >= *allocated_columns) {
365         mt->columns = x2nrealloc(mt->columns, allocated_columns,
366                                  sizeof *mt->columns);
367     }
368
369     mt->select |= select;
370     c = &mt->columns[mt->n_columns++];
371     c->column = column;
372     c->select = select;
373 }
374
375 /* Check for duplicated column names. Return the first
376  * duplicated column's name if found. Otherwise return
377  * NULL.  */
378 const char * OVS_WARN_UNUSED_RESULT
379 ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
380                                      const struct ovsdb_table *table)
381 {
382     struct ovsdb_monitor_table *mt;
383     int i;
384
385     mt = shash_find_data(&m->tables, table->schema->name);
386
387     if (mt) {
388         /* Check for duplicate columns. */
389         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
390               compare_ovsdb_monitor_column);
391         for (i = 1; i < mt->n_columns; i++) {
392             if (mt->columns[i].column == mt->columns[i - 1].column) {
393                 return mt->columns[i].column->name;
394             }
395         }
396     }
397
398     return NULL;
399 }
400
401 static struct ovsdb_monitor_changes *
402 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
403                                 uint64_t next_txn)
404 {
405     struct ovsdb_monitor_changes *changes;
406
407     changes = xzalloc(sizeof *changes);
408
409     changes->transaction = next_txn;
410     changes->mt = mt;
411     changes->n_refs = 1;
412     hmap_init(&changes->rows);
413     hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
414
415     return changes;
416 };
417
418 static struct ovsdb_monitor_changes *
419 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
420                                  uint64_t transaction)
421 {
422     struct ovsdb_monitor_changes *changes;
423     size_t hash = hash_uint64(transaction);
424
425     HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
426         if (changes->transaction == transaction) {
427             return changes;
428         }
429     }
430
431     return NULL;
432 }
433
434 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
435 static void
436 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
437                                     uint64_t transaction)
438 {
439     struct ovsdb_monitor_changes *changes =
440                 ovsdb_monitor_table_find_changes(mt, transaction);
441     if (changes) {
442         if (--changes->n_refs == 0) {
443             hmap_remove(&mt->changes, &changes->hmap_node);
444             ovsdb_monitor_changes_destroy(changes);
445         }
446     }
447 }
448
449 /* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
450  */
451 static void
452 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
453                                   uint64_t transaction)
454 {
455     struct ovsdb_monitor_changes *changes;
456
457     changes = ovsdb_monitor_table_find_changes(mt, transaction);
458     if (changes) {
459         changes->n_refs++;
460     } else {
461         ovsdb_monitor_table_add_changes(mt, transaction);
462     }
463 }
464
465 static void
466 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
467 {
468     struct ovsdb_monitor_row *row, *next;
469
470     HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
471         hmap_remove(&changes->rows, &row->hmap_node);
472         ovsdb_monitor_row_destroy(changes->mt, row);
473     }
474     hmap_destroy(&changes->rows);
475     free(changes);
476 }
477
478 static enum ovsdb_monitor_selection
479 ovsdb_monitor_row_update_type(bool initial, const bool old, const bool new)
480 {
481     return initial ? OJMS_INITIAL
482             : !old ? OJMS_INSERT
483             : !new ? OJMS_DELETE
484             : OJMS_MODIFY;
485 }
486 static bool
487 ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
488                               const struct ovsdb_monitor_row *row,
489                               enum ovsdb_monitor_selection type,
490                               unsigned long int *changed)
491 {
492     if (!(mt->select & type)) {
493         return true;
494     }
495
496     if (type == OJMS_MODIFY) {
497         size_t i, n_changes;
498
499         n_changes = 0;
500         memset(changed, 0, bitmap_n_bytes(mt->n_columns));
501         for (i = 0; i < mt->n_columns; i++) {
502             const struct ovsdb_column *c = mt->columns[i].column;
503             if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
504                 bitmap_set1(changed, i);
505                 n_changes++;
506             }
507         }
508         if (!n_changes) {
509             /* No actual changes: presumably a row changed and then
510              * changed back later. */
511             return true;
512         }
513     }
514
515     return false;
516 }
517
518 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
519  * 'mt', or NULL if no row update should be sent.
520  *
521  * The caller should specify 'initial' as true if the returned JSON is going to
522  * be used as part of the initial reply to a "monitor" request, false if it is
523  * going to be used as part of an "update" notification.
524  *
525  * 'changed' must be a scratch buffer for internal use that is at least
526  * bitmap_n_bytes(mt->n_columns) bytes long. */
527 static struct json *
528 ovsdb_monitor_compose_row_update(
529     const struct ovsdb_monitor_table *mt,
530     const struct ovsdb_monitor_row *row,
531     bool initial, unsigned long int *changed)
532 {
533     enum ovsdb_monitor_selection type;
534     struct json *old_json, *new_json;
535     struct json *row_json;
536     size_t i;
537
538     type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
539     if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
540         return NULL;
541     }
542
543     row_json = json_object_create();
544     old_json = new_json = NULL;
545     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
546         old_json = json_object_create();
547         json_object_put(row_json, "old", old_json);
548     }
549     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
550         new_json = json_object_create();
551         json_object_put(row_json, "new", new_json);
552     }
553     for (i = 0; i < mt->n_columns; i++) {
554         const struct ovsdb_monitor_column *c = &mt->columns[i];
555
556         if (!(type & c->select)) {
557             /* We don't care about this type of change for this
558              * particular column (but we will care about it for some
559              * other column). */
560             continue;
561         }
562
563         if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
564             || type == OJMS_DELETE) {
565             json_object_put(old_json, c->column->name,
566                             ovsdb_datum_to_json(&row->old[i],
567                                                 &c->column->type));
568         }
569         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
570             json_object_put(new_json, c->column->name,
571                             ovsdb_datum_to_json(&row->new[i],
572                                                 &c->column->type));
573         }
574     }
575
576     return row_json;
577 }
578
579 /* Returns JSON for a <row-update2> (as described in ovsdb-server(1) mapage)
580  * for 'row' within * 'mt', or NULL if no row update should be sent.
581  *
582  * The caller should specify 'initial' as true if the returned JSON is
583  * going to be used as part of the initial reply to a "monitor2" request,
584  * false if it is going to be used as part of an "update2" notification.
585  *
586  * 'changed' must be a scratch buffer for internal use that is at least
587  * bitmap_n_bytes(mt->n_columns) bytes long. */
588 static struct json *
589 ovsdb_monitor_compose_row_update2(
590     const struct ovsdb_monitor_table *mt,
591     const struct ovsdb_monitor_row *row,
592     bool initial, unsigned long int *changed)
593 {
594     enum ovsdb_monitor_selection type;
595     struct json *row_update2, *diff_json;
596     size_t i;
597
598     type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
599     if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
600         return NULL;
601     }
602
603     row_update2 = json_object_create();
604     if (type == OJMS_DELETE) {
605         json_object_put(row_update2, "delete", json_null_create());
606     } else {
607         diff_json = json_object_create();
608         const char *op;
609
610         for (i = 0; i < mt->n_columns; i++) {
611             const struct ovsdb_monitor_column *c = &mt->columns[i];
612
613             if (!(type & c->select)) {
614                 /* We don't care about this type of change for this
615                  * particular column (but we will care about it for some
616                  * other column). */
617                 continue;
618             }
619
620             if (type == OJMS_MODIFY) {
621                 struct ovsdb_datum diff;
622
623                 if (!bitmap_is_set(changed, i)) {
624                     continue;
625                 }
626
627                 ovsdb_datum_diff(&diff ,&row->old[i], &row->new[i],
628                                         &c->column->type);
629                 json_object_put(diff_json, c->column->name,
630                                 ovsdb_datum_to_json(&diff, &c->column->type));
631                 ovsdb_datum_destroy(&diff, &c->column->type);
632             } else {
633                 if (!ovsdb_datum_is_default(&row->new[i], &c->column->type)) {
634                     json_object_put(diff_json, c->column->name,
635                                     ovsdb_datum_to_json(&row->new[i],
636                                                         &c->column->type));
637                 }
638             }
639         }
640
641         op = type == OJMS_INITIAL ? "initial"
642                                   : type == OJMS_MODIFY ? "modify" : "insert";
643         json_object_put(row_update2, op, diff_json);
644     }
645
646     return row_update2;
647 }
648
649 static size_t
650 ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
651 {
652     struct shash_node *node;
653     size_t max_columns = 0;
654
655     SHASH_FOR_EACH (node, &dbmon->tables) {
656         struct ovsdb_monitor_table *mt = node->data;
657
658         max_columns = MAX(max_columns, mt->n_columns);
659     }
660
661     return max_columns;
662 }
663
664 /* Constructs and returns JSON for a <table-updates> object (as described in
665  * RFC 7047) for all the outstanding changes within 'monitor', starting from
666  * 'transaction'.  */
667 static struct json*
668 ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
669                              bool initial, uint64_t transaction,
670                              compose_row_update_cb_func row_update)
671 {
672     struct shash_node *node;
673     struct json *json;
674     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
675     unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
676
677     json = NULL;
678     SHASH_FOR_EACH (node, &dbmon->tables) {
679         struct ovsdb_monitor_table *mt = node->data;
680         struct ovsdb_monitor_row *row, *next;
681         struct ovsdb_monitor_changes *changes;
682         struct json *table_json = NULL;
683
684         changes = ovsdb_monitor_table_find_changes(mt, transaction);
685         if (!changes) {
686             continue;
687         }
688
689         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
690             struct json *row_json;
691
692             row_json = (*row_update)(mt, row, initial, changed);
693             if (row_json) {
694                 char uuid[UUID_LEN + 1];
695
696                 /* Create JSON object for transaction overall. */
697                 if (!json) {
698                     json = json_object_create();
699                 }
700
701                 /* Create JSON object for transaction on this table. */
702                 if (!table_json) {
703                     table_json = json_object_create();
704                     json_object_put(json, mt->table->schema->name, table_json);
705                 }
706
707                 /* Add JSON row to JSON table. */
708                 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
709                 json_object_put(table_json, uuid, row_json);
710             }
711         }
712     }
713     free(changed);
714
715     return json;
716 }
717
718 /* Returns JSON for a <table-updates> object (as described in RFC 7047)
719  * for all the outstanding changes within 'monitor' that starts from
720  * '*unflushed' transaction id.
721  *
722  * The caller should specify 'initial' as true if the returned JSON is going to
723  * be used as part of the initial reply to a "monitor" request, false if it is
724  * going to be used as part of an "update" notification. */
725 struct json *
726 ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
727                          bool initial, uint64_t *unflushed_,
728                          enum ovsdb_monitor_version version)
729 {
730     struct ovsdb_monitor_json_cache_node *cache_node;
731     struct shash_node *node;
732     struct json *json;
733     const uint64_t unflushed = *unflushed_;
734     const uint64_t next_unflushed = dbmon->n_transactions + 1;
735
736     /* Return a clone of cached json if one exists. Otherwise,
737      * generate a new one and add it to the cache.  */
738     cache_node = ovsdb_monitor_json_cache_search(dbmon, version, unflushed);
739     if (cache_node) {
740         json = cache_node->json ? json_clone(cache_node->json) : NULL;
741     } else {
742         if (version == OVSDB_MONITOR_V1) {
743             json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
744                                         ovsdb_monitor_compose_row_update);
745         } else {
746             ovs_assert(version == OVSDB_MONITOR_V2);
747             json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
748                                         ovsdb_monitor_compose_row_update2);
749         }
750         ovsdb_monitor_json_cache_insert(dbmon, version, unflushed, json);
751     }
752
753     /* Maintain transaction id of 'changes'. */
754     SHASH_FOR_EACH (node, &dbmon->tables) {
755         struct ovsdb_monitor_table *mt = node->data;
756
757         ovsdb_monitor_table_untrack_changes(mt, unflushed);
758         ovsdb_monitor_table_track_changes(mt, next_unflushed);
759     }
760     *unflushed_ = next_unflushed;
761
762     return json;
763 }
764
765 bool
766 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
767                           uint64_t next_transaction)
768 {
769     ovs_assert(next_transaction <= dbmon->n_transactions + 1);
770     return (next_transaction <= dbmon->n_transactions);
771 }
772
773 void
774 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
775                                const struct ovsdb_table *table,
776                                enum ovsdb_monitor_selection select)
777 {
778     struct ovsdb_monitor_table * mt;
779
780     mt = shash_find_data(&dbmon->tables, table->schema->name);
781     mt->select |= select;
782 }
783
784  /*
785  * If a row's change type (insert, delete or modify) matches that of
786  * the monitor, they should be sent to the monitor's clients as updates.
787  * Of cause, the monitor should also internally update with this change.
788  *
789  * When a change type does not require client side update, the monitor
790  * may still need to keep track of certain changes in order to generate
791  * correct future updates.  For example, the monitor internal state should
792  * be updated whenever a new row is inserted, in order to generate the
793  * correct initial state, regardless if a insert change type is being
794  * monitored.
795  *
796  * On the other hand, if a transaction only contains changes to columns
797  * that are not monitored, this transaction can be safely ignored by the
798  * monitor.
799  *
800  * Thus, the order of the declaration is important:
801  * 'OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE' always implies
802  * 'OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE', but not vice versa.  */
803 enum ovsdb_monitor_changes_efficacy {
804     OVSDB_CHANGES_NO_EFFECT,                /* Monitor does not care about this
805                                                change.  */
806     OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE,  /* Monitor internal updates. */
807     OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE,  /* Client needs to be updated.  */
808 };
809
810 struct ovsdb_monitor_aux {
811     const struct ovsdb_monitor *monitor;
812     struct ovsdb_monitor_table *mt;
813     enum ovsdb_monitor_changes_efficacy efficacy;
814 };
815
816 static void
817 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
818                        const struct ovsdb_monitor *m)
819 {
820     aux->monitor = m;
821     aux->mt = NULL;
822     aux->efficacy = OVSDB_CHANGES_NO_EFFECT;
823 }
824
825 static void
826 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
827                              const struct ovsdb_row *new,
828                              const struct ovsdb_monitor_table *mt,
829                              struct ovsdb_monitor_changes *changes)
830 {
831     const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
832     struct ovsdb_monitor_row *change;
833
834     change = ovsdb_monitor_changes_row_find(changes, uuid);
835     if (!change) {
836         change = xzalloc(sizeof *change);
837         hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
838         change->uuid = *uuid;
839         change->old = clone_monitor_row_data(mt, old);
840         change->new = clone_monitor_row_data(mt, new);
841     } else {
842         if (new) {
843             update_monitor_row_data(mt, new, change->new);
844         } else {
845             free_monitor_row_data(mt, change->new);
846             change->new = NULL;
847
848             if (!change->old) {
849                 /* This row was added then deleted.  Forget about it. */
850                 hmap_remove(&changes->rows, &change->hmap_node);
851                 free(change);
852             }
853         }
854     }
855 }
856
857 static bool
858 ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
859                               const unsigned long int *changed)
860 {
861     size_t i;
862
863     for (i = 0; i < mt->n_columns; i++) {
864         size_t column_index = mt->columns[i].column->index;
865
866         if (bitmap_is_set(changed, column_index)) {
867             return true;
868         }
869     }
870
871     return false;
872 }
873
874 /* Return the efficacy of a row's change to a monitor table.
875  *
876  * Please see the block comment above 'ovsdb_monitor_changes_efficacy'
877  * definition form more information.  */
878 static enum ovsdb_monitor_changes_efficacy
879 ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type,
880                                const struct ovsdb_monitor_table *mt,
881                                const unsigned long int *changed)
882 {
883     if (type == OJMS_MODIFY &&
884         !ovsdb_monitor_columns_changed(mt, changed)) {
885         return OVSDB_CHANGES_NO_EFFECT;
886     }
887
888     return (mt->select & type)
889                 ?  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
890                 :  OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
891 }
892
893 static bool
894 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
895                         const struct ovsdb_row *new,
896                         const unsigned long int *changed,
897                         void *aux_)
898 {
899     struct ovsdb_monitor_aux *aux = aux_;
900     const struct ovsdb_monitor *m = aux->monitor;
901     struct ovsdb_table *table = new ? new->table : old->table;
902     struct ovsdb_monitor_table *mt;
903     struct ovsdb_monitor_changes *changes;
904
905     if (!aux->mt || table != aux->mt->table) {
906         aux->mt = shash_find_data(&m->tables, table->schema->name);
907         if (!aux->mt) {
908             /* We don't care about rows in this table at all.  Tell the caller
909              * to skip it.  */
910             return false;
911         }
912     }
913     mt = aux->mt;
914
915     HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
916         enum ovsdb_monitor_changes_efficacy efficacy;
917         enum ovsdb_monitor_selection type;
918
919         type = ovsdb_monitor_row_update_type(false, old, new);
920         efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
921         if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
922             ovsdb_monitor_changes_update(old, new, mt, changes);
923         }
924
925         if (aux->efficacy < efficacy) {
926             aux->efficacy = efficacy;
927         }
928     }
929
930     return true;
931 }
932
933 void
934 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
935 {
936     struct ovsdb_monitor_aux aux;
937     struct shash_node *node;
938
939     ovsdb_monitor_init_aux(&aux, dbmon);
940     SHASH_FOR_EACH (node, &dbmon->tables) {
941         struct ovsdb_monitor_table *mt = node->data;
942
943         if (mt->select & OJMS_INITIAL) {
944             struct ovsdb_row *row;
945             struct ovsdb_monitor_changes *changes;
946
947             changes = ovsdb_monitor_table_find_changes(mt, 0);
948             if (!changes) {
949                 changes = ovsdb_monitor_table_add_changes(mt, 0);
950                 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
951                     ovsdb_monitor_changes_update(NULL, row, mt, changes);
952                 }
953             } else {
954                 changes->n_refs++;
955             }
956         }
957     }
958 }
959
960 void
961 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
962                    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
963 {
964     struct jsonrpc_monitor_node *jm;
965
966     if (list_is_empty(&dbmon->jsonrpc_monitors)) {
967         ovsdb_monitor_destroy(dbmon);
968         return;
969     }
970
971     /* Find and remove the jsonrpc monitor from the list.  */
972     LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
973         if (jm->jsonrpc_monitor == jsonrpc_monitor) {
974             list_remove(&jm->node);
975             free(jm);
976
977             /* Destroy ovsdb monitor if this is the last user.  */
978             if (list_is_empty(&dbmon->jsonrpc_monitors)) {
979                 ovsdb_monitor_destroy(dbmon);
980             }
981
982             return;
983         };
984     }
985
986     /* Should never reach here. jsonrpc_monitor should be on the list.  */
987     OVS_NOT_REACHED();
988 }
989
990 static bool
991 ovsdb_monitor_table_equal(const struct ovsdb_monitor_table *a,
992                           const struct ovsdb_monitor_table *b)
993 {
994     size_t i;
995
996     if ((a->table != b->table) ||
997         (a->select != b->select) ||
998         (a->n_columns != b->n_columns)) {
999         return false;
1000     }
1001
1002     for (i = 0; i < a->n_columns; i++) {
1003         if ((a->columns[i].column != b->columns[i].column) ||
1004             (a->columns[i].select != b->columns[i].select)) {
1005             return false;
1006         }
1007     }
1008
1009     return true;
1010 }
1011
1012 static bool
1013 ovsdb_monitor_equal(const struct ovsdb_monitor *a,
1014                     const struct ovsdb_monitor *b)
1015 {
1016     struct shash_node *node;
1017
1018     if (shash_count(&a->tables) != shash_count(&b->tables)) {
1019         return false;
1020     }
1021
1022     SHASH_FOR_EACH(node, &a->tables) {
1023         const struct ovsdb_monitor_table *mta = node->data;
1024         const struct ovsdb_monitor_table *mtb;
1025
1026         mtb = shash_find_data(&b->tables, node->name);
1027         if (!mtb) {
1028             return false;
1029         }
1030
1031         if (!ovsdb_monitor_table_equal(mta, mtb)) {
1032             return false;
1033         }
1034     }
1035
1036     return true;
1037 }
1038
1039 static size_t
1040 ovsdb_monitor_hash(const struct ovsdb_monitor *dbmon, size_t basis)
1041 {
1042     const struct shash_node **nodes;
1043     size_t i, j, n;
1044
1045     nodes = shash_sort(&dbmon->tables);
1046     n = shash_count(&dbmon->tables);
1047
1048     for (i = 0; i < n; i++) {
1049         struct ovsdb_monitor_table *mt = nodes[i]->data;
1050
1051         basis = hash_pointer(mt->table, basis);
1052         basis = hash_3words(mt->select, mt->n_columns, basis);
1053
1054         for (j = 0; j < mt->n_columns; j++) {
1055             basis = hash_pointer(mt->columns[j].column, basis);
1056             basis = hash_2words(mt->columns[j].select, basis);
1057         }
1058     }
1059     free(nodes);
1060
1061     return basis;
1062 }
1063
1064 struct ovsdb_monitor *
1065 ovsdb_monitor_add(struct ovsdb_monitor *new_dbmon)
1066 {
1067     struct ovsdb_monitor *dbmon;
1068     size_t hash;
1069
1070     /* New_dbmon should be associated with only one jsonrpc
1071      * connections.  */
1072     ovs_assert(list_is_singleton(&new_dbmon->jsonrpc_monitors));
1073
1074     hash = ovsdb_monitor_hash(new_dbmon, 0);
1075     HMAP_FOR_EACH_WITH_HASH(dbmon, hmap_node, hash, &ovsdb_monitors) {
1076         if (ovsdb_monitor_equal(dbmon,  new_dbmon)) {
1077             return dbmon;
1078         }
1079     }
1080
1081     hmap_insert(&ovsdb_monitors, &new_dbmon->hmap_node, hash);
1082     return new_dbmon;
1083 }
1084
1085 static void
1086 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
1087 {
1088     struct shash_node *node;
1089
1090     list_remove(&dbmon->replica.node);
1091
1092     if (!hmap_node_is_null(&dbmon->hmap_node)) {
1093         hmap_remove(&ovsdb_monitors, &dbmon->hmap_node);
1094     }
1095
1096     ovsdb_monitor_json_cache_flush(dbmon);
1097     hmap_destroy(&dbmon->json_cache);
1098
1099     SHASH_FOR_EACH (node, &dbmon->tables) {
1100         struct ovsdb_monitor_table *mt = node->data;
1101         struct ovsdb_monitor_changes *changes, *next;
1102
1103         HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
1104             hmap_remove(&mt->changes, &changes->hmap_node);
1105             ovsdb_monitor_changes_destroy(changes);
1106         }
1107         hmap_destroy(&mt->changes);
1108         free(mt->columns);
1109         free(mt);
1110     }
1111     shash_destroy(&dbmon->tables);
1112     free(dbmon);
1113 }
1114
1115 static struct ovsdb_error *
1116 ovsdb_monitor_commit(struct ovsdb_replica *replica,
1117                      const struct ovsdb_txn *txn,
1118                      bool durable OVS_UNUSED)
1119 {
1120     struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
1121     struct ovsdb_monitor_aux aux;
1122
1123     ovsdb_monitor_init_aux(&aux, m);
1124     /* Update ovsdb_monitor's transaction number for
1125      * each transaction, before calling ovsdb_monitor_change_cb().  */
1126     m->n_transactions++;
1127     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
1128
1129     switch(aux.efficacy) {
1130     case OVSDB_CHANGES_NO_EFFECT:
1131         /* The transaction is ignored by the monitor.
1132          * Roll back the 'n_transactions' as if the transaction
1133          * has never happened. */
1134         m->n_transactions--;
1135         break;
1136     case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE:
1137         /* Nothing.  */
1138         break;
1139     case  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE:
1140         ovsdb_monitor_json_cache_flush(m);
1141         break;
1142     }
1143
1144     return NULL;
1145 }
1146
1147 static void
1148 ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
1149 {
1150     struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
1151     struct jsonrpc_monitor_node *jm, *next;
1152
1153     /* Delete all front end monitors. Removing the last front
1154      * end monitor will also destroy the corresponding 'ovsdb_monitor'.
1155      * ovsdb monitor will also be destroied.  */
1156     LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
1157         ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
1158     }
1159 }
1160
1161 /* Add some memory usage statics for monitors into 'usage', for use with
1162  * memory_report().  */
1163 void
1164 ovsdb_monitor_get_memory_usage(struct simap *usage)
1165 {
1166     simap_put(usage, "monitors", hmap_count(&ovsdb_monitors));
1167 }
1168
1169 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1170     ovsdb_monitor_commit,
1171     ovsdb_monitor_destroy_callback,
1172 };