json: Move from lib to include/openvswitch.
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "bitmap.h"
23 #include "column.h"
24 #include "openvswitch/dynamic-string.h"
25 #include "monitor.h"
26 #include "openvswitch/json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
30 #include "ovsdb.h"
31 #include "condition.h"
32 #include "poll-loop.h"
33 #include "reconnect.h"
34 #include "row.h"
35 #include "server.h"
36 #include "simap.h"
37 #include "stream.h"
38 #include "table.h"
39 #include "timeval.h"
40 #include "transaction.h"
41 #include "trigger.h"
42 #include "util.h"
43 #include "openvswitch/vlog.h"
44
45 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
46
47 struct ovsdb_jsonrpc_remote;
48 struct ovsdb_jsonrpc_session;
49
/* Feature switch for the "monitor_cond" method.  Request dispatch only treats
 * "monitor_cond" as a monitor request while this is true; set it to false to
 * disable the feature, causing jsonrpc to respond to the monitor_cond method
 * with an error. */
static bool monitor_cond_enable__ = true;
53
/* Message rate-limiting.  Shared by the rate-limited log calls in this file
 * (VLOG_ERR_RL() and VLOG_WARN_RL()).
 *
 * NOTE(review): the two initializer arguments are presumably the sustained
 * rate and the burst size; confirm against the VLOG_RATE_LIMIT_INIT
 * definition. */
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
56
57 /* Sessions. */
58 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
59     struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
60 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
61 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
62 static void ovsdb_jsonrpc_session_get_memory_usage_all(
63     const struct ovsdb_jsonrpc_remote *, struct simap *usage);
64 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
65 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
66 static void ovsdb_jsonrpc_session_set_all_options(
67     struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
68 static bool ovsdb_jsonrpc_active_session_get_status(
69     const struct ovsdb_jsonrpc_remote *,
70     struct ovsdb_jsonrpc_remote_status *);
71 static void ovsdb_jsonrpc_session_get_status(
72     const struct ovsdb_jsonrpc_session *,
73     struct ovsdb_jsonrpc_remote_status *);
74 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
75 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
76 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
77                                        struct jsonrpc_msg *);
78
79 /* Triggers. */
80 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
81                                          struct ovsdb *,
82                                          struct json *id, struct json *params);
83 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
84     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
85 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
86 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
87 static void ovsdb_jsonrpc_trigger_complete_done(
88     struct ovsdb_jsonrpc_session *);
89
90 /* Monitors. */
91 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
92     struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
93     enum ovsdb_monitor_version, const struct json *request_id);
94 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_change(
95     struct ovsdb_jsonrpc_session *s,
96     struct json *params,
97     const struct json *request_id);
98 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
99     struct ovsdb_jsonrpc_session *,
100     struct json_array *params,
101     const struct json *request_id);
102 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
103 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
104 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
105 static struct json *ovsdb_jsonrpc_monitor_compose_update(
106     struct ovsdb_jsonrpc_monitor *monitor, bool initial);
107 static struct jsonrpc_msg * ovsdb_jsonrpc_create_notify(
108                                         const struct ovsdb_jsonrpc_monitor *m,
109                                         struct json *params);
110
111 \f
112 /* JSON-RPC database server. */
113
/* Top-level state of the JSON-RPC server: the generic OVSDB server it wraps,
 * plus the remotes (listeners and outbound connections) configured on it. */
struct ovsdb_jsonrpc_server {
    struct ovsdb_server up;    /* Generic server state, embedded. */
    unsigned int n_sessions;   /* Sessions currently open, over all remotes. */
    struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
};
119
/* A configured remote.  This is either a passive stream listener plus a list
 * of the currently connected sessions, or a list of exactly one active
 * session.
 *
 * Remotes are stored in the owning server's 'remotes' shash, keyed by the
 * remote's name. */
struct ovsdb_jsonrpc_remote {
    struct ovsdb_jsonrpc_server *server; /* Server that owns this remote. */
    struct pstream *listener;   /* Listener, if passive. */
    struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
    uint8_t dscp;               /* DSCP from the options used at creation. */
};
129
130 static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
131     struct ovsdb_jsonrpc_server *, const char *name,
132     const struct ovsdb_jsonrpc_options *options
133 );
134 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
135
136 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
137  *
138  * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
139  * which 'server' should provide access. */
140 struct ovsdb_jsonrpc_server *
141 ovsdb_jsonrpc_server_create(void)
142 {
143     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
144     ovsdb_server_init(&server->up);
145     shash_init(&server->remotes);
146     return server;
147 }
148
149 /* Adds 'db' to the set of databases served out by 'svr'.  Returns true if
150  * successful, false if 'db''s name is the same as some database already in
151  * 'server'. */
152 bool
153 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
154 {
155     /* The OVSDB protocol doesn't have a way to notify a client that a
156      * database has been added.  If some client tried to use the database
157      * that we're adding and failed, then forcing it to reconnect seems like
158      * a reasonable way to make it try again.
159      *
160      * If this is too big of a hammer in practice, we could be more selective,
161      * e.g. disconnect only connections that actually tried to use a database
162      * with 'db''s name. */
163     ovsdb_jsonrpc_server_reconnect(svr);
164
165     return ovsdb_server_add_db(&svr->up, db);
166 }
167
168 /* Removes 'db' from the set of databases served out by 'svr'.  Returns
169  * true if successful, false if there is no database associated with 'db'. */
170 bool
171 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *svr,
172                                struct ovsdb *db)
173 {
174     /* There might be pointers to 'db' from 'svr', such as monitors or
175      * outstanding transactions.  Disconnect all JSON-RPC connections to avoid
176      * accesses to freed memory.
177      *
178      * If this is too big of a hammer in practice, we could be more selective,
179      * e.g. disconnect only connections that actually reference 'db'. */
180     ovsdb_jsonrpc_server_reconnect(svr);
181
182     return ovsdb_server_remove_db(&svr->up, db);
183 }
184
185 void
186 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
187 {
188     struct shash_node *node, *next;
189
190     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
191         ovsdb_jsonrpc_server_del_remote(node);
192     }
193     shash_destroy(&svr->remotes);
194     ovsdb_server_destroy(&svr->up);
195     free(svr);
196 }
197
198 struct ovsdb_jsonrpc_options *
199 ovsdb_jsonrpc_default_options(const char *target)
200 {
201     struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
202     options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
203     options->probe_interval = (stream_or_pstream_needs_probes(target)
204                                ? RECONNECT_DEFAULT_PROBE_INTERVAL
205                                : 0);
206     return options;
207 }
208
209 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
210  * options in the struct ovsdb_jsonrpc_options supplied as the data values.
211  *
212  * A remote is an active or passive stream connection method, e.g. "pssl:" or
213  * "tcp:1.2.3.4". */
214 void
215 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
216                                  const struct shash *new_remotes)
217 {
218     struct shash_node *node, *next;
219
220     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
221         struct ovsdb_jsonrpc_remote *remote = node->data;
222         struct ovsdb_jsonrpc_options *options
223             = shash_find_data(new_remotes, node->name);
224
225         if (!options) {
226             VLOG_INFO("%s: remote deconfigured", node->name);
227             ovsdb_jsonrpc_server_del_remote(node);
228         } else if (options->dscp != remote->dscp) {
229             ovsdb_jsonrpc_server_del_remote(node);
230          }
231     }
232     SHASH_FOR_EACH (node, new_remotes) {
233         const struct ovsdb_jsonrpc_options *options = node->data;
234         struct ovsdb_jsonrpc_remote *remote;
235
236         remote = shash_find_data(&svr->remotes, node->name);
237         if (!remote) {
238             remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
239             if (!remote) {
240                 continue;
241             }
242         }
243
244         ovsdb_jsonrpc_session_set_all_options(remote, options);
245     }
246 }
247
248 static struct ovsdb_jsonrpc_remote *
249 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
250                                 const char *name,
251                                 const struct ovsdb_jsonrpc_options *options)
252 {
253     struct ovsdb_jsonrpc_remote *remote;
254     struct pstream *listener;
255     int error;
256
257     error = jsonrpc_pstream_open(name, &listener, options->dscp);
258     if (error && error != EAFNOSUPPORT) {
259         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, ovs_strerror(error));
260         return NULL;
261     }
262
263     remote = xmalloc(sizeof *remote);
264     remote->server = svr;
265     remote->listener = listener;
266     ovs_list_init(&remote->sessions);
267     remote->dscp = options->dscp;
268     shash_add(&svr->remotes, name, remote);
269
270     if (!listener) {
271         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, true));
272     }
273     return remote;
274 }
275
276 static void
277 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
278 {
279     struct ovsdb_jsonrpc_remote *remote = node->data;
280
281     ovsdb_jsonrpc_session_close_all(remote);
282     pstream_close(remote->listener);
283     shash_delete(&remote->server->remotes, node);
284     free(remote);
285 }
286
287 /* Stores status information for the remote named 'target', which should have
288  * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
289  * into '*status'.  On success returns true, on failure (if 'svr' doesn't have
290  * a remote named 'target' or if that remote is an outbound remote that has no
291  * active connections) returns false.  On failure, 'status' will be zeroed.
292  */
293 bool
294 ovsdb_jsonrpc_server_get_remote_status(
295     const struct ovsdb_jsonrpc_server *svr, const char *target,
296     struct ovsdb_jsonrpc_remote_status *status)
297 {
298     const struct ovsdb_jsonrpc_remote *remote;
299
300     memset(status, 0, sizeof *status);
301
302     remote = shash_find_data(&svr->remotes, target);
303
304     if (!remote) {
305         return false;
306     }
307
308     if (remote->listener) {
309         status->bound_port = pstream_get_bound_port(remote->listener);
310         status->is_connected = !ovs_list_is_empty(&remote->sessions);
311         status->n_connections = ovs_list_size(&remote->sessions);
312         return true;
313     }
314
315     return ovsdb_jsonrpc_active_session_get_status(remote, status);
316 }
317
318 void
319 ovsdb_jsonrpc_server_free_remote_status(
320     struct ovsdb_jsonrpc_remote_status *status)
321 {
322     free(status->locks_held);
323     free(status->locks_waiting);
324     free(status->locks_lost);
325 }
326
327 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
328  * reconnect. */
329 void
330 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
331 {
332     struct shash_node *node;
333
334     SHASH_FOR_EACH (node, &svr->remotes) {
335         struct ovsdb_jsonrpc_remote *remote = node->data;
336
337         ovsdb_jsonrpc_session_reconnect_all(remote);
338     }
339 }
340
341 void
342 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
343 {
344     struct shash_node *node;
345
346     SHASH_FOR_EACH (node, &svr->remotes) {
347         struct ovsdb_jsonrpc_remote *remote = node->data;
348
349         if (remote->listener) {
350             struct stream *stream;
351             int error;
352
353             error = pstream_accept(remote->listener, &stream);
354             if (!error) {
355                 struct jsonrpc_session *js;
356                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
357                                                      remote->dscp);
358                 ovsdb_jsonrpc_session_create(remote, js);
359             } else if (error != EAGAIN) {
360                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
361                              pstream_get_name(remote->listener),
362                              ovs_strerror(error));
363             }
364         }
365
366         ovsdb_jsonrpc_session_run_all(remote);
367     }
368 }
369
370 void
371 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
372 {
373     struct shash_node *node;
374
375     SHASH_FOR_EACH (node, &svr->remotes) {
376         struct ovsdb_jsonrpc_remote *remote = node->data;
377
378         if (remote->listener) {
379             pstream_wait(remote->listener);
380         }
381
382         ovsdb_jsonrpc_session_wait_all(remote);
383     }
384 }
385
386 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
387  * memory_report(). */
388 void
389 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
390                                       struct simap *usage)
391 {
392     struct shash_node *node;
393
394     simap_increase(usage, "sessions", svr->n_sessions);
395     SHASH_FOR_EACH (node, &svr->remotes) {
396         struct ovsdb_jsonrpc_remote *remote = node->data;
397
398         ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
399     }
400 }
401 \f
402 /* JSON-RPC database server session. */
403
/* One JSON-RPC connection to the database server, together with the
 * triggers and monitors that belong to that connection. */
struct ovsdb_jsonrpc_session {
    struct ovs_list node;       /* Element in remote's sessions list. */
    struct ovsdb_session up;    /* Generic session state, embedded. */
    struct ovsdb_jsonrpc_remote *remote; /* Remote this session belongs to. */

    /* Triggers. */
    struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */

    /* Monitors. */
    struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */

    /* Network connectivity. */
    struct jsonrpc_session *js;  /* JSON-RPC session. */
    unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
};
419
420 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
421 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
422 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
423 static void ovsdb_jsonrpc_session_get_memory_usage(
424     const struct ovsdb_jsonrpc_session *, struct simap *usage);
425 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
426                                               struct jsonrpc_msg *);
427 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
428                                              struct jsonrpc_msg *);
429
430 static struct ovsdb_jsonrpc_session *
431 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
432                              struct jsonrpc_session *js)
433 {
434     struct ovsdb_jsonrpc_session *s;
435
436     s = xzalloc(sizeof *s);
437     ovsdb_session_init(&s->up, &remote->server->up);
438     s->remote = remote;
439     ovs_list_push_back(&remote->sessions, &s->node);
440     hmap_init(&s->triggers);
441     hmap_init(&s->monitors);
442     s->js = js;
443     s->js_seqno = jsonrpc_session_get_seqno(js);
444
445     remote->server->n_sessions++;
446
447     return s;
448 }
449
/* Closes and frees 's': removes its monitors, releases its locks, completes
 * its triggers, closes the underlying JSON-RPC session, unlinks 's' from its
 * remote's session list, and decrements the server's session count. */
static void
ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
{
    /* Release everything that the session holds before its containers are
     * destroyed below. */
    ovsdb_jsonrpc_monitor_remove_all(s);
    ovsdb_jsonrpc_session_unlock_all(s);
    ovsdb_jsonrpc_trigger_complete_all(s);

    hmap_destroy(&s->monitors);
    hmap_destroy(&s->triggers);

    jsonrpc_session_close(s->js);
    ovs_list_remove(&s->node);
    s->remote->server->n_sessions--;
    ovsdb_session_destroy(&s->up);
    free(s);
}
466
467 static int
468 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
469 {
470     jsonrpc_session_run(s->js);
471     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
472         s->js_seqno = jsonrpc_session_get_seqno(s->js);
473         ovsdb_jsonrpc_trigger_complete_all(s);
474         ovsdb_jsonrpc_monitor_remove_all(s);
475         ovsdb_jsonrpc_session_unlock_all(s);
476     }
477
478     ovsdb_jsonrpc_trigger_complete_done(s);
479
480     if (!jsonrpc_session_get_backlog(s->js)) {
481         struct jsonrpc_msg *msg;
482
483         ovsdb_jsonrpc_monitor_flush_all(s);
484
485         msg = jsonrpc_session_recv(s->js);
486         if (msg) {
487             if (msg->type == JSONRPC_REQUEST) {
488                 ovsdb_jsonrpc_session_got_request(s, msg);
489             } else if (msg->type == JSONRPC_NOTIFY) {
490                 ovsdb_jsonrpc_session_got_notify(s, msg);
491             } else {
492                 VLOG_WARN("%s: received unexpected %s message",
493                           jsonrpc_session_get_name(s->js),
494                           jsonrpc_msg_type_to_string(msg->type));
495                 jsonrpc_session_force_reconnect(s->js);
496                 jsonrpc_msg_destroy(msg);
497             }
498         }
499     }
500     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
501 }
502
503 static void
504 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
505                                   const struct ovsdb_jsonrpc_options *options)
506 {
507     jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
508     jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
509     jsonrpc_session_set_dscp(session->js, options->dscp);
510 }
511
512 static void
513 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
514 {
515     struct ovsdb_jsonrpc_session *s, *next;
516
517     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
518         int error = ovsdb_jsonrpc_session_run(s);
519         if (error) {
520             ovsdb_jsonrpc_session_close(s);
521         }
522     }
523 }
524
525 static void
526 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
527 {
528     jsonrpc_session_wait(s->js);
529     if (!jsonrpc_session_get_backlog(s->js)) {
530         if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
531             poll_immediate_wake();
532         } else {
533             jsonrpc_session_recv_wait(s->js);
534         }
535     }
536 }
537
538 static void
539 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
540 {
541     struct ovsdb_jsonrpc_session *s;
542
543     LIST_FOR_EACH (s, node, &remote->sessions) {
544         ovsdb_jsonrpc_session_wait(s);
545     }
546 }
547
548 static void
549 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
550                                        struct simap *usage)
551 {
552     simap_increase(usage, "triggers", hmap_count(&s->triggers));
553     simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
554 }
555
556 static void
557 ovsdb_jsonrpc_session_get_memory_usage_all(
558     const struct ovsdb_jsonrpc_remote *remote,
559     struct simap *usage)
560 {
561     struct ovsdb_jsonrpc_session *s;
562
563     LIST_FOR_EACH (s, node, &remote->sessions) {
564         ovsdb_jsonrpc_session_get_memory_usage(s, usage);
565     }
566 }
567
568 static void
569 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
570 {
571     struct ovsdb_jsonrpc_session *s, *next;
572
573     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
574         ovsdb_jsonrpc_session_close(s);
575     }
576 }
577
578 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
579  * reconnect. */
580 static void
581 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
582 {
583     struct ovsdb_jsonrpc_session *s, *next;
584
585     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
586         jsonrpc_session_force_reconnect(s->js);
587         if (!jsonrpc_session_is_alive(s->js)) {
588             ovsdb_jsonrpc_session_close(s);
589         }
590     }
591 }
592
593 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
594  * 'options'.
595  *
596  * (The dscp value can't be changed directly; the caller must instead close and
597  * re-open the session.) */
598 static void
599 ovsdb_jsonrpc_session_set_all_options(
600     struct ovsdb_jsonrpc_remote *remote,
601     const struct ovsdb_jsonrpc_options *options)
602 {
603     struct ovsdb_jsonrpc_session *s;
604
605     LIST_FOR_EACH (s, node, &remote->sessions) {
606         ovsdb_jsonrpc_session_set_options(s, options);
607     }
608 }
609
610 /* Sets the 'status' of for the 'remote' with an outgoing connection.   */
611 static bool
612 ovsdb_jsonrpc_active_session_get_status(
613     const struct ovsdb_jsonrpc_remote *remote,
614     struct ovsdb_jsonrpc_remote_status *status)
615 {
616     const struct ovs_list *sessions = &remote->sessions;
617     const struct ovsdb_jsonrpc_session *s;
618
619     if (ovs_list_is_empty(sessions)) {
620         return false;
621     }
622
623     ovs_assert(ovs_list_is_singleton(sessions));
624     s = CONTAINER_OF(ovs_list_front(sessions), struct ovsdb_jsonrpc_session, node);
625     ovsdb_jsonrpc_session_get_status(s, status);
626     status->n_connections = 1;
627
628     return true;
629 }
630
631 static void
632 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_session *session,
633                                  struct ovsdb_jsonrpc_remote_status *status)
634 {
635     const struct ovsdb_jsonrpc_session *s = session;
636     const struct jsonrpc_session *js;
637     struct ovsdb_lock_waiter *waiter;
638     struct reconnect_stats rstats;
639     struct ds locks_held, locks_waiting, locks_lost;
640
641     js = s->js;
642
643     status->is_connected = jsonrpc_session_is_connected(js);
644     status->last_error = jsonrpc_session_get_status(js);
645
646     jsonrpc_session_get_reconnect_stats(js, &rstats);
647     status->state = rstats.state;
648     status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
649         ? UINT_MAX : rstats.msec_since_connect / 1000;
650     status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
651         ? UINT_MAX : rstats.msec_since_disconnect / 1000;
652
653     ds_init(&locks_held);
654     ds_init(&locks_waiting);
655     ds_init(&locks_lost);
656     HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
657         struct ds *string;
658
659         string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
660                   : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
661                   : &locks_lost);
662         if (string->length) {
663             ds_put_char(string, ' ');
664         }
665         ds_put_cstr(string, waiter->lock_name);
666     }
667     status->locks_held = ds_steal_cstr(&locks_held);
668     status->locks_waiting = ds_steal_cstr(&locks_waiting);
669     status->locks_lost = ds_steal_cstr(&locks_lost);
670 }
671
672 /* Examines 'request' to determine the database to which it relates, and then
673  * searches 's' to find that database:
674  *
675  *    - If successful, returns the database and sets '*replyp' to NULL.
676  *
677  *    - If no such database exists, returns NULL and sets '*replyp' to an
678  *      appropriate JSON-RPC error reply, owned by the caller. */
679 static struct ovsdb *
680 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
681                         const struct jsonrpc_msg *request,
682                         struct jsonrpc_msg **replyp)
683 {
684     struct json_array *params;
685     struct ovsdb_error *error;
686     const char *db_name;
687     struct ovsdb *db;
688
689     params = json_array(request->params);
690     if (!params->n || params->elems[0]->type != JSON_STRING) {
691         error = ovsdb_syntax_error(
692             request->params, NULL,
693             "%s request params must begin with <db-name>", request->method);
694         goto error;
695     }
696
697     db_name = params->elems[0]->u.string;
698     db = shash_find_data(&s->up.server->dbs, db_name);
699     if (!db) {
700         error = ovsdb_syntax_error(
701             request->params, "unknown database",
702             "%s request specifies unknown database %s",
703             request->method, db_name);
704         goto error;
705     }
706
707     *replyp = NULL;
708     return db;
709
710 error:
711     *replyp = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
712     ovsdb_error_destroy(error);
713     return NULL;
714 }
715
716 static struct ovsdb_error *
717 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
718                                       const char **lock_namep)
719 {
720     const struct json_array *params;
721
722     params = json_array(request->params);
723     if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
724         !ovsdb_parser_is_id(json_string(params->elems[0]))) {
725         *lock_namep = NULL;
726         return ovsdb_syntax_error(request->params, NULL,
727                                   "%s request params must be <id>",
728                                   request->method);
729     }
730
731     *lock_namep = json_string(params->elems[0]);
732     return NULL;
733 }
734
735 static void
736 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
737                              const char *lock_name,
738                              const char *method)
739 {
740     struct ovsdb_jsonrpc_session *s;
741     struct json *params;
742
743     s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
744     params = json_array_create_1(json_string_create(lock_name));
745     ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
746 }
747
748 static struct jsonrpc_msg *
749 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
750                            struct jsonrpc_msg *request,
751                            enum ovsdb_lock_mode mode)
752 {
753     struct ovsdb_lock_waiter *waiter;
754     struct jsonrpc_msg *reply;
755     struct ovsdb_error *error;
756     struct ovsdb_session *victim;
757     const char *lock_name;
758     struct json *result;
759
760     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
761     if (error) {
762         goto error;
763     }
764
765     /* Report error if this session has issued a "lock" or "steal" without a
766      * matching "unlock" for this lock. */
767     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
768     if (waiter) {
769         error = ovsdb_syntax_error(
770             request->params, NULL,
771             "must issue \"unlock\" before new \"%s\"", request->method);
772         goto error;
773     }
774
775     /* Get the lock, add us as a waiter. */
776     waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
777                                &victim);
778     if (victim) {
779         ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
780     }
781
782     result = json_object_create();
783     json_object_put(result, "locked",
784                     json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
785
786     return jsonrpc_create_reply(result, request->id);
787
788 error:
789     reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
790     ovsdb_error_destroy(error);
791     return reply;
792 }
793
794 static void
795 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
796 {
797     struct ovsdb_lock_waiter *waiter, *next;
798
799     HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
800         ovsdb_jsonrpc_session_unlock__(waiter);
801     }
802 }
803
804 static void
805 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
806 {
807     struct ovsdb_lock *lock = waiter->lock;
808
809     if (lock) {
810         struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
811         if (new_owner) {
812             ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
813         } else {
814             /* ovsdb_server_lock() might have freed 'lock'. */
815         }
816     }
817
818     ovsdb_lock_waiter_destroy(waiter);
819 }
820
821 static struct jsonrpc_msg *
822 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
823                              struct jsonrpc_msg *request)
824 {
825     struct ovsdb_lock_waiter *waiter;
826     struct jsonrpc_msg *reply;
827     struct ovsdb_error *error;
828     const char *lock_name;
829
830     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
831     if (error) {
832         goto error;
833     }
834
835     /* Report error if this session has not issued a "lock" or "steal" for this
836      * lock. */
837     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
838     if (!waiter) {
839         error = ovsdb_syntax_error(
840             request->params, NULL, "\"unlock\" without \"lock\" or \"steal\"");
841         goto error;
842     }
843
844     ovsdb_jsonrpc_session_unlock__(waiter);
845
846     return jsonrpc_create_reply(json_object_create(), request->id);
847
848 error:
849     reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
850     ovsdb_error_destroy(error);
851     return reply;
852 }
853
854 static struct jsonrpc_msg *
855 execute_transaction(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
856                     struct jsonrpc_msg *request)
857 {
858     ovsdb_jsonrpc_trigger_create(s, db, request->id, request->params);
859     request->id = NULL;
860     request->params = NULL;
861     jsonrpc_msg_destroy(request);
862     return NULL;
863 }
864
865 static void
866 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
867                                   struct jsonrpc_msg *request)
868 {
869     struct jsonrpc_msg *reply;
870
871     if (!strcmp(request->method, "transact")) {
872         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
873         if (!reply) {
874             reply = execute_transaction(s, db, request);
875         }
876     } else if (!strcmp(request->method, "monitor") ||
877                (monitor_cond_enable__ && !strcmp(request->method,
878                                                  "monitor_cond"))) {
879         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
880         if (!reply) {
881             int l = strlen(request->method) - strlen("monitor");
882             enum ovsdb_monitor_version version = l ? OVSDB_MONITOR_V2
883                                                    : OVSDB_MONITOR_V1;
884             reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
885                                                  version, request->id);
886         }
887     } else if (!strcmp(request->method, "monitor_cond_change")) {
888         reply = ovsdb_jsonrpc_monitor_cond_change(s, request->params,
889                                                   request->id);
890     } else if (!strcmp(request->method, "monitor_cancel")) {
891         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
892                                              request->id);
893     } else if (!strcmp(request->method, "get_schema")) {
894         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
895         if (!reply) {
896             reply = jsonrpc_create_reply(ovsdb_schema_to_json(db->schema),
897                                          request->id);
898         }
899     } else if (!strcmp(request->method, "list_dbs")) {
900         size_t n_dbs = shash_count(&s->up.server->dbs);
901         struct shash_node *node;
902         struct json **dbs;
903         size_t i;
904
905         dbs = xmalloc(n_dbs * sizeof *dbs);
906         i = 0;
907         SHASH_FOR_EACH (node, &s->up.server->dbs) {
908             dbs[i++] = json_string_create(node->name);
909         }
910         reply = jsonrpc_create_reply(json_array_create(dbs, n_dbs),
911                                      request->id);
912     } else if (!strcmp(request->method, "lock")) {
913         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
914     } else if (!strcmp(request->method, "steal")) {
915         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
916     } else if (!strcmp(request->method, "unlock")) {
917         reply = ovsdb_jsonrpc_session_unlock(s, request);
918     } else if (!strcmp(request->method, "echo")) {
919         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
920     } else {
921         reply = jsonrpc_create_error(json_string_create("unknown method"),
922                                      request->id);
923     }
924
925     if (reply) {
926         jsonrpc_msg_destroy(request);
927         ovsdb_jsonrpc_session_send(s, reply);
928     }
929 }
930
931 static void
932 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
933 {
934     if (json_array(request->params)->n == 1) {
935         struct ovsdb_jsonrpc_trigger *t;
936         struct json *id;
937
938         id = request->params->u.array.elems[0];
939         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
940         if (t) {
941             ovsdb_jsonrpc_trigger_complete(t);
942         }
943     }
944 }
945
946 static void
947 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
948                                  struct jsonrpc_msg *request)
949 {
950     if (!strcmp(request->method, "cancel")) {
951         execute_cancel(s, request);
952     }
953     jsonrpc_msg_destroy(request);
954 }
955
956 static void
957 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
958                            struct jsonrpc_msg *msg)
959 {
960     ovsdb_jsonrpc_monitor_flush_all(s);
961     jsonrpc_session_send(s->js, msg);
962 }
963 \f
964 /* JSON-RPC database server triggers.
965  *
966  * (Every transaction is treated as a trigger even if it doesn't actually have
967  * any "wait" operations.) */
968
/* One transaction in progress on a JSON-RPC session, indexed by the ID of
 * the request that started it. */
struct ovsdb_jsonrpc_trigger {
    struct ovsdb_trigger trigger; /* Embedded database trigger. */
    struct hmap_node hmap_node; /* In session's "triggers" hmap. */
    struct json *id;            /* ID of the originating request; owned by
                                 * the trigger and destroyed when the trigger
                                 * completes. */
};
974
975 static void
976 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
977                              struct json *id, struct json *params)
978 {
979     struct ovsdb_jsonrpc_trigger *t;
980     size_t hash;
981
982     /* Check for duplicate ID. */
983     hash = json_hash(id, 0);
984     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
985     if (t) {
986         struct jsonrpc_msg *msg;
987
988         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
989                                    id);
990         ovsdb_jsonrpc_session_send(s, msg);
991         json_destroy(id);
992         json_destroy(params);
993         return;
994     }
995
996     /* Insert into trigger table. */
997     t = xmalloc(sizeof *t);
998     ovsdb_trigger_init(&s->up, db, &t->trigger, params, time_msec());
999     t->id = id;
1000     hmap_insert(&s->triggers, &t->hmap_node, hash);
1001
1002     /* Complete early if possible. */
1003     if (ovsdb_trigger_is_complete(&t->trigger)) {
1004         ovsdb_jsonrpc_trigger_complete(t);
1005     }
1006 }
1007
1008 static struct ovsdb_jsonrpc_trigger *
1009 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
1010                            const struct json *id, size_t hash)
1011 {
1012     struct ovsdb_jsonrpc_trigger *t;
1013
1014     HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
1015         if (json_equal(t->id, id)) {
1016             return t;
1017         }
1018     }
1019
1020     return NULL;
1021 }
1022
1023 static void
1024 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
1025 {
1026     struct ovsdb_jsonrpc_session *s;
1027
1028     s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
1029
1030     if (jsonrpc_session_is_connected(s->js)) {
1031         struct jsonrpc_msg *reply;
1032         struct json *result;
1033
1034         result = ovsdb_trigger_steal_result(&t->trigger);
1035         if (result) {
1036             reply = jsonrpc_create_reply(result, t->id);
1037         } else {
1038             reply = jsonrpc_create_error(json_string_create("canceled"),
1039                                          t->id);
1040         }
1041         ovsdb_jsonrpc_session_send(s, reply);
1042     }
1043
1044     json_destroy(t->id);
1045     ovsdb_trigger_destroy(&t->trigger);
1046     hmap_remove(&s->triggers, &t->hmap_node);
1047     free(t);
1048 }
1049
1050 static void
1051 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
1052 {
1053     struct ovsdb_jsonrpc_trigger *t, *next;
1054     HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
1055         ovsdb_jsonrpc_trigger_complete(t);
1056     }
1057 }
1058
1059 static void
1060 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
1061 {
1062     while (!ovs_list_is_empty(&s->up.completions)) {
1063         struct ovsdb_jsonrpc_trigger *t
1064             = CONTAINER_OF(s->up.completions.next,
1065                            struct ovsdb_jsonrpc_trigger, trigger.node);
1066         ovsdb_jsonrpc_trigger_complete(t);
1067     }
1068 }
1069 \f
/* Jsonrpc front end monitor.
 *
 * One of these exists per monitor that a client has set up on a session.  It
 * ties the client-visible monitor ID to the database monitor ('dbmon') that
 * supplies its updates. */
struct ovsdb_jsonrpc_monitor {
    struct ovsdb_jsonrpc_session *session; /* Session the monitor is on. */
    struct ovsdb *db;           /* Database being monitored. */
    struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
    struct json *monitor_id;    /* Monitor ID chosen by the client; an owned
                                 * copy, destroyed with the monitor. */
    struct ovsdb_monitor *dbmon; /* Database monitor feeding this one. */
    uint64_t unflushed;         /* The first transaction that has not been
                                       flushed to the jsonrpc remote client. */
    enum ovsdb_monitor_version version; /* Selects "update" (V1) or "update2"
                                         * (V2) notifications. */
    struct ovsdb_monitor_session_condition *condition;/* Session's condition;
                                 * created only for V2 monitors, otherwise
                                 * NULL. */
};
1082
1083 static struct ovsdb_jsonrpc_monitor *
1084 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
1085                            const struct json *monitor_id)
1086 {
1087     struct ovsdb_jsonrpc_monitor *m;
1088
1089     HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
1090         if (json_equal(m->monitor_id, monitor_id)) {
1091             return m;
1092         }
1093     }
1094
1095     return NULL;
1096 }
1097
1098 static bool
1099 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
1100 {
1101     const struct json *json;
1102
1103     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
1104     return json ? json_boolean(json) : default_value;
1105 }
1106
1107 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
1108 ovsdb_jsonrpc_parse_monitor_request(
1109                                struct ovsdb_monitor *dbmon,
1110                                const struct ovsdb_table *table,
1111                                struct ovsdb_monitor_session_condition *cond,
1112                                const struct json *monitor_request)
1113 {
1114     const struct ovsdb_table_schema *ts = table->schema;
1115     enum ovsdb_monitor_selection select;
1116     const struct json *columns, *select_json, *where = NULL;
1117     struct ovsdb_parser parser;
1118     struct ovsdb_error *error;
1119
1120     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1121     if (cond) {
1122         where = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
1123     }
1124     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1125
1126     select_json = ovsdb_parser_member(&parser, "select",
1127                                       OP_OBJECT | OP_OPTIONAL);
1128
1129     error = ovsdb_parser_finish(&parser);
1130     if (error) {
1131         return error;
1132     }
1133
1134     if (select_json) {
1135         select = 0;
1136         ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1137         if (parse_bool(&parser, "initial", true)) {
1138             select |= OJMS_INITIAL;
1139         }
1140         if (parse_bool(&parser, "insert", true)) {
1141             select |= OJMS_INSERT;
1142         }
1143         if (parse_bool(&parser, "delete", true)) {
1144             select |= OJMS_DELETE;
1145         }
1146         if (parse_bool(&parser, "modify", true)) {
1147             select |= OJMS_MODIFY;
1148         }
1149         error = ovsdb_parser_finish(&parser);
1150         if (error) {
1151             return error;
1152         }
1153     } else {
1154         select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1155     }
1156
1157     ovsdb_monitor_table_add_select(dbmon, table, select);
1158     if (columns) {
1159         size_t i;
1160
1161         if (columns->type != JSON_ARRAY) {
1162             return ovsdb_syntax_error(columns, NULL,
1163                                       "array of column names expected");
1164         }
1165
1166         for (i = 0; i < columns->u.array.n; i++) {
1167             const struct ovsdb_column *column;
1168             const char *s;
1169
1170             if (columns->u.array.elems[i]->type != JSON_STRING) {
1171                 return ovsdb_syntax_error(columns, NULL,
1172                                           "array of column names expected");
1173             }
1174
1175             s = columns->u.array.elems[i]->u.string;
1176             column = shash_find_data(&table->schema->columns, s);
1177             if (!column) {
1178                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1179                                           "column name", s);
1180             }
1181             if (ovsdb_monitor_add_column(dbmon, table, column,
1182                                          select, true)) {
1183                 return ovsdb_syntax_error(columns, NULL, "column %s "
1184                                           "mentioned more than once",
1185                                           column->name);
1186             }
1187         }
1188     } else {
1189         struct shash_node *node;
1190
1191         SHASH_FOR_EACH (node, &ts->columns) {
1192             const struct ovsdb_column *column = node->data;
1193             if (column->index != OVSDB_COL_UUID) {
1194                 if (ovsdb_monitor_add_column(dbmon, table, column,
1195                                              select, true)) {
1196                     return ovsdb_syntax_error(columns, NULL, "column %s "
1197                                               "mentioned more than once",
1198                                               column->name);
1199                 }
1200             }
1201         }
1202     }
1203     if (cond) {
1204         error = ovsdb_monitor_table_condition_create(cond, table, where);
1205         if (error) {
1206             return error;
1207         }
1208     }
1209
1210     return NULL;
1211 }
1212
1213 static struct jsonrpc_msg *
1214 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1215                              struct json *params,
1216                              enum ovsdb_monitor_version version,
1217                              const struct json *request_id)
1218 {
1219     struct ovsdb_jsonrpc_monitor *m = NULL;
1220     struct ovsdb_monitor *dbmon = NULL;
1221     struct json *monitor_id, *monitor_requests;
1222     struct ovsdb_error *error = NULL;
1223     struct shash_node *node;
1224     struct json *json;
1225
1226     if (json_array(params)->n != 3) {
1227         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1228         goto error;
1229     }
1230     monitor_id = params->u.array.elems[1];
1231     monitor_requests = params->u.array.elems[2];
1232     if (monitor_requests->type != JSON_OBJECT) {
1233         error = ovsdb_syntax_error(monitor_requests, NULL,
1234                                    "monitor-requests must be object");
1235         goto error;
1236     }
1237
1238     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1239         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1240         goto error;
1241     }
1242
1243     m = xzalloc(sizeof *m);
1244     m->session = s;
1245     m->db = db;
1246     m->dbmon = ovsdb_monitor_create(db, m);
1247     if (version == OVSDB_MONITOR_V2) {
1248         m->condition = ovsdb_monitor_session_condition_create();
1249     }
1250     m->unflushed = 0;
1251     m->version = version;
1252     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1253     m->monitor_id = json_clone(monitor_id);
1254
1255     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1256         const struct ovsdb_table *table;
1257         const struct json *mr_value;
1258         size_t i;
1259
1260         table = ovsdb_get_table(m->db, node->name);
1261         if (!table) {
1262             error = ovsdb_syntax_error(NULL, NULL,
1263                                        "no table named %s", node->name);
1264             goto error;
1265         }
1266
1267         ovsdb_monitor_add_table(m->dbmon, table);
1268
1269         /* Parse columns. */
1270         mr_value = node->data;
1271         if (mr_value->type == JSON_ARRAY) {
1272             const struct json_array *array = &mr_value->u.array;
1273
1274             for (i = 0; i < array->n; i++) {
1275                 error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
1276                                                             table,
1277                                                             m->condition,
1278                                                             array->elems[i]);
1279                 if (error) {
1280                     goto error;
1281                 }
1282             }
1283         } else {
1284             error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
1285                                                         table,
1286                                                         m->condition,
1287                                                         mr_value);
1288             if (error) {
1289                 goto error;
1290             }
1291         }
1292     }
1293
1294     dbmon = ovsdb_monitor_add(m->dbmon);
1295     if (dbmon != m->dbmon) {
1296         /* Found an exisiting dbmon, reuse the current one. */
1297         ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
1298         ovsdb_monitor_add_jsonrpc_monitor(dbmon, m);
1299         m->dbmon = dbmon;
1300     }
1301
1302     /* Only now we can bind session's condition to ovsdb_monitor */
1303     if (m->condition) {
1304         ovsdb_monitor_condition_bind(m->dbmon, m->condition);
1305     }
1306
1307     ovsdb_monitor_get_initial(m->dbmon);
1308     json = ovsdb_jsonrpc_monitor_compose_update(m, true);
1309     json = json ? json : json_object_create();
1310     return jsonrpc_create_reply(json, request_id);
1311
1312 error:
1313     if (m) {
1314         ovsdb_jsonrpc_monitor_destroy(m);
1315     }
1316
1317     json = ovsdb_error_to_json(error);
1318     ovsdb_error_destroy(error);
1319     return jsonrpc_create_error(json, request_id);
1320 }
1321
1322 static struct ovsdb_error *
1323 ovsdb_jsonrpc_parse_monitor_cond_change_request(
1324                                 struct ovsdb_jsonrpc_monitor *m,
1325                                 const struct ovsdb_table *table,
1326                                 const struct json *cond_change_req)
1327 {
1328     const struct ovsdb_table_schema *ts = table->schema;
1329     const struct json *condition, *columns;
1330     struct ovsdb_parser parser;
1331     struct ovsdb_error *error;
1332
1333     ovsdb_parser_init(&parser, cond_change_req, "table %s", ts->name);
1334     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1335     condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
1336
1337     error = ovsdb_parser_finish(&parser);
1338     if (error) {
1339         return error;
1340     }
1341
1342     if (columns) {
1343         error = ovsdb_syntax_error(cond_change_req, NULL, "changing columns "
1344                                    "is unsupported");
1345         return error;
1346     }
1347     error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table,
1348                                                  condition);
1349
1350     return error;
1351 }
1352
1353 static struct jsonrpc_msg *
1354 ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
1355                                   struct json *params,
1356                                   const struct json *request_id)
1357 {
1358     struct ovsdb_error *error;
1359     struct ovsdb_jsonrpc_monitor *m;
1360     struct json *monitor_cond_change_reqs;
1361     struct shash_node *node;
1362     struct json *json;
1363
1364     if (json_array(params)->n != 3) {
1365         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1366         goto error;
1367     }
1368
1369     m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
1370     if (!m) {
1371         error = ovsdb_syntax_error(request_id, NULL,
1372                                    "unknown monitor session");
1373         goto error;
1374     }
1375
1376     monitor_cond_change_reqs = params->u.array.elems[2];
1377     if (monitor_cond_change_reqs->type != JSON_OBJECT) {
1378         error =
1379             ovsdb_syntax_error(NULL, NULL,
1380                                "monitor-cond-change-requests must be object");
1381         goto error;
1382     }
1383
1384     SHASH_FOR_EACH (node, json_object(monitor_cond_change_reqs)) {
1385         const struct ovsdb_table *table;
1386         const struct json *mr_value;
1387         size_t i;
1388
1389         table = ovsdb_get_table(m->db, node->name);
1390         if (!table) {
1391             error = ovsdb_syntax_error(NULL, NULL,
1392                                        "no table named %s", node->name);
1393             goto error;
1394         }
1395         if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
1396             error = ovsdb_syntax_error(NULL, NULL,
1397                                        "no table named %s in monitor session",
1398                                        node->name);
1399             goto error;
1400         }
1401
1402         mr_value = node->data;
1403         if (mr_value->type == JSON_ARRAY) {
1404             const struct json_array *array = &mr_value->u.array;
1405
1406             for (i = 0; i < array->n; i++) {
1407                 error = ovsdb_jsonrpc_parse_monitor_cond_change_request(
1408                                             m, table, array->elems[i]);
1409                 if (error) {
1410                     goto error;
1411                 }
1412             }
1413         } else {
1414             error = ovsdb_syntax_error(
1415                        NULL, NULL,
1416                        "table %s no monitor-cond-change JSON array",
1417                        node->name);
1418             goto error;
1419         }
1420     }
1421
1422     /* Change monitor id */
1423     hmap_remove(&s->monitors, &m->node);
1424     json_destroy(m->monitor_id);
1425     m->monitor_id = json_clone(params->u.array.elems[1]);
1426     hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
1427
1428     /* Send the new update, if any,  represents the difference from the old
1429      * condition and the new one. */
1430     struct json *update_json;
1431
1432     update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
1433                                     &m->unflushed, m->condition, m->version);
1434     if (update_json) {
1435         struct jsonrpc_msg *msg;
1436         struct json *params;
1437
1438         params = json_array_create_2(json_clone(m->monitor_id), update_json);
1439         msg = ovsdb_jsonrpc_create_notify(m, params);
1440         jsonrpc_session_send(s->js, msg);
1441     }
1442
1443     return jsonrpc_create_reply(json_object_create(), request_id);
1444
1445 error:
1446
1447     json = ovsdb_error_to_json(error);
1448     ovsdb_error_destroy(error);
1449     return jsonrpc_create_error(json, request_id);
1450 }
1451
1452 static struct jsonrpc_msg *
1453 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1454                              struct json_array *params,
1455                              const struct json *request_id)
1456 {
1457     if (params->n != 1) {
1458         return jsonrpc_create_error(json_string_create("invalid parameters"),
1459                                     request_id);
1460     } else {
1461         struct ovsdb_jsonrpc_monitor *m;
1462
1463         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1464         if (!m) {
1465             return jsonrpc_create_error(json_string_create("unknown monitor"),
1466                                         request_id);
1467         } else {
1468             ovsdb_jsonrpc_monitor_destroy(m);
1469             return jsonrpc_create_reply(json_object_create(), request_id);
1470         }
1471     }
1472 }
1473
1474 static void
1475 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1476 {
1477     struct ovsdb_jsonrpc_monitor *m, *next;
1478
1479     HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1480         ovsdb_jsonrpc_monitor_destroy(m);
1481     }
1482 }
1483
1484 static struct json *
1485 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
1486                                      bool initial)
1487 {
1488
1489     if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
1490         return NULL;
1491     }
1492
1493     return ovsdb_monitor_get_update(m->dbmon, initial, false,
1494                                     &m->unflushed, m->condition, m->version);
1495 }
1496
1497 static bool
1498 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
1499 {
1500     struct ovsdb_jsonrpc_monitor *m;
1501
1502     HMAP_FOR_EACH (m, node, &s->monitors) {
1503         if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
1504             return true;
1505         }
1506     }
1507
1508     return false;
1509 }
1510
1511 void
1512 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m)
1513 {
1514     json_destroy(m->monitor_id);
1515     hmap_remove(&m->session->monitors, &m->node);
1516     ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
1517     ovsdb_monitor_session_condition_destroy(m->condition);
1518     free(m);
1519 }
1520
1521 static struct jsonrpc_msg *
1522 ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor *m,
1523                             struct json *params)
1524 {
1525     const char *method;
1526
1527     switch(m->version) {
1528     case OVSDB_MONITOR_V1:
1529         method = "update";
1530         break;
1531     case OVSDB_MONITOR_V2:
1532         method = "update2";
1533         break;
1534     case OVSDB_MONITOR_VERSION_MAX:
1535     default:
1536         OVS_NOT_REACHED();
1537     }
1538
1539     return jsonrpc_create_notify(method, params);
1540 }
1541
1542 static void
1543 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
1544 {
1545     struct ovsdb_jsonrpc_monitor *m;
1546
1547     HMAP_FOR_EACH (m, node, &s->monitors) {
1548         struct json *json;
1549
1550         json = ovsdb_jsonrpc_monitor_compose_update(m, false);
1551         if (json) {
1552             struct jsonrpc_msg *msg;
1553             struct json *params;
1554
1555             params = json_array_create_2(json_clone(m->monitor_id), json);
1556             msg = ovsdb_jsonrpc_create_notify(m, params);
1557             jsonrpc_session_send(s->js, msg);
1558         }
1559     }
1560 }
1561
1562 void
1563 ovsdb_jsonrpc_disable_monitor_cond(void)
1564 {
1565     /* Once disabled, it is not possible to re-enable it. */
1566     monitor_cond_enable__ = false;
1567 }