ovsdb: Add support for multiple databases to the protocol.
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <assert.h>
21 #include <errno.h>
22
23 #include "column.h"
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "reconnect.h"
30 #include "row.h"
31 #include "stream.h"
32 #include "table.h"
33 #include "timeval.h"
34 #include "transaction.h"
35 #include "trigger.h"
36
37 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
38 #include "vlog.h"
39
40 struct ovsdb_jsonrpc_remote;
41 struct ovsdb_jsonrpc_session;
42
43 /* Message rate-limiting. */
44 struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
45
46 /* Sessions. */
47 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
48     struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
49 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
50 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
51 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
52
53 /* Triggers. */
54 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
55                                          struct json *id, struct json *params);
56 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
57     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
58 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
59 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
60 static void ovsdb_jsonrpc_trigger_complete_done(
61     struct ovsdb_jsonrpc_session *);
62
63 /* Monitors. */
64 static struct json *ovsdb_jsonrpc_monitor_create(
65     struct ovsdb_jsonrpc_session *, struct json *params);
66 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
67     struct ovsdb_jsonrpc_session *,
68     struct json_array *params,
69     const struct json *request_id);
70 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
71 \f
72 /* JSON-RPC database server. */
73
74 struct ovsdb_jsonrpc_server {
75     struct ovsdb *db;
76     unsigned int n_sessions, max_sessions;
77     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
78 };
79
80 /* A configured remote.  This is either a passive stream listener plus a list
81  * of the currently connected sessions, or a list of exactly one active
82  * session. */
83 struct ovsdb_jsonrpc_remote {
84     struct ovsdb_jsonrpc_server *server;
85     struct pstream *listener;   /* Listener, if passive. */
86     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
87 };
88
89 static void ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *,
90                                             const char *name);
91 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
92
93 struct ovsdb_jsonrpc_server *
94 ovsdb_jsonrpc_server_create(struct ovsdb *db)
95 {
96     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
97     server->db = db;
98     server->max_sessions = 64;
99     shash_init(&server->remotes);
100     return server;
101 }
102
103 void
104 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
105 {
106     struct shash_node *node, *next;
107
108     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
109         ovsdb_jsonrpc_server_del_remote(node);
110     }
111     shash_destroy(&svr->remotes);
112     free(svr);
113 }
114
115 /* Sets 'svr''s current set of remotes to the names in 'new_remotes'.  The data
116  * values in 'new_remotes' are ignored.
117  *
118  * A remote is an active or passive stream connection method, e.g. "pssl:" or
119  * "tcp:1.2.3.4". */
120 void
121 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
122                                  const struct shash *new_remotes)
123 {
124     struct shash_node *node, *next;
125
126     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
127         if (!shash_find(new_remotes, node->name)) {
128             ovsdb_jsonrpc_server_del_remote(node);
129         }
130     }
131     SHASH_FOR_EACH (node, new_remotes) {
132         if (!shash_find(&svr->remotes, node->name)) {
133             ovsdb_jsonrpc_server_add_remote(svr, node->name);
134         }
135     }
136 }
137
138 static void
139 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
140                                 const char *name)
141 {
142     struct ovsdb_jsonrpc_remote *remote;
143     struct pstream *listener;
144     int error;
145
146     error = pstream_open(name, &listener);
147     if (error && error != EAFNOSUPPORT) {
148         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
149         return;
150     }
151
152     remote = xmalloc(sizeof *remote);
153     remote->server = svr;
154     remote->listener = listener;
155     list_init(&remote->sessions);
156     shash_add(&svr->remotes, name, remote);
157
158     if (!listener) {
159         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
160     }
161 }
162
163 static void
164 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
165 {
166     struct ovsdb_jsonrpc_remote *remote = node->data;
167
168     ovsdb_jsonrpc_session_close_all(remote);
169     pstream_close(remote->listener);
170     shash_delete(&remote->server->remotes, node);
171     free(remote);
172 }
173
174 void
175 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
176 {
177     struct shash_node *node;
178
179     SHASH_FOR_EACH (node, &svr->remotes) {
180         struct ovsdb_jsonrpc_remote *remote = node->data;
181
182         if (remote->listener && svr->n_sessions < svr->max_sessions) {
183             struct stream *stream;
184             int error;
185
186             error = pstream_accept(remote->listener, &stream);
187             if (!error) {
188                 struct jsonrpc_session *js;
189                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
190                 ovsdb_jsonrpc_session_create(remote, js);
191             } else if (error != EAGAIN) {
192                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
193                              pstream_get_name(remote->listener),
194                              strerror(error));
195             }
196         }
197
198         ovsdb_jsonrpc_session_run_all(remote);
199     }
200 }
201
202 void
203 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
204 {
205     struct shash_node *node;
206
207     SHASH_FOR_EACH (node, &svr->remotes) {
208         struct ovsdb_jsonrpc_remote *remote = node->data;
209
210         if (remote->listener && svr->n_sessions < svr->max_sessions) {
211             pstream_wait(remote->listener);
212         }
213
214         ovsdb_jsonrpc_session_wait_all(remote);
215     }
216 }
217 \f
218 /* JSON-RPC database server session. */
219
220 struct ovsdb_jsonrpc_session {
221     struct ovsdb_jsonrpc_remote *remote;
222     struct list node;           /* Element in remote's sessions list. */
223
224     /* Triggers. */
225     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
226     struct list completions;    /* Completed triggers. */
227
228     /* Monitors. */
229     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
230
231     /* Network connectivity. */
232     struct jsonrpc_session *js;  /* JSON-RPC session. */
233     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
234 };
235
236 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
237 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
238 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
239 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
240                                              struct jsonrpc_msg *);
241 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
242                                              struct jsonrpc_msg *);
243
244 static struct ovsdb_jsonrpc_session *
245 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
246                              struct jsonrpc_session *js)
247 {
248     struct ovsdb_jsonrpc_session *s;
249
250     s = xzalloc(sizeof *s);
251     s->remote = remote;
252     list_push_back(&remote->sessions, &s->node);
253     hmap_init(&s->triggers);
254     hmap_init(&s->monitors);
255     list_init(&s->completions);
256     s->js = js;
257     s->js_seqno = jsonrpc_session_get_seqno(js);
258
259     remote->server->n_sessions++;
260
261     return s;
262 }
263
264 static void
265 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
266 {
267     ovsdb_jsonrpc_monitor_remove_all(s);
268     jsonrpc_session_close(s->js);
269     list_remove(&s->node);
270     s->remote->server->n_sessions--;
271     free(s);
272 }
273
274 static int
275 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
276 {
277     jsonrpc_session_run(s->js);
278     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
279         s->js_seqno = jsonrpc_session_get_seqno(s->js);
280         ovsdb_jsonrpc_trigger_complete_all(s);
281         ovsdb_jsonrpc_monitor_remove_all(s);
282     }
283
284     ovsdb_jsonrpc_trigger_complete_done(s);
285
286     if (!jsonrpc_session_get_backlog(s->js)) {
287         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
288         if (msg) {
289             if (msg->type == JSONRPC_REQUEST) {
290                 ovsdb_jsonrpc_session_got_request(s, msg);
291             } else if (msg->type == JSONRPC_NOTIFY) {
292                 ovsdb_jsonrpc_session_got_notify(s, msg);
293             } else {
294                 VLOG_WARN("%s: received unexpected %s message",
295                           jsonrpc_session_get_name(s->js),
296                           jsonrpc_msg_type_to_string(msg->type));
297                 jsonrpc_session_force_reconnect(s->js);
298                 jsonrpc_msg_destroy(msg);
299             }
300         }
301     }
302     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
303 }
304
305 static void
306 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
307 {
308     struct ovsdb_jsonrpc_session *s, *next;
309
310     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
311                         &remote->sessions) {
312         int error = ovsdb_jsonrpc_session_run(s);
313         if (error) {
314             ovsdb_jsonrpc_session_close(s);
315         }
316     }
317 }
318
319 static void
320 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
321 {
322     jsonrpc_session_wait(s->js);
323     if (!jsonrpc_session_get_backlog(s->js)) {
324         jsonrpc_session_recv_wait(s->js);
325     }
326 }
327
328 static void
329 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
330 {
331     struct ovsdb_jsonrpc_session *s;
332
333     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &remote->sessions) {
334         ovsdb_jsonrpc_session_wait(s);
335     }
336 }
337
338 static void
339 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
340 {
341     struct ovsdb_jsonrpc_session *s, *next;
342
343     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
344                         &remote->sessions) {
345         ovsdb_jsonrpc_session_close(s);
346     }
347 }
348
349 static const char *
350 get_db_name(const struct ovsdb_jsonrpc_session *s)
351 {
352     return s->remote->server->db->schema->name;
353 }
354
355 static struct jsonrpc_msg *
356 ovsdb_jsonrpc_check_db_name(const struct ovsdb_jsonrpc_session *s,
357                             const struct jsonrpc_msg *request)
358 {
359     struct json_array *params;
360     const char *want_db_name;
361     const char *have_db_name;
362     struct ovsdb_error *error;
363     struct jsonrpc_msg *reply;
364
365     params = json_array(request->params);
366     if (!params->n || params->elems[0]->type != JSON_STRING) {
367         error = ovsdb_syntax_error(
368             request->params, NULL,
369             "%s request params must begin with <db-name>", request->method);
370         goto error;
371     }
372
373     want_db_name = params->elems[0]->u.string;
374     have_db_name = get_db_name(s);
375     if (strcmp(want_db_name, have_db_name)) {
376         error = ovsdb_syntax_error(
377             request->params, "unknown database",
378             "%s request specifies unknown database %s",
379             request->method, want_db_name);
380         goto error;
381     }
382
383     return NULL;
384
385 error:
386     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
387     ovsdb_error_destroy(error);
388     return reply;
389 }
390
391 static struct jsonrpc_msg *
392 execute_transaction(struct ovsdb_jsonrpc_session *s,
393                     struct jsonrpc_msg *request)
394 {
395     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
396     request->id = NULL;
397     request->params = NULL;
398     jsonrpc_msg_destroy(request);
399     return NULL;
400 }
401
402 static void
403 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
404                                   struct jsonrpc_msg *request)
405 {
406     struct jsonrpc_msg *reply;
407
408     if (!strcmp(request->method, "transact")) {
409         reply = ovsdb_jsonrpc_check_db_name(s, request);
410         if (!reply) {
411             reply = execute_transaction(s, request);
412         }
413     } else if (!strcmp(request->method, "monitor")) {
414         reply = ovsdb_jsonrpc_check_db_name(s, request);
415         if (!reply) {
416             reply = jsonrpc_create_reply(
417                 ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
418         }
419     } else if (!strcmp(request->method, "monitor_cancel")) {
420         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
421                                              request->id);
422     } else if (!strcmp(request->method, "get_schema")) {
423         reply = ovsdb_jsonrpc_check_db_name(s, request);
424         if (!reply) {
425             reply = jsonrpc_create_reply(
426                 ovsdb_schema_to_json(s->remote->server->db->schema),
427                 request->id);
428         }
429     } else if (!strcmp(request->method, "list_dbs")) {
430         reply = jsonrpc_create_reply(
431             json_array_create_1(json_string_create(get_db_name(s))),
432             request->id);
433     } else if (!strcmp(request->method, "echo")) {
434         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
435     } else {
436         reply = jsonrpc_create_error(json_string_create("unknown method"),
437                                      request->id);
438     }
439
440     if (reply) {
441         jsonrpc_msg_destroy(request);
442         jsonrpc_session_send(s->js, reply);
443     }
444 }
445
446 static void
447 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
448 {
449     if (json_array(request->params)->n == 1) {
450         struct ovsdb_jsonrpc_trigger *t;
451         struct json *id;
452
453         id = request->params->u.array.elems[0];
454         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
455         if (t) {
456             ovsdb_jsonrpc_trigger_complete(t);
457         }
458     }
459 }
460
461 static void
462 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
463                                  struct jsonrpc_msg *request)
464 {
465     if (!strcmp(request->method, "cancel")) {
466         execute_cancel(s, request);
467     }
468     jsonrpc_msg_destroy(request);
469 }
470 \f
471 /* JSON-RPC database server triggers.
472  *
473  * (Every transaction is treated as a trigger even if it doesn't actually have
474  * any "wait" operations.) */
475
476 struct ovsdb_jsonrpc_trigger {
477     struct ovsdb_trigger trigger;
478     struct ovsdb_jsonrpc_session *session;
479     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
480     struct json *id;
481 };
482
483 static void
484 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
485                              struct json *id, struct json *params)
486 {
487     struct ovsdb_jsonrpc_trigger *t;
488     size_t hash;
489
490     /* Check for duplicate ID. */
491     hash = json_hash(id, 0);
492     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
493     if (t) {
494         struct jsonrpc_msg *msg;
495
496         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
497                                    id);
498         jsonrpc_session_send(s->js, msg);
499         json_destroy(id);
500         json_destroy(params);
501         return;
502     }
503
504     /* Insert into trigger table. */
505     t = xmalloc(sizeof *t);
506     ovsdb_trigger_init(s->remote->server->db,
507                        &t->trigger, params, &s->completions,
508                        time_msec());
509     t->session = s;
510     t->id = id;
511     hmap_insert(&s->triggers, &t->hmap_node, hash);
512
513     /* Complete early if possible. */
514     if (ovsdb_trigger_is_complete(&t->trigger)) {
515         ovsdb_jsonrpc_trigger_complete(t);
516     }
517 }
518
519 static struct ovsdb_jsonrpc_trigger *
520 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
521                            const struct json *id, size_t hash)
522 {
523     struct ovsdb_jsonrpc_trigger *t;
524
525     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
526                              &s->triggers) {
527         if (json_equal(t->id, id)) {
528             return t;
529         }
530     }
531
532     return NULL;
533 }
534
535 static void
536 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
537 {
538     struct ovsdb_jsonrpc_session *s = t->session;
539
540     if (jsonrpc_session_is_connected(s->js)) {
541         struct jsonrpc_msg *reply;
542         struct json *result;
543
544         result = ovsdb_trigger_steal_result(&t->trigger);
545         if (result) {
546             reply = jsonrpc_create_reply(result, t->id);
547         } else {
548             reply = jsonrpc_create_error(json_string_create("canceled"),
549                                          t->id);
550         }
551         jsonrpc_session_send(s->js, reply);
552     }
553
554     json_destroy(t->id);
555     ovsdb_trigger_destroy(&t->trigger);
556     hmap_remove(&s->triggers, &t->hmap_node);
557     free(t);
558 }
559
560 static void
561 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
562 {
563     struct ovsdb_jsonrpc_trigger *t, *next;
564     HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
565                         &s->triggers) {
566         ovsdb_jsonrpc_trigger_complete(t);
567     }
568 }
569
570 static void
571 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
572 {
573     while (!list_is_empty(&s->completions)) {
574         struct ovsdb_jsonrpc_trigger *t
575             = CONTAINER_OF(s->completions.next,
576                            struct ovsdb_jsonrpc_trigger, trigger.node);
577         ovsdb_jsonrpc_trigger_complete(t);
578     }
579 }
580 \f
581 /* JSON-RPC database table monitors. */
582
583 enum ovsdb_jsonrpc_monitor_selection {
584     OJMS_INITIAL = 1 << 0,      /* All rows when monitor is created. */
585     OJMS_INSERT = 1 << 1,       /* New rows. */
586     OJMS_DELETE = 1 << 2,       /* Deleted rows. */
587     OJMS_MODIFY = 1 << 3        /* Modified rows. */
588 };
589
590 struct ovsdb_jsonrpc_monitor_table {
591     const struct ovsdb_table *table;
592     enum ovsdb_jsonrpc_monitor_selection select;
593     struct ovsdb_column_set columns;
594 };
595
596 struct ovsdb_jsonrpc_monitor {
597     struct ovsdb_replica replica;
598     struct ovsdb_jsonrpc_session *session;
599     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
600
601     struct json *monitor_id;
602     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
603 };
604
605 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
606
607 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
608     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
609 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
610 static struct json *ovsdb_jsonrpc_monitor_get_initial(
611     const struct ovsdb_jsonrpc_monitor *);
612
613 static bool
614 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
615 {
616     const struct json *json;
617
618     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
619     return json ? json_boolean(json) : default_value;
620 }
621
622 struct ovsdb_jsonrpc_monitor *
623 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
624                            const struct json *monitor_id)
625 {
626     struct ovsdb_jsonrpc_monitor *m;
627
628     HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
629                              json_hash(monitor_id, 0), &s->monitors) {
630         if (json_equal(m->monitor_id, monitor_id)) {
631             return m;
632         }
633     }
634
635     return NULL;
636 }
637
638 static struct json *
639 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
640                              struct json *params)
641 {
642     struct ovsdb_jsonrpc_monitor *m = NULL;
643     struct json *monitor_id, *monitor_requests;
644     struct ovsdb_error *error = NULL;
645     struct shash_node *node;
646     struct json *json;
647
648     if (json_array(params)->n != 3) {
649         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
650         goto error;
651     }
652     monitor_id = params->u.array.elems[1];
653     monitor_requests = params->u.array.elems[2];
654     if (monitor_requests->type != JSON_OBJECT) {
655         error = ovsdb_syntax_error(monitor_requests, NULL,
656                                    "monitor-requests must be object");
657         goto error;
658     }
659
660     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
661         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
662         goto error;
663     }
664
665     m = xzalloc(sizeof *m);
666     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
667     ovsdb_add_replica(s->remote->server->db, &m->replica);
668     m->session = s;
669     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
670     m->monitor_id = json_clone(monitor_id);
671     shash_init(&m->tables);
672
673     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
674         const struct ovsdb_table *table;
675         struct ovsdb_jsonrpc_monitor_table *mt;
676         const struct json *columns_json, *select_json;
677         struct ovsdb_parser parser;
678
679         table = ovsdb_get_table(s->remote->server->db, node->name);
680         if (!table) {
681             error = ovsdb_syntax_error(NULL, NULL,
682                                        "no table named %s", node->name);
683             goto error;
684         }
685
686         mt = xzalloc(sizeof *mt);
687         mt->table = table;
688         mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
689         ovsdb_column_set_init(&mt->columns);
690         shash_add(&m->tables, table->schema->name, mt);
691
692         ovsdb_parser_init(&parser, node->data, "table %s", node->name);
693         columns_json = ovsdb_parser_member(&parser, "columns",
694                                            OP_ARRAY | OP_OPTIONAL);
695         select_json = ovsdb_parser_member(&parser, "select",
696                                           OP_OBJECT | OP_OPTIONAL);
697         error = ovsdb_parser_finish(&parser);
698         if (error) {
699             goto error;
700         }
701
702         if (columns_json) {
703             error = ovsdb_column_set_from_json(columns_json, table,
704                                                &mt->columns);
705             if (error) {
706                 goto error;
707             }
708         } else {
709             struct shash_node *node;
710
711             SHASH_FOR_EACH (node, &table->schema->columns) {
712                 const struct ovsdb_column *column = node->data;
713                 if (column->index != OVSDB_COL_UUID) {
714                     ovsdb_column_set_add(&mt->columns, column);
715                 }
716             }
717         }
718
719         if (select_json) {
720             mt->select = 0;
721             ovsdb_parser_init(&parser, select_json, "table %s select",
722                               table->schema->name);
723             if (parse_bool(&parser, "initial", true)) {
724                 mt->select |= OJMS_INITIAL;
725             }
726             if (parse_bool(&parser, "insert", true)) {
727                 mt->select |= OJMS_INSERT;
728             }
729             if (parse_bool(&parser, "delete", true)) {
730                 mt->select |= OJMS_DELETE;
731             }
732             if (parse_bool(&parser, "modify", true)) {
733                 mt->select |= OJMS_MODIFY;
734             }
735             error = ovsdb_parser_finish(&parser);
736             if (error) {
737                 goto error;
738             }
739         }
740     }
741
742     return ovsdb_jsonrpc_monitor_get_initial(m);
743
744 error:
745     if (m) {
746         ovsdb_remove_replica(s->remote->server->db, &m->replica);
747     }
748
749     json = ovsdb_error_to_json(error);
750     ovsdb_error_destroy(error);
751     return json;
752 }
753
754 static struct jsonrpc_msg *
755 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
756                              struct json_array *params,
757                              const struct json *request_id)
758 {
759     if (params->n != 1) {
760         return jsonrpc_create_error(json_string_create("invalid parameters"),
761                                     request_id);
762     } else {
763         struct ovsdb_jsonrpc_monitor *m;
764
765         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
766         if (!m) {
767             return jsonrpc_create_error(json_string_create("unknown monitor"),
768                                         request_id);
769         } else {
770             ovsdb_remove_replica(s->remote->server->db, &m->replica);
771             return jsonrpc_create_reply(json_object_create(), request_id);
772         }
773     }
774 }
775
776 static void
777 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
778 {
779     struct ovsdb_jsonrpc_monitor *m, *next;
780
781     HMAP_FOR_EACH_SAFE (m, next,
782                         struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
783         ovsdb_remove_replica(s->remote->server->db, &m->replica);
784     }
785 }
786
787 static struct ovsdb_jsonrpc_monitor *
788 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
789 {
790     assert(replica->class == &ovsdb_jsonrpc_replica_class);
791     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
792 }
793
794 struct ovsdb_jsonrpc_monitor_aux {
795     bool initial;               /* Sending initial contents of table? */
796     const struct ovsdb_jsonrpc_monitor *monitor;
797     struct json *json;          /* JSON for the whole transaction. */
798
799     /* Current table.  */
800     struct ovsdb_jsonrpc_monitor_table *mt;
801     struct json *table_json;    /* JSON for table's transaction. */
802 };
803
804 static bool
805 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
806                                 const struct ovsdb_row *new,
807                                 void *aux_)
808 {
809     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
810     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
811     struct ovsdb_table *table = new ? new->table : old->table;
812     enum ovsdb_jsonrpc_monitor_selection type;
813     struct json *old_json, *new_json;
814     struct json *row_json;
815     char uuid[UUID_LEN + 1];
816     int n_changed;
817     size_t i;
818
819     if (!aux->mt || table != aux->mt->table) {
820         aux->mt = shash_find_data(&m->tables, table->schema->name);
821         aux->table_json = NULL;
822         if (!aux->mt) {
823             /* We don't care about rows in this table at all.  Tell the caller
824              * to skip it.  */
825             return false;
826         }
827     }
828
829     type = (aux->initial ? OJMS_INITIAL
830             : !old ? OJMS_INSERT
831             : !new ? OJMS_DELETE
832             : OJMS_MODIFY);
833     if (!(aux->mt->select & type)) {
834         /* We don't care about this type of change (but do want to be called
835          * back for changes to other rows in the same table). */
836         return true;
837     }
838
839     old_json = new_json = NULL;
840     n_changed = 0;
841     for (i = 0; i < aux->mt->columns.n_columns; i++) {
842         const struct ovsdb_column *column = aux->mt->columns.columns[i];
843         unsigned int idx = column->index;
844         bool changed = false;
845
846         if (type == OJMS_MODIFY) {
847             changed = !ovsdb_datum_equals(&old->fields[idx],
848                                           &new->fields[idx], &column->type);
849             n_changed += changed;
850         }
851         if (changed || type == OJMS_DELETE) {
852             if (!old_json) {
853                 old_json = json_object_create();
854             }
855             json_object_put(old_json, column->name,
856                             ovsdb_datum_to_json(&old->fields[idx],
857                                                 &column->type));
858         }
859         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
860             if (!new_json) {
861                 new_json = json_object_create();
862             }
863             json_object_put(new_json, column->name,
864                             ovsdb_datum_to_json(&new->fields[idx],
865                                                 &column->type));
866         }
867     }
868     if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
869         /* No reportable changes. */
870         json_destroy(old_json);
871         json_destroy(new_json);
872         return true;
873     }
874
875     /* Create JSON object for transaction overall. */
876     if (!aux->json) {
877         aux->json = json_object_create();
878     }
879
880     /* Create JSON object for transaction on this table. */
881     if (!aux->table_json) {
882         aux->table_json = json_object_create();
883         json_object_put(aux->json, aux->mt->table->schema->name,
884                         aux->table_json);
885     }
886
887     /* Create JSON object for transaction on this row. */
888     row_json = json_object_create();
889     if (old_json) {
890         json_object_put(row_json, "old", old_json);
891     }
892     if (new_json) {
893         json_object_put(row_json, "new", new_json);
894     }
895
896     /* Add JSON row to JSON table. */
897     snprintf(uuid, sizeof uuid,
898              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
899     json_object_put(aux->table_json, uuid, row_json);
900
901     return true;
902 }
903
904 static void
905 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
906                                const struct ovsdb_jsonrpc_monitor *m,
907                                bool initial)
908 {
909     aux->initial = initial;
910     aux->monitor = m;
911     aux->json = NULL;
912     aux->mt = NULL;
913     aux->table_json = NULL;
914 }
915
916 static struct ovsdb_error *
917 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
918                              const struct ovsdb_txn *txn, bool durable UNUSED)
919 {
920     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
921     struct ovsdb_jsonrpc_monitor_aux aux;
922
923     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
924     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
925     if (aux.json) {
926         struct jsonrpc_msg *msg;
927         struct json *params;
928
929         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
930                                      aux.json);
931         msg = jsonrpc_create_notify("update", params);
932         jsonrpc_session_send(aux.monitor->session->js, msg);
933     }
934
935     return NULL;
936 }
937
938 static struct json *
939 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
940 {
941     struct ovsdb_jsonrpc_monitor_aux aux;
942     struct shash_node *node;
943
944     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
945     SHASH_FOR_EACH (node, &m->tables) {
946         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
947
948         if (mt->select & OJMS_INITIAL) {
949             struct ovsdb_row *row;
950
951             HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
952                            &mt->table->rows) {
953                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
954             }
955         }
956     }
957     return aux.json ? aux.json : json_object_create();
958 }
959
960 static void
961 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
962 {
963     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
964     struct shash_node *node;
965
966     json_destroy(m->monitor_id);
967     SHASH_FOR_EACH (node, &m->tables) {
968         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
969         ovsdb_column_set_destroy(&mt->columns);
970         free(mt);
971     }
972     shash_destroy(&m->tables);
973     hmap_remove(&m->session->monitors, &m->node);
974     free(m);
975 }
976
977 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
978     ovsdb_jsonrpc_monitor_commit,
979     ovsdb_jsonrpc_monitor_destroy
980 };