ovsdb: generate update notifications for monitor_cond session
[cascardo/ovs.git] / ovsdb / monitor.c
1 /*
2  * Copyright (c) 2015 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include <errno.h>
20
21 #include "bitmap.h"
22 #include "column.h"
23 #include "openvswitch/dynamic-string.h"
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "row.h"
30 #include "condition.h"
31 #include "simap.h"
32 #include "hash.h"
33 #include "table.h"
34 #include "hash.h"
35 #include "timeval.h"
36 #include "transaction.h"
37 #include "jsonrpc-server.h"
38 #include "monitor.h"
39 #include "openvswitch/vlog.h"
40
41 VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
42
43 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
44 static struct hmap ovsdb_monitors = HMAP_INITIALIZER(&ovsdb_monitors);
45
46 /* Keep state of session's conditions */
47 struct ovsdb_monitor_session_condition {
48     bool conditional;
49     size_t n_true_cnd;
50     struct shash tables;     /* Contains
51                               *   "struct ovsdb_monitor_table_condition *"s. */
52 };
53
54 /* Monitored table session's conditions */
55 struct ovsdb_monitor_table_condition {
56     const struct ovsdb_table *table;
57     struct ovsdb_monitor_table *mt;
58     struct ovsdb_condition old_condition;
59     struct ovsdb_condition new_condition;
60 };
61
62 /*  Backend monitor.
63  *
64  *  ovsdb_monitor keep track of the ovsdb changes.
65  */
66
67 /* A collection of tables being monitored. */
68 struct ovsdb_monitor {
69     struct ovsdb_replica replica;
70     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
71     struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
72     struct ovsdb *db;
73     uint64_t n_transactions;      /* Count number of committed transactions. */
74     struct hmap_node hmap_node;   /* Elements within ovsdb_monitors.  */
75     struct hmap json_cache;       /* Contains "ovsdb_monitor_json_cache_node"s.*/
76 };
77
78 /* A json object of updates between 'from_txn' and 'dbmon->n_transactions'
79  * inclusive.  */
80 struct ovsdb_monitor_json_cache_node {
81     struct hmap_node hmap_node;   /* Elements in json cache. */
82     enum ovsdb_monitor_version version;
83     uint64_t from_txn;
84     struct json *json;            /* Null, or a cloned of json */
85 };
86
87 struct jsonrpc_monitor_node {
88     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
89     struct ovs_list node;
90 };
91
92 /* A particular column being monitored. */
93 struct ovsdb_monitor_column {
94     const struct ovsdb_column *column;
95     enum ovsdb_monitor_selection select;
96     bool monitored;
97 };
98
99 /* A row that has changed in a monitored table. */
100 struct ovsdb_monitor_row {
101     struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
102     struct uuid uuid;           /* UUID of row that changed. */
103     struct ovsdb_datum *old;    /* Old data, NULL for an inserted row. */
104     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
105 };
106
107 /* Contains 'struct ovsdb_monitor_row's for rows that have been
108  * updated but not yet flushed to all the jsonrpc connection.
109  *
110  * 'n_refs' represent the number of jsonrpc connections that have
111  * not received updates. Generate the update for the last jsonprc
112  * connection will also destroy the whole "struct ovsdb_monitor_changes"
113  * object.
114  *
115  * 'transaction' stores the first update's transaction id.
116  * */
117 struct ovsdb_monitor_changes {
118     struct ovsdb_monitor_table *mt;
119     struct hmap rows;
120     int n_refs;
121     uint64_t transaction;
122     struct hmap_node hmap_node;  /* Element in ovsdb_monitor_tables' changes
123                                     hmap.  */
124 };
125
126 /* A particular table being monitored. */
127 struct ovsdb_monitor_table {
128     const struct ovsdb_table *table;
129
130     /* This is the union (bitwise-OR) of the 'select' values in all of the
131      * members of 'columns' below. */
132     enum ovsdb_monitor_selection select;
133
134     /* Columns being monitored. */
135     struct ovsdb_monitor_column *columns;
136     size_t n_columns;
137     size_t n_monitored_columns;
138     size_t allocated_columns;
139
140     /* Columns in ovsdb_monitor_row have different indexes then in
141      * ovsdb_row. This field maps between column->index to the index in the
142      * ovsdb_monitor_row. It is used for condition evaluation. */
143     unsigned int *columns_index_map;
144
145     /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
146     struct hmap changes;
147 };
148
149 typedef struct json *
150 (*compose_row_update_cb_func)
151     (const struct ovsdb_monitor_table *mt,
152      const struct ovsdb_monitor_session_condition * condition,
153      const struct ovsdb_monitor_row *row,
154      bool initial, unsigned long int *changed);
155
156 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
157 static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
158     struct ovsdb_monitor_table *mt, uint64_t next_txn);
159 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
160     struct ovsdb_monitor_table *mt, uint64_t unflushed);
161 static void ovsdb_monitor_changes_destroy(
162                                   struct ovsdb_monitor_changes *changes);
163 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
164                                   uint64_t unflushed);
165
166 static uint32_t
167 json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
168 {
169     uint32_t hash;
170
171     hash = hash_uint64(version);
172     hash = hash_uint64_basis(from_txn, hash);
173
174     return hash;
175 }
176
177 static struct ovsdb_monitor_json_cache_node *
178 ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
179                                 enum ovsdb_monitor_version version,
180                                 uint64_t from_txn)
181 {
182     struct ovsdb_monitor_json_cache_node *node;
183     uint32_t hash = json_cache_hash(version, from_txn);
184
185     HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
186         if (node->from_txn == from_txn && node->version == version) {
187             return node;
188         }
189     }
190
191     return NULL;
192 }
193
194 static void
195 ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
196                                 enum ovsdb_monitor_version version,
197                                 uint64_t from_txn, struct json *json)
198 {
199     struct ovsdb_monitor_json_cache_node *node;
200     uint32_t hash = json_cache_hash(version, from_txn);
201
202     node = xmalloc(sizeof *node);
203
204     node->version = version;
205     node->from_txn = from_txn;
206     node->json = json ? json_clone(json) : NULL;
207
208     hmap_insert(&dbmon->json_cache, &node->hmap_node, hash);
209 }
210
211 static void
212 ovsdb_monitor_json_cache_flush(struct ovsdb_monitor *dbmon)
213 {
214     struct ovsdb_monitor_json_cache_node *node;
215
216     HMAP_FOR_EACH_POP(node, hmap_node, &dbmon->json_cache) {
217         json_destroy(node->json);
218         free(node);
219     }
220 }
221
222 static int
223 compare_ovsdb_monitor_column(const void *a_, const void *b_)
224 {
225     const struct ovsdb_monitor_column *a = a_;
226     const struct ovsdb_monitor_column *b = b_;
227
228     /* put all monitored columns at the begining */
229     if (a->monitored != b->monitored) {
230         return a->monitored ? -1 : 1;
231     }
232
233     return a->column < b->column ? -1 : a->column > b->column;
234 }
235
236 static struct ovsdb_monitor *
237 ovsdb_monitor_cast(struct ovsdb_replica *replica)
238 {
239     ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
240     return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
241 }
242
243 /* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
244  * given 'uuid', or NULL if there is no such row. */
245 static struct ovsdb_monitor_row *
246 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
247                                const struct uuid *uuid)
248 {
249     struct ovsdb_monitor_row *row;
250
251     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
252                              &changes->rows) {
253         if (uuid_equals(uuid, &row->uuid)) {
254             return row;
255         }
256     }
257     return NULL;
258 }
259
260 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
261  * copies of the data in 'row' drawn from the columns represented by
262  * mt->columns[].  Returns the array.
263  *
264  * If 'row' is NULL, returns NULL. */
265 static struct ovsdb_datum *
266 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
267                        const struct ovsdb_row *row)
268 {
269     struct ovsdb_datum *data;
270     size_t i;
271
272     if (!row) {
273         return NULL;
274     }
275
276     data = xmalloc(mt->n_columns * sizeof *data);
277     for (i = 0; i < mt->n_columns; i++) {
278         const struct ovsdb_column *c = mt->columns[i].column;
279         const struct ovsdb_datum *src = &row->fields[c->index];
280         struct ovsdb_datum *dst = &data[i];
281         const struct ovsdb_type *type = &c->type;
282
283         ovsdb_datum_clone(dst, src, type);
284     }
285     return data;
286 }
287
288 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
289  * in 'row' drawn from the columns represented by mt->columns[]. */
290 static void
291 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
292                         const struct ovsdb_row *row,
293                         struct ovsdb_datum *data)
294 {
295     size_t i;
296
297     for (i = 0; i < mt->n_columns; i++) {
298         const struct ovsdb_column *c = mt->columns[i].column;
299         const struct ovsdb_datum *src = &row->fields[c->index];
300         struct ovsdb_datum *dst = &data[i];
301         const struct ovsdb_type *type = &c->type;
302
303         if (!ovsdb_datum_equals(src, dst, type)) {
304             ovsdb_datum_destroy(dst, type);
305             ovsdb_datum_clone(dst, src, type);
306         }
307     }
308 }
309
310 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
311  * from mt->columns[], plus 'data' itself. */
312 static void
313 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
314                       struct ovsdb_datum *data)
315 {
316     if (data) {
317         size_t i;
318
319         for (i = 0; i < mt->n_columns; i++) {
320             const struct ovsdb_column *c = mt->columns[i].column;
321
322             ovsdb_datum_destroy(&data[i], &c->type);
323         }
324         free(data);
325     }
326 }
327
328 /* Frees 'row', which must have been created from 'mt'. */
329 static void
330 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
331                           struct ovsdb_monitor_row *row)
332 {
333     if (row) {
334         free_monitor_row_data(mt, row->old);
335         free_monitor_row_data(mt, row->new);
336         free(row);
337     }
338 }
339
340 static void
341 ovsdb_monitor_columns_sort(struct ovsdb_monitor *dbmon)
342 {
343     int i;
344     struct shash_node *node;
345
346     SHASH_FOR_EACH (node, &dbmon->tables) {
347         struct ovsdb_monitor_table *mt = node->data;
348
349         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
350               compare_ovsdb_monitor_column);
351         for (i = 0; i < mt->n_columns; i++) {
352             /* re-set index map due to sort */
353             mt->columns_index_map[mt->columns[i].column->index] = i;
354         }
355     }
356 }
357
358 void
359 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
360                                   struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
361 {
362     struct jsonrpc_monitor_node *jm;
363
364     jm = xzalloc(sizeof *jm);
365     jm->jsonrpc_monitor = jsonrpc_monitor;
366     ovs_list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
367 }
368
369 struct ovsdb_monitor *
370 ovsdb_monitor_create(struct ovsdb *db,
371                      struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
372 {
373     struct ovsdb_monitor *dbmon;
374
375     dbmon = xzalloc(sizeof *dbmon);
376
377     ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
378     ovsdb_add_replica(db, &dbmon->replica);
379     ovs_list_init(&dbmon->jsonrpc_monitors);
380     dbmon->db = db;
381     dbmon->n_transactions = 0;
382     shash_init(&dbmon->tables);
383     hmap_node_nullify(&dbmon->hmap_node);
384     hmap_init(&dbmon->json_cache);
385
386     ovsdb_monitor_add_jsonrpc_monitor(dbmon, jsonrpc_monitor);
387     return dbmon;
388 }
389
390 void
391 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
392                         const struct ovsdb_table *table)
393 {
394     struct ovsdb_monitor_table *mt;
395     int i;
396     size_t n_columns = shash_count(&table->schema->columns);
397
398     mt = xzalloc(sizeof *mt);
399     mt->table = table;
400     shash_add(&m->tables, table->schema->name, mt);
401     hmap_init(&mt->changes);
402     mt->columns_index_map =
403         xmalloc(sizeof *mt->columns_index_map * n_columns);
404     for (i = 0; i < n_columns; i++) {
405         mt->columns_index_map[i] = -1;
406     }
407 }
408
409 const char *
410 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
411                          const struct ovsdb_table *table,
412                          const struct ovsdb_column *column,
413                          enum ovsdb_monitor_selection select,
414                          bool monitored)
415 {
416     struct ovsdb_monitor_table *mt;
417     struct ovsdb_monitor_column *c;
418
419     mt = shash_find_data(&dbmon->tables, table->schema->name);
420
421     /* Check for column duplication. Return duplicated column name. */
422     if (mt->columns_index_map[column->index] != -1) {
423         return column->name;
424     }
425
426     if (mt->n_columns >= mt->allocated_columns) {
427         mt->columns = x2nrealloc(mt->columns, &mt->allocated_columns,
428                                  sizeof *mt->columns);
429     }
430
431     mt->select |= select;
432     mt->columns_index_map[column->index] = mt->n_columns;
433     c = &mt->columns[mt->n_columns++];
434     c->column = column;
435     c->select = select;
436     c->monitored = monitored;
437     if (monitored) {
438         mt->n_monitored_columns++;
439     }
440
441     return NULL;
442 }
443
444 static void
445 ovsdb_monitor_condition_add_columns(struct ovsdb_monitor *dbmon,
446                                     const struct ovsdb_table *table,
447                                     struct ovsdb_condition *condition)
448 {
449     size_t n_columns;
450     int i;
451     const struct ovsdb_column **columns =
452         ovsdb_condition_get_columns(condition, &n_columns);
453
454     for (i = 0; i < n_columns; i++) {
455         ovsdb_monitor_add_column(dbmon, table, columns[i],
456                                  OJMS_NONE, false);
457     }
458
459     free(columns);
460 }
461
462 /* Bind this session's condition to ovsdb_monitor */
463 void
464 ovsdb_monitor_condition_bind(struct ovsdb_monitor *dbmon,
465                           struct ovsdb_monitor_session_condition *cond)
466 {
467     struct shash_node *node;
468
469     SHASH_FOR_EACH(node, &cond->tables) {
470         struct ovsdb_monitor_table_condition *mtc = node->data;
471         struct ovsdb_monitor_table *mt =
472             shash_find_data(&dbmon->tables, mtc->table->schema->name);
473
474         mtc->mt = mt;
475         ovsdb_monitor_condition_add_columns(dbmon, mtc->table,
476                                             &mtc->new_condition);
477     }
478 }
479
480 static struct ovsdb_monitor_changes *
481 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
482                                 uint64_t next_txn)
483 {
484     struct ovsdb_monitor_changes *changes;
485
486     changes = xzalloc(sizeof *changes);
487
488     changes->transaction = next_txn;
489     changes->mt = mt;
490     changes->n_refs = 1;
491     hmap_init(&changes->rows);
492     hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
493
494     return changes;
495 };
496
497 static struct ovsdb_monitor_changes *
498 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
499                                  uint64_t transaction)
500 {
501     struct ovsdb_monitor_changes *changes;
502     size_t hash = hash_uint64(transaction);
503
504     HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
505         if (changes->transaction == transaction) {
506             return changes;
507         }
508     }
509
510     return NULL;
511 }
512
513 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
514 static void
515 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
516                                     uint64_t transaction)
517 {
518     struct ovsdb_monitor_changes *changes =
519                 ovsdb_monitor_table_find_changes(mt, transaction);
520     if (changes) {
521         if (--changes->n_refs == 0) {
522             hmap_remove(&mt->changes, &changes->hmap_node);
523             ovsdb_monitor_changes_destroy(changes);
524         }
525     }
526 }
527
528 /* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
529  */
530 static void
531 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
532                                   uint64_t transaction)
533 {
534     struct ovsdb_monitor_changes *changes;
535
536     changes = ovsdb_monitor_table_find_changes(mt, transaction);
537     if (changes) {
538         changes->n_refs++;
539     } else {
540         ovsdb_monitor_table_add_changes(mt, transaction);
541     }
542 }
543
544 static void
545 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
546 {
547     struct ovsdb_monitor_row *row, *next;
548
549     HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
550         hmap_remove(&changes->rows, &row->hmap_node);
551         ovsdb_monitor_row_destroy(changes->mt, row);
552     }
553     hmap_destroy(&changes->rows);
554     free(changes);
555 }
556
557 static enum ovsdb_monitor_selection
558 ovsdb_monitor_row_update_type(bool initial, const bool old, const bool new)
559 {
560     return initial ? OJMS_INITIAL
561             : !old ? OJMS_INSERT
562             : !new ? OJMS_DELETE
563             : OJMS_MODIFY;
564 }
565
566 /* Set conditional monitoring mode only if we have non-empty condition in one
567  * of the tables at least */
568 static inline void
569 ovsdb_monitor_session_condition_set_mode(
570                                   struct ovsdb_monitor_session_condition *cond)
571 {
572     cond->conditional = shash_count(&cond->tables) !=
573         cond->n_true_cnd;
574 }
575
576 /* Returnes an empty allocated session's condition state holder */
577 struct ovsdb_monitor_session_condition *
578 ovsdb_monitor_session_condition_create(void)
579 {
580     struct ovsdb_monitor_session_condition *condition =
581         xzalloc(sizeof *condition);
582
583     condition->conditional = false;
584     shash_init(&condition->tables);
585     return condition;
586 }
587
588 void
589 ovsdb_monitor_session_condition_destroy(
590                            struct ovsdb_monitor_session_condition *condition)
591 {
592     struct shash_node *node, *next;
593
594     if (!condition) {
595         return;
596     }
597
598     SHASH_FOR_EACH_SAFE (node, next, &condition->tables) {
599         struct ovsdb_monitor_table_condition *mtc = node->data;
600
601         ovsdb_condition_destroy(&mtc->new_condition);
602         ovsdb_condition_destroy(&mtc->old_condition);
603         shash_delete(&condition->tables, node);
604         free(mtc);
605     }
606     free(condition);
607 }
608
609 struct ovsdb_error *
610 ovsdb_monitor_table_condition_create(
611                          struct ovsdb_monitor_session_condition *condition,
612                          const struct ovsdb_table *table,
613                          const struct json *json_cnd)
614 {
615     struct ovsdb_monitor_table_condition *mtc;
616     struct ovsdb_error *error;
617
618     mtc = xzalloc(sizeof *mtc);
619     mtc->table = table;
620     ovsdb_condition_init(&mtc->old_condition);
621     ovsdb_condition_init(&mtc->new_condition);
622
623     if (json_cnd) {
624         error = ovsdb_condition_from_json(table->schema,
625                                           json_cnd,
626                                           NULL,
627                                           &mtc->old_condition);
628         if (error) {
629             free(mtc);
630             return error;
631         }
632     }
633
634     shash_add(&condition->tables, table->schema->name, mtc);
635     /* On session startup old == new condition */
636     ovsdb_condition_clone(&mtc->new_condition, &mtc->old_condition);
637     if (ovsdb_condition_is_true(&mtc->old_condition)) {
638         condition->n_true_cnd++;
639         ovsdb_monitor_session_condition_set_mode(condition);
640     }
641
642     return NULL;
643 }
644
645 static bool
646 ovsdb_monitor_get_table_conditions(
647                       const struct ovsdb_monitor_table *mt,
648                       const struct ovsdb_monitor_session_condition *condition,
649                       struct ovsdb_condition **old_condition,
650                       struct ovsdb_condition **new_condition)
651 {
652     if (!condition) {
653         return false;
654     }
655
656     struct ovsdb_monitor_table_condition *mtc =
657         shash_find_data(&condition->tables, mt->table->schema->name);
658
659     if (!mtc) {
660         return false;
661     }
662     *old_condition = &mtc->old_condition;
663     *new_condition = &mtc->new_condition;
664
665     return true;
666 }
667
668 static enum ovsdb_monitor_selection
669 ovsdb_monitor_row_update_type_condition(
670                       const struct ovsdb_monitor_table *mt,
671                       const struct ovsdb_monitor_session_condition *condition,
672                       bool initial,
673                       const struct ovsdb_datum *old,
674                       const struct ovsdb_datum *new)
675 {
676     struct ovsdb_condition *old_condition, *new_condition;
677     enum ovsdb_monitor_selection type =
678         ovsdb_monitor_row_update_type(initial, old, new);
679
680     if (ovsdb_monitor_get_table_conditions(mt,
681                                            condition,
682                                            &old_condition,
683                                            &new_condition)) {
684         bool old_cond = !old ? false
685             : ovsdb_condition_empty_or_match_any(old,
686                                                  old_condition,
687                                                  mt->columns_index_map);
688         bool new_cond = !new ? false
689             : ovsdb_condition_empty_or_match_any(new,
690                                                  new_condition,
691                                                  mt->columns_index_map);
692
693         if (!old_cond && !new_cond) {
694             type = OJMS_NONE;
695         }
696
697         switch (type) {
698         case OJMS_INITIAL:
699         case OJMS_INSERT:
700             if (!new_cond) {
701                 type = OJMS_NONE;
702             }
703             break;
704         case OJMS_MODIFY:
705             type = !old_cond ? OJMS_INSERT : !new_cond
706                 ? OJMS_DELETE : OJMS_MODIFY;
707             break;
708         case OJMS_DELETE:
709             if (!old_cond) {
710                 type = OJMS_NONE;
711             }
712             break;
713         case OJMS_NONE:
714             break;
715         }
716     }
717     return type;
718 }
719
720 static bool
721 ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
722                               const struct ovsdb_monitor_row *row,
723                               enum ovsdb_monitor_selection type,
724                               unsigned long int *changed)
725 {
726     if (!(mt->select & type)) {
727         return true;
728     }
729
730     if (type == OJMS_MODIFY) {
731         size_t i, n_changes;
732
733         n_changes = 0;
734         memset(changed, 0, bitmap_n_bytes(mt->n_columns));
735         for (i = 0; i < mt->n_columns; i++) {
736             const struct ovsdb_column *c = mt->columns[i].column;
737             if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
738                 bitmap_set1(changed, i);
739                 n_changes++;
740             }
741         }
742         if (!n_changes) {
743             /* No actual changes: presumably a row changed and then
744              * changed back later. */
745             return true;
746         }
747     }
748
749     return false;
750 }
751
752 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
753  * 'mt', or NULL if no row update should be sent.
754  *
755  * The caller should specify 'initial' as true if the returned JSON is going to
756  * be used as part of the initial reply to a "monitor" request, false if it is
757  * going to be used as part of an "update" notification.
758  *
759  * 'changed' must be a scratch buffer for internal use that is at least
760  * bitmap_n_bytes(mt->n_columns) bytes long. */
761 static struct json *
762 ovsdb_monitor_compose_row_update(
763     const struct ovsdb_monitor_table *mt,
764     const struct ovsdb_monitor_session_condition *condition OVS_UNUSED,
765     const struct ovsdb_monitor_row *row,
766     bool initial, unsigned long int *changed)
767 {
768     enum ovsdb_monitor_selection type;
769     struct json *old_json, *new_json;
770     struct json *row_json;
771     size_t i;
772
773     type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
774     if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
775         return NULL;
776     }
777
778     row_json = json_object_create();
779     old_json = new_json = NULL;
780     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
781         old_json = json_object_create();
782         json_object_put(row_json, "old", old_json);
783     }
784     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
785         new_json = json_object_create();
786         json_object_put(row_json, "new", new_json);
787     }
788     for (i = 0; i < mt->n_monitored_columns; i++) {
789         const struct ovsdb_monitor_column *c = &mt->columns[i];
790
791         if (!c->monitored || !(type & c->select))  {
792             /* We don't care about this type of change for this
793              * particular column (but we will care about it for some
794              * other column). */
795             continue;
796         }
797
798         if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
799             || type == OJMS_DELETE) {
800             json_object_put(old_json, c->column->name,
801                             ovsdb_datum_to_json(&row->old[i],
802                                                 &c->column->type));
803         }
804         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
805             json_object_put(new_json, c->column->name,
806                             ovsdb_datum_to_json(&row->new[i],
807                                                 &c->column->type));
808         }
809     }
810
811     return row_json;
812 }
813
814 /* Returns JSON for a <row-update2> (as described in ovsdb-server(1) mapage)
815  * for 'row' within * 'mt', or NULL if no row update should be sent.
816  *
817  * The caller should specify 'initial' as true if the returned JSON is
818  * going to be used as part of the initial reply to a "monitor2" request,
819  * false if it is going to be used as part of an "update2" notification.
820  *
821  * 'changed' must be a scratch buffer for internal use that is at least
822  * bitmap_n_bytes(mt->n_columns) bytes long. */
823 static struct json *
824 ovsdb_monitor_compose_row_update2(
825     const struct ovsdb_monitor_table *mt,
826     const struct ovsdb_monitor_session_condition *condition,
827     const struct ovsdb_monitor_row *row,
828     bool initial, unsigned long int *changed)
829 {
830     enum ovsdb_monitor_selection type;
831     struct json *row_update2, *diff_json;
832     size_t i;
833
834     type = ovsdb_monitor_row_update_type_condition(mt, condition, initial,
835                                                    row->old, row->new);
836     if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
837         return NULL;
838     }
839
840     row_update2 = json_object_create();
841     if (type == OJMS_DELETE) {
842         json_object_put(row_update2, "delete", json_null_create());
843     } else {
844         diff_json = json_object_create();
845         const char *op;
846
847         for (i = 0; i < mt->n_monitored_columns; i++) {
848             const struct ovsdb_monitor_column *c = &mt->columns[i];
849
850             if (!c->monitored || !(type & c->select))  {
851                 /* We don't care about this type of change for this
852                  * particular column (but we will care about it for some
853                  * other column). */
854                 continue;
855             }
856
857             if (type == OJMS_MODIFY) {
858                 struct ovsdb_datum diff;
859
860                 if (!bitmap_is_set(changed, i)) {
861                     continue;
862                 }
863
864                 ovsdb_datum_diff(&diff ,&row->old[i], &row->new[i],
865                                         &c->column->type);
866                 json_object_put(diff_json, c->column->name,
867                                 ovsdb_datum_to_json(&diff, &c->column->type));
868                 ovsdb_datum_destroy(&diff, &c->column->type);
869             } else {
870                 if (!ovsdb_datum_is_default(&row->new[i], &c->column->type)) {
871                     json_object_put(diff_json, c->column->name,
872                                     ovsdb_datum_to_json(&row->new[i],
873                                                         &c->column->type));
874                 }
875             }
876         }
877
878         op = type == OJMS_INITIAL ? "initial"
879                                   : type == OJMS_MODIFY ? "modify" : "insert";
880         json_object_put(row_update2, op, diff_json);
881     }
882
883     return row_update2;
884 }
885
886 static size_t
887 ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
888 {
889     struct shash_node *node;
890     size_t max_columns = 0;
891
892     SHASH_FOR_EACH (node, &dbmon->tables) {
893         struct ovsdb_monitor_table *mt = node->data;
894
895         max_columns = MAX(max_columns, mt->n_columns);
896     }
897
898     return max_columns;
899 }
900
901 /* Constructs and returns JSON for a <table-updates> object (as described in
902  * RFC 7047) for all the outstanding changes within 'monitor', starting from
903  * 'transaction'.  */
904 static struct json*
905 ovsdb_monitor_compose_update(
906                       struct ovsdb_monitor *dbmon,
907                       bool initial, uint64_t transaction,
908                       const struct ovsdb_monitor_session_condition *condition,
909                       compose_row_update_cb_func row_update)
910 {
911     struct shash_node *node;
912     struct json *json;
913     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
914     unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
915
916     json = NULL;
917     SHASH_FOR_EACH (node, &dbmon->tables) {
918         struct ovsdb_monitor_table *mt = node->data;
919         struct ovsdb_monitor_row *row, *next;
920         struct ovsdb_monitor_changes *changes;
921         struct json *table_json = NULL;
922
923         changes = ovsdb_monitor_table_find_changes(mt, transaction);
924         if (!changes) {
925             continue;
926         }
927
928         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
929             struct json *row_json;
930
931             row_json = (*row_update)(mt, condition, row, initial, changed);
932             if (row_json) {
933                 char uuid[UUID_LEN + 1];
934
935                 /* Create JSON object for transaction overall. */
936                 if (!json) {
937                     json = json_object_create();
938                 }
939
940                 /* Create JSON object for transaction on this table. */
941                 if (!table_json) {
942                     table_json = json_object_create();
943                     json_object_put(json, mt->table->schema->name, table_json);
944                 }
945
946                 /* Add JSON row to JSON table. */
947                 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
948                 json_object_put(table_json, uuid, row_json);
949             }
950         }
951     }
952     free(changed);
953
954     return json;
955 }
956
957 /* Returns JSON for a <table-updates> object (as described in RFC 7047)
958  * for all the outstanding changes within 'monitor' that starts from
959  * '*unflushed' transaction id.
960  *
961  * The caller should specify 'initial' as true if the returned JSON is going to
962  * be used as part of the initial reply to a "monitor" request, false if it is
963  * going to be used as part of an "update" notification. */
964 struct json *
965 ovsdb_monitor_get_update(
966              struct ovsdb_monitor *dbmon,
967              bool initial, uint64_t *unflushed_,
968              const struct ovsdb_monitor_session_condition *condition,
969              enum ovsdb_monitor_version version)
970 {
971     struct ovsdb_monitor_json_cache_node *cache_node = NULL;
972     struct shash_node *node;
973     struct json *json;
974     const uint64_t unflushed = *unflushed_;
975     const uint64_t next_unflushed = dbmon->n_transactions + 1;
976
977     /* Return a clone of cached json if one exists. Otherwise,
978      * generate a new one and add it to the cache.  */
979     if (!condition || !condition->conditional) {
980         cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
981                                                      unflushed);
982     }
983     if (cache_node) {
984         json = cache_node->json ? json_clone(cache_node->json) : NULL;
985     } else {
986         if (version == OVSDB_MONITOR_V1) {
987             json =
988                ovsdb_monitor_compose_update(dbmon, initial, unflushed,
989                                             condition,
990                                             ovsdb_monitor_compose_row_update);
991         } else {
992             ovs_assert(version == OVSDB_MONITOR_V2);
993             json =
994                ovsdb_monitor_compose_update(dbmon, initial, unflushed,
995                                             condition,
996                                             ovsdb_monitor_compose_row_update2);
997         }
998         if (!condition || !condition->conditional) {
999             ovsdb_monitor_json_cache_insert(dbmon, version, unflushed, json);
1000         }
1001     }
1002
1003     /* Maintain transaction id of 'changes'. */
1004     SHASH_FOR_EACH (node, &dbmon->tables) {
1005         struct ovsdb_monitor_table *mt = node->data;
1006
1007         ovsdb_monitor_table_untrack_changes(mt, unflushed);
1008         ovsdb_monitor_table_track_changes(mt, next_unflushed);
1009     }
1010     *unflushed_ = next_unflushed;
1011
1012     return json;
1013 }
1014
1015 bool
1016 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
1017                           uint64_t next_transaction)
1018 {
1019     ovs_assert(next_transaction <= dbmon->n_transactions + 1);
1020     return (next_transaction <= dbmon->n_transactions);
1021 }
1022
1023 void
1024 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
1025                                const struct ovsdb_table *table,
1026                                enum ovsdb_monitor_selection select)
1027 {
1028     struct ovsdb_monitor_table * mt;
1029
1030     mt = shash_find_data(&dbmon->tables, table->schema->name);
1031     mt->select |= select;
1032 }
1033
1034  /*
1035  * If a row's change type (insert, delete or modify) matches that of
1036  * the monitor, they should be sent to the monitor's clients as updates.
1037  * Of cause, the monitor should also internally update with this change.
1038  *
1039  * When a change type does not require client side update, the monitor
1040  * may still need to keep track of certain changes in order to generate
1041  * correct future updates.  For example, the monitor internal state should
1042  * be updated whenever a new row is inserted, in order to generate the
1043  * correct initial state, regardless if a insert change type is being
1044  * monitored.
1045  *
1046  * On the other hand, if a transaction only contains changes to columns
1047  * that are not monitored, this transaction can be safely ignored by the
1048  * monitor.
1049  *
1050  * Thus, the order of the declaration is important:
1051  * 'OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE' always implies
1052  * 'OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE', but not vice versa.  */
1053 enum ovsdb_monitor_changes_efficacy {
1054     OVSDB_CHANGES_NO_EFFECT,                /* Monitor does not care about this
1055                                                change.  */
1056     OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE,  /* Monitor internal updates. */
1057     OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE,  /* Client needs to be updated.  */
1058 };
1059
1060 struct ovsdb_monitor_aux {
1061     const struct ovsdb_monitor *monitor;
1062     struct ovsdb_monitor_table *mt;
1063     enum ovsdb_monitor_changes_efficacy efficacy;
1064 };
1065
1066 static void
1067 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
1068                        const struct ovsdb_monitor *m)
1069 {
1070     aux->monitor = m;
1071     aux->mt = NULL;
1072     aux->efficacy = OVSDB_CHANGES_NO_EFFECT;
1073 }
1074
1075 static void
1076 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
1077                              const struct ovsdb_row *new,
1078                              const struct ovsdb_monitor_table *mt,
1079                              struct ovsdb_monitor_changes *changes)
1080 {
1081     const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
1082     struct ovsdb_monitor_row *change;
1083
1084     change = ovsdb_monitor_changes_row_find(changes, uuid);
1085     if (!change) {
1086         change = xzalloc(sizeof *change);
1087         hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
1088         change->uuid = *uuid;
1089         change->old = clone_monitor_row_data(mt, old);
1090         change->new = clone_monitor_row_data(mt, new);
1091     } else {
1092         if (new) {
1093             update_monitor_row_data(mt, new, change->new);
1094         } else {
1095             free_monitor_row_data(mt, change->new);
1096             change->new = NULL;
1097
1098             if (!change->old) {
1099                 /* This row was added then deleted.  Forget about it. */
1100                 hmap_remove(&changes->rows, &change->hmap_node);
1101                 free(change);
1102             }
1103         }
1104     }
1105 }
1106
1107 static bool
1108 ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
1109                               const unsigned long int *changed)
1110 {
1111     size_t i;
1112
1113     for (i = 0; i < mt->n_columns; i++) {
1114         size_t column_index = mt->columns[i].column->index;
1115
1116         if (bitmap_is_set(changed, column_index)) {
1117             return true;
1118         }
1119     }
1120
1121     return false;
1122 }
1123
1124 /* Return the efficacy of a row's change to a monitor table.
1125  *
1126  * Please see the block comment above 'ovsdb_monitor_changes_efficacy'
1127  * definition form more information.  */
1128 static enum ovsdb_monitor_changes_efficacy
1129 ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type,
1130                                const struct ovsdb_monitor_table *mt,
1131                                const unsigned long int *changed)
1132 {
1133     if (type == OJMS_MODIFY &&
1134         !ovsdb_monitor_columns_changed(mt, changed)) {
1135         return OVSDB_CHANGES_NO_EFFECT;
1136     }
1137
1138     if (type == OJMS_MODIFY) {
1139         /* Condition might turn a modify operation to insert or delete */
1140         type |= OJMS_INSERT | OJMS_DELETE;
1141     }
1142
1143     return (mt->select & type)
1144                 ?  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
1145                 :  OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
1146 }
1147
1148 static bool
1149 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
1150                         const struct ovsdb_row *new,
1151                         const unsigned long int *changed,
1152                         void *aux_)
1153 {
1154     struct ovsdb_monitor_aux *aux = aux_;
1155     const struct ovsdb_monitor *m = aux->monitor;
1156     struct ovsdb_table *table = new ? new->table : old->table;
1157     struct ovsdb_monitor_table *mt;
1158     struct ovsdb_monitor_changes *changes;
1159
1160     if (!aux->mt || table != aux->mt->table) {
1161         aux->mt = shash_find_data(&m->tables, table->schema->name);
1162         if (!aux->mt) {
1163             /* We don't care about rows in this table at all.  Tell the caller
1164              * to skip it.  */
1165             return false;
1166         }
1167     }
1168     mt = aux->mt;
1169
1170     enum ovsdb_monitor_selection type =
1171         ovsdb_monitor_row_update_type(false, old, new);
1172     enum ovsdb_monitor_changes_efficacy efficacy =
1173         ovsdb_monitor_changes_classify(type, mt, changed);
1174
1175     HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
1176         if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
1177             ovsdb_monitor_changes_update(old, new, mt, changes);
1178         }
1179     }
1180     if (aux->efficacy < efficacy) {
1181         aux->efficacy = efficacy;
1182     }
1183
1184     return true;
1185 }
1186
1187 void
1188 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
1189 {
1190     struct ovsdb_monitor_aux aux;
1191     struct shash_node *node;
1192
1193     ovsdb_monitor_init_aux(&aux, dbmon);
1194     SHASH_FOR_EACH (node, &dbmon->tables) {
1195         struct ovsdb_monitor_table *mt = node->data;
1196
1197         if (mt->select & OJMS_INITIAL) {
1198             struct ovsdb_row *row;
1199             struct ovsdb_monitor_changes *changes;
1200
1201             changes = ovsdb_monitor_table_find_changes(mt, 0);
1202             if (!changes) {
1203                 changes = ovsdb_monitor_table_add_changes(mt, 0);
1204                 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1205                     ovsdb_monitor_changes_update(NULL, row, mt, changes);
1206                 }
1207             } else {
1208                 changes->n_refs++;
1209             }
1210         }
1211     }
1212 }
1213
1214 void
1215 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
1216                    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
1217                    uint64_t unflushed)
1218 {
1219     struct jsonrpc_monitor_node *jm;
1220
1221     if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
1222         ovsdb_monitor_destroy(dbmon);
1223         return;
1224     }
1225
1226     /* Find and remove the jsonrpc monitor from the list.  */
1227     LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
1228         if (jm->jsonrpc_monitor == jsonrpc_monitor) {
1229             /* Release the tracked changes. */
1230             struct shash_node *node;
1231             SHASH_FOR_EACH (node, &dbmon->tables) {
1232                 struct ovsdb_monitor_table *mt = node->data;
1233                 ovsdb_monitor_table_untrack_changes(mt, unflushed);
1234             }
1235             ovs_list_remove(&jm->node);
1236             free(jm);
1237
1238             /* Destroy ovsdb monitor if this is the last user.  */
1239             if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
1240                 ovsdb_monitor_destroy(dbmon);
1241             }
1242
1243             return;
1244         };
1245     }
1246
1247     /* Should never reach here. jsonrpc_monitor should be on the list.  */
1248     OVS_NOT_REACHED();
1249 }
1250
1251 static bool
1252 ovsdb_monitor_table_equal(const struct ovsdb_monitor_table *a,
1253                           const struct ovsdb_monitor_table *b)
1254 {
1255     size_t i;
1256
1257     ovs_assert(b->n_columns == b->n_monitored_columns);
1258
1259     if ((a->table != b->table) ||
1260         (a->select != b->select) ||
1261         (a->n_monitored_columns != b->n_monitored_columns)) {
1262         return false;
1263     }
1264
1265     /* Compare only monitored columns that must be sorted already */
1266     for (i = 0; i < a->n_monitored_columns; i++) {
1267         if ((a->columns[i].column != b->columns[i].column) ||
1268             (a->columns[i].select != b->columns[i].select)) {
1269             return false;
1270         }
1271     }
1272     return true;
1273 }
1274
1275 static bool
1276 ovsdb_monitor_equal(const struct ovsdb_monitor *a,
1277                     const struct ovsdb_monitor *b)
1278 {
1279     struct shash_node *node;
1280
1281     if (shash_count(&a->tables) != shash_count(&b->tables)) {
1282         return false;
1283     }
1284
1285     SHASH_FOR_EACH(node, &a->tables) {
1286         const struct ovsdb_monitor_table *mta = node->data;
1287         const struct ovsdb_monitor_table *mtb;
1288
1289         mtb = shash_find_data(&b->tables, node->name);
1290         if (!mtb) {
1291             return false;
1292         }
1293
1294         if (!ovsdb_monitor_table_equal(mta, mtb)) {
1295             return false;
1296         }
1297     }
1298
1299     return true;
1300 }
1301
1302 static size_t
1303 ovsdb_monitor_hash(const struct ovsdb_monitor *dbmon, size_t basis)
1304 {
1305     const struct shash_node **nodes;
1306     size_t i, j, n;
1307
1308     nodes = shash_sort(&dbmon->tables);
1309     n = shash_count(&dbmon->tables);
1310
1311     for (i = 0; i < n; i++) {
1312         struct ovsdb_monitor_table *mt = nodes[i]->data;
1313
1314         basis = hash_pointer(mt->table, basis);
1315         basis = hash_3words(mt->select, mt->n_columns, basis);
1316
1317         for (j = 0; j < mt->n_columns; j++) {
1318             basis = hash_pointer(mt->columns[j].column, basis);
1319             basis = hash_2words(mt->columns[j].select, basis);
1320         }
1321     }
1322     free(nodes);
1323
1324     return basis;
1325 }
1326
1327 struct ovsdb_monitor *
1328 ovsdb_monitor_add(struct ovsdb_monitor *new_dbmon)
1329 {
1330     struct ovsdb_monitor *dbmon;
1331     size_t hash;
1332
1333     /* New_dbmon should be associated with only one jsonrpc
1334      * connections.  */
1335     ovs_assert(ovs_list_is_singleton(&new_dbmon->jsonrpc_monitors));
1336
1337     ovsdb_monitor_columns_sort(new_dbmon);
1338
1339     hash = ovsdb_monitor_hash(new_dbmon, 0);
1340     HMAP_FOR_EACH_WITH_HASH(dbmon, hmap_node, hash, &ovsdb_monitors) {
1341         if (ovsdb_monitor_equal(dbmon,  new_dbmon)) {
1342             return dbmon;
1343         }
1344     }
1345
1346     hmap_insert(&ovsdb_monitors, &new_dbmon->hmap_node, hash);
1347     return new_dbmon;
1348 }
1349
1350 static void
1351 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
1352 {
1353     struct shash_node *node;
1354
1355     ovs_list_remove(&dbmon->replica.node);
1356
1357     if (!hmap_node_is_null(&dbmon->hmap_node)) {
1358         hmap_remove(&ovsdb_monitors, &dbmon->hmap_node);
1359     }
1360
1361     ovsdb_monitor_json_cache_flush(dbmon);
1362     hmap_destroy(&dbmon->json_cache);
1363
1364     SHASH_FOR_EACH (node, &dbmon->tables) {
1365         struct ovsdb_monitor_table *mt = node->data;
1366         struct ovsdb_monitor_changes *changes, *next;
1367
1368         HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
1369             hmap_remove(&mt->changes, &changes->hmap_node);
1370             ovsdb_monitor_changes_destroy(changes);
1371         }
1372         hmap_destroy(&mt->changes);
1373         free(mt->columns);
1374         free(mt->columns_index_map);
1375         free(mt);
1376     }
1377     shash_destroy(&dbmon->tables);
1378     free(dbmon);
1379 }
1380
1381 static struct ovsdb_error *
1382 ovsdb_monitor_commit(struct ovsdb_replica *replica,
1383                      const struct ovsdb_txn *txn,
1384                      bool durable OVS_UNUSED)
1385 {
1386     struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
1387     struct ovsdb_monitor_aux aux;
1388
1389     ovsdb_monitor_init_aux(&aux, m);
1390     /* Update ovsdb_monitor's transaction number for
1391      * each transaction, before calling ovsdb_monitor_change_cb().  */
1392     m->n_transactions++;
1393     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
1394
1395     switch(aux.efficacy) {
1396     case OVSDB_CHANGES_NO_EFFECT:
1397         /* The transaction is ignored by the monitor.
1398          * Roll back the 'n_transactions' as if the transaction
1399          * has never happened. */
1400         m->n_transactions--;
1401         break;
1402     case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE:
1403         /* Nothing.  */
1404         break;
1405     case  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE:
1406         ovsdb_monitor_json_cache_flush(m);
1407         break;
1408     }
1409
1410     return NULL;
1411 }
1412
1413 static void
1414 ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
1415 {
1416     struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
1417     struct jsonrpc_monitor_node *jm, *next;
1418
1419     /* Delete all front end monitors. Removing the last front
1420      * end monitor will also destroy the corresponding 'ovsdb_monitor'.
1421      * ovsdb monitor will also be destroied.  */
1422     LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
1423         ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
1424     }
1425 }
1426
1427 /* Add some memory usage statics for monitors into 'usage', for use with
1428  * memory_report().  */
1429 void
1430 ovsdb_monitor_get_memory_usage(struct simap *usage)
1431 {
1432     struct ovsdb_monitor *dbmon;
1433     simap_put(usage, "monitors", hmap_count(&ovsdb_monitors));
1434
1435     HMAP_FOR_EACH(dbmon, hmap_node,  &ovsdb_monitors) {
1436         simap_increase(usage, "json-caches", hmap_count(&dbmon->json_cache));
1437     }
1438 }
1439
1440 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1441     ovsdb_monitor_commit,
1442     ovsdb_monitor_destroy_callback,
1443 };