5228a0a7717c55ad8ca3591da46031e5434a8c59
[cascardo/ovs.git] / ovsdb / monitor.c
1 /*
2  * Copyright (c) 2015 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include <errno.h>
20
21 #include "bitmap.h"
22 #include "column.h"
23 #include "openvswitch/dynamic-string.h"
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "row.h"
30 #include "simap.h"
31 #include "hash.h"
32 #include "table.h"
33 #include "hash.h"
34 #include "timeval.h"
35 #include "transaction.h"
36 #include "jsonrpc-server.h"
37 #include "monitor.h"
38 #include "openvswitch/vlog.h"
39
40
41 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
42 static struct hmap ovsdb_monitors = HMAP_INITIALIZER(&ovsdb_monitors);
43
44 /*  Backend monitor.
45  *
46  *  ovsdb_monitor keep track of the ovsdb changes.
47  */
48
49 /* A collection of tables being monitored. */
50 struct ovsdb_monitor {
51     struct ovsdb_replica replica;
52     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
53     struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
54     struct ovsdb *db;
55     uint64_t n_transactions;      /* Count number of committed transactions. */
56     struct hmap_node hmap_node;   /* Elements within ovsdb_monitors.  */
57     struct hmap json_cache;       /* Contains "ovsdb_monitor_json_cache_node"s.*/
58 };
59
60 /* A json object of updates between 'from_txn' and 'dbmon->n_transactions'
61  * inclusive.  */
62 struct ovsdb_monitor_json_cache_node {
63     struct hmap_node hmap_node;   /* Elements in json cache. */
64     enum ovsdb_monitor_version version;
65     uint64_t from_txn;
66     struct json *json;            /* Null, or a cloned of json */
67 };
68
69 struct jsonrpc_monitor_node {
70     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
71     struct ovs_list node;
72 };
73
74 /* A particular column being monitored. */
75 struct ovsdb_monitor_column {
76     const struct ovsdb_column *column;
77     enum ovsdb_monitor_selection select;
78     bool monitored;
79 };
80
81 /* A row that has changed in a monitored table. */
82 struct ovsdb_monitor_row {
83     struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
84     struct uuid uuid;           /* UUID of row that changed. */
85     struct ovsdb_datum *old;    /* Old data, NULL for an inserted row. */
86     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
87 };
88
89 /* Contains 'struct ovsdb_monitor_row's for rows that have been
90  * updated but not yet flushed to all the jsonrpc connection.
91  *
92  * 'n_refs' represent the number of jsonrpc connections that have
93  * not received updates. Generate the update for the last jsonprc
94  * connection will also destroy the whole "struct ovsdb_monitor_changes"
95  * object.
96  *
97  * 'transaction' stores the first update's transaction id.
98  * */
99 struct ovsdb_monitor_changes {
100     struct ovsdb_monitor_table *mt;
101     struct hmap rows;
102     int n_refs;
103     uint64_t transaction;
104     struct hmap_node hmap_node;  /* Element in ovsdb_monitor_tables' changes
105                                     hmap.  */
106 };
107
108 /* A particular table being monitored. */
109 struct ovsdb_monitor_table {
110     const struct ovsdb_table *table;
111
112     /* This is the union (bitwise-OR) of the 'select' values in all of the
113      * members of 'columns' below. */
114     enum ovsdb_monitor_selection select;
115
116     /* Columns being monitored. */
117     struct ovsdb_monitor_column *columns;
118     size_t n_columns;
119     size_t n_monitored_columns;
120     size_t allocated_columns;
121
122     /* Columns in ovsdb_monitor_row have different indexes then in
123      * ovsdb_row. This field maps between column->index to the index in the
124      * ovsdb_monitor_row. It is used for condition evaluation. */
125     unsigned int *columns_index_map;
126
127     /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
128     struct hmap changes;
129 };
130
131 typedef struct json *
132 (*compose_row_update_cb_func)(const struct ovsdb_monitor_table *mt,
133                               const struct ovsdb_monitor_row *row,
134                               bool initial, unsigned long int *changed);
135
136 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
137 static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
138     struct ovsdb_monitor_table *mt, uint64_t next_txn);
139 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
140     struct ovsdb_monitor_table *mt, uint64_t unflushed);
141 static void ovsdb_monitor_changes_destroy(
142                                   struct ovsdb_monitor_changes *changes);
143 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
144                                   uint64_t unflushed);
145
146 static uint32_t
147 json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
148 {
149     uint32_t hash;
150
151     hash = hash_uint64(version);
152     hash = hash_uint64_basis(from_txn, hash);
153
154     return hash;
155 }
156
157 static struct ovsdb_monitor_json_cache_node *
158 ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
159                                 enum ovsdb_monitor_version version,
160                                 uint64_t from_txn)
161 {
162     struct ovsdb_monitor_json_cache_node *node;
163     uint32_t hash = json_cache_hash(version, from_txn);
164
165     HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
166         if (node->from_txn == from_txn && node->version == version) {
167             return node;
168         }
169     }
170
171     return NULL;
172 }
173
174 static void
175 ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
176                                 enum ovsdb_monitor_version version,
177                                 uint64_t from_txn, struct json *json)
178 {
179     struct ovsdb_monitor_json_cache_node *node;
180     uint32_t hash = json_cache_hash(version, from_txn);
181
182     node = xmalloc(sizeof *node);
183
184     node->version = version;
185     node->from_txn = from_txn;
186     node->json = json ? json_clone(json) : NULL;
187
188     hmap_insert(&dbmon->json_cache, &node->hmap_node, hash);
189 }
190
191 static void
192 ovsdb_monitor_json_cache_flush(struct ovsdb_monitor *dbmon)
193 {
194     struct ovsdb_monitor_json_cache_node *node;
195
196     HMAP_FOR_EACH_POP(node, hmap_node, &dbmon->json_cache) {
197         json_destroy(node->json);
198         free(node);
199     }
200 }
201
202 static int
203 compare_ovsdb_monitor_column(const void *a_, const void *b_)
204 {
205     const struct ovsdb_monitor_column *a = a_;
206     const struct ovsdb_monitor_column *b = b_;
207
208     /* put all monitored columns at the begining */
209     if (a->monitored != b->monitored) {
210         return a->monitored ? -1 : 1;
211     }
212
213     return a->column < b->column ? -1 : a->column > b->column;
214 }
215
216 static struct ovsdb_monitor *
217 ovsdb_monitor_cast(struct ovsdb_replica *replica)
218 {
219     ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
220     return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
221 }
222
223 /* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
224  * given 'uuid', or NULL if there is no such row. */
225 static struct ovsdb_monitor_row *
226 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
227                                const struct uuid *uuid)
228 {
229     struct ovsdb_monitor_row *row;
230
231     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
232                              &changes->rows) {
233         if (uuid_equals(uuid, &row->uuid)) {
234             return row;
235         }
236     }
237     return NULL;
238 }
239
240 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
241  * copies of the data in 'row' drawn from the columns represented by
242  * mt->columns[].  Returns the array.
243  *
244  * If 'row' is NULL, returns NULL. */
245 static struct ovsdb_datum *
246 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
247                        const struct ovsdb_row *row)
248 {
249     struct ovsdb_datum *data;
250     size_t i;
251
252     if (!row) {
253         return NULL;
254     }
255
256     data = xmalloc(mt->n_columns * sizeof *data);
257     for (i = 0; i < mt->n_columns; i++) {
258         const struct ovsdb_column *c = mt->columns[i].column;
259         const struct ovsdb_datum *src = &row->fields[c->index];
260         struct ovsdb_datum *dst = &data[i];
261         const struct ovsdb_type *type = &c->type;
262
263         ovsdb_datum_clone(dst, src, type);
264     }
265     return data;
266 }
267
268 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
269  * in 'row' drawn from the columns represented by mt->columns[]. */
270 static void
271 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
272                         const struct ovsdb_row *row,
273                         struct ovsdb_datum *data)
274 {
275     size_t i;
276
277     for (i = 0; i < mt->n_columns; i++) {
278         const struct ovsdb_column *c = mt->columns[i].column;
279         const struct ovsdb_datum *src = &row->fields[c->index];
280         struct ovsdb_datum *dst = &data[i];
281         const struct ovsdb_type *type = &c->type;
282
283         if (!ovsdb_datum_equals(src, dst, type)) {
284             ovsdb_datum_destroy(dst, type);
285             ovsdb_datum_clone(dst, src, type);
286         }
287     }
288 }
289
290 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
291  * from mt->columns[], plus 'data' itself. */
292 static void
293 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
294                       struct ovsdb_datum *data)
295 {
296     if (data) {
297         size_t i;
298
299         for (i = 0; i < mt->n_columns; i++) {
300             const struct ovsdb_column *c = mt->columns[i].column;
301
302             ovsdb_datum_destroy(&data[i], &c->type);
303         }
304         free(data);
305     }
306 }
307
308 /* Frees 'row', which must have been created from 'mt'. */
309 static void
310 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
311                           struct ovsdb_monitor_row *row)
312 {
313     if (row) {
314         free_monitor_row_data(mt, row->old);
315         free_monitor_row_data(mt, row->new);
316         free(row);
317     }
318 }
319
320 static void
321 ovsdb_monitor_columns_sort(struct ovsdb_monitor *dbmon)
322 {
323     int i;
324     struct shash_node *node;
325
326     SHASH_FOR_EACH (node, &dbmon->tables) {
327         struct ovsdb_monitor_table *mt = node->data;
328
329         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
330               compare_ovsdb_monitor_column);
331         for (i = 0; i < mt->n_columns; i++) {
332             /* re-set index map due to sort */
333             mt->columns_index_map[mt->columns[i].column->index] = i;
334         }
335     }
336 }
337
338 void
339 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
340                                   struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
341 {
342     struct jsonrpc_monitor_node *jm;
343
344     jm = xzalloc(sizeof *jm);
345     jm->jsonrpc_monitor = jsonrpc_monitor;
346     ovs_list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
347 }
348
349 struct ovsdb_monitor *
350 ovsdb_monitor_create(struct ovsdb *db,
351                      struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
352 {
353     struct ovsdb_monitor *dbmon;
354
355     dbmon = xzalloc(sizeof *dbmon);
356
357     ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
358     ovsdb_add_replica(db, &dbmon->replica);
359     ovs_list_init(&dbmon->jsonrpc_monitors);
360     dbmon->db = db;
361     dbmon->n_transactions = 0;
362     shash_init(&dbmon->tables);
363     hmap_node_nullify(&dbmon->hmap_node);
364     hmap_init(&dbmon->json_cache);
365
366     ovsdb_monitor_add_jsonrpc_monitor(dbmon, jsonrpc_monitor);
367     return dbmon;
368 }
369
370 void
371 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
372                         const struct ovsdb_table *table)
373 {
374     struct ovsdb_monitor_table *mt;
375     int i;
376     size_t n_columns = shash_count(&table->schema->columns);
377
378     mt = xzalloc(sizeof *mt);
379     mt->table = table;
380     shash_add(&m->tables, table->schema->name, mt);
381     hmap_init(&mt->changes);
382     mt->columns_index_map =
383         xmalloc(sizeof *mt->columns_index_map * n_columns);
384     for (i = 0; i < n_columns; i++) {
385         mt->columns_index_map[i] = -1;
386     }
387 }
388
389 const char *
390 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
391                          const struct ovsdb_table *table,
392                          const struct ovsdb_column *column,
393                          enum ovsdb_monitor_selection select,
394                          bool monitored)
395 {
396     struct ovsdb_monitor_table *mt;
397     struct ovsdb_monitor_column *c;
398
399     mt = shash_find_data(&dbmon->tables, table->schema->name);
400
401     /* Check for column duplication. Return duplicated column name. */
402     if (mt->columns_index_map[column->index] != -1) {
403         return column->name;
404     }
405
406     if (mt->n_columns >= mt->allocated_columns) {
407         mt->columns = x2nrealloc(mt->columns, &mt->allocated_columns,
408                                  sizeof *mt->columns);
409     }
410
411     mt->select |= select;
412     mt->columns_index_map[column->index] = mt->n_columns;
413     c = &mt->columns[mt->n_columns++];
414     c->column = column;
415     c->select = select;
416     c->monitored = monitored;
417     if (monitored) {
418         mt->n_monitored_columns++;
419     }
420
421     return NULL;
422 }
423
424 static struct ovsdb_monitor_changes *
425 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
426                                 uint64_t next_txn)
427 {
428     struct ovsdb_monitor_changes *changes;
429
430     changes = xzalloc(sizeof *changes);
431
432     changes->transaction = next_txn;
433     changes->mt = mt;
434     changes->n_refs = 1;
435     hmap_init(&changes->rows);
436     hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
437
438     return changes;
439 };
440
441 static struct ovsdb_monitor_changes *
442 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
443                                  uint64_t transaction)
444 {
445     struct ovsdb_monitor_changes *changes;
446     size_t hash = hash_uint64(transaction);
447
448     HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
449         if (changes->transaction == transaction) {
450             return changes;
451         }
452     }
453
454     return NULL;
455 }
456
457 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
458 static void
459 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
460                                     uint64_t transaction)
461 {
462     struct ovsdb_monitor_changes *changes =
463                 ovsdb_monitor_table_find_changes(mt, transaction);
464     if (changes) {
465         if (--changes->n_refs == 0) {
466             hmap_remove(&mt->changes, &changes->hmap_node);
467             ovsdb_monitor_changes_destroy(changes);
468         }
469     }
470 }
471
472 /* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
473  */
474 static void
475 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
476                                   uint64_t transaction)
477 {
478     struct ovsdb_monitor_changes *changes;
479
480     changes = ovsdb_monitor_table_find_changes(mt, transaction);
481     if (changes) {
482         changes->n_refs++;
483     } else {
484         ovsdb_monitor_table_add_changes(mt, transaction);
485     }
486 }
487
488 static void
489 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
490 {
491     struct ovsdb_monitor_row *row, *next;
492
493     HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
494         hmap_remove(&changes->rows, &row->hmap_node);
495         ovsdb_monitor_row_destroy(changes->mt, row);
496     }
497     hmap_destroy(&changes->rows);
498     free(changes);
499 }
500
501 static enum ovsdb_monitor_selection
502 ovsdb_monitor_row_update_type(bool initial, const bool old, const bool new)
503 {
504     return initial ? OJMS_INITIAL
505             : !old ? OJMS_INSERT
506             : !new ? OJMS_DELETE
507             : OJMS_MODIFY;
508 }
509 static bool
510 ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
511                               const struct ovsdb_monitor_row *row,
512                               enum ovsdb_monitor_selection type,
513                               unsigned long int *changed)
514 {
515     if (!(mt->select & type)) {
516         return true;
517     }
518
519     if (type == OJMS_MODIFY) {
520         size_t i, n_changes;
521
522         n_changes = 0;
523         memset(changed, 0, bitmap_n_bytes(mt->n_columns));
524         for (i = 0; i < mt->n_columns; i++) {
525             const struct ovsdb_column *c = mt->columns[i].column;
526             if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
527                 bitmap_set1(changed, i);
528                 n_changes++;
529             }
530         }
531         if (!n_changes) {
532             /* No actual changes: presumably a row changed and then
533              * changed back later. */
534             return true;
535         }
536     }
537
538     return false;
539 }
540
541 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
542  * 'mt', or NULL if no row update should be sent.
543  *
544  * The caller should specify 'initial' as true if the returned JSON is going to
545  * be used as part of the initial reply to a "monitor" request, false if it is
546  * going to be used as part of an "update" notification.
547  *
548  * 'changed' must be a scratch buffer for internal use that is at least
549  * bitmap_n_bytes(mt->n_columns) bytes long. */
550 static struct json *
551 ovsdb_monitor_compose_row_update(
552     const struct ovsdb_monitor_table *mt,
553     const struct ovsdb_monitor_row *row,
554     bool initial, unsigned long int *changed)
555 {
556     enum ovsdb_monitor_selection type;
557     struct json *old_json, *new_json;
558     struct json *row_json;
559     size_t i;
560
561     type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
562     if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
563         return NULL;
564     }
565
566     row_json = json_object_create();
567     old_json = new_json = NULL;
568     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
569         old_json = json_object_create();
570         json_object_put(row_json, "old", old_json);
571     }
572     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
573         new_json = json_object_create();
574         json_object_put(row_json, "new", new_json);
575     }
576     for (i = 0; i < mt->n_monitored_columns; i++) {
577         const struct ovsdb_monitor_column *c = &mt->columns[i];
578
579         if (!c->monitored || !(type & c->select))  {
580             /* We don't care about this type of change for this
581              * particular column (but we will care about it for some
582              * other column). */
583             continue;
584         }
585
586         if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
587             || type == OJMS_DELETE) {
588             json_object_put(old_json, c->column->name,
589                             ovsdb_datum_to_json(&row->old[i],
590                                                 &c->column->type));
591         }
592         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
593             json_object_put(new_json, c->column->name,
594                             ovsdb_datum_to_json(&row->new[i],
595                                                 &c->column->type));
596         }
597     }
598
599     return row_json;
600 }
601
602 /* Returns JSON for a <row-update2> (as described in ovsdb-server(1) mapage)
603  * for 'row' within * 'mt', or NULL if no row update should be sent.
604  *
605  * The caller should specify 'initial' as true if the returned JSON is
606  * going to be used as part of the initial reply to a "monitor2" request,
607  * false if it is going to be used as part of an "update2" notification.
608  *
609  * 'changed' must be a scratch buffer for internal use that is at least
610  * bitmap_n_bytes(mt->n_columns) bytes long. */
611 static struct json *
612 ovsdb_monitor_compose_row_update2(
613     const struct ovsdb_monitor_table *mt,
614     const struct ovsdb_monitor_row *row,
615     bool initial, unsigned long int *changed)
616 {
617     enum ovsdb_monitor_selection type;
618     struct json *row_update2, *diff_json;
619     size_t i;
620
621     type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
622     if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
623         return NULL;
624     }
625
626     row_update2 = json_object_create();
627     if (type == OJMS_DELETE) {
628         json_object_put(row_update2, "delete", json_null_create());
629     } else {
630         diff_json = json_object_create();
631         const char *op;
632
633         for (i = 0; i < mt->n_monitored_columns; i++) {
634             const struct ovsdb_monitor_column *c = &mt->columns[i];
635
636             if (!c->monitored || !(type & c->select))  {
637                 /* We don't care about this type of change for this
638                  * particular column (but we will care about it for some
639                  * other column). */
640                 continue;
641             }
642
643             if (type == OJMS_MODIFY) {
644                 struct ovsdb_datum diff;
645
646                 if (!bitmap_is_set(changed, i)) {
647                     continue;
648                 }
649
650                 ovsdb_datum_diff(&diff ,&row->old[i], &row->new[i],
651                                         &c->column->type);
652                 json_object_put(diff_json, c->column->name,
653                                 ovsdb_datum_to_json(&diff, &c->column->type));
654                 ovsdb_datum_destroy(&diff, &c->column->type);
655             } else {
656                 if (!ovsdb_datum_is_default(&row->new[i], &c->column->type)) {
657                     json_object_put(diff_json, c->column->name,
658                                     ovsdb_datum_to_json(&row->new[i],
659                                                         &c->column->type));
660                 }
661             }
662         }
663
664         op = type == OJMS_INITIAL ? "initial"
665                                   : type == OJMS_MODIFY ? "modify" : "insert";
666         json_object_put(row_update2, op, diff_json);
667     }
668
669     return row_update2;
670 }
671
672 static size_t
673 ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
674 {
675     struct shash_node *node;
676     size_t max_columns = 0;
677
678     SHASH_FOR_EACH (node, &dbmon->tables) {
679         struct ovsdb_monitor_table *mt = node->data;
680
681         max_columns = MAX(max_columns, mt->n_columns);
682     }
683
684     return max_columns;
685 }
686
687 /* Constructs and returns JSON for a <table-updates> object (as described in
688  * RFC 7047) for all the outstanding changes within 'monitor', starting from
689  * 'transaction'.  */
690 static struct json*
691 ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
692                              bool initial, uint64_t transaction,
693                              compose_row_update_cb_func row_update)
694 {
695     struct shash_node *node;
696     struct json *json;
697     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
698     unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
699
700     json = NULL;
701     SHASH_FOR_EACH (node, &dbmon->tables) {
702         struct ovsdb_monitor_table *mt = node->data;
703         struct ovsdb_monitor_row *row, *next;
704         struct ovsdb_monitor_changes *changes;
705         struct json *table_json = NULL;
706
707         changes = ovsdb_monitor_table_find_changes(mt, transaction);
708         if (!changes) {
709             continue;
710         }
711
712         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
713             struct json *row_json;
714
715             row_json = (*row_update)(mt, row, initial, changed);
716             if (row_json) {
717                 char uuid[UUID_LEN + 1];
718
719                 /* Create JSON object for transaction overall. */
720                 if (!json) {
721                     json = json_object_create();
722                 }
723
724                 /* Create JSON object for transaction on this table. */
725                 if (!table_json) {
726                     table_json = json_object_create();
727                     json_object_put(json, mt->table->schema->name, table_json);
728                 }
729
730                 /* Add JSON row to JSON table. */
731                 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
732                 json_object_put(table_json, uuid, row_json);
733             }
734         }
735     }
736     free(changed);
737
738     return json;
739 }
740
741 /* Returns JSON for a <table-updates> object (as described in RFC 7047)
742  * for all the outstanding changes within 'monitor' that starts from
743  * '*unflushed' transaction id.
744  *
745  * The caller should specify 'initial' as true if the returned JSON is going to
746  * be used as part of the initial reply to a "monitor" request, false if it is
747  * going to be used as part of an "update" notification. */
748 struct json *
749 ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
750                          bool initial, uint64_t *unflushed_,
751                          enum ovsdb_monitor_version version)
752 {
753     struct ovsdb_monitor_json_cache_node *cache_node;
754     struct shash_node *node;
755     struct json *json;
756     const uint64_t unflushed = *unflushed_;
757     const uint64_t next_unflushed = dbmon->n_transactions + 1;
758
759     /* Return a clone of cached json if one exists. Otherwise,
760      * generate a new one and add it to the cache.  */
761     cache_node = ovsdb_monitor_json_cache_search(dbmon, version, unflushed);
762     if (cache_node) {
763         json = cache_node->json ? json_clone(cache_node->json) : NULL;
764     } else {
765         if (version == OVSDB_MONITOR_V1) {
766             json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
767                                         ovsdb_monitor_compose_row_update);
768         } else {
769             ovs_assert(version == OVSDB_MONITOR_V2);
770             json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
771                                         ovsdb_monitor_compose_row_update2);
772         }
773         ovsdb_monitor_json_cache_insert(dbmon, version, unflushed, json);
774     }
775
776     /* Maintain transaction id of 'changes'. */
777     SHASH_FOR_EACH (node, &dbmon->tables) {
778         struct ovsdb_monitor_table *mt = node->data;
779
780         ovsdb_monitor_table_untrack_changes(mt, unflushed);
781         ovsdb_monitor_table_track_changes(mt, next_unflushed);
782     }
783     *unflushed_ = next_unflushed;
784
785     return json;
786 }
787
788 bool
789 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
790                           uint64_t next_transaction)
791 {
792     ovs_assert(next_transaction <= dbmon->n_transactions + 1);
793     return (next_transaction <= dbmon->n_transactions);
794 }
795
796 void
797 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
798                                const struct ovsdb_table *table,
799                                enum ovsdb_monitor_selection select)
800 {
801     struct ovsdb_monitor_table * mt;
802
803     mt = shash_find_data(&dbmon->tables, table->schema->name);
804     mt->select |= select;
805 }
806
807  /*
808  * If a row's change type (insert, delete or modify) matches that of
809  * the monitor, they should be sent to the monitor's clients as updates.
810  * Of cause, the monitor should also internally update with this change.
811  *
812  * When a change type does not require client side update, the monitor
813  * may still need to keep track of certain changes in order to generate
814  * correct future updates.  For example, the monitor internal state should
815  * be updated whenever a new row is inserted, in order to generate the
816  * correct initial state, regardless if a insert change type is being
817  * monitored.
818  *
819  * On the other hand, if a transaction only contains changes to columns
820  * that are not monitored, this transaction can be safely ignored by the
821  * monitor.
822  *
823  * Thus, the order of the declaration is important:
824  * 'OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE' always implies
825  * 'OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE', but not vice versa.  */
826 enum ovsdb_monitor_changes_efficacy {
827     OVSDB_CHANGES_NO_EFFECT,                /* Monitor does not care about this
828                                                change.  */
829     OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE,  /* Monitor internal updates. */
830     OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE,  /* Client needs to be updated.  */
831 };
832
833 struct ovsdb_monitor_aux {
834     const struct ovsdb_monitor *monitor;
835     struct ovsdb_monitor_table *mt;
836     enum ovsdb_monitor_changes_efficacy efficacy;
837 };
838
839 static void
840 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
841                        const struct ovsdb_monitor *m)
842 {
843     aux->monitor = m;
844     aux->mt = NULL;
845     aux->efficacy = OVSDB_CHANGES_NO_EFFECT;
846 }
847
848 static void
849 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
850                              const struct ovsdb_row *new,
851                              const struct ovsdb_monitor_table *mt,
852                              struct ovsdb_monitor_changes *changes)
853 {
854     const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
855     struct ovsdb_monitor_row *change;
856
857     change = ovsdb_monitor_changes_row_find(changes, uuid);
858     if (!change) {
859         change = xzalloc(sizeof *change);
860         hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
861         change->uuid = *uuid;
862         change->old = clone_monitor_row_data(mt, old);
863         change->new = clone_monitor_row_data(mt, new);
864     } else {
865         if (new) {
866             update_monitor_row_data(mt, new, change->new);
867         } else {
868             free_monitor_row_data(mt, change->new);
869             change->new = NULL;
870
871             if (!change->old) {
872                 /* This row was added then deleted.  Forget about it. */
873                 hmap_remove(&changes->rows, &change->hmap_node);
874                 free(change);
875             }
876         }
877     }
878 }
879
880 static bool
881 ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
882                               const unsigned long int *changed)
883 {
884     size_t i;
885
886     for (i = 0; i < mt->n_columns; i++) {
887         size_t column_index = mt->columns[i].column->index;
888
889         if (bitmap_is_set(changed, column_index)) {
890             return true;
891         }
892     }
893
894     return false;
895 }
896
897 /* Return the efficacy of a row's change to a monitor table.
898  *
899  * Please see the block comment above 'ovsdb_monitor_changes_efficacy'
900  * definition form more information.  */
901 static enum ovsdb_monitor_changes_efficacy
902 ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type,
903                                const struct ovsdb_monitor_table *mt,
904                                const unsigned long int *changed)
905 {
906     if (type == OJMS_MODIFY &&
907         !ovsdb_monitor_columns_changed(mt, changed)) {
908         return OVSDB_CHANGES_NO_EFFECT;
909     }
910
911     return (mt->select & type)
912                 ?  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
913                 :  OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
914 }
915
916 static bool
917 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
918                         const struct ovsdb_row *new,
919                         const unsigned long int *changed,
920                         void *aux_)
921 {
922     struct ovsdb_monitor_aux *aux = aux_;
923     const struct ovsdb_monitor *m = aux->monitor;
924     struct ovsdb_table *table = new ? new->table : old->table;
925     struct ovsdb_monitor_table *mt;
926     struct ovsdb_monitor_changes *changes;
927
928     if (!aux->mt || table != aux->mt->table) {
929         aux->mt = shash_find_data(&m->tables, table->schema->name);
930         if (!aux->mt) {
931             /* We don't care about rows in this table at all.  Tell the caller
932              * to skip it.  */
933             return false;
934         }
935     }
936     mt = aux->mt;
937
938     HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
939         enum ovsdb_monitor_changes_efficacy efficacy;
940         enum ovsdb_monitor_selection type;
941
942         type = ovsdb_monitor_row_update_type(false, old, new);
943         efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
944         if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
945             ovsdb_monitor_changes_update(old, new, mt, changes);
946         }
947
948         if (aux->efficacy < efficacy) {
949             aux->efficacy = efficacy;
950         }
951     }
952
953     return true;
954 }
955
956 void
957 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
958 {
959     struct ovsdb_monitor_aux aux;
960     struct shash_node *node;
961
962     ovsdb_monitor_init_aux(&aux, dbmon);
963     SHASH_FOR_EACH (node, &dbmon->tables) {
964         struct ovsdb_monitor_table *mt = node->data;
965
966         if (mt->select & OJMS_INITIAL) {
967             struct ovsdb_row *row;
968             struct ovsdb_monitor_changes *changes;
969
970             changes = ovsdb_monitor_table_find_changes(mt, 0);
971             if (!changes) {
972                 changes = ovsdb_monitor_table_add_changes(mt, 0);
973                 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
974                     ovsdb_monitor_changes_update(NULL, row, mt, changes);
975                 }
976             } else {
977                 changes->n_refs++;
978             }
979         }
980     }
981 }
982
983 void
984 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
985                    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
986                    uint64_t unflushed)
987 {
988     struct jsonrpc_monitor_node *jm;
989
990     if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
991         ovsdb_monitor_destroy(dbmon);
992         return;
993     }
994
995     /* Find and remove the jsonrpc monitor from the list.  */
996     LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
997         if (jm->jsonrpc_monitor == jsonrpc_monitor) {
998             /* Release the tracked changes. */
999             struct shash_node *node;
1000             SHASH_FOR_EACH (node, &dbmon->tables) {
1001                 struct ovsdb_monitor_table *mt = node->data;
1002                 ovsdb_monitor_table_untrack_changes(mt, unflushed);
1003             }
1004             ovs_list_remove(&jm->node);
1005             free(jm);
1006
1007             /* Destroy ovsdb monitor if this is the last user.  */
1008             if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
1009                 ovsdb_monitor_destroy(dbmon);
1010             }
1011
1012             return;
1013         };
1014     }
1015
1016     /* Should never reach here. jsonrpc_monitor should be on the list.  */
1017     OVS_NOT_REACHED();
1018 }
1019
1020 static bool
1021 ovsdb_monitor_table_equal(const struct ovsdb_monitor_table *a,
1022                           const struct ovsdb_monitor_table *b)
1023 {
1024     size_t i;
1025
1026     ovs_assert(b->n_columns == b->n_monitored_columns);
1027
1028     if ((a->table != b->table) ||
1029         (a->select != b->select) ||
1030         (a->n_monitored_columns != b->n_monitored_columns)) {
1031         return false;
1032     }
1033
1034     /* Compare only monitored columns that must be sorted already */
1035     for (i = 0; i < a->n_monitored_columns; i++) {
1036         if ((a->columns[i].column != b->columns[i].column) ||
1037             (a->columns[i].select != b->columns[i].select)) {
1038             return false;
1039         }
1040     }
1041     return true;
1042 }
1043
1044 static bool
1045 ovsdb_monitor_equal(const struct ovsdb_monitor *a,
1046                     const struct ovsdb_monitor *b)
1047 {
1048     struct shash_node *node;
1049
1050     if (shash_count(&a->tables) != shash_count(&b->tables)) {
1051         return false;
1052     }
1053
1054     SHASH_FOR_EACH(node, &a->tables) {
1055         const struct ovsdb_monitor_table *mta = node->data;
1056         const struct ovsdb_monitor_table *mtb;
1057
1058         mtb = shash_find_data(&b->tables, node->name);
1059         if (!mtb) {
1060             return false;
1061         }
1062
1063         if (!ovsdb_monitor_table_equal(mta, mtb)) {
1064             return false;
1065         }
1066     }
1067
1068     return true;
1069 }
1070
1071 static size_t
1072 ovsdb_monitor_hash(const struct ovsdb_monitor *dbmon, size_t basis)
1073 {
1074     const struct shash_node **nodes;
1075     size_t i, j, n;
1076
1077     nodes = shash_sort(&dbmon->tables);
1078     n = shash_count(&dbmon->tables);
1079
1080     for (i = 0; i < n; i++) {
1081         struct ovsdb_monitor_table *mt = nodes[i]->data;
1082
1083         basis = hash_pointer(mt->table, basis);
1084         basis = hash_3words(mt->select, mt->n_columns, basis);
1085
1086         for (j = 0; j < mt->n_columns; j++) {
1087             basis = hash_pointer(mt->columns[j].column, basis);
1088             basis = hash_2words(mt->columns[j].select, basis);
1089         }
1090     }
1091     free(nodes);
1092
1093     return basis;
1094 }
1095
1096 struct ovsdb_monitor *
1097 ovsdb_monitor_add(struct ovsdb_monitor *new_dbmon)
1098 {
1099     struct ovsdb_monitor *dbmon;
1100     size_t hash;
1101
1102     /* New_dbmon should be associated with only one jsonrpc
1103      * connections.  */
1104     ovs_assert(ovs_list_is_singleton(&new_dbmon->jsonrpc_monitors));
1105
1106     ovsdb_monitor_columns_sort(new_dbmon);
1107
1108     hash = ovsdb_monitor_hash(new_dbmon, 0);
1109     HMAP_FOR_EACH_WITH_HASH(dbmon, hmap_node, hash, &ovsdb_monitors) {
1110         if (ovsdb_monitor_equal(dbmon,  new_dbmon)) {
1111             return dbmon;
1112         }
1113     }
1114
1115     hmap_insert(&ovsdb_monitors, &new_dbmon->hmap_node, hash);
1116     return new_dbmon;
1117 }
1118
1119 static void
1120 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
1121 {
1122     struct shash_node *node;
1123
1124     ovs_list_remove(&dbmon->replica.node);
1125
1126     if (!hmap_node_is_null(&dbmon->hmap_node)) {
1127         hmap_remove(&ovsdb_monitors, &dbmon->hmap_node);
1128     }
1129
1130     ovsdb_monitor_json_cache_flush(dbmon);
1131     hmap_destroy(&dbmon->json_cache);
1132
1133     SHASH_FOR_EACH (node, &dbmon->tables) {
1134         struct ovsdb_monitor_table *mt = node->data;
1135         struct ovsdb_monitor_changes *changes, *next;
1136
1137         HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
1138             hmap_remove(&mt->changes, &changes->hmap_node);
1139             ovsdb_monitor_changes_destroy(changes);
1140         }
1141         hmap_destroy(&mt->changes);
1142         free(mt->columns);
1143         free(mt->columns_index_map);
1144         free(mt);
1145     }
1146     shash_destroy(&dbmon->tables);
1147     free(dbmon);
1148 }
1149
1150 static struct ovsdb_error *
1151 ovsdb_monitor_commit(struct ovsdb_replica *replica,
1152                      const struct ovsdb_txn *txn,
1153                      bool durable OVS_UNUSED)
1154 {
1155     struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
1156     struct ovsdb_monitor_aux aux;
1157
1158     ovsdb_monitor_init_aux(&aux, m);
1159     /* Update ovsdb_monitor's transaction number for
1160      * each transaction, before calling ovsdb_monitor_change_cb().  */
1161     m->n_transactions++;
1162     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
1163
1164     switch(aux.efficacy) {
1165     case OVSDB_CHANGES_NO_EFFECT:
1166         /* The transaction is ignored by the monitor.
1167          * Roll back the 'n_transactions' as if the transaction
1168          * has never happened. */
1169         m->n_transactions--;
1170         break;
1171     case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE:
1172         /* Nothing.  */
1173         break;
1174     case  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE:
1175         ovsdb_monitor_json_cache_flush(m);
1176         break;
1177     }
1178
1179     return NULL;
1180 }
1181
1182 static void
1183 ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
1184 {
1185     struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
1186     struct jsonrpc_monitor_node *jm, *next;
1187
1188     /* Delete all front end monitors. Removing the last front
1189      * end monitor will also destroy the corresponding 'ovsdb_monitor'.
1190      * ovsdb monitor will also be destroied.  */
1191     LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
1192         ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
1193     }
1194 }
1195
1196 /* Add some memory usage statics for monitors into 'usage', for use with
1197  * memory_report().  */
1198 void
1199 ovsdb_monitor_get_memory_usage(struct simap *usage)
1200 {
1201     struct ovsdb_monitor *dbmon;
1202     simap_put(usage, "monitors", hmap_count(&ovsdb_monitors));
1203
1204     HMAP_FOR_EACH(dbmon, hmap_node,  &ovsdb_monitors) {
1205         simap_increase(usage, "json-caches", hmap_count(&dbmon->json_cache));
1206     }
1207 }
1208
1209 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1210     ovsdb_monitor_commit,
1211     ovsdb_monitor_destroy_callback,
1212 };