Rename nbs/nbr port names to nbsp/nbrp.
[cascardo/ovs.git] / ovsdb / monitor.c
1 /*
2  * Copyright (c) 2015 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include <errno.h>
20
21 #include "bitmap.h"
22 #include "column.h"
23 #include "openvswitch/dynamic-string.h"
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "row.h"
30 #include "condition.h"
31 #include "simap.h"
32 #include "hash.h"
33 #include "table.h"
34 #include "hash.h"
35 #include "timeval.h"
36 #include "transaction.h"
37 #include "jsonrpc-server.h"
38 #include "monitor.h"
39 #include "openvswitch/vlog.h"
40
41 VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
42
43 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
44 static struct hmap ovsdb_monitors = HMAP_INITIALIZER(&ovsdb_monitors);
45
46 /* Keep state of session's conditions */
47 struct ovsdb_monitor_session_condition {
48     bool conditional;
49     size_t n_true_cnd;
50     struct shash tables;     /* Contains
51                               *   "struct ovsdb_monitor_table_condition *"s. */
52 };
53
54 /* Monitored table session's conditions */
55 struct ovsdb_monitor_table_condition {
56     const struct ovsdb_table *table;
57     struct ovsdb_monitor_table *mt;
58     struct ovsdb_condition old_condition;
59     struct ovsdb_condition new_condition;
60 };
61
62 /*  Backend monitor.
63  *
64  *  ovsdb_monitor keep track of the ovsdb changes.
65  */
66
67 /* A collection of tables being monitored. */
68 struct ovsdb_monitor {
69     struct ovsdb_replica replica;
70     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
71     struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
72     struct ovsdb *db;
73     uint64_t n_transactions;      /* Count number of committed transactions. */
74     struct hmap_node hmap_node;   /* Elements within ovsdb_monitors.  */
75     struct hmap json_cache;       /* Contains "ovsdb_monitor_json_cache_node"s.*/
76 };
77
78 /* A json object of updates between 'from_txn' and 'dbmon->n_transactions'
79  * inclusive.  */
80 struct ovsdb_monitor_json_cache_node {
81     struct hmap_node hmap_node;   /* Elements in json cache. */
82     enum ovsdb_monitor_version version;
83     uint64_t from_txn;
84     struct json *json;            /* Null, or a cloned of json */
85 };
86
87 struct jsonrpc_monitor_node {
88     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
89     struct ovs_list node;
90 };
91
92 /* A particular column being monitored. */
93 struct ovsdb_monitor_column {
94     const struct ovsdb_column *column;
95     enum ovsdb_monitor_selection select;
96     bool monitored;
97 };
98
99 /* A row that has changed in a monitored table. */
100 struct ovsdb_monitor_row {
101     struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
102     struct uuid uuid;           /* UUID of row that changed. */
103     struct ovsdb_datum *old;    /* Old data, NULL for an inserted row. */
104     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
105 };
106
107 /* Contains 'struct ovsdb_monitor_row's for rows that have been
108  * updated but not yet flushed to all the jsonrpc connection.
109  *
110  * 'n_refs' represent the number of jsonrpc connections that have
111  * not received updates. Generate the update for the last jsonprc
112  * connection will also destroy the whole "struct ovsdb_monitor_changes"
113  * object.
114  *
115  * 'transaction' stores the first update's transaction id.
116  * */
117 struct ovsdb_monitor_changes {
118     struct ovsdb_monitor_table *mt;
119     struct hmap rows;
120     int n_refs;
121     uint64_t transaction;
122     struct hmap_node hmap_node;  /* Element in ovsdb_monitor_tables' changes
123                                     hmap.  */
124 };
125
126 /* A particular table being monitored. */
127 struct ovsdb_monitor_table {
128     const struct ovsdb_table *table;
129
130     /* This is the union (bitwise-OR) of the 'select' values in all of the
131      * members of 'columns' below. */
132     enum ovsdb_monitor_selection select;
133
134     /* Columns being monitored. */
135     struct ovsdb_monitor_column *columns;
136     size_t n_columns;
137     size_t n_monitored_columns;
138     size_t allocated_columns;
139
140     /* Columns in ovsdb_monitor_row have different indexes then in
141      * ovsdb_row. This field maps between column->index to the index in the
142      * ovsdb_monitor_row. It is used for condition evaluation. */
143     unsigned int *columns_index_map;
144
145     /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
146     struct hmap changes;
147 };
148
149 enum ovsdb_monitor_row_type {
150     OVSDB_ROW,
151     OVSDB_MONITOR_ROW
152 };
153
154 typedef struct json *
155 (*compose_row_update_cb_func)
156     (const struct ovsdb_monitor_table *mt,
157      const struct ovsdb_monitor_session_condition * condition,
158      enum ovsdb_monitor_row_type row_type,
159      const void *,
160      bool initial, unsigned long int *changed);
161
162 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
163 static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
164     struct ovsdb_monitor_table *mt, uint64_t next_txn);
165 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
166     struct ovsdb_monitor_table *mt, uint64_t unflushed);
167 static void ovsdb_monitor_changes_destroy(
168                                   struct ovsdb_monitor_changes *changes);
169 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
170                                   uint64_t unflushed);
171
172 static uint32_t
173 json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
174 {
175     uint32_t hash;
176
177     hash = hash_uint64(version);
178     hash = hash_uint64_basis(from_txn, hash);
179
180     return hash;
181 }
182
183 static struct ovsdb_monitor_json_cache_node *
184 ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
185                                 enum ovsdb_monitor_version version,
186                                 uint64_t from_txn)
187 {
188     struct ovsdb_monitor_json_cache_node *node;
189     uint32_t hash = json_cache_hash(version, from_txn);
190
191     HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
192         if (node->from_txn == from_txn && node->version == version) {
193             return node;
194         }
195     }
196
197     return NULL;
198 }
199
200 static void
201 ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
202                                 enum ovsdb_monitor_version version,
203                                 uint64_t from_txn, struct json *json)
204 {
205     struct ovsdb_monitor_json_cache_node *node;
206     uint32_t hash = json_cache_hash(version, from_txn);
207
208     node = xmalloc(sizeof *node);
209
210     node->version = version;
211     node->from_txn = from_txn;
212     node->json = json ? json_clone(json) : NULL;
213
214     hmap_insert(&dbmon->json_cache, &node->hmap_node, hash);
215 }
216
217 static void
218 ovsdb_monitor_json_cache_flush(struct ovsdb_monitor *dbmon)
219 {
220     struct ovsdb_monitor_json_cache_node *node;
221
222     HMAP_FOR_EACH_POP(node, hmap_node, &dbmon->json_cache) {
223         json_destroy(node->json);
224         free(node);
225     }
226 }
227
228 static int
229 compare_ovsdb_monitor_column(const void *a_, const void *b_)
230 {
231     const struct ovsdb_monitor_column *a = a_;
232     const struct ovsdb_monitor_column *b = b_;
233
234     /* put all monitored columns at the begining */
235     if (a->monitored != b->monitored) {
236         return a->monitored ? -1 : 1;
237     }
238
239     return a->column < b->column ? -1 : a->column > b->column;
240 }
241
242 static struct ovsdb_monitor *
243 ovsdb_monitor_cast(struct ovsdb_replica *replica)
244 {
245     ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
246     return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
247 }
248
249 /* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
250  * given 'uuid', or NULL if there is no such row. */
251 static struct ovsdb_monitor_row *
252 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
253                                const struct uuid *uuid)
254 {
255     struct ovsdb_monitor_row *row;
256
257     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
258                              &changes->rows) {
259         if (uuid_equals(uuid, &row->uuid)) {
260             return row;
261         }
262     }
263     return NULL;
264 }
265
266 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
267  * copies of the data in 'row' drawn from the columns represented by
268  * mt->columns[].  Returns the array.
269  *
270  * If 'row' is NULL, returns NULL. */
271 static struct ovsdb_datum *
272 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
273                        const struct ovsdb_row *row)
274 {
275     struct ovsdb_datum *data;
276     size_t i;
277
278     if (!row) {
279         return NULL;
280     }
281
282     data = xmalloc(mt->n_columns * sizeof *data);
283     for (i = 0; i < mt->n_columns; i++) {
284         const struct ovsdb_column *c = mt->columns[i].column;
285         const struct ovsdb_datum *src = &row->fields[c->index];
286         struct ovsdb_datum *dst = &data[i];
287         const struct ovsdb_type *type = &c->type;
288
289         ovsdb_datum_clone(dst, src, type);
290     }
291     return data;
292 }
293
294 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
295  * in 'row' drawn from the columns represented by mt->columns[]. */
296 static void
297 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
298                         const struct ovsdb_row *row,
299                         struct ovsdb_datum *data)
300 {
301     size_t i;
302
303     for (i = 0; i < mt->n_columns; i++) {
304         const struct ovsdb_column *c = mt->columns[i].column;
305         const struct ovsdb_datum *src = &row->fields[c->index];
306         struct ovsdb_datum *dst = &data[i];
307         const struct ovsdb_type *type = &c->type;
308
309         if (!ovsdb_datum_equals(src, dst, type)) {
310             ovsdb_datum_destroy(dst, type);
311             ovsdb_datum_clone(dst, src, type);
312         }
313     }
314 }
315
316 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
317  * from mt->columns[], plus 'data' itself. */
318 static void
319 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
320                       struct ovsdb_datum *data)
321 {
322     if (data) {
323         size_t i;
324
325         for (i = 0; i < mt->n_columns; i++) {
326             const struct ovsdb_column *c = mt->columns[i].column;
327
328             ovsdb_datum_destroy(&data[i], &c->type);
329         }
330         free(data);
331     }
332 }
333
334 /* Frees 'row', which must have been created from 'mt'. */
335 static void
336 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
337                           struct ovsdb_monitor_row *row)
338 {
339     if (row) {
340         free_monitor_row_data(mt, row->old);
341         free_monitor_row_data(mt, row->new);
342         free(row);
343     }
344 }
345
346 static void
347 ovsdb_monitor_columns_sort(struct ovsdb_monitor *dbmon)
348 {
349     int i;
350     struct shash_node *node;
351
352     SHASH_FOR_EACH (node, &dbmon->tables) {
353         struct ovsdb_monitor_table *mt = node->data;
354
355         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
356               compare_ovsdb_monitor_column);
357         for (i = 0; i < mt->n_columns; i++) {
358             /* re-set index map due to sort */
359             mt->columns_index_map[mt->columns[i].column->index] = i;
360         }
361     }
362 }
363
364 void
365 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
366                                   struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
367 {
368     struct jsonrpc_monitor_node *jm;
369
370     jm = xzalloc(sizeof *jm);
371     jm->jsonrpc_monitor = jsonrpc_monitor;
372     ovs_list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
373 }
374
375 struct ovsdb_monitor *
376 ovsdb_monitor_create(struct ovsdb *db,
377                      struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
378 {
379     struct ovsdb_monitor *dbmon;
380
381     dbmon = xzalloc(sizeof *dbmon);
382
383     ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
384     ovsdb_add_replica(db, &dbmon->replica);
385     ovs_list_init(&dbmon->jsonrpc_monitors);
386     dbmon->db = db;
387     dbmon->n_transactions = 0;
388     shash_init(&dbmon->tables);
389     hmap_node_nullify(&dbmon->hmap_node);
390     hmap_init(&dbmon->json_cache);
391
392     ovsdb_monitor_add_jsonrpc_monitor(dbmon, jsonrpc_monitor);
393     return dbmon;
394 }
395
396 void
397 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
398                         const struct ovsdb_table *table)
399 {
400     struct ovsdb_monitor_table *mt;
401     int i;
402     size_t n_columns = shash_count(&table->schema->columns);
403
404     mt = xzalloc(sizeof *mt);
405     mt->table = table;
406     shash_add(&m->tables, table->schema->name, mt);
407     hmap_init(&mt->changes);
408     mt->columns_index_map =
409         xmalloc(sizeof *mt->columns_index_map * n_columns);
410     for (i = 0; i < n_columns; i++) {
411         mt->columns_index_map[i] = -1;
412     }
413 }
414
415 const char *
416 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
417                          const struct ovsdb_table *table,
418                          const struct ovsdb_column *column,
419                          enum ovsdb_monitor_selection select,
420                          bool monitored)
421 {
422     struct ovsdb_monitor_table *mt;
423     struct ovsdb_monitor_column *c;
424
425     mt = shash_find_data(&dbmon->tables, table->schema->name);
426
427     /* Check for column duplication. Return duplicated column name. */
428     if (mt->columns_index_map[column->index] != -1) {
429         return column->name;
430     }
431
432     if (mt->n_columns >= mt->allocated_columns) {
433         mt->columns = x2nrealloc(mt->columns, &mt->allocated_columns,
434                                  sizeof *mt->columns);
435     }
436
437     mt->select |= select;
438     mt->columns_index_map[column->index] = mt->n_columns;
439     c = &mt->columns[mt->n_columns++];
440     c->column = column;
441     c->select = select;
442     c->monitored = monitored;
443     if (monitored) {
444         mt->n_monitored_columns++;
445     }
446
447     return NULL;
448 }
449
450 static void
451 ovsdb_monitor_condition_add_columns(struct ovsdb_monitor *dbmon,
452                                     const struct ovsdb_table *table,
453                                     struct ovsdb_condition *condition)
454 {
455     size_t n_columns;
456     int i;
457     const struct ovsdb_column **columns =
458         ovsdb_condition_get_columns(condition, &n_columns);
459
460     for (i = 0; i < n_columns; i++) {
461         ovsdb_monitor_add_column(dbmon, table, columns[i],
462                                  OJMS_NONE, false);
463     }
464
465     free(columns);
466 }
467
468 /* Bind this session's condition to ovsdb_monitor */
469 void
470 ovsdb_monitor_condition_bind(struct ovsdb_monitor *dbmon,
471                           struct ovsdb_monitor_session_condition *cond)
472 {
473     struct shash_node *node;
474
475     SHASH_FOR_EACH(node, &cond->tables) {
476         struct ovsdb_monitor_table_condition *mtc = node->data;
477         struct ovsdb_monitor_table *mt =
478             shash_find_data(&dbmon->tables, mtc->table->schema->name);
479
480         mtc->mt = mt;
481         ovsdb_monitor_condition_add_columns(dbmon, mtc->table,
482                                             &mtc->new_condition);
483     }
484 }
485
486 bool
487 ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
488                            const struct ovsdb_table *table)
489 {
490     return shash_find_data(&m->tables, table->schema->name);
491 }
492
493 static struct ovsdb_monitor_changes *
494 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
495                                 uint64_t next_txn)
496 {
497     struct ovsdb_monitor_changes *changes;
498
499     changes = xzalloc(sizeof *changes);
500
501     changes->transaction = next_txn;
502     changes->mt = mt;
503     changes->n_refs = 1;
504     hmap_init(&changes->rows);
505     hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
506
507     return changes;
508 };
509
510 static struct ovsdb_monitor_changes *
511 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
512                                  uint64_t transaction)
513 {
514     struct ovsdb_monitor_changes *changes;
515     size_t hash = hash_uint64(transaction);
516
517     HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
518         if (changes->transaction == transaction) {
519             return changes;
520         }
521     }
522
523     return NULL;
524 }
525
526 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
527 static void
528 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
529                                     uint64_t transaction)
530 {
531     struct ovsdb_monitor_changes *changes =
532                 ovsdb_monitor_table_find_changes(mt, transaction);
533     if (changes) {
534         if (--changes->n_refs == 0) {
535             hmap_remove(&mt->changes, &changes->hmap_node);
536             ovsdb_monitor_changes_destroy(changes);
537         }
538     }
539 }
540
541 /* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
542  */
543 static void
544 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
545                                   uint64_t transaction)
546 {
547     struct ovsdb_monitor_changes *changes;
548
549     changes = ovsdb_monitor_table_find_changes(mt, transaction);
550     if (changes) {
551         changes->n_refs++;
552     } else {
553         ovsdb_monitor_table_add_changes(mt, transaction);
554     }
555 }
556
557 static void
558 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
559 {
560     struct ovsdb_monitor_row *row, *next;
561
562     HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
563         hmap_remove(&changes->rows, &row->hmap_node);
564         ovsdb_monitor_row_destroy(changes->mt, row);
565     }
566     hmap_destroy(&changes->rows);
567     free(changes);
568 }
569
570 static enum ovsdb_monitor_selection
571 ovsdb_monitor_row_update_type(bool initial, const bool old, const bool new)
572 {
573     return initial ? OJMS_INITIAL
574             : !old ? OJMS_INSERT
575             : !new ? OJMS_DELETE
576             : OJMS_MODIFY;
577 }
578
579 /* Set conditional monitoring mode only if we have non-empty condition in one
580  * of the tables at least */
581 static inline void
582 ovsdb_monitor_session_condition_set_mode(
583                                   struct ovsdb_monitor_session_condition *cond)
584 {
585     cond->conditional = shash_count(&cond->tables) !=
586         cond->n_true_cnd;
587 }
588
589 /* Returnes an empty allocated session's condition state holder */
590 struct ovsdb_monitor_session_condition *
591 ovsdb_monitor_session_condition_create(void)
592 {
593     struct ovsdb_monitor_session_condition *condition =
594         xzalloc(sizeof *condition);
595
596     condition->conditional = false;
597     shash_init(&condition->tables);
598     return condition;
599 }
600
601 void
602 ovsdb_monitor_session_condition_destroy(
603                            struct ovsdb_monitor_session_condition *condition)
604 {
605     struct shash_node *node, *next;
606
607     if (!condition) {
608         return;
609     }
610
611     SHASH_FOR_EACH_SAFE (node, next, &condition->tables) {
612         struct ovsdb_monitor_table_condition *mtc = node->data;
613
614         ovsdb_condition_destroy(&mtc->new_condition);
615         ovsdb_condition_destroy(&mtc->old_condition);
616         shash_delete(&condition->tables, node);
617         free(mtc);
618     }
619     free(condition);
620 }
621
622 struct ovsdb_error *
623 ovsdb_monitor_table_condition_create(
624                          struct ovsdb_monitor_session_condition *condition,
625                          const struct ovsdb_table *table,
626                          const struct json *json_cnd)
627 {
628     struct ovsdb_monitor_table_condition *mtc;
629     struct ovsdb_error *error;
630
631     mtc = xzalloc(sizeof *mtc);
632     mtc->table = table;
633     ovsdb_condition_init(&mtc->old_condition);
634     ovsdb_condition_init(&mtc->new_condition);
635
636     if (json_cnd) {
637         error = ovsdb_condition_from_json(table->schema,
638                                           json_cnd,
639                                           NULL,
640                                           &mtc->old_condition);
641         if (error) {
642             free(mtc);
643             return error;
644         }
645     }
646
647     shash_add(&condition->tables, table->schema->name, mtc);
648     /* On session startup old == new condition */
649     ovsdb_condition_clone(&mtc->new_condition, &mtc->old_condition);
650     if (ovsdb_condition_is_true(&mtc->old_condition)) {
651         condition->n_true_cnd++;
652         ovsdb_monitor_session_condition_set_mode(condition);
653     }
654
655     return NULL;
656 }
657
658 static bool
659 ovsdb_monitor_get_table_conditions(
660                       const struct ovsdb_monitor_table *mt,
661                       const struct ovsdb_monitor_session_condition *condition,
662                       struct ovsdb_condition **old_condition,
663                       struct ovsdb_condition **new_condition)
664 {
665     if (!condition) {
666         return false;
667     }
668
669     struct ovsdb_monitor_table_condition *mtc =
670         shash_find_data(&condition->tables, mt->table->schema->name);
671
672     if (!mtc) {
673         return false;
674     }
675     *old_condition = &mtc->old_condition;
676     *new_condition = &mtc->new_condition;
677
678     return true;
679 }
680
681 struct ovsdb_error *
682 ovsdb_monitor_table_condition_update(
683                             struct ovsdb_monitor *dbmon,
684                             struct ovsdb_monitor_session_condition *condition,
685                             const struct ovsdb_table *table,
686                             const struct json *cond_json)
687 {
688     struct ovsdb_monitor_table_condition *mtc =
689         shash_find_data(&condition->tables, table->schema->name);
690     struct ovsdb_error *error;
691     struct ovsdb_condition cond = OVSDB_CONDITION_INITIALIZER(&cond);
692
693     if (!condition) {
694         return NULL;
695     }
696
697     error = ovsdb_condition_from_json(table->schema, cond_json,
698                                       NULL, &cond);
699     if (error) {
700         return error;
701     }
702     ovsdb_condition_destroy(&mtc->new_condition);
703     ovsdb_condition_clone(&mtc->new_condition, &cond);
704     ovsdb_condition_destroy(&cond);
705     ovsdb_monitor_condition_add_columns(dbmon,
706                                         table,
707                                         &mtc->new_condition);
708
709     return NULL;
710 }
711
712 static void
713 ovsdb_monitor_table_condition_updated(struct ovsdb_monitor_table *mt,
714                     struct ovsdb_monitor_session_condition *condition)
715 {
716     struct ovsdb_monitor_table_condition *mtc =
717         shash_find_data(&condition->tables, mt->table->schema->name);
718
719     if (mtc) {
720         /* If conditional monitoring - set old condition to new condition */
721         if (ovsdb_condition_cmp_3way(&mtc->old_condition,
722                                      &mtc->new_condition)) {
723             if (ovsdb_condition_is_true(&mtc->new_condition)) {
724                                 if (!ovsdb_condition_is_true(&mtc->old_condition)) {
725                     condition->n_true_cnd++;
726                 }
727             } else {
728                 if (ovsdb_condition_is_true(&mtc->old_condition)) {
729                     condition->n_true_cnd--;
730                 }
731             }
732             ovsdb_condition_destroy(&mtc->old_condition);
733             ovsdb_condition_clone(&mtc->old_condition, &mtc->new_condition);
734             ovsdb_monitor_session_condition_set_mode(condition);
735         }
736     }
737 }
738
739 static enum ovsdb_monitor_selection
740 ovsdb_monitor_row_update_type_condition(
741                       const struct ovsdb_monitor_table *mt,
742                       const struct ovsdb_monitor_session_condition *condition,
743                       bool initial,
744                       enum ovsdb_monitor_row_type row_type,
745                       const struct ovsdb_datum *old,
746                       const struct ovsdb_datum *new)
747 {
748     struct ovsdb_condition *old_condition, *new_condition;
749     enum ovsdb_monitor_selection type =
750         ovsdb_monitor_row_update_type(initial, old, new);
751
752     if (ovsdb_monitor_get_table_conditions(mt,
753                                            condition,
754                                            &old_condition,
755                                            &new_condition)) {
756         bool old_cond = !old ? false
757             : ovsdb_condition_empty_or_match_any(old,
758                                                 old_condition,
759                                                 row_type == OVSDB_MONITOR_ROW ?
760                                                 mt->columns_index_map :
761                                                 NULL);
762         bool new_cond = !new ? false
763             : ovsdb_condition_empty_or_match_any(new,
764                                                 new_condition,
765                                                 row_type == OVSDB_MONITOR_ROW ?
766                                                 mt->columns_index_map :
767                                                 NULL);
768
769         if (!old_cond && !new_cond) {
770             type = OJMS_NONE;
771         }
772
773         switch (type) {
774         case OJMS_INITIAL:
775         case OJMS_INSERT:
776             if (!new_cond) {
777                 type = OJMS_NONE;
778             }
779             break;
780         case OJMS_MODIFY:
781             type = !old_cond ? OJMS_INSERT : !new_cond
782                 ? OJMS_DELETE : OJMS_MODIFY;
783             break;
784         case OJMS_DELETE:
785             if (!old_cond) {
786                 type = OJMS_NONE;
787             }
788             break;
789         case OJMS_NONE:
790             break;
791         }
792     }
793     return type;
794 }
795
796 static bool
797 ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
798                               enum ovsdb_monitor_row_type row_type,
799                               const struct ovsdb_datum *old,
800                               const struct ovsdb_datum *new,
801                               enum ovsdb_monitor_selection type,
802                               unsigned long int *changed)
803 {
804     if (!(mt->select & type)) {
805         return true;
806     }
807
808     if (type == OJMS_MODIFY) {
809         size_t i, n_changes;
810
811         n_changes = 0;
812         memset(changed, 0, bitmap_n_bytes(mt->n_columns));
813         for (i = 0; i < mt->n_columns; i++) {
814             const struct ovsdb_column *c = mt->columns[i].column;
815             size_t index = row_type == OVSDB_ROW ? c->index : i;
816             if (!ovsdb_datum_equals(&old[index], &new[index], &c->type)) {
817                 bitmap_set1(changed, i);
818                 n_changes++;
819             }
820         }
821         if (!n_changes) {
822             /* No actual changes: presumably a row changed and then
823              * changed back later. */
824             return true;
825         }
826     }
827
828     return false;
829 }
830
831 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
832  * 'mt', or NULL if no row update should be sent.
833  *
834  * The caller should specify 'initial' as true if the returned JSON is going to
835  * be used as part of the initial reply to a "monitor" request, false if it is
836  * going to be used as part of an "update" notification.
837  *
838  * 'changed' must be a scratch buffer for internal use that is at least
839  * bitmap_n_bytes(mt->n_columns) bytes long. */
840 static struct json *
841 ovsdb_monitor_compose_row_update(
842     const struct ovsdb_monitor_table *mt,
843     const struct ovsdb_monitor_session_condition *condition OVS_UNUSED,
844     enum ovsdb_monitor_row_type row_type OVS_UNUSED,
845     const void *_row,
846     bool initial, unsigned long int *changed)
847 {
848     const struct ovsdb_monitor_row *row = _row;
849     enum ovsdb_monitor_selection type;
850     struct json *old_json, *new_json;
851     struct json *row_json;
852     size_t i;
853
854     ovs_assert(row_type == OVSDB_MONITOR_ROW);
855     type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
856     if (ovsdb_monitor_row_skip_update(mt, row_type, row->old,
857                                       row->new, type, changed)) {
858         return NULL;
859     }
860
861     row_json = json_object_create();
862     old_json = new_json = NULL;
863     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
864         old_json = json_object_create();
865         json_object_put(row_json, "old", old_json);
866     }
867     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
868         new_json = json_object_create();
869         json_object_put(row_json, "new", new_json);
870     }
871     for (i = 0; i < mt->n_monitored_columns; i++) {
872         const struct ovsdb_monitor_column *c = &mt->columns[i];
873
874         if (!c->monitored || !(type & c->select))  {
875             /* We don't care about this type of change for this
876              * particular column (but we will care about it for some
877              * other column). */
878             continue;
879         }
880
881         if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
882             || type == OJMS_DELETE) {
883             json_object_put(old_json, c->column->name,
884                             ovsdb_datum_to_json(&row->old[i],
885                                                 &c->column->type));
886         }
887         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
888             json_object_put(new_json, c->column->name,
889                             ovsdb_datum_to_json(&row->new[i],
890                                                 &c->column->type));
891         }
892     }
893
894     return row_json;
895 }
896
897 /* Returns JSON for a <row-update2> (as described in ovsdb-server(1) mapage)
898  * for 'row' within * 'mt', or NULL if no row update should be sent.
899  *
900  * The caller should specify 'initial' as true if the returned JSON is
901  * going to be used as part of the initial reply to a "monitor_cond" request,
902  * false if it is going to be used as part of an "update2" notification.
903  *
904  * 'changed' must be a scratch buffer for internal use that is at least
905  * bitmap_n_bytes(mt->n_columns) bytes long. */
906 static struct json *
907 ovsdb_monitor_compose_row_update2(
908     const struct ovsdb_monitor_table *mt,
909     const struct ovsdb_monitor_session_condition *condition,
910     enum ovsdb_monitor_row_type row_type,
911     const void *_row,
912     bool initial, unsigned long int *changed)
913 {
914     enum ovsdb_monitor_selection type;
915     struct json *row_update2, *diff_json;
916     const struct ovsdb_datum *old, *new;
917     size_t i;
918
919     if (row_type == OVSDB_MONITOR_ROW) {
920         old = ((const struct ovsdb_monitor_row *)_row)->old;;
921         new = ((const struct ovsdb_monitor_row *)_row)->new;
922     } else {
923         old = new = ((const struct ovsdb_row *)_row)->fields;
924     }
925
926     type = ovsdb_monitor_row_update_type_condition(mt, condition, initial,
927                                                    row_type, old, new);
928     if (ovsdb_monitor_row_skip_update(mt, row_type, old, new, type, changed)) {
929         return NULL;
930     }
931
932     row_update2 = json_object_create();
933     if (type == OJMS_DELETE) {
934         json_object_put(row_update2, "delete", json_null_create());
935     } else {
936         diff_json = json_object_create();
937         const char *op;
938
939         for (i = 0; i < mt->n_monitored_columns; i++) {
940             const struct ovsdb_monitor_column *c = &mt->columns[i];
941             size_t index = row_type == OVSDB_ROW ? c->column->index : i;
942             if (!c->monitored || !(type & c->select))  {
943                 /* We don't care about this type of change for this
944                  * particular column (but we will care about it for some
945                  * other column). */
946                 continue;
947             }
948
949             if (type == OJMS_MODIFY) {
950                 struct ovsdb_datum diff;
951
952                 if (!bitmap_is_set(changed, i)) {
953                     continue;
954                 }
955
956                 ovsdb_datum_diff(&diff ,&old[index], &new[index],
957                                         &c->column->type);
958                 json_object_put(diff_json, c->column->name,
959                                 ovsdb_datum_to_json(&diff, &c->column->type));
960                 ovsdb_datum_destroy(&diff, &c->column->type);
961             } else {
962                 if (!ovsdb_datum_is_default(&new[index], &c->column->type)) {
963                     json_object_put(diff_json, c->column->name,
964                                     ovsdb_datum_to_json(&new[index],
965                                                         &c->column->type));
966                 }
967             }
968         }
969
970         op = type == OJMS_INITIAL ? "initial"
971                                   : type == OJMS_MODIFY ? "modify" : "insert";
972         json_object_put(row_update2, op, diff_json);
973     }
974
975     return row_update2;
976 }
977
978 static size_t
979 ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
980 {
981     struct shash_node *node;
982     size_t max_columns = 0;
983
984     SHASH_FOR_EACH (node, &dbmon->tables) {
985         struct ovsdb_monitor_table *mt = node->data;
986
987         max_columns = MAX(max_columns, mt->n_columns);
988     }
989
990     return max_columns;
991 }
992
993 static void
994 ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
995                            struct json **table_json, struct json *row_json,
996                            const struct uuid *row_uuid)
997 {
998     char uuid[UUID_LEN + 1];
999
1000     /* Create JSON object for transaction overall. */
1001     if (!*json) {
1002         *json = json_object_create();
1003     }
1004
1005     /* Create JSON object for transaction on this table. */
1006     if (!*table_json) {
1007         *table_json = json_object_create();
1008         json_object_put(*json, table_name, *table_json);
1009     }
1010
1011     /* Add JSON row to JSON table. */
1012     snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(row_uuid));
1013     json_object_put(*table_json, uuid, row_json);
1014 }
1015
1016 /* Constructs and returns JSON for a <table-updates> object (as described in
1017  * RFC 7047) for all the outstanding changes within 'monitor', starting from
1018  * 'transaction'.  */
1019 static struct json*
1020 ovsdb_monitor_compose_update(
1021                       struct ovsdb_monitor *dbmon,
1022                       bool initial, uint64_t transaction,
1023                       const struct ovsdb_monitor_session_condition *condition,
1024                       compose_row_update_cb_func row_update)
1025 {
1026     struct shash_node *node;
1027     struct json *json;
1028     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
1029     unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
1030
1031     json = NULL;
1032     SHASH_FOR_EACH (node, &dbmon->tables) {
1033         struct ovsdb_monitor_table *mt = node->data;
1034         struct ovsdb_monitor_row *row, *next;
1035         struct ovsdb_monitor_changes *changes;
1036         struct json *table_json = NULL;
1037
1038         changes = ovsdb_monitor_table_find_changes(mt, transaction);
1039         if (!changes) {
1040             continue;
1041         }
1042
1043                 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
1044                         struct json *row_json;
1045                         row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
1046                                                                          initial, changed);
1047                         if (row_json) {
1048                                 ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
1049                                                                                    &table_json, row_json,
1050                                                                                    &row->uuid);
1051                         }
1052                 }
1053         }
1054     free(changed);
1055
1056     return json;
1057 }
1058
1059 static struct json*
1060 ovsdb_monitor_compose_cond_change_update(
1061                     struct ovsdb_monitor *dbmon,
1062                     struct ovsdb_monitor_session_condition *condition)
1063 {
1064     struct shash_node *node;
1065     struct json *json = NULL;
1066     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
1067     unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
1068
1069     SHASH_FOR_EACH (node, &dbmon->tables) {
1070         struct ovsdb_monitor_table *mt = node->data;
1071         struct ovsdb_row *row;
1072         struct json *table_json = NULL;
1073         struct ovsdb_condition *old_condition, *new_condition;
1074
1075         if (!ovsdb_monitor_get_table_conditions(mt,
1076                                                 condition,
1077                                                 &old_condition,
1078                                                 &new_condition) ||
1079             !ovsdb_condition_cmp_3way(old_condition, new_condition)) {
1080             /* Nothing to update on this table */
1081             continue;
1082         }
1083
1084         /* Iterate over all rows in table */
1085         HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1086             struct json *row_json;
1087
1088             row_json = ovsdb_monitor_compose_row_update2(mt, condition,
1089                                                          OVSDB_ROW, row,
1090                                                          false, changed);
1091             if (row_json) {
1092                 ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
1093                                            &table_json, row_json,
1094                                            ovsdb_row_get_uuid(row));
1095             }
1096         }
1097         ovsdb_monitor_table_condition_updated(mt, condition);
1098     }
1099     free(changed);
1100
1101     return json;
1102 }
1103
1104 /* Returns JSON for a <table-updates> object (as described in RFC 7047)
1105  * for all the outstanding changes within 'monitor' that starts from
1106  * '*unflushed'.
1107  * If cond_updated is true all rows in the db that match conditions will be
1108  * sent.
1109  *
1110  * The caller should specify 'initial' as true if the returned JSON is going to
1111  * be used as part of the initial reply to a "monitor" request, false if it is
1112  * going to be used as part of an "update" notification. */
1113 struct json *
1114 ovsdb_monitor_get_update(
1115              struct ovsdb_monitor *dbmon,
1116              bool initial, bool cond_updated,
1117              uint64_t *unflushed_,
1118              struct ovsdb_monitor_session_condition *condition,
1119              enum ovsdb_monitor_version version)
1120 {
1121     struct ovsdb_monitor_json_cache_node *cache_node = NULL;
1122     struct shash_node *node;
1123     struct json *json;
1124     const uint64_t unflushed = *unflushed_;
1125     const uint64_t next_unflushed = dbmon->n_transactions + 1;
1126
1127     ovs_assert(cond_updated ? unflushed == next_unflushed : true);
1128
1129     /* Return a clone of cached json if one exists. Otherwise,
1130      * generate a new one and add it to the cache.  */
1131     if (!condition || (!condition->conditional && !cond_updated)) {
1132         cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
1133                                                      unflushed);
1134     }
1135     if (cache_node) {
1136         json = cache_node->json ? json_clone(cache_node->json) : NULL;
1137     } else {
1138         if (version == OVSDB_MONITOR_V1) {
1139             json =
1140                ovsdb_monitor_compose_update(dbmon, initial, unflushed,
1141                                             condition,
1142                                             ovsdb_monitor_compose_row_update);
1143         } else {
1144             ovs_assert(version == OVSDB_MONITOR_V2);
1145             if (!cond_updated) {
1146                                 json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
1147                                                                                         condition,
1148                                                                                         ovsdb_monitor_compose_row_update2);
1149
1150                                 if (!condition || !condition->conditional) {
1151                                         ovsdb_monitor_json_cache_insert(dbmon, version, unflushed,
1152                                                                                                         json);
1153                                 }
1154                         } else {
1155                 /* Compose update on whole db due to condition update.
1156                    Session must be flushed (change list is empty)*/
1157                                 json =
1158                                         ovsdb_monitor_compose_cond_change_update(dbmon, condition);
1159                         }
1160                 }
1161     }
1162
1163     /* Maintain transaction id of 'changes'. */
1164     SHASH_FOR_EACH (node, &dbmon->tables) {
1165         struct ovsdb_monitor_table *mt = node->data;
1166
1167         ovsdb_monitor_table_untrack_changes(mt, unflushed);
1168         ovsdb_monitor_table_track_changes(mt, next_unflushed);
1169     }
1170     *unflushed_ = next_unflushed;
1171
1172     return json;
1173 }
1174
1175 bool
1176 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
1177                           uint64_t next_transaction)
1178 {
1179     ovs_assert(next_transaction <= dbmon->n_transactions + 1);
1180     return (next_transaction <= dbmon->n_transactions);
1181 }
1182
1183 void
1184 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
1185                                const struct ovsdb_table *table,
1186                                enum ovsdb_monitor_selection select)
1187 {
1188     struct ovsdb_monitor_table * mt;
1189
1190     mt = shash_find_data(&dbmon->tables, table->schema->name);
1191     mt->select |= select;
1192 }
1193
1194  /*
1195  * If a row's change type (insert, delete or modify) matches that of
1196  * the monitor, they should be sent to the monitor's clients as updates.
1197  * Of cause, the monitor should also internally update with this change.
1198  *
1199  * When a change type does not require client side update, the monitor
1200  * may still need to keep track of certain changes in order to generate
1201  * correct future updates.  For example, the monitor internal state should
1202  * be updated whenever a new row is inserted, in order to generate the
1203  * correct initial state, regardless if a insert change type is being
1204  * monitored.
1205  *
1206  * On the other hand, if a transaction only contains changes to columns
1207  * that are not monitored, this transaction can be safely ignored by the
1208  * monitor.
1209  *
1210  * Thus, the order of the declaration is important:
1211  * 'OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE' always implies
1212  * 'OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE', but not vice versa.  */
1213 enum ovsdb_monitor_changes_efficacy {
1214     OVSDB_CHANGES_NO_EFFECT,                /* Monitor does not care about this
1215                                                change.  */
1216     OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE,  /* Monitor internal updates. */
1217     OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE,  /* Client needs to be updated.  */
1218 };
1219
1220 struct ovsdb_monitor_aux {
1221     const struct ovsdb_monitor *monitor;
1222     struct ovsdb_monitor_table *mt;
1223     enum ovsdb_monitor_changes_efficacy efficacy;
1224 };
1225
1226 static void
1227 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
1228                        const struct ovsdb_monitor *m)
1229 {
1230     aux->monitor = m;
1231     aux->mt = NULL;
1232     aux->efficacy = OVSDB_CHANGES_NO_EFFECT;
1233 }
1234
1235 static void
1236 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
1237                              const struct ovsdb_row *new,
1238                              const struct ovsdb_monitor_table *mt,
1239                              struct ovsdb_monitor_changes *changes)
1240 {
1241     const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
1242     struct ovsdb_monitor_row *change;
1243
1244     change = ovsdb_monitor_changes_row_find(changes, uuid);
1245     if (!change) {
1246         change = xzalloc(sizeof *change);
1247         hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
1248         change->uuid = *uuid;
1249         change->old = clone_monitor_row_data(mt, old);
1250         change->new = clone_monitor_row_data(mt, new);
1251     } else {
1252         if (new) {
1253             update_monitor_row_data(mt, new, change->new);
1254         } else {
1255             free_monitor_row_data(mt, change->new);
1256             change->new = NULL;
1257
1258             if (!change->old) {
1259                 /* This row was added then deleted.  Forget about it. */
1260                 hmap_remove(&changes->rows, &change->hmap_node);
1261                 free(change);
1262             }
1263         }
1264     }
1265 }
1266
1267 static bool
1268 ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
1269                               const unsigned long int *changed)
1270 {
1271     size_t i;
1272
1273     for (i = 0; i < mt->n_columns; i++) {
1274         size_t column_index = mt->columns[i].column->index;
1275
1276         if (bitmap_is_set(changed, column_index)) {
1277             return true;
1278         }
1279     }
1280
1281     return false;
1282 }
1283
1284 /* Return the efficacy of a row's change to a monitor table.
1285  *
1286  * Please see the block comment above 'ovsdb_monitor_changes_efficacy'
1287  * definition form more information.  */
1288 static enum ovsdb_monitor_changes_efficacy
1289 ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type,
1290                                const struct ovsdb_monitor_table *mt,
1291                                const unsigned long int *changed)
1292 {
1293     if (type == OJMS_MODIFY &&
1294         !ovsdb_monitor_columns_changed(mt, changed)) {
1295         return OVSDB_CHANGES_NO_EFFECT;
1296     }
1297
1298     if (type == OJMS_MODIFY) {
1299         /* Condition might turn a modify operation to insert or delete */
1300         type |= OJMS_INSERT | OJMS_DELETE;
1301     }
1302
1303     return (mt->select & type)
1304                 ?  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
1305                 :  OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
1306 }
1307
1308 static bool
1309 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
1310                         const struct ovsdb_row *new,
1311                         const unsigned long int *changed,
1312                         void *aux_)
1313 {
1314     struct ovsdb_monitor_aux *aux = aux_;
1315     const struct ovsdb_monitor *m = aux->monitor;
1316     struct ovsdb_table *table = new ? new->table : old->table;
1317     struct ovsdb_monitor_table *mt;
1318     struct ovsdb_monitor_changes *changes;
1319
1320     if (!aux->mt || table != aux->mt->table) {
1321         aux->mt = shash_find_data(&m->tables, table->schema->name);
1322         if (!aux->mt) {
1323             /* We don't care about rows in this table at all.  Tell the caller
1324              * to skip it.  */
1325             return false;
1326         }
1327     }
1328     mt = aux->mt;
1329
1330     enum ovsdb_monitor_selection type =
1331         ovsdb_monitor_row_update_type(false, old, new);
1332     enum ovsdb_monitor_changes_efficacy efficacy =
1333         ovsdb_monitor_changes_classify(type, mt, changed);
1334
1335     HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
1336         if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
1337             ovsdb_monitor_changes_update(old, new, mt, changes);
1338         }
1339     }
1340     if (aux->efficacy < efficacy) {
1341         aux->efficacy = efficacy;
1342     }
1343
1344     return true;
1345 }
1346
1347 void
1348 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
1349 {
1350     struct shash_node *node;
1351
1352     SHASH_FOR_EACH (node, &dbmon->tables) {
1353         struct ovsdb_monitor_table *mt = node->data;
1354
1355         if (mt->select & OJMS_INITIAL) {
1356             struct ovsdb_row *row;
1357             struct ovsdb_monitor_changes *changes;
1358
1359             changes = ovsdb_monitor_table_find_changes(mt, 0);
1360             if (!changes) {
1361                 changes = ovsdb_monitor_table_add_changes(mt, 0);
1362                 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1363                     ovsdb_monitor_changes_update(NULL, row, mt, changes);
1364                 }
1365             } else {
1366                 changes->n_refs++;
1367             }
1368         }
1369     }
1370 }
1371
1372 void
1373 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
1374                    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
1375                    uint64_t unflushed)
1376 {
1377     struct jsonrpc_monitor_node *jm;
1378
1379     if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
1380         ovsdb_monitor_destroy(dbmon);
1381         return;
1382     }
1383
1384     /* Find and remove the jsonrpc monitor from the list.  */
1385     LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
1386         if (jm->jsonrpc_monitor == jsonrpc_monitor) {
1387             /* Release the tracked changes. */
1388             struct shash_node *node;
1389             SHASH_FOR_EACH (node, &dbmon->tables) {
1390                 struct ovsdb_monitor_table *mt = node->data;
1391                 ovsdb_monitor_table_untrack_changes(mt, unflushed);
1392             }
1393             ovs_list_remove(&jm->node);
1394             free(jm);
1395
1396             /* Destroy ovsdb monitor if this is the last user.  */
1397             if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
1398                 ovsdb_monitor_destroy(dbmon);
1399             }
1400
1401             return;
1402         };
1403     }
1404
1405     /* Should never reach here. jsonrpc_monitor should be on the list.  */
1406     OVS_NOT_REACHED();
1407 }
1408
1409 static bool
1410 ovsdb_monitor_table_equal(const struct ovsdb_monitor_table *a,
1411                           const struct ovsdb_monitor_table *b)
1412 {
1413     size_t i;
1414
1415     ovs_assert(b->n_columns == b->n_monitored_columns);
1416
1417     if ((a->table != b->table) ||
1418         (a->select != b->select) ||
1419         (a->n_monitored_columns != b->n_monitored_columns)) {
1420         return false;
1421     }
1422
1423     /* Compare only monitored columns that must be sorted already */
1424     for (i = 0; i < a->n_monitored_columns; i++) {
1425         if ((a->columns[i].column != b->columns[i].column) ||
1426             (a->columns[i].select != b->columns[i].select)) {
1427             return false;
1428         }
1429     }
1430     return true;
1431 }
1432
1433 static bool
1434 ovsdb_monitor_equal(const struct ovsdb_monitor *a,
1435                     const struct ovsdb_monitor *b)
1436 {
1437     struct shash_node *node;
1438
1439     if (shash_count(&a->tables) != shash_count(&b->tables)) {
1440         return false;
1441     }
1442
1443     SHASH_FOR_EACH(node, &a->tables) {
1444         const struct ovsdb_monitor_table *mta = node->data;
1445         const struct ovsdb_monitor_table *mtb;
1446
1447         mtb = shash_find_data(&b->tables, node->name);
1448         if (!mtb) {
1449             return false;
1450         }
1451
1452         if (!ovsdb_monitor_table_equal(mta, mtb)) {
1453             return false;
1454         }
1455     }
1456
1457     return true;
1458 }
1459
1460 static size_t
1461 ovsdb_monitor_hash(const struct ovsdb_monitor *dbmon, size_t basis)
1462 {
1463     const struct shash_node **nodes;
1464     size_t i, j, n;
1465
1466     nodes = shash_sort(&dbmon->tables);
1467     n = shash_count(&dbmon->tables);
1468
1469     for (i = 0; i < n; i++) {
1470         struct ovsdb_monitor_table *mt = nodes[i]->data;
1471
1472         basis = hash_pointer(mt->table, basis);
1473         basis = hash_3words(mt->select, mt->n_columns, basis);
1474
1475         for (j = 0; j < mt->n_columns; j++) {
1476             basis = hash_pointer(mt->columns[j].column, basis);
1477             basis = hash_2words(mt->columns[j].select, basis);
1478         }
1479     }
1480     free(nodes);
1481
1482     return basis;
1483 }
1484
1485 struct ovsdb_monitor *
1486 ovsdb_monitor_add(struct ovsdb_monitor *new_dbmon)
1487 {
1488     struct ovsdb_monitor *dbmon;
1489     size_t hash;
1490
1491     /* New_dbmon should be associated with only one jsonrpc
1492      * connections.  */
1493     ovs_assert(ovs_list_is_singleton(&new_dbmon->jsonrpc_monitors));
1494
1495     ovsdb_monitor_columns_sort(new_dbmon);
1496
1497     hash = ovsdb_monitor_hash(new_dbmon, 0);
1498     HMAP_FOR_EACH_WITH_HASH(dbmon, hmap_node, hash, &ovsdb_monitors) {
1499         if (ovsdb_monitor_equal(dbmon,  new_dbmon)) {
1500             return dbmon;
1501         }
1502     }
1503
1504     hmap_insert(&ovsdb_monitors, &new_dbmon->hmap_node, hash);
1505     return new_dbmon;
1506 }
1507
1508 static void
1509 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
1510 {
1511     struct shash_node *node;
1512
1513     ovs_list_remove(&dbmon->replica.node);
1514
1515     if (!hmap_node_is_null(&dbmon->hmap_node)) {
1516         hmap_remove(&ovsdb_monitors, &dbmon->hmap_node);
1517     }
1518
1519     ovsdb_monitor_json_cache_flush(dbmon);
1520     hmap_destroy(&dbmon->json_cache);
1521
1522     SHASH_FOR_EACH (node, &dbmon->tables) {
1523         struct ovsdb_monitor_table *mt = node->data;
1524         struct ovsdb_monitor_changes *changes, *next;
1525
1526         HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
1527             hmap_remove(&mt->changes, &changes->hmap_node);
1528             ovsdb_monitor_changes_destroy(changes);
1529         }
1530         hmap_destroy(&mt->changes);
1531         free(mt->columns);
1532         free(mt->columns_index_map);
1533         free(mt);
1534     }
1535     shash_destroy(&dbmon->tables);
1536     free(dbmon);
1537 }
1538
1539 static struct ovsdb_error *
1540 ovsdb_monitor_commit(struct ovsdb_replica *replica,
1541                      const struct ovsdb_txn *txn,
1542                      bool durable OVS_UNUSED)
1543 {
1544     struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
1545     struct ovsdb_monitor_aux aux;
1546
1547     ovsdb_monitor_init_aux(&aux, m);
1548     /* Update ovsdb_monitor's transaction number for
1549      * each transaction, before calling ovsdb_monitor_change_cb().  */
1550     m->n_transactions++;
1551     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
1552
1553     switch(aux.efficacy) {
1554     case OVSDB_CHANGES_NO_EFFECT:
1555         /* The transaction is ignored by the monitor.
1556          * Roll back the 'n_transactions' as if the transaction
1557          * has never happened. */
1558         m->n_transactions--;
1559         break;
1560     case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE:
1561         /* Nothing.  */
1562         break;
1563     case  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE:
1564         ovsdb_monitor_json_cache_flush(m);
1565         break;
1566     }
1567
1568     return NULL;
1569 }
1570
1571 static void
1572 ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
1573 {
1574     struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
1575     struct jsonrpc_monitor_node *jm, *next;
1576
1577     /* Delete all front end monitors. Removing the last front
1578      * end monitor will also destroy the corresponding 'ovsdb_monitor'.
1579      * ovsdb monitor will also be destroied.  */
1580     LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
1581         ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
1582     }
1583 }
1584
1585 /* Add some memory usage statics for monitors into 'usage', for use with
1586  * memory_report().  */
1587 void
1588 ovsdb_monitor_get_memory_usage(struct simap *usage)
1589 {
1590     struct ovsdb_monitor *dbmon;
1591     simap_put(usage, "monitors", hmap_count(&ovsdb_monitors));
1592
1593     HMAP_FOR_EACH(dbmon, hmap_node,  &ovsdb_monitors) {
1594         simap_increase(usage, "json-caches", hmap_count(&dbmon->json_cache));
1595     }
1596 }
1597
1598 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1599     ovsdb_monitor_commit,
1600     ovsdb_monitor_destroy_callback,
1601 };