ovsdb-monitor: allow multiple of 'ovsdb_monitor_changes' in each ovsdb monitor table
[cascardo/ovs.git] / ovsdb / monitor.c
1 /*
2  * Copyright (c) 2015 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include <errno.h>
20
21 #include "bitmap.h"
22 #include "column.h"
23 #include "dynamic-string.h"
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "row.h"
30 #include "simap.h"
31 #include "hash.h"
32 #include "table.h"
33 #include "timeval.h"
34 #include "transaction.h"
35 #include "jsonrpc-server.h"
36 #include "monitor.h"
37 #include "openvswitch/vlog.h"
38
39
40 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
41
42 /*  Backend monitor.
43  *
44  *  ovsdb_monitor keep track of the ovsdb changes.
45  */
46
47 /* A collection of tables being monitored. */
48 struct ovsdb_monitor {
49     struct ovsdb_replica replica;
50     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
51     struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
52     struct ovsdb *db;
53     uint64_t n_transactions;      /* Count number of committed transactions. */
54 };
55
56 struct jsonrpc_monitor_node {
57     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
58     struct ovs_list node;
59 };
60
61 /* A particular column being monitored. */
62 struct ovsdb_monitor_column {
63     const struct ovsdb_column *column;
64     enum ovsdb_monitor_selection select;
65 };
66
67 /* A row that has changed in a monitored table. */
68 struct ovsdb_monitor_row {
69     struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
70     struct uuid uuid;           /* UUID of row that changed. */
71     struct ovsdb_datum *old;    /* Old data, NULL for an inserted row. */
72     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
73 };
74
75 /* Contains 'struct ovsdb_monitor_row's for rows that have been
76  * updated but not yet flushed to all the jsonrpc connection.
77  *
78  * 'n_refs' represent the number of jsonrpc connections that have
79  * not received updates. Generate the update for the last jsonprc
80  * connection will also destroy the whole "struct ovsdb_monitor_changes"
81  * object.
82  *
83  * 'transaction' stores the first update's transaction id.
84  * */
85 struct ovsdb_monitor_changes {
86     struct ovsdb_monitor_table *mt;
87     struct hmap rows;
88     int n_refs;
89     uint64_t transaction;
90     struct hmap_node hmap_node;  /* Element in ovsdb_monitor_tables' changes
91                                     hmap.  */
92 };
93
94 /* A particular table being monitored. */
95 struct ovsdb_monitor_table {
96     const struct ovsdb_table *table;
97
98     /* This is the union (bitwise-OR) of the 'select' values in all of the
99      * members of 'columns' below. */
100     enum ovsdb_monitor_selection select;
101
102     /* Columns being monitored. */
103     struct ovsdb_monitor_column *columns;
104     size_t n_columns;
105
106     /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
107     struct hmap changes;
108 };
109
110 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
111 static void ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
112                                             uint64_t next_txn);
113 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
114     struct ovsdb_monitor_table *mt, uint64_t unflushed);
115 static void ovsdb_monitor_changes_destroy(
116                                   struct ovsdb_monitor_changes *changes);
117 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
118                                   uint64_t unflushed);
119
120 static int
121 compare_ovsdb_monitor_column(const void *a_, const void *b_)
122 {
123     const struct ovsdb_monitor_column *a = a_;
124     const struct ovsdb_monitor_column *b = b_;
125
126     return a->column < b->column ? -1 : a->column > b->column;
127 }
128
129 static struct ovsdb_monitor *
130 ovsdb_monitor_cast(struct ovsdb_replica *replica)
131 {
132     ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
133     return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
134 }
135
136 /* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
137  * given 'uuid', or NULL if there is no such row. */
138 static struct ovsdb_monitor_row *
139 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
140                                const struct uuid *uuid)
141 {
142     struct ovsdb_monitor_row *row;
143
144     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
145                              &changes->rows) {
146         if (uuid_equals(uuid, &row->uuid)) {
147             return row;
148         }
149     }
150     return NULL;
151 }
152
153 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
154  * copies of the data in 'row' drawn from the columns represented by
155  * mt->columns[].  Returns the array.
156  *
157  * If 'row' is NULL, returns NULL. */
158 static struct ovsdb_datum *
159 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
160                        const struct ovsdb_row *row)
161 {
162     struct ovsdb_datum *data;
163     size_t i;
164
165     if (!row) {
166         return NULL;
167     }
168
169     data = xmalloc(mt->n_columns * sizeof *data);
170     for (i = 0; i < mt->n_columns; i++) {
171         const struct ovsdb_column *c = mt->columns[i].column;
172         const struct ovsdb_datum *src = &row->fields[c->index];
173         struct ovsdb_datum *dst = &data[i];
174         const struct ovsdb_type *type = &c->type;
175
176         ovsdb_datum_clone(dst, src, type);
177     }
178     return data;
179 }
180
181 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
182  * in 'row' drawn from the columns represented by mt->columns[]. */
183 static void
184 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
185                         const struct ovsdb_row *row,
186                         struct ovsdb_datum *data)
187 {
188     size_t i;
189
190     for (i = 0; i < mt->n_columns; i++) {
191         const struct ovsdb_column *c = mt->columns[i].column;
192         const struct ovsdb_datum *src = &row->fields[c->index];
193         struct ovsdb_datum *dst = &data[i];
194         const struct ovsdb_type *type = &c->type;
195
196         if (!ovsdb_datum_equals(src, dst, type)) {
197             ovsdb_datum_destroy(dst, type);
198             ovsdb_datum_clone(dst, src, type);
199         }
200     }
201 }
202
203 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
204  * from mt->columns[], plus 'data' itself. */
205 static void
206 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
207                       struct ovsdb_datum *data)
208 {
209     if (data) {
210         size_t i;
211
212         for (i = 0; i < mt->n_columns; i++) {
213             const struct ovsdb_column *c = mt->columns[i].column;
214
215             ovsdb_datum_destroy(&data[i], &c->type);
216         }
217         free(data);
218     }
219 }
220
221 /* Frees 'row', which must have been created from 'mt'. */
222 static void
223 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
224                           struct ovsdb_monitor_row *row)
225 {
226     if (row) {
227         free_monitor_row_data(mt, row->old);
228         free_monitor_row_data(mt, row->new);
229         free(row);
230     }
231 }
232
233 struct ovsdb_monitor *
234 ovsdb_monitor_create(struct ovsdb *db,
235                      struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
236 {
237     struct ovsdb_monitor *dbmon;
238     struct jsonrpc_monitor_node *jm;
239
240     dbmon = xzalloc(sizeof *dbmon);
241
242     ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
243     ovsdb_add_replica(db, &dbmon->replica);
244     list_init(&dbmon->jsonrpc_monitors);
245     dbmon->db = db;
246     dbmon->n_transactions = 0;
247     shash_init(&dbmon->tables);
248
249     jm = xzalloc(sizeof *jm);
250     jm->jsonrpc_monitor = jsonrpc_monitor;
251     list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
252
253     return dbmon;
254 }
255
256 void
257 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
258                         const struct ovsdb_table *table)
259 {
260     struct ovsdb_monitor_table *mt;
261
262     mt = xzalloc(sizeof *mt);
263     mt->table = table;
264     shash_add(&m->tables, table->schema->name, mt);
265     hmap_init(&mt->changes);
266 }
267
268 void
269 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
270                          const struct ovsdb_table *table,
271                          const struct ovsdb_column *column,
272                          enum ovsdb_monitor_selection select,
273                          size_t *allocated_columns)
274 {
275     struct ovsdb_monitor_table *mt;
276     struct ovsdb_monitor_column *c;
277
278     mt = shash_find_data(&dbmon->tables, table->schema->name);
279
280     if (mt->n_columns >= *allocated_columns) {
281         mt->columns = x2nrealloc(mt->columns, allocated_columns,
282                                  sizeof *mt->columns);
283     }
284
285     mt->select |= select;
286     c = &mt->columns[mt->n_columns++];
287     c->column = column;
288     c->select = select;
289 }
290
291 /* Check for duplicated column names. Return the first
292  * duplicated column's name if found. Otherwise return
293  * NULL.  */
294 const char * OVS_WARN_UNUSED_RESULT
295 ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
296                                      const struct ovsdb_table *table)
297 {
298     struct ovsdb_monitor_table *mt;
299     int i;
300
301     mt = shash_find_data(&m->tables, table->schema->name);
302
303     if (mt) {
304         /* Check for duplicate columns. */
305         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
306               compare_ovsdb_monitor_column);
307         for (i = 1; i < mt->n_columns; i++) {
308             if (mt->columns[i].column == mt->columns[i - 1].column) {
309                 return mt->columns[i].column->name;
310             }
311         }
312     }
313
314     return NULL;
315 }
316
317 static void
318 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
319                                 uint64_t next_txn)
320 {
321     struct ovsdb_monitor_changes *changes;
322
323     changes = xzalloc(sizeof *changes);
324
325     changes->transaction = next_txn;
326     changes->mt = mt;
327     changes->n_refs = 1;
328     hmap_init(&changes->rows);
329     hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
330 };
331
332 static struct ovsdb_monitor_changes *
333 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
334                                  uint64_t transaction)
335 {
336     struct ovsdb_monitor_changes *changes;
337     size_t hash = hash_uint64(transaction);
338
339     HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
340         if (changes->transaction == transaction) {
341             return changes;
342         }
343     }
344
345     return NULL;
346 }
347
348 /* Stop currently tracking changes to table 'mt' since 'transaction'.
349  *
350  * Return 'true' if the 'transaction' is being tracked. 'false' otherwise. */
351 static void
352 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
353                                     uint64_t transaction)
354 {
355     struct ovsdb_monitor_changes *changes =
356                 ovsdb_monitor_table_find_changes(mt, transaction);
357     if (changes) {
358         ovs_assert(changes->transaction == transaction);
359         if (--changes->n_refs == 0) {
360             hmap_remove(&mt->changes, &changes->hmap_node);
361             ovsdb_monitor_changes_destroy(changes);
362         }
363     }
364 }
365
366 /* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
367  */
368 static void
369 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
370                                   uint64_t transaction)
371 {
372     struct ovsdb_monitor_changes *changes;
373
374     changes = ovsdb_monitor_table_find_changes(mt, transaction);
375     if (changes) {
376         changes->n_refs++;
377     } else {
378         ovsdb_monitor_table_add_changes(mt, transaction);
379     }
380 }
381
382 static void
383 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
384 {
385     struct ovsdb_monitor_row *row, *next;
386
387     HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
388         hmap_remove(&changes->rows, &row->hmap_node);
389         ovsdb_monitor_row_destroy(changes->mt, row);
390     }
391     hmap_destroy(&changes->rows);
392     free(changes);
393 }
394
395 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
396  * 'mt', or NULL if no row update should be sent.
397  *
398  * The caller should specify 'initial' as true if the returned JSON is going to
399  * be used as part of the initial reply to a "monitor" request, false if it is
400  * going to be used as part of an "update" notification.
401  *
402  * 'changed' must be a scratch buffer for internal use that is at least
403  * bitmap_n_bytes(mt->n_columns) bytes long. */
404 static struct json *
405 ovsdb_monitor_compose_row_update(
406     const struct ovsdb_monitor_table *mt,
407     const struct ovsdb_monitor_row *row,
408     bool initial, unsigned long int *changed)
409 {
410     enum ovsdb_monitor_selection type;
411     struct json *old_json, *new_json;
412     struct json *row_json;
413     size_t i;
414
415     type = (initial ? OJMS_INITIAL
416             : !row->old ? OJMS_INSERT
417             : !row->new ? OJMS_DELETE
418             : OJMS_MODIFY);
419     if (!(mt->select & type)) {
420         return NULL;
421     }
422
423     if (type == OJMS_MODIFY) {
424         size_t n_changes;
425
426         n_changes = 0;
427         memset(changed, 0, bitmap_n_bytes(mt->n_columns));
428         for (i = 0; i < mt->n_columns; i++) {
429             const struct ovsdb_column *c = mt->columns[i].column;
430             if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
431                 bitmap_set1(changed, i);
432                 n_changes++;
433             }
434         }
435         if (!n_changes) {
436             /* No actual changes: presumably a row changed and then
437              * changed back later. */
438             return NULL;
439         }
440     }
441
442     row_json = json_object_create();
443     old_json = new_json = NULL;
444     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
445         old_json = json_object_create();
446         json_object_put(row_json, "old", old_json);
447     }
448     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
449         new_json = json_object_create();
450         json_object_put(row_json, "new", new_json);
451     }
452     for (i = 0; i < mt->n_columns; i++) {
453         const struct ovsdb_monitor_column *c = &mt->columns[i];
454
455         if (!(type & c->select)) {
456             /* We don't care about this type of change for this
457              * particular column (but we will care about it for some
458              * other column). */
459             continue;
460         }
461
462         if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
463             || type == OJMS_DELETE) {
464             json_object_put(old_json, c->column->name,
465                             ovsdb_datum_to_json(&row->old[i],
466                                                 &c->column->type));
467         }
468         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
469             json_object_put(new_json, c->column->name,
470                             ovsdb_datum_to_json(&row->new[i],
471                                                 &c->column->type));
472         }
473     }
474
475     return row_json;
476 }
477
478 /* Constructs and returns JSON for a <table-updates> object (as described in
479  * RFC 7047) for all the outstanding changes within 'monitor', and deletes all
480  * the outstanding changes from 'monitor'.  Returns NULL if no update needs to
481  * be sent.
482  *
483  * The caller should specify 'initial' as true if the returned JSON is going to
484  * be used as part of the initial reply to a "monitor" request, false if it is
485  * going to be used as part of an "update" notification.
486  *
487  * 'unflushed' should point to value that is the transaction ID that did
488  * was not updated. The update contains changes between
489  * ['unflushed, ovsdb->n_transcations]. Before the function returns, this
490  * value will be updated to ovsdb->n_transactions + 1, ready for the next
491  * update.  */
492 struct json *
493 ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
494                              bool initial, uint64_t *unflushed)
495 {
496     struct shash_node *node;
497     unsigned long int *changed;
498     struct json *json;
499     size_t max_columns;
500     uint64_t prev_txn = *unflushed;
501     uint64_t next_txn = dbmon->n_transactions + 1;
502
503     max_columns = 0;
504     SHASH_FOR_EACH (node, &dbmon->tables) {
505         struct ovsdb_monitor_table *mt = node->data;
506
507         max_columns = MAX(max_columns, mt->n_columns);
508     }
509     changed = xmalloc(bitmap_n_bytes(max_columns));
510
511     json = NULL;
512     SHASH_FOR_EACH (node, &dbmon->tables) {
513         struct ovsdb_monitor_table *mt = node->data;
514         struct ovsdb_monitor_row *row, *next;
515         struct ovsdb_monitor_changes *changes;
516         struct json *table_json = NULL;
517
518         changes = ovsdb_monitor_table_find_changes(mt, prev_txn);
519         if (!changes) {
520             ovsdb_monitor_table_track_changes(mt, next_txn);
521             continue;
522         }
523
524         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
525             struct json *row_json;
526
527             row_json = ovsdb_monitor_compose_row_update(
528                 mt, row, initial, changed);
529             if (row_json) {
530                 char uuid[UUID_LEN + 1];
531
532                 /* Create JSON object for transaction overall. */
533                 if (!json) {
534                     json = json_object_create();
535                 }
536
537                 /* Create JSON object for transaction on this table. */
538                 if (!table_json) {
539                     table_json = json_object_create();
540                     json_object_put(json, mt->table->schema->name, table_json);
541                 }
542
543                 /* Add JSON row to JSON table. */
544                 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
545                 json_object_put(table_json, uuid, row_json);
546             }
547
548             hmap_remove(&changes->rows, &row->hmap_node);
549             ovsdb_monitor_row_destroy(mt, row);
550         }
551
552         ovsdb_monitor_table_untrack_changes(mt, prev_txn);
553         ovsdb_monitor_table_track_changes(mt, next_txn);
554     }
555
556     *unflushed = next_txn;
557     free(changed);
558     return json;
559 }
560
561 bool
562 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
563                           uint64_t next_transaction)
564 {
565     ovs_assert(next_transaction <= dbmon->n_transactions + 1);
566     return (next_transaction <= dbmon->n_transactions);
567 }
568
569 void
570 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
571                                const struct ovsdb_table *table,
572                                enum ovsdb_monitor_selection select)
573 {
574     struct ovsdb_monitor_table * mt;
575
576     mt = shash_find_data(&dbmon->tables, table->schema->name);
577     mt->select |= select;
578 }
579
580 struct ovsdb_monitor_aux {
581     const struct ovsdb_monitor *monitor;
582     struct ovsdb_monitor_table *mt;
583 };
584
585 static void
586 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
587                        const struct ovsdb_monitor *m)
588 {
589     aux->monitor = m;
590     aux->mt = NULL;
591 }
592
593 static void
594 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
595                              const struct ovsdb_row *new,
596                              const struct ovsdb_monitor_table *mt,
597                              struct ovsdb_monitor_changes *changes)
598 {
599     const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
600     struct ovsdb_monitor_row *change;
601
602     change = ovsdb_monitor_changes_row_find(changes, uuid);
603     if (!change) {
604         change = xzalloc(sizeof *change);
605         hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
606         change->uuid = *uuid;
607         change->old = clone_monitor_row_data(mt, old);
608         change->new = clone_monitor_row_data(mt, new);
609     } else {
610         if (new) {
611             update_monitor_row_data(mt, new, change->new);
612         } else {
613             free_monitor_row_data(mt, change->new);
614             change->new = NULL;
615
616             if (!change->old) {
617                 /* This row was added then deleted.  Forget about it. */
618                 hmap_remove(&changes->rows, &change->hmap_node);
619                 free(change);
620             }
621         }
622     }
623 }
624
625 static bool
626 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
627                         const struct ovsdb_row *new,
628                         const unsigned long int *changed OVS_UNUSED,
629                         void *aux_)
630 {
631     struct ovsdb_monitor_aux *aux = aux_;
632     const struct ovsdb_monitor *m = aux->monitor;
633     struct ovsdb_table *table = new ? new->table : old->table;
634     struct ovsdb_monitor_table *mt;
635     struct ovsdb_monitor_changes *changes;
636
637     if (!aux->mt || table != aux->mt->table) {
638         aux->mt = shash_find_data(&m->tables, table->schema->name);
639         if (!aux->mt) {
640             /* We don't care about rows in this table at all.  Tell the caller
641              * to skip it.  */
642             return false;
643         }
644     }
645     mt = aux->mt;
646
647     HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
648         ovsdb_monitor_changes_update(old, new, mt, changes);
649     }
650     return true;
651 }
652
653 void
654 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
655 {
656     struct ovsdb_monitor_aux aux;
657     struct shash_node *node;
658
659     ovsdb_monitor_init_aux(&aux, dbmon);
660     SHASH_FOR_EACH (node, &dbmon->tables) {
661         struct ovsdb_monitor_table *mt = node->data;
662
663         if (mt->select & OJMS_INITIAL) {
664             struct ovsdb_row *row;
665
666             if (hmap_is_empty(&mt->changes)) {
667                 ovsdb_monitor_table_add_changes(mt, 0);
668             }
669
670             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
671                 ovsdb_monitor_change_cb(NULL, row, NULL, &aux);
672             }
673         }
674     }
675 }
676
677 void
678 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
679                    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
680 {
681     struct jsonrpc_monitor_node *jm;
682
683     /* Find and remove the jsonrpc monitor from the list.  */
684     LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
685         if (jm->jsonrpc_monitor == jsonrpc_monitor) {
686             list_remove(&jm->node);
687             free(jm);
688
689             /* Destroy ovsdb monitor if this is the last user.  */
690             if (list_is_empty(&dbmon->jsonrpc_monitors)) {
691                 ovsdb_monitor_destroy(dbmon);
692             }
693
694             return;
695         };
696     }
697
698     /* Should never reach here. jsonrpc_monitor should be on the list.  */
699     OVS_NOT_REACHED();
700 }
701
702 static void
703 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
704 {
705     struct shash_node *node;
706
707     list_remove(&dbmon->replica.node);
708
709     SHASH_FOR_EACH (node, &dbmon->tables) {
710         struct ovsdb_monitor_table *mt = node->data;
711         struct ovsdb_monitor_changes *changes, *next;
712
713         HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
714             hmap_remove(&mt->changes, &changes->hmap_node);
715             ovsdb_monitor_changes_destroy(changes);
716         }
717         free(mt->columns);
718         free(mt);
719     }
720     shash_destroy(&dbmon->tables);
721     free(dbmon);
722 }
723
724 static struct ovsdb_error *
725 ovsdb_monitor_commit(struct ovsdb_replica *replica,
726                      const struct ovsdb_txn *txn,
727                      bool durable OVS_UNUSED)
728 {
729     struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
730     struct ovsdb_monitor_aux aux;
731
732     ovsdb_monitor_init_aux(&aux, m);
733     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
734     m->n_transactions++;
735
736     return NULL;
737 }
738
739 static void
740 ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
741 {
742     struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
743     struct jsonrpc_monitor_node *jm, *next;
744
745     /* Delete all front end monitors. Removing the last front
746      * end monitor will also destroy the corresponding 'ovsdb_monitor'.
747      * ovsdb monitor will also be destroied.  */
748     LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
749         ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
750     }
751 }
752
753 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
754     ovsdb_monitor_commit,
755     ovsdb_monitor_destroy_callback,
756 };