df495b0aa1303d213764f0e71af41c6620e233a1
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "bitmap.h"
23 #include "column.h"
24 #include "dynamic-string.h"
25 #include "json.h"
26 #include "jsonrpc.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb-parser.h"
29 #include "ovsdb.h"
30 #include "poll-loop.h"
31 #include "reconnect.h"
32 #include "row.h"
33 #include "server.h"
34 #include "simap.h"
35 #include "stream.h"
36 #include "table.h"
37 #include "timeval.h"
38 #include "transaction.h"
39 #include "trigger.h"
40 #include "openvswitch/vlog.h"
41
42 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
43
44 struct ovsdb_jsonrpc_remote;
45 struct ovsdb_jsonrpc_session;
46
47 /* Message rate-limiting. */
48 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
49
50 /* Sessions. */
51 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
52     struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
53 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
54 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
55 static void ovsdb_jsonrpc_session_get_memory_usage_all(
56     const struct ovsdb_jsonrpc_remote *, struct simap *usage);
57 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
58 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
59 static void ovsdb_jsonrpc_session_set_all_options(
60     struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
61 static bool ovsdb_jsonrpc_session_get_status(
62     const struct ovsdb_jsonrpc_remote *,
63     struct ovsdb_jsonrpc_remote_status *);
64 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
65 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
66 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
67                                        struct jsonrpc_msg *);
68
69 /* Triggers. */
70 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
71                                          struct ovsdb *,
72                                          struct json *id, struct json *params);
73 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
74     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
75 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
76 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
77 static void ovsdb_jsonrpc_trigger_complete_done(
78     struct ovsdb_jsonrpc_session *);
79
80 /* Monitors. */
81 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
82     struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
83     const struct json *request_id);
84 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
85     struct ovsdb_jsonrpc_session *,
86     struct json_array *params,
87     const struct json *request_id);
88 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
89 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
90 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
91 \f
92 /* JSON-RPC database server. */
93
94 struct ovsdb_jsonrpc_server {
95     struct ovsdb_server up;
96     unsigned int n_sessions, max_sessions;
97     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
98 };
99
100 /* A configured remote.  This is either a passive stream listener plus a list
101  * of the currently connected sessions, or a list of exactly one active
102  * session. */
103 struct ovsdb_jsonrpc_remote {
104     struct ovsdb_jsonrpc_server *server;
105     struct pstream *listener;   /* Listener, if passive. */
106     struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
107     uint8_t dscp;
108 };
109
110 static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
111     struct ovsdb_jsonrpc_server *, const char *name,
112     const struct ovsdb_jsonrpc_options *options
113 );
114 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
115
116 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
117  *
118  * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
119  * which 'server' should provide access. */
120 struct ovsdb_jsonrpc_server *
121 ovsdb_jsonrpc_server_create(void)
122 {
123     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
124     ovsdb_server_init(&server->up);
125     server->max_sessions = 330;   /* Random limit. */
126     shash_init(&server->remotes);
127     return server;
128 }
129
130 /* Adds 'db' to the set of databases served out by 'svr'.  Returns true if
131  * successful, false if 'db''s name is the same as some database already in
132  * 'server'. */
133 bool
134 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
135 {
136     /* The OVSDB protocol doesn't have a way to notify a client that a
137      * database has been added.  If some client tried to use the database
138      * that we're adding and failed, then forcing it to reconnect seems like
139      * a reasonable way to make it try again.
140      *
141      * If this is too big of a hammer in practice, we could be more selective,
142      * e.g. disconnect only connections that actually tried to use a database
143      * with 'db''s name. */
144     ovsdb_jsonrpc_server_reconnect(svr);
145
146     return ovsdb_server_add_db(&svr->up, db);
147 }
148
149 /* Removes 'db' from the set of databases served out by 'svr'.  Returns
150  * true if successful, false if there is no database associated with 'db'. */
151 bool
152 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *svr,
153                                struct ovsdb *db)
154 {
155     /* There might be pointers to 'db' from 'svr', such as monitors or
156      * outstanding transactions.  Disconnect all JSON-RPC connections to avoid
157      * accesses to freed memory.
158      *
159      * If this is too big of a hammer in practice, we could be more selective,
160      * e.g. disconnect only connections that actually reference 'db'. */
161     ovsdb_jsonrpc_server_reconnect(svr);
162
163     return ovsdb_server_remove_db(&svr->up, db);
164 }
165
166 void
167 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
168 {
169     struct shash_node *node, *next;
170
171     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
172         ovsdb_jsonrpc_server_del_remote(node);
173     }
174     shash_destroy(&svr->remotes);
175     ovsdb_server_destroy(&svr->up);
176     free(svr);
177 }
178
179 struct ovsdb_jsonrpc_options *
180 ovsdb_jsonrpc_default_options(const char *target)
181 {
182     struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
183     options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
184     options->probe_interval = (stream_or_pstream_needs_probes(target)
185                                ? RECONNECT_DEFAULT_PROBE_INTERVAL
186                                : 0);
187     return options;
188 }
189
190 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
191  * options in the struct ovsdb_jsonrpc_options supplied as the data values.
192  *
193  * A remote is an active or passive stream connection method, e.g. "pssl:" or
194  * "tcp:1.2.3.4". */
195 void
196 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
197                                  const struct shash *new_remotes)
198 {
199     struct shash_node *node, *next;
200
201     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
202         struct ovsdb_jsonrpc_remote *remote = node->data;
203         struct ovsdb_jsonrpc_options *options
204             = shash_find_data(new_remotes, node->name);
205
206         if (!options) {
207             VLOG_INFO("%s: remote deconfigured", node->name);
208             ovsdb_jsonrpc_server_del_remote(node);
209         } else if (options->dscp != remote->dscp) {
210             ovsdb_jsonrpc_server_del_remote(node);
211          }
212     }
213     SHASH_FOR_EACH (node, new_remotes) {
214         const struct ovsdb_jsonrpc_options *options = node->data;
215         struct ovsdb_jsonrpc_remote *remote;
216
217         remote = shash_find_data(&svr->remotes, node->name);
218         if (!remote) {
219             remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
220             if (!remote) {
221                 continue;
222             }
223         }
224
225         ovsdb_jsonrpc_session_set_all_options(remote, options);
226     }
227 }
228
229 static struct ovsdb_jsonrpc_remote *
230 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
231                                 const char *name,
232                                 const struct ovsdb_jsonrpc_options *options)
233 {
234     struct ovsdb_jsonrpc_remote *remote;
235     struct pstream *listener;
236     int error;
237
238     error = jsonrpc_pstream_open(name, &listener, options->dscp);
239     if (error && error != EAFNOSUPPORT) {
240         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, ovs_strerror(error));
241         return NULL;
242     }
243
244     remote = xmalloc(sizeof *remote);
245     remote->server = svr;
246     remote->listener = listener;
247     list_init(&remote->sessions);
248     remote->dscp = options->dscp;
249     shash_add(&svr->remotes, name, remote);
250
251     if (!listener) {
252         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, true));
253     }
254     return remote;
255 }
256
257 static void
258 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
259 {
260     struct ovsdb_jsonrpc_remote *remote = node->data;
261
262     ovsdb_jsonrpc_session_close_all(remote);
263     pstream_close(remote->listener);
264     shash_delete(&remote->server->remotes, node);
265     free(remote);
266 }
267
268 /* Stores status information for the remote named 'target', which should have
269  * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
270  * into '*status'.  On success returns true, on failure (if 'svr' doesn't have
271  * a remote named 'target' or if that remote is an inbound remote that has no
272  * active connections) returns false.  On failure, 'status' will be zeroed.
273  */
274 bool
275 ovsdb_jsonrpc_server_get_remote_status(
276     const struct ovsdb_jsonrpc_server *svr, const char *target,
277     struct ovsdb_jsonrpc_remote_status *status)
278 {
279     const struct ovsdb_jsonrpc_remote *remote;
280
281     memset(status, 0, sizeof *status);
282
283     remote = shash_find_data(&svr->remotes, target);
284     return remote && ovsdb_jsonrpc_session_get_status(remote, status);
285 }
286
287 void
288 ovsdb_jsonrpc_server_free_remote_status(
289     struct ovsdb_jsonrpc_remote_status *status)
290 {
291     free(status->locks_held);
292     free(status->locks_waiting);
293     free(status->locks_lost);
294 }
295
296 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
297  * reconnect. */
298 void
299 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
300 {
301     struct shash_node *node;
302
303     SHASH_FOR_EACH (node, &svr->remotes) {
304         struct ovsdb_jsonrpc_remote *remote = node->data;
305
306         ovsdb_jsonrpc_session_reconnect_all(remote);
307     }
308 }
309
310 void
311 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
312 {
313     struct shash_node *node;
314
315     SHASH_FOR_EACH (node, &svr->remotes) {
316         struct ovsdb_jsonrpc_remote *remote = node->data;
317
318         if (remote->listener) {
319             if (svr->n_sessions < svr->max_sessions) {
320                 struct stream *stream;
321                 int error;
322
323                 error = pstream_accept(remote->listener, &stream);
324                 if (!error) {
325                     struct jsonrpc_session *js;
326                     js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
327                                                          remote->dscp);
328                     ovsdb_jsonrpc_session_create(remote, js);
329                 } else if (error != EAGAIN) {
330                     VLOG_WARN_RL(&rl, "%s: accept failed: %s",
331                                  pstream_get_name(remote->listener),
332                                  ovs_strerror(error));
333                 }
334             } else {
335                 VLOG_WARN_RL(&rl, "%s: connection exceeded maximum (%d)",
336                              pstream_get_name(remote->listener),
337                              svr->max_sessions);
338             }
339         }
340
341         ovsdb_jsonrpc_session_run_all(remote);
342     }
343 }
344
345 void
346 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
347 {
348     struct shash_node *node;
349
350     SHASH_FOR_EACH (node, &svr->remotes) {
351         struct ovsdb_jsonrpc_remote *remote = node->data;
352
353         if (remote->listener && svr->n_sessions < svr->max_sessions) {
354             pstream_wait(remote->listener);
355         }
356
357         ovsdb_jsonrpc_session_wait_all(remote);
358     }
359 }
360
361 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
362  * memory_report(). */
363 void
364 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
365                                       struct simap *usage)
366 {
367     struct shash_node *node;
368
369     simap_increase(usage, "sessions", svr->n_sessions);
370     SHASH_FOR_EACH (node, &svr->remotes) {
371         struct ovsdb_jsonrpc_remote *remote = node->data;
372
373         ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
374     }
375 }
376 \f
377 /* JSON-RPC database server session. */
378
379 struct ovsdb_jsonrpc_session {
380     struct ovs_list node;       /* Element in remote's sessions list. */
381     struct ovsdb_session up;
382     struct ovsdb_jsonrpc_remote *remote;
383
384     /* Triggers. */
385     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
386
387     /* Monitors. */
388     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
389
390     /* Network connectivity. */
391     struct jsonrpc_session *js;  /* JSON-RPC session. */
392     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
393 };
394
395 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
396 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
397 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
398 static void ovsdb_jsonrpc_session_get_memory_usage(
399     const struct ovsdb_jsonrpc_session *, struct simap *usage);
400 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
401                                              struct jsonrpc_msg *);
402 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
403                                              struct jsonrpc_msg *);
404
405 static struct ovsdb_jsonrpc_session *
406 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
407                              struct jsonrpc_session *js)
408 {
409     struct ovsdb_jsonrpc_session *s;
410
411     s = xzalloc(sizeof *s);
412     ovsdb_session_init(&s->up, &remote->server->up);
413     s->remote = remote;
414     list_push_back(&remote->sessions, &s->node);
415     hmap_init(&s->triggers);
416     hmap_init(&s->monitors);
417     s->js = js;
418     s->js_seqno = jsonrpc_session_get_seqno(js);
419
420     remote->server->n_sessions++;
421
422     return s;
423 }
424
425 static void
426 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
427 {
428     ovsdb_jsonrpc_monitor_remove_all(s);
429     ovsdb_jsonrpc_session_unlock_all(s);
430     ovsdb_jsonrpc_trigger_complete_all(s);
431
432     hmap_destroy(&s->monitors);
433     hmap_destroy(&s->triggers);
434
435     jsonrpc_session_close(s->js);
436     list_remove(&s->node);
437     s->remote->server->n_sessions--;
438     ovsdb_session_destroy(&s->up);
439     free(s);
440 }
441
442 static int
443 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
444 {
445     jsonrpc_session_run(s->js);
446     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
447         s->js_seqno = jsonrpc_session_get_seqno(s->js);
448         ovsdb_jsonrpc_trigger_complete_all(s);
449         ovsdb_jsonrpc_monitor_remove_all(s);
450         ovsdb_jsonrpc_session_unlock_all(s);
451     }
452
453     ovsdb_jsonrpc_trigger_complete_done(s);
454
455     if (!jsonrpc_session_get_backlog(s->js)) {
456         struct jsonrpc_msg *msg;
457
458         ovsdb_jsonrpc_monitor_flush_all(s);
459
460         msg = jsonrpc_session_recv(s->js);
461         if (msg) {
462             if (msg->type == JSONRPC_REQUEST) {
463                 ovsdb_jsonrpc_session_got_request(s, msg);
464             } else if (msg->type == JSONRPC_NOTIFY) {
465                 ovsdb_jsonrpc_session_got_notify(s, msg);
466             } else {
467                 VLOG_WARN("%s: received unexpected %s message",
468                           jsonrpc_session_get_name(s->js),
469                           jsonrpc_msg_type_to_string(msg->type));
470                 jsonrpc_session_force_reconnect(s->js);
471                 jsonrpc_msg_destroy(msg);
472             }
473         }
474     }
475     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
476 }
477
478 static void
479 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
480                                   const struct ovsdb_jsonrpc_options *options)
481 {
482     jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
483     jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
484     jsonrpc_session_set_dscp(session->js, options->dscp);
485 }
486
487 static void
488 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
489 {
490     struct ovsdb_jsonrpc_session *s, *next;
491
492     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
493         int error = ovsdb_jsonrpc_session_run(s);
494         if (error) {
495             ovsdb_jsonrpc_session_close(s);
496         }
497     }
498 }
499
500 static void
501 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
502 {
503     jsonrpc_session_wait(s->js);
504     if (!jsonrpc_session_get_backlog(s->js)) {
505         if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
506             poll_immediate_wake();
507         } else {
508             jsonrpc_session_recv_wait(s->js);
509         }
510     }
511 }
512
513 static void
514 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
515 {
516     struct ovsdb_jsonrpc_session *s;
517
518     LIST_FOR_EACH (s, node, &remote->sessions) {
519         ovsdb_jsonrpc_session_wait(s);
520     }
521 }
522
523 static void
524 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
525                                        struct simap *usage)
526 {
527     simap_increase(usage, "triggers", hmap_count(&s->triggers));
528     simap_increase(usage, "monitors", hmap_count(&s->monitors));
529     simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
530 }
531
532 static void
533 ovsdb_jsonrpc_session_get_memory_usage_all(
534     const struct ovsdb_jsonrpc_remote *remote,
535     struct simap *usage)
536 {
537     struct ovsdb_jsonrpc_session *s;
538
539     LIST_FOR_EACH (s, node, &remote->sessions) {
540         ovsdb_jsonrpc_session_get_memory_usage(s, usage);
541     }
542 }
543
544 static void
545 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
546 {
547     struct ovsdb_jsonrpc_session *s, *next;
548
549     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
550         ovsdb_jsonrpc_session_close(s);
551     }
552 }
553
554 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
555  * reconnect. */
556 static void
557 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
558 {
559     struct ovsdb_jsonrpc_session *s, *next;
560
561     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
562         jsonrpc_session_force_reconnect(s->js);
563         if (!jsonrpc_session_is_alive(s->js)) {
564             ovsdb_jsonrpc_session_close(s);
565         }
566     }
567 }
568
569 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
570  * 'options'.
571  *
572  * (The dscp value can't be changed directly; the caller must instead close and
573  * re-open the session.) */
574 static void
575 ovsdb_jsonrpc_session_set_all_options(
576     struct ovsdb_jsonrpc_remote *remote,
577     const struct ovsdb_jsonrpc_options *options)
578 {
579     struct ovsdb_jsonrpc_session *s;
580
581     LIST_FOR_EACH (s, node, &remote->sessions) {
582         ovsdb_jsonrpc_session_set_options(s, options);
583     }
584 }
585
586 static bool
587 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote *remote,
588                                  struct ovsdb_jsonrpc_remote_status *status)
589 {
590     const struct ovsdb_jsonrpc_session *s;
591     const struct jsonrpc_session *js;
592     struct ovsdb_lock_waiter *waiter;
593     struct reconnect_stats rstats;
594     struct ds locks_held, locks_waiting, locks_lost;
595
596     status->bound_port = (remote->listener
597                           ? pstream_get_bound_port(remote->listener)
598                           : htons(0));
599
600     if (list_is_empty(&remote->sessions)) {
601         return false;
602     }
603     s = CONTAINER_OF(remote->sessions.next, struct ovsdb_jsonrpc_session, node);
604     js = s->js;
605
606     status->is_connected = jsonrpc_session_is_connected(js);
607     status->last_error = jsonrpc_session_get_status(js);
608
609     jsonrpc_session_get_reconnect_stats(js, &rstats);
610     status->state = rstats.state;
611     status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
612         ? UINT_MAX : rstats.msec_since_connect / 1000;
613     status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
614         ? UINT_MAX : rstats.msec_since_disconnect / 1000;
615
616     ds_init(&locks_held);
617     ds_init(&locks_waiting);
618     ds_init(&locks_lost);
619     HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
620         struct ds *string;
621
622         string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
623                   : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
624                   : &locks_lost);
625         if (string->length) {
626             ds_put_char(string, ' ');
627         }
628         ds_put_cstr(string, waiter->lock_name);
629     }
630     status->locks_held = ds_steal_cstr(&locks_held);
631     status->locks_waiting = ds_steal_cstr(&locks_waiting);
632     status->locks_lost = ds_steal_cstr(&locks_lost);
633
634     status->n_connections = list_size(&remote->sessions);
635
636     return true;
637 }
638
639 /* Examines 'request' to determine the database to which it relates, and then
640  * searches 's' to find that database:
641  *
642  *    - If successful, returns the database and sets '*replyp' to NULL.
643  *
644  *    - If no such database exists, returns NULL and sets '*replyp' to an
645  *      appropriate JSON-RPC error reply, owned by the caller. */
646 static struct ovsdb *
647 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
648                         const struct jsonrpc_msg *request,
649                         struct jsonrpc_msg **replyp)
650 {
651     struct json_array *params;
652     struct ovsdb_error *error;
653     const char *db_name;
654     struct ovsdb *db;
655
656     params = json_array(request->params);
657     if (!params->n || params->elems[0]->type != JSON_STRING) {
658         error = ovsdb_syntax_error(
659             request->params, NULL,
660             "%s request params must begin with <db-name>", request->method);
661         goto error;
662     }
663
664     db_name = params->elems[0]->u.string;
665     db = shash_find_data(&s->up.server->dbs, db_name);
666     if (!db) {
667         error = ovsdb_syntax_error(
668             request->params, "unknown database",
669             "%s request specifies unknown database %s",
670             request->method, db_name);
671         goto error;
672     }
673
674     *replyp = NULL;
675     return db;
676
677 error:
678     *replyp = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
679     ovsdb_error_destroy(error);
680     return NULL;
681 }
682
683 static struct ovsdb_error *
684 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
685                                       const char **lock_namep)
686 {
687     const struct json_array *params;
688
689     params = json_array(request->params);
690     if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
691         !ovsdb_parser_is_id(json_string(params->elems[0]))) {
692         *lock_namep = NULL;
693         return ovsdb_syntax_error(request->params, NULL,
694                                   "%s request params must be <id>",
695                                   request->method);
696     }
697
698     *lock_namep = json_string(params->elems[0]);
699     return NULL;
700 }
701
702 static void
703 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
704                              const char *lock_name,
705                              const char *method)
706 {
707     struct ovsdb_jsonrpc_session *s;
708     struct json *params;
709
710     s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
711     params = json_array_create_1(json_string_create(lock_name));
712     ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
713 }
714
715 static struct jsonrpc_msg *
716 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
717                            struct jsonrpc_msg *request,
718                            enum ovsdb_lock_mode mode)
719 {
720     struct ovsdb_lock_waiter *waiter;
721     struct jsonrpc_msg *reply;
722     struct ovsdb_error *error;
723     struct ovsdb_session *victim;
724     const char *lock_name;
725     struct json *result;
726
727     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
728     if (error) {
729         goto error;
730     }
731
732     /* Report error if this session has issued a "lock" or "steal" without a
733      * matching "unlock" for this lock. */
734     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
735     if (waiter) {
736         error = ovsdb_syntax_error(
737             request->params, NULL,
738             "must issue \"unlock\" before new \"%s\"", request->method);
739         goto error;
740     }
741
742     /* Get the lock, add us as a waiter. */
743     waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
744                                &victim);
745     if (victim) {
746         ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
747     }
748
749     result = json_object_create();
750     json_object_put(result, "locked",
751                     json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
752
753     return jsonrpc_create_reply(result, request->id);
754
755 error:
756     reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
757     ovsdb_error_destroy(error);
758     return reply;
759 }
760
761 static void
762 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
763 {
764     struct ovsdb_lock_waiter *waiter, *next;
765
766     HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
767         ovsdb_jsonrpc_session_unlock__(waiter);
768     }
769 }
770
771 static void
772 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
773 {
774     struct ovsdb_lock *lock = waiter->lock;
775
776     if (lock) {
777         struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
778         if (new_owner) {
779             ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
780         } else {
781             /* ovsdb_server_lock() might have freed 'lock'. */
782         }
783     }
784
785     ovsdb_lock_waiter_destroy(waiter);
786 }
787
788 static struct jsonrpc_msg *
789 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
790                              struct jsonrpc_msg *request)
791 {
792     struct ovsdb_lock_waiter *waiter;
793     struct jsonrpc_msg *reply;
794     struct ovsdb_error *error;
795     const char *lock_name;
796
797     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
798     if (error) {
799         goto error;
800     }
801
802     /* Report error if this session has not issued a "lock" or "steal" for this
803      * lock. */
804     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
805     if (!waiter) {
806         error = ovsdb_syntax_error(
807             request->params, NULL, "\"unlock\" without \"lock\" or \"steal\"");
808         goto error;
809     }
810
811     ovsdb_jsonrpc_session_unlock__(waiter);
812
813     return jsonrpc_create_reply(json_object_create(), request->id);
814
815 error:
816     reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
817     ovsdb_error_destroy(error);
818     return reply;
819 }
820
821 static struct jsonrpc_msg *
822 execute_transaction(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
823                     struct jsonrpc_msg *request)
824 {
825     ovsdb_jsonrpc_trigger_create(s, db, request->id, request->params);
826     request->id = NULL;
827     request->params = NULL;
828     jsonrpc_msg_destroy(request);
829     return NULL;
830 }
831
832 static void
833 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
834                                   struct jsonrpc_msg *request)
835 {
836     struct jsonrpc_msg *reply;
837
838     if (!strcmp(request->method, "transact")) {
839         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
840         if (!reply) {
841             reply = execute_transaction(s, db, request);
842         }
843     } else if (!strcmp(request->method, "monitor")) {
844         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
845         if (!reply) {
846             reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
847                                                  request->id);
848         }
849     } else if (!strcmp(request->method, "monitor_cancel")) {
850         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
851                                              request->id);
852     } else if (!strcmp(request->method, "get_schema")) {
853         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
854         if (!reply) {
855             reply = jsonrpc_create_reply(ovsdb_schema_to_json(db->schema),
856                                          request->id);
857         }
858     } else if (!strcmp(request->method, "list_dbs")) {
859         size_t n_dbs = shash_count(&s->up.server->dbs);
860         struct shash_node *node;
861         struct json **dbs;
862         size_t i;
863
864         dbs = xmalloc(n_dbs * sizeof *dbs);
865         i = 0;
866         SHASH_FOR_EACH (node, &s->up.server->dbs) {
867             dbs[i++] = json_string_create(node->name);
868         }
869         reply = jsonrpc_create_reply(json_array_create(dbs, n_dbs),
870                                      request->id);
871     } else if (!strcmp(request->method, "lock")) {
872         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
873     } else if (!strcmp(request->method, "steal")) {
874         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
875     } else if (!strcmp(request->method, "unlock")) {
876         reply = ovsdb_jsonrpc_session_unlock(s, request);
877     } else if (!strcmp(request->method, "echo")) {
878         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
879     } else {
880         reply = jsonrpc_create_error(json_string_create("unknown method"),
881                                      request->id);
882     }
883
884     if (reply) {
885         jsonrpc_msg_destroy(request);
886         ovsdb_jsonrpc_session_send(s, reply);
887     }
888 }
889
890 static void
891 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
892 {
893     if (json_array(request->params)->n == 1) {
894         struct ovsdb_jsonrpc_trigger *t;
895         struct json *id;
896
897         id = request->params->u.array.elems[0];
898         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
899         if (t) {
900             ovsdb_jsonrpc_trigger_complete(t);
901         }
902     }
903 }
904
905 static void
906 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
907                                  struct jsonrpc_msg *request)
908 {
909     if (!strcmp(request->method, "cancel")) {
910         execute_cancel(s, request);
911     }
912     jsonrpc_msg_destroy(request);
913 }
914
915 static void
916 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
917                            struct jsonrpc_msg *msg)
918 {
919     ovsdb_jsonrpc_monitor_flush_all(s);
920     jsonrpc_session_send(s->js, msg);
921 }
922 \f
923 /* JSON-RPC database server triggers.
924  *
925  * (Every transaction is treated as a trigger even if it doesn't actually have
926  * any "wait" operations.) */
927
928 struct ovsdb_jsonrpc_trigger {
929     struct ovsdb_trigger trigger;
930     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
931     struct json *id;
932 };
933
934 static void
935 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
936                              struct json *id, struct json *params)
937 {
938     struct ovsdb_jsonrpc_trigger *t;
939     size_t hash;
940
941     /* Check for duplicate ID. */
942     hash = json_hash(id, 0);
943     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
944     if (t) {
945         struct jsonrpc_msg *msg;
946
947         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
948                                    id);
949         ovsdb_jsonrpc_session_send(s, msg);
950         json_destroy(id);
951         json_destroy(params);
952         return;
953     }
954
955     /* Insert into trigger table. */
956     t = xmalloc(sizeof *t);
957     ovsdb_trigger_init(&s->up, db, &t->trigger, params, time_msec());
958     t->id = id;
959     hmap_insert(&s->triggers, &t->hmap_node, hash);
960
961     /* Complete early if possible. */
962     if (ovsdb_trigger_is_complete(&t->trigger)) {
963         ovsdb_jsonrpc_trigger_complete(t);
964     }
965 }
966
967 static struct ovsdb_jsonrpc_trigger *
968 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
969                            const struct json *id, size_t hash)
970 {
971     struct ovsdb_jsonrpc_trigger *t;
972
973     HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
974         if (json_equal(t->id, id)) {
975             return t;
976         }
977     }
978
979     return NULL;
980 }
981
982 static void
983 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
984 {
985     struct ovsdb_jsonrpc_session *s;
986
987     s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
988
989     if (jsonrpc_session_is_connected(s->js)) {
990         struct jsonrpc_msg *reply;
991         struct json *result;
992
993         result = ovsdb_trigger_steal_result(&t->trigger);
994         if (result) {
995             reply = jsonrpc_create_reply(result, t->id);
996         } else {
997             reply = jsonrpc_create_error(json_string_create("canceled"),
998                                          t->id);
999         }
1000         ovsdb_jsonrpc_session_send(s, reply);
1001     }
1002
1003     json_destroy(t->id);
1004     ovsdb_trigger_destroy(&t->trigger);
1005     hmap_remove(&s->triggers, &t->hmap_node);
1006     free(t);
1007 }
1008
1009 static void
1010 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
1011 {
1012     struct ovsdb_jsonrpc_trigger *t, *next;
1013     HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
1014         ovsdb_jsonrpc_trigger_complete(t);
1015     }
1016 }
1017
1018 static void
1019 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
1020 {
1021     while (!list_is_empty(&s->up.completions)) {
1022         struct ovsdb_jsonrpc_trigger *t
1023             = CONTAINER_OF(s->up.completions.next,
1024                            struct ovsdb_jsonrpc_trigger, trigger.node);
1025         ovsdb_jsonrpc_trigger_complete(t);
1026     }
1027 }
1028 \f
1029 /* database table monitors. */
1030
1031 enum ovsdb_monitor_selection {
1032     OJMS_INITIAL = 1 << 0,      /* All rows when monitor is created. */
1033     OJMS_INSERT = 1 << 1,       /* New rows. */
1034     OJMS_DELETE = 1 << 2,       /* Deleted rows. */
1035     OJMS_MODIFY = 1 << 3        /* Modified rows. */
1036 };
1037
1038 /* A particular column being monitored. */
1039 struct ovsdb_monitor_column {
1040     const struct ovsdb_column *column;
1041     enum ovsdb_monitor_selection select;
1042 };
1043
1044 /* A row that has changed in a monitored table. */
1045 struct ovsdb_monitor_row {
1046     struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
1047     struct uuid uuid;           /* UUID of row that changed. */
1048     struct ovsdb_datum *old;    /* Old data, NULL for an inserted row. */
1049     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
1050 };
1051
1052 /* A particular table being monitored. */
1053 struct ovsdb_monitor_table {
1054     const struct ovsdb_table *table;
1055
1056     /* This is the union (bitwise-OR) of the 'select' values in all of the
1057      * members of 'columns' below. */
1058     enum ovsdb_monitor_selection select;
1059
1060     /* Columns being monitored. */
1061     struct ovsdb_monitor_column *columns;
1062     size_t n_columns;
1063
1064     /* Contains 'struct ovsdb_monitor_row's for rows that have been
1065      * updated but not yet flushed to the jsonrpc connection. */
1066     struct hmap changes;
1067 };
1068
1069 struct ovsdb_jsonrpc_monitor;
1070 /*  Backend monitor.
1071  *
1072  *  ovsdb_monitor keep track of the ovsdb changes.
1073  */
1074 /* A collection of tables being monitored. */
1075 struct ovsdb_monitor {
1076     struct ovsdb_replica replica;
1077     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
1078     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
1079 };
1080
1081 /* Jsonrpc front end monitor. */
1082 struct ovsdb_jsonrpc_monitor {
1083     struct ovsdb_jsonrpc_session *session;
1084     struct ovsdb *db;
1085     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
1086
1087     struct json *monitor_id;
1088     struct ovsdb_monitor *dbmon;
1089 };
1090
1091 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
1092
1093 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
1094     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
1095 static void ovsdb_monitor_destroy(struct ovsdb_replica *);
1096 static struct json *ovsdb_monitor_get_initial(
1097     const struct ovsdb_monitor *);
1098
1099 static bool
1100 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
1101 {
1102     const struct json *json;
1103
1104     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
1105     return json ? json_boolean(json) : default_value;
1106 }
1107
1108 struct ovsdb_jsonrpc_monitor *
1109 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
1110                            const struct json *monitor_id)
1111 {
1112     struct ovsdb_jsonrpc_monitor *m;
1113
1114     HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
1115         if (json_equal(m->monitor_id, monitor_id)) {
1116             return m;
1117         }
1118     }
1119
1120     return NULL;
1121 }
1122
1123 static void
1124 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
1125                          const struct ovsdb_table *table,
1126                          const struct ovsdb_column *column,
1127                          enum ovsdb_monitor_selection select,
1128                          size_t *allocated_columns)
1129 {
1130     struct ovsdb_monitor_table *mt;
1131     struct ovsdb_monitor_column *c;
1132
1133     mt = shash_find_data(&dbmon->tables, table->schema->name);
1134
1135     if (mt->n_columns >= *allocated_columns) {
1136         mt->columns = x2nrealloc(mt->columns, allocated_columns,
1137                                  sizeof *mt->columns);
1138     }
1139
1140     c = &mt->columns[mt->n_columns++];
1141     c->column = column;
1142     c->select = select;
1143 }
1144
1145 static int
1146 compare_ovsdb_monitor_column(const void *a_, const void *b_)
1147 {
1148     const struct ovsdb_monitor_column *a = a_;
1149     const struct ovsdb_monitor_column *b = b_;
1150
1151     return a->column < b->column ? -1 : a->column > b->column;
1152 }
1153
1154 static void
1155 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
1156                                const struct ovsdb_table *table,
1157                                enum ovsdb_monitor_selection select)
1158 {
1159     struct ovsdb_monitor_table * mt;
1160     mt = shash_find_data(&dbmon->tables, table->schema->name);
1161     mt->select |= select;
1162 }
1163
1164 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
1165 ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_monitor *dbmon,
1166                                     const struct ovsdb_table *table,
1167                                     const struct json *monitor_request,
1168                                     size_t *allocated_columns)
1169 {
1170     const struct ovsdb_table_schema *ts = table->schema;
1171     enum ovsdb_monitor_selection select;
1172     const struct json *columns, *select_json;
1173     struct ovsdb_parser parser;
1174     struct ovsdb_error *error;
1175
1176     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1177     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1178     select_json = ovsdb_parser_member(&parser, "select",
1179                                       OP_OBJECT | OP_OPTIONAL);
1180     error = ovsdb_parser_finish(&parser);
1181     if (error) {
1182         return error;
1183     }
1184
1185     if (select_json) {
1186         select = 0;
1187         ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1188         if (parse_bool(&parser, "initial", true)) {
1189             select |= OJMS_INITIAL;
1190         }
1191         if (parse_bool(&parser, "insert", true)) {
1192             select |= OJMS_INSERT;
1193         }
1194         if (parse_bool(&parser, "delete", true)) {
1195             select |= OJMS_DELETE;
1196         }
1197         if (parse_bool(&parser, "modify", true)) {
1198             select |= OJMS_MODIFY;
1199         }
1200         error = ovsdb_parser_finish(&parser);
1201         if (error) {
1202             return error;
1203         }
1204     } else {
1205         select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1206     }
1207
1208     ovsdb_monitor_table_add_select(dbmon, table, select);
1209     if (columns) {
1210         size_t i;
1211
1212         if (columns->type != JSON_ARRAY) {
1213             return ovsdb_syntax_error(columns, NULL,
1214                                       "array of column names expected");
1215         }
1216
1217         for (i = 0; i < columns->u.array.n; i++) {
1218             const struct ovsdb_column *column;
1219             const char *s;
1220
1221             if (columns->u.array.elems[i]->type != JSON_STRING) {
1222                 return ovsdb_syntax_error(columns, NULL,
1223                                           "array of column names expected");
1224             }
1225
1226             s = columns->u.array.elems[i]->u.string;
1227             column = shash_find_data(&table->schema->columns, s);
1228             if (!column) {
1229                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1230                                           "column name", s);
1231             }
1232             ovsdb_monitor_add_column(dbmon, table, column, select,
1233                                      allocated_columns);
1234         }
1235     } else {
1236         struct shash_node *node;
1237
1238         SHASH_FOR_EACH (node, &ts->columns) {
1239             const struct ovsdb_column *column = node->data;
1240             if (column->index != OVSDB_COL_UUID) {
1241                 ovsdb_monitor_add_column(dbmon, table, column, select,
1242                                          allocated_columns);
1243             }
1244         }
1245     }
1246
1247     return NULL;
1248 }
1249
1250 static struct ovsdb_monitor *
1251 ovsdb_monitor_create(struct ovsdb *db,
1252                      struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
1253                      const struct ovsdb_replica_class *replica_class)
1254 {
1255     struct ovsdb_monitor *m;
1256
1257     m = xzalloc(sizeof *m);
1258
1259     ovsdb_replica_init(&m->replica, replica_class);
1260     ovsdb_add_replica(db, &m->replica);
1261     m->jsonrpc_monitor = jsonrpc_monitor;
1262     shash_init(&m->tables);
1263
1264     return m;
1265 }
1266
1267 static void
1268 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
1269                         const struct ovsdb_table *table)
1270 {
1271     struct ovsdb_monitor_table *mt;
1272
1273     mt = xzalloc(sizeof *mt);
1274     mt->table = table;
1275     hmap_init(&mt->changes);
1276     shash_add(&m->tables, table->schema->name, mt);
1277 }
1278
1279 /* Check for duplicated column names. Return the first
1280  * duplicated column's name if found. Otherwise return
1281  * NULL.  */
1282 static const char * OVS_WARN_UNUSED_RESULT
1283 ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
1284                           const struct ovsdb_table *table)
1285 {
1286     struct ovsdb_monitor_table *mt;
1287     int i;
1288
1289     mt = shash_find_data(&m->tables, table->schema->name);
1290
1291     if (mt) {
1292         /* Check for duplicate columns. */
1293         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
1294               compare_ovsdb_monitor_column);
1295         for (i = 1; i < mt->n_columns; i++) {
1296             if (mt->columns[i].column == mt->columns[i - 1].column) {
1297                 return mt->columns[i].column->name;
1298             }
1299         }
1300     }
1301
1302     return NULL;
1303 }
1304
1305 static struct jsonrpc_msg *
1306 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1307                              struct json *params,
1308                              const struct json *request_id)
1309 {
1310     struct ovsdb_jsonrpc_monitor *m = NULL;
1311     struct json *monitor_id, *monitor_requests;
1312     struct ovsdb_error *error = NULL;
1313     struct shash_node *node;
1314     struct json *json;
1315
1316     if (json_array(params)->n != 3) {
1317         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1318         goto error;
1319     }
1320     monitor_id = params->u.array.elems[1];
1321     monitor_requests = params->u.array.elems[2];
1322     if (monitor_requests->type != JSON_OBJECT) {
1323         error = ovsdb_syntax_error(monitor_requests, NULL,
1324                                    "monitor-requests must be object");
1325         goto error;
1326     }
1327
1328     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1329         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1330         goto error;
1331     }
1332
1333     m = xzalloc(sizeof *m);
1334     m->session = s;
1335     m->db = db;
1336     m->dbmon = ovsdb_monitor_create(db, m, &ovsdb_jsonrpc_replica_class);
1337     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1338     m->monitor_id = json_clone(monitor_id);
1339
1340     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1341         const struct ovsdb_table *table;
1342         const char *column_name;
1343         size_t allocated_columns;
1344         const struct json *mr_value;
1345         size_t i;
1346
1347         table = ovsdb_get_table(m->db, node->name);
1348         if (!table) {
1349             error = ovsdb_syntax_error(NULL, NULL,
1350                                        "no table named %s", node->name);
1351             goto error;
1352         }
1353
1354         ovsdb_monitor_add_table(m->dbmon, table);
1355
1356         /* Parse columns. */
1357         mr_value = node->data;
1358         allocated_columns = 0;
1359         if (mr_value->type == JSON_ARRAY) {
1360             const struct json_array *array = &mr_value->u.array;
1361
1362             for (i = 0; i < array->n; i++) {
1363                 error = ovsdb_jsonrpc_parse_monitor_request(
1364                     m->dbmon, table, array->elems[i], &allocated_columns);
1365                 if (error) {
1366                     goto error;
1367                 }
1368             }
1369         } else {
1370             error = ovsdb_jsonrpc_parse_monitor_request(
1371                 m->dbmon, table, mr_value, &allocated_columns);
1372             if (error) {
1373                 goto error;
1374             }
1375         }
1376
1377         column_name = ovsdb_monitor_table_check_duplicates(m->dbmon, table);
1378
1379         if (column_name) {
1380             error = ovsdb_syntax_error(mr_value, NULL, "column %s "
1381                                        "mentioned more than once",
1382                                         column_name);
1383             goto error;
1384         }
1385     }
1386
1387     return jsonrpc_create_reply(ovsdb_monitor_get_initial(m->dbmon),
1388                                 request_id);
1389
1390 error:
1391     if (m) {
1392         ovsdb_remove_replica(m->db, &m->dbmon->replica);
1393     }
1394
1395     json = ovsdb_error_to_json(error);
1396     ovsdb_error_destroy(error);
1397     return jsonrpc_create_error(json, request_id);
1398 }
1399
1400 static struct jsonrpc_msg *
1401 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1402                              struct json_array *params,
1403                              const struct json *request_id)
1404 {
1405     if (params->n != 1) {
1406         return jsonrpc_create_error(json_string_create("invalid parameters"),
1407                                     request_id);
1408     } else {
1409         struct ovsdb_jsonrpc_monitor *m;
1410
1411         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1412         if (!m) {
1413             return jsonrpc_create_error(json_string_create("unknown monitor"),
1414                                         request_id);
1415         } else {
1416             ovsdb_remove_replica(m->db, &m->dbmon->replica);
1417             return jsonrpc_create_reply(json_object_create(), request_id);
1418         }
1419     }
1420 }
1421
1422 static void
1423 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1424 {
1425     struct ovsdb_jsonrpc_monitor *m, *next;
1426
1427     HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1428         ovsdb_remove_replica(m->db, &m->dbmon->replica);
1429     }
1430 }
1431
1432 static struct ovsdb_monitor *
1433 ovsdb_monitor_cast(struct ovsdb_replica *replica)
1434 {
1435     ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
1436     return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
1437 }
1438
1439 struct ovsdb_monitor_aux {
1440     const struct ovsdb_monitor *monitor;
1441     struct ovsdb_monitor_table *mt;
1442 };
1443
1444 /* Finds and returns the ovsdb_monitor_row in 'mt->changes' for the
1445  * given 'uuid', or NULL if there is no such row. */
1446 static struct ovsdb_monitor_row *
1447 ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt,
1448                        const struct uuid *uuid)
1449 {
1450     struct ovsdb_monitor_row *row;
1451
1452     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &mt->changes) {
1453         if (uuid_equals(uuid, &row->uuid)) {
1454             return row;
1455         }
1456     }
1457     return NULL;
1458 }
1459
1460 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
1461  * copies of the data in 'row' drawn from the columns represented by
1462  * mt->columns[].  Returns the array.
1463  *
1464  * If 'row' is NULL, returns NULL. */
1465 static struct ovsdb_datum *
1466 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
1467                        const struct ovsdb_row *row)
1468 {
1469     struct ovsdb_datum *data;
1470     size_t i;
1471
1472     if (!row) {
1473         return NULL;
1474     }
1475
1476     data = xmalloc(mt->n_columns * sizeof *data);
1477     for (i = 0; i < mt->n_columns; i++) {
1478         const struct ovsdb_column *c = mt->columns[i].column;
1479         const struct ovsdb_datum *src = &row->fields[c->index];
1480         struct ovsdb_datum *dst = &data[i];
1481         const struct ovsdb_type *type = &c->type;
1482
1483         ovsdb_datum_clone(dst, src, type);
1484     }
1485     return data;
1486 }
1487
1488 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
1489  * in 'row' drawn from the columns represented by mt->columns[]. */
1490 static void
1491 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
1492                         const struct ovsdb_row *row,
1493                         struct ovsdb_datum *data)
1494 {
1495     size_t i;
1496
1497     for (i = 0; i < mt->n_columns; i++) {
1498         const struct ovsdb_column *c = mt->columns[i].column;
1499         const struct ovsdb_datum *src = &row->fields[c->index];
1500         struct ovsdb_datum *dst = &data[i];
1501         const struct ovsdb_type *type = &c->type;
1502
1503         if (!ovsdb_datum_equals(src, dst, type)) {
1504             ovsdb_datum_destroy(dst, type);
1505             ovsdb_datum_clone(dst, src, type);
1506         }
1507     }
1508 }
1509
1510 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
1511  * from mt->columns[], plus 'data' itself. */
1512 static void
1513 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
1514                       struct ovsdb_datum *data)
1515 {
1516     if (data) {
1517         size_t i;
1518
1519         for (i = 0; i < mt->n_columns; i++) {
1520             const struct ovsdb_column *c = mt->columns[i].column;
1521
1522             ovsdb_datum_destroy(&data[i], &c->type);
1523         }
1524         free(data);
1525     }
1526 }
1527
1528 /* Frees 'row', which must have been created from 'mt'. */
1529 static void
1530 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
1531                           struct ovsdb_monitor_row *row)
1532 {
1533     if (row) {
1534         free_monitor_row_data(mt, row->old);
1535         free_monitor_row_data(mt, row->new);
1536         free(row);
1537     }
1538 }
1539
1540 static bool
1541 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
1542                         const struct ovsdb_row *new,
1543                         const unsigned long int *changed OVS_UNUSED,
1544                         void *aux_)
1545 {
1546     struct ovsdb_monitor_aux *aux = aux_;
1547     const struct ovsdb_monitor *m = aux->monitor;
1548     struct ovsdb_table *table = new ? new->table : old->table;
1549     const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
1550     struct ovsdb_monitor_row *change;
1551     struct ovsdb_monitor_table *mt;
1552
1553     if (!aux->mt || table != aux->mt->table) {
1554         aux->mt = shash_find_data(&m->tables, table->schema->name);
1555         if (!aux->mt) {
1556             /* We don't care about rows in this table at all.  Tell the caller
1557              * to skip it.  */
1558             return false;
1559         }
1560     }
1561     mt = aux->mt;
1562
1563     change = ovsdb_monitor_row_find(mt, uuid);
1564     if (!change) {
1565         change = xmalloc(sizeof *change);
1566         hmap_insert(&mt->changes, &change->hmap_node, uuid_hash(uuid));
1567         change->uuid = *uuid;
1568         change->old = clone_monitor_row_data(mt, old);
1569         change->new = clone_monitor_row_data(mt, new);
1570     } else {
1571         if (new) {
1572             update_monitor_row_data(mt, new, change->new);
1573         } else {
1574             free_monitor_row_data(mt, change->new);
1575             change->new = NULL;
1576
1577             if (!change->old) {
1578                 /* This row was added then deleted.  Forget about it. */
1579                 hmap_remove(&mt->changes, &change->hmap_node);
1580                 free(change);
1581             }
1582         }
1583     }
1584     return true;
1585 }
1586
1587 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
1588  * 'mt', or NULL if no row update should be sent.
1589  *
1590  * The caller should specify 'initial' as true if the returned JSON is going to
1591  * be used as part of the initial reply to a "monitor" request, false if it is
1592  * going to be used as part of an "update" notification.
1593  *
1594  * 'changed' must be a scratch buffer for internal use that is at least
1595  * bitmap_n_bytes(mt->n_columns) bytes long. */
1596 static struct json *
1597 ovsdb_monitor_compose_row_update(
1598     const struct ovsdb_monitor_table *mt,
1599     const struct ovsdb_monitor_row *row,
1600     bool initial, unsigned long int *changed)
1601 {
1602     enum ovsdb_monitor_selection type;
1603     struct json *old_json, *new_json;
1604     struct json *row_json;
1605     size_t i;
1606
1607     type = (initial ? OJMS_INITIAL
1608             : !row->old ? OJMS_INSERT
1609             : !row->new ? OJMS_DELETE
1610             : OJMS_MODIFY);
1611     if (!(mt->select & type)) {
1612         return NULL;
1613     }
1614
1615     if (type == OJMS_MODIFY) {
1616         size_t n_changes;
1617
1618         n_changes = 0;
1619         memset(changed, 0, bitmap_n_bytes(mt->n_columns));
1620         for (i = 0; i < mt->n_columns; i++) {
1621             const struct ovsdb_column *c = mt->columns[i].column;
1622             if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
1623                 bitmap_set1(changed, i);
1624                 n_changes++;
1625             }
1626         }
1627         if (!n_changes) {
1628             /* No actual changes: presumably a row changed and then
1629              * changed back later. */
1630             return NULL;
1631         }
1632     }
1633
1634     row_json = json_object_create();
1635     old_json = new_json = NULL;
1636     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
1637         old_json = json_object_create();
1638         json_object_put(row_json, "old", old_json);
1639     }
1640     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1641         new_json = json_object_create();
1642         json_object_put(row_json, "new", new_json);
1643     }
1644     for (i = 0; i < mt->n_columns; i++) {
1645         const struct ovsdb_monitor_column *c = &mt->columns[i];
1646
1647         if (!(type & c->select)) {
1648             /* We don't care about this type of change for this
1649              * particular column (but we will care about it for some
1650              * other column). */
1651             continue;
1652         }
1653
1654         if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
1655             || type == OJMS_DELETE) {
1656             json_object_put(old_json, c->column->name,
1657                             ovsdb_datum_to_json(&row->old[i],
1658                                                 &c->column->type));
1659         }
1660         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1661             json_object_put(new_json, c->column->name,
1662                             ovsdb_datum_to_json(&row->new[i],
1663                                                 &c->column->type));
1664         }
1665     }
1666
1667     return row_json;
1668 }
1669
1670 /* Constructs and returns JSON for a <table-updates> object (as described in
1671  * RFC 7047) for all the outstanding changes within 'monitor', and deletes all
1672  * the outstanding changes from 'monitor'.  Returns NULL if no update needs to
1673  * be sent.
1674  *
1675  * The caller should specify 'initial' as true if the returned JSON is going to
1676  * be used as part of the initial reply to a "monitor" request, false if it is
1677  * going to be used as part of an "update" notification. */
1678 static struct json *
1679 ovsdb_monitor_compose_table_update(
1680     const struct ovsdb_monitor *dbmon, bool initial)
1681 {
1682     struct shash_node *node;
1683     unsigned long int *changed;
1684     struct json *json;
1685     size_t max_columns;
1686
1687     max_columns = 0;
1688     SHASH_FOR_EACH (node, &dbmon->tables) {
1689         struct ovsdb_monitor_table *mt = node->data;
1690
1691         max_columns = MAX(max_columns, mt->n_columns);
1692     }
1693     changed = xmalloc(bitmap_n_bytes(max_columns));
1694
1695     json = NULL;
1696     SHASH_FOR_EACH (node, &dbmon->tables) {
1697         struct ovsdb_monitor_table *mt = node->data;
1698         struct ovsdb_monitor_row *row, *next;
1699         struct json *table_json = NULL;
1700
1701         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
1702             struct json *row_json;
1703
1704             row_json = ovsdb_monitor_compose_row_update(
1705                 mt, row, initial, changed);
1706             if (row_json) {
1707                 char uuid[UUID_LEN + 1];
1708
1709                 /* Create JSON object for transaction overall. */
1710                 if (!json) {
1711                     json = json_object_create();
1712                 }
1713
1714                 /* Create JSON object for transaction on this table. */
1715                 if (!table_json) {
1716                     table_json = json_object_create();
1717                     json_object_put(json, mt->table->schema->name, table_json);
1718                 }
1719
1720                 /* Add JSON row to JSON table. */
1721                 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
1722                 json_object_put(table_json, uuid, row_json);
1723             }
1724
1725             hmap_remove(&mt->changes, &row->hmap_node);
1726             ovsdb_monitor_row_destroy(mt, row);
1727         }
1728     }
1729
1730     free(changed);
1731
1732     return json;
1733 }
1734
1735 static struct json *
1736 ovsdb_jsonrpc_monitor_compose_table_update(
1737     const struct ovsdb_jsonrpc_monitor *monitor, bool initial)
1738 {
1739     return ovsdb_monitor_compose_table_update(monitor->dbmon, initial);
1740 }
1741
1742 static bool
1743 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon)
1744 {
1745     struct shash_node *node;
1746
1747     SHASH_FOR_EACH (node, &dbmon->tables) {
1748         struct ovsdb_monitor_table *mt = node->data;
1749
1750         if (!hmap_is_empty(&mt->changes)) {
1751             return true;
1752         }
1753     }
1754     return false;
1755 }
1756
1757 static bool
1758 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
1759 {
1760     struct ovsdb_jsonrpc_monitor *m;
1761
1762     HMAP_FOR_EACH (m, node, &s->monitors) {
1763         if (ovsdb_monitor_needs_flush(m->dbmon)) {
1764             return true;
1765         }
1766     }
1767
1768     return false;
1769 }
1770
1771 static void
1772 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
1773 {
1774     struct ovsdb_jsonrpc_monitor *m;
1775
1776     HMAP_FOR_EACH (m, node, &s->monitors) {
1777         struct json *json;
1778
1779         json = ovsdb_jsonrpc_monitor_compose_table_update(m, false);
1780         if (json) {
1781             struct jsonrpc_msg *msg;
1782             struct json *params;
1783
1784             params = json_array_create_2(json_clone(m->monitor_id), json);
1785             msg = jsonrpc_create_notify("update", params);
1786             jsonrpc_session_send(s->js, msg);
1787         }
1788     }
1789 }
1790
1791 static void
1792 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
1793                        const struct ovsdb_monitor *m)
1794 {
1795     aux->monitor = m;
1796     aux->mt = NULL;
1797 }
1798
1799 static struct ovsdb_error *
1800 ovsdb_monitor_commit(struct ovsdb_replica *replica,
1801                      const struct ovsdb_txn *txn,
1802                      bool durable OVS_UNUSED)
1803 {
1804     struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
1805     struct ovsdb_monitor_aux aux;
1806
1807     ovsdb_monitor_init_aux(&aux, m);
1808     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
1809
1810     return NULL;
1811 }
1812
1813 static struct json *
1814 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
1815 {
1816     struct ovsdb_monitor_aux aux;
1817     struct shash_node *node;
1818     struct json *json;
1819
1820     ovsdb_monitor_init_aux(&aux, dbmon);
1821     SHASH_FOR_EACH (node, &dbmon->tables) {
1822         struct ovsdb_monitor_table *mt = node->data;
1823
1824         if (mt->select & OJMS_INITIAL) {
1825             struct ovsdb_row *row;
1826
1827             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1828                 ovsdb_monitor_change_cb(NULL, row, NULL, &aux);
1829             }
1830         }
1831     }
1832     json = ovsdb_monitor_compose_table_update(dbmon, true);
1833     return json ? json : json_object_create();
1834 }
1835
1836 static void
1837 ovsdb_monitor_destroy(struct ovsdb_replica *replica)
1838 {
1839     struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
1840     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor = m->jsonrpc_monitor;
1841     struct shash_node *node;
1842
1843     json_destroy(jsonrpc_monitor->monitor_id);
1844     SHASH_FOR_EACH (node, &m->tables) {
1845         struct ovsdb_monitor_table *mt = node->data;
1846         struct ovsdb_monitor_row *row, *next;
1847
1848         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
1849             hmap_remove(&mt->changes, &row->hmap_node);
1850             ovsdb_monitor_row_destroy(mt, row);
1851         }
1852         hmap_destroy(&mt->changes);
1853
1854         free(mt->columns);
1855         free(mt);
1856     }
1857     shash_destroy(&m->tables);
1858     hmap_remove(&jsonrpc_monitor->session->monitors, &jsonrpc_monitor->node);
1859     free(jsonrpc_monitor);
1860     free(m);
1861 }
1862
1863 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1864     ovsdb_monitor_commit,
1865     ovsdb_monitor_destroy
1866 };