1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
 */
18 #include "jsonrpc-server.h"
24 #include "openvswitch/dynamic-string.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
31 #include "condition.h"
32 #include "poll-loop.h"
33 #include "reconnect.h"
40 #include "transaction.h"
42 #include "openvswitch/vlog.h"
/* Module-level state: the vlog module definition for this file, forward
 * declarations of the remote and session structures, the monitor_cond
 * feature switch, and the rate limiter used by the VLOG_ERR_RL() and
 * VLOG_WARN_RL() calls in this file. */
44 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
46 struct ovsdb_jsonrpc_remote;
47 struct ovsdb_jsonrpc_session;
49 /* Set false to defeature monitor_cond, causing jsonrpc to respond to
50 * monitor_cond method with an error. */
51 static bool monitor_cond_enable__ = true;
53 /* Message rate-limiting. */
54 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
/* Forward declarations of session helpers defined later in this file:
 * session creation, the per-remote "_all" iterators, status reporting,
 * lock release, and message sending. */
57 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
58 struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
59 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
60 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
61 static void ovsdb_jsonrpc_session_get_memory_usage_all(
62 const struct ovsdb_jsonrpc_remote *, struct simap *usage);
63 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
64 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
65 static void ovsdb_jsonrpc_session_set_all_options(
66 struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
67 static bool ovsdb_jsonrpc_active_session_get_status(
68 const struct ovsdb_jsonrpc_remote *,
69 struct ovsdb_jsonrpc_remote_status *);
70 static void ovsdb_jsonrpc_session_get_status(
71 const struct ovsdb_jsonrpc_session *,
72 struct ovsdb_jsonrpc_remote_status *);
73 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
74 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
75 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
76 struct jsonrpc_msg *);
/* Forward declarations of the trigger helpers defined later in this file:
 * creation, lookup by request ID, and the three completion entry points. */
79 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
81 struct json *id, struct json *params);
82 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
83 struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
84 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
85 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
86 static void ovsdb_jsonrpc_trigger_complete_done(
87 struct ovsdb_jsonrpc_session *);
/* Forward declarations of the monitor helpers: creating, changing the
 * condition of, and cancelling a monitor; removing and flushing all of a
 * session's monitors; and composing update notifications. */
90 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
91 struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
92 enum ovsdb_monitor_version, const struct json *request_id);
93 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_change(
94 struct ovsdb_jsonrpc_session *s,
96 const struct json *request_id);
97 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
98 struct ovsdb_jsonrpc_session *,
99 struct json_array *params,
100 const struct json *request_id);
101 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
102 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
103 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
104 static struct json *ovsdb_jsonrpc_monitor_compose_update(
105 struct ovsdb_jsonrpc_monitor *monitor, bool initial);
106 static struct jsonrpc_msg * ovsdb_jsonrpc_create_notify(
107 const struct ovsdb_jsonrpc_monitor *m,
108 struct json *params);
111 /* JSON-RPC database server. */
/* Top-level server object.  It embeds the generic ovsdb_server as 'up' and
 * owns the table of configured remotes. */
113 struct ovsdb_jsonrpc_server {
114 struct ovsdb_server up;
/* Number of live sessions: incremented by ovsdb_jsonrpc_session_create()
 * and decremented by ovsdb_jsonrpc_session_close(). */
115 unsigned int n_sessions;
116 struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
119 /* A configured remote. This is either a passive stream listener plus a list
120 * of the currently connected sessions, or a list of exactly one active
 * session. */
/* Per-remote state.  'server' points back at the owning server (used by
 * ovsdb_jsonrpc_server_del_remote() to reach the 'remotes' shash).
 * ovsdb_jsonrpc_server_add_remote() also stores a 'dscp' value in this
 * structure; that member's declaration is not visible in this listing. */
122 struct ovsdb_jsonrpc_remote {
123 struct ovsdb_jsonrpc_server *server;
124 struct pstream *listener; /* Listener, if passive. */
125 struct ovs_list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
/* Forward declarations of the helpers that add and delete one remote in a
 * server's 'remotes' shash. */
129 static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
130 struct ovsdb_jsonrpc_server *, const char *name,
131 const struct ovsdb_jsonrpc_options *options
133 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
135 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
137 * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
138 * which 'server' should provide access. */
/* Implementation: zero-allocates the server with xzalloc() (so 'n_sessions'
 * starts at 0), then initializes the embedded ovsdb_server 'up' and the
 * 'remotes' shash. */
139 struct ovsdb_jsonrpc_server *
140 ovsdb_jsonrpc_server_create(void)
142 struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
143 ovsdb_server_init(&server->up);
144 shash_init(&server->remotes);
148 /* Adds 'db' to the set of databases served out by 'svr'. Returns true if
149 * successful, false if 'db''s name is the same as some database already in
 * 'svr'. */
/* Implementation: forces every session to reconnect first (rationale in the
 * comment in the body), then hands 'db' to ovsdb_server_add_db() and returns
 * that call's result. */
152 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
154 /* The OVSDB protocol doesn't have a way to notify a client that a
155 * database has been added. If some client tried to use the database
156 * that we're adding and failed, then forcing it to reconnect seems like
157 * a reasonable way to make it try again.
159 * If this is too big of a hammer in practice, we could be more selective,
160 * e.g. disconnect only connections that actually tried to use a database
161 * with 'db''s name. */
162 ovsdb_jsonrpc_server_reconnect(svr);
164 return ovsdb_server_add_db(&svr->up, db);
167 /* Removes 'db' from the set of databases served out by 'svr'. Returns
168 * true if successful, false if there is no database associated with 'db'. */
/* Implementation: forces every session to reconnect before the database is
 * removed (rationale in the comment in the body), then returns the result of
 * ovsdb_server_remove_db(). */
170 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *svr,
173 /* There might be pointers to 'db' from 'svr', such as monitors or
174 * outstanding transactions. Disconnect all JSON-RPC connections to avoid
175 * accesses to freed memory.
177 * If this is too big of a hammer in practice, we could be more selective,
178 * e.g. disconnect only connections that actually reference 'db'. */
179 ovsdb_jsonrpc_server_reconnect(svr);
181 return ovsdb_server_remove_db(&svr->up, db);
/* Tears down 'svr': deletes every configured remote, then destroys the
 * 'remotes' shash and the embedded ovsdb_server. */
185 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
187 struct shash_node *node, *next;
/* The _SAFE iterator is needed because ovsdb_jsonrpc_server_del_remote()
 * deletes 'node' from the shash being walked. */
189 SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
190 ovsdb_jsonrpc_server_del_remote(node);
192 shash_destroy(&svr->remotes);
193 ovsdb_server_destroy(&svr->up);
/* Builds a default options structure for 'target'.  The structure is
 * zero-allocated with xzalloc(); 'max_backoff' is set to
 * RECONNECT_DEFAULT_MAX_BACKOFF, and 'probe_interval' is set to
 * RECONNECT_DEFAULT_PROBE_INTERVAL when stream_or_pstream_needs_probes()
 * says 'target' needs probes.
 *
 * NOTE(review): the probe interval used when probes are not needed is not
 * visible in this listing. */
197 struct ovsdb_jsonrpc_options *
198 ovsdb_jsonrpc_default_options(const char *target)
200 struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
201 options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
202 options->probe_interval = (stream_or_pstream_needs_probes(target)
203 ? RECONNECT_DEFAULT_PROBE_INTERVAL
208 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
209 * options in the struct ovsdb_jsonrpc_options supplied as the data values.
211 * A remote is an active or passive stream connection method, e.g. "pssl:" or
 * "tcp:1.2.3.4". */
/* Implementation: works in two passes.
 *
 * Pass 1 walks the remotes currently configured on 'svr' and looks each one
 * up by name in 'new_remotes'.  A remote is deleted either with a
 * "remote deconfigured" log message or because its stored dscp differs from
 * the dscp in the new options.
 *
 * Pass 2 walks 'new_remotes', looks each name up in 'svr', may create the
 * remote with ovsdb_jsonrpc_server_add_remote(), and applies the options to
 * the remote's sessions.
 *
 * NOTE(review): the conditions guarding the "deconfigured" branch and the
 * add_remote() call are not visible in this listing; presumably they test
 * for a failed lookup. */
214 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
215 const struct shash *new_remotes)
217 struct shash_node *node, *next;
/* _SAFE iteration: ovsdb_jsonrpc_server_del_remote() deletes 'node'. */
219 SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
220 struct ovsdb_jsonrpc_remote *remote = node->data;
221 struct ovsdb_jsonrpc_options *options
222 = shash_find_data(new_remotes, node->name);
225 VLOG_INFO("%s: remote deconfigured", node->name);
226 ovsdb_jsonrpc_server_del_remote(node);
227 } else if (options->dscp != remote->dscp) {
228 ovsdb_jsonrpc_server_del_remote(node);
231 SHASH_FOR_EACH (node, new_remotes) {
232 const struct ovsdb_jsonrpc_options *options = node->data;
233 struct ovsdb_jsonrpc_remote *remote;
235 remote = shash_find_data(&svr->remotes, node->name);
237 remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
243 ovsdb_jsonrpc_session_set_all_options(remote, options);
/* Creates the remote called 'name' on 'svr' using 'options'.
 *
 * First tries to open 'name' as a passive stream with the requested dscp.
 * An error other than EAFNOSUPPORT is logged (rate-limited) as
 * "listen failed".  Otherwise a remote is allocated, its fields are filled
 * in, and it is added to 'svr->remotes' under 'name'.  A session opened with
 * jsonrpc_session_open(name, true) may then be attached to it.
 *
 * NOTE(review): what is returned after a listen failure, and the condition
 * guarding the jsonrpc_session_open() call, are not visible in this listing;
 * presumably the session is created only when there is no listener. */
247 static struct ovsdb_jsonrpc_remote *
248 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
250 const struct ovsdb_jsonrpc_options *options)
252 struct ovsdb_jsonrpc_remote *remote;
253 struct pstream *listener;
256 error = jsonrpc_pstream_open(name, &listener, options->dscp);
257 if (error && error != EAFNOSUPPORT) {
258 VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, ovs_strerror(error));
/* xmalloc() does not zero the structure; each member is assigned below. */
262 remote = xmalloc(sizeof *remote);
263 remote->server = svr;
264 remote->listener = listener;
265 ovs_list_init(&remote->sessions);
266 remote->dscp = options->dscp;
267 shash_add(&svr->remotes, name, remote);
270 ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, true));
/* Deletes the remote stored in 'node': closes all of its sessions, closes
 * its listener, and deletes 'node' from the owning server's 'remotes'
 * shash. */
276 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
278 struct ovsdb_jsonrpc_remote *remote = node->data;
280 ovsdb_jsonrpc_session_close_all(remote);
281 pstream_close(remote->listener);
282 shash_delete(&remote->server->remotes, node);
286 /* Stores status information for the remote named 'target', which should have
287 * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
288 * into '*status'. On success returns true, on failure (if 'svr' doesn't have
289 * a remote named 'target' or if that remote is an outbound remote that has no
290 * active connections) returns false. On failure, 'status' will be zeroed.
 */
/* Implementation: zeroes '*status' up front, then looks 'target' up in
 * 'svr->remotes'.  For a remote with a listener it reports the bound port,
 * whether the session list is non-empty, and the session count.  Otherwise
 * it returns whatever ovsdb_jsonrpc_active_session_get_status() returns.
 *
 * NOTE(review): the handling of a failed lookup is not visible in this
 * listing. */
293 ovsdb_jsonrpc_server_get_remote_status(
294 const struct ovsdb_jsonrpc_server *svr, const char *target,
295 struct ovsdb_jsonrpc_remote_status *status)
297 const struct ovsdb_jsonrpc_remote *remote;
299 memset(status, 0, sizeof *status);
301 remote = shash_find_data(&svr->remotes, target);
307 if (remote->listener) {
308 status->bound_port = pstream_get_bound_port(remote->listener);
309 status->is_connected = !ovs_list_is_empty(&remote->sessions);
310 status->n_connections = ovs_list_size(&remote->sessions);
314 return ovsdb_jsonrpc_active_session_get_status(remote, status);
/* Frees the three lock-name strings in '*status'.  These are the strings
 * that ovsdb_jsonrpc_session_get_status() fills in with ds_steal_cstr().
 * 'status' itself is not freed by the visible code. */
318 ovsdb_jsonrpc_server_free_remote_status(
319 struct ovsdb_jsonrpc_remote_status *status)
321 free(status->locks_held);
322 free(status->locks_waiting);
323 free(status->locks_lost);
326 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
 * reconnect. */
/* Implementation: applies ovsdb_jsonrpc_session_reconnect_all() to every
 * remote in 'svr->remotes'. */
329 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
331 struct shash_node *node;
333 SHASH_FOR_EACH (node, &svr->remotes) {
334 struct ovsdb_jsonrpc_remote *remote = node->data;
336 ovsdb_jsonrpc_session_reconnect_all(remote);
/* Does one round of periodic processing for 'svr'.  For each remote:
 *
 *   - If the remote has a listener, tries to accept one new stream.  An
 *     accepted stream is wrapped in a JSON-RPC session and attached to the
 *     remote.  Any error other than EAGAIN is logged (rate-limited) as
 *     "accept failed".
 *
 *   - Runs all of the remote's sessions.
 *
 * NOTE(review): the condition selecting the "accepted" branch is not
 * visible in this listing; presumably it is a zero 'error'. */
341 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
343 struct shash_node *node;
345 SHASH_FOR_EACH (node, &svr->remotes) {
346 struct ovsdb_jsonrpc_remote *remote = node->data;
348 if (remote->listener) {
349 struct stream *stream;
352 error = pstream_accept(remote->listener, &stream);
354 struct jsonrpc_session *js;
355 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
357 ovsdb_jsonrpc_session_create(remote, js);
358 } else if (error != EAGAIN) {
359 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
360 pstream_get_name(remote->listener),
361 ovs_strerror(error));
365 ovsdb_jsonrpc_session_run_all(remote);
/* Registers the wait conditions for 'svr': for each remote, waits on its
 * listener (if it has one) and then on all of its sessions. */
370 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
372 struct shash_node *node;
374 SHASH_FOR_EACH (node, &svr->remotes) {
375 struct ovsdb_jsonrpc_remote *remote = node->data;
377 if (remote->listener) {
378 pstream_wait(remote->listener);
381 ovsdb_jsonrpc_session_wait_all(remote);
385 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
386 * memory_report(). */
/* Implementation: adds 'svr->n_sessions' under the "sessions" key, then
 * lets every remote add its sessions' statistics. */
388 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
391 struct shash_node *node;
393 simap_increase(usage, "sessions", svr->n_sessions);
394 SHASH_FOR_EACH (node, &svr->remotes) {
395 struct ovsdb_jsonrpc_remote *remote = node->data;
397 ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
401 /* JSON-RPC database server session. */
/* State for one JSON-RPC session.  'up' is the embedded generic
 * ovsdb_session; code in this file recovers the containing structure from
 * it with CONTAINER_OF().  'remote' is the remote whose 'sessions' list
 * holds this session. */
403 struct ovsdb_jsonrpc_session {
404 struct ovs_list node; /* Element in remote's sessions list. */
405 struct ovsdb_session up;
406 struct ovsdb_jsonrpc_remote *remote;
409 struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
412 struct hmap monitors; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
414 /* Network connectivity. */
415 struct jsonrpc_session *js; /* JSON-RPC session. */
416 unsigned int js_seqno; /* Last jsonrpc_session_get_seqno() value. */
/* Forward declarations of the single-session helpers defined below: close,
 * run, wait, memory usage, and the request and notification handlers. */
419 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
420 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
421 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
422 static void ovsdb_jsonrpc_session_get_memory_usage(
423 const struct ovsdb_jsonrpc_session *, struct simap *usage);
424 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
425 struct jsonrpc_msg *);
426 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
427 struct jsonrpc_msg *);
/* Creates a session on 'remote' around the JSON-RPC session 'js'.
 *
 * The session is zero-allocated, its embedded ovsdb_session is initialized
 * against the remote's server, it is appended to 'remote->sessions', and
 * its trigger and monitor maps start empty.  'js_seqno' records the current
 * sequence number of 'js' so that ovsdb_jsonrpc_session_run() can detect a
 * later change.  The server's session count is incremented. */
429 static struct ovsdb_jsonrpc_session *
430 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
431 struct jsonrpc_session *js)
433 struct ovsdb_jsonrpc_session *s;
435 s = xzalloc(sizeof *s);
436 ovsdb_session_init(&s->up, &remote->server->up);
438 ovs_list_push_back(&remote->sessions, &s->node);
439 hmap_init(&s->triggers);
440 hmap_init(&s->monitors);
442 s->js_seqno = jsonrpc_session_get_seqno(js);
444 remote->server->n_sessions++;
/* Closes session 's'.  Order of operations: remove all monitors, release
 * all locks, and complete all triggers; destroy the two (by then drained)
 * hmaps; close the underlying JSON-RPC session; unlink 's' from its
 * remote's list; decrement the server's session count; and destroy the
 * embedded ovsdb_session. */
450 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
452 ovsdb_jsonrpc_monitor_remove_all(s);
453 ovsdb_jsonrpc_session_unlock_all(s);
454 ovsdb_jsonrpc_trigger_complete_all(s);
456 hmap_destroy(&s->monitors);
457 hmap_destroy(&s->triggers);
459 jsonrpc_session_close(s->js);
460 ovs_list_remove(&s->node);
461 s->remote->server->n_sessions--;
462 ovsdb_session_destroy(&s->up);
/* Does one round of processing for session 's'.
 *
 * Returns 0 while the underlying JSON-RPC session is alive and ETIMEDOUT
 * once it is not; ovsdb_jsonrpc_session_run_all() uses this result to
 * decide whether to close the session. */
467 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
469 jsonrpc_session_run(s->js);
/* A changed sequence number means that per-connection state is stale:
 * complete the outstanding triggers, drop the monitors, and release the
 * locks. */
470 if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
471 s->js_seqno = jsonrpc_session_get_seqno(s->js);
472 ovsdb_jsonrpc_trigger_complete_all(s);
473 ovsdb_jsonrpc_monitor_remove_all(s);
474 ovsdb_jsonrpc_session_unlock_all(s);
477 ovsdb_jsonrpc_trigger_complete_done(s);
/* New input is handled only while nothing is queued for sending. */
479 if (!jsonrpc_session_get_backlog(s->js)) {
480 struct jsonrpc_msg *msg;
482 ovsdb_jsonrpc_monitor_flush_all(s);
484 msg = jsonrpc_session_recv(s->js);
/* Requests and notifications go to their handlers; any other message type
 * is logged, forces a reconnect, and is destroyed here. */
486 if (msg->type == JSONRPC_REQUEST) {
487 ovsdb_jsonrpc_session_got_request(s, msg);
488 } else if (msg->type == JSONRPC_NOTIFY) {
489 ovsdb_jsonrpc_session_got_notify(s, msg);
491 VLOG_WARN("%s: received unexpected %s message",
492 jsonrpc_session_get_name(s->js),
493 jsonrpc_msg_type_to_string(msg->type));
494 jsonrpc_session_force_reconnect(s->js);
495 jsonrpc_msg_destroy(msg);
499 return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
/* Applies 'options' to 'session' by passing its max_backoff,
 * probe_interval, and dscp values to the underlying JSON-RPC session. */
503 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
504 const struct ovsdb_jsonrpc_options *options)
506 jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
507 jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
508 jsonrpc_session_set_dscp(session->js, options->dscp);
/* Runs every session of 'remote' and closes sessions based on the result
 * of ovsdb_jsonrpc_session_run().  The _SAFE iterator is used because
 * ovsdb_jsonrpc_session_close() unlinks the session from the list.
 *
 * NOTE(review): the test on 'error' is not visible in this listing;
 * presumably a session is closed only when 'error' is nonzero. */
512 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
514 struct ovsdb_jsonrpc_session *s, *next;
516 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
517 int error = ovsdb_jsonrpc_session_run(s);
519 ovsdb_jsonrpc_session_close(s);
/* Registers the wait conditions for session 's'.  This mirrors
 * ovsdb_jsonrpc_session_run(): while there is a send backlog, nothing beyond
 * jsonrpc_session_wait() is registered.  Otherwise a pending monitor flush
 * causes an immediate wakeup, and a receive wait is also registered in this
 * branch.
 *
 * NOTE(review): whether the receive wait is an "else" of the flush check is
 * not visible in this listing. */
525 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
527 jsonrpc_session_wait(s->js);
528 if (!jsonrpc_session_get_backlog(s->js)) {
529 if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
530 poll_immediate_wake();
532 jsonrpc_session_recv_wait(s->js);
/* Calls ovsdb_jsonrpc_session_wait() on every session of 'remote'. */
538 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
540 struct ovsdb_jsonrpc_session *s;
542 LIST_FOR_EACH (s, node, &remote->sessions) {
543 ovsdb_jsonrpc_session_wait(s);
/* Adds the statistics of session 's' to 'usage': the number of entries in
 * its trigger map under "triggers", and the send backlog of its JSON-RPC
 * session under "backlog". */
548 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
551 simap_increase(usage, "triggers", hmap_count(&s->triggers));
552 simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
/* Accumulates the memory statistics of every session of 'remote' into
 * 'usage'. */
556 ovsdb_jsonrpc_session_get_memory_usage_all(
557 const struct ovsdb_jsonrpc_remote *remote,
560 struct ovsdb_jsonrpc_session *s;
562 LIST_FOR_EACH (s, node, &remote->sessions) {
563 ovsdb_jsonrpc_session_get_memory_usage(s, usage);
/* Closes every session of 'remote'.  The _SAFE iterator is required because
 * ovsdb_jsonrpc_session_close() removes the session from the list. */
568 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
570 struct ovsdb_jsonrpc_session *s, *next;
572 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
573 ovsdb_jsonrpc_session_close(s);
577 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
 * reconnect. */
/* Implementation: forces a reconnect on each session and immediately closes
 * any session whose JSON-RPC session is no longer alive afterwards.  The
 * _SAFE iterator is used because closing unlinks the session. */
580 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
582 struct ovsdb_jsonrpc_session *s, *next;
584 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
585 jsonrpc_session_force_reconnect(s->js);
586 if (!jsonrpc_session_is_alive(s->js)) {
587 ovsdb_jsonrpc_session_close(s);
592 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
 * 'options'.
595 * (The dscp value can't be changed directly; the caller must instead close and
596 * re-open the session.) */
/* Implementation: calls ovsdb_jsonrpc_session_set_options() on every
 * session in 'remote->sessions'. */
598 ovsdb_jsonrpc_session_set_all_options(
599 struct ovsdb_jsonrpc_remote *remote,
600 const struct ovsdb_jsonrpc_options *options)
602 struct ovsdb_jsonrpc_session *s;
604 LIST_FOR_EACH (s, node, &remote->sessions) {
605 ovsdb_jsonrpc_session_set_options(s, options);
609 /* Sets 'status' for the 'remote' with an outgoing connection. */
/* Implementation: an empty session list is handled first.  Otherwise the
 * list is asserted to hold exactly one session, whose status is copied into
 * '*status' with 'n_connections' set to 1.
 *
 * NOTE(review): the return values are not visible in this listing. */
611 ovsdb_jsonrpc_active_session_get_status(
612 const struct ovsdb_jsonrpc_remote *remote,
613 struct ovsdb_jsonrpc_remote_status *status)
615 const struct ovs_list *sessions = &remote->sessions;
616 const struct ovsdb_jsonrpc_session *s;
618 if (ovs_list_is_empty(sessions)) {
622 ovs_assert(ovs_list_is_singleton(sessions));
623 s = CONTAINER_OF(ovs_list_front(sessions), struct ovsdb_jsonrpc_session, node);
624 ovsdb_jsonrpc_session_get_status(s, status);
625 status->n_connections = 1;
/* Fills '*status' from 'session':
 *
 *   - connection state and last error, from the JSON-RPC session;
 *   - reconnect state and the time since the last connect and disconnect,
 *     converted from milliseconds to seconds (UINT_MAX is passed through
 *     unconverted);
 *   - three space-separated lists of lock names, classified per waiter.
 *
 * The lock strings are handed over with ds_steal_cstr(); they are released
 * by ovsdb_jsonrpc_server_free_remote_status().
 *
 * NOTE(review): the assignment of 'js' is not visible in this listing. */
631 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_session *session,
632 struct ovsdb_jsonrpc_remote_status *status)
634 const struct ovsdb_jsonrpc_session *s = session;
635 const struct jsonrpc_session *js;
636 struct ovsdb_lock_waiter *waiter;
637 struct reconnect_stats rstats;
638 struct ds locks_held, locks_waiting, locks_lost;
642 status->is_connected = jsonrpc_session_is_connected(js);
643 status->last_error = jsonrpc_session_get_status(js);
645 jsonrpc_session_get_reconnect_stats(js, &rstats);
646 status->state = rstats.state;
647 status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
648 ? UINT_MAX : rstats.msec_since_connect / 1000;
649 status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
650 ? UINT_MAX : rstats.msec_since_disconnect / 1000;
652 ds_init(&locks_held);
653 ds_init(&locks_waiting);
654 ds_init(&locks_lost);
655 HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
/* Owner -> held; not owner but in OVSDB_LOCK_WAIT mode -> waiting.  The
 * remaining alternative is not visible here; presumably it is locks_lost. */
658 string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
659 : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
/* Separate names with a single space; no leading space on the first one. */
661 if (string->length) {
662 ds_put_char(string, ' ');
664 ds_put_cstr(string, waiter->lock_name);
666 status->locks_held = ds_steal_cstr(&locks_held);
667 status->locks_waiting = ds_steal_cstr(&locks_waiting);
668 status->locks_lost = ds_steal_cstr(&locks_lost);
671 /* Examines 'request' to determine the database to which it relates, and then
672 * searches 's' to find that database:
674 * - If successful, returns the database and sets '*replyp' to NULL.
676 * - If no such database exists, returns NULL and sets '*replyp' to an
677 * appropriate JSON-RPC error reply, owned by the caller. */
/* Implementation: the database name must be the first element of the
 * request's params array and must be a JSON string; it is looked up in the
 * session's server's 'dbs' shash.  Both failure cases build an
 * ovsdb_syntax_error(), which is converted into the JSON-RPC error reply
 * stored in '*replyp' and then destroyed. */
678 static struct ovsdb *
679 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
680 const struct jsonrpc_msg *request,
681 struct jsonrpc_msg **replyp)
683 struct json_array *params;
684 struct ovsdb_error *error;
688 params = json_array(request->params);
/* Empty params, or a first element that is not a string. */
689 if (!params->n || params->elems[0]->type != JSON_STRING) {
690 error = ovsdb_syntax_error(
691 request->params, NULL,
692 "%s request params must begin with <db-name>", request->method);
696 db_name = params->elems[0]->u.string;
697 db = shash_find_data(&s->up.server->dbs, db_name);
699 error = ovsdb_syntax_error(
700 request->params, "unknown database",
701 "%s request specifies unknown database %s",
702 request->method, db_name);
710 *replyp = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
711 ovsdb_error_destroy(error);
/* Extracts the lock name from 'request'.  The params must be an array of
 * exactly one element, which must be a JSON string that
 * ovsdb_parser_is_id() accepts; otherwise a syntax error is returned.  On
 * success '*lock_namep' is set to the string held inside 'request' (it is
 * not copied).
 *
 * NOTE(review): the success return value is not visible in this listing. */
715 static struct ovsdb_error *
716 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
717 const char **lock_namep)
719 const struct json_array *params;
721 params = json_array(request->params);
722 if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
723 !ovsdb_parser_is_id(json_string(params->elems[0]))) {
725 return ovsdb_syntax_error(request->params, NULL,
726 "%s request params must be <id>",
730 *lock_namep = json_string(params->elems[0]);
/* Sends a JSON-RPC notification to the JSON-RPC session that contains
 * 'session'.  The notification's params are a one-element array holding
 * 'lock_name'; callers in this file pass "stolen" or "locked" as the
 * method. */
735 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
736 const char *lock_name,
739 struct ovsdb_jsonrpc_session *s;
742 s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
743 params = json_array_create_1(json_string_create(lock_name));
744 ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
/* Handles a "lock" or "steal" request ('mode' selects which) from 's'.
 *
 * Returns the reply to send.  On success it is an object whose "locked"
 * member says whether the session now owns the lock.  On failure it is a
 * JSON-RPC error built from the ovsdb_error, for example when the lock name
 * cannot be parsed or when the session requests a lock it has not unlocked.
 *
 * A "stolen" notification may be sent to 'victim' after the lock is taken.
 *
 * NOTE(review): the conditions guarding the error paths and the "stolen"
 * notification are not visible in this listing. */
747 static struct jsonrpc_msg *
748 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
749 struct jsonrpc_msg *request,
750 enum ovsdb_lock_mode mode)
752 struct ovsdb_lock_waiter *waiter;
753 struct jsonrpc_msg *reply;
754 struct ovsdb_error *error;
755 struct ovsdb_session *victim;
756 const char *lock_name;
759 error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
764 /* Report error if this session has issued a "lock" or "steal" without a
765 * matching "unlock" for this lock. */
766 waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
768 error = ovsdb_syntax_error(
769 request->params, NULL,
770 "must issue \"unlock\" before new \"%s\"", request->method);
774 /* Get the lock, add us as a waiter. */
775 waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
778 ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
781 result = json_object_create();
782 json_object_put(result, "locked",
783 json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
785 return jsonrpc_create_reply(result, request->id);
/* Error path: convert 'error' into the reply, then free it. */
788 reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
789 ovsdb_error_destroy(error);
/* Releases every lock waiter registered on session 's'.  The _SAFE iterator
 * is used because ovsdb_jsonrpc_session_unlock__() destroys the waiter. */
794 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
796 struct ovsdb_lock_waiter *waiter, *next;
798 HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
799 ovsdb_jsonrpc_session_unlock__(waiter);
/* Releases 'waiter'.  Removing the waiter yields the lock's new owner
 * session; that session is sent a "locked" notification naming the lock.
 * 'waiter' is then destroyed.
 *
 * NOTE(review): the guards around the removal and the notification are not
 * visible in this listing; presumably they test 'lock' and 'new_owner' for
 * NULL. */
804 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
806 struct ovsdb_lock *lock = waiter->lock;
809 struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
811 ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
813 /* ovsdb_server_lock() might have freed 'lock'. */
817 ovsdb_lock_waiter_destroy(waiter);
/* Handles an "unlock" request from 's'.
 *
 * Returns the reply to send: an empty object once the session's waiter for
 * the named lock has been released, or a JSON-RPC error built from the
 * ovsdb_error when the lock name cannot be parsed or when the
 * "unlock" without "lock" or "steal" error is raised.
 *
 * NOTE(review): the conditions guarding the error paths are not visible in
 * this listing, and the in-body comment below has lost its final line. */
820 static struct jsonrpc_msg *
821 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
822 struct jsonrpc_msg *request)
824 struct ovsdb_lock_waiter *waiter;
825 struct jsonrpc_msg *reply;
826 struct ovsdb_error *error;
827 const char *lock_name;
829 error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
834 /* Report error if this session has not issued a "lock" or "steal" for this
836 waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
838 error = ovsdb_syntax_error(
839 request->params, NULL, "\"unlock\" without \"lock\" or \"steal\"");
843 ovsdb_jsonrpc_session_unlock__(waiter);
845 return jsonrpc_create_reply(json_object_create(), request->id);
848 reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
849 ovsdb_error_destroy(error);
/* Starts a transaction for 'request' on 'db' by creating a trigger from the
 * request's id and params.  'request->params' is cleared before the request
 * is destroyed, so that destroying the request does not also free the
 * params that were handed to the trigger.
 *
 * NOTE(review): the return value is not visible in this listing. */
853 static struct jsonrpc_msg *
854 execute_transaction(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
855 struct jsonrpc_msg *request)
857 ovsdb_jsonrpc_trigger_create(s, db, request->id, request->params);
859 request->params = NULL;
860 jsonrpc_msg_destroy(request);
/* Dispatches 'request' on its method name and sends the resulting reply.
 *
 * Methods handled: "transact", "monitor" (plus a second monitor method that
 * is accepted only while monitor_cond_enable__ is true),
 * "monitor_cond_change", "monitor_cancel", "get_schema", "list_dbs",
 * "lock", "steal", "unlock", and "echo".  Anything else gets an
 * "unknown method" error reply.
 *
 * The database-specific methods resolve their database with
 * ovsdb_jsonrpc_lookup_db(), which supplies the error reply itself when the
 * lookup fails.
 *
 * NOTE(review): the guards after each lookup and around the final
 * destroy/send are not visible in this listing. */
865 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
866 struct jsonrpc_msg *request)
868 struct jsonrpc_msg *reply;
870 if (!strcmp(request->method, "transact")) {
871 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
/* execute_transaction() destroys 'request' itself. */
873 reply = execute_transaction(s, db, request);
875 } else if (!strcmp(request->method, "monitor") ||
876 (monitor_cond_enable__ && !strcmp(request->method,
878 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
/* 'l' is 0 when the method is exactly "monitor"; a nonzero 'l' selects
 * OVSDB_MONITOR_V2. */
880 int l = strlen(request->method) - strlen("monitor");
881 enum ovsdb_monitor_version version = l ? OVSDB_MONITOR_V2
883 reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
884 version, request->id);
886 } else if (!strcmp(request->method, "monitor_cond_change")) {
887 reply = ovsdb_jsonrpc_monitor_cond_change(s, request->params,
889 } else if (!strcmp(request->method, "monitor_cancel")) {
890 reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
892 } else if (!strcmp(request->method, "get_schema")) {
893 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
895 reply = jsonrpc_create_reply(ovsdb_schema_to_json(db->schema),
898 } else if (!strcmp(request->method, "list_dbs")) {
/* Reply with an array holding the name of every database on the server. */
899 size_t n_dbs = shash_count(&s->up.server->dbs);
900 struct shash_node *node;
904 dbs = xmalloc(n_dbs * sizeof *dbs);
906 SHASH_FOR_EACH (node, &s->up.server->dbs) {
907 dbs[i++] = json_string_create(node->name);
909 reply = jsonrpc_create_reply(json_array_create(dbs, n_dbs),
911 } else if (!strcmp(request->method, "lock")) {
912 reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
913 } else if (!strcmp(request->method, "steal")) {
914 reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
915 } else if (!strcmp(request->method, "unlock")) {
916 reply = ovsdb_jsonrpc_session_unlock(s, request);
917 } else if (!strcmp(request->method, "echo")) {
/* "echo" sends the request's own params back as the result. */
918 reply = jsonrpc_create_reply(json_clone(request->params), request->id);
920 reply = jsonrpc_create_error(json_string_create("unknown method"),
925 jsonrpc_msg_destroy(request);
926 ovsdb_jsonrpc_session_send(s, reply);
/* Handles a "cancel" notification.  When the params array has exactly one
 * element, that element is treated as a request ID: the matching trigger is
 * looked up among the triggers of 's' and completed.  Any other number of
 * params is ignored.
 *
 * NOTE(review): the check that a trigger was found is not visible in this
 * listing. */
931 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
933 if (json_array(request->params)->n == 1) {
934 struct ovsdb_jsonrpc_trigger *t;
937 id = request->params->u.array.elems[0];
938 t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
940 ovsdb_jsonrpc_trigger_complete(t);
/* Handles a notification received on 's'.  "cancel" is the only method
 * acted on; every notification is destroyed afterwards. */
946 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
947 struct jsonrpc_msg *request)
949 if (!strcmp(request->method, "cancel")) {
950 execute_cancel(s, request);
952 jsonrpc_msg_destroy(request);
/* Sends 'msg' on session 's'.  All of the session's monitors are flushed
 * first, so pending monitor updates go out ahead of 'msg'. */
956 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
957 struct jsonrpc_msg *msg)
959 ovsdb_jsonrpc_monitor_flush_all(s);
960 jsonrpc_session_send(s->js, msg);
963 /* JSON-RPC database server triggers.
965 * (Every transaction is treated as a trigger even if it doesn't actually have
966 * any "wait" operations.) */
/* One outstanding transaction.  'trigger' is the embedded generic trigger;
 * ovsdb_jsonrpc_trigger_complete_done() recovers this structure from it
 * with CONTAINER_OF().  The code in this file also uses an 'id' member (the
 * JSON request ID); its declaration is not visible in this listing. */
968 struct ovsdb_jsonrpc_trigger {
969 struct ovsdb_trigger trigger;
970 struct hmap_node hmap_node; /* In session's "triggers" hmap. */
/* Creates a trigger for the transaction 'params' on 'db', keyed by request
 * ID 'id' in the trigger map of 's'.
 *
 * The request is rejected with a "duplicate request ID" error reply, and
 * 'params' is destroyed, on the duplicate-ID path.  Otherwise the trigger
 * is initialized, inserted under the hash of 'id', and completed at once if
 * it is already complete.
 *
 * NOTE(review): the test for an existing trigger is not visible in this
 * listing. */
975 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
976 struct json *id, struct json *params)
978 struct ovsdb_jsonrpc_trigger *t;
981 /* Check for duplicate ID. */
982 hash = json_hash(id, 0);
983 t = ovsdb_jsonrpc_trigger_find(s, id, hash);
985 struct jsonrpc_msg *msg;
987 msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
989 ovsdb_jsonrpc_session_send(s, msg);
991 json_destroy(params);
995 /* Insert into trigger table. */
996 t = xmalloc(sizeof *t);
997 ovsdb_trigger_init(&s->up, db, &t->trigger, params, time_msec());
999 hmap_insert(&s->triggers, &t->hmap_node, hash);
1001 /* Complete early if possible. */
1002 if (ovsdb_trigger_is_complete(&t->trigger)) {
1003 ovsdb_jsonrpc_trigger_complete(t);
/* Searches the trigger map of 's' for a trigger whose request ID equals
 * 'id'.  'hash' must be the hash of 'id' as computed by the callers in this
 * file, json_hash(id, 0).
 *
 * NOTE(review): the return statements are not visible in this listing. */
1007 static struct ovsdb_jsonrpc_trigger *
1008 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
1009 const struct json *id, size_t hash)
1011 struct ovsdb_jsonrpc_trigger *t;
1013 HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
1014 if (json_equal(t->id, id)) {
/* Finishes trigger 't'.  While the session is connected, a reply is sent
 * for the trigger's request ID: either the result stolen from the trigger,
 * or a "canceled" error.  The request ID and the trigger are then destroyed
 * and 't' is removed from the session's trigger map.
 *
 * NOTE(review): the test choosing between the result and the "canceled"
 * error is not visible in this listing; presumably it is whether a result
 * exists. */
1023 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
1025 struct ovsdb_jsonrpc_session *s;
1027 s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
1029 if (jsonrpc_session_is_connected(s->js)) {
1030 struct jsonrpc_msg *reply;
1031 struct json *result;
1033 result = ovsdb_trigger_steal_result(&t->trigger);
1035 reply = jsonrpc_create_reply(result, t->id);
1037 reply = jsonrpc_create_error(json_string_create("canceled"),
1040 ovsdb_jsonrpc_session_send(s, reply);
1043 json_destroy(t->id);
1044 ovsdb_trigger_destroy(&t->trigger);
1045 hmap_remove(&s->triggers, &t->hmap_node);
/* Completes every trigger of session 's'.  The _SAFE iterator is used
 * because ovsdb_jsonrpc_trigger_complete() removes the trigger from the
 * map. */
1050 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
1052 struct ovsdb_jsonrpc_trigger *t, *next;
1053 HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
1054 ovsdb_jsonrpc_trigger_complete(t);
/* Completes every trigger on the session's 'completions' list, taking the
 * first list element each time until the list is empty.
 *
 * NOTE(review): the loop ends only if completing a trigger takes it off the
 * 'completions' list; presumably ovsdb_trigger_destroy() does that. */
1059 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
1061 while (!ovs_list_is_empty(&s->up.completions)) {
1062 struct ovsdb_jsonrpc_trigger *t
1063 = CONTAINER_OF(s->up.completions.next,
1064 struct ovsdb_jsonrpc_trigger, trigger.node);
1065 ovsdb_jsonrpc_trigger_complete(t);
1069 /* Jsonrpc front end monitor. */
/* State for one monitor.  'session' is the owning session, 'monitor_id' is
 * the JSON ID that ovsdb_jsonrpc_monitor_find() compares against, and
 * 'dbmon' is the underlying ovsdb_monitor. */
1070 struct ovsdb_jsonrpc_monitor {
1071 struct ovsdb_jsonrpc_session *session;
1073 struct hmap_node node; /* In ovsdb_jsonrpc_session's "monitors". */
1074 struct json *monitor_id;
1075 struct ovsdb_monitor *dbmon;
1076 uint64_t unflushed; /* The first transaction that has not been
1077 flushed to the jsonrpc remote client. */
1078 enum ovsdb_monitor_version version;
1079 struct ovsdb_monitor_session_condition *condition;/* Session's condition */
/* Searches the monitor map of 's' for a monitor whose ID equals
 * 'monitor_id'.  The map is keyed by json_hash(monitor_id, 0).
 *
 * NOTE(review): the return statements are not visible in this listing. */
1082 static struct ovsdb_jsonrpc_monitor *
1083 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
1084 const struct json *monitor_id)
1086 struct ovsdb_jsonrpc_monitor *m;
1088 HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
1089 if (json_equal(m->monitor_id, monitor_id)) {
/* Reads the optional boolean member 'name' from 'parser'.  Returns the
 * member's value if it is present and 'default_value' otherwise. */
1098 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
1100 const struct json *json;
1102 json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
1103 return json ? json_boolean(json) : default_value;
/* Parses one monitor request for 'table' and records it in 'dbmon'.
 *
 * 'monitor_request' is an object with the optional members "where"
 * (array), "columns" (array), and "select" (object).
 *
 *   - "select" may carry the booleans "initial", "insert", "delete", and
 *     "modify", each defaulting to true; each true value adds the matching
 *     OJMS_* bit.  The all-four-bits assignment further down is the
 *     alternative path.
 *
 *   - "columns" must be an array of strings naming columns of 'table'.
 *     Each named column is added to the monitor; an unknown name or a
 *     repeated column is a syntax error.  The alternative path adds every
 *     column of the table schema except the one whose index is
 *     OVSDB_COL_UUID.
 *
 *   - "where" is passed with 'cond' to
 *     ovsdb_monitor_table_condition_create().
 *
 * Returns an ovsdb_error on the failure paths shown.
 *
 * NOTE(review): several guards (for example on 'cond', 'select_json', and
 * 'columns') and the final return are not visible in this listing. */
1106 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
1107 ovsdb_jsonrpc_parse_monitor_request(
1108 struct ovsdb_monitor *dbmon,
1109 const struct ovsdb_table *table,
1110 struct ovsdb_monitor_session_condition *cond,
1111 const struct json *monitor_request)
1113 const struct ovsdb_table_schema *ts = table->schema;
1114 enum ovsdb_monitor_selection select;
1115 const struct json *columns, *select_json, *where = NULL;
1116 struct ovsdb_parser parser;
1117 struct ovsdb_error *error;
/* First parser: the top-level members of the request object. */
1119 ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1121 where = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
1123 columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1125 select_json = ovsdb_parser_member(&parser, "select",
1126 OP_OBJECT | OP_OPTIONAL);
1128 error = ovsdb_parser_finish(&parser);
/* Second parser: the members of the "select" object. */
1135 ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1136 if (parse_bool(&parser, "initial", true)) {
1137 select |= OJMS_INITIAL;
1139 if (parse_bool(&parser, "insert", true)) {
1140 select |= OJMS_INSERT;
1142 if (parse_bool(&parser, "delete", true)) {
1143 select |= OJMS_DELETE;
1145 if (parse_bool(&parser, "modify", true)) {
1146 select |= OJMS_MODIFY;
1148 error = ovsdb_parser_finish(&parser);
1153 select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1156 ovsdb_monitor_table_add_select(dbmon, table, select);
/* Explicit column list: validate each entry and add it to the monitor. */
1160 if (columns->type != JSON_ARRAY) {
1161 return ovsdb_syntax_error(columns, NULL,
1162 "array of column names expected");
1165 for (i = 0; i < columns->u.array.n; i++) {
1166 const struct ovsdb_column *column;
1169 if (columns->u.array.elems[i]->type != JSON_STRING) {
1170 return ovsdb_syntax_error(columns, NULL,
1171 "array of column names expected");
1174 s = columns->u.array.elems[i]->u.string;
1175 column = shash_find_data(&table->schema->columns, s);
1177 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1180 if (ovsdb_monitor_add_column(dbmon, table, column,
1182 return ovsdb_syntax_error(columns, NULL, "column %s "
1183 "mentioned more than once",
/* All-columns path: every schema column except the OVSDB_COL_UUID one. */
1188 struct shash_node *node;
1190 SHASH_FOR_EACH (node, &ts->columns) {
1191 const struct ovsdb_column *column = node->data;
1192 if (column->index != OVSDB_COL_UUID) {
1193 if (ovsdb_monitor_add_column(dbmon, table, column,
1195 return ovsdb_syntax_error(columns, NULL, "column %s "
1196 "mentioned more than once",
1203 error = ovsdb_monitor_table_condition_create(cond, table, where);
/* Handles a request to create a monitor of the given 'version' on 'db' for
 * session 's'.
 *
 * 'params' must be an array of exactly three elements; element [1] is the
 * monitor ID and element [2] must be an object mapping table names to
 * monitor requests.  A monitor ID that ovsdb_jsonrpc_monitor_find() already
 * knows in 's' is rejected as a duplicate.
 *
 * On success the reply to 'request_id' carries the composed initial update,
 * or an empty JSON object when there is none.  On failure the reply is a
 * JSON-RPC error built from the ovsdb_error.
 *
 * NOTE(review): the jumps into the error path and some assignments are not
 * visible in this view. */
1212 static struct jsonrpc_msg *
1213 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1214 struct json *params,
1215 enum ovsdb_monitor_version version,
1216 const struct json *request_id)
1218 struct ovsdb_jsonrpc_monitor *m = NULL;
1219 struct ovsdb_monitor *dbmon = NULL;
1220 struct json *monitor_id, *monitor_requests;
1221 struct ovsdb_error *error = NULL;
1222 struct shash_node *node;
1225 if (json_array(params)->n != 3) {
1226 error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1229 monitor_id = params->u.array.elems[1];
1230 monitor_requests = params->u.array.elems[2];
1231 if (monitor_requests->type != JSON_OBJECT) {
1232 error = ovsdb_syntax_error(monitor_requests, NULL,
1233 "monitor-requests must be object");
1237 if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1238 error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1242 m = xzalloc(sizeof *m);
1245 m->dbmon = ovsdb_monitor_create(db, m);
/* A session condition is created only for OVSDB_MONITOR_V2 monitors. */
1246 if (version == OVSDB_MONITOR_V2) {
1247 m->condition = ovsdb_monitor_session_condition_create();
1250 m->version = version;
/* Index the monitor by the hash of the caller's ID and keep a private clone
 * of that ID. */
1251 hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1252 m->monitor_id = json_clone(monitor_id);
/* One entry per table name in the monitor-requests object. */
1254 SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1255 const struct ovsdb_table *table;
1256 const struct json *mr_value;
1259 table = ovsdb_get_table(m->db, node->name);
1261 error = ovsdb_syntax_error(NULL, NULL,
1262 "no table named %s", node->name);
1266 ovsdb_monitor_add_table(m->dbmon, table);
1268 /* Parse columns. */
1269 mr_value = node->data;
/* An array value is parsed element by element.  NOTE(review): the second
 * parse call below is presumably the non-array case, taking the value as a
 * single monitor request -- the "else" line is not visible here. */
1270 if (mr_value->type == JSON_ARRAY) {
1271 const struct json_array *array = &mr_value->u.array;
1273 for (i = 0; i < array->n; i++) {
1274 error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
1283 error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
/* ovsdb_monitor_add() may hand back a different ovsdb_monitor than the one
 * just built; in that case this jsonrpc monitor is moved over to it. */
1293 dbmon = ovsdb_monitor_add(m->dbmon);
1294 if (dbmon != m->dbmon) {
1295 /* Found an existing dbmon, reuse the current one. */
1296 ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
1297 ovsdb_monitor_add_jsonrpc_monitor(dbmon, m);
1301 /* Only now we can bind session's condition to ovsdb_monitor */
1303 ovsdb_monitor_condition_bind(m->dbmon, m->condition);
/* Success path: compose the initial update and reply with it, substituting
 * an empty object when nothing was composed. */
1306 ovsdb_monitor_get_initial(m->dbmon);
1307 json = ovsdb_jsonrpc_monitor_compose_update(m, true);
1308 json = json ? json : json_object_create();
1309 return jsonrpc_create_reply(json, request_id);
/* Error path: tear down the monitor, then convert the error into a JSON-RPC
 * error reply and free it. */
1313 ovsdb_jsonrpc_monitor_destroy(m);
1316 json = ovsdb_error_to_json(error);
1317 ovsdb_error_destroy(error);
1318 return jsonrpc_create_error(json, request_id);
/* Parses one condition-change request, 'cond_change_req', for 'table' and
 * applies it to monitor 'm' through ovsdb_monitor_table_condition_update().
 *
 * Two optional array members are read from the request: "columns" and
 * "where".
 *
 * Returns an ovsdb_error on failure.
 *
 * NOTE(review): the visible error text begins "changing columns ", so a
 * request that names columns is presumably rejected -- the guarding
 * condition and the remainder of the message are not visible here. */
1321 static struct ovsdb_error *
1322 ovsdb_jsonrpc_parse_monitor_cond_change_request(
1323 struct ovsdb_jsonrpc_monitor *m,
1324 const struct ovsdb_table *table,
1325 const struct json *cond_change_req)
1327 const struct ovsdb_table_schema *ts = table->schema;
1328 const struct json *condition, *columns;
1329 struct ovsdb_parser parser;
1330 struct ovsdb_error *error;
1332 ovsdb_parser_init(&parser, cond_change_req, "table %s", ts->name);
1333 columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1334 condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
1336 error = ovsdb_parser_finish(&parser);
1342 error = ovsdb_syntax_error(cond_change_req, NULL, "changing columns "
1346 error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table,
/* Handles a request to change the conditions of an existing monitor in
 * session 's'.
 *
 * 'params' must be an array of exactly three elements:
 *
 *   [0] the ID of the monitor to change, looked up with
 *       ovsdb_jsonrpc_monitor_find();
 *   [1] the ID the monitor is re-registered under afterwards;
 *   [2] an object mapping table names to arrays of condition-change
 *       requests.
 *
 * After the requests are processed, the monitor is re-indexed under its new
 * ID, the resulting update (if any) is sent to the session as a
 * notification, and an empty-object reply to 'request_id' is returned.  On
 * failure the reply is a JSON-RPC error built from the ovsdb_error.
 *
 * NOTE(review): the jumps into the error path are not visible in this
 * view. */
1352 static struct jsonrpc_msg *
1353 ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
1354 struct json *params,
1355 const struct json *request_id)
1357 struct ovsdb_error *error;
1358 struct ovsdb_jsonrpc_monitor *m;
1359 struct json *monitor_cond_change_reqs;
1360 struct shash_node *node;
1363 if (json_array(params)->n != 3) {
1364 error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1368 m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
1370 error = ovsdb_syntax_error(request_id, NULL,
1371 "unknown monitor session");
1375 monitor_cond_change_reqs = params->u.array.elems[2];
1376 if (monitor_cond_change_reqs->type != JSON_OBJECT) {
1378 ovsdb_syntax_error(NULL, NULL,
1379 "monitor-cond-change-requests must be object");
/* One entry per table name in the condition-change object. */
1383 SHASH_FOR_EACH (node, json_object(monitor_cond_change_reqs)) {
1384 const struct ovsdb_table *table;
1385 const struct json *mr_value;
1388 table = ovsdb_get_table(m->db, node->name);
1390 error = ovsdb_syntax_error(NULL, NULL,
1391 "no table named %s", node->name);
/* Existing in the database is not enough: the table must also be part of
 * this monitor. */
1394 if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
1395 error = ovsdb_syntax_error(NULL, NULL,
1396 "no table named %s in monitor session",
/* The per-table value is handled as an array of requests, each parsed in
 * turn; the error below reports a value that is not such an array. */
1401 mr_value = node->data;
1402 if (mr_value->type == JSON_ARRAY) {
1403 const struct json_array *array = &mr_value->u.array;
1405 for (i = 0; i < array->n; i++) {
1406 error = ovsdb_jsonrpc_parse_monitor_cond_change_request(
1407 m, table, array->elems[i]);
1413 error = ovsdb_syntax_error(
1415 "table %s no monitor-cond-change JSON array",
1421 /* Change monitor id */
/* The node is removed and re-inserted because its hash is derived from the
 * monitor ID, which is replaced by a clone of params[1]. */
1422 hmap_remove(&s->monitors, &m->node);
1423 json_destroy(m->monitor_id);
1424 m->monitor_id = json_clone(params->u.array.elems[1]);
1425 hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
1427 /* Send the new update, if any, which represents the difference between
1428 * the old condition and the new one. */
1429 struct json *update_json;
1431 update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
1432 &m->unflushed, m->condition, m->version);
1434 struct jsonrpc_msg *msg;
1435 struct json *params;
/* Notification params are the pair [monitor ID, update]. */
1437 params = json_array_create_2(json_clone(m->monitor_id), update_json);
1438 msg = ovsdb_jsonrpc_create_notify(m, params);
1439 jsonrpc_session_send(s->js, msg);
1442 return jsonrpc_create_reply(json_object_create(), request_id);
/* Error path: convert the error into a JSON-RPC error reply and free it. */
1446 json = ovsdb_error_to_json(error);
1447 ovsdb_error_destroy(error);
1448 return jsonrpc_create_error(json, request_id);
/* Handles a request to cancel a monitor in session 's'.
 *
 * 'params' must hold exactly one element, the ID of the monitor to cancel.
 * The monitor found for that ID is destroyed and an empty-object reply to
 * 'request_id' is returned.  Otherwise the reply is an error whose payload
 * is the string "invalid parameters" or "unknown monitor". */
1451 static struct jsonrpc_msg *
1452 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1453 struct json_array *params,
1454 const struct json *request_id)
1456 if (params->n != 1) {
1457 return jsonrpc_create_error(json_string_create("invalid parameters"),
1460 struct ovsdb_jsonrpc_monitor *m;
1462 m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1464 return jsonrpc_create_error(json_string_create("unknown monitor"),
1467 ovsdb_jsonrpc_monitor_destroy(m);
1468 return jsonrpc_create_reply(json_object_create(), request_id);
/* Destroys every monitor registered in session 's'.
 *
 * The _SAFE iterator is required because ovsdb_jsonrpc_monitor_destroy()
 * removes the current node from the monitors map during the walk. */
1474 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1476 struct ovsdb_jsonrpc_monitor *m, *next;
1478 HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1479 ovsdb_jsonrpc_monitor_destroy(m);
/* Composes the JSON update for monitor 'm' by calling
 * ovsdb_monitor_get_update() with the monitor's unflushed position,
 * condition and version.
 *
 * NOTE(review): the remaining parameter ('initial', used below) and the
 * early return taken when ovsdb_monitor_needs_flush() reports nothing
 * pending are not visible in this view; callers treat a NULL result as "no
 * update" -- confirm. */
1483 static struct json *
1484 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
1488 if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
1492 return ovsdb_monitor_get_update(m->dbmon, initial, false,
1493 &m->unflushed, m->condition, m->version);
/* Checks each monitor of session 's' with ovsdb_monitor_needs_flush().
 *
 * NOTE(review): the return statements are not visible in this view;
 * presumably true is returned as soon as one monitor needs a flush and
 * false otherwise -- confirm. */
1497 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
1499 struct ovsdb_jsonrpc_monitor *m;
1501 HMAP_FOR_EACH (m, node, &s->monitors) {
1502 if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
/* Tears down monitor 'm': frees its monitor ID, unlinks it from its
 * session's monitors map, detaches it from its ovsdb_monitor and destroys
 * its session condition.
 *
 * NOTE(review): the release of 'm' itself is not visible in this view. */
1511 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m)
1513 json_destroy(m->monitor_id);
1514 hmap_remove(&m->session->monitors, &m->node);
1515 ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
1516 ovsdb_monitor_session_condition_destroy(m->condition);
/* Builds a JSON-RPC notification carrying 'params' for monitor 'm'.
 *
 * The notification's method is selected by switching on m->version.
 *
 * NOTE(review): the method strings assigned in each case are not visible in
 * this view. */
1520 static struct jsonrpc_msg *
1521 ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor *m,
1522 struct json *params)
1526 switch(m->version) {
1527 case OVSDB_MONITOR_V1:
1530 case OVSDB_MONITOR_V2:
1533 case OVSDB_MONITOR_VERSION_MAX:
1538 return jsonrpc_create_notify(method, params);
/* For each monitor in session 's', composes its pending (non-initial)
 * update and sends it to the session as a notification whose params are the
 * pair [monitor ID, update].
 *
 * NOTE(review): the check guarding the send is not visible in this view;
 * presumably nothing is sent when no update was composed -- confirm. */
1542 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
1544 struct ovsdb_jsonrpc_monitor *m;
1546 HMAP_FOR_EACH (m, node, &s->monitors) {
1549 json = ovsdb_jsonrpc_monitor_compose_update(m, false);
1551 struct jsonrpc_msg *msg;
1552 struct json *params;
1554 params = json_array_create_2(json_clone(m->monitor_id), json);
1555 msg = ovsdb_jsonrpc_create_notify(m, params);
1556 jsonrpc_session_send(s->js, msg);
/* Turns off support for the monitor_cond method by clearing the file-level
 * flag monitor_cond_enable__. */
1562 ovsdb_jsonrpc_disable_monitor_cond(void)
1564 /* Once disabled, it is not possible to re-enable it. */
1565 monitor_cond_enable__ = false;