ovsdb-server: Add support for multiple databases.
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <assert.h>
21 #include <errno.h>
22
23 #include "bitmap.h"
24 #include "column.h"
25 #include "dynamic-string.h"
26 #include "json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
30 #include "ovsdb.h"
31 #include "reconnect.h"
32 #include "row.h"
33 #include "server.h"
34 #include "simap.h"
35 #include "stream.h"
36 #include "table.h"
37 #include "timeval.h"
38 #include "transaction.h"
39 #include "trigger.h"
40 #include "vlog.h"
41
42 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
43
44 struct ovsdb_jsonrpc_remote;
45 struct ovsdb_jsonrpc_session;
46
47 /* Message rate-limiting. */
48 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
49
50 /* Sessions. */
51 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
52     struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
53 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
54 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
55 static void ovsdb_jsonrpc_session_get_memory_usage_all(
56     const struct ovsdb_jsonrpc_remote *, struct simap *usage);
57 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
58 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
59 static void ovsdb_jsonrpc_session_set_all_options(
60     struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
61 static bool ovsdb_jsonrpc_session_get_status(
62     const struct ovsdb_jsonrpc_remote *,
63     struct ovsdb_jsonrpc_remote_status *);
64 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
65 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
66
67 /* Triggers. */
68 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
69                                          struct ovsdb *,
70                                          struct json *id, struct json *params);
71 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
72     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
73 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
74 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
75 static void ovsdb_jsonrpc_trigger_complete_done(
76     struct ovsdb_jsonrpc_session *);
77
78 /* Monitors. */
79 static struct json *ovsdb_jsonrpc_monitor_create(
80     struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params);
81 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
82     struct ovsdb_jsonrpc_session *,
83     struct json_array *params,
84     const struct json *request_id);
85 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
86 \f
87 /* JSON-RPC database server. */
88
89 struct ovsdb_jsonrpc_server {
90     struct ovsdb_server up;
91     unsigned int n_sessions, max_sessions;
92     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
93 };
94
95 /* A configured remote.  This is either a passive stream listener plus a list
96  * of the currently connected sessions, or a list of exactly one active
97  * session. */
98 struct ovsdb_jsonrpc_remote {
99     struct ovsdb_jsonrpc_server *server;
100     struct pstream *listener;   /* Listener, if passive. */
101     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
102 };
103
/* Remote management helpers, defined below. */
static struct ovsdb_jsonrpc_remote *
ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
                                const char *name,
                                const struct ovsdb_jsonrpc_options *options);
static void ovsdb_jsonrpc_server_del_remote(struct shash_node *node);
109
110 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
111  *
112  * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
113  * which 'server' should provide access. */
114 struct ovsdb_jsonrpc_server *
115 ovsdb_jsonrpc_server_create(void)
116 {
117     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
118     ovsdb_server_init(&server->up);
119     server->max_sessions = 64;
120     shash_init(&server->remotes);
121     return server;
122 }
123
124 /* Adds 'db' to the set of databases served out by 'svr'.  Returns true if
125  * successful, false if 'db''s name is the same as some database already in
126  * 'server'. */
127 bool
128 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
129 {
130     return ovsdb_server_add_db(&svr->up, db);
131 }
132
133 void
134 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
135 {
136     struct shash_node *node, *next;
137
138     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
139         ovsdb_jsonrpc_server_del_remote(node);
140     }
141     shash_destroy(&svr->remotes);
142     ovsdb_server_destroy(&svr->up);
143     free(svr);
144 }
145
146 struct ovsdb_jsonrpc_options *
147 ovsdb_jsonrpc_default_options(const char *target)
148 {
149     struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
150     options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
151     options->probe_interval = (stream_or_pstream_needs_probes(target)
152                                ? RECONNECT_DEFAULT_PROBE_INTERVAL
153                                : 0);
154     return options;
155 }
156
157 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
158  * options in the struct ovsdb_jsonrpc_options supplied as the data values.
159  *
160  * A remote is an active or passive stream connection method, e.g. "pssl:" or
161  * "tcp:1.2.3.4". */
162 void
163 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
164                                  const struct shash *new_remotes)
165 {
166     struct shash_node *node, *next;
167
168     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
169         if (!shash_find(new_remotes, node->name)) {
170             VLOG_INFO("%s: remote deconfigured", node->name);
171             ovsdb_jsonrpc_server_del_remote(node);
172         }
173     }
174     SHASH_FOR_EACH (node, new_remotes) {
175         const struct ovsdb_jsonrpc_options *options = node->data;
176         struct ovsdb_jsonrpc_remote *remote;
177
178         remote = shash_find_data(&svr->remotes, node->name);
179         if (!remote) {
180             remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
181             if (!remote) {
182                 continue;
183             }
184         }
185
186         ovsdb_jsonrpc_session_set_all_options(remote, options);
187     }
188 }
189
190 static struct ovsdb_jsonrpc_remote *
191 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
192                                 const char *name,
193                                 const struct ovsdb_jsonrpc_options *options)
194 {
195     struct ovsdb_jsonrpc_remote *remote;
196     struct pstream *listener;
197     int error;
198
199     error = jsonrpc_pstream_open(name, &listener, options->dscp);
200     if (error && error != EAFNOSUPPORT) {
201         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
202         return NULL;
203     }
204
205     remote = xmalloc(sizeof *remote);
206     remote->server = svr;
207     remote->listener = listener;
208     list_init(&remote->sessions);
209     shash_add(&svr->remotes, name, remote);
210
211     if (!listener) {
212         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
213     }
214     return remote;
215 }
216
217 static void
218 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
219 {
220     struct ovsdb_jsonrpc_remote *remote = node->data;
221
222     ovsdb_jsonrpc_session_close_all(remote);
223     pstream_close(remote->listener);
224     shash_delete(&remote->server->remotes, node);
225     free(remote);
226 }
227
228 /* Stores status information for the remote named 'target', which should have
229  * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
230  * into '*status'.  On success returns true, on failure (if 'svr' doesn't have
231  * a remote named 'target' or if that remote is an inbound remote that has no
232  * active connections) returns false.  On failure, 'status' will be zeroed.
233  */
234 bool
235 ovsdb_jsonrpc_server_get_remote_status(
236     const struct ovsdb_jsonrpc_server *svr, const char *target,
237     struct ovsdb_jsonrpc_remote_status *status)
238 {
239     const struct ovsdb_jsonrpc_remote *remote;
240
241     memset(status, 0, sizeof *status);
242
243     remote = shash_find_data(&svr->remotes, target);
244     return remote && ovsdb_jsonrpc_session_get_status(remote, status);
245 }
246
247 void
248 ovsdb_jsonrpc_server_free_remote_status(
249     struct ovsdb_jsonrpc_remote_status *status)
250 {
251     free(status->locks_held);
252     free(status->locks_waiting);
253     free(status->locks_lost);
254 }
255
256 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
257  * reconnect. */
258 void
259 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
260 {
261     struct shash_node *node;
262
263     SHASH_FOR_EACH (node, &svr->remotes) {
264         struct ovsdb_jsonrpc_remote *remote = node->data;
265
266         ovsdb_jsonrpc_session_reconnect_all(remote);
267     }
268 }
269
270 void
271 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
272 {
273     struct shash_node *node;
274
275     SHASH_FOR_EACH (node, &svr->remotes) {
276         struct ovsdb_jsonrpc_remote *remote = node->data;
277
278         if (remote->listener && svr->n_sessions < svr->max_sessions) {
279             struct stream *stream;
280             int error;
281
282             error = pstream_accept(remote->listener, &stream);
283             if (!error) {
284                 struct jsonrpc_session *js;
285                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
286                 ovsdb_jsonrpc_session_create(remote, js);
287             } else if (error != EAGAIN) {
288                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
289                              pstream_get_name(remote->listener),
290                              strerror(error));
291             }
292         }
293
294         ovsdb_jsonrpc_session_run_all(remote);
295     }
296 }
297
298 void
299 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
300 {
301     struct shash_node *node;
302
303     SHASH_FOR_EACH (node, &svr->remotes) {
304         struct ovsdb_jsonrpc_remote *remote = node->data;
305
306         if (remote->listener && svr->n_sessions < svr->max_sessions) {
307             pstream_wait(remote->listener);
308         }
309
310         ovsdb_jsonrpc_session_wait_all(remote);
311     }
312 }
313
314 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
315  * memory_report(). */
316 void
317 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
318                                       struct simap *usage)
319 {
320     struct shash_node *node;
321
322     simap_increase(usage, "sessions", svr->n_sessions);
323     SHASH_FOR_EACH (node, &svr->remotes) {
324         struct ovsdb_jsonrpc_remote *remote = node->data;
325
326         ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
327     }
328 }
329 \f
330 /* JSON-RPC database server session. */
331
332 struct ovsdb_jsonrpc_session {
333     struct list node;           /* Element in remote's sessions list. */
334     struct ovsdb_session up;
335     struct ovsdb_jsonrpc_remote *remote;
336
337     /* Triggers. */
338     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
339
340     /* Monitors. */
341     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
342
343     /* Network connectivity. */
344     struct jsonrpc_session *js;  /* JSON-RPC session. */
345     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
346 };
347
/* Per-session helpers, defined below. */
static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s);
static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_get_memory_usage(
    const struct ovsdb_jsonrpc_session *s, struct simap *usage);
static void ovsdb_jsonrpc_session_set_options(
    struct ovsdb_jsonrpc_session *session,
    const struct ovsdb_jsonrpc_options *options);
static void ovsdb_jsonrpc_session_got_request(
    struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request);
static void ovsdb_jsonrpc_session_got_notify(
    struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request);
357 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
358                                              struct jsonrpc_msg *);
359
360 static struct ovsdb_jsonrpc_session *
361 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
362                              struct jsonrpc_session *js)
363 {
364     struct ovsdb_jsonrpc_session *s;
365
366     s = xzalloc(sizeof *s);
367     ovsdb_session_init(&s->up, &remote->server->up);
368     s->remote = remote;
369     list_push_back(&remote->sessions, &s->node);
370     hmap_init(&s->triggers);
371     hmap_init(&s->monitors);
372     s->js = js;
373     s->js_seqno = jsonrpc_session_get_seqno(js);
374
375     remote->server->n_sessions++;
376
377     return s;
378 }
379
380 static void
381 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
382 {
383     ovsdb_jsonrpc_monitor_remove_all(s);
384     ovsdb_jsonrpc_session_unlock_all(s);
385     jsonrpc_session_close(s->js);
386     list_remove(&s->node);
387     ovsdb_session_destroy(&s->up);
388     s->remote->server->n_sessions--;
389     ovsdb_session_destroy(&s->up);
390     free(s);
391 }
392
393 static int
394 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
395 {
396     jsonrpc_session_run(s->js);
397     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
398         s->js_seqno = jsonrpc_session_get_seqno(s->js);
399         ovsdb_jsonrpc_trigger_complete_all(s);
400         ovsdb_jsonrpc_monitor_remove_all(s);
401         ovsdb_jsonrpc_session_unlock_all(s);
402     }
403
404     ovsdb_jsonrpc_trigger_complete_done(s);
405
406     if (!jsonrpc_session_get_backlog(s->js)) {
407         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
408         if (msg) {
409             if (msg->type == JSONRPC_REQUEST) {
410                 ovsdb_jsonrpc_session_got_request(s, msg);
411             } else if (msg->type == JSONRPC_NOTIFY) {
412                 ovsdb_jsonrpc_session_got_notify(s, msg);
413             } else {
414                 VLOG_WARN("%s: received unexpected %s message",
415                           jsonrpc_session_get_name(s->js),
416                           jsonrpc_msg_type_to_string(msg->type));
417                 jsonrpc_session_force_reconnect(s->js);
418                 jsonrpc_msg_destroy(msg);
419             }
420         }
421     }
422     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
423 }
424
425 static void
426 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
427                                   const struct ovsdb_jsonrpc_options *options)
428 {
429     jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
430     jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
431     jsonrpc_session_set_dscp(session->js, options->dscp);
432 }
433
434 static void
435 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
436 {
437     struct ovsdb_jsonrpc_session *s, *next;
438
439     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
440         int error = ovsdb_jsonrpc_session_run(s);
441         if (error) {
442             ovsdb_jsonrpc_session_close(s);
443         }
444     }
445 }
446
447 static void
448 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
449 {
450     jsonrpc_session_wait(s->js);
451     if (!jsonrpc_session_get_backlog(s->js)) {
452         jsonrpc_session_recv_wait(s->js);
453     }
454 }
455
456 static void
457 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
458 {
459     struct ovsdb_jsonrpc_session *s;
460
461     LIST_FOR_EACH (s, node, &remote->sessions) {
462         ovsdb_jsonrpc_session_wait(s);
463     }
464 }
465
466 static void
467 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
468                                        struct simap *usage)
469 {
470     simap_increase(usage, "triggers", hmap_count(&s->triggers));
471     simap_increase(usage, "monitors", hmap_count(&s->monitors));
472     simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
473 }
474
475 static void
476 ovsdb_jsonrpc_session_get_memory_usage_all(
477     const struct ovsdb_jsonrpc_remote *remote,
478     struct simap *usage)
479 {
480     struct ovsdb_jsonrpc_session *s;
481
482     LIST_FOR_EACH (s, node, &remote->sessions) {
483         ovsdb_jsonrpc_session_get_memory_usage(s, usage);
484     }
485 }
486
487 static void
488 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
489 {
490     struct ovsdb_jsonrpc_session *s, *next;
491
492     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
493         ovsdb_jsonrpc_session_close(s);
494     }
495 }
496
497 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
498  * reconnect. */
499 static void
500 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
501 {
502     struct ovsdb_jsonrpc_session *s, *next;
503
504     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
505         jsonrpc_session_force_reconnect(s->js);
506         if (!jsonrpc_session_is_alive(s->js)) {
507             ovsdb_jsonrpc_session_close(s);
508         }
509     }
510 }
511
512 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
513  * 'options'. */
514 static void
515 ovsdb_jsonrpc_session_set_all_options(
516     struct ovsdb_jsonrpc_remote *remote,
517     const struct ovsdb_jsonrpc_options *options)
518 {
519     struct ovsdb_jsonrpc_session *s;
520
521     LIST_FOR_EACH (s, node, &remote->sessions) {
522         ovsdb_jsonrpc_session_set_options(s, options);
523     }
524 }
525
526 static bool
527 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote *remote,
528                                  struct ovsdb_jsonrpc_remote_status *status)
529 {
530     const struct ovsdb_jsonrpc_session *s;
531     const struct jsonrpc_session *js;
532     struct ovsdb_lock_waiter *waiter;
533     struct reconnect_stats rstats;
534     struct ds locks_held, locks_waiting, locks_lost;
535
536     if (list_is_empty(&remote->sessions)) {
537         return false;
538     }
539     s = CONTAINER_OF(remote->sessions.next, struct ovsdb_jsonrpc_session, node);
540     js = s->js;
541
542     status->is_connected = jsonrpc_session_is_connected(js);
543     status->last_error = jsonrpc_session_get_status(js);
544
545     jsonrpc_session_get_reconnect_stats(js, &rstats);
546     status->state = rstats.state;
547     status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
548         ? UINT_MAX : rstats.msec_since_connect / 1000;
549     status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
550         ? UINT_MAX : rstats.msec_since_disconnect / 1000;
551
552     ds_init(&locks_held);
553     ds_init(&locks_waiting);
554     ds_init(&locks_lost);
555     HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
556         struct ds *string;
557
558         string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
559                   : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
560                   : &locks_lost);
561         if (string->length) {
562             ds_put_char(string, ' ');
563         }
564         ds_put_cstr(string, waiter->lock_name);
565     }
566     status->locks_held = ds_steal_cstr(&locks_held);
567     status->locks_waiting = ds_steal_cstr(&locks_waiting);
568     status->locks_lost = ds_steal_cstr(&locks_lost);
569
570     status->n_connections = list_size(&remote->sessions);
571
572     return true;
573 }
574
575 /* Examines 'request' to determine the database to which it relates, and then
576  * searches 's' to find that database:
577  *
578  *    - If successful, returns the database and sets '*replyp' to NULL.
579  *
580  *    - If no such database exists, returns NULL and sets '*replyp' to an
581  *      appropriate JSON-RPC error reply, owned by the caller. */
582 static struct ovsdb *
583 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
584                         const struct jsonrpc_msg *request,
585                         struct jsonrpc_msg **replyp)
586 {
587     struct json_array *params;
588     struct ovsdb_error *error;
589     const char *db_name;
590     struct ovsdb *db;
591
592     params = json_array(request->params);
593     if (!params->n || params->elems[0]->type != JSON_STRING) {
594         error = ovsdb_syntax_error(
595             request->params, NULL,
596             "%s request params must begin with <db-name>", request->method);
597         goto error;
598     }
599
600     db_name = params->elems[0]->u.string;
601     db = shash_find_data(&s->up.server->dbs, db_name);
602     if (!db) {
603         error = ovsdb_syntax_error(
604             request->params, "unknown database",
605             "%s request specifies unknown database %s",
606             request->method, db_name);
607         goto error;
608     }
609
610     *replyp = NULL;
611     return db;
612
613 error:
614     *replyp = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
615     ovsdb_error_destroy(error);
616     return NULL;
617 }
618
619 static struct ovsdb_error *
620 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
621                                       const char **lock_namep)
622 {
623     const struct json_array *params;
624
625     params = json_array(request->params);
626     if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
627         !ovsdb_parser_is_id(json_string(params->elems[0]))) {
628         *lock_namep = NULL;
629         return ovsdb_syntax_error(request->params, NULL,
630                                   "%s request params must be <id>",
631                                   request->method);
632     }
633
634     *lock_namep = json_string(params->elems[0]);
635     return NULL;
636 }
637
638 static void
639 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
640                              const char *lock_name,
641                              const char *method)
642 {
643     struct ovsdb_jsonrpc_session *s;
644     struct json *params;
645
646     s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
647     params = json_array_create_1(json_string_create(lock_name));
648     jsonrpc_session_send(s->js, jsonrpc_create_notify(method, params));
649 }
650
651 static struct jsonrpc_msg *
652 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
653                            struct jsonrpc_msg *request,
654                            enum ovsdb_lock_mode mode)
655 {
656     struct ovsdb_lock_waiter *waiter;
657     struct jsonrpc_msg *reply;
658     struct ovsdb_error *error;
659     struct ovsdb_session *victim;
660     const char *lock_name;
661     struct json *result;
662
663     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
664     if (error) {
665         goto error;
666     }
667
668     /* Report error if this session has issued a "lock" or "steal" without a
669      * matching "unlock" for this lock. */
670     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
671     if (waiter) {
672         error = ovsdb_syntax_error(
673             request->params, NULL,
674             "must issue \"unlock\" before new \"%s\"", request->method);
675         goto error;
676     }
677
678     /* Get the lock, add us as a waiter. */
679     waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
680                                &victim);
681     if (victim) {
682         ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
683     }
684
685     result = json_object_create();
686     json_object_put(result, "locked",
687                     json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
688
689     return jsonrpc_create_reply(result, request->id);
690
691 error:
692     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
693     ovsdb_error_destroy(error);
694     return reply;
695 }
696
697 static void
698 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
699 {
700     struct ovsdb_lock_waiter *waiter, *next;
701
702     HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
703         ovsdb_jsonrpc_session_unlock__(waiter);
704     }
705 }
706
707 static void
708 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
709 {
710     struct ovsdb_lock *lock = waiter->lock;
711
712     if (lock) {
713         struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
714         if (new_owner) {
715             ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
716         } else {
717             /* ovsdb_server_lock() might have freed 'lock'. */
718         }
719     }
720
721     ovsdb_lock_waiter_destroy(waiter);
722 }
723
724 static struct jsonrpc_msg *
725 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
726                              struct jsonrpc_msg *request)
727 {
728     struct ovsdb_lock_waiter *waiter;
729     struct jsonrpc_msg *reply;
730     struct ovsdb_error *error;
731     const char *lock_name;
732
733     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
734     if (error) {
735         goto error;
736     }
737
738     /* Report error if this session has not issued a "lock" or "steal" for this
739      * lock. */
740     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
741     if (!waiter) {
742         error = ovsdb_syntax_error(
743             request->params, NULL, "\"unlock\" without \"lock\" or \"steal\"");
744         goto error;
745     }
746
747     ovsdb_jsonrpc_session_unlock__(waiter);
748
749     return jsonrpc_create_reply(json_object_create(), request->id);
750
751 error:
752     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
753     ovsdb_error_destroy(error);
754     return reply;
755 }
756
757 static struct jsonrpc_msg *
758 execute_transaction(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
759                     struct jsonrpc_msg *request)
760 {
761     ovsdb_jsonrpc_trigger_create(s, db, request->id, request->params);
762     request->id = NULL;
763     request->params = NULL;
764     jsonrpc_msg_destroy(request);
765     return NULL;
766 }
767
768 static void
769 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
770                                   struct jsonrpc_msg *request)
771 {
772     struct jsonrpc_msg *reply;
773
774     if (!strcmp(request->method, "transact")) {
775         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
776         if (!reply) {
777             reply = execute_transaction(s, db, request);
778         }
779     } else if (!strcmp(request->method, "monitor")) {
780         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
781         if (!reply) {
782             reply = jsonrpc_create_reply(
783                 ovsdb_jsonrpc_monitor_create(s, db, request->params),
784                 request->id);
785         }
786     } else if (!strcmp(request->method, "monitor_cancel")) {
787         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
788                                              request->id);
789     } else if (!strcmp(request->method, "get_schema")) {
790         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
791         if (!reply) {
792             reply = jsonrpc_create_reply(ovsdb_schema_to_json(db->schema),
793                                          request->id);
794         }
795     } else if (!strcmp(request->method, "list_dbs")) {
796         size_t n_dbs = shash_count(&s->up.server->dbs);
797         struct shash_node *node;
798         struct json **dbs;
799         size_t i;
800
801         dbs = xmalloc(n_dbs * sizeof *dbs);
802         i = 0;
803         SHASH_FOR_EACH (node, &s->up.server->dbs) {
804             dbs[i++] = json_string_create(node->name);
805         }
806         reply = jsonrpc_create_reply(json_array_create(dbs, n_dbs),
807                                      request->id);
808     } else if (!strcmp(request->method, "lock")) {
809         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
810     } else if (!strcmp(request->method, "steal")) {
811         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
812     } else if (!strcmp(request->method, "unlock")) {
813         reply = ovsdb_jsonrpc_session_unlock(s, request);
814     } else if (!strcmp(request->method, "echo")) {
815         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
816     } else {
817         reply = jsonrpc_create_error(json_string_create("unknown method"),
818                                      request->id);
819     }
820
821     if (reply) {
822         jsonrpc_msg_destroy(request);
823         jsonrpc_session_send(s->js, reply);
824     }
825 }
826
827 static void
828 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
829 {
830     if (json_array(request->params)->n == 1) {
831         struct ovsdb_jsonrpc_trigger *t;
832         struct json *id;
833
834         id = request->params->u.array.elems[0];
835         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
836         if (t) {
837             ovsdb_jsonrpc_trigger_complete(t);
838         }
839     }
840 }
841
842 static void
843 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
844                                  struct jsonrpc_msg *request)
845 {
846     if (!strcmp(request->method, "cancel")) {
847         execute_cancel(s, request);
848     }
849     jsonrpc_msg_destroy(request);
850 }
851 \f
852 /* JSON-RPC database server triggers.
853  *
854  * (Every transaction is treated as a trigger even if it doesn't actually have
855  * any "wait" operations.) */
856
857 struct ovsdb_jsonrpc_trigger {
858     struct ovsdb_trigger trigger;
859     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
860     struct json *id;
861 };
862
863 static void
864 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
865                              struct json *id, struct json *params)
866 {
867     struct ovsdb_jsonrpc_trigger *t;
868     size_t hash;
869
870     /* Check for duplicate ID. */
871     hash = json_hash(id, 0);
872     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
873     if (t) {
874         struct jsonrpc_msg *msg;
875
876         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
877                                    id);
878         jsonrpc_session_send(s->js, msg);
879         json_destroy(id);
880         json_destroy(params);
881         return;
882     }
883
884     /* Insert into trigger table. */
885     t = xmalloc(sizeof *t);
886     ovsdb_trigger_init(&s->up, db, &t->trigger, params, time_msec());
887     t->id = id;
888     hmap_insert(&s->triggers, &t->hmap_node, hash);
889
890     /* Complete early if possible. */
891     if (ovsdb_trigger_is_complete(&t->trigger)) {
892         ovsdb_jsonrpc_trigger_complete(t);
893     }
894 }
895
896 static struct ovsdb_jsonrpc_trigger *
897 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
898                            const struct json *id, size_t hash)
899 {
900     struct ovsdb_jsonrpc_trigger *t;
901
902     HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
903         if (json_equal(t->id, id)) {
904             return t;
905         }
906     }
907
908     return NULL;
909 }
910
911 static void
912 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
913 {
914     struct ovsdb_jsonrpc_session *s;
915
916     s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
917
918     if (jsonrpc_session_is_connected(s->js)) {
919         struct jsonrpc_msg *reply;
920         struct json *result;
921
922         result = ovsdb_trigger_steal_result(&t->trigger);
923         if (result) {
924             reply = jsonrpc_create_reply(result, t->id);
925         } else {
926             reply = jsonrpc_create_error(json_string_create("canceled"),
927                                          t->id);
928         }
929         jsonrpc_session_send(s->js, reply);
930     }
931
932     json_destroy(t->id);
933     ovsdb_trigger_destroy(&t->trigger);
934     hmap_remove(&s->triggers, &t->hmap_node);
935     free(t);
936 }
937
938 static void
939 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
940 {
941     struct ovsdb_jsonrpc_trigger *t, *next;
942     HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
943         ovsdb_jsonrpc_trigger_complete(t);
944     }
945 }
946
947 static void
948 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
949 {
950     while (!list_is_empty(&s->up.completions)) {
951         struct ovsdb_jsonrpc_trigger *t
952             = CONTAINER_OF(s->up.completions.next,
953                            struct ovsdb_jsonrpc_trigger, trigger.node);
954         ovsdb_jsonrpc_trigger_complete(t);
955     }
956 }
957 \f
958 /* JSON-RPC database table monitors. */
959
/* Kinds of row changes that a monitor can report.  Each value is a distinct
 * bit so that several kinds can be combined with bitwise OR. */
enum ovsdb_jsonrpc_monitor_selection {
    OJMS_INITIAL = 0x1,         /* Rows present when the monitor is created. */
    OJMS_INSERT = 0x2,          /* Rows added afterward. */
    OJMS_DELETE = 0x4,          /* Rows removed. */
    OJMS_MODIFY = 0x8           /* Rows whose contents changed. */
};
966
/* A particular column being monitored. */
struct ovsdb_jsonrpc_monitor_column {
    /* The column in the table's schema. */
    const struct ovsdb_column *column;

    /* Kinds of changes (OJMS_* bits) to report for this column. */
    enum ovsdb_jsonrpc_monitor_selection select;
};
972
/* A particular table being monitored. */
struct ovsdb_jsonrpc_monitor_table {
    /* The monitored table. */
    const struct ovsdb_table *table;

    /* This is the union (bitwise-OR) of the 'select' values in all of the
     * members of 'columns' below. */
    enum ovsdb_jsonrpc_monitor_selection select;

    /* Columns being monitored.  'columns' is a heap-allocated array with
     * 'n_columns' elements in use; it is null until the first column is
     * added. */
    struct ovsdb_jsonrpc_monitor_column *columns;
    size_t n_columns;
};
985
/* A collection of tables being monitored. */
struct ovsdb_jsonrpc_monitor {
    struct ovsdb_replica replica; /* Added to 'db' as a replica. */
    struct ovsdb_jsonrpc_session *session; /* Session it was created on. */
    struct ovsdb *db;           /* Database whose tables are monitored. */
    struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */

    struct json *monitor_id;    /* Private clone of the client's monitor ID. */
    struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
};
996
/* Forward declarations for the replica class and the monitor functions that
 * are defined later in this file but used before their definitions. */
static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;

struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
    struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
static struct json *ovsdb_jsonrpc_monitor_get_initial(
    const struct ovsdb_jsonrpc_monitor *);
1004
1005 static bool
1006 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
1007 {
1008     const struct json *json;
1009
1010     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
1011     return json ? json_boolean(json) : default_value;
1012 }
1013
1014 struct ovsdb_jsonrpc_monitor *
1015 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
1016                            const struct json *monitor_id)
1017 {
1018     struct ovsdb_jsonrpc_monitor *m;
1019
1020     HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
1021         if (json_equal(m->monitor_id, monitor_id)) {
1022             return m;
1023         }
1024     }
1025
1026     return NULL;
1027 }
1028
1029 static void
1030 ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table *mt,
1031                                  const struct ovsdb_column *column,
1032                                  enum ovsdb_jsonrpc_monitor_selection select,
1033                                  size_t *allocated_columns)
1034 {
1035     struct ovsdb_jsonrpc_monitor_column *c;
1036
1037     if (mt->n_columns >= *allocated_columns) {
1038         mt->columns = x2nrealloc(mt->columns, allocated_columns,
1039                                  sizeof *mt->columns);
1040     }
1041
1042     c = &mt->columns[mt->n_columns++];
1043     c->column = column;
1044     c->select = select;
1045 }
1046
1047 static int
1048 compare_ovsdb_jsonrpc_monitor_column(const void *a_, const void *b_)
1049 {
1050     const struct ovsdb_jsonrpc_monitor_column *a = a_;
1051     const struct ovsdb_jsonrpc_monitor_column *b = b_;
1052
1053     return a->column < b->column ? -1 : a->column > b->column;
1054 }
1055
1056 static struct ovsdb_error * WARN_UNUSED_RESULT
1057 ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
1058                                     const struct json *monitor_request,
1059                                     size_t *allocated_columns)
1060 {
1061     const struct ovsdb_table_schema *ts = mt->table->schema;
1062     enum ovsdb_jsonrpc_monitor_selection select;
1063     const struct json *columns, *select_json;
1064     struct ovsdb_parser parser;
1065     struct ovsdb_error *error;
1066
1067     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1068     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1069     select_json = ovsdb_parser_member(&parser, "select",
1070                                       OP_OBJECT | OP_OPTIONAL);
1071     error = ovsdb_parser_finish(&parser);
1072     if (error) {
1073         return error;
1074     }
1075
1076     if (select_json) {
1077         select = 0;
1078         ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1079         if (parse_bool(&parser, "initial", true)) {
1080             select |= OJMS_INITIAL;
1081         }
1082         if (parse_bool(&parser, "insert", true)) {
1083             select |= OJMS_INSERT;
1084         }
1085         if (parse_bool(&parser, "delete", true)) {
1086             select |= OJMS_DELETE;
1087         }
1088         if (parse_bool(&parser, "modify", true)) {
1089             select |= OJMS_MODIFY;
1090         }
1091         error = ovsdb_parser_finish(&parser);
1092         if (error) {
1093             return error;
1094         }
1095     } else {
1096         select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1097     }
1098     mt->select |= select;
1099
1100     if (columns) {
1101         size_t i;
1102
1103         if (columns->type != JSON_ARRAY) {
1104             return ovsdb_syntax_error(columns, NULL,
1105                                       "array of column names expected");
1106         }
1107
1108         for (i = 0; i < columns->u.array.n; i++) {
1109             const struct ovsdb_column *column;
1110             const char *s;
1111
1112             if (columns->u.array.elems[i]->type != JSON_STRING) {
1113                 return ovsdb_syntax_error(columns, NULL,
1114                                           "array of column names expected");
1115             }
1116
1117             s = columns->u.array.elems[i]->u.string;
1118             column = shash_find_data(&mt->table->schema->columns, s);
1119             if (!column) {
1120                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1121                                           "column name", s);
1122             }
1123             ovsdb_jsonrpc_add_monitor_column(mt, column, select,
1124                                              allocated_columns);
1125         }
1126     } else {
1127         struct shash_node *node;
1128
1129         SHASH_FOR_EACH (node, &ts->columns) {
1130             const struct ovsdb_column *column = node->data;
1131             if (column->index != OVSDB_COL_UUID) {
1132                 ovsdb_jsonrpc_add_monitor_column(mt, column, select,
1133                                                  allocated_columns);
1134             }
1135         }
1136     }
1137
1138     return NULL;
1139 }
1140
1141 static struct json *
1142 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1143                              struct json *params)
1144 {
1145     struct ovsdb_jsonrpc_monitor *m = NULL;
1146     struct json *monitor_id, *monitor_requests;
1147     struct ovsdb_error *error = NULL;
1148     struct shash_node *node;
1149     struct json *json;
1150
1151     if (json_array(params)->n != 3) {
1152         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1153         goto error;
1154     }
1155     monitor_id = params->u.array.elems[1];
1156     monitor_requests = params->u.array.elems[2];
1157     if (monitor_requests->type != JSON_OBJECT) {
1158         error = ovsdb_syntax_error(monitor_requests, NULL,
1159                                    "monitor-requests must be object");
1160         goto error;
1161     }
1162
1163     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1164         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1165         goto error;
1166     }
1167
1168     m = xzalloc(sizeof *m);
1169     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
1170     ovsdb_add_replica(db, &m->replica);
1171     m->session = s;
1172     m->db = db;
1173     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1174     m->monitor_id = json_clone(monitor_id);
1175     shash_init(&m->tables);
1176
1177     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1178         const struct ovsdb_table *table;
1179         struct ovsdb_jsonrpc_monitor_table *mt;
1180         size_t allocated_columns;
1181         const struct json *mr_value;
1182         size_t i;
1183
1184         table = ovsdb_get_table(m->db, node->name);
1185         if (!table) {
1186             error = ovsdb_syntax_error(NULL, NULL,
1187                                        "no table named %s", node->name);
1188             goto error;
1189         }
1190
1191         mt = xzalloc(sizeof *mt);
1192         mt->table = table;
1193         shash_add(&m->tables, table->schema->name, mt);
1194
1195         /* Parse columns. */
1196         mr_value = node->data;
1197         allocated_columns = 0;
1198         if (mr_value->type == JSON_ARRAY) {
1199             const struct json_array *array = &mr_value->u.array;
1200
1201             for (i = 0; i < array->n; i++) {
1202                 error = ovsdb_jsonrpc_parse_monitor_request(
1203                     mt, array->elems[i], &allocated_columns);
1204                 if (error) {
1205                     goto error;
1206                 }
1207             }
1208         } else {
1209             error = ovsdb_jsonrpc_parse_monitor_request(
1210                 mt, mr_value, &allocated_columns);
1211             if (error) {
1212                 goto error;
1213             }
1214         }
1215
1216         /* Check for duplicate columns. */
1217         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
1218               compare_ovsdb_jsonrpc_monitor_column);
1219         for (i = 1; i < mt->n_columns; i++) {
1220             if (mt->columns[i].column == mt->columns[i - 1].column) {
1221                 error = ovsdb_syntax_error(mr_value, NULL, "column %s "
1222                                            "mentioned more than once",
1223                                            mt->columns[i].column->name);
1224                 goto error;
1225             }
1226         }
1227     }
1228
1229     return ovsdb_jsonrpc_monitor_get_initial(m);
1230
1231 error:
1232     if (m) {
1233         ovsdb_remove_replica(m->db, &m->replica);
1234     }
1235
1236     json = ovsdb_error_to_json(error);
1237     ovsdb_error_destroy(error);
1238     return json;
1239 }
1240
1241 static struct jsonrpc_msg *
1242 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1243                              struct json_array *params,
1244                              const struct json *request_id)
1245 {
1246     if (params->n != 1) {
1247         return jsonrpc_create_error(json_string_create("invalid parameters"),
1248                                     request_id);
1249     } else {
1250         struct ovsdb_jsonrpc_monitor *m;
1251
1252         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1253         if (!m) {
1254             return jsonrpc_create_error(json_string_create("unknown monitor"),
1255                                         request_id);
1256         } else {
1257             ovsdb_remove_replica(m->db, &m->replica);
1258             return jsonrpc_create_reply(json_object_create(), request_id);
1259         }
1260     }
1261 }
1262
1263 static void
1264 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1265 {
1266     struct ovsdb_jsonrpc_monitor *m, *next;
1267
1268     HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1269         ovsdb_remove_replica(m->db, &m->replica);
1270     }
1271 }
1272
1273 static struct ovsdb_jsonrpc_monitor *
1274 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
1275 {
1276     assert(replica->class == &ovsdb_jsonrpc_replica_class);
1277     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
1278 }
1279
/* State shared by successive calls to ovsdb_jsonrpc_monitor_change_cb() while
 * the JSON for one update, or for a monitor's initial contents, is being
 * assembled. */
struct ovsdb_jsonrpc_monitor_aux {
    bool initial;               /* Sending initial contents of table? */
    const struct ovsdb_jsonrpc_monitor *monitor; /* Monitor being served. */
    struct json *json;          /* JSON for the whole transaction. */

    /* Current table.  Both members are null until a row in a monitored table
     * has been seen. */
    struct ovsdb_jsonrpc_monitor_table *mt;
    struct json *table_json;    /* JSON for table's transaction. */
};
1289
1290 static bool
1291 any_reportable_change(const struct ovsdb_jsonrpc_monitor_table *mt,
1292                       const unsigned long int *changed)
1293 {
1294     size_t i;
1295
1296     for (i = 0; i < mt->n_columns; i++) {
1297         const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
1298         unsigned int idx = c->column->index;
1299
1300         if (c->select & OJMS_MODIFY && bitmap_is_set(changed, idx)) {
1301             return true;
1302         }
1303     }
1304
1305     return false;
1306 }
1307
1308 static bool
1309 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
1310                                 const struct ovsdb_row *new,
1311                                 const unsigned long int *changed,
1312                                 void *aux_)
1313 {
1314     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
1315     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
1316     struct ovsdb_table *table = new ? new->table : old->table;
1317     enum ovsdb_jsonrpc_monitor_selection type;
1318     struct json *old_json, *new_json;
1319     struct json *row_json;
1320     char uuid[UUID_LEN + 1];
1321     size_t i;
1322
1323     if (!aux->mt || table != aux->mt->table) {
1324         aux->mt = shash_find_data(&m->tables, table->schema->name);
1325         aux->table_json = NULL;
1326         if (!aux->mt) {
1327             /* We don't care about rows in this table at all.  Tell the caller
1328              * to skip it.  */
1329             return false;
1330         }
1331     }
1332
1333     type = (aux->initial ? OJMS_INITIAL
1334             : !old ? OJMS_INSERT
1335             : !new ? OJMS_DELETE
1336             : OJMS_MODIFY);
1337     if (!(aux->mt->select & type)) {
1338         /* We don't care about this type of change (but do want to be called
1339          * back for changes to other rows in the same table). */
1340         return true;
1341     }
1342
1343     if (type == OJMS_MODIFY && !any_reportable_change(aux->mt, changed)) {
1344         /* Nothing of interest changed. */
1345         return true;
1346     }
1347
1348     old_json = new_json = NULL;
1349     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
1350         old_json = json_object_create();
1351     }
1352     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1353         new_json = json_object_create();
1354     }
1355     for (i = 0; i < aux->mt->n_columns; i++) {
1356         const struct ovsdb_jsonrpc_monitor_column *c = &aux->mt->columns[i];
1357         const struct ovsdb_column *column = c->column;
1358         unsigned int idx = c->column->index;
1359
1360         if (!(type & c->select)) {
1361             /* We don't care about this type of change for this particular
1362              * column (but we will care about it for some other column). */
1363             continue;
1364         }
1365
1366         if ((type == OJMS_MODIFY && bitmap_is_set(changed, idx))
1367             || type == OJMS_DELETE) {
1368             json_object_put(old_json, column->name,
1369                             ovsdb_datum_to_json(&old->fields[idx],
1370                                                 &column->type));
1371         }
1372         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1373             json_object_put(new_json, column->name,
1374                             ovsdb_datum_to_json(&new->fields[idx],
1375                                                 &column->type));
1376         }
1377     }
1378
1379     /* Create JSON object for transaction overall. */
1380     if (!aux->json) {
1381         aux->json = json_object_create();
1382     }
1383
1384     /* Create JSON object for transaction on this table. */
1385     if (!aux->table_json) {
1386         aux->table_json = json_object_create();
1387         json_object_put(aux->json, aux->mt->table->schema->name,
1388                         aux->table_json);
1389     }
1390
1391     /* Create JSON object for transaction on this row. */
1392     row_json = json_object_create();
1393     if (old_json) {
1394         json_object_put(row_json, "old", old_json);
1395     }
1396     if (new_json) {
1397         json_object_put(row_json, "new", new_json);
1398     }
1399
1400     /* Add JSON row to JSON table. */
1401     snprintf(uuid, sizeof uuid,
1402              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
1403     json_object_put(aux->table_json, uuid, row_json);
1404
1405     return true;
1406 }
1407
1408 static void
1409 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
1410                                const struct ovsdb_jsonrpc_monitor *m,
1411                                bool initial)
1412 {
1413     aux->initial = initial;
1414     aux->monitor = m;
1415     aux->json = NULL;
1416     aux->mt = NULL;
1417     aux->table_json = NULL;
1418 }
1419
1420 static struct ovsdb_error *
1421 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
1422                              const struct ovsdb_txn *txn,
1423                              bool durable OVS_UNUSED)
1424 {
1425     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1426     struct ovsdb_jsonrpc_monitor_aux aux;
1427
1428     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
1429     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
1430     if (aux.json) {
1431         struct jsonrpc_msg *msg;
1432         struct json *params;
1433
1434         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
1435                                      aux.json);
1436         msg = jsonrpc_create_notify("update", params);
1437         jsonrpc_session_send(aux.monitor->session->js, msg);
1438     }
1439
1440     return NULL;
1441 }
1442
1443 static struct json *
1444 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
1445 {
1446     struct ovsdb_jsonrpc_monitor_aux aux;
1447     struct shash_node *node;
1448
1449     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
1450     SHASH_FOR_EACH (node, &m->tables) {
1451         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1452
1453         if (mt->select & OJMS_INITIAL) {
1454             struct ovsdb_row *row;
1455
1456             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1457                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, NULL, &aux);
1458             }
1459         }
1460     }
1461     return aux.json ? aux.json : json_object_create();
1462 }
1463
1464 static void
1465 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
1466 {
1467     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1468     struct shash_node *node;
1469
1470     json_destroy(m->monitor_id);
1471     SHASH_FOR_EACH (node, &m->tables) {
1472         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1473         free(mt->columns);
1474         free(mt);
1475     }
1476     shash_destroy(&m->tables);
1477     hmap_remove(&m->session->monitors, &m->node);
1478     free(m);
1479 }
1480
/* Replica class through which the database drives JSON-RPC monitors.  The
 * initializer is positional, so the order of the callbacks must match the
 * member order of "struct ovsdb_replica_class", which is declared elsewhere. */
static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
    ovsdb_jsonrpc_monitor_commit,
    ovsdb_jsonrpc_monitor_destroy
};