72fdcd7055e3a07c771561135068d9df4681c9be
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <assert.h>
21 #include <errno.h>
22
23 #include "bitmap.h"
24 #include "column.h"
25 #include "dynamic-string.h"
26 #include "json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
30 #include "ovsdb.h"
31 #include "reconnect.h"
32 #include "row.h"
33 #include "server.h"
34 #include "simap.h"
35 #include "stream.h"
36 #include "table.h"
37 #include "timeval.h"
38 #include "transaction.h"
39 #include "trigger.h"
40 #include "vlog.h"
41
42 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
43
44 struct ovsdb_jsonrpc_remote;
45 struct ovsdb_jsonrpc_session;
46
47 /* Message rate-limiting. */
48 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
49
50 /* Sessions. */
51 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
52     struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
53 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
54 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
55 static void ovsdb_jsonrpc_session_get_memory_usage_all(
56     const struct ovsdb_jsonrpc_remote *, struct simap *usage);
57 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
58 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
59 static void ovsdb_jsonrpc_session_set_all_options(
60     struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
61 static bool ovsdb_jsonrpc_session_get_status(
62     const struct ovsdb_jsonrpc_remote *,
63     struct ovsdb_jsonrpc_remote_status *);
64 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
65 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
66
67 /* Triggers. */
68 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
69                                          struct ovsdb *,
70                                          struct json *id, struct json *params);
71 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
72     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
73 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
74 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
75 static void ovsdb_jsonrpc_trigger_complete_done(
76     struct ovsdb_jsonrpc_session *);
77
78 /* Monitors. */
79 static struct json *ovsdb_jsonrpc_monitor_create(
80     struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params);
81 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
82     struct ovsdb_jsonrpc_session *,
83     struct json_array *params,
84     const struct json *request_id);
85 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
86 \f
87 /* JSON-RPC database server. */
88
89 struct ovsdb_jsonrpc_server {
90     struct ovsdb_server up;
91     unsigned int n_sessions, max_sessions;
92     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
93 };
94
95 /* A configured remote.  This is either a passive stream listener plus a list
96  * of the currently connected sessions, or a list of exactly one active
97  * session. */
98 struct ovsdb_jsonrpc_remote {
99     struct ovsdb_jsonrpc_server *server;
100     struct pstream *listener;   /* Listener, if passive. */
101     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
102     uint8_t dscp;
103 };
104
105 static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
106     struct ovsdb_jsonrpc_server *, const char *name,
107     const struct ovsdb_jsonrpc_options *options
108 );
109 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
110
111 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
112  *
113  * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
114  * which 'server' should provide access. */
115 struct ovsdb_jsonrpc_server *
116 ovsdb_jsonrpc_server_create(void)
117 {
118     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
119     ovsdb_server_init(&server->up);
120     server->max_sessions = 64;
121     shash_init(&server->remotes);
122     return server;
123 }
124
125 /* Adds 'db' to the set of databases served out by 'svr'.  Returns true if
126  * successful, false if 'db''s name is the same as some database already in
127  * 'server'. */
128 bool
129 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
130 {
131     return ovsdb_server_add_db(&svr->up, db);
132 }
133
134 void
135 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
136 {
137     struct shash_node *node, *next;
138
139     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
140         ovsdb_jsonrpc_server_del_remote(node);
141     }
142     shash_destroy(&svr->remotes);
143     ovsdb_server_destroy(&svr->up);
144     free(svr);
145 }
146
147 struct ovsdb_jsonrpc_options *
148 ovsdb_jsonrpc_default_options(const char *target)
149 {
150     struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
151     options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
152     options->probe_interval = (stream_or_pstream_needs_probes(target)
153                                ? RECONNECT_DEFAULT_PROBE_INTERVAL
154                                : 0);
155     return options;
156 }
157
158 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
159  * options in the struct ovsdb_jsonrpc_options supplied as the data values.
160  *
161  * A remote is an active or passive stream connection method, e.g. "pssl:" or
162  * "tcp:1.2.3.4". */
163 void
164 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
165                                  const struct shash *new_remotes)
166 {
167     struct shash_node *node, *next;
168
169     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
170         if (!shash_find(new_remotes, node->name)) {
171             VLOG_INFO("%s: remote deconfigured", node->name);
172             ovsdb_jsonrpc_server_del_remote(node);
173         }
174     }
175     SHASH_FOR_EACH (node, new_remotes) {
176         const struct ovsdb_jsonrpc_options *options = node->data;
177         struct ovsdb_jsonrpc_remote *remote;
178
179         remote = shash_find_data(&svr->remotes, node->name);
180         if (!remote) {
181             remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
182             if (!remote) {
183                 continue;
184             }
185         }
186
187         ovsdb_jsonrpc_session_set_all_options(remote, options);
188     }
189 }
190
191 static struct ovsdb_jsonrpc_remote *
192 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
193                                 const char *name,
194                                 const struct ovsdb_jsonrpc_options *options)
195 {
196     struct ovsdb_jsonrpc_remote *remote;
197     struct pstream *listener;
198     int error;
199
200     error = jsonrpc_pstream_open(name, &listener, options->dscp);
201     if (error && error != EAFNOSUPPORT) {
202         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
203         return NULL;
204     }
205
206     remote = xmalloc(sizeof *remote);
207     remote->server = svr;
208     remote->listener = listener;
209     list_init(&remote->sessions);
210     remote->dscp = options->dscp;
211     shash_add(&svr->remotes, name, remote);
212
213     if (!listener) {
214         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
215     }
216     return remote;
217 }
218
219 static void
220 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
221 {
222     struct ovsdb_jsonrpc_remote *remote = node->data;
223
224     ovsdb_jsonrpc_session_close_all(remote);
225     pstream_close(remote->listener);
226     shash_delete(&remote->server->remotes, node);
227     free(remote);
228 }
229
230 /* Stores status information for the remote named 'target', which should have
231  * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
232  * into '*status'.  On success returns true, on failure (if 'svr' doesn't have
233  * a remote named 'target' or if that remote is an inbound remote that has no
234  * active connections) returns false.  On failure, 'status' will be zeroed.
235  */
236 bool
237 ovsdb_jsonrpc_server_get_remote_status(
238     const struct ovsdb_jsonrpc_server *svr, const char *target,
239     struct ovsdb_jsonrpc_remote_status *status)
240 {
241     const struct ovsdb_jsonrpc_remote *remote;
242
243     memset(status, 0, sizeof *status);
244
245     remote = shash_find_data(&svr->remotes, target);
246     return remote && ovsdb_jsonrpc_session_get_status(remote, status);
247 }
248
249 void
250 ovsdb_jsonrpc_server_free_remote_status(
251     struct ovsdb_jsonrpc_remote_status *status)
252 {
253     free(status->locks_held);
254     free(status->locks_waiting);
255     free(status->locks_lost);
256 }
257
258 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
259  * reconnect. */
260 void
261 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
262 {
263     struct shash_node *node;
264
265     SHASH_FOR_EACH (node, &svr->remotes) {
266         struct ovsdb_jsonrpc_remote *remote = node->data;
267
268         ovsdb_jsonrpc_session_reconnect_all(remote);
269     }
270 }
271
272 void
273 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
274 {
275     struct shash_node *node;
276
277     SHASH_FOR_EACH (node, &svr->remotes) {
278         struct ovsdb_jsonrpc_remote *remote = node->data;
279
280         if (remote->listener && svr->n_sessions < svr->max_sessions) {
281             struct stream *stream;
282             int error;
283
284             error = pstream_accept(remote->listener, &stream);
285             if (!error) {
286                 struct jsonrpc_session *js;
287                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
288                                                      remote->dscp);
289                 ovsdb_jsonrpc_session_create(remote, js);
290             } else if (error != EAGAIN) {
291                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
292                              pstream_get_name(remote->listener),
293                              strerror(error));
294             }
295         }
296
297         ovsdb_jsonrpc_session_run_all(remote);
298     }
299 }
300
301 void
302 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
303 {
304     struct shash_node *node;
305
306     SHASH_FOR_EACH (node, &svr->remotes) {
307         struct ovsdb_jsonrpc_remote *remote = node->data;
308
309         if (remote->listener && svr->n_sessions < svr->max_sessions) {
310             pstream_wait(remote->listener);
311         }
312
313         ovsdb_jsonrpc_session_wait_all(remote);
314     }
315 }
316
317 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
318  * memory_report(). */
319 void
320 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
321                                       struct simap *usage)
322 {
323     struct shash_node *node;
324
325     simap_increase(usage, "sessions", svr->n_sessions);
326     SHASH_FOR_EACH (node, &svr->remotes) {
327         struct ovsdb_jsonrpc_remote *remote = node->data;
328
329         ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
330     }
331 }
332 \f
333 /* JSON-RPC database server session. */
334
335 struct ovsdb_jsonrpc_session {
336     struct list node;           /* Element in remote's sessions list. */
337     struct ovsdb_session up;
338     struct ovsdb_jsonrpc_remote *remote;
339
340     /* Triggers. */
341     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
342
343     /* Monitors. */
344     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
345
346     /* Network connectivity. */
347     struct jsonrpc_session *js;  /* JSON-RPC session. */
348     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
349 };
350
351 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
352 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
353 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
354 static void ovsdb_jsonrpc_session_get_memory_usage(
355     const struct ovsdb_jsonrpc_session *, struct simap *usage);
356 static void ovsdb_jsonrpc_session_set_options(
357     struct ovsdb_jsonrpc_session *, const struct ovsdb_jsonrpc_options *);
358 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
359                                              struct jsonrpc_msg *);
360 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
361                                              struct jsonrpc_msg *);
362
363 static struct ovsdb_jsonrpc_session *
364 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
365                              struct jsonrpc_session *js)
366 {
367     struct ovsdb_jsonrpc_session *s;
368
369     s = xzalloc(sizeof *s);
370     ovsdb_session_init(&s->up, &remote->server->up);
371     s->remote = remote;
372     list_push_back(&remote->sessions, &s->node);
373     hmap_init(&s->triggers);
374     hmap_init(&s->monitors);
375     s->js = js;
376     s->js_seqno = jsonrpc_session_get_seqno(js);
377
378     remote->server->n_sessions++;
379
380     return s;
381 }
382
383 static void
384 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
385 {
386     ovsdb_jsonrpc_monitor_remove_all(s);
387     ovsdb_jsonrpc_session_unlock_all(s);
388     jsonrpc_session_close(s->js);
389     list_remove(&s->node);
390     s->remote->server->n_sessions--;
391     ovsdb_session_destroy(&s->up);
392     free(s);
393 }
394
395 static int
396 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
397 {
398     jsonrpc_session_run(s->js);
399     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
400         s->js_seqno = jsonrpc_session_get_seqno(s->js);
401         ovsdb_jsonrpc_trigger_complete_all(s);
402         ovsdb_jsonrpc_monitor_remove_all(s);
403         ovsdb_jsonrpc_session_unlock_all(s);
404     }
405
406     ovsdb_jsonrpc_trigger_complete_done(s);
407
408     if (!jsonrpc_session_get_backlog(s->js)) {
409         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
410         if (msg) {
411             if (msg->type == JSONRPC_REQUEST) {
412                 ovsdb_jsonrpc_session_got_request(s, msg);
413             } else if (msg->type == JSONRPC_NOTIFY) {
414                 ovsdb_jsonrpc_session_got_notify(s, msg);
415             } else {
416                 VLOG_WARN("%s: received unexpected %s message",
417                           jsonrpc_session_get_name(s->js),
418                           jsonrpc_msg_type_to_string(msg->type));
419                 jsonrpc_session_force_reconnect(s->js);
420                 jsonrpc_msg_destroy(msg);
421             }
422         }
423     }
424     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
425 }
426
427 static void
428 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
429                                   const struct ovsdb_jsonrpc_options *options)
430 {
431     jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
432     jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
433     jsonrpc_session_set_dscp(session->js, options->dscp);
434 }
435
436 static void
437 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
438 {
439     struct ovsdb_jsonrpc_session *s, *next;
440
441     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
442         int error = ovsdb_jsonrpc_session_run(s);
443         if (error) {
444             ovsdb_jsonrpc_session_close(s);
445         }
446     }
447 }
448
449 static void
450 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
451 {
452     jsonrpc_session_wait(s->js);
453     if (!jsonrpc_session_get_backlog(s->js)) {
454         jsonrpc_session_recv_wait(s->js);
455     }
456 }
457
458 static void
459 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
460 {
461     struct ovsdb_jsonrpc_session *s;
462
463     LIST_FOR_EACH (s, node, &remote->sessions) {
464         ovsdb_jsonrpc_session_wait(s);
465     }
466 }
467
468 static void
469 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
470                                        struct simap *usage)
471 {
472     simap_increase(usage, "triggers", hmap_count(&s->triggers));
473     simap_increase(usage, "monitors", hmap_count(&s->monitors));
474     simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
475 }
476
477 static void
478 ovsdb_jsonrpc_session_get_memory_usage_all(
479     const struct ovsdb_jsonrpc_remote *remote,
480     struct simap *usage)
481 {
482     struct ovsdb_jsonrpc_session *s;
483
484     LIST_FOR_EACH (s, node, &remote->sessions) {
485         ovsdb_jsonrpc_session_get_memory_usage(s, usage);
486     }
487 }
488
489 static void
490 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
491 {
492     struct ovsdb_jsonrpc_session *s, *next;
493
494     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
495         ovsdb_jsonrpc_session_close(s);
496     }
497 }
498
499 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
500  * reconnect. */
501 static void
502 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
503 {
504     struct ovsdb_jsonrpc_session *s, *next;
505
506     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
507         jsonrpc_session_force_reconnect(s->js);
508         if (!jsonrpc_session_is_alive(s->js)) {
509             ovsdb_jsonrpc_session_close(s);
510         }
511     }
512 }
513
514 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
515  * 'options'. */
516 static void
517 ovsdb_jsonrpc_session_set_all_options(
518     struct ovsdb_jsonrpc_remote *remote,
519     const struct ovsdb_jsonrpc_options *options)
520 {
521     struct ovsdb_jsonrpc_session *s;
522
523     if (remote->listener) {
524         int error;
525
526         error = pstream_set_dscp(remote->listener, options->dscp);
527         if (error) {
528             VLOG_ERR("%s: set_dscp failed %s",
529                      pstream_get_name(remote->listener), strerror(error));
530         } else {
531             remote->dscp = options->dscp;
532         }
533         /*
534          * TODO:XXX race window between setting dscp to listening socket
535          * and accepting socket. Accepted socket may have old dscp value.
536          * Ignore this race window for now.
537          */
538     }
539     LIST_FOR_EACH (s, node, &remote->sessions) {
540         ovsdb_jsonrpc_session_set_options(s, options);
541     }
542 }
543
544 static bool
545 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote *remote,
546                                  struct ovsdb_jsonrpc_remote_status *status)
547 {
548     const struct ovsdb_jsonrpc_session *s;
549     const struct jsonrpc_session *js;
550     struct ovsdb_lock_waiter *waiter;
551     struct reconnect_stats rstats;
552     struct ds locks_held, locks_waiting, locks_lost;
553
554     if (list_is_empty(&remote->sessions)) {
555         return false;
556     }
557     s = CONTAINER_OF(remote->sessions.next, struct ovsdb_jsonrpc_session, node);
558     js = s->js;
559
560     status->is_connected = jsonrpc_session_is_connected(js);
561     status->last_error = jsonrpc_session_get_status(js);
562
563     jsonrpc_session_get_reconnect_stats(js, &rstats);
564     status->state = rstats.state;
565     status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
566         ? UINT_MAX : rstats.msec_since_connect / 1000;
567     status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
568         ? UINT_MAX : rstats.msec_since_disconnect / 1000;
569
570     ds_init(&locks_held);
571     ds_init(&locks_waiting);
572     ds_init(&locks_lost);
573     HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
574         struct ds *string;
575
576         string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
577                   : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
578                   : &locks_lost);
579         if (string->length) {
580             ds_put_char(string, ' ');
581         }
582         ds_put_cstr(string, waiter->lock_name);
583     }
584     status->locks_held = ds_steal_cstr(&locks_held);
585     status->locks_waiting = ds_steal_cstr(&locks_waiting);
586     status->locks_lost = ds_steal_cstr(&locks_lost);
587
588     status->n_connections = list_size(&remote->sessions);
589
590     return true;
591 }
592
593 /* Examines 'request' to determine the database to which it relates, and then
594  * searches 's' to find that database:
595  *
596  *    - If successful, returns the database and sets '*replyp' to NULL.
597  *
598  *    - If no such database exists, returns NULL and sets '*replyp' to an
599  *      appropriate JSON-RPC error reply, owned by the caller. */
600 static struct ovsdb *
601 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
602                         const struct jsonrpc_msg *request,
603                         struct jsonrpc_msg **replyp)
604 {
605     struct json_array *params;
606     struct ovsdb_error *error;
607     const char *db_name;
608     struct ovsdb *db;
609
610     params = json_array(request->params);
611     if (!params->n || params->elems[0]->type != JSON_STRING) {
612         error = ovsdb_syntax_error(
613             request->params, NULL,
614             "%s request params must begin with <db-name>", request->method);
615         goto error;
616     }
617
618     db_name = params->elems[0]->u.string;
619     db = shash_find_data(&s->up.server->dbs, db_name);
620     if (!db) {
621         error = ovsdb_syntax_error(
622             request->params, "unknown database",
623             "%s request specifies unknown database %s",
624             request->method, db_name);
625         goto error;
626     }
627
628     *replyp = NULL;
629     return db;
630
631 error:
632     *replyp = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
633     ovsdb_error_destroy(error);
634     return NULL;
635 }
636
637 static struct ovsdb_error *
638 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
639                                       const char **lock_namep)
640 {
641     const struct json_array *params;
642
643     params = json_array(request->params);
644     if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
645         !ovsdb_parser_is_id(json_string(params->elems[0]))) {
646         *lock_namep = NULL;
647         return ovsdb_syntax_error(request->params, NULL,
648                                   "%s request params must be <id>",
649                                   request->method);
650     }
651
652     *lock_namep = json_string(params->elems[0]);
653     return NULL;
654 }
655
656 static void
657 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
658                              const char *lock_name,
659                              const char *method)
660 {
661     struct ovsdb_jsonrpc_session *s;
662     struct json *params;
663
664     s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
665     params = json_array_create_1(json_string_create(lock_name));
666     jsonrpc_session_send(s->js, jsonrpc_create_notify(method, params));
667 }
668
669 static struct jsonrpc_msg *
670 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
671                            struct jsonrpc_msg *request,
672                            enum ovsdb_lock_mode mode)
673 {
674     struct ovsdb_lock_waiter *waiter;
675     struct jsonrpc_msg *reply;
676     struct ovsdb_error *error;
677     struct ovsdb_session *victim;
678     const char *lock_name;
679     struct json *result;
680
681     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
682     if (error) {
683         goto error;
684     }
685
686     /* Report error if this session has issued a "lock" or "steal" without a
687      * matching "unlock" for this lock. */
688     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
689     if (waiter) {
690         error = ovsdb_syntax_error(
691             request->params, NULL,
692             "must issue \"unlock\" before new \"%s\"", request->method);
693         goto error;
694     }
695
696     /* Get the lock, add us as a waiter. */
697     waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
698                                &victim);
699     if (victim) {
700         ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
701     }
702
703     result = json_object_create();
704     json_object_put(result, "locked",
705                     json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
706
707     return jsonrpc_create_reply(result, request->id);
708
709 error:
710     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
711     ovsdb_error_destroy(error);
712     return reply;
713 }
714
715 static void
716 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
717 {
718     struct ovsdb_lock_waiter *waiter, *next;
719
720     HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
721         ovsdb_jsonrpc_session_unlock__(waiter);
722     }
723 }
724
725 static void
726 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
727 {
728     struct ovsdb_lock *lock = waiter->lock;
729
730     if (lock) {
731         struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
732         if (new_owner) {
733             ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
734         } else {
735             /* ovsdb_server_lock() might have freed 'lock'. */
736         }
737     }
738
739     ovsdb_lock_waiter_destroy(waiter);
740 }
741
742 static struct jsonrpc_msg *
743 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
744                              struct jsonrpc_msg *request)
745 {
746     struct ovsdb_lock_waiter *waiter;
747     struct jsonrpc_msg *reply;
748     struct ovsdb_error *error;
749     const char *lock_name;
750
751     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
752     if (error) {
753         goto error;
754     }
755
756     /* Report error if this session has not issued a "lock" or "steal" for this
757      * lock. */
758     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
759     if (!waiter) {
760         error = ovsdb_syntax_error(
761             request->params, NULL, "\"unlock\" without \"lock\" or \"steal\"");
762         goto error;
763     }
764
765     ovsdb_jsonrpc_session_unlock__(waiter);
766
767     return jsonrpc_create_reply(json_object_create(), request->id);
768
769 error:
770     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
771     ovsdb_error_destroy(error);
772     return reply;
773 }
774
775 static struct jsonrpc_msg *
776 execute_transaction(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
777                     struct jsonrpc_msg *request)
778 {
779     ovsdb_jsonrpc_trigger_create(s, db, request->id, request->params);
780     request->id = NULL;
781     request->params = NULL;
782     jsonrpc_msg_destroy(request);
783     return NULL;
784 }
785
786 static void
787 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
788                                   struct jsonrpc_msg *request)
789 {
790     struct jsonrpc_msg *reply;
791
792     if (!strcmp(request->method, "transact")) {
793         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
794         if (!reply) {
795             reply = execute_transaction(s, db, request);
796         }
797     } else if (!strcmp(request->method, "monitor")) {
798         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
799         if (!reply) {
800             reply = jsonrpc_create_reply(
801                 ovsdb_jsonrpc_monitor_create(s, db, request->params),
802                 request->id);
803         }
804     } else if (!strcmp(request->method, "monitor_cancel")) {
805         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
806                                              request->id);
807     } else if (!strcmp(request->method, "get_schema")) {
808         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
809         if (!reply) {
810             reply = jsonrpc_create_reply(ovsdb_schema_to_json(db->schema),
811                                          request->id);
812         }
813     } else if (!strcmp(request->method, "list_dbs")) {
814         size_t n_dbs = shash_count(&s->up.server->dbs);
815         struct shash_node *node;
816         struct json **dbs;
817         size_t i;
818
819         dbs = xmalloc(n_dbs * sizeof *dbs);
820         i = 0;
821         SHASH_FOR_EACH (node, &s->up.server->dbs) {
822             dbs[i++] = json_string_create(node->name);
823         }
824         reply = jsonrpc_create_reply(json_array_create(dbs, n_dbs),
825                                      request->id);
826     } else if (!strcmp(request->method, "lock")) {
827         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
828     } else if (!strcmp(request->method, "steal")) {
829         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
830     } else if (!strcmp(request->method, "unlock")) {
831         reply = ovsdb_jsonrpc_session_unlock(s, request);
832     } else if (!strcmp(request->method, "echo")) {
833         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
834     } else {
835         reply = jsonrpc_create_error(json_string_create("unknown method"),
836                                      request->id);
837     }
838
839     if (reply) {
840         jsonrpc_msg_destroy(request);
841         jsonrpc_session_send(s->js, reply);
842     }
843 }
844
845 static void
846 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
847 {
848     if (json_array(request->params)->n == 1) {
849         struct ovsdb_jsonrpc_trigger *t;
850         struct json *id;
851
852         id = request->params->u.array.elems[0];
853         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
854         if (t) {
855             ovsdb_jsonrpc_trigger_complete(t);
856         }
857     }
858 }
859
860 static void
861 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
862                                  struct jsonrpc_msg *request)
863 {
864     if (!strcmp(request->method, "cancel")) {
865         execute_cancel(s, request);
866     }
867     jsonrpc_msg_destroy(request);
868 }
869 \f
870 /* JSON-RPC database server triggers.
871  *
872  * (Every transaction is treated as a trigger even if it doesn't actually have
873  * any "wait" operations.) */
874
875 struct ovsdb_jsonrpc_trigger {
876     struct ovsdb_trigger trigger;
877     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
878     struct json *id;
879 };
880
881 static void
882 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
883                              struct json *id, struct json *params)
884 {
885     struct ovsdb_jsonrpc_trigger *t;
886     size_t hash;
887
888     /* Check for duplicate ID. */
889     hash = json_hash(id, 0);
890     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
891     if (t) {
892         struct jsonrpc_msg *msg;
893
894         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
895                                    id);
896         jsonrpc_session_send(s->js, msg);
897         json_destroy(id);
898         json_destroy(params);
899         return;
900     }
901
902     /* Insert into trigger table. */
903     t = xmalloc(sizeof *t);
904     ovsdb_trigger_init(&s->up, db, &t->trigger, params, time_msec());
905     t->id = id;
906     hmap_insert(&s->triggers, &t->hmap_node, hash);
907
908     /* Complete early if possible. */
909     if (ovsdb_trigger_is_complete(&t->trigger)) {
910         ovsdb_jsonrpc_trigger_complete(t);
911     }
912 }
913
914 static struct ovsdb_jsonrpc_trigger *
915 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
916                            const struct json *id, size_t hash)
917 {
918     struct ovsdb_jsonrpc_trigger *t;
919
920     HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
921         if (json_equal(t->id, id)) {
922             return t;
923         }
924     }
925
926     return NULL;
927 }
928
929 static void
930 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
931 {
932     struct ovsdb_jsonrpc_session *s;
933
934     s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
935
936     if (jsonrpc_session_is_connected(s->js)) {
937         struct jsonrpc_msg *reply;
938         struct json *result;
939
940         result = ovsdb_trigger_steal_result(&t->trigger);
941         if (result) {
942             reply = jsonrpc_create_reply(result, t->id);
943         } else {
944             reply = jsonrpc_create_error(json_string_create("canceled"),
945                                          t->id);
946         }
947         jsonrpc_session_send(s->js, reply);
948     }
949
950     json_destroy(t->id);
951     ovsdb_trigger_destroy(&t->trigger);
952     hmap_remove(&s->triggers, &t->hmap_node);
953     free(t);
954 }
955
956 static void
957 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
958 {
959     struct ovsdb_jsonrpc_trigger *t, *next;
960     HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
961         ovsdb_jsonrpc_trigger_complete(t);
962     }
963 }
964
965 static void
966 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
967 {
968     while (!list_is_empty(&s->up.completions)) {
969         struct ovsdb_jsonrpc_trigger *t
970             = CONTAINER_OF(s->up.completions.next,
971                            struct ovsdb_jsonrpc_trigger, trigger.node);
972         ovsdb_jsonrpc_trigger_complete(t);
973     }
974 }
975 \f
976 /* JSON-RPC database table monitors. */
977
978 enum ovsdb_jsonrpc_monitor_selection {
979     OJMS_INITIAL = 1 << 0,      /* All rows when monitor is created. */
980     OJMS_INSERT = 1 << 1,       /* New rows. */
981     OJMS_DELETE = 1 << 2,       /* Deleted rows. */
982     OJMS_MODIFY = 1 << 3        /* Modified rows. */
983 };
984
985 /* A particular column being monitored. */
986 struct ovsdb_jsonrpc_monitor_column {
987     const struct ovsdb_column *column;
988     enum ovsdb_jsonrpc_monitor_selection select;
989 };
990
991 /* A particular table being monitored. */
992 struct ovsdb_jsonrpc_monitor_table {
993     const struct ovsdb_table *table;
994
995     /* This is the union (bitwise-OR) of the 'select' values in all of the
996      * members of 'columns' below. */
997     enum ovsdb_jsonrpc_monitor_selection select;
998
999     /* Columns being monitored. */
1000     struct ovsdb_jsonrpc_monitor_column *columns;
1001     size_t n_columns;
1002 };
1003
1004 /* A collection of tables being monitored. */
1005 struct ovsdb_jsonrpc_monitor {
1006     struct ovsdb_replica replica;
1007     struct ovsdb_jsonrpc_session *session;
1008     struct ovsdb *db;
1009     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
1010
1011     struct json *monitor_id;
1012     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
1013 };
1014
1015 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
1016
1017 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
1018     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
1019 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
1020 static struct json *ovsdb_jsonrpc_monitor_get_initial(
1021     const struct ovsdb_jsonrpc_monitor *);
1022
1023 static bool
1024 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
1025 {
1026     const struct json *json;
1027
1028     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
1029     return json ? json_boolean(json) : default_value;
1030 }
1031
1032 struct ovsdb_jsonrpc_monitor *
1033 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
1034                            const struct json *monitor_id)
1035 {
1036     struct ovsdb_jsonrpc_monitor *m;
1037
1038     HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
1039         if (json_equal(m->monitor_id, monitor_id)) {
1040             return m;
1041         }
1042     }
1043
1044     return NULL;
1045 }
1046
1047 static void
1048 ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table *mt,
1049                                  const struct ovsdb_column *column,
1050                                  enum ovsdb_jsonrpc_monitor_selection select,
1051                                  size_t *allocated_columns)
1052 {
1053     struct ovsdb_jsonrpc_monitor_column *c;
1054
1055     if (mt->n_columns >= *allocated_columns) {
1056         mt->columns = x2nrealloc(mt->columns, allocated_columns,
1057                                  sizeof *mt->columns);
1058     }
1059
1060     c = &mt->columns[mt->n_columns++];
1061     c->column = column;
1062     c->select = select;
1063 }
1064
1065 static int
1066 compare_ovsdb_jsonrpc_monitor_column(const void *a_, const void *b_)
1067 {
1068     const struct ovsdb_jsonrpc_monitor_column *a = a_;
1069     const struct ovsdb_jsonrpc_monitor_column *b = b_;
1070
1071     return a->column < b->column ? -1 : a->column > b->column;
1072 }
1073
1074 static struct ovsdb_error * WARN_UNUSED_RESULT
1075 ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
1076                                     const struct json *monitor_request,
1077                                     size_t *allocated_columns)
1078 {
1079     const struct ovsdb_table_schema *ts = mt->table->schema;
1080     enum ovsdb_jsonrpc_monitor_selection select;
1081     const struct json *columns, *select_json;
1082     struct ovsdb_parser parser;
1083     struct ovsdb_error *error;
1084
1085     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1086     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1087     select_json = ovsdb_parser_member(&parser, "select",
1088                                       OP_OBJECT | OP_OPTIONAL);
1089     error = ovsdb_parser_finish(&parser);
1090     if (error) {
1091         return error;
1092     }
1093
1094     if (select_json) {
1095         select = 0;
1096         ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1097         if (parse_bool(&parser, "initial", true)) {
1098             select |= OJMS_INITIAL;
1099         }
1100         if (parse_bool(&parser, "insert", true)) {
1101             select |= OJMS_INSERT;
1102         }
1103         if (parse_bool(&parser, "delete", true)) {
1104             select |= OJMS_DELETE;
1105         }
1106         if (parse_bool(&parser, "modify", true)) {
1107             select |= OJMS_MODIFY;
1108         }
1109         error = ovsdb_parser_finish(&parser);
1110         if (error) {
1111             return error;
1112         }
1113     } else {
1114         select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1115     }
1116     mt->select |= select;
1117
1118     if (columns) {
1119         size_t i;
1120
1121         if (columns->type != JSON_ARRAY) {
1122             return ovsdb_syntax_error(columns, NULL,
1123                                       "array of column names expected");
1124         }
1125
1126         for (i = 0; i < columns->u.array.n; i++) {
1127             const struct ovsdb_column *column;
1128             const char *s;
1129
1130             if (columns->u.array.elems[i]->type != JSON_STRING) {
1131                 return ovsdb_syntax_error(columns, NULL,
1132                                           "array of column names expected");
1133             }
1134
1135             s = columns->u.array.elems[i]->u.string;
1136             column = shash_find_data(&mt->table->schema->columns, s);
1137             if (!column) {
1138                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1139                                           "column name", s);
1140             }
1141             ovsdb_jsonrpc_add_monitor_column(mt, column, select,
1142                                              allocated_columns);
1143         }
1144     } else {
1145         struct shash_node *node;
1146
1147         SHASH_FOR_EACH (node, &ts->columns) {
1148             const struct ovsdb_column *column = node->data;
1149             if (column->index != OVSDB_COL_UUID) {
1150                 ovsdb_jsonrpc_add_monitor_column(mt, column, select,
1151                                                  allocated_columns);
1152             }
1153         }
1154     }
1155
1156     return NULL;
1157 }
1158
1159 static struct json *
1160 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1161                              struct json *params)
1162 {
1163     struct ovsdb_jsonrpc_monitor *m = NULL;
1164     struct json *monitor_id, *monitor_requests;
1165     struct ovsdb_error *error = NULL;
1166     struct shash_node *node;
1167     struct json *json;
1168
1169     if (json_array(params)->n != 3) {
1170         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1171         goto error;
1172     }
1173     monitor_id = params->u.array.elems[1];
1174     monitor_requests = params->u.array.elems[2];
1175     if (monitor_requests->type != JSON_OBJECT) {
1176         error = ovsdb_syntax_error(monitor_requests, NULL,
1177                                    "monitor-requests must be object");
1178         goto error;
1179     }
1180
1181     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1182         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1183         goto error;
1184     }
1185
1186     m = xzalloc(sizeof *m);
1187     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
1188     ovsdb_add_replica(db, &m->replica);
1189     m->session = s;
1190     m->db = db;
1191     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1192     m->monitor_id = json_clone(monitor_id);
1193     shash_init(&m->tables);
1194
1195     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1196         const struct ovsdb_table *table;
1197         struct ovsdb_jsonrpc_monitor_table *mt;
1198         size_t allocated_columns;
1199         const struct json *mr_value;
1200         size_t i;
1201
1202         table = ovsdb_get_table(m->db, node->name);
1203         if (!table) {
1204             error = ovsdb_syntax_error(NULL, NULL,
1205                                        "no table named %s", node->name);
1206             goto error;
1207         }
1208
1209         mt = xzalloc(sizeof *mt);
1210         mt->table = table;
1211         shash_add(&m->tables, table->schema->name, mt);
1212
1213         /* Parse columns. */
1214         mr_value = node->data;
1215         allocated_columns = 0;
1216         if (mr_value->type == JSON_ARRAY) {
1217             const struct json_array *array = &mr_value->u.array;
1218
1219             for (i = 0; i < array->n; i++) {
1220                 error = ovsdb_jsonrpc_parse_monitor_request(
1221                     mt, array->elems[i], &allocated_columns);
1222                 if (error) {
1223                     goto error;
1224                 }
1225             }
1226         } else {
1227             error = ovsdb_jsonrpc_parse_monitor_request(
1228                 mt, mr_value, &allocated_columns);
1229             if (error) {
1230                 goto error;
1231             }
1232         }
1233
1234         /* Check for duplicate columns. */
1235         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
1236               compare_ovsdb_jsonrpc_monitor_column);
1237         for (i = 1; i < mt->n_columns; i++) {
1238             if (mt->columns[i].column == mt->columns[i - 1].column) {
1239                 error = ovsdb_syntax_error(mr_value, NULL, "column %s "
1240                                            "mentioned more than once",
1241                                            mt->columns[i].column->name);
1242                 goto error;
1243             }
1244         }
1245     }
1246
1247     return ovsdb_jsonrpc_monitor_get_initial(m);
1248
1249 error:
1250     if (m) {
1251         ovsdb_remove_replica(m->db, &m->replica);
1252     }
1253
1254     json = ovsdb_error_to_json(error);
1255     ovsdb_error_destroy(error);
1256     return json;
1257 }
1258
1259 static struct jsonrpc_msg *
1260 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1261                              struct json_array *params,
1262                              const struct json *request_id)
1263 {
1264     if (params->n != 1) {
1265         return jsonrpc_create_error(json_string_create("invalid parameters"),
1266                                     request_id);
1267     } else {
1268         struct ovsdb_jsonrpc_monitor *m;
1269
1270         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1271         if (!m) {
1272             return jsonrpc_create_error(json_string_create("unknown monitor"),
1273                                         request_id);
1274         } else {
1275             ovsdb_remove_replica(m->db, &m->replica);
1276             return jsonrpc_create_reply(json_object_create(), request_id);
1277         }
1278     }
1279 }
1280
1281 static void
1282 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1283 {
1284     struct ovsdb_jsonrpc_monitor *m, *next;
1285
1286     HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1287         ovsdb_remove_replica(m->db, &m->replica);
1288     }
1289 }
1290
1291 static struct ovsdb_jsonrpc_monitor *
1292 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
1293 {
1294     assert(replica->class == &ovsdb_jsonrpc_replica_class);
1295     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
1296 }
1297
1298 struct ovsdb_jsonrpc_monitor_aux {
1299     bool initial;               /* Sending initial contents of table? */
1300     const struct ovsdb_jsonrpc_monitor *monitor;
1301     struct json *json;          /* JSON for the whole transaction. */
1302
1303     /* Current table.  */
1304     struct ovsdb_jsonrpc_monitor_table *mt;
1305     struct json *table_json;    /* JSON for table's transaction. */
1306 };
1307
1308 static bool
1309 any_reportable_change(const struct ovsdb_jsonrpc_monitor_table *mt,
1310                       const unsigned long int *changed)
1311 {
1312     size_t i;
1313
1314     for (i = 0; i < mt->n_columns; i++) {
1315         const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
1316         unsigned int idx = c->column->index;
1317
1318         if (c->select & OJMS_MODIFY && bitmap_is_set(changed, idx)) {
1319             return true;
1320         }
1321     }
1322
1323     return false;
1324 }
1325
1326 static bool
1327 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
1328                                 const struct ovsdb_row *new,
1329                                 const unsigned long int *changed,
1330                                 void *aux_)
1331 {
1332     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
1333     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
1334     struct ovsdb_table *table = new ? new->table : old->table;
1335     enum ovsdb_jsonrpc_monitor_selection type;
1336     struct json *old_json, *new_json;
1337     struct json *row_json;
1338     char uuid[UUID_LEN + 1];
1339     size_t i;
1340
1341     if (!aux->mt || table != aux->mt->table) {
1342         aux->mt = shash_find_data(&m->tables, table->schema->name);
1343         aux->table_json = NULL;
1344         if (!aux->mt) {
1345             /* We don't care about rows in this table at all.  Tell the caller
1346              * to skip it.  */
1347             return false;
1348         }
1349     }
1350
1351     type = (aux->initial ? OJMS_INITIAL
1352             : !old ? OJMS_INSERT
1353             : !new ? OJMS_DELETE
1354             : OJMS_MODIFY);
1355     if (!(aux->mt->select & type)) {
1356         /* We don't care about this type of change (but do want to be called
1357          * back for changes to other rows in the same table). */
1358         return true;
1359     }
1360
1361     if (type == OJMS_MODIFY && !any_reportable_change(aux->mt, changed)) {
1362         /* Nothing of interest changed. */
1363         return true;
1364     }
1365
1366     old_json = new_json = NULL;
1367     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
1368         old_json = json_object_create();
1369     }
1370     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1371         new_json = json_object_create();
1372     }
1373     for (i = 0; i < aux->mt->n_columns; i++) {
1374         const struct ovsdb_jsonrpc_monitor_column *c = &aux->mt->columns[i];
1375         const struct ovsdb_column *column = c->column;
1376         unsigned int idx = c->column->index;
1377
1378         if (!(type & c->select)) {
1379             /* We don't care about this type of change for this particular
1380              * column (but we will care about it for some other column). */
1381             continue;
1382         }
1383
1384         if ((type == OJMS_MODIFY && bitmap_is_set(changed, idx))
1385             || type == OJMS_DELETE) {
1386             json_object_put(old_json, column->name,
1387                             ovsdb_datum_to_json(&old->fields[idx],
1388                                                 &column->type));
1389         }
1390         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1391             json_object_put(new_json, column->name,
1392                             ovsdb_datum_to_json(&new->fields[idx],
1393                                                 &column->type));
1394         }
1395     }
1396
1397     /* Create JSON object for transaction overall. */
1398     if (!aux->json) {
1399         aux->json = json_object_create();
1400     }
1401
1402     /* Create JSON object for transaction on this table. */
1403     if (!aux->table_json) {
1404         aux->table_json = json_object_create();
1405         json_object_put(aux->json, aux->mt->table->schema->name,
1406                         aux->table_json);
1407     }
1408
1409     /* Create JSON object for transaction on this row. */
1410     row_json = json_object_create();
1411     if (old_json) {
1412         json_object_put(row_json, "old", old_json);
1413     }
1414     if (new_json) {
1415         json_object_put(row_json, "new", new_json);
1416     }
1417
1418     /* Add JSON row to JSON table. */
1419     snprintf(uuid, sizeof uuid,
1420              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
1421     json_object_put(aux->table_json, uuid, row_json);
1422
1423     return true;
1424 }
1425
1426 static void
1427 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
1428                                const struct ovsdb_jsonrpc_monitor *m,
1429                                bool initial)
1430 {
1431     aux->initial = initial;
1432     aux->monitor = m;
1433     aux->json = NULL;
1434     aux->mt = NULL;
1435     aux->table_json = NULL;
1436 }
1437
1438 static struct ovsdb_error *
1439 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
1440                              const struct ovsdb_txn *txn,
1441                              bool durable OVS_UNUSED)
1442 {
1443     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1444     struct ovsdb_jsonrpc_monitor_aux aux;
1445
1446     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
1447     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
1448     if (aux.json) {
1449         struct jsonrpc_msg *msg;
1450         struct json *params;
1451
1452         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
1453                                      aux.json);
1454         msg = jsonrpc_create_notify("update", params);
1455         jsonrpc_session_send(aux.monitor->session->js, msg);
1456     }
1457
1458     return NULL;
1459 }
1460
1461 static struct json *
1462 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
1463 {
1464     struct ovsdb_jsonrpc_monitor_aux aux;
1465     struct shash_node *node;
1466
1467     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
1468     SHASH_FOR_EACH (node, &m->tables) {
1469         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1470
1471         if (mt->select & OJMS_INITIAL) {
1472             struct ovsdb_row *row;
1473
1474             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1475                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, NULL, &aux);
1476             }
1477         }
1478     }
1479     return aux.json ? aux.json : json_object_create();
1480 }
1481
1482 static void
1483 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
1484 {
1485     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1486     struct shash_node *node;
1487
1488     json_destroy(m->monitor_id);
1489     SHASH_FOR_EACH (node, &m->tables) {
1490         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1491         free(mt->columns);
1492         free(mt);
1493     }
1494     shash_destroy(&m->tables);
1495     hmap_remove(&m->session->monitors, &m->node);
1496     free(m);
1497 }
1498
1499 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1500     ovsdb_jsonrpc_monitor_commit,
1501     ovsdb_jsonrpc_monitor_destroy
1502 };