e910e3fe73c62d63ce1daf54341455488f807a2e
[cascardo/ovs.git] / ovsdb / monitor.c
1 /*
2  * Copyright (c) 2015 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include <errno.h>
20
21 #include "bitmap.h"
22 #include "column.h"
23 #include "openvswitch/dynamic-string.h"
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "row.h"
30 #include "simap.h"
31 #include "hash.h"
32 #include "table.h"
33 #include "hash.h"
34 #include "timeval.h"
35 #include "transaction.h"
36 #include "jsonrpc-server.h"
37 #include "monitor.h"
38 #include "openvswitch/vlog.h"
39
40
41 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
42 static struct hmap ovsdb_monitors = HMAP_INITIALIZER(&ovsdb_monitors);
43
44 /*  Backend monitor.
45  *
46  *  ovsdb_monitor keep track of the ovsdb changes.
47  */
48
49 /* A collection of tables being monitored. */
50 struct ovsdb_monitor {
51     struct ovsdb_replica replica;
52     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
53     struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
54     struct ovsdb *db;
55     uint64_t n_transactions;      /* Count number of committed transactions. */
56     struct hmap_node hmap_node;   /* Elements within ovsdb_monitors.  */
57     struct hmap json_cache;       /* Contains "ovsdb_monitor_json_cache_node"s.*/
58 };
59
60 /* A json object of updates between 'from_txn' and 'dbmon->n_transactions'
61  * inclusive.  */
62 struct ovsdb_monitor_json_cache_node {
63     struct hmap_node hmap_node;   /* Elements in json cache. */
64     enum ovsdb_monitor_version version;
65     uint64_t from_txn;
66     struct json *json;            /* Null, or a cloned of json */
67 };
68
69 struct jsonrpc_monitor_node {
70     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
71     struct ovs_list node;
72 };
73
74 /* A particular column being monitored. */
75 struct ovsdb_monitor_column {
76     const struct ovsdb_column *column;
77     enum ovsdb_monitor_selection select;
78 };
79
80 /* A row that has changed in a monitored table. */
81 struct ovsdb_monitor_row {
82     struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
83     struct uuid uuid;           /* UUID of row that changed. */
84     struct ovsdb_datum *old;    /* Old data, NULL for an inserted row. */
85     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
86 };
87
88 /* Contains 'struct ovsdb_monitor_row's for rows that have been
89  * updated but not yet flushed to all the jsonrpc connection.
90  *
91  * 'n_refs' represent the number of jsonrpc connections that have
92  * not received updates. Generate the update for the last jsonprc
93  * connection will also destroy the whole "struct ovsdb_monitor_changes"
94  * object.
95  *
96  * 'transaction' stores the first update's transaction id.
97  * */
98 struct ovsdb_monitor_changes {
99     struct ovsdb_monitor_table *mt;
100     struct hmap rows;
101     int n_refs;
102     uint64_t transaction;
103     struct hmap_node hmap_node;  /* Element in ovsdb_monitor_tables' changes
104                                     hmap.  */
105 };
106
107 /* A particular table being monitored. */
108 struct ovsdb_monitor_table {
109     const struct ovsdb_table *table;
110
111     /* This is the union (bitwise-OR) of the 'select' values in all of the
112      * members of 'columns' below. */
113     enum ovsdb_monitor_selection select;
114
115     /* Columns being monitored. */
116     struct ovsdb_monitor_column *columns;
117     size_t n_columns;
118
119     /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
120     struct hmap changes;
121 };
122
123 typedef struct json *
124 (*compose_row_update_cb_func)(const struct ovsdb_monitor_table *mt,
125                               const struct ovsdb_monitor_row *row,
126                               bool initial, unsigned long int *changed);
127
128 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
129 static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
130     struct ovsdb_monitor_table *mt, uint64_t next_txn);
131 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
132     struct ovsdb_monitor_table *mt, uint64_t unflushed);
133 static void ovsdb_monitor_changes_destroy(
134                                   struct ovsdb_monitor_changes *changes);
135 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
136                                   uint64_t unflushed);
137
138 static uint32_t
139 json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
140 {
141     uint32_t hash;
142
143     hash = hash_uint64(version);
144     hash = hash_uint64_basis(from_txn, hash);
145
146     return hash;
147 }
148
149 static struct ovsdb_monitor_json_cache_node *
150 ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
151                                 enum ovsdb_monitor_version version,
152                                 uint64_t from_txn)
153 {
154     struct ovsdb_monitor_json_cache_node *node;
155     uint32_t hash = json_cache_hash(version, from_txn);
156
157     HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
158         if (node->from_txn == from_txn && node->version == version) {
159             return node;
160         }
161     }
162
163     return NULL;
164 }
165
166 static void
167 ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
168                                 enum ovsdb_monitor_version version,
169                                 uint64_t from_txn, struct json *json)
170 {
171     struct ovsdb_monitor_json_cache_node *node;
172     uint32_t hash = json_cache_hash(version, from_txn);
173
174     node = xmalloc(sizeof *node);
175
176     node->version = version;
177     node->from_txn = from_txn;
178     node->json = json ? json_clone(json) : NULL;
179
180     hmap_insert(&dbmon->json_cache, &node->hmap_node, hash);
181 }
182
183 static void
184 ovsdb_monitor_json_cache_flush(struct ovsdb_monitor *dbmon)
185 {
186     struct ovsdb_monitor_json_cache_node *node;
187
188     HMAP_FOR_EACH_POP(node, hmap_node, &dbmon->json_cache) {
189         json_destroy(node->json);
190         free(node);
191     }
192 }
193
194 static int
195 compare_ovsdb_monitor_column(const void *a_, const void *b_)
196 {
197     const struct ovsdb_monitor_column *a = a_;
198     const struct ovsdb_monitor_column *b = b_;
199
200     return a->column < b->column ? -1 : a->column > b->column;
201 }
202
203 static struct ovsdb_monitor *
204 ovsdb_monitor_cast(struct ovsdb_replica *replica)
205 {
206     ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
207     return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
208 }
209
210 /* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
211  * given 'uuid', or NULL if there is no such row. */
212 static struct ovsdb_monitor_row *
213 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
214                                const struct uuid *uuid)
215 {
216     struct ovsdb_monitor_row *row;
217
218     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
219                              &changes->rows) {
220         if (uuid_equals(uuid, &row->uuid)) {
221             return row;
222         }
223     }
224     return NULL;
225 }
226
227 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
228  * copies of the data in 'row' drawn from the columns represented by
229  * mt->columns[].  Returns the array.
230  *
231  * If 'row' is NULL, returns NULL. */
232 static struct ovsdb_datum *
233 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
234                        const struct ovsdb_row *row)
235 {
236     struct ovsdb_datum *data;
237     size_t i;
238
239     if (!row) {
240         return NULL;
241     }
242
243     data = xmalloc(mt->n_columns * sizeof *data);
244     for (i = 0; i < mt->n_columns; i++) {
245         const struct ovsdb_column *c = mt->columns[i].column;
246         const struct ovsdb_datum *src = &row->fields[c->index];
247         struct ovsdb_datum *dst = &data[i];
248         const struct ovsdb_type *type = &c->type;
249
250         ovsdb_datum_clone(dst, src, type);
251     }
252     return data;
253 }
254
255 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
256  * in 'row' drawn from the columns represented by mt->columns[]. */
257 static void
258 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
259                         const struct ovsdb_row *row,
260                         struct ovsdb_datum *data)
261 {
262     size_t i;
263
264     for (i = 0; i < mt->n_columns; i++) {
265         const struct ovsdb_column *c = mt->columns[i].column;
266         const struct ovsdb_datum *src = &row->fields[c->index];
267         struct ovsdb_datum *dst = &data[i];
268         const struct ovsdb_type *type = &c->type;
269
270         if (!ovsdb_datum_equals(src, dst, type)) {
271             ovsdb_datum_destroy(dst, type);
272             ovsdb_datum_clone(dst, src, type);
273         }
274     }
275 }
276
277 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
278  * from mt->columns[], plus 'data' itself. */
279 static void
280 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
281                       struct ovsdb_datum *data)
282 {
283     if (data) {
284         size_t i;
285
286         for (i = 0; i < mt->n_columns; i++) {
287             const struct ovsdb_column *c = mt->columns[i].column;
288
289             ovsdb_datum_destroy(&data[i], &c->type);
290         }
291         free(data);
292     }
293 }
294
295 /* Frees 'row', which must have been created from 'mt'. */
296 static void
297 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
298                           struct ovsdb_monitor_row *row)
299 {
300     if (row) {
301         free_monitor_row_data(mt, row->old);
302         free_monitor_row_data(mt, row->new);
303         free(row);
304     }
305 }
306
307 void
308 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
309                                   struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
310 {
311     struct jsonrpc_monitor_node *jm;
312
313     jm = xzalloc(sizeof *jm);
314     jm->jsonrpc_monitor = jsonrpc_monitor;
315     ovs_list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
316 }
317
318 struct ovsdb_monitor *
319 ovsdb_monitor_create(struct ovsdb *db,
320                      struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
321 {
322     struct ovsdb_monitor *dbmon;
323
324     dbmon = xzalloc(sizeof *dbmon);
325
326     ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
327     ovsdb_add_replica(db, &dbmon->replica);
328     ovs_list_init(&dbmon->jsonrpc_monitors);
329     dbmon->db = db;
330     dbmon->n_transactions = 0;
331     shash_init(&dbmon->tables);
332     hmap_node_nullify(&dbmon->hmap_node);
333     hmap_init(&dbmon->json_cache);
334
335     ovsdb_monitor_add_jsonrpc_monitor(dbmon, jsonrpc_monitor);
336     return dbmon;
337 }
338
339 void
340 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
341                         const struct ovsdb_table *table)
342 {
343     struct ovsdb_monitor_table *mt;
344
345     mt = xzalloc(sizeof *mt);
346     mt->table = table;
347     shash_add(&m->tables, table->schema->name, mt);
348     hmap_init(&mt->changes);
349 }
350
351 void
352 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
353                          const struct ovsdb_table *table,
354                          const struct ovsdb_column *column,
355                          enum ovsdb_monitor_selection select,
356                          size_t *allocated_columns)
357 {
358     struct ovsdb_monitor_table *mt;
359     struct ovsdb_monitor_column *c;
360
361     mt = shash_find_data(&dbmon->tables, table->schema->name);
362
363     if (mt->n_columns >= *allocated_columns) {
364         mt->columns = x2nrealloc(mt->columns, allocated_columns,
365                                  sizeof *mt->columns);
366     }
367
368     mt->select |= select;
369     c = &mt->columns[mt->n_columns++];
370     c->column = column;
371     c->select = select;
372 }
373
374 /* Check for duplicated column names. Return the first
375  * duplicated column's name if found. Otherwise return
376  * NULL.  */
377 const char * OVS_WARN_UNUSED_RESULT
378 ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
379                                      const struct ovsdb_table *table)
380 {
381     struct ovsdb_monitor_table *mt;
382     int i;
383
384     mt = shash_find_data(&m->tables, table->schema->name);
385
386     if (mt) {
387         /* Check for duplicate columns. */
388         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
389               compare_ovsdb_monitor_column);
390         for (i = 1; i < mt->n_columns; i++) {
391             if (mt->columns[i].column == mt->columns[i - 1].column) {
392                 return mt->columns[i].column->name;
393             }
394         }
395     }
396
397     return NULL;
398 }
399
400 static struct ovsdb_monitor_changes *
401 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
402                                 uint64_t next_txn)
403 {
404     struct ovsdb_monitor_changes *changes;
405
406     changes = xzalloc(sizeof *changes);
407
408     changes->transaction = next_txn;
409     changes->mt = mt;
410     changes->n_refs = 1;
411     hmap_init(&changes->rows);
412     hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
413
414     return changes;
415 };
416
417 static struct ovsdb_monitor_changes *
418 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
419                                  uint64_t transaction)
420 {
421     struct ovsdb_monitor_changes *changes;
422     size_t hash = hash_uint64(transaction);
423
424     HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
425         if (changes->transaction == transaction) {
426             return changes;
427         }
428     }
429
430     return NULL;
431 }
432
433 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
434 static void
435 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
436                                     uint64_t transaction)
437 {
438     struct ovsdb_monitor_changes *changes =
439                 ovsdb_monitor_table_find_changes(mt, transaction);
440     if (changes) {
441         if (--changes->n_refs == 0) {
442             hmap_remove(&mt->changes, &changes->hmap_node);
443             ovsdb_monitor_changes_destroy(changes);
444         }
445     }
446 }
447
448 /* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
449  */
450 static void
451 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
452                                   uint64_t transaction)
453 {
454     struct ovsdb_monitor_changes *changes;
455
456     changes = ovsdb_monitor_table_find_changes(mt, transaction);
457     if (changes) {
458         changes->n_refs++;
459     } else {
460         ovsdb_monitor_table_add_changes(mt, transaction);
461     }
462 }
463
464 static void
465 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
466 {
467     struct ovsdb_monitor_row *row, *next;
468
469     HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
470         hmap_remove(&changes->rows, &row->hmap_node);
471         ovsdb_monitor_row_destroy(changes->mt, row);
472     }
473     hmap_destroy(&changes->rows);
474     free(changes);
475 }
476
477 static enum ovsdb_monitor_selection
478 ovsdb_monitor_row_update_type(bool initial, const bool old, const bool new)
479 {
480     return initial ? OJMS_INITIAL
481             : !old ? OJMS_INSERT
482             : !new ? OJMS_DELETE
483             : OJMS_MODIFY;
484 }
485 static bool
486 ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
487                               const struct ovsdb_monitor_row *row,
488                               enum ovsdb_monitor_selection type,
489                               unsigned long int *changed)
490 {
491     if (!(mt->select & type)) {
492         return true;
493     }
494
495     if (type == OJMS_MODIFY) {
496         size_t i, n_changes;
497
498         n_changes = 0;
499         memset(changed, 0, bitmap_n_bytes(mt->n_columns));
500         for (i = 0; i < mt->n_columns; i++) {
501             const struct ovsdb_column *c = mt->columns[i].column;
502             if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
503                 bitmap_set1(changed, i);
504                 n_changes++;
505             }
506         }
507         if (!n_changes) {
508             /* No actual changes: presumably a row changed and then
509              * changed back later. */
510             return true;
511         }
512     }
513
514     return false;
515 }
516
517 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
518  * 'mt', or NULL if no row update should be sent.
519  *
520  * The caller should specify 'initial' as true if the returned JSON is going to
521  * be used as part of the initial reply to a "monitor" request, false if it is
522  * going to be used as part of an "update" notification.
523  *
524  * 'changed' must be a scratch buffer for internal use that is at least
525  * bitmap_n_bytes(mt->n_columns) bytes long. */
526 static struct json *
527 ovsdb_monitor_compose_row_update(
528     const struct ovsdb_monitor_table *mt,
529     const struct ovsdb_monitor_row *row,
530     bool initial, unsigned long int *changed)
531 {
532     enum ovsdb_monitor_selection type;
533     struct json *old_json, *new_json;
534     struct json *row_json;
535     size_t i;
536
537     type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
538     if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
539         return NULL;
540     }
541
542     row_json = json_object_create();
543     old_json = new_json = NULL;
544     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
545         old_json = json_object_create();
546         json_object_put(row_json, "old", old_json);
547     }
548     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
549         new_json = json_object_create();
550         json_object_put(row_json, "new", new_json);
551     }
552     for (i = 0; i < mt->n_columns; i++) {
553         const struct ovsdb_monitor_column *c = &mt->columns[i];
554
555         if (!(type & c->select)) {
556             /* We don't care about this type of change for this
557              * particular column (but we will care about it for some
558              * other column). */
559             continue;
560         }
561
562         if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
563             || type == OJMS_DELETE) {
564             json_object_put(old_json, c->column->name,
565                             ovsdb_datum_to_json(&row->old[i],
566                                                 &c->column->type));
567         }
568         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
569             json_object_put(new_json, c->column->name,
570                             ovsdb_datum_to_json(&row->new[i],
571                                                 &c->column->type));
572         }
573     }
574
575     return row_json;
576 }
577
578 /* Returns JSON for a <row-update2> (as described in ovsdb-server(1) mapage)
579  * for 'row' within * 'mt', or NULL if no row update should be sent.
580  *
581  * The caller should specify 'initial' as true if the returned JSON is
582  * going to be used as part of the initial reply to a "monitor2" request,
583  * false if it is going to be used as part of an "update2" notification.
584  *
585  * 'changed' must be a scratch buffer for internal use that is at least
586  * bitmap_n_bytes(mt->n_columns) bytes long. */
587 static struct json *
588 ovsdb_monitor_compose_row_update2(
589     const struct ovsdb_monitor_table *mt,
590     const struct ovsdb_monitor_row *row,
591     bool initial, unsigned long int *changed)
592 {
593     enum ovsdb_monitor_selection type;
594     struct json *row_update2, *diff_json;
595     size_t i;
596
597     type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
598     if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
599         return NULL;
600     }
601
602     row_update2 = json_object_create();
603     if (type == OJMS_DELETE) {
604         json_object_put(row_update2, "delete", json_null_create());
605     } else {
606         diff_json = json_object_create();
607         const char *op;
608
609         for (i = 0; i < mt->n_columns; i++) {
610             const struct ovsdb_monitor_column *c = &mt->columns[i];
611
612             if (!(type & c->select)) {
613                 /* We don't care about this type of change for this
614                  * particular column (but we will care about it for some
615                  * other column). */
616                 continue;
617             }
618
619             if (type == OJMS_MODIFY) {
620                 struct ovsdb_datum diff;
621
622                 if (!bitmap_is_set(changed, i)) {
623                     continue;
624                 }
625
626                 ovsdb_datum_diff(&diff ,&row->old[i], &row->new[i],
627                                         &c->column->type);
628                 json_object_put(diff_json, c->column->name,
629                                 ovsdb_datum_to_json(&diff, &c->column->type));
630                 ovsdb_datum_destroy(&diff, &c->column->type);
631             } else {
632                 if (!ovsdb_datum_is_default(&row->new[i], &c->column->type)) {
633                     json_object_put(diff_json, c->column->name,
634                                     ovsdb_datum_to_json(&row->new[i],
635                                                         &c->column->type));
636                 }
637             }
638         }
639
640         op = type == OJMS_INITIAL ? "initial"
641                                   : type == OJMS_MODIFY ? "modify" : "insert";
642         json_object_put(row_update2, op, diff_json);
643     }
644
645     return row_update2;
646 }
647
648 static size_t
649 ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
650 {
651     struct shash_node *node;
652     size_t max_columns = 0;
653
654     SHASH_FOR_EACH (node, &dbmon->tables) {
655         struct ovsdb_monitor_table *mt = node->data;
656
657         max_columns = MAX(max_columns, mt->n_columns);
658     }
659
660     return max_columns;
661 }
662
663 /* Constructs and returns JSON for a <table-updates> object (as described in
664  * RFC 7047) for all the outstanding changes within 'monitor', starting from
665  * 'transaction'.  */
666 static struct json*
667 ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
668                              bool initial, uint64_t transaction,
669                              compose_row_update_cb_func row_update)
670 {
671     struct shash_node *node;
672     struct json *json;
673     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
674     unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
675
676     json = NULL;
677     SHASH_FOR_EACH (node, &dbmon->tables) {
678         struct ovsdb_monitor_table *mt = node->data;
679         struct ovsdb_monitor_row *row, *next;
680         struct ovsdb_monitor_changes *changes;
681         struct json *table_json = NULL;
682
683         changes = ovsdb_monitor_table_find_changes(mt, transaction);
684         if (!changes) {
685             continue;
686         }
687
688         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
689             struct json *row_json;
690
691             row_json = (*row_update)(mt, row, initial, changed);
692             if (row_json) {
693                 char uuid[UUID_LEN + 1];
694
695                 /* Create JSON object for transaction overall. */
696                 if (!json) {
697                     json = json_object_create();
698                 }
699
700                 /* Create JSON object for transaction on this table. */
701                 if (!table_json) {
702                     table_json = json_object_create();
703                     json_object_put(json, mt->table->schema->name, table_json);
704                 }
705
706                 /* Add JSON row to JSON table. */
707                 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
708                 json_object_put(table_json, uuid, row_json);
709             }
710         }
711     }
712     free(changed);
713
714     return json;
715 }
716
717 /* Returns JSON for a <table-updates> object (as described in RFC 7047)
718  * for all the outstanding changes within 'monitor' that starts from
719  * '*unflushed' transaction id.
720  *
721  * The caller should specify 'initial' as true if the returned JSON is going to
722  * be used as part of the initial reply to a "monitor" request, false if it is
723  * going to be used as part of an "update" notification. */
724 struct json *
725 ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
726                          bool initial, uint64_t *unflushed_,
727                          enum ovsdb_monitor_version version)
728 {
729     struct ovsdb_monitor_json_cache_node *cache_node;
730     struct shash_node *node;
731     struct json *json;
732     const uint64_t unflushed = *unflushed_;
733     const uint64_t next_unflushed = dbmon->n_transactions + 1;
734
735     /* Return a clone of cached json if one exists. Otherwise,
736      * generate a new one and add it to the cache.  */
737     cache_node = ovsdb_monitor_json_cache_search(dbmon, version, unflushed);
738     if (cache_node) {
739         json = cache_node->json ? json_clone(cache_node->json) : NULL;
740     } else {
741         if (version == OVSDB_MONITOR_V1) {
742             json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
743                                         ovsdb_monitor_compose_row_update);
744         } else {
745             ovs_assert(version == OVSDB_MONITOR_V2);
746             json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
747                                         ovsdb_monitor_compose_row_update2);
748         }
749         ovsdb_monitor_json_cache_insert(dbmon, version, unflushed, json);
750     }
751
752     /* Maintain transaction id of 'changes'. */
753     SHASH_FOR_EACH (node, &dbmon->tables) {
754         struct ovsdb_monitor_table *mt = node->data;
755
756         ovsdb_monitor_table_untrack_changes(mt, unflushed);
757         ovsdb_monitor_table_track_changes(mt, next_unflushed);
758     }
759     *unflushed_ = next_unflushed;
760
761     return json;
762 }
763
764 bool
765 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
766                           uint64_t next_transaction)
767 {
768     ovs_assert(next_transaction <= dbmon->n_transactions + 1);
769     return (next_transaction <= dbmon->n_transactions);
770 }
771
772 void
773 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
774                                const struct ovsdb_table *table,
775                                enum ovsdb_monitor_selection select)
776 {
777     struct ovsdb_monitor_table * mt;
778
779     mt = shash_find_data(&dbmon->tables, table->schema->name);
780     mt->select |= select;
781 }
782
783  /*
784  * If a row's change type (insert, delete or modify) matches that of
785  * the monitor, they should be sent to the monitor's clients as updates.
786  * Of cause, the monitor should also internally update with this change.
787  *
788  * When a change type does not require client side update, the monitor
789  * may still need to keep track of certain changes in order to generate
790  * correct future updates.  For example, the monitor internal state should
791  * be updated whenever a new row is inserted, in order to generate the
792  * correct initial state, regardless if a insert change type is being
793  * monitored.
794  *
795  * On the other hand, if a transaction only contains changes to columns
796  * that are not monitored, this transaction can be safely ignored by the
797  * monitor.
798  *
799  * Thus, the order of the declaration is important:
800  * 'OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE' always implies
801  * 'OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE', but not vice versa.  */
802 enum ovsdb_monitor_changes_efficacy {
803     OVSDB_CHANGES_NO_EFFECT,                /* Monitor does not care about this
804                                                change.  */
805     OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE,  /* Monitor internal updates. */
806     OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE,  /* Client needs to be updated.  */
807 };
808
809 struct ovsdb_monitor_aux {
810     const struct ovsdb_monitor *monitor;
811     struct ovsdb_monitor_table *mt;
812     enum ovsdb_monitor_changes_efficacy efficacy;
813 };
814
815 static void
816 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
817                        const struct ovsdb_monitor *m)
818 {
819     aux->monitor = m;
820     aux->mt = NULL;
821     aux->efficacy = OVSDB_CHANGES_NO_EFFECT;
822 }
823
824 static void
825 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
826                              const struct ovsdb_row *new,
827                              const struct ovsdb_monitor_table *mt,
828                              struct ovsdb_monitor_changes *changes)
829 {
830     const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
831     struct ovsdb_monitor_row *change;
832
833     change = ovsdb_monitor_changes_row_find(changes, uuid);
834     if (!change) {
835         change = xzalloc(sizeof *change);
836         hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
837         change->uuid = *uuid;
838         change->old = clone_monitor_row_data(mt, old);
839         change->new = clone_monitor_row_data(mt, new);
840     } else {
841         if (new) {
842             update_monitor_row_data(mt, new, change->new);
843         } else {
844             free_monitor_row_data(mt, change->new);
845             change->new = NULL;
846
847             if (!change->old) {
848                 /* This row was added then deleted.  Forget about it. */
849                 hmap_remove(&changes->rows, &change->hmap_node);
850                 free(change);
851             }
852         }
853     }
854 }
855
856 static bool
857 ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
858                               const unsigned long int *changed)
859 {
860     size_t i;
861
862     for (i = 0; i < mt->n_columns; i++) {
863         size_t column_index = mt->columns[i].column->index;
864
865         if (bitmap_is_set(changed, column_index)) {
866             return true;
867         }
868     }
869
870     return false;
871 }
872
873 /* Return the efficacy of a row's change to a monitor table.
874  *
875  * Please see the block comment above 'ovsdb_monitor_changes_efficacy'
876  * definition form more information.  */
877 static enum ovsdb_monitor_changes_efficacy
878 ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type,
879                                const struct ovsdb_monitor_table *mt,
880                                const unsigned long int *changed)
881 {
882     if (type == OJMS_MODIFY &&
883         !ovsdb_monitor_columns_changed(mt, changed)) {
884         return OVSDB_CHANGES_NO_EFFECT;
885     }
886
887     return (mt->select & type)
888                 ?  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
889                 :  OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
890 }
891
892 static bool
893 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
894                         const struct ovsdb_row *new,
895                         const unsigned long int *changed,
896                         void *aux_)
897 {
898     struct ovsdb_monitor_aux *aux = aux_;
899     const struct ovsdb_monitor *m = aux->monitor;
900     struct ovsdb_table *table = new ? new->table : old->table;
901     struct ovsdb_monitor_table *mt;
902     struct ovsdb_monitor_changes *changes;
903
904     if (!aux->mt || table != aux->mt->table) {
905         aux->mt = shash_find_data(&m->tables, table->schema->name);
906         if (!aux->mt) {
907             /* We don't care about rows in this table at all.  Tell the caller
908              * to skip it.  */
909             return false;
910         }
911     }
912     mt = aux->mt;
913
914     HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
915         enum ovsdb_monitor_changes_efficacy efficacy;
916         enum ovsdb_monitor_selection type;
917
918         type = ovsdb_monitor_row_update_type(false, old, new);
919         efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
920         if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
921             ovsdb_monitor_changes_update(old, new, mt, changes);
922         }
923
924         if (aux->efficacy < efficacy) {
925             aux->efficacy = efficacy;
926         }
927     }
928
929     return true;
930 }
931
932 void
933 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
934 {
935     struct ovsdb_monitor_aux aux;
936     struct shash_node *node;
937
938     ovsdb_monitor_init_aux(&aux, dbmon);
939     SHASH_FOR_EACH (node, &dbmon->tables) {
940         struct ovsdb_monitor_table *mt = node->data;
941
942         if (mt->select & OJMS_INITIAL) {
943             struct ovsdb_row *row;
944             struct ovsdb_monitor_changes *changes;
945
946             changes = ovsdb_monitor_table_find_changes(mt, 0);
947             if (!changes) {
948                 changes = ovsdb_monitor_table_add_changes(mt, 0);
949                 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
950                     ovsdb_monitor_changes_update(NULL, row, mt, changes);
951                 }
952             } else {
953                 changes->n_refs++;
954             }
955         }
956     }
957 }
958
959 void
960 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
961                    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
962                    uint64_t unflushed)
963 {
964     struct jsonrpc_monitor_node *jm;
965
966     if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
967         ovsdb_monitor_destroy(dbmon);
968         return;
969     }
970
971     /* Find and remove the jsonrpc monitor from the list.  */
972     LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
973         if (jm->jsonrpc_monitor == jsonrpc_monitor) {
974             /* Release the tracked changes. */
975             struct shash_node *node;
976             SHASH_FOR_EACH (node, &dbmon->tables) {
977                 struct ovsdb_monitor_table *mt = node->data;
978                 ovsdb_monitor_table_untrack_changes(mt, unflushed);
979             }
980             ovs_list_remove(&jm->node);
981             free(jm);
982
983             /* Destroy ovsdb monitor if this is the last user.  */
984             if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
985                 ovsdb_monitor_destroy(dbmon);
986             }
987
988             return;
989         };
990     }
991
992     /* Should never reach here. jsonrpc_monitor should be on the list.  */
993     OVS_NOT_REACHED();
994 }
995
996 static bool
997 ovsdb_monitor_table_equal(const struct ovsdb_monitor_table *a,
998                           const struct ovsdb_monitor_table *b)
999 {
1000     size_t i;
1001
1002     if ((a->table != b->table) ||
1003         (a->select != b->select) ||
1004         (a->n_columns != b->n_columns)) {
1005         return false;
1006     }
1007
1008     for (i = 0; i < a->n_columns; i++) {
1009         if ((a->columns[i].column != b->columns[i].column) ||
1010             (a->columns[i].select != b->columns[i].select)) {
1011             return false;
1012         }
1013     }
1014
1015     return true;
1016 }
1017
1018 static bool
1019 ovsdb_monitor_equal(const struct ovsdb_monitor *a,
1020                     const struct ovsdb_monitor *b)
1021 {
1022     struct shash_node *node;
1023
1024     if (shash_count(&a->tables) != shash_count(&b->tables)) {
1025         return false;
1026     }
1027
1028     SHASH_FOR_EACH(node, &a->tables) {
1029         const struct ovsdb_monitor_table *mta = node->data;
1030         const struct ovsdb_monitor_table *mtb;
1031
1032         mtb = shash_find_data(&b->tables, node->name);
1033         if (!mtb) {
1034             return false;
1035         }
1036
1037         if (!ovsdb_monitor_table_equal(mta, mtb)) {
1038             return false;
1039         }
1040     }
1041
1042     return true;
1043 }
1044
1045 static size_t
1046 ovsdb_monitor_hash(const struct ovsdb_monitor *dbmon, size_t basis)
1047 {
1048     const struct shash_node **nodes;
1049     size_t i, j, n;
1050
1051     nodes = shash_sort(&dbmon->tables);
1052     n = shash_count(&dbmon->tables);
1053
1054     for (i = 0; i < n; i++) {
1055         struct ovsdb_monitor_table *mt = nodes[i]->data;
1056
1057         basis = hash_pointer(mt->table, basis);
1058         basis = hash_3words(mt->select, mt->n_columns, basis);
1059
1060         for (j = 0; j < mt->n_columns; j++) {
1061             basis = hash_pointer(mt->columns[j].column, basis);
1062             basis = hash_2words(mt->columns[j].select, basis);
1063         }
1064     }
1065     free(nodes);
1066
1067     return basis;
1068 }
1069
1070 struct ovsdb_monitor *
1071 ovsdb_monitor_add(struct ovsdb_monitor *new_dbmon)
1072 {
1073     struct ovsdb_monitor *dbmon;
1074     size_t hash;
1075
1076     /* New_dbmon should be associated with only one jsonrpc
1077      * connections.  */
1078     ovs_assert(ovs_list_is_singleton(&new_dbmon->jsonrpc_monitors));
1079
1080     hash = ovsdb_monitor_hash(new_dbmon, 0);
1081     HMAP_FOR_EACH_WITH_HASH(dbmon, hmap_node, hash, &ovsdb_monitors) {
1082         if (ovsdb_monitor_equal(dbmon,  new_dbmon)) {
1083             return dbmon;
1084         }
1085     }
1086
1087     hmap_insert(&ovsdb_monitors, &new_dbmon->hmap_node, hash);
1088     return new_dbmon;
1089 }
1090
1091 static void
1092 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
1093 {
1094     struct shash_node *node;
1095
1096     ovs_list_remove(&dbmon->replica.node);
1097
1098     if (!hmap_node_is_null(&dbmon->hmap_node)) {
1099         hmap_remove(&ovsdb_monitors, &dbmon->hmap_node);
1100     }
1101
1102     ovsdb_monitor_json_cache_flush(dbmon);
1103     hmap_destroy(&dbmon->json_cache);
1104
1105     SHASH_FOR_EACH (node, &dbmon->tables) {
1106         struct ovsdb_monitor_table *mt = node->data;
1107         struct ovsdb_monitor_changes *changes, *next;
1108
1109         HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
1110             hmap_remove(&mt->changes, &changes->hmap_node);
1111             ovsdb_monitor_changes_destroy(changes);
1112         }
1113         hmap_destroy(&mt->changes);
1114         free(mt->columns);
1115         free(mt);
1116     }
1117     shash_destroy(&dbmon->tables);
1118     free(dbmon);
1119 }
1120
1121 static struct ovsdb_error *
1122 ovsdb_monitor_commit(struct ovsdb_replica *replica,
1123                      const struct ovsdb_txn *txn,
1124                      bool durable OVS_UNUSED)
1125 {
1126     struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
1127     struct ovsdb_monitor_aux aux;
1128
1129     ovsdb_monitor_init_aux(&aux, m);
1130     /* Update ovsdb_monitor's transaction number for
1131      * each transaction, before calling ovsdb_monitor_change_cb().  */
1132     m->n_transactions++;
1133     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
1134
1135     switch(aux.efficacy) {
1136     case OVSDB_CHANGES_NO_EFFECT:
1137         /* The transaction is ignored by the monitor.
1138          * Roll back the 'n_transactions' as if the transaction
1139          * has never happened. */
1140         m->n_transactions--;
1141         break;
1142     case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE:
1143         /* Nothing.  */
1144         break;
1145     case  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE:
1146         ovsdb_monitor_json_cache_flush(m);
1147         break;
1148     }
1149
1150     return NULL;
1151 }
1152
1153 static void
1154 ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
1155 {
1156     struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
1157     struct jsonrpc_monitor_node *jm, *next;
1158
1159     /* Delete all front end monitors. Removing the last front
1160      * end monitor will also destroy the corresponding 'ovsdb_monitor'.
1161      * ovsdb monitor will also be destroied.  */
1162     LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
1163         ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
1164     }
1165 }
1166
1167 /* Add some memory usage statics for monitors into 'usage', for use with
1168  * memory_report().  */
1169 void
1170 ovsdb_monitor_get_memory_usage(struct simap *usage)
1171 {
1172     struct ovsdb_monitor *dbmon;
1173     simap_put(usage, "monitors", hmap_count(&ovsdb_monitors));
1174
1175     HMAP_FOR_EACH(dbmon, hmap_node,  &ovsdb_monitors) {
1176         simap_increase(usage, "json-caches", hmap_count(&dbmon->json_cache));
1177     }
1178 }
1179
1180 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1181     ovsdb_monitor_commit,
1182     ovsdb_monitor_destroy_callback,
1183 };