stream: Remove spurious #includes from header file.
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <assert.h>
21 #include <errno.h>
22
23 #include "column.h"
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "reconnect.h"
30 #include "row.h"
31 #include "stream.h"
32 #include "table.h"
33 #include "timeval.h"
34 #include "transaction.h"
35 #include "trigger.h"
36
37 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
38 #include "vlog.h"
39
40 struct ovsdb_jsonrpc_remote;
41 struct ovsdb_jsonrpc_session;
42
43 /* Message rate-limiting. */
44 struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
45
46 /* Sessions. */
47 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
48     struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
49 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
50 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
51 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
52
53 /* Triggers. */
54 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
55                                          struct json *id, struct json *params);
56 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
57     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
58 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
59 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
60 static void ovsdb_jsonrpc_trigger_complete_done(
61     struct ovsdb_jsonrpc_session *);
62
63 /* Monitors. */
64 static struct json *ovsdb_jsonrpc_monitor_create(
65     struct ovsdb_jsonrpc_session *, struct json *params);
66 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
67     struct ovsdb_jsonrpc_session *,
68     struct json_array *params,
69     const struct json *request_id);
70 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
71 \f
72 /* JSON-RPC database server. */
73
74 struct ovsdb_jsonrpc_server {
75     struct ovsdb *db;
76     unsigned int n_sessions, max_sessions;
77     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
78 };
79
80 /* A configured remote.  This is either a passive stream listener plus a list
81  * of the currently connected sessions, or a list of exactly one active
82  * session. */
83 struct ovsdb_jsonrpc_remote {
84     struct ovsdb_jsonrpc_server *server;
85     struct pstream *listener;   /* Listener, if passive. */
86     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
87 };
88
89 static void ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *,
90                                             const char *name);
91 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
92
93 struct ovsdb_jsonrpc_server *
94 ovsdb_jsonrpc_server_create(struct ovsdb *db)
95 {
96     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
97     server->db = db;
98     server->max_sessions = 64;
99     shash_init(&server->remotes);
100     return server;
101 }
102
103 /* Sets 'svr''s current set of remotes to the names in 'new_remotes'.  The data
104  * values in 'new_remotes' are ignored.
105  *
106  * A remote is an active or passive stream connection method, e.g. "pssl:" or
107  * "tcp:1.2.3.4". */
108 void
109 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
110                                  const struct shash *new_remotes)
111 {
112     struct shash_node *node, *next;
113
114     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
115         if (!shash_find(new_remotes, node->name)) {
116             ovsdb_jsonrpc_server_del_remote(node);
117         }
118     }
119     SHASH_FOR_EACH (node, new_remotes) {
120         if (!shash_find(&svr->remotes, node->name)) {
121             ovsdb_jsonrpc_server_add_remote(svr, node->name);
122         }
123     }
124 }
125
126 static void
127 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
128                                 const char *name)
129 {
130     struct ovsdb_jsonrpc_remote *remote;
131     struct pstream *listener;
132     int error;
133
134     error = pstream_open(name, &listener);
135     if (error && error != EAFNOSUPPORT) {
136         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
137         return;
138     }
139
140     remote = xmalloc(sizeof *remote);
141     remote->server = svr;
142     remote->listener = listener;
143     list_init(&remote->sessions);
144     shash_add(&svr->remotes, name, remote);
145
146     if (!listener) {
147         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
148     }
149 }
150
151 static void
152 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
153 {
154     struct ovsdb_jsonrpc_remote *remote = node->data;
155
156     ovsdb_jsonrpc_session_close_all(remote);
157     pstream_close(remote->listener);
158     shash_delete(&remote->server->remotes, node);
159     free(remote);
160 }
161
162 void
163 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
164 {
165     struct shash_node *node;
166
167     SHASH_FOR_EACH (node, &svr->remotes) {
168         struct ovsdb_jsonrpc_remote *remote = node->data;
169
170         if (remote->listener && svr->n_sessions < svr->max_sessions) {
171             struct stream *stream;
172             int error;
173
174             error = pstream_accept(remote->listener, &stream);
175             if (!error) {
176                 struct jsonrpc_session *js;
177                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
178                 ovsdb_jsonrpc_session_create(remote, js);
179             } else if (error != EAGAIN) {
180                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
181                              pstream_get_name(remote->listener),
182                              strerror(error));
183             }
184         }
185
186         ovsdb_jsonrpc_session_run_all(remote);
187     }
188 }
189
190 void
191 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
192 {
193     struct shash_node *node;
194
195     SHASH_FOR_EACH (node, &svr->remotes) {
196         struct ovsdb_jsonrpc_remote *remote = node->data;
197
198         if (remote->listener && svr->n_sessions < svr->max_sessions) {
199             pstream_wait(remote->listener);
200         }
201
202         ovsdb_jsonrpc_session_wait_all(remote);
203     }
204 }
205 \f
206 /* JSON-RPC database server session. */
207
208 struct ovsdb_jsonrpc_session {
209     struct ovsdb_jsonrpc_remote *remote;
210     struct list node;           /* Element in remote's sessions list. */
211
212     /* Triggers. */
213     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
214     struct list completions;    /* Completed triggers. */
215
216     /* Monitors. */
217     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
218
219     /* Network connectivity. */
220     struct jsonrpc_session *js;  /* JSON-RPC session. */
221     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
222 };
223
224 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
225 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
226 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
227 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
228                                              struct jsonrpc_msg *);
229 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
230                                              struct jsonrpc_msg *);
231
232 static struct ovsdb_jsonrpc_session *
233 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
234                              struct jsonrpc_session *js)
235 {
236     struct ovsdb_jsonrpc_session *s;
237
238     s = xzalloc(sizeof *s);
239     s->remote = remote;
240     list_push_back(&remote->sessions, &s->node);
241     hmap_init(&s->triggers);
242     hmap_init(&s->monitors);
243     list_init(&s->completions);
244     s->js = js;
245     s->js_seqno = jsonrpc_session_get_seqno(js);
246
247     remote->server->n_sessions++;
248
249     return s;
250 }
251
252 static void
253 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
254 {
255     jsonrpc_session_close(s->js);
256     list_remove(&s->node);
257     s->remote->server->n_sessions--;
258 }
259
260 static int
261 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
262 {
263     jsonrpc_session_run(s->js);
264     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
265         s->js_seqno = jsonrpc_session_get_seqno(s->js);
266         ovsdb_jsonrpc_trigger_complete_all(s);
267         ovsdb_jsonrpc_monitor_remove_all(s);
268     }
269
270     ovsdb_jsonrpc_trigger_complete_done(s);
271
272     if (!jsonrpc_session_get_backlog(s->js)) {
273         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
274         if (msg) {
275             if (msg->type == JSONRPC_REQUEST) {
276                 ovsdb_jsonrpc_session_got_request(s, msg);
277             } else if (msg->type == JSONRPC_NOTIFY) {
278                 ovsdb_jsonrpc_session_got_notify(s, msg);
279             } else {
280                 VLOG_WARN("%s: received unexpected %s message",
281                           jsonrpc_session_get_name(s->js),
282                           jsonrpc_msg_type_to_string(msg->type));
283                 jsonrpc_session_force_reconnect(s->js);
284                 jsonrpc_msg_destroy(msg);
285             }
286         }
287     }
288     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
289 }
290
291 static void
292 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
293 {
294     struct ovsdb_jsonrpc_session *s, *next;
295
296     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
297                         &remote->sessions) {
298         int error = ovsdb_jsonrpc_session_run(s);
299         if (error) {
300             ovsdb_jsonrpc_session_close(s);
301         }
302     }
303 }
304
305 static void
306 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
307 {
308     jsonrpc_session_wait(s->js);
309     if (!jsonrpc_session_get_backlog(s->js)) {
310         jsonrpc_session_recv_wait(s->js);
311     }
312 }
313
314 static void
315 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
316 {
317     struct ovsdb_jsonrpc_session *s;
318
319     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &remote->sessions) {
320         ovsdb_jsonrpc_session_wait(s);
321     }
322 }
323
324 static void
325 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
326 {
327     struct ovsdb_jsonrpc_session *s, *next;
328
329     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
330                         &remote->sessions) {
331         ovsdb_jsonrpc_session_close(s);
332     }
333 }
334
335 static struct jsonrpc_msg *
336 execute_transaction(struct ovsdb_jsonrpc_session *s,
337                     struct jsonrpc_msg *request)
338 {
339     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
340     request->id = NULL;
341     request->params = NULL;
342     return NULL;
343 }
344
345 static void
346 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
347                                   struct jsonrpc_msg *request)
348 {
349     struct jsonrpc_msg *reply;
350
351     if (!strcmp(request->method, "transact")) {
352         reply = execute_transaction(s, request);
353     } else if (!strcmp(request->method, "monitor")) {
354         reply = jsonrpc_create_reply(
355             ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
356     } else if (!strcmp(request->method, "monitor_cancel")) {
357         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
358                                              request->id);
359     } else if (!strcmp(request->method, "get_schema")) {
360         reply = jsonrpc_create_reply(
361             ovsdb_schema_to_json(s->remote->server->db->schema), request->id);
362     } else if (!strcmp(request->method, "echo")) {
363         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
364     } else {
365         reply = jsonrpc_create_error(json_string_create("unknown method"),
366                                      request->id);
367     }
368
369     if (reply) {
370         jsonrpc_msg_destroy(request);
371         jsonrpc_session_send(s->js, reply);
372     }
373 }
374
375 static void
376 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
377 {
378     if (json_array(request->params)->n == 1) {
379         struct ovsdb_jsonrpc_trigger *t;
380         struct json *id;
381
382         id = request->params->u.array.elems[0];
383         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
384         if (t) {
385             ovsdb_jsonrpc_trigger_complete(t);
386         }
387     }
388 }
389
390 static void
391 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
392                                  struct jsonrpc_msg *request)
393 {
394     if (!strcmp(request->method, "cancel")) {
395         execute_cancel(s, request);
396     }
397     jsonrpc_msg_destroy(request);
398 }
399 \f
400 /* JSON-RPC database server triggers.
401  *
402  * (Every transaction is treated as a trigger even if it doesn't actually have
403  * any "wait" operations.) */
404
405 struct ovsdb_jsonrpc_trigger {
406     struct ovsdb_trigger trigger;
407     struct ovsdb_jsonrpc_session *session;
408     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
409     struct json *id;
410 };
411
412 static void
413 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
414                              struct json *id, struct json *params)
415 {
416     struct ovsdb_jsonrpc_trigger *t;
417     size_t hash;
418
419     /* Check for duplicate ID. */
420     hash = json_hash(id, 0);
421     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
422     if (t) {
423         struct jsonrpc_msg *msg;
424
425         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
426                                    id);
427         jsonrpc_session_send(s->js, msg);
428         json_destroy(id);
429         json_destroy(params);
430         return;
431     }
432
433     /* Insert into trigger table. */
434     t = xmalloc(sizeof *t);
435     ovsdb_trigger_init(s->remote->server->db,
436                        &t->trigger, params, &s->completions,
437                        time_msec());
438     t->session = s;
439     t->id = id;
440     hmap_insert(&s->triggers, &t->hmap_node, hash);
441
442     /* Complete early if possible. */
443     if (ovsdb_trigger_is_complete(&t->trigger)) {
444         ovsdb_jsonrpc_trigger_complete(t);
445     }
446 }
447
448 static struct ovsdb_jsonrpc_trigger *
449 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
450                            const struct json *id, size_t hash)
451 {
452     struct ovsdb_jsonrpc_trigger *t;
453
454     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
455                              &s->triggers) {
456         if (json_equal(t->id, id)) {
457             return t;
458         }
459     }
460
461     return NULL;
462 }
463
464 static void
465 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
466 {
467     struct ovsdb_jsonrpc_session *s = t->session;
468
469     if (jsonrpc_session_is_connected(s->js)) {
470         struct jsonrpc_msg *reply;
471         struct json *result;
472
473         result = ovsdb_trigger_steal_result(&t->trigger);
474         if (result) {
475             reply = jsonrpc_create_reply(result, t->id);
476         } else {
477             reply = jsonrpc_create_error(json_string_create("canceled"),
478                                          t->id);
479         }
480         jsonrpc_session_send(s->js, reply);
481     }
482
483     json_destroy(t->id);
484     ovsdb_trigger_destroy(&t->trigger);
485     hmap_remove(&s->triggers, &t->hmap_node);
486     free(t);
487 }
488
489 static void
490 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
491 {
492     struct ovsdb_jsonrpc_trigger *t, *next;
493     HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
494                         &s->triggers) {
495         ovsdb_jsonrpc_trigger_complete(t);
496     }
497 }
498
499 static void
500 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
501 {
502     while (!list_is_empty(&s->completions)) {
503         struct ovsdb_jsonrpc_trigger *t
504             = CONTAINER_OF(s->completions.next,
505                            struct ovsdb_jsonrpc_trigger, trigger.node);
506         ovsdb_jsonrpc_trigger_complete(t);
507     }
508 }
509 \f
510 /* JSON-RPC database table monitors. */
511
512 enum ovsdb_jsonrpc_monitor_selection {
513     OJMS_INITIAL = 1 << 0,      /* All rows when monitor is created. */
514     OJMS_INSERT = 1 << 1,       /* New rows. */
515     OJMS_DELETE = 1 << 2,       /* Deleted rows. */
516     OJMS_MODIFY = 1 << 3        /* Modified rows. */
517 };
518
519 struct ovsdb_jsonrpc_monitor_table {
520     const struct ovsdb_table *table;
521     enum ovsdb_jsonrpc_monitor_selection select;
522     struct ovsdb_column_set columns;
523 };
524
525 struct ovsdb_jsonrpc_monitor {
526     struct ovsdb_replica replica;
527     struct ovsdb_jsonrpc_session *session;
528     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
529
530     struct json *monitor_id;
531     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
532 };
533
534 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
535
536 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
537     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
538 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
539 static struct json *ovsdb_jsonrpc_monitor_get_initial(
540     const struct ovsdb_jsonrpc_monitor *);
541
542 static bool
543 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
544 {
545     const struct json *json;
546
547     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
548     return json ? json_boolean(json) : default_value;
549 }
550
551 struct ovsdb_jsonrpc_monitor *
552 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
553                            const struct json *monitor_id)
554 {
555     struct ovsdb_jsonrpc_monitor *m;
556
557     HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
558                              json_hash(monitor_id, 0), &s->monitors) {
559         if (json_equal(m->monitor_id, monitor_id)) {
560             return m;
561         }
562     }
563
564     return NULL;
565 }
566
567 static struct json *
568 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
569                              struct json *params)
570 {
571     struct ovsdb_jsonrpc_monitor *m = NULL;
572     struct json *monitor_id, *monitor_requests;
573     struct ovsdb_error *error = NULL;
574     struct shash_node *node;
575     struct json *json;
576
577     if (json_array(params)->n != 2) {
578         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
579         goto error;
580     }
581     monitor_id = params->u.array.elems[0];
582     monitor_requests = params->u.array.elems[1];
583     if (monitor_requests->type != JSON_OBJECT) {
584         error = ovsdb_syntax_error(monitor_requests, NULL,
585                                    "monitor-requests must be object");
586         goto error;
587     }
588
589     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
590         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
591         goto error;
592     }
593
594     m = xzalloc(sizeof *m);
595     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
596     ovsdb_add_replica(s->remote->server->db, &m->replica);
597     m->session = s;
598     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
599     m->monitor_id = json_clone(monitor_id);
600     shash_init(&m->tables);
601
602     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
603         const struct ovsdb_table *table;
604         struct ovsdb_jsonrpc_monitor_table *mt;
605         const struct json *columns_json, *select_json;
606         struct ovsdb_parser parser;
607
608         table = ovsdb_get_table(s->remote->server->db, node->name);
609         if (!table) {
610             error = ovsdb_syntax_error(NULL, NULL,
611                                        "no table named %s", node->name);
612             goto error;
613         }
614
615         mt = xzalloc(sizeof *mt);
616         mt->table = table;
617         mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
618         ovsdb_column_set_init(&mt->columns);
619         shash_add(&m->tables, table->schema->name, mt);
620
621         ovsdb_parser_init(&parser, node->data, "table %s", node->name);
622         columns_json = ovsdb_parser_member(&parser, "columns",
623                                            OP_ARRAY | OP_OPTIONAL);
624         select_json = ovsdb_parser_member(&parser, "select",
625                                           OP_OBJECT | OP_OPTIONAL);
626         error = ovsdb_parser_finish(&parser);
627         if (error) {
628             goto error;
629         }
630
631         if (columns_json) {
632             error = ovsdb_column_set_from_json(columns_json, table,
633                                                &mt->columns);
634             if (error) {
635                 goto error;
636             }
637         } else {
638             struct shash_node *node;
639
640             SHASH_FOR_EACH (node, &table->schema->columns) {
641                 const struct ovsdb_column *column = node->data;
642                 if (column->index != OVSDB_COL_UUID) {
643                     ovsdb_column_set_add(&mt->columns, column);
644                 }
645             }
646         }
647
648         if (select_json) {
649             mt->select = 0;
650             ovsdb_parser_init(&parser, select_json, "table %s select",
651                               table->schema->name);
652             if (parse_bool(&parser, "initial", true)) {
653                 mt->select |= OJMS_INITIAL;
654             }
655             if (parse_bool(&parser, "insert", true)) {
656                 mt->select |= OJMS_INSERT;
657             }
658             if (parse_bool(&parser, "delete", true)) {
659                 mt->select |= OJMS_DELETE;
660             }
661             if (parse_bool(&parser, "modify", true)) {
662                 mt->select |= OJMS_MODIFY;
663             }
664             error = ovsdb_parser_finish(&parser);
665             if (error) {
666                 goto error;
667             }
668         }
669     }
670
671     return ovsdb_jsonrpc_monitor_get_initial(m);
672
673 error:
674     if (m) {
675         ovsdb_remove_replica(s->remote->server->db, &m->replica);
676     }
677
678     json = ovsdb_error_to_json(error);
679     ovsdb_error_destroy(error);
680     return json;
681 }
682
683 static struct jsonrpc_msg *
684 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
685                              struct json_array *params,
686                              const struct json *request_id)
687 {
688     if (params->n != 1) {
689         return jsonrpc_create_error(json_string_create("invalid parameters"),
690                                     request_id);
691     } else {
692         struct ovsdb_jsonrpc_monitor *m;
693
694         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
695         if (!m) {
696             return jsonrpc_create_error(json_string_create("unknown monitor"),
697                                         request_id);
698         } else {
699             ovsdb_remove_replica(s->remote->server->db, &m->replica);
700             return jsonrpc_create_reply(json_object_create(), request_id);
701         }
702     }
703 }
704
705 static void
706 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
707 {
708     struct ovsdb_jsonrpc_monitor *m, *next;
709
710     HMAP_FOR_EACH_SAFE (m, next,
711                         struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
712         ovsdb_remove_replica(s->remote->server->db, &m->replica);
713     }
714 }
715
716 static struct ovsdb_jsonrpc_monitor *
717 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
718 {
719     assert(replica->class == &ovsdb_jsonrpc_replica_class);
720     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
721 }
722
723 struct ovsdb_jsonrpc_monitor_aux {
724     bool initial;               /* Sending initial contents of table? */
725     const struct ovsdb_jsonrpc_monitor *monitor;
726     struct json *json;          /* JSON for the whole transaction. */
727
728     /* Current table.  */
729     struct ovsdb_jsonrpc_monitor_table *mt;
730     struct json *table_json;    /* JSON for table's transaction. */
731 };
732
733 static bool
734 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
735                                 const struct ovsdb_row *new,
736                                 void *aux_)
737 {
738     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
739     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
740     struct ovsdb_table *table = new ? new->table : old->table;
741     enum ovsdb_jsonrpc_monitor_selection type;
742     struct json *old_json, *new_json;
743     struct json *row_json;
744     char uuid[UUID_LEN + 1];
745     int n_changed;
746     size_t i;
747
748     if (!aux->mt || table != aux->mt->table) {
749         aux->mt = shash_find_data(&m->tables, table->schema->name);
750         aux->table_json = NULL;
751         if (!aux->mt) {
752             /* We don't care about rows in this table at all.  Tell the caller
753              * to skip it.  */
754             return false;
755         }
756     }
757
758     type = (aux->initial ? OJMS_INITIAL
759             : !old ? OJMS_INSERT
760             : !new ? OJMS_DELETE
761             : OJMS_MODIFY);
762     if (!(aux->mt->select & type)) {
763         /* We don't care about this type of change (but do want to be called
764          * back for changes to other rows in the same table). */
765         return true;
766     }
767
768     old_json = new_json = NULL;
769     n_changed = 0;
770     for (i = 0; i < aux->mt->columns.n_columns; i++) {
771         const struct ovsdb_column *column = aux->mt->columns.columns[i];
772         unsigned int idx = column->index;
773         bool changed = false;
774
775         if (type == OJMS_MODIFY) {
776             changed = !ovsdb_datum_equals(&old->fields[idx],
777                                           &new->fields[idx], &column->type);
778             n_changed += changed;
779         }
780         if (changed || type == OJMS_DELETE) {
781             if (!old_json) {
782                 old_json = json_object_create();
783             }
784             json_object_put(old_json, column->name,
785                             ovsdb_datum_to_json(&old->fields[idx],
786                                                 &column->type));
787         }
788         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
789             if (!new_json) {
790                 new_json = json_object_create();
791             }
792             json_object_put(new_json, column->name,
793                             ovsdb_datum_to_json(&new->fields[idx],
794                                                 &column->type));
795         }
796     }
797     if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
798         /* No reportable changes. */
799         json_destroy(old_json);
800         json_destroy(new_json);
801         return true;
802     }
803
804     /* Create JSON object for transaction overall. */
805     if (!aux->json) {
806         aux->json = json_object_create();
807     }
808
809     /* Create JSON object for transaction on this table. */
810     if (!aux->table_json) {
811         aux->table_json = json_object_create();
812         json_object_put(aux->json, aux->mt->table->schema->name,
813                         aux->table_json);
814     }
815
816     /* Create JSON object for transaction on this row. */
817     row_json = json_object_create();
818     if (old_json) {
819         json_object_put(row_json, "old", old_json);
820     }
821     if (new_json) {
822         json_object_put(row_json, "new", new_json);
823     }
824
825     /* Add JSON row to JSON table. */
826     snprintf(uuid, sizeof uuid,
827              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
828     json_object_put(aux->table_json, uuid, row_json);
829
830     return true;
831 }
832
833 static void
834 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
835                                const struct ovsdb_jsonrpc_monitor *m,
836                                bool initial)
837 {
838     aux->initial = initial;
839     aux->monitor = m;
840     aux->json = NULL;
841     aux->mt = NULL;
842     aux->table_json = NULL;
843 }
844
845 static struct ovsdb_error *
846 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
847                              const struct ovsdb_txn *txn, bool durable UNUSED)
848 {
849     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
850     struct ovsdb_jsonrpc_monitor_aux aux;
851
852     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
853     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
854     if (aux.json) {
855         struct jsonrpc_msg *msg;
856         struct json *params;
857
858         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
859                                      aux.json);
860         msg = jsonrpc_create_notify("update", params);
861         jsonrpc_session_send(aux.monitor->session->js, msg);
862     }
863
864     return NULL;
865 }
866
867 static struct json *
868 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
869 {
870     struct ovsdb_jsonrpc_monitor_aux aux;
871     struct shash_node *node;
872
873     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
874     SHASH_FOR_EACH (node, &m->tables) {
875         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
876
877         if (mt->select & OJMS_INITIAL) {
878             struct ovsdb_row *row;
879
880             HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
881                            &mt->table->rows) {
882                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
883             }
884         }
885     }
886     return aux.json ? aux.json : json_object_create();
887 }
888
889 static void
890 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
891 {
892     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
893     struct shash_node *node;
894
895     json_destroy(m->monitor_id);
896     SHASH_FOR_EACH (node, &m->tables) {
897         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
898         ovsdb_column_set_destroy(&mt->columns);
899         free(mt);
900     }
901     shash_destroy(&m->tables);
902     hmap_remove(&m->session->monitors, &m->node);
903     free(m);
904 }
905
906 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
907     ovsdb_jsonrpc_monitor_commit,
908     ovsdb_jsonrpc_monitor_destroy
909 };