55148971aa997ae646980e5a8a7fcff9005caf2b
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "bitmap.h"
23 #include "column.h"
24 #include "dynamic-string.h"
25 #include "json.h"
26 #include "jsonrpc.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb-parser.h"
29 #include "ovsdb.h"
30 #include "poll-loop.h"
31 #include "reconnect.h"
32 #include "row.h"
33 #include "server.h"
34 #include "simap.h"
35 #include "stream.h"
36 #include "table.h"
37 #include "timeval.h"
38 #include "transaction.h"
39 #include "trigger.h"
40 #include "monitor.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
44
45 struct ovsdb_jsonrpc_remote;
46 struct ovsdb_jsonrpc_session;
47
48 /* Message rate-limiting. */
49 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
50
51 /* Sessions. */
52 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
53     struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
54 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
55 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
56 static void ovsdb_jsonrpc_session_get_memory_usage_all(
57     const struct ovsdb_jsonrpc_remote *, struct simap *usage);
58 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
59 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
60 static void ovsdb_jsonrpc_session_set_all_options(
61     struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
62 static bool ovsdb_jsonrpc_session_get_status(
63     const struct ovsdb_jsonrpc_remote *,
64     struct ovsdb_jsonrpc_remote_status *);
65 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
66 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
67 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
68                                        struct jsonrpc_msg *);
69
70 /* Triggers. */
71 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
72                                          struct ovsdb *,
73                                          struct json *id, struct json *params);
74 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
75     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
76 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
77 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
78 static void ovsdb_jsonrpc_trigger_complete_done(
79     struct ovsdb_jsonrpc_session *);
80
81 /* Monitors. */
82 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
83     struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
84     const struct json *request_id);
85 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
86     struct ovsdb_jsonrpc_session *,
87     struct json_array *params,
88     const struct json *request_id);
89 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
90 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
91 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
92 \f
93 /* JSON-RPC database server. */
94
95 struct ovsdb_jsonrpc_server {
96     struct ovsdb_server up;
97     unsigned int n_sessions, max_sessions;
98     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
99 };
100
101 /* A configured remote.  This is either a passive stream listener plus a list
102  * of the currently connected sessions, or a list of exactly one active
103  * session. */
104 struct ovsdb_jsonrpc_remote {
105     struct ovsdb_jsonrpc_server *server;
106     struct pstream *listener;   /* Listener, if passive. */
107     struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
108     uint8_t dscp;
109 };
110
111 static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
112     struct ovsdb_jsonrpc_server *, const char *name,
113     const struct ovsdb_jsonrpc_options *options
114 );
115 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
116
117 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
118  *
119  * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
120  * which 'server' should provide access. */
121 struct ovsdb_jsonrpc_server *
122 ovsdb_jsonrpc_server_create(void)
123 {
124     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
125     ovsdb_server_init(&server->up);
126     server->max_sessions = 330;   /* Random limit. */
127     shash_init(&server->remotes);
128     return server;
129 }
130
131 /* Adds 'db' to the set of databases served out by 'svr'.  Returns true if
132  * successful, false if 'db''s name is the same as some database already in
133  * 'server'. */
134 bool
135 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
136 {
137     /* The OVSDB protocol doesn't have a way to notify a client that a
138      * database has been added.  If some client tried to use the database
139      * that we're adding and failed, then forcing it to reconnect seems like
140      * a reasonable way to make it try again.
141      *
142      * If this is too big of a hammer in practice, we could be more selective,
143      * e.g. disconnect only connections that actually tried to use a database
144      * with 'db''s name. */
145     ovsdb_jsonrpc_server_reconnect(svr);
146
147     return ovsdb_server_add_db(&svr->up, db);
148 }
149
150 /* Removes 'db' from the set of databases served out by 'svr'.  Returns
151  * true if successful, false if there is no database associated with 'db'. */
152 bool
153 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *svr,
154                                struct ovsdb *db)
155 {
156     /* There might be pointers to 'db' from 'svr', such as monitors or
157      * outstanding transactions.  Disconnect all JSON-RPC connections to avoid
158      * accesses to freed memory.
159      *
160      * If this is too big of a hammer in practice, we could be more selective,
161      * e.g. disconnect only connections that actually reference 'db'. */
162     ovsdb_jsonrpc_server_reconnect(svr);
163
164     return ovsdb_server_remove_db(&svr->up, db);
165 }
166
167 void
168 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
169 {
170     struct shash_node *node, *next;
171
172     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
173         ovsdb_jsonrpc_server_del_remote(node);
174     }
175     shash_destroy(&svr->remotes);
176     ovsdb_server_destroy(&svr->up);
177     free(svr);
178 }
179
180 struct ovsdb_jsonrpc_options *
181 ovsdb_jsonrpc_default_options(const char *target)
182 {
183     struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
184     options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
185     options->probe_interval = (stream_or_pstream_needs_probes(target)
186                                ? RECONNECT_DEFAULT_PROBE_INTERVAL
187                                : 0);
188     return options;
189 }
190
191 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
192  * options in the struct ovsdb_jsonrpc_options supplied as the data values.
193  *
194  * A remote is an active or passive stream connection method, e.g. "pssl:" or
195  * "tcp:1.2.3.4". */
196 void
197 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
198                                  const struct shash *new_remotes)
199 {
200     struct shash_node *node, *next;
201
202     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
203         struct ovsdb_jsonrpc_remote *remote = node->data;
204         struct ovsdb_jsonrpc_options *options
205             = shash_find_data(new_remotes, node->name);
206
207         if (!options) {
208             VLOG_INFO("%s: remote deconfigured", node->name);
209             ovsdb_jsonrpc_server_del_remote(node);
210         } else if (options->dscp != remote->dscp) {
211             ovsdb_jsonrpc_server_del_remote(node);
212          }
213     }
214     SHASH_FOR_EACH (node, new_remotes) {
215         const struct ovsdb_jsonrpc_options *options = node->data;
216         struct ovsdb_jsonrpc_remote *remote;
217
218         remote = shash_find_data(&svr->remotes, node->name);
219         if (!remote) {
220             remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
221             if (!remote) {
222                 continue;
223             }
224         }
225
226         ovsdb_jsonrpc_session_set_all_options(remote, options);
227     }
228 }
229
230 static struct ovsdb_jsonrpc_remote *
231 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
232                                 const char *name,
233                                 const struct ovsdb_jsonrpc_options *options)
234 {
235     struct ovsdb_jsonrpc_remote *remote;
236     struct pstream *listener;
237     int error;
238
239     error = jsonrpc_pstream_open(name, &listener, options->dscp);
240     if (error && error != EAFNOSUPPORT) {
241         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, ovs_strerror(error));
242         return NULL;
243     }
244
245     remote = xmalloc(sizeof *remote);
246     remote->server = svr;
247     remote->listener = listener;
248     list_init(&remote->sessions);
249     remote->dscp = options->dscp;
250     shash_add(&svr->remotes, name, remote);
251
252     if (!listener) {
253         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, true));
254     }
255     return remote;
256 }
257
258 static void
259 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
260 {
261     struct ovsdb_jsonrpc_remote *remote = node->data;
262
263     ovsdb_jsonrpc_session_close_all(remote);
264     pstream_close(remote->listener);
265     shash_delete(&remote->server->remotes, node);
266     free(remote);
267 }
268
269 /* Stores status information for the remote named 'target', which should have
270  * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
271  * into '*status'.  On success returns true, on failure (if 'svr' doesn't have
272  * a remote named 'target' or if that remote is an inbound remote that has no
273  * active connections) returns false.  On failure, 'status' will be zeroed.
274  */
275 bool
276 ovsdb_jsonrpc_server_get_remote_status(
277     const struct ovsdb_jsonrpc_server *svr, const char *target,
278     struct ovsdb_jsonrpc_remote_status *status)
279 {
280     const struct ovsdb_jsonrpc_remote *remote;
281
282     memset(status, 0, sizeof *status);
283
284     remote = shash_find_data(&svr->remotes, target);
285     return remote && ovsdb_jsonrpc_session_get_status(remote, status);
286 }
287
288 void
289 ovsdb_jsonrpc_server_free_remote_status(
290     struct ovsdb_jsonrpc_remote_status *status)
291 {
292     free(status->locks_held);
293     free(status->locks_waiting);
294     free(status->locks_lost);
295 }
296
297 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
298  * reconnect. */
299 void
300 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
301 {
302     struct shash_node *node;
303
304     SHASH_FOR_EACH (node, &svr->remotes) {
305         struct ovsdb_jsonrpc_remote *remote = node->data;
306
307         ovsdb_jsonrpc_session_reconnect_all(remote);
308     }
309 }
310
311 void
312 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
313 {
314     struct shash_node *node;
315
316     SHASH_FOR_EACH (node, &svr->remotes) {
317         struct ovsdb_jsonrpc_remote *remote = node->data;
318
319         if (remote->listener) {
320             if (svr->n_sessions < svr->max_sessions) {
321                 struct stream *stream;
322                 int error;
323
324                 error = pstream_accept(remote->listener, &stream);
325                 if (!error) {
326                     struct jsonrpc_session *js;
327                     js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
328                                                          remote->dscp);
329                     ovsdb_jsonrpc_session_create(remote, js);
330                 } else if (error != EAGAIN) {
331                     VLOG_WARN_RL(&rl, "%s: accept failed: %s",
332                                  pstream_get_name(remote->listener),
333                                  ovs_strerror(error));
334                 }
335             } else {
336                 VLOG_WARN_RL(&rl, "%s: connection exceeded maximum (%d)",
337                              pstream_get_name(remote->listener),
338                              svr->max_sessions);
339             }
340         }
341
342         ovsdb_jsonrpc_session_run_all(remote);
343     }
344 }
345
346 void
347 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
348 {
349     struct shash_node *node;
350
351     SHASH_FOR_EACH (node, &svr->remotes) {
352         struct ovsdb_jsonrpc_remote *remote = node->data;
353
354         if (remote->listener && svr->n_sessions < svr->max_sessions) {
355             pstream_wait(remote->listener);
356         }
357
358         ovsdb_jsonrpc_session_wait_all(remote);
359     }
360 }
361
362 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
363  * memory_report(). */
364 void
365 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
366                                       struct simap *usage)
367 {
368     struct shash_node *node;
369
370     simap_increase(usage, "sessions", svr->n_sessions);
371     SHASH_FOR_EACH (node, &svr->remotes) {
372         struct ovsdb_jsonrpc_remote *remote = node->data;
373
374         ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
375     }
376 }
377 \f
378 /* JSON-RPC database server session. */
379
380 struct ovsdb_jsonrpc_session {
381     struct ovs_list node;       /* Element in remote's sessions list. */
382     struct ovsdb_session up;
383     struct ovsdb_jsonrpc_remote *remote;
384
385     /* Triggers. */
386     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
387
388     /* Monitors. */
389     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
390
391     /* Network connectivity. */
392     struct jsonrpc_session *js;  /* JSON-RPC session. */
393     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
394 };
395
396 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
397 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
398 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
399 static void ovsdb_jsonrpc_session_get_memory_usage(
400     const struct ovsdb_jsonrpc_session *, struct simap *usage);
401 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
402                                              struct jsonrpc_msg *);
403 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
404                                              struct jsonrpc_msg *);
405
406 static struct ovsdb_jsonrpc_session *
407 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
408                              struct jsonrpc_session *js)
409 {
410     struct ovsdb_jsonrpc_session *s;
411
412     s = xzalloc(sizeof *s);
413     ovsdb_session_init(&s->up, &remote->server->up);
414     s->remote = remote;
415     list_push_back(&remote->sessions, &s->node);
416     hmap_init(&s->triggers);
417     hmap_init(&s->monitors);
418     s->js = js;
419     s->js_seqno = jsonrpc_session_get_seqno(js);
420
421     remote->server->n_sessions++;
422
423     return s;
424 }
425
426 static void
427 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
428 {
429     ovsdb_jsonrpc_monitor_remove_all(s);
430     ovsdb_jsonrpc_session_unlock_all(s);
431     ovsdb_jsonrpc_trigger_complete_all(s);
432
433     hmap_destroy(&s->monitors);
434     hmap_destroy(&s->triggers);
435
436     jsonrpc_session_close(s->js);
437     list_remove(&s->node);
438     s->remote->server->n_sessions--;
439     ovsdb_session_destroy(&s->up);
440     free(s);
441 }
442
443 static int
444 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
445 {
446     jsonrpc_session_run(s->js);
447     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
448         s->js_seqno = jsonrpc_session_get_seqno(s->js);
449         ovsdb_jsonrpc_trigger_complete_all(s);
450         ovsdb_jsonrpc_monitor_remove_all(s);
451         ovsdb_jsonrpc_session_unlock_all(s);
452     }
453
454     ovsdb_jsonrpc_trigger_complete_done(s);
455
456     if (!jsonrpc_session_get_backlog(s->js)) {
457         struct jsonrpc_msg *msg;
458
459         ovsdb_jsonrpc_monitor_flush_all(s);
460
461         msg = jsonrpc_session_recv(s->js);
462         if (msg) {
463             if (msg->type == JSONRPC_REQUEST) {
464                 ovsdb_jsonrpc_session_got_request(s, msg);
465             } else if (msg->type == JSONRPC_NOTIFY) {
466                 ovsdb_jsonrpc_session_got_notify(s, msg);
467             } else {
468                 VLOG_WARN("%s: received unexpected %s message",
469                           jsonrpc_session_get_name(s->js),
470                           jsonrpc_msg_type_to_string(msg->type));
471                 jsonrpc_session_force_reconnect(s->js);
472                 jsonrpc_msg_destroy(msg);
473             }
474         }
475     }
476     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
477 }
478
479 static void
480 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
481                                   const struct ovsdb_jsonrpc_options *options)
482 {
483     jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
484     jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
485     jsonrpc_session_set_dscp(session->js, options->dscp);
486 }
487
488 static void
489 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
490 {
491     struct ovsdb_jsonrpc_session *s, *next;
492
493     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
494         int error = ovsdb_jsonrpc_session_run(s);
495         if (error) {
496             ovsdb_jsonrpc_session_close(s);
497         }
498     }
499 }
500
501 static void
502 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
503 {
504     jsonrpc_session_wait(s->js);
505     if (!jsonrpc_session_get_backlog(s->js)) {
506         if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
507             poll_immediate_wake();
508         } else {
509             jsonrpc_session_recv_wait(s->js);
510         }
511     }
512 }
513
514 static void
515 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
516 {
517     struct ovsdb_jsonrpc_session *s;
518
519     LIST_FOR_EACH (s, node, &remote->sessions) {
520         ovsdb_jsonrpc_session_wait(s);
521     }
522 }
523
524 static void
525 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
526                                        struct simap *usage)
527 {
528     simap_increase(usage, "triggers", hmap_count(&s->triggers));
529     simap_increase(usage, "monitors", hmap_count(&s->monitors));
530     simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
531 }
532
533 static void
534 ovsdb_jsonrpc_session_get_memory_usage_all(
535     const struct ovsdb_jsonrpc_remote *remote,
536     struct simap *usage)
537 {
538     struct ovsdb_jsonrpc_session *s;
539
540     LIST_FOR_EACH (s, node, &remote->sessions) {
541         ovsdb_jsonrpc_session_get_memory_usage(s, usage);
542     }
543 }
544
545 static void
546 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
547 {
548     struct ovsdb_jsonrpc_session *s, *next;
549
550     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
551         ovsdb_jsonrpc_session_close(s);
552     }
553 }
554
555 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
556  * reconnect. */
557 static void
558 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
559 {
560     struct ovsdb_jsonrpc_session *s, *next;
561
562     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
563         jsonrpc_session_force_reconnect(s->js);
564         if (!jsonrpc_session_is_alive(s->js)) {
565             ovsdb_jsonrpc_session_close(s);
566         }
567     }
568 }
569
570 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
571  * 'options'.
572  *
573  * (The dscp value can't be changed directly; the caller must instead close and
574  * re-open the session.) */
575 static void
576 ovsdb_jsonrpc_session_set_all_options(
577     struct ovsdb_jsonrpc_remote *remote,
578     const struct ovsdb_jsonrpc_options *options)
579 {
580     struct ovsdb_jsonrpc_session *s;
581
582     LIST_FOR_EACH (s, node, &remote->sessions) {
583         ovsdb_jsonrpc_session_set_options(s, options);
584     }
585 }
586
587 static bool
588 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote *remote,
589                                  struct ovsdb_jsonrpc_remote_status *status)
590 {
591     const struct ovsdb_jsonrpc_session *s;
592     const struct jsonrpc_session *js;
593     struct ovsdb_lock_waiter *waiter;
594     struct reconnect_stats rstats;
595     struct ds locks_held, locks_waiting, locks_lost;
596
597     status->bound_port = (remote->listener
598                           ? pstream_get_bound_port(remote->listener)
599                           : htons(0));
600
601     if (list_is_empty(&remote->sessions)) {
602         return false;
603     }
604     s = CONTAINER_OF(remote->sessions.next, struct ovsdb_jsonrpc_session, node);
605     js = s->js;
606
607     status->is_connected = jsonrpc_session_is_connected(js);
608     status->last_error = jsonrpc_session_get_status(js);
609
610     jsonrpc_session_get_reconnect_stats(js, &rstats);
611     status->state = rstats.state;
612     status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
613         ? UINT_MAX : rstats.msec_since_connect / 1000;
614     status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
615         ? UINT_MAX : rstats.msec_since_disconnect / 1000;
616
617     ds_init(&locks_held);
618     ds_init(&locks_waiting);
619     ds_init(&locks_lost);
620     HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
621         struct ds *string;
622
623         string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
624                   : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
625                   : &locks_lost);
626         if (string->length) {
627             ds_put_char(string, ' ');
628         }
629         ds_put_cstr(string, waiter->lock_name);
630     }
631     status->locks_held = ds_steal_cstr(&locks_held);
632     status->locks_waiting = ds_steal_cstr(&locks_waiting);
633     status->locks_lost = ds_steal_cstr(&locks_lost);
634
635     status->n_connections = list_size(&remote->sessions);
636
637     return true;
638 }
639
640 /* Examines 'request' to determine the database to which it relates, and then
641  * searches 's' to find that database:
642  *
643  *    - If successful, returns the database and sets '*replyp' to NULL.
644  *
645  *    - If no such database exists, returns NULL and sets '*replyp' to an
646  *      appropriate JSON-RPC error reply, owned by the caller. */
647 static struct ovsdb *
648 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
649                         const struct jsonrpc_msg *request,
650                         struct jsonrpc_msg **replyp)
651 {
652     struct json_array *params;
653     struct ovsdb_error *error;
654     const char *db_name;
655     struct ovsdb *db;
656
657     params = json_array(request->params);
658     if (!params->n || params->elems[0]->type != JSON_STRING) {
659         error = ovsdb_syntax_error(
660             request->params, NULL,
661             "%s request params must begin with <db-name>", request->method);
662         goto error;
663     }
664
665     db_name = params->elems[0]->u.string;
666     db = shash_find_data(&s->up.server->dbs, db_name);
667     if (!db) {
668         error = ovsdb_syntax_error(
669             request->params, "unknown database",
670             "%s request specifies unknown database %s",
671             request->method, db_name);
672         goto error;
673     }
674
675     *replyp = NULL;
676     return db;
677
678 error:
679     *replyp = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
680     ovsdb_error_destroy(error);
681     return NULL;
682 }
683
684 static struct ovsdb_error *
685 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
686                                       const char **lock_namep)
687 {
688     const struct json_array *params;
689
690     params = json_array(request->params);
691     if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
692         !ovsdb_parser_is_id(json_string(params->elems[0]))) {
693         *lock_namep = NULL;
694         return ovsdb_syntax_error(request->params, NULL,
695                                   "%s request params must be <id>",
696                                   request->method);
697     }
698
699     *lock_namep = json_string(params->elems[0]);
700     return NULL;
701 }
702
703 static void
704 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
705                              const char *lock_name,
706                              const char *method)
707 {
708     struct ovsdb_jsonrpc_session *s;
709     struct json *params;
710
711     s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
712     params = json_array_create_1(json_string_create(lock_name));
713     ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
714 }
715
716 static struct jsonrpc_msg *
717 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
718                            struct jsonrpc_msg *request,
719                            enum ovsdb_lock_mode mode)
720 {
721     struct ovsdb_lock_waiter *waiter;
722     struct jsonrpc_msg *reply;
723     struct ovsdb_error *error;
724     struct ovsdb_session *victim;
725     const char *lock_name;
726     struct json *result;
727
728     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
729     if (error) {
730         goto error;
731     }
732
733     /* Report error if this session has issued a "lock" or "steal" without a
734      * matching "unlock" for this lock. */
735     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
736     if (waiter) {
737         error = ovsdb_syntax_error(
738             request->params, NULL,
739             "must issue \"unlock\" before new \"%s\"", request->method);
740         goto error;
741     }
742
743     /* Get the lock, add us as a waiter. */
744     waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
745                                &victim);
746     if (victim) {
747         ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
748     }
749
750     result = json_object_create();
751     json_object_put(result, "locked",
752                     json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
753
754     return jsonrpc_create_reply(result, request->id);
755
756 error:
757     reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
758     ovsdb_error_destroy(error);
759     return reply;
760 }
761
762 static void
763 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
764 {
765     struct ovsdb_lock_waiter *waiter, *next;
766
767     HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
768         ovsdb_jsonrpc_session_unlock__(waiter);
769     }
770 }
771
772 static void
773 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
774 {
775     struct ovsdb_lock *lock = waiter->lock;
776
777     if (lock) {
778         struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
779         if (new_owner) {
780             ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
781         } else {
782             /* ovsdb_server_lock() might have freed 'lock'. */
783         }
784     }
785
786     ovsdb_lock_waiter_destroy(waiter);
787 }
788
789 static struct jsonrpc_msg *
790 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
791                              struct jsonrpc_msg *request)
792 {
793     struct ovsdb_lock_waiter *waiter;
794     struct jsonrpc_msg *reply;
795     struct ovsdb_error *error;
796     const char *lock_name;
797
798     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
799     if (error) {
800         goto error;
801     }
802
803     /* Report error if this session has not issued a "lock" or "steal" for this
804      * lock. */
805     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
806     if (!waiter) {
807         error = ovsdb_syntax_error(
808             request->params, NULL, "\"unlock\" without \"lock\" or \"steal\"");
809         goto error;
810     }
811
812     ovsdb_jsonrpc_session_unlock__(waiter);
813
814     return jsonrpc_create_reply(json_object_create(), request->id);
815
816 error:
817     reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
818     ovsdb_error_destroy(error);
819     return reply;
820 }
821
822 static struct jsonrpc_msg *
823 execute_transaction(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
824                     struct jsonrpc_msg *request)
825 {
826     ovsdb_jsonrpc_trigger_create(s, db, request->id, request->params);
827     request->id = NULL;
828     request->params = NULL;
829     jsonrpc_msg_destroy(request);
830     return NULL;
831 }
832
833 static void
834 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
835                                   struct jsonrpc_msg *request)
836 {
837     struct jsonrpc_msg *reply;
838
839     if (!strcmp(request->method, "transact")) {
840         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
841         if (!reply) {
842             reply = execute_transaction(s, db, request);
843         }
844     } else if (!strcmp(request->method, "monitor")) {
845         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
846         if (!reply) {
847             reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
848                                                  request->id);
849         }
850     } else if (!strcmp(request->method, "monitor_cancel")) {
851         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
852                                              request->id);
853     } else if (!strcmp(request->method, "get_schema")) {
854         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
855         if (!reply) {
856             reply = jsonrpc_create_reply(ovsdb_schema_to_json(db->schema),
857                                          request->id);
858         }
859     } else if (!strcmp(request->method, "list_dbs")) {
860         size_t n_dbs = shash_count(&s->up.server->dbs);
861         struct shash_node *node;
862         struct json **dbs;
863         size_t i;
864
865         dbs = xmalloc(n_dbs * sizeof *dbs);
866         i = 0;
867         SHASH_FOR_EACH (node, &s->up.server->dbs) {
868             dbs[i++] = json_string_create(node->name);
869         }
870         reply = jsonrpc_create_reply(json_array_create(dbs, n_dbs),
871                                      request->id);
872     } else if (!strcmp(request->method, "lock")) {
873         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
874     } else if (!strcmp(request->method, "steal")) {
875         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
876     } else if (!strcmp(request->method, "unlock")) {
877         reply = ovsdb_jsonrpc_session_unlock(s, request);
878     } else if (!strcmp(request->method, "echo")) {
879         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
880     } else {
881         reply = jsonrpc_create_error(json_string_create("unknown method"),
882                                      request->id);
883     }
884
885     if (reply) {
886         jsonrpc_msg_destroy(request);
887         ovsdb_jsonrpc_session_send(s, reply);
888     }
889 }
890
891 static void
892 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
893 {
894     if (json_array(request->params)->n == 1) {
895         struct ovsdb_jsonrpc_trigger *t;
896         struct json *id;
897
898         id = request->params->u.array.elems[0];
899         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
900         if (t) {
901             ovsdb_jsonrpc_trigger_complete(t);
902         }
903     }
904 }
905
906 static void
907 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
908                                  struct jsonrpc_msg *request)
909 {
910     if (!strcmp(request->method, "cancel")) {
911         execute_cancel(s, request);
912     }
913     jsonrpc_msg_destroy(request);
914 }
915
916 static void
917 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
918                            struct jsonrpc_msg *msg)
919 {
920     ovsdb_jsonrpc_monitor_flush_all(s);
921     jsonrpc_session_send(s->js, msg);
922 }
923 \f
924 /* JSON-RPC database server triggers.
925  *
926  * (Every transaction is treated as a trigger even if it doesn't actually have
927  * any "wait" operations.) */
928
929 struct ovsdb_jsonrpc_trigger {
930     struct ovsdb_trigger trigger;
931     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
932     struct json *id;
933 };
934
935 static void
936 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
937                              struct json *id, struct json *params)
938 {
939     struct ovsdb_jsonrpc_trigger *t;
940     size_t hash;
941
942     /* Check for duplicate ID. */
943     hash = json_hash(id, 0);
944     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
945     if (t) {
946         struct jsonrpc_msg *msg;
947
948         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
949                                    id);
950         ovsdb_jsonrpc_session_send(s, msg);
951         json_destroy(id);
952         json_destroy(params);
953         return;
954     }
955
956     /* Insert into trigger table. */
957     t = xmalloc(sizeof *t);
958     ovsdb_trigger_init(&s->up, db, &t->trigger, params, time_msec());
959     t->id = id;
960     hmap_insert(&s->triggers, &t->hmap_node, hash);
961
962     /* Complete early if possible. */
963     if (ovsdb_trigger_is_complete(&t->trigger)) {
964         ovsdb_jsonrpc_trigger_complete(t);
965     }
966 }
967
968 static struct ovsdb_jsonrpc_trigger *
969 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
970                            const struct json *id, size_t hash)
971 {
972     struct ovsdb_jsonrpc_trigger *t;
973
974     HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
975         if (json_equal(t->id, id)) {
976             return t;
977         }
978     }
979
980     return NULL;
981 }
982
983 static void
984 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
985 {
986     struct ovsdb_jsonrpc_session *s;
987
988     s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
989
990     if (jsonrpc_session_is_connected(s->js)) {
991         struct jsonrpc_msg *reply;
992         struct json *result;
993
994         result = ovsdb_trigger_steal_result(&t->trigger);
995         if (result) {
996             reply = jsonrpc_create_reply(result, t->id);
997         } else {
998             reply = jsonrpc_create_error(json_string_create("canceled"),
999                                          t->id);
1000         }
1001         ovsdb_jsonrpc_session_send(s, reply);
1002     }
1003
1004     json_destroy(t->id);
1005     ovsdb_trigger_destroy(&t->trigger);
1006     hmap_remove(&s->triggers, &t->hmap_node);
1007     free(t);
1008 }
1009
1010 static void
1011 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
1012 {
1013     struct ovsdb_jsonrpc_trigger *t, *next;
1014     HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
1015         ovsdb_jsonrpc_trigger_complete(t);
1016     }
1017 }
1018
1019 static void
1020 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
1021 {
1022     while (!list_is_empty(&s->up.completions)) {
1023         struct ovsdb_jsonrpc_trigger *t
1024             = CONTAINER_OF(s->up.completions.next,
1025                            struct ovsdb_jsonrpc_trigger, trigger.node);
1026         ovsdb_jsonrpc_trigger_complete(t);
1027     }
1028 }
1029 \f
1030 /* Jsonrpc front end monitor. */
1031 struct ovsdb_jsonrpc_monitor {
1032     struct ovsdb_jsonrpc_session *session;
1033     struct ovsdb *db;
1034     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
1035     struct json *monitor_id;
1036     struct ovsdb_monitor *dbmon;
1037 };
1038
1039 static struct ovsdb_jsonrpc_monitor *
1040 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
1041                            const struct json *monitor_id)
1042 {
1043     struct ovsdb_jsonrpc_monitor *m;
1044
1045     HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
1046         if (json_equal(m->monitor_id, monitor_id)) {
1047             return m;
1048         }
1049     }
1050
1051     return NULL;
1052 }
1053
1054 static bool
1055 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
1056 {
1057     const struct json *json;
1058
1059     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
1060     return json ? json_boolean(json) : default_value;
1061 }
1062
1063 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
1064 ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_monitor *dbmon,
1065                                     const struct ovsdb_table *table,
1066                                     const struct json *monitor_request,
1067                                     size_t *allocated_columns)
1068 {
1069     const struct ovsdb_table_schema *ts = table->schema;
1070     enum ovsdb_monitor_selection select;
1071     const struct json *columns, *select_json;
1072     struct ovsdb_parser parser;
1073     struct ovsdb_error *error;
1074
1075     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1076     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1077     select_json = ovsdb_parser_member(&parser, "select",
1078                                       OP_OBJECT | OP_OPTIONAL);
1079     error = ovsdb_parser_finish(&parser);
1080     if (error) {
1081         return error;
1082     }
1083
1084     if (select_json) {
1085         select = 0;
1086         ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1087         if (parse_bool(&parser, "initial", true)) {
1088             select |= OJMS_INITIAL;
1089         }
1090         if (parse_bool(&parser, "insert", true)) {
1091             select |= OJMS_INSERT;
1092         }
1093         if (parse_bool(&parser, "delete", true)) {
1094             select |= OJMS_DELETE;
1095         }
1096         if (parse_bool(&parser, "modify", true)) {
1097             select |= OJMS_MODIFY;
1098         }
1099         error = ovsdb_parser_finish(&parser);
1100         if (error) {
1101             return error;
1102         }
1103     } else {
1104         select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1105     }
1106
1107     ovsdb_monitor_table_add_select(dbmon, table, select);
1108     if (columns) {
1109         size_t i;
1110
1111         if (columns->type != JSON_ARRAY) {
1112             return ovsdb_syntax_error(columns, NULL,
1113                                       "array of column names expected");
1114         }
1115
1116         for (i = 0; i < columns->u.array.n; i++) {
1117             const struct ovsdb_column *column;
1118             const char *s;
1119
1120             if (columns->u.array.elems[i]->type != JSON_STRING) {
1121                 return ovsdb_syntax_error(columns, NULL,
1122                                           "array of column names expected");
1123             }
1124
1125             s = columns->u.array.elems[i]->u.string;
1126             column = shash_find_data(&table->schema->columns, s);
1127             if (!column) {
1128                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1129                                           "column name", s);
1130             }
1131             ovsdb_monitor_add_column(dbmon, table, column, select,
1132                                      allocated_columns);
1133         }
1134     } else {
1135         struct shash_node *node;
1136
1137         SHASH_FOR_EACH (node, &ts->columns) {
1138             const struct ovsdb_column *column = node->data;
1139             if (column->index != OVSDB_COL_UUID) {
1140                 ovsdb_monitor_add_column(dbmon, table, column, select,
1141                                          allocated_columns);
1142             }
1143         }
1144     }
1145
1146     return NULL;
1147 }
1148
1149 static struct jsonrpc_msg *
1150 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1151                              struct json *params,
1152                              const struct json *request_id)
1153 {
1154     struct ovsdb_jsonrpc_monitor *m = NULL;
1155     struct json *monitor_id, *monitor_requests;
1156     struct ovsdb_error *error = NULL;
1157     struct shash_node *node;
1158     struct json *json;
1159
1160     if (json_array(params)->n != 3) {
1161         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1162         goto error;
1163     }
1164     monitor_id = params->u.array.elems[1];
1165     monitor_requests = params->u.array.elems[2];
1166     if (monitor_requests->type != JSON_OBJECT) {
1167         error = ovsdb_syntax_error(monitor_requests, NULL,
1168                                    "monitor-requests must be object");
1169         goto error;
1170     }
1171
1172     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1173         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1174         goto error;
1175     }
1176
1177     m = xzalloc(sizeof *m);
1178     m->session = s;
1179     m->db = db;
1180     m->dbmon = ovsdb_monitor_create(db, m);
1181     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1182     m->monitor_id = json_clone(monitor_id);
1183
1184     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1185         const struct ovsdb_table *table;
1186         const char *column_name;
1187         size_t allocated_columns;
1188         const struct json *mr_value;
1189         size_t i;
1190
1191         table = ovsdb_get_table(m->db, node->name);
1192         if (!table) {
1193             error = ovsdb_syntax_error(NULL, NULL,
1194                                        "no table named %s", node->name);
1195             goto error;
1196         }
1197
1198         ovsdb_monitor_add_table(m->dbmon, table);
1199
1200         /* Parse columns. */
1201         mr_value = node->data;
1202         allocated_columns = 0;
1203         if (mr_value->type == JSON_ARRAY) {
1204             const struct json_array *array = &mr_value->u.array;
1205
1206             for (i = 0; i < array->n; i++) {
1207                 error = ovsdb_jsonrpc_parse_monitor_request(
1208                     m->dbmon, table, array->elems[i], &allocated_columns);
1209                 if (error) {
1210                     goto error;
1211                 }
1212             }
1213         } else {
1214             error = ovsdb_jsonrpc_parse_monitor_request(
1215                 m->dbmon, table, mr_value, &allocated_columns);
1216             if (error) {
1217                 goto error;
1218             }
1219         }
1220
1221         column_name = ovsdb_monitor_table_check_duplicates(m->dbmon, table);
1222
1223         if (column_name) {
1224             error = ovsdb_syntax_error(mr_value, NULL, "column %s "
1225                                        "mentioned more than once",
1226                                         column_name);
1227             goto error;
1228         }
1229     }
1230
1231     return jsonrpc_create_reply(ovsdb_monitor_get_initial(m->dbmon),
1232                                 request_id);
1233
1234 error:
1235     if (m) {
1236         ovsdb_jsonrpc_monitor_destroy(m);
1237     }
1238
1239     json = ovsdb_error_to_json(error);
1240     ovsdb_error_destroy(error);
1241     return jsonrpc_create_error(json, request_id);
1242 }
1243
1244 static struct jsonrpc_msg *
1245 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1246                              struct json_array *params,
1247                              const struct json *request_id)
1248 {
1249     if (params->n != 1) {
1250         return jsonrpc_create_error(json_string_create("invalid parameters"),
1251                                     request_id);
1252     } else {
1253         struct ovsdb_jsonrpc_monitor *m;
1254
1255         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1256         if (!m) {
1257             return jsonrpc_create_error(json_string_create("unknown monitor"),
1258                                         request_id);
1259         } else {
1260             ovsdb_jsonrpc_monitor_destroy(m);
1261             return jsonrpc_create_reply(json_object_create(), request_id);
1262         }
1263     }
1264 }
1265
1266 static void
1267 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1268 {
1269     struct ovsdb_jsonrpc_monitor *m, *next;
1270
1271     HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1272         ovsdb_jsonrpc_monitor_destroy(m);
1273     }
1274 }
1275
1276 static struct json *
1277 ovsdb_jsonrpc_monitor_compose_table_update(
1278     const struct ovsdb_jsonrpc_monitor *monitor, bool initial)
1279 {
1280     return ovsdb_monitor_compose_table_update(monitor->dbmon, initial);
1281 }
1282
1283 static bool
1284 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
1285 {
1286     struct ovsdb_jsonrpc_monitor *m;
1287
1288     HMAP_FOR_EACH (m, node, &s->monitors) {
1289         if (ovsdb_monitor_needs_flush(m->dbmon)) {
1290             return true;
1291         }
1292     }
1293
1294     return false;
1295 }
1296
1297 void
1298 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m)
1299 {
1300     json_destroy(m->monitor_id);
1301     hmap_remove(&m->session->monitors, &m->node);
1302     ovsdb_monitor_destroy(m->dbmon);
1303     free(m);
1304 }
1305
1306 static void
1307 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
1308 {
1309     struct ovsdb_jsonrpc_monitor *m;
1310
1311     HMAP_FOR_EACH (m, node, &s->monitors) {
1312         struct json *json;
1313
1314         json = ovsdb_jsonrpc_monitor_compose_table_update(m, false);
1315         if (json) {
1316             struct jsonrpc_msg *msg;
1317             struct json *params;
1318
1319             params = json_array_create_2(json_clone(m->monitor_id), json);
1320             msg = jsonrpc_create_notify("update", params);
1321             jsonrpc_session_send(s->js, msg);
1322         }
1323     }
1324 }