ovsdb-server: Fix various memory leaks.
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <assert.h>
21 #include <errno.h>
22
23 #include "column.h"
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "reconnect.h"
30 #include "row.h"
31 #include "stream.h"
32 #include "table.h"
33 #include "timeval.h"
34 #include "transaction.h"
35 #include "trigger.h"
36
37 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
38 #include "vlog.h"
39
40 struct ovsdb_jsonrpc_remote;
41 struct ovsdb_jsonrpc_session;
42
43 /* Message rate-limiting. */
44 struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
45
/* Sessions. */
static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
    struct ovsdb_jsonrpc_remote *remote, struct jsonrpc_session *js);
static void ovsdb_jsonrpc_session_run_all(
    struct ovsdb_jsonrpc_remote *remote);
static void ovsdb_jsonrpc_session_wait_all(
    struct ovsdb_jsonrpc_remote *remote);
static void ovsdb_jsonrpc_session_close_all(
    struct ovsdb_jsonrpc_remote *remote);

/* Triggers. */
static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
                                         struct json *id, struct json *params);
static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
    struct ovsdb_jsonrpc_session *s, const struct json *id, size_t hash);
static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t);
static void ovsdb_jsonrpc_trigger_complete_all(
    struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_trigger_complete_done(
    struct ovsdb_jsonrpc_session *s);

/* Monitors. */
static struct json *ovsdb_jsonrpc_monitor_create(
    struct ovsdb_jsonrpc_session *s, struct json *params);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
    struct ovsdb_jsonrpc_session *s,
    struct json_array *params,
    const struct json *request_id);
static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s);
71 \f
72 /* JSON-RPC database server. */
73
74 struct ovsdb_jsonrpc_server {
75     struct ovsdb *db;
76     unsigned int n_sessions, max_sessions;
77     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
78 };
79
80 /* A configured remote.  This is either a passive stream listener plus a list
81  * of the currently connected sessions, or a list of exactly one active
82  * session. */
83 struct ovsdb_jsonrpc_remote {
84     struct ovsdb_jsonrpc_server *server;
85     struct pstream *listener;   /* Listener, if passive. */
86     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
87 };
88
/* Remote management helpers, defined below. */
static void ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
                                            const char *name);
static void ovsdb_jsonrpc_server_del_remote(struct shash_node *node);
92
93 struct ovsdb_jsonrpc_server *
94 ovsdb_jsonrpc_server_create(struct ovsdb *db)
95 {
96     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
97     server->db = db;
98     server->max_sessions = 64;
99     shash_init(&server->remotes);
100     return server;
101 }
102
103 /* Sets 'svr''s current set of remotes to the names in 'new_remotes'.  The data
104  * values in 'new_remotes' are ignored.
105  *
106  * A remote is an active or passive stream connection method, e.g. "pssl:" or
107  * "tcp:1.2.3.4". */
108 void
109 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
110                                  const struct shash *new_remotes)
111 {
112     struct shash_node *node, *next;
113
114     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
115         if (!shash_find(new_remotes, node->name)) {
116             ovsdb_jsonrpc_server_del_remote(node);
117         }
118     }
119     SHASH_FOR_EACH (node, new_remotes) {
120         if (!shash_find(&svr->remotes, node->name)) {
121             ovsdb_jsonrpc_server_add_remote(svr, node->name);
122         }
123     }
124 }
125
126 static void
127 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
128                                 const char *name)
129 {
130     struct ovsdb_jsonrpc_remote *remote;
131     struct pstream *listener;
132     int error;
133
134     error = pstream_open(name, &listener);
135     if (error && error != EAFNOSUPPORT) {
136         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
137         return;
138     }
139
140     remote = xmalloc(sizeof *remote);
141     remote->server = svr;
142     remote->listener = listener;
143     list_init(&remote->sessions);
144     shash_add(&svr->remotes, name, remote);
145
146     if (!listener) {
147         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
148     }
149 }
150
151 static void
152 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
153 {
154     struct ovsdb_jsonrpc_remote *remote = node->data;
155
156     ovsdb_jsonrpc_session_close_all(remote);
157     pstream_close(remote->listener);
158     shash_delete(&remote->server->remotes, node);
159     free(remote);
160 }
161
162 void
163 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
164 {
165     struct shash_node *node;
166
167     SHASH_FOR_EACH (node, &svr->remotes) {
168         struct ovsdb_jsonrpc_remote *remote = node->data;
169
170         if (remote->listener && svr->n_sessions < svr->max_sessions) {
171             struct stream *stream;
172             int error;
173
174             error = pstream_accept(remote->listener, &stream);
175             if (!error) {
176                 struct jsonrpc_session *js;
177                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
178                 ovsdb_jsonrpc_session_create(remote, js);
179             } else if (error != EAGAIN) {
180                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
181                              pstream_get_name(remote->listener),
182                              strerror(error));
183             }
184         }
185
186         ovsdb_jsonrpc_session_run_all(remote);
187     }
188 }
189
190 void
191 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
192 {
193     struct shash_node *node;
194
195     SHASH_FOR_EACH (node, &svr->remotes) {
196         struct ovsdb_jsonrpc_remote *remote = node->data;
197
198         if (remote->listener && svr->n_sessions < svr->max_sessions) {
199             pstream_wait(remote->listener);
200         }
201
202         ovsdb_jsonrpc_session_wait_all(remote);
203     }
204 }
205 \f
206 /* JSON-RPC database server session. */
207
208 struct ovsdb_jsonrpc_session {
209     struct ovsdb_jsonrpc_remote *remote;
210     struct list node;           /* Element in remote's sessions list. */
211
212     /* Triggers. */
213     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
214     struct list completions;    /* Completed triggers. */
215
216     /* Monitors. */
217     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
218
219     /* Network connectivity. */
220     struct jsonrpc_session *js;  /* JSON-RPC session. */
221     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
222 };
223
/* Per-session helpers, defined below. */
static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s);
static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_got_request(
    struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request);
static void ovsdb_jsonrpc_session_got_notify(
    struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request);
231
232 static struct ovsdb_jsonrpc_session *
233 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
234                              struct jsonrpc_session *js)
235 {
236     struct ovsdb_jsonrpc_session *s;
237
238     s = xzalloc(sizeof *s);
239     s->remote = remote;
240     list_push_back(&remote->sessions, &s->node);
241     hmap_init(&s->triggers);
242     hmap_init(&s->monitors);
243     list_init(&s->completions);
244     s->js = js;
245     s->js_seqno = jsonrpc_session_get_seqno(js);
246
247     remote->server->n_sessions++;
248
249     return s;
250 }
251
252 static void
253 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
254 {
255     ovsdb_jsonrpc_monitor_remove_all(s);
256     jsonrpc_session_close(s->js);
257     list_remove(&s->node);
258     s->remote->server->n_sessions--;
259     free(s);
260 }
261
262 static int
263 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
264 {
265     jsonrpc_session_run(s->js);
266     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
267         s->js_seqno = jsonrpc_session_get_seqno(s->js);
268         ovsdb_jsonrpc_trigger_complete_all(s);
269         ovsdb_jsonrpc_monitor_remove_all(s);
270     }
271
272     ovsdb_jsonrpc_trigger_complete_done(s);
273
274     if (!jsonrpc_session_get_backlog(s->js)) {
275         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
276         if (msg) {
277             if (msg->type == JSONRPC_REQUEST) {
278                 ovsdb_jsonrpc_session_got_request(s, msg);
279             } else if (msg->type == JSONRPC_NOTIFY) {
280                 ovsdb_jsonrpc_session_got_notify(s, msg);
281             } else {
282                 VLOG_WARN("%s: received unexpected %s message",
283                           jsonrpc_session_get_name(s->js),
284                           jsonrpc_msg_type_to_string(msg->type));
285                 jsonrpc_session_force_reconnect(s->js);
286                 jsonrpc_msg_destroy(msg);
287             }
288         }
289     }
290     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
291 }
292
293 static void
294 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
295 {
296     struct ovsdb_jsonrpc_session *s, *next;
297
298     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
299                         &remote->sessions) {
300         int error = ovsdb_jsonrpc_session_run(s);
301         if (error) {
302             ovsdb_jsonrpc_session_close(s);
303         }
304     }
305 }
306
307 static void
308 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
309 {
310     jsonrpc_session_wait(s->js);
311     if (!jsonrpc_session_get_backlog(s->js)) {
312         jsonrpc_session_recv_wait(s->js);
313     }
314 }
315
316 static void
317 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
318 {
319     struct ovsdb_jsonrpc_session *s;
320
321     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &remote->sessions) {
322         ovsdb_jsonrpc_session_wait(s);
323     }
324 }
325
326 static void
327 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
328 {
329     struct ovsdb_jsonrpc_session *s, *next;
330
331     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
332                         &remote->sessions) {
333         ovsdb_jsonrpc_session_close(s);
334     }
335 }
336
337 static struct jsonrpc_msg *
338 execute_transaction(struct ovsdb_jsonrpc_session *s,
339                     struct jsonrpc_msg *request)
340 {
341     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
342     request->id = NULL;
343     request->params = NULL;
344     jsonrpc_msg_destroy(request);
345     return NULL;
346 }
347
348 static void
349 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
350                                   struct jsonrpc_msg *request)
351 {
352     struct jsonrpc_msg *reply;
353
354     if (!strcmp(request->method, "transact")) {
355         reply = execute_transaction(s, request);
356     } else if (!strcmp(request->method, "monitor")) {
357         reply = jsonrpc_create_reply(
358             ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
359     } else if (!strcmp(request->method, "monitor_cancel")) {
360         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
361                                              request->id);
362     } else if (!strcmp(request->method, "get_schema")) {
363         reply = jsonrpc_create_reply(
364             ovsdb_schema_to_json(s->remote->server->db->schema), request->id);
365     } else if (!strcmp(request->method, "echo")) {
366         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
367     } else {
368         reply = jsonrpc_create_error(json_string_create("unknown method"),
369                                      request->id);
370     }
371
372     if (reply) {
373         jsonrpc_msg_destroy(request);
374         jsonrpc_session_send(s->js, reply);
375     }
376 }
377
378 static void
379 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
380 {
381     if (json_array(request->params)->n == 1) {
382         struct ovsdb_jsonrpc_trigger *t;
383         struct json *id;
384
385         id = request->params->u.array.elems[0];
386         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
387         if (t) {
388             ovsdb_jsonrpc_trigger_complete(t);
389         }
390     }
391 }
392
393 static void
394 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
395                                  struct jsonrpc_msg *request)
396 {
397     if (!strcmp(request->method, "cancel")) {
398         execute_cancel(s, request);
399     }
400     jsonrpc_msg_destroy(request);
401 }
402 \f
403 /* JSON-RPC database server triggers.
404  *
405  * (Every transaction is treated as a trigger even if it doesn't actually have
406  * any "wait" operations.) */
407
408 struct ovsdb_jsonrpc_trigger {
409     struct ovsdb_trigger trigger;
410     struct ovsdb_jsonrpc_session *session;
411     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
412     struct json *id;
413 };
414
415 static void
416 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
417                              struct json *id, struct json *params)
418 {
419     struct ovsdb_jsonrpc_trigger *t;
420     size_t hash;
421
422     /* Check for duplicate ID. */
423     hash = json_hash(id, 0);
424     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
425     if (t) {
426         struct jsonrpc_msg *msg;
427
428         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
429                                    id);
430         jsonrpc_session_send(s->js, msg);
431         json_destroy(id);
432         json_destroy(params);
433         return;
434     }
435
436     /* Insert into trigger table. */
437     t = xmalloc(sizeof *t);
438     ovsdb_trigger_init(s->remote->server->db,
439                        &t->trigger, params, &s->completions,
440                        time_msec());
441     t->session = s;
442     t->id = id;
443     hmap_insert(&s->triggers, &t->hmap_node, hash);
444
445     /* Complete early if possible. */
446     if (ovsdb_trigger_is_complete(&t->trigger)) {
447         ovsdb_jsonrpc_trigger_complete(t);
448     }
449 }
450
451 static struct ovsdb_jsonrpc_trigger *
452 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
453                            const struct json *id, size_t hash)
454 {
455     struct ovsdb_jsonrpc_trigger *t;
456
457     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
458                              &s->triggers) {
459         if (json_equal(t->id, id)) {
460             return t;
461         }
462     }
463
464     return NULL;
465 }
466
467 static void
468 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
469 {
470     struct ovsdb_jsonrpc_session *s = t->session;
471
472     if (jsonrpc_session_is_connected(s->js)) {
473         struct jsonrpc_msg *reply;
474         struct json *result;
475
476         result = ovsdb_trigger_steal_result(&t->trigger);
477         if (result) {
478             reply = jsonrpc_create_reply(result, t->id);
479         } else {
480             reply = jsonrpc_create_error(json_string_create("canceled"),
481                                          t->id);
482         }
483         jsonrpc_session_send(s->js, reply);
484     }
485
486     json_destroy(t->id);
487     ovsdb_trigger_destroy(&t->trigger);
488     hmap_remove(&s->triggers, &t->hmap_node);
489     free(t);
490 }
491
492 static void
493 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
494 {
495     struct ovsdb_jsonrpc_trigger *t, *next;
496     HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
497                         &s->triggers) {
498         ovsdb_jsonrpc_trigger_complete(t);
499     }
500 }
501
502 static void
503 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
504 {
505     while (!list_is_empty(&s->completions)) {
506         struct ovsdb_jsonrpc_trigger *t
507             = CONTAINER_OF(s->completions.next,
508                            struct ovsdb_jsonrpc_trigger, trigger.node);
509         ovsdb_jsonrpc_trigger_complete(t);
510     }
511 }
512 \f
513 /* JSON-RPC database table monitors. */
514
/* Kinds of row change that a monitored table can report.  Each value is a
 * distinct bit so that several can be OR'd together in a selection mask. */
enum ovsdb_jsonrpc_monitor_selection {
    OJMS_INITIAL = 0x1,         /* Rows present when the monitor is created. */
    OJMS_INSERT = 0x2,          /* Rows that are added. */
    OJMS_DELETE = 0x4,          /* Rows that are removed. */
    OJMS_MODIFY = 0x8           /* Rows whose contents change. */
};
521
522 struct ovsdb_jsonrpc_monitor_table {
523     const struct ovsdb_table *table;
524     enum ovsdb_jsonrpc_monitor_selection select;
525     struct ovsdb_column_set columns;
526 };
527
528 struct ovsdb_jsonrpc_monitor {
529     struct ovsdb_replica replica;
530     struct ovsdb_jsonrpc_session *session;
531     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
532
533     struct json *monitor_id;
534     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
535 };
536
537 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
538
539 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
540     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
541 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
542 static struct json *ovsdb_jsonrpc_monitor_get_initial(
543     const struct ovsdb_jsonrpc_monitor *);
544
545 static bool
546 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
547 {
548     const struct json *json;
549
550     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
551     return json ? json_boolean(json) : default_value;
552 }
553
554 struct ovsdb_jsonrpc_monitor *
555 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
556                            const struct json *monitor_id)
557 {
558     struct ovsdb_jsonrpc_monitor *m;
559
560     HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
561                              json_hash(monitor_id, 0), &s->monitors) {
562         if (json_equal(m->monitor_id, monitor_id)) {
563             return m;
564         }
565     }
566
567     return NULL;
568 }
569
570 static struct json *
571 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
572                              struct json *params)
573 {
574     struct ovsdb_jsonrpc_monitor *m = NULL;
575     struct json *monitor_id, *monitor_requests;
576     struct ovsdb_error *error = NULL;
577     struct shash_node *node;
578     struct json *json;
579
580     if (json_array(params)->n != 2) {
581         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
582         goto error;
583     }
584     monitor_id = params->u.array.elems[0];
585     monitor_requests = params->u.array.elems[1];
586     if (monitor_requests->type != JSON_OBJECT) {
587         error = ovsdb_syntax_error(monitor_requests, NULL,
588                                    "monitor-requests must be object");
589         goto error;
590     }
591
592     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
593         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
594         goto error;
595     }
596
597     m = xzalloc(sizeof *m);
598     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
599     ovsdb_add_replica(s->remote->server->db, &m->replica);
600     m->session = s;
601     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
602     m->monitor_id = json_clone(monitor_id);
603     shash_init(&m->tables);
604
605     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
606         const struct ovsdb_table *table;
607         struct ovsdb_jsonrpc_monitor_table *mt;
608         const struct json *columns_json, *select_json;
609         struct ovsdb_parser parser;
610
611         table = ovsdb_get_table(s->remote->server->db, node->name);
612         if (!table) {
613             error = ovsdb_syntax_error(NULL, NULL,
614                                        "no table named %s", node->name);
615             goto error;
616         }
617
618         mt = xzalloc(sizeof *mt);
619         mt->table = table;
620         mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
621         ovsdb_column_set_init(&mt->columns);
622         shash_add(&m->tables, table->schema->name, mt);
623
624         ovsdb_parser_init(&parser, node->data, "table %s", node->name);
625         columns_json = ovsdb_parser_member(&parser, "columns",
626                                            OP_ARRAY | OP_OPTIONAL);
627         select_json = ovsdb_parser_member(&parser, "select",
628                                           OP_OBJECT | OP_OPTIONAL);
629         error = ovsdb_parser_finish(&parser);
630         if (error) {
631             goto error;
632         }
633
634         if (columns_json) {
635             error = ovsdb_column_set_from_json(columns_json, table,
636                                                &mt->columns);
637             if (error) {
638                 goto error;
639             }
640         } else {
641             struct shash_node *node;
642
643             SHASH_FOR_EACH (node, &table->schema->columns) {
644                 const struct ovsdb_column *column = node->data;
645                 if (column->index != OVSDB_COL_UUID) {
646                     ovsdb_column_set_add(&mt->columns, column);
647                 }
648             }
649         }
650
651         if (select_json) {
652             mt->select = 0;
653             ovsdb_parser_init(&parser, select_json, "table %s select",
654                               table->schema->name);
655             if (parse_bool(&parser, "initial", true)) {
656                 mt->select |= OJMS_INITIAL;
657             }
658             if (parse_bool(&parser, "insert", true)) {
659                 mt->select |= OJMS_INSERT;
660             }
661             if (parse_bool(&parser, "delete", true)) {
662                 mt->select |= OJMS_DELETE;
663             }
664             if (parse_bool(&parser, "modify", true)) {
665                 mt->select |= OJMS_MODIFY;
666             }
667             error = ovsdb_parser_finish(&parser);
668             if (error) {
669                 goto error;
670             }
671         }
672     }
673
674     return ovsdb_jsonrpc_monitor_get_initial(m);
675
676 error:
677     if (m) {
678         ovsdb_remove_replica(s->remote->server->db, &m->replica);
679     }
680
681     json = ovsdb_error_to_json(error);
682     ovsdb_error_destroy(error);
683     return json;
684 }
685
686 static struct jsonrpc_msg *
687 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
688                              struct json_array *params,
689                              const struct json *request_id)
690 {
691     if (params->n != 1) {
692         return jsonrpc_create_error(json_string_create("invalid parameters"),
693                                     request_id);
694     } else {
695         struct ovsdb_jsonrpc_monitor *m;
696
697         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
698         if (!m) {
699             return jsonrpc_create_error(json_string_create("unknown monitor"),
700                                         request_id);
701         } else {
702             ovsdb_remove_replica(s->remote->server->db, &m->replica);
703             return jsonrpc_create_reply(json_object_create(), request_id);
704         }
705     }
706 }
707
708 static void
709 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
710 {
711     struct ovsdb_jsonrpc_monitor *m, *next;
712
713     HMAP_FOR_EACH_SAFE (m, next,
714                         struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
715         ovsdb_remove_replica(s->remote->server->db, &m->replica);
716     }
717 }
718
719 static struct ovsdb_jsonrpc_monitor *
720 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
721 {
722     assert(replica->class == &ovsdb_jsonrpc_replica_class);
723     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
724 }
725
/* Accumulator passed to ovsdb_jsonrpc_monitor_change_cb() while the JSON for
 * one update is being assembled. */
struct ovsdb_jsonrpc_monitor_aux {
    bool initial;               /* True while reporting initial contents. */
    const struct ovsdb_jsonrpc_monitor *monitor;    /* Monitor reported for. */
    struct json *json;          /* Whole update so far; null until the first
                                 * row is reported. */

    /* Current table.  */
    struct ovsdb_jsonrpc_monitor_table *mt;     /* Its monitor settings. */
    struct json *table_json;    /* Its part of 'json'; null until the first
                                 * row of the table is reported. */
};
735
736 static bool
737 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
738                                 const struct ovsdb_row *new,
739                                 void *aux_)
740 {
741     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
742     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
743     struct ovsdb_table *table = new ? new->table : old->table;
744     enum ovsdb_jsonrpc_monitor_selection type;
745     struct json *old_json, *new_json;
746     struct json *row_json;
747     char uuid[UUID_LEN + 1];
748     int n_changed;
749     size_t i;
750
751     if (!aux->mt || table != aux->mt->table) {
752         aux->mt = shash_find_data(&m->tables, table->schema->name);
753         aux->table_json = NULL;
754         if (!aux->mt) {
755             /* We don't care about rows in this table at all.  Tell the caller
756              * to skip it.  */
757             return false;
758         }
759     }
760
761     type = (aux->initial ? OJMS_INITIAL
762             : !old ? OJMS_INSERT
763             : !new ? OJMS_DELETE
764             : OJMS_MODIFY);
765     if (!(aux->mt->select & type)) {
766         /* We don't care about this type of change (but do want to be called
767          * back for changes to other rows in the same table). */
768         return true;
769     }
770
771     old_json = new_json = NULL;
772     n_changed = 0;
773     for (i = 0; i < aux->mt->columns.n_columns; i++) {
774         const struct ovsdb_column *column = aux->mt->columns.columns[i];
775         unsigned int idx = column->index;
776         bool changed = false;
777
778         if (type == OJMS_MODIFY) {
779             changed = !ovsdb_datum_equals(&old->fields[idx],
780                                           &new->fields[idx], &column->type);
781             n_changed += changed;
782         }
783         if (changed || type == OJMS_DELETE) {
784             if (!old_json) {
785                 old_json = json_object_create();
786             }
787             json_object_put(old_json, column->name,
788                             ovsdb_datum_to_json(&old->fields[idx],
789                                                 &column->type));
790         }
791         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
792             if (!new_json) {
793                 new_json = json_object_create();
794             }
795             json_object_put(new_json, column->name,
796                             ovsdb_datum_to_json(&new->fields[idx],
797                                                 &column->type));
798         }
799     }
800     if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
801         /* No reportable changes. */
802         json_destroy(old_json);
803         json_destroy(new_json);
804         return true;
805     }
806
807     /* Create JSON object for transaction overall. */
808     if (!aux->json) {
809         aux->json = json_object_create();
810     }
811
812     /* Create JSON object for transaction on this table. */
813     if (!aux->table_json) {
814         aux->table_json = json_object_create();
815         json_object_put(aux->json, aux->mt->table->schema->name,
816                         aux->table_json);
817     }
818
819     /* Create JSON object for transaction on this row. */
820     row_json = json_object_create();
821     if (old_json) {
822         json_object_put(row_json, "old", old_json);
823     }
824     if (new_json) {
825         json_object_put(row_json, "new", new_json);
826     }
827
828     /* Add JSON row to JSON table. */
829     snprintf(uuid, sizeof uuid,
830              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
831     json_object_put(aux->table_json, uuid, row_json);
832
833     return true;
834 }
835
836 static void
837 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
838                                const struct ovsdb_jsonrpc_monitor *m,
839                                bool initial)
840 {
841     aux->initial = initial;
842     aux->monitor = m;
843     aux->json = NULL;
844     aux->mt = NULL;
845     aux->table_json = NULL;
846 }
847
848 static struct ovsdb_error *
849 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
850                              const struct ovsdb_txn *txn, bool durable UNUSED)
851 {
852     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
853     struct ovsdb_jsonrpc_monitor_aux aux;
854
855     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
856     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
857     if (aux.json) {
858         struct jsonrpc_msg *msg;
859         struct json *params;
860
861         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
862                                      aux.json);
863         msg = jsonrpc_create_notify("update", params);
864         jsonrpc_session_send(aux.monitor->session->js, msg);
865     }
866
867     return NULL;
868 }
869
870 static struct json *
871 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
872 {
873     struct ovsdb_jsonrpc_monitor_aux aux;
874     struct shash_node *node;
875
876     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
877     SHASH_FOR_EACH (node, &m->tables) {
878         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
879
880         if (mt->select & OJMS_INITIAL) {
881             struct ovsdb_row *row;
882
883             HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
884                            &mt->table->rows) {
885                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
886             }
887         }
888     }
889     return aux.json ? aux.json : json_object_create();
890 }
891
892 static void
893 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
894 {
895     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
896     struct shash_node *node;
897
898     json_destroy(m->monitor_id);
899     SHASH_FOR_EACH (node, &m->tables) {
900         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
901         ovsdb_column_set_destroy(&mt->columns);
902         free(mt);
903     }
904     shash_destroy(&m->tables);
905     hmap_remove(&m->session->monitors, &m->node);
906     free(m);
907 }
908
909 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
910     ovsdb_jsonrpc_monitor_commit,
911     ovsdb_jsonrpc_monitor_destroy
912 };