ovsdb-idlc: Add comments for remaining non-"set" non-static functions.
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "bitmap.h"
23 #include "column.h"
24 #include "dynamic-string.h"
25 #include "json.h"
26 #include "jsonrpc.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb-parser.h"
29 #include "ovsdb.h"
30 #include "poll-loop.h"
31 #include "reconnect.h"
32 #include "row.h"
33 #include "server.h"
34 #include "simap.h"
35 #include "stream.h"
36 #include "table.h"
37 #include "timeval.h"
38 #include "transaction.h"
39 #include "trigger.h"
40 #include "openvswitch/vlog.h"
41
42 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
43
44 struct ovsdb_jsonrpc_remote;
45 struct ovsdb_jsonrpc_session;
46
47 /* Message rate-limiting. */
48 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
49
/* Sessions: forward declarations. */
static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
    struct ovsdb_jsonrpc_remote *remote, struct jsonrpc_session *js);
static void ovsdb_jsonrpc_session_run_all(
    struct ovsdb_jsonrpc_remote *remote);
static void ovsdb_jsonrpc_session_wait_all(
    struct ovsdb_jsonrpc_remote *remote);
static void ovsdb_jsonrpc_session_get_memory_usage_all(
    const struct ovsdb_jsonrpc_remote *remote, struct simap *usage);
static void ovsdb_jsonrpc_session_close_all(
    struct ovsdb_jsonrpc_remote *remote);
static void ovsdb_jsonrpc_session_reconnect_all(
    struct ovsdb_jsonrpc_remote *remote);
static void ovsdb_jsonrpc_session_set_all_options(
    struct ovsdb_jsonrpc_remote *remote,
    const struct ovsdb_jsonrpc_options *options);
static bool ovsdb_jsonrpc_session_get_status(
    const struct ovsdb_jsonrpc_remote *remote,
    struct ovsdb_jsonrpc_remote_status *status);
static void ovsdb_jsonrpc_session_unlock_all(
    struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter);
static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
                                       struct jsonrpc_msg *msg);
68
/* Triggers: forward declarations. */
static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
                                         struct ovsdb *db,
                                         struct json *id,
                                         struct json *params);
static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
    struct ovsdb_jsonrpc_session *s, const struct json *id, size_t hash);
static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t);
static void ovsdb_jsonrpc_trigger_complete_all(
    struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_trigger_complete_done(
    struct ovsdb_jsonrpc_session *s);
79
/* Monitors: forward declarations. */
static struct json *ovsdb_jsonrpc_monitor_create(
    struct ovsdb_jsonrpc_session *s, struct ovsdb *db, struct json *params);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
    struct ovsdb_jsonrpc_session *s,
    struct json_array *params,
    const struct json *request_id);
static void ovsdb_jsonrpc_monitor_remove_all(
    struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_monitor_flush_all(
    struct ovsdb_jsonrpc_session *s);
static bool ovsdb_jsonrpc_monitor_needs_flush(
    struct ovsdb_jsonrpc_session *s);
90 \f
91 /* JSON-RPC database server. */
92
93 struct ovsdb_jsonrpc_server {
94     struct ovsdb_server up;
95     unsigned int n_sessions, max_sessions;
96     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
97 };
98
99 /* A configured remote.  This is either a passive stream listener plus a list
100  * of the currently connected sessions, or a list of exactly one active
101  * session. */
102 struct ovsdb_jsonrpc_remote {
103     struct ovsdb_jsonrpc_server *server;
104     struct pstream *listener;   /* Listener, if passive. */
105     struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
106     uint8_t dscp;
107 };
108
static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
    struct ovsdb_jsonrpc_server *svr,
    const char *name,
    const struct ovsdb_jsonrpc_options *options);
static void ovsdb_jsonrpc_server_del_remote(struct shash_node *node);
114
115 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
116  *
117  * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
118  * which 'server' should provide access. */
119 struct ovsdb_jsonrpc_server *
120 ovsdb_jsonrpc_server_create(void)
121 {
122     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
123     ovsdb_server_init(&server->up);
124     server->max_sessions = 330;   /* Random limit. */
125     shash_init(&server->remotes);
126     return server;
127 }
128
129 /* Adds 'db' to the set of databases served out by 'svr'.  Returns true if
130  * successful, false if 'db''s name is the same as some database already in
131  * 'server'. */
132 bool
133 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
134 {
135     /* The OVSDB protocol doesn't have a way to notify a client that a
136      * database has been added.  If some client tried to use the database
137      * that we're adding and failed, then forcing it to reconnect seems like
138      * a reasonable way to make it try again.
139      *
140      * If this is too big of a hammer in practice, we could be more selective,
141      * e.g. disconnect only connections that actually tried to use a database
142      * with 'db''s name. */
143     ovsdb_jsonrpc_server_reconnect(svr);
144
145     return ovsdb_server_add_db(&svr->up, db);
146 }
147
148 /* Removes 'db' from the set of databases served out by 'svr'.  Returns
149  * true if successful, false if there is no database associated with 'db'. */
150 bool
151 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *svr,
152                                struct ovsdb *db)
153 {
154     /* There might be pointers to 'db' from 'svr', such as monitors or
155      * outstanding transactions.  Disconnect all JSON-RPC connections to avoid
156      * accesses to freed memory.
157      *
158      * If this is too big of a hammer in practice, we could be more selective,
159      * e.g. disconnect only connections that actually reference 'db'. */
160     ovsdb_jsonrpc_server_reconnect(svr);
161
162     return ovsdb_server_remove_db(&svr->up, db);
163 }
164
165 void
166 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
167 {
168     struct shash_node *node, *next;
169
170     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
171         ovsdb_jsonrpc_server_del_remote(node);
172     }
173     shash_destroy(&svr->remotes);
174     ovsdb_server_destroy(&svr->up);
175     free(svr);
176 }
177
178 struct ovsdb_jsonrpc_options *
179 ovsdb_jsonrpc_default_options(const char *target)
180 {
181     struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
182     options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
183     options->probe_interval = (stream_or_pstream_needs_probes(target)
184                                ? RECONNECT_DEFAULT_PROBE_INTERVAL
185                                : 0);
186     return options;
187 }
188
189 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
190  * options in the struct ovsdb_jsonrpc_options supplied as the data values.
191  *
192  * A remote is an active or passive stream connection method, e.g. "pssl:" or
193  * "tcp:1.2.3.4". */
194 void
195 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
196                                  const struct shash *new_remotes)
197 {
198     struct shash_node *node, *next;
199
200     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
201         struct ovsdb_jsonrpc_remote *remote = node->data;
202         struct ovsdb_jsonrpc_options *options
203             = shash_find_data(new_remotes, node->name);
204
205         if (!options) {
206             VLOG_INFO("%s: remote deconfigured", node->name);
207             ovsdb_jsonrpc_server_del_remote(node);
208         } else if (options->dscp != remote->dscp) {
209             ovsdb_jsonrpc_server_del_remote(node);
210          }
211     }
212     SHASH_FOR_EACH (node, new_remotes) {
213         const struct ovsdb_jsonrpc_options *options = node->data;
214         struct ovsdb_jsonrpc_remote *remote;
215
216         remote = shash_find_data(&svr->remotes, node->name);
217         if (!remote) {
218             remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
219             if (!remote) {
220                 continue;
221             }
222         }
223
224         ovsdb_jsonrpc_session_set_all_options(remote, options);
225     }
226 }
227
228 static struct ovsdb_jsonrpc_remote *
229 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
230                                 const char *name,
231                                 const struct ovsdb_jsonrpc_options *options)
232 {
233     struct ovsdb_jsonrpc_remote *remote;
234     struct pstream *listener;
235     int error;
236
237     error = jsonrpc_pstream_open(name, &listener, options->dscp);
238     if (error && error != EAFNOSUPPORT) {
239         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, ovs_strerror(error));
240         return NULL;
241     }
242
243     remote = xmalloc(sizeof *remote);
244     remote->server = svr;
245     remote->listener = listener;
246     list_init(&remote->sessions);
247     remote->dscp = options->dscp;
248     shash_add(&svr->remotes, name, remote);
249
250     if (!listener) {
251         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, true));
252     }
253     return remote;
254 }
255
256 static void
257 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
258 {
259     struct ovsdb_jsonrpc_remote *remote = node->data;
260
261     ovsdb_jsonrpc_session_close_all(remote);
262     pstream_close(remote->listener);
263     shash_delete(&remote->server->remotes, node);
264     free(remote);
265 }
266
267 /* Stores status information for the remote named 'target', which should have
268  * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
269  * into '*status'.  On success returns true, on failure (if 'svr' doesn't have
270  * a remote named 'target' or if that remote is an inbound remote that has no
271  * active connections) returns false.  On failure, 'status' will be zeroed.
272  */
273 bool
274 ovsdb_jsonrpc_server_get_remote_status(
275     const struct ovsdb_jsonrpc_server *svr, const char *target,
276     struct ovsdb_jsonrpc_remote_status *status)
277 {
278     const struct ovsdb_jsonrpc_remote *remote;
279
280     memset(status, 0, sizeof *status);
281
282     remote = shash_find_data(&svr->remotes, target);
283     return remote && ovsdb_jsonrpc_session_get_status(remote, status);
284 }
285
286 void
287 ovsdb_jsonrpc_server_free_remote_status(
288     struct ovsdb_jsonrpc_remote_status *status)
289 {
290     free(status->locks_held);
291     free(status->locks_waiting);
292     free(status->locks_lost);
293 }
294
295 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
296  * reconnect. */
297 void
298 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
299 {
300     struct shash_node *node;
301
302     SHASH_FOR_EACH (node, &svr->remotes) {
303         struct ovsdb_jsonrpc_remote *remote = node->data;
304
305         ovsdb_jsonrpc_session_reconnect_all(remote);
306     }
307 }
308
309 void
310 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
311 {
312     struct shash_node *node;
313
314     SHASH_FOR_EACH (node, &svr->remotes) {
315         struct ovsdb_jsonrpc_remote *remote = node->data;
316
317         if (remote->listener) {
318             if (svr->n_sessions < svr->max_sessions) {
319                 struct stream *stream;
320                 int error;
321
322                 error = pstream_accept(remote->listener, &stream);
323                 if (!error) {
324                     struct jsonrpc_session *js;
325                     js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
326                                                          remote->dscp);
327                     ovsdb_jsonrpc_session_create(remote, js);
328                 } else if (error != EAGAIN) {
329                     VLOG_WARN_RL(&rl, "%s: accept failed: %s",
330                                  pstream_get_name(remote->listener),
331                                  ovs_strerror(error));
332                 }
333             } else {
334                 VLOG_WARN_RL(&rl, "%s: connection exceeded maximum (%d)",
335                              pstream_get_name(remote->listener),
336                              svr->max_sessions);
337             }
338         }
339
340         ovsdb_jsonrpc_session_run_all(remote);
341     }
342 }
343
344 void
345 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
346 {
347     struct shash_node *node;
348
349     SHASH_FOR_EACH (node, &svr->remotes) {
350         struct ovsdb_jsonrpc_remote *remote = node->data;
351
352         if (remote->listener && svr->n_sessions < svr->max_sessions) {
353             pstream_wait(remote->listener);
354         }
355
356         ovsdb_jsonrpc_session_wait_all(remote);
357     }
358 }
359
360 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
361  * memory_report(). */
362 void
363 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
364                                       struct simap *usage)
365 {
366     struct shash_node *node;
367
368     simap_increase(usage, "sessions", svr->n_sessions);
369     SHASH_FOR_EACH (node, &svr->remotes) {
370         struct ovsdb_jsonrpc_remote *remote = node->data;
371
372         ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
373     }
374 }
375 \f
376 /* JSON-RPC database server session. */
377
378 struct ovsdb_jsonrpc_session {
379     struct ovs_list node;       /* Element in remote's sessions list. */
380     struct ovsdb_session up;
381     struct ovsdb_jsonrpc_remote *remote;
382
383     /* Triggers. */
384     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
385
386     /* Monitors. */
387     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
388
389     /* Network connectivity. */
390     struct jsonrpc_session *js;  /* JSON-RPC session. */
391     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
392 };
393
static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s);
static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_get_memory_usage(
    const struct ovsdb_jsonrpc_session *s, struct simap *usage);
static void ovsdb_jsonrpc_session_got_request(
    struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request);
static void ovsdb_jsonrpc_session_got_notify(
    struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request);
403
404 static struct ovsdb_jsonrpc_session *
405 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
406                              struct jsonrpc_session *js)
407 {
408     struct ovsdb_jsonrpc_session *s;
409
410     s = xzalloc(sizeof *s);
411     ovsdb_session_init(&s->up, &remote->server->up);
412     s->remote = remote;
413     list_push_back(&remote->sessions, &s->node);
414     hmap_init(&s->triggers);
415     hmap_init(&s->monitors);
416     s->js = js;
417     s->js_seqno = jsonrpc_session_get_seqno(js);
418
419     remote->server->n_sessions++;
420
421     return s;
422 }
423
424 static void
425 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
426 {
427     ovsdb_jsonrpc_monitor_remove_all(s);
428     ovsdb_jsonrpc_session_unlock_all(s);
429     ovsdb_jsonrpc_trigger_complete_all(s);
430
431     hmap_destroy(&s->monitors);
432     hmap_destroy(&s->triggers);
433
434     jsonrpc_session_close(s->js);
435     list_remove(&s->node);
436     s->remote->server->n_sessions--;
437     ovsdb_session_destroy(&s->up);
438     free(s);
439 }
440
441 static int
442 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
443 {
444     jsonrpc_session_run(s->js);
445     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
446         s->js_seqno = jsonrpc_session_get_seqno(s->js);
447         ovsdb_jsonrpc_trigger_complete_all(s);
448         ovsdb_jsonrpc_monitor_remove_all(s);
449         ovsdb_jsonrpc_session_unlock_all(s);
450     }
451
452     ovsdb_jsonrpc_trigger_complete_done(s);
453
454     if (!jsonrpc_session_get_backlog(s->js)) {
455         struct jsonrpc_msg *msg;
456
457         ovsdb_jsonrpc_monitor_flush_all(s);
458
459         msg = jsonrpc_session_recv(s->js);
460         if (msg) {
461             if (msg->type == JSONRPC_REQUEST) {
462                 ovsdb_jsonrpc_session_got_request(s, msg);
463             } else if (msg->type == JSONRPC_NOTIFY) {
464                 ovsdb_jsonrpc_session_got_notify(s, msg);
465             } else {
466                 VLOG_WARN("%s: received unexpected %s message",
467                           jsonrpc_session_get_name(s->js),
468                           jsonrpc_msg_type_to_string(msg->type));
469                 jsonrpc_session_force_reconnect(s->js);
470                 jsonrpc_msg_destroy(msg);
471             }
472         }
473     }
474     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
475 }
476
477 static void
478 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
479                                   const struct ovsdb_jsonrpc_options *options)
480 {
481     jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
482     jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
483     jsonrpc_session_set_dscp(session->js, options->dscp);
484 }
485
486 static void
487 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
488 {
489     struct ovsdb_jsonrpc_session *s, *next;
490
491     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
492         int error = ovsdb_jsonrpc_session_run(s);
493         if (error) {
494             ovsdb_jsonrpc_session_close(s);
495         }
496     }
497 }
498
499 static void
500 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
501 {
502     jsonrpc_session_wait(s->js);
503     if (!jsonrpc_session_get_backlog(s->js)) {
504         if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
505             poll_immediate_wake();
506         } else {
507             jsonrpc_session_recv_wait(s->js);
508         }
509     }
510 }
511
512 static void
513 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
514 {
515     struct ovsdb_jsonrpc_session *s;
516
517     LIST_FOR_EACH (s, node, &remote->sessions) {
518         ovsdb_jsonrpc_session_wait(s);
519     }
520 }
521
522 static void
523 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
524                                        struct simap *usage)
525 {
526     simap_increase(usage, "triggers", hmap_count(&s->triggers));
527     simap_increase(usage, "monitors", hmap_count(&s->monitors));
528     simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
529 }
530
531 static void
532 ovsdb_jsonrpc_session_get_memory_usage_all(
533     const struct ovsdb_jsonrpc_remote *remote,
534     struct simap *usage)
535 {
536     struct ovsdb_jsonrpc_session *s;
537
538     LIST_FOR_EACH (s, node, &remote->sessions) {
539         ovsdb_jsonrpc_session_get_memory_usage(s, usage);
540     }
541 }
542
543 static void
544 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
545 {
546     struct ovsdb_jsonrpc_session *s, *next;
547
548     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
549         ovsdb_jsonrpc_session_close(s);
550     }
551 }
552
553 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
554  * reconnect. */
555 static void
556 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
557 {
558     struct ovsdb_jsonrpc_session *s, *next;
559
560     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
561         jsonrpc_session_force_reconnect(s->js);
562         if (!jsonrpc_session_is_alive(s->js)) {
563             ovsdb_jsonrpc_session_close(s);
564         }
565     }
566 }
567
568 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
569  * 'options'.
570  *
571  * (The dscp value can't be changed directly; the caller must instead close and
572  * re-open the session.) */
573 static void
574 ovsdb_jsonrpc_session_set_all_options(
575     struct ovsdb_jsonrpc_remote *remote,
576     const struct ovsdb_jsonrpc_options *options)
577 {
578     struct ovsdb_jsonrpc_session *s;
579
580     LIST_FOR_EACH (s, node, &remote->sessions) {
581         ovsdb_jsonrpc_session_set_options(s, options);
582     }
583 }
584
585 static bool
586 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote *remote,
587                                  struct ovsdb_jsonrpc_remote_status *status)
588 {
589     const struct ovsdb_jsonrpc_session *s;
590     const struct jsonrpc_session *js;
591     struct ovsdb_lock_waiter *waiter;
592     struct reconnect_stats rstats;
593     struct ds locks_held, locks_waiting, locks_lost;
594
595     status->bound_port = (remote->listener
596                           ? pstream_get_bound_port(remote->listener)
597                           : htons(0));
598
599     if (list_is_empty(&remote->sessions)) {
600         return false;
601     }
602     s = CONTAINER_OF(remote->sessions.next, struct ovsdb_jsonrpc_session, node);
603     js = s->js;
604
605     status->is_connected = jsonrpc_session_is_connected(js);
606     status->last_error = jsonrpc_session_get_status(js);
607
608     jsonrpc_session_get_reconnect_stats(js, &rstats);
609     status->state = rstats.state;
610     status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
611         ? UINT_MAX : rstats.msec_since_connect / 1000;
612     status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
613         ? UINT_MAX : rstats.msec_since_disconnect / 1000;
614
615     ds_init(&locks_held);
616     ds_init(&locks_waiting);
617     ds_init(&locks_lost);
618     HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
619         struct ds *string;
620
621         string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
622                   : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
623                   : &locks_lost);
624         if (string->length) {
625             ds_put_char(string, ' ');
626         }
627         ds_put_cstr(string, waiter->lock_name);
628     }
629     status->locks_held = ds_steal_cstr(&locks_held);
630     status->locks_waiting = ds_steal_cstr(&locks_waiting);
631     status->locks_lost = ds_steal_cstr(&locks_lost);
632
633     status->n_connections = list_size(&remote->sessions);
634
635     return true;
636 }
637
638 /* Examines 'request' to determine the database to which it relates, and then
639  * searches 's' to find that database:
640  *
641  *    - If successful, returns the database and sets '*replyp' to NULL.
642  *
643  *    - If no such database exists, returns NULL and sets '*replyp' to an
644  *      appropriate JSON-RPC error reply, owned by the caller. */
645 static struct ovsdb *
646 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
647                         const struct jsonrpc_msg *request,
648                         struct jsonrpc_msg **replyp)
649 {
650     struct json_array *params;
651     struct ovsdb_error *error;
652     const char *db_name;
653     struct ovsdb *db;
654
655     params = json_array(request->params);
656     if (!params->n || params->elems[0]->type != JSON_STRING) {
657         error = ovsdb_syntax_error(
658             request->params, NULL,
659             "%s request params must begin with <db-name>", request->method);
660         goto error;
661     }
662
663     db_name = params->elems[0]->u.string;
664     db = shash_find_data(&s->up.server->dbs, db_name);
665     if (!db) {
666         error = ovsdb_syntax_error(
667             request->params, "unknown database",
668             "%s request specifies unknown database %s",
669             request->method, db_name);
670         goto error;
671     }
672
673     *replyp = NULL;
674     return db;
675
676 error:
677     *replyp = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
678     ovsdb_error_destroy(error);
679     return NULL;
680 }
681
682 static struct ovsdb_error *
683 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
684                                       const char **lock_namep)
685 {
686     const struct json_array *params;
687
688     params = json_array(request->params);
689     if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
690         !ovsdb_parser_is_id(json_string(params->elems[0]))) {
691         *lock_namep = NULL;
692         return ovsdb_syntax_error(request->params, NULL,
693                                   "%s request params must be <id>",
694                                   request->method);
695     }
696
697     *lock_namep = json_string(params->elems[0]);
698     return NULL;
699 }
700
701 static void
702 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
703                              const char *lock_name,
704                              const char *method)
705 {
706     struct ovsdb_jsonrpc_session *s;
707     struct json *params;
708
709     s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
710     params = json_array_create_1(json_string_create(lock_name));
711     ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
712 }
713
714 static struct jsonrpc_msg *
715 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
716                            struct jsonrpc_msg *request,
717                            enum ovsdb_lock_mode mode)
718 {
719     struct ovsdb_lock_waiter *waiter;
720     struct jsonrpc_msg *reply;
721     struct ovsdb_error *error;
722     struct ovsdb_session *victim;
723     const char *lock_name;
724     struct json *result;
725
726     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
727     if (error) {
728         goto error;
729     }
730
731     /* Report error if this session has issued a "lock" or "steal" without a
732      * matching "unlock" for this lock. */
733     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
734     if (waiter) {
735         error = ovsdb_syntax_error(
736             request->params, NULL,
737             "must issue \"unlock\" before new \"%s\"", request->method);
738         goto error;
739     }
740
741     /* Get the lock, add us as a waiter. */
742     waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
743                                &victim);
744     if (victim) {
745         ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
746     }
747
748     result = json_object_create();
749     json_object_put(result, "locked",
750                     json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
751
752     return jsonrpc_create_reply(result, request->id);
753
754 error:
755     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
756     ovsdb_error_destroy(error);
757     return reply;
758 }
759
760 static void
761 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
762 {
763     struct ovsdb_lock_waiter *waiter, *next;
764
765     HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
766         ovsdb_jsonrpc_session_unlock__(waiter);
767     }
768 }
769
770 static void
771 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
772 {
773     struct ovsdb_lock *lock = waiter->lock;
774
775     if (lock) {
776         struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
777         if (new_owner) {
778             ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
779         } else {
780             /* ovsdb_server_lock() might have freed 'lock'. */
781         }
782     }
783
784     ovsdb_lock_waiter_destroy(waiter);
785 }
786
787 static struct jsonrpc_msg *
788 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
789                              struct jsonrpc_msg *request)
790 {
791     struct ovsdb_lock_waiter *waiter;
792     struct jsonrpc_msg *reply;
793     struct ovsdb_error *error;
794     const char *lock_name;
795
796     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
797     if (error) {
798         goto error;
799     }
800
801     /* Report error if this session has not issued a "lock" or "steal" for this
802      * lock. */
803     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
804     if (!waiter) {
805         error = ovsdb_syntax_error(
806             request->params, NULL, "\"unlock\" without \"lock\" or \"steal\"");
807         goto error;
808     }
809
810     ovsdb_jsonrpc_session_unlock__(waiter);
811
812     return jsonrpc_create_reply(json_object_create(), request->id);
813
814 error:
815     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
816     ovsdb_error_destroy(error);
817     return reply;
818 }
819
820 static struct jsonrpc_msg *
821 execute_transaction(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
822                     struct jsonrpc_msg *request)
823 {
824     ovsdb_jsonrpc_trigger_create(s, db, request->id, request->params);
825     request->id = NULL;
826     request->params = NULL;
827     jsonrpc_msg_destroy(request);
828     return NULL;
829 }
830
831 static void
832 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
833                                   struct jsonrpc_msg *request)
834 {
835     struct jsonrpc_msg *reply;
836
837     if (!strcmp(request->method, "transact")) {
838         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
839         if (!reply) {
840             reply = execute_transaction(s, db, request);
841         }
842     } else if (!strcmp(request->method, "monitor")) {
843         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
844         if (!reply) {
845             reply = jsonrpc_create_reply(
846                 ovsdb_jsonrpc_monitor_create(s, db, request->params),
847                 request->id);
848         }
849     } else if (!strcmp(request->method, "monitor_cancel")) {
850         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
851                                              request->id);
852     } else if (!strcmp(request->method, "get_schema")) {
853         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
854         if (!reply) {
855             reply = jsonrpc_create_reply(ovsdb_schema_to_json(db->schema),
856                                          request->id);
857         }
858     } else if (!strcmp(request->method, "list_dbs")) {
859         size_t n_dbs = shash_count(&s->up.server->dbs);
860         struct shash_node *node;
861         struct json **dbs;
862         size_t i;
863
864         dbs = xmalloc(n_dbs * sizeof *dbs);
865         i = 0;
866         SHASH_FOR_EACH (node, &s->up.server->dbs) {
867             dbs[i++] = json_string_create(node->name);
868         }
869         reply = jsonrpc_create_reply(json_array_create(dbs, n_dbs),
870                                      request->id);
871     } else if (!strcmp(request->method, "lock")) {
872         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
873     } else if (!strcmp(request->method, "steal")) {
874         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
875     } else if (!strcmp(request->method, "unlock")) {
876         reply = ovsdb_jsonrpc_session_unlock(s, request);
877     } else if (!strcmp(request->method, "echo")) {
878         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
879     } else {
880         reply = jsonrpc_create_error(json_string_create("unknown method"),
881                                      request->id);
882     }
883
884     if (reply) {
885         jsonrpc_msg_destroy(request);
886         ovsdb_jsonrpc_session_send(s, reply);
887     }
888 }
889
890 static void
891 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
892 {
893     if (json_array(request->params)->n == 1) {
894         struct ovsdb_jsonrpc_trigger *t;
895         struct json *id;
896
897         id = request->params->u.array.elems[0];
898         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
899         if (t) {
900             ovsdb_jsonrpc_trigger_complete(t);
901         }
902     }
903 }
904
905 static void
906 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
907                                  struct jsonrpc_msg *request)
908 {
909     if (!strcmp(request->method, "cancel")) {
910         execute_cancel(s, request);
911     }
912     jsonrpc_msg_destroy(request);
913 }
914
915 static void
916 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
917                            struct jsonrpc_msg *msg)
918 {
919     ovsdb_jsonrpc_monitor_flush_all(s);
920     jsonrpc_session_send(s->js, msg);
921 }
922 \f
923 /* JSON-RPC database server triggers.
924  *
925  * (Every transaction is treated as a trigger even if it doesn't actually have
926  * any "wait" operations.) */
927
928 struct ovsdb_jsonrpc_trigger {
929     struct ovsdb_trigger trigger;
930     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
931     struct json *id;
932 };
933
934 static void
935 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
936                              struct json *id, struct json *params)
937 {
938     struct ovsdb_jsonrpc_trigger *t;
939     size_t hash;
940
941     /* Check for duplicate ID. */
942     hash = json_hash(id, 0);
943     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
944     if (t) {
945         struct jsonrpc_msg *msg;
946
947         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
948                                    id);
949         ovsdb_jsonrpc_session_send(s, msg);
950         json_destroy(id);
951         json_destroy(params);
952         return;
953     }
954
955     /* Insert into trigger table. */
956     t = xmalloc(sizeof *t);
957     ovsdb_trigger_init(&s->up, db, &t->trigger, params, time_msec());
958     t->id = id;
959     hmap_insert(&s->triggers, &t->hmap_node, hash);
960
961     /* Complete early if possible. */
962     if (ovsdb_trigger_is_complete(&t->trigger)) {
963         ovsdb_jsonrpc_trigger_complete(t);
964     }
965 }
966
967 static struct ovsdb_jsonrpc_trigger *
968 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
969                            const struct json *id, size_t hash)
970 {
971     struct ovsdb_jsonrpc_trigger *t;
972
973     HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
974         if (json_equal(t->id, id)) {
975             return t;
976         }
977     }
978
979     return NULL;
980 }
981
982 static void
983 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
984 {
985     struct ovsdb_jsonrpc_session *s;
986
987     s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
988
989     if (jsonrpc_session_is_connected(s->js)) {
990         struct jsonrpc_msg *reply;
991         struct json *result;
992
993         result = ovsdb_trigger_steal_result(&t->trigger);
994         if (result) {
995             reply = jsonrpc_create_reply(result, t->id);
996         } else {
997             reply = jsonrpc_create_error(json_string_create("canceled"),
998                                          t->id);
999         }
1000         ovsdb_jsonrpc_session_send(s, reply);
1001     }
1002
1003     json_destroy(t->id);
1004     ovsdb_trigger_destroy(&t->trigger);
1005     hmap_remove(&s->triggers, &t->hmap_node);
1006     free(t);
1007 }
1008
1009 static void
1010 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
1011 {
1012     struct ovsdb_jsonrpc_trigger *t, *next;
1013     HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
1014         ovsdb_jsonrpc_trigger_complete(t);
1015     }
1016 }
1017
1018 static void
1019 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
1020 {
1021     while (!list_is_empty(&s->up.completions)) {
1022         struct ovsdb_jsonrpc_trigger *t
1023             = CONTAINER_OF(s->up.completions.next,
1024                            struct ovsdb_jsonrpc_trigger, trigger.node);
1025         ovsdb_jsonrpc_trigger_complete(t);
1026     }
1027 }
1028 \f
1029 /* JSON-RPC database table monitors. */
1030
/* Kinds of row change that a monitor can report.  These are bit flags and
 * may be OR'd together. */
enum ovsdb_jsonrpc_monitor_selection {
    /* All rows when monitor is created. */
    OJMS_INITIAL = 1 << 0,

    /* New rows. */
    OJMS_INSERT = 1 << 1,

    /* Deleted rows. */
    OJMS_DELETE = 1 << 2,

    /* Modified rows. */
    OJMS_MODIFY = 1 << 3
};
1037
1038 /* A particular column being monitored. */
1039 struct ovsdb_jsonrpc_monitor_column {
1040     const struct ovsdb_column *column;
1041     enum ovsdb_jsonrpc_monitor_selection select;
1042 };
1043
1044 /* A row that has changed in a monitored table. */
1045 struct ovsdb_jsonrpc_monitor_row {
1046     struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
1047     struct uuid uuid;           /* UUID of row that changed. */
1048     struct ovsdb_datum *old;    /* Old data, NULL for an inserted row. */
1049     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
1050 };
1051
1052 /* A particular table being monitored. */
1053 struct ovsdb_jsonrpc_monitor_table {
1054     const struct ovsdb_table *table;
1055
1056     /* This is the union (bitwise-OR) of the 'select' values in all of the
1057      * members of 'columns' below. */
1058     enum ovsdb_jsonrpc_monitor_selection select;
1059
1060     /* Columns being monitored. */
1061     struct ovsdb_jsonrpc_monitor_column *columns;
1062     size_t n_columns;
1063
1064     /* Contains 'struct ovsdb_jsonrpc_monitor_row's for rows that have been
1065      * updated but not yet flushed to the jsonrpc connection. */
1066     struct hmap changes;
1067 };
1068
1069 /* A collection of tables being monitored. */
1070 struct ovsdb_jsonrpc_monitor {
1071     struct ovsdb_replica replica;
1072     struct ovsdb_jsonrpc_session *session;
1073     struct ovsdb *db;
1074     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
1075
1076     struct json *monitor_id;
1077     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
1078 };
1079
1080 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
1081
1082 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
1083     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
1084 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
1085 static struct json *ovsdb_jsonrpc_monitor_get_initial(
1086     const struct ovsdb_jsonrpc_monitor *);
1087
1088 static bool
1089 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
1090 {
1091     const struct json *json;
1092
1093     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
1094     return json ? json_boolean(json) : default_value;
1095 }
1096
1097 struct ovsdb_jsonrpc_monitor *
1098 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
1099                            const struct json *monitor_id)
1100 {
1101     struct ovsdb_jsonrpc_monitor *m;
1102
1103     HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
1104         if (json_equal(m->monitor_id, monitor_id)) {
1105             return m;
1106         }
1107     }
1108
1109     return NULL;
1110 }
1111
1112 static void
1113 ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table *mt,
1114                                  const struct ovsdb_column *column,
1115                                  enum ovsdb_jsonrpc_monitor_selection select,
1116                                  size_t *allocated_columns)
1117 {
1118     struct ovsdb_jsonrpc_monitor_column *c;
1119
1120     if (mt->n_columns >= *allocated_columns) {
1121         mt->columns = x2nrealloc(mt->columns, allocated_columns,
1122                                  sizeof *mt->columns);
1123     }
1124
1125     c = &mt->columns[mt->n_columns++];
1126     c->column = column;
1127     c->select = select;
1128 }
1129
1130 static int
1131 compare_ovsdb_jsonrpc_monitor_column(const void *a_, const void *b_)
1132 {
1133     const struct ovsdb_jsonrpc_monitor_column *a = a_;
1134     const struct ovsdb_jsonrpc_monitor_column *b = b_;
1135
1136     return a->column < b->column ? -1 : a->column > b->column;
1137 }
1138
1139 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
1140 ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
1141                                     const struct json *monitor_request,
1142                                     size_t *allocated_columns)
1143 {
1144     const struct ovsdb_table_schema *ts = mt->table->schema;
1145     enum ovsdb_jsonrpc_monitor_selection select;
1146     const struct json *columns, *select_json;
1147     struct ovsdb_parser parser;
1148     struct ovsdb_error *error;
1149
1150     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1151     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1152     select_json = ovsdb_parser_member(&parser, "select",
1153                                       OP_OBJECT | OP_OPTIONAL);
1154     error = ovsdb_parser_finish(&parser);
1155     if (error) {
1156         return error;
1157     }
1158
1159     if (select_json) {
1160         select = 0;
1161         ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1162         if (parse_bool(&parser, "initial", true)) {
1163             select |= OJMS_INITIAL;
1164         }
1165         if (parse_bool(&parser, "insert", true)) {
1166             select |= OJMS_INSERT;
1167         }
1168         if (parse_bool(&parser, "delete", true)) {
1169             select |= OJMS_DELETE;
1170         }
1171         if (parse_bool(&parser, "modify", true)) {
1172             select |= OJMS_MODIFY;
1173         }
1174         error = ovsdb_parser_finish(&parser);
1175         if (error) {
1176             return error;
1177         }
1178     } else {
1179         select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1180     }
1181     mt->select |= select;
1182
1183     if (columns) {
1184         size_t i;
1185
1186         if (columns->type != JSON_ARRAY) {
1187             return ovsdb_syntax_error(columns, NULL,
1188                                       "array of column names expected");
1189         }
1190
1191         for (i = 0; i < columns->u.array.n; i++) {
1192             const struct ovsdb_column *column;
1193             const char *s;
1194
1195             if (columns->u.array.elems[i]->type != JSON_STRING) {
1196                 return ovsdb_syntax_error(columns, NULL,
1197                                           "array of column names expected");
1198             }
1199
1200             s = columns->u.array.elems[i]->u.string;
1201             column = shash_find_data(&mt->table->schema->columns, s);
1202             if (!column) {
1203                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1204                                           "column name", s);
1205             }
1206             ovsdb_jsonrpc_add_monitor_column(mt, column, select,
1207                                              allocated_columns);
1208         }
1209     } else {
1210         struct shash_node *node;
1211
1212         SHASH_FOR_EACH (node, &ts->columns) {
1213             const struct ovsdb_column *column = node->data;
1214             if (column->index != OVSDB_COL_UUID) {
1215                 ovsdb_jsonrpc_add_monitor_column(mt, column, select,
1216                                                  allocated_columns);
1217             }
1218         }
1219     }
1220
1221     return NULL;
1222 }
1223
1224 static struct json *
1225 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1226                              struct json *params)
1227 {
1228     struct ovsdb_jsonrpc_monitor *m = NULL;
1229     struct json *monitor_id, *monitor_requests;
1230     struct ovsdb_error *error = NULL;
1231     struct shash_node *node;
1232     struct json *json;
1233
1234     if (json_array(params)->n != 3) {
1235         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1236         goto error;
1237     }
1238     monitor_id = params->u.array.elems[1];
1239     monitor_requests = params->u.array.elems[2];
1240     if (monitor_requests->type != JSON_OBJECT) {
1241         error = ovsdb_syntax_error(monitor_requests, NULL,
1242                                    "monitor-requests must be object");
1243         goto error;
1244     }
1245
1246     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1247         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1248         goto error;
1249     }
1250
1251     m = xzalloc(sizeof *m);
1252     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
1253     ovsdb_add_replica(db, &m->replica);
1254     m->session = s;
1255     m->db = db;
1256     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1257     m->monitor_id = json_clone(monitor_id);
1258     shash_init(&m->tables);
1259
1260     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1261         const struct ovsdb_table *table;
1262         struct ovsdb_jsonrpc_monitor_table *mt;
1263         size_t allocated_columns;
1264         const struct json *mr_value;
1265         size_t i;
1266
1267         table = ovsdb_get_table(m->db, node->name);
1268         if (!table) {
1269             error = ovsdb_syntax_error(NULL, NULL,
1270                                        "no table named %s", node->name);
1271             goto error;
1272         }
1273
1274         mt = xzalloc(sizeof *mt);
1275         mt->table = table;
1276         hmap_init(&mt->changes);
1277         shash_add(&m->tables, table->schema->name, mt);
1278
1279         /* Parse columns. */
1280         mr_value = node->data;
1281         allocated_columns = 0;
1282         if (mr_value->type == JSON_ARRAY) {
1283             const struct json_array *array = &mr_value->u.array;
1284
1285             for (i = 0; i < array->n; i++) {
1286                 error = ovsdb_jsonrpc_parse_monitor_request(
1287                     mt, array->elems[i], &allocated_columns);
1288                 if (error) {
1289                     goto error;
1290                 }
1291             }
1292         } else {
1293             error = ovsdb_jsonrpc_parse_monitor_request(
1294                 mt, mr_value, &allocated_columns);
1295             if (error) {
1296                 goto error;
1297             }
1298         }
1299
1300         /* Check for duplicate columns. */
1301         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
1302               compare_ovsdb_jsonrpc_monitor_column);
1303         for (i = 1; i < mt->n_columns; i++) {
1304             if (mt->columns[i].column == mt->columns[i - 1].column) {
1305                 error = ovsdb_syntax_error(mr_value, NULL, "column %s "
1306                                            "mentioned more than once",
1307                                            mt->columns[i].column->name);
1308                 goto error;
1309             }
1310         }
1311     }
1312
1313     return ovsdb_jsonrpc_monitor_get_initial(m);
1314
1315 error:
1316     if (m) {
1317         ovsdb_remove_replica(m->db, &m->replica);
1318     }
1319
1320     json = ovsdb_error_to_json(error);
1321     ovsdb_error_destroy(error);
1322     return json;
1323 }
1324
1325 static struct jsonrpc_msg *
1326 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1327                              struct json_array *params,
1328                              const struct json *request_id)
1329 {
1330     if (params->n != 1) {
1331         return jsonrpc_create_error(json_string_create("invalid parameters"),
1332                                     request_id);
1333     } else {
1334         struct ovsdb_jsonrpc_monitor *m;
1335
1336         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1337         if (!m) {
1338             return jsonrpc_create_error(json_string_create("unknown monitor"),
1339                                         request_id);
1340         } else {
1341             ovsdb_remove_replica(m->db, &m->replica);
1342             return jsonrpc_create_reply(json_object_create(), request_id);
1343         }
1344     }
1345 }
1346
1347 static void
1348 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1349 {
1350     struct ovsdb_jsonrpc_monitor *m, *next;
1351
1352     HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1353         ovsdb_remove_replica(m->db, &m->replica);
1354     }
1355 }
1356
1357 static struct ovsdb_jsonrpc_monitor *
1358 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
1359 {
1360     ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
1361     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
1362 }
1363
/* State passed to ovsdb_jsonrpc_monitor_change_cb() across a series of row
 * changes. */
struct ovsdb_jsonrpc_monitor_aux {
    /* Monitor receiving the changes. */
    const struct ovsdb_jsonrpc_monitor *monitor;

    /* Monitored table found for the most recent row, or NULL; cached so that
     * consecutive rows from the same table need no new lookup. */
    struct ovsdb_jsonrpc_monitor_table *mt;
};
1368
1369 /* Finds and returns the ovsdb_jsonrpc_monitor_row in 'mt->changes' for the
1370  * given 'uuid', or NULL if there is no such row. */
1371 static struct ovsdb_jsonrpc_monitor_row *
1372 ovsdb_jsonrpc_monitor_row_find(const struct ovsdb_jsonrpc_monitor_table *mt,
1373                                const struct uuid *uuid)
1374 {
1375     struct ovsdb_jsonrpc_monitor_row *row;
1376
1377     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &mt->changes) {
1378         if (uuid_equals(uuid, &row->uuid)) {
1379             return row;
1380         }
1381     }
1382     return NULL;
1383 }
1384
1385 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
1386  * copies of the data in 'row' drawn from the columns represented by
1387  * mt->columns[].  Returns the array.
1388  *
1389  * If 'row' is NULL, returns NULL. */
1390 static struct ovsdb_datum *
1391 clone_monitor_row_data(const struct ovsdb_jsonrpc_monitor_table *mt,
1392                        const struct ovsdb_row *row)
1393 {
1394     struct ovsdb_datum *data;
1395     size_t i;
1396
1397     if (!row) {
1398         return NULL;
1399     }
1400
1401     data = xmalloc(mt->n_columns * sizeof *data);
1402     for (i = 0; i < mt->n_columns; i++) {
1403         const struct ovsdb_column *c = mt->columns[i].column;
1404         const struct ovsdb_datum *src = &row->fields[c->index];
1405         struct ovsdb_datum *dst = &data[i];
1406         const struct ovsdb_type *type = &c->type;
1407
1408         ovsdb_datum_clone(dst, src, type);
1409     }
1410     return data;
1411 }
1412
1413 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
1414  * in 'row' drawn from the columns represented by mt->columns[]. */
1415 static void
1416 update_monitor_row_data(const struct ovsdb_jsonrpc_monitor_table *mt,
1417                         const struct ovsdb_row *row,
1418                         struct ovsdb_datum *data)
1419 {
1420     size_t i;
1421
1422     for (i = 0; i < mt->n_columns; i++) {
1423         const struct ovsdb_column *c = mt->columns[i].column;
1424         const struct ovsdb_datum *src = &row->fields[c->index];
1425         struct ovsdb_datum *dst = &data[i];
1426         const struct ovsdb_type *type = &c->type;
1427
1428         if (!ovsdb_datum_equals(src, dst, type)) {
1429             ovsdb_datum_destroy(dst, type);
1430             ovsdb_datum_clone(dst, src, type);
1431         }
1432     }
1433 }
1434
1435 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
1436  * from mt->columns[], plus 'data' itself. */
1437 static void
1438 free_monitor_row_data(const struct ovsdb_jsonrpc_monitor_table *mt,
1439                       struct ovsdb_datum *data)
1440 {
1441     if (data) {
1442         size_t i;
1443
1444         for (i = 0; i < mt->n_columns; i++) {
1445             const struct ovsdb_column *c = mt->columns[i].column;
1446
1447             ovsdb_datum_destroy(&data[i], &c->type);
1448         }
1449         free(data);
1450     }
1451 }
1452
1453 /* Frees 'row', which must have been created from 'mt'. */
1454 static void
1455 ovsdb_jsonrpc_monitor_row_destroy(const struct ovsdb_jsonrpc_monitor_table *mt,
1456                                   struct ovsdb_jsonrpc_monitor_row *row)
1457 {
1458     if (row) {
1459         free_monitor_row_data(mt, row->old);
1460         free_monitor_row_data(mt, row->new);
1461         free(row);
1462     }
1463 }
1464
1465 static bool
1466 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
1467                                 const struct ovsdb_row *new,
1468                                 const unsigned long int *changed OVS_UNUSED,
1469                                 void *aux_)
1470 {
1471     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
1472     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
1473     struct ovsdb_table *table = new ? new->table : old->table;
1474     const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
1475     struct ovsdb_jsonrpc_monitor_row *change;
1476     struct ovsdb_jsonrpc_monitor_table *mt;
1477
1478     if (!aux->mt || table != aux->mt->table) {
1479         aux->mt = shash_find_data(&m->tables, table->schema->name);
1480         if (!aux->mt) {
1481             /* We don't care about rows in this table at all.  Tell the caller
1482              * to skip it.  */
1483             return false;
1484         }
1485     }
1486     mt = aux->mt;
1487
1488     change = ovsdb_jsonrpc_monitor_row_find(mt, uuid);
1489     if (!change) {
1490         change = xmalloc(sizeof *change);
1491         hmap_insert(&mt->changes, &change->hmap_node, uuid_hash(uuid));
1492         change->uuid = *uuid;
1493         change->old = clone_monitor_row_data(mt, old);
1494         change->new = clone_monitor_row_data(mt, new);
1495     } else {
1496         if (new) {
1497             update_monitor_row_data(mt, new, change->new);
1498         } else {
1499             free_monitor_row_data(mt, change->new);
1500             change->new = NULL;
1501
1502             if (!change->old) {
1503                 /* This row was added then deleted.  Forget about it. */
1504                 hmap_remove(&mt->changes, &change->hmap_node);
1505                 free(change);
1506             }
1507         }
1508     }
1509     return true;
1510 }
1511
1512 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
1513  * 'mt', or NULL if no row update should be sent.
1514  *
1515  * The caller should specify 'initial' as true if the returned JSON is going to
1516  * be used as part of the initial reply to a "monitor" request, false if it is
1517  * going to be used as part of an "update" notification.
1518  *
1519  * 'changed' must be a scratch buffer for internal use that is at least
1520  * bitmap_n_bytes(mt->n_columns) bytes long. */
1521 static struct json *
1522 ovsdb_jsonrpc_monitor_compose_row_update(
1523     const struct ovsdb_jsonrpc_monitor_table *mt,
1524     const struct ovsdb_jsonrpc_monitor_row *row,
1525     bool initial, unsigned long int *changed)
1526 {
1527     enum ovsdb_jsonrpc_monitor_selection type;
1528     struct json *old_json, *new_json;
1529     struct json *row_json;
1530     size_t i;
1531
1532     type = (initial ? OJMS_INITIAL
1533             : !row->old ? OJMS_INSERT
1534             : !row->new ? OJMS_DELETE
1535             : OJMS_MODIFY);
1536     if (!(mt->select & type)) {
1537         return NULL;
1538     }
1539
1540     if (type == OJMS_MODIFY) {
1541         size_t n_changes;
1542
1543         n_changes = 0;
1544         memset(changed, 0, bitmap_n_bytes(mt->n_columns));
1545         for (i = 0; i < mt->n_columns; i++) {
1546             const struct ovsdb_column *c = mt->columns[i].column;
1547             if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
1548                 bitmap_set1(changed, i);
1549                 n_changes++;
1550             }
1551         }
1552         if (!n_changes) {
1553             /* No actual changes: presumably a row changed and then
1554              * changed back later. */
1555             return NULL;
1556         }
1557     }
1558
1559     row_json = json_object_create();
1560     old_json = new_json = NULL;
1561     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
1562         old_json = json_object_create();
1563         json_object_put(row_json, "old", old_json);
1564     }
1565     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1566         new_json = json_object_create();
1567         json_object_put(row_json, "new", new_json);
1568     }
1569     for (i = 0; i < mt->n_columns; i++) {
1570         const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
1571
1572         if (!(type & c->select)) {
1573             /* We don't care about this type of change for this
1574              * particular column (but we will care about it for some
1575              * other column). */
1576             continue;
1577         }
1578
1579         if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
1580             || type == OJMS_DELETE) {
1581             json_object_put(old_json, c->column->name,
1582                             ovsdb_datum_to_json(&row->old[i],
1583                                                 &c->column->type));
1584         }
1585         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1586             json_object_put(new_json, c->column->name,
1587                             ovsdb_datum_to_json(&row->new[i],
1588                                                 &c->column->type));
1589         }
1590     }
1591
1592     return row_json;
1593 }
1594
1595 /* Constructs and returns JSON for a <table-updates> object (as described in
1596  * RFC 7047) for all the outstanding changes within 'monitor', and deletes all
1597  * the outstanding changes from 'monitor'.  Returns NULL if no update needs to
1598  * be sent.
1599  *
1600  * The caller should specify 'initial' as true if the returned JSON is going to
1601  * be used as part of the initial reply to a "monitor" request, false if it is
1602  * going to be used as part of an "update" notification. */
1603 static struct json *
1604 ovsdb_jsonrpc_monitor_compose_table_update(
1605     const struct ovsdb_jsonrpc_monitor *monitor, bool initial)
1606 {
1607     struct shash_node *node;
1608     unsigned long int *changed;
1609     struct json *json;
1610     size_t max_columns;
1611
1612     max_columns = 0;
1613     SHASH_FOR_EACH (node, &monitor->tables) {
1614         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1615
1616         max_columns = MAX(max_columns, mt->n_columns);
1617     }
1618     changed = xmalloc(bitmap_n_bytes(max_columns));
1619
1620     json = NULL;
1621     SHASH_FOR_EACH (node, &monitor->tables) {
1622         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1623         struct ovsdb_jsonrpc_monitor_row *row, *next;
1624         struct json *table_json = NULL;
1625
1626         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
1627             struct json *row_json;
1628
1629             row_json = ovsdb_jsonrpc_monitor_compose_row_update(
1630                 mt, row, initial, changed);
1631             if (row_json) {
1632                 char uuid[UUID_LEN + 1];
1633
1634                 /* Create JSON object for transaction overall. */
1635                 if (!json) {
1636                     json = json_object_create();
1637                 }
1638
1639                 /* Create JSON object for transaction on this table. */
1640                 if (!table_json) {
1641                     table_json = json_object_create();
1642                     json_object_put(json, mt->table->schema->name, table_json);
1643                 }
1644
1645                 /* Add JSON row to JSON table. */
1646                 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
1647                 json_object_put(table_json, uuid, row_json);
1648             }
1649
1650             hmap_remove(&mt->changes, &row->hmap_node);
1651             ovsdb_jsonrpc_monitor_row_destroy(mt, row);
1652         }
1653     }
1654
1655     free(changed);
1656
1657     return json;
1658 }
1659
1660 static bool
1661 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
1662 {
1663     struct ovsdb_jsonrpc_monitor *m;
1664
1665     HMAP_FOR_EACH (m, node, &s->monitors) {
1666         struct shash_node *node;
1667
1668         SHASH_FOR_EACH (node, &m->tables) {
1669             struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1670
1671             if (!hmap_is_empty(&mt->changes)) {
1672                 return true;
1673             }
1674         }
1675     }
1676
1677     return false;
1678 }
1679
1680 static void
1681 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
1682 {
1683     struct ovsdb_jsonrpc_monitor *m;
1684
1685     HMAP_FOR_EACH (m, node, &s->monitors) {
1686         struct json *json;
1687
1688         json = ovsdb_jsonrpc_monitor_compose_table_update(m, false);
1689         if (json) {
1690             struct jsonrpc_msg *msg;
1691             struct json *params;
1692
1693             params = json_array_create_2(json_clone(m->monitor_id), json);
1694             msg = jsonrpc_create_notify("update", params);
1695             jsonrpc_session_send(s->js, msg);
1696         }
1697     }
1698 }
1699
1700 static void
1701 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
1702                                const struct ovsdb_jsonrpc_monitor *m)
1703 {
1704     aux->monitor = m;
1705     aux->mt = NULL;
1706 }
1707
1708 static struct ovsdb_error *
1709 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
1710                              const struct ovsdb_txn *txn,
1711                              bool durable OVS_UNUSED)
1712 {
1713     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1714     struct ovsdb_jsonrpc_monitor_aux aux;
1715
1716     ovsdb_jsonrpc_monitor_init_aux(&aux, m);
1717     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
1718
1719     return NULL;
1720 }
1721
1722 static struct json *
1723 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
1724 {
1725     struct ovsdb_jsonrpc_monitor_aux aux;
1726     struct shash_node *node;
1727     struct json *json;
1728
1729     ovsdb_jsonrpc_monitor_init_aux(&aux, m);
1730     SHASH_FOR_EACH (node, &m->tables) {
1731         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1732
1733         if (mt->select & OJMS_INITIAL) {
1734             struct ovsdb_row *row;
1735
1736             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1737                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, NULL, &aux);
1738             }
1739         }
1740     }
1741     json = ovsdb_jsonrpc_monitor_compose_table_update(m, true);
1742     return json ? json : json_object_create();
1743 }
1744
1745 static void
1746 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
1747 {
1748     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1749     struct shash_node *node;
1750
1751     json_destroy(m->monitor_id);
1752     SHASH_FOR_EACH (node, &m->tables) {
1753         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1754         struct ovsdb_jsonrpc_monitor_row *row, *next;
1755
1756         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
1757             hmap_remove(&mt->changes, &row->hmap_node);
1758             ovsdb_jsonrpc_monitor_row_destroy(mt, row);
1759         }
1760         hmap_destroy(&mt->changes);
1761
1762         free(mt->columns);
1763         free(mt);
1764     }
1765     shash_destroy(&m->tables);
1766     hmap_remove(&m->session->monitors, &m->node);
1767     free(m);
1768 }
1769
1770 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1771     ovsdb_jsonrpc_monitor_commit,
1772     ovsdb_jsonrpc_monitor_destroy
1773 };