3ab05a364c291c64b488481eda38d852a14e7687
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "coverage.h"
27 #include "openvswitch/dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "json.h"
30 #include "jsonrpc.h"
31 #include "ovsdb/ovsdb.h"
32 #include "ovsdb/table.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-idl-provider.h"
36 #include "ovsdb-parser.h"
37 #include "poll-loop.h"
38 #include "shash.h"
39 #include "sset.h"
40 #include "util.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
44
45 COVERAGE_DEFINE(txn_uncommitted);
46 COVERAGE_DEFINE(txn_unchanged);
47 COVERAGE_DEFINE(txn_incomplete);
48 COVERAGE_DEFINE(txn_aborted);
49 COVERAGE_DEFINE(txn_success);
50 COVERAGE_DEFINE(txn_try_again);
51 COVERAGE_DEFINE(txn_not_locked);
52 COVERAGE_DEFINE(txn_error);
53
54 /* An arc from one idl_row to another.  When row A contains a UUID that
55  * references row B, this is represented by an arc from A (the source) to B
56  * (the destination).
57  *
58  * Arcs from a row to itself are omitted, that is, src and dst are always
59  * different.
60  *
61  * Arcs are never duplicated, that is, even if there are multiple references
62  * from A to B, there is only a single arc from A to B.
63  *
64  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
65  * A.  Both an arc and its converse may both be present, if each row refers
66  * to the other circularly.
67  *
68  * The source and destination row may be in the same table or in different
69  * tables.
70  */
71 struct ovsdb_idl_arc {
72     struct ovs_list src_node;   /* In src->src_arcs list. */
73     struct ovs_list dst_node;   /* In dst->dst_arcs list. */
74     struct ovsdb_idl_row *src;  /* Source row. */
75     struct ovsdb_idl_row *dst;  /* Destination row. */
76 };
77
78 enum ovsdb_idl_state {
79     IDL_S_SCHEMA_REQUESTED,
80     IDL_S_MONITOR_REQUESTED,
81     IDL_S_MONITORING,
82     IDL_S_MONITOR2_REQUESTED,
83     IDL_S_MONITORING2,
84     IDL_S_NO_SCHEMA
85 };
86
87 struct ovsdb_idl {
88     const struct ovsdb_idl_class *class;
89     struct jsonrpc_session *session;
90     struct shash table_by_name;
91     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
92     unsigned int change_seqno;
93     bool verify_write_only;
94
95     /* Session state. */
96     unsigned int state_seqno;
97     enum ovsdb_idl_state state;
98     struct json *request_id;
99     struct json *schema;
100
101     /* Database locking. */
102     char *lock_name;            /* Name of lock we need, NULL if none. */
103     bool has_lock;              /* Has db server told us we have the lock? */
104     bool is_lock_contended;     /* Has db server told us we can't get lock? */
105     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
106
107     /* Transaction support. */
108     struct ovsdb_idl_txn *txn;
109     struct hmap outstanding_txns;
110 };
111
112 struct ovsdb_idl_txn {
113     struct hmap_node hmap_node;
114     struct json *request_id;
115     struct ovsdb_idl *idl;
116     struct hmap txn_rows;
117     enum ovsdb_idl_txn_status status;
118     char *error;
119     bool dry_run;
120     struct ds comment;
121
122     /* Increments. */
123     const char *inc_table;
124     const char *inc_column;
125     struct uuid inc_row;
126     unsigned int inc_index;
127     int64_t inc_new_value;
128
129     /* Inserted rows. */
130     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
131 };
132
133 struct ovsdb_idl_txn_insert {
134     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
135     struct uuid dummy;          /* Dummy UUID used locally. */
136     int op_index;               /* Index into transaction's operation array. */
137     struct uuid real;           /* Real UUID used by database server. */
138 };
139
140 enum ovsdb_update_version {
141     OVSDB_UPDATE,               /* RFC 7047 "update" method. */
142     OVSDB_UPDATE2               /* "update2" Extension to RFC 7047.
143                                    See ovsdb-server(1) for more information. */
144 };
145
146 /* Name arrays indexed by 'enum ovsdb_update_version'. */
147 static const char *table_updates_names[] = {"table_updates", "table_updates2"};
148 static const char *table_update_names[] = {"table_update", "table_update2"};
149 static const char *row_update_names[] = {"row_update", "row_update2"};
150
151 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
152 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
153
154 static void ovsdb_idl_clear(struct ovsdb_idl *);
155 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
156 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
157 static void ovsdb_idl_send_monitor2_request(struct ovsdb_idl *);
158 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
159                                    enum ovsdb_update_version);
160 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
161                                                     const struct json *,
162                                                     enum ovsdb_update_version);
163 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
164                                      const struct uuid *,
165                                      const struct json *old,
166                                      const struct json *new);
167 static bool ovsdb_idl_process_update2(struct ovsdb_idl_table *,
168                                       const struct uuid *,
169                                       const char *operation,
170                                       const struct json *row);
171 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
172 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
173 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
174 static bool ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *,
175                                          const struct json *);
176
177 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
178 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
179     const struct ovsdb_idl_table_class *);
180 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
181                                                   const struct uuid *);
182 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
183 static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
184
185 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
186 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
187 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
188 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
189 static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
190
191 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
192 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
193                                         const struct jsonrpc_msg *msg);
194
195 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
196 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
197 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
198                                        const struct json *);
199 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
200                                         const struct json *params,
201                                         bool new_has_lock);
202 static struct ovsdb_idl_table *
203 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
204                            const struct ovsdb_idl_table_class *);
205 static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table);
206
207 /* Creates and returns a connection to database 'remote', which should be in a
208  * form acceptable to jsonrpc_session_open().  The connection will maintain an
209  * in-memory replica of the remote database whose schema is described by
210  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
211  * by ovsdb-idlc.)
212  *
213  * Passes 'retry' to jsonrpc_session_open().  See that function for
214  * documentation.
215  *
216  * If 'monitor_everything_by_default' is true, then everything in the remote
217  * database will be replicated by default.  ovsdb_idl_omit() and
218  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
219  * monitoring.
220  *
221  * If 'monitor_everything_by_default' is false, then no columns or tables will
222  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
223  * must be used to choose some columns or tables to replicate.
224  */
225 struct ovsdb_idl *
226 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
227                  bool monitor_everything_by_default, bool retry)
228 {
229     struct ovsdb_idl *idl;
230     uint8_t default_mode;
231     size_t i;
232
233     default_mode = (monitor_everything_by_default
234                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
235                     : 0);
236
237     idl = xzalloc(sizeof *idl);
238     idl->class = class;
239     idl->session = jsonrpc_session_open(remote, retry);
240     shash_init(&idl->table_by_name);
241     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
242     for (i = 0; i < class->n_tables; i++) {
243         const struct ovsdb_idl_table_class *tc = &class->tables[i];
244         struct ovsdb_idl_table *table = &idl->tables[i];
245         size_t j;
246
247         shash_add_assert(&idl->table_by_name, tc->name, table);
248         table->class = tc;
249         table->modes = xmalloc(tc->n_columns);
250         memset(table->modes, default_mode, tc->n_columns);
251         table->need_table = false;
252         shash_init(&table->columns);
253         for (j = 0; j < tc->n_columns; j++) {
254             const struct ovsdb_idl_column *column = &tc->columns[j];
255
256             shash_add_assert(&table->columns, column->name, column);
257         }
258         hmap_init(&table->rows);
259         ovs_list_init(&table->track_list);
260         table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
261             = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
262             = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
263         table->idl = idl;
264     }
265
266     idl->state_seqno = UINT_MAX;
267     idl->request_id = NULL;
268     idl->schema = NULL;
269
270     hmap_init(&idl->outstanding_txns);
271
272     return idl;
273 }
274
275 /* Destroys 'idl' and all of the data structures that it manages. */
276 void
277 ovsdb_idl_destroy(struct ovsdb_idl *idl)
278 {
279     if (idl) {
280         size_t i;
281
282         ovs_assert(!idl->txn);
283         ovsdb_idl_clear(idl);
284         jsonrpc_session_close(idl->session);
285
286         for (i = 0; i < idl->class->n_tables; i++) {
287             struct ovsdb_idl_table *table = &idl->tables[i];
288             shash_destroy(&table->columns);
289             hmap_destroy(&table->rows);
290             free(table->modes);
291         }
292         shash_destroy(&idl->table_by_name);
293         free(idl->tables);
294         json_destroy(idl->request_id);
295         free(idl->lock_name);
296         json_destroy(idl->lock_request_id);
297         json_destroy(idl->schema);
298         hmap_destroy(&idl->outstanding_txns);
299         free(idl);
300     }
301 }
302
303 static void
304 ovsdb_idl_clear(struct ovsdb_idl *idl)
305 {
306     bool changed = false;
307     size_t i;
308
309     for (i = 0; i < idl->class->n_tables; i++) {
310         struct ovsdb_idl_table *table = &idl->tables[i];
311         struct ovsdb_idl_row *row, *next_row;
312
313         if (hmap_is_empty(&table->rows)) {
314             continue;
315         }
316
317         changed = true;
318         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
319             struct ovsdb_idl_arc *arc, *next_arc;
320
321             if (!ovsdb_idl_row_is_orphan(row)) {
322                 ovsdb_idl_row_unparse(row);
323             }
324             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
325                 free(arc);
326             }
327             /* No need to do anything with dst_arcs: some node has those arcs
328              * as forward arcs and will destroy them itself. */
329
330             if (!ovs_list_is_empty(&row->track_node)) {
331                 ovs_list_remove(&row->track_node);
332             }
333
334             ovsdb_idl_row_destroy(row);
335         }
336     }
337
338     ovsdb_idl_track_clear(idl);
339
340     if (changed) {
341         idl->change_seqno++;
342     }
343 }
344
345 /* Processes a batch of messages from the database server on 'idl'.  This may
346  * cause the IDL's contents to change.  The client may check for that with
347  * ovsdb_idl_get_seqno(). */
348 void
349 ovsdb_idl_run(struct ovsdb_idl *idl)
350 {
351     int i;
352
353     ovs_assert(!idl->txn);
354     jsonrpc_session_run(idl->session);
355     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
356         struct jsonrpc_msg *msg;
357         unsigned int seqno;
358
359         seqno = jsonrpc_session_get_seqno(idl->session);
360         if (idl->state_seqno != seqno) {
361             idl->state_seqno = seqno;
362             json_destroy(idl->request_id);
363             idl->request_id = NULL;
364             ovsdb_idl_txn_abort_all(idl);
365
366             ovsdb_idl_send_schema_request(idl);
367             idl->state = IDL_S_SCHEMA_REQUESTED;
368             if (idl->lock_name) {
369                 ovsdb_idl_send_lock_request(idl);
370             }
371         }
372
373         msg = jsonrpc_session_recv(idl->session);
374         if (!msg) {
375             break;
376         }
377
378         if (msg->type == JSONRPC_NOTIFY
379             && !strcmp(msg->method, "update2")
380             && msg->params->type == JSON_ARRAY
381             && msg->params->u.array.n == 2
382             && msg->params->u.array.elems[0]->type == JSON_NULL) {
383             /* Database contents changed. */
384             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
385                                    OVSDB_UPDATE2);
386         } else if (msg->type == JSONRPC_REPLY
387                    && idl->request_id
388                    && json_equal(idl->request_id, msg->id)) {
389             json_destroy(idl->request_id);
390             idl->request_id = NULL;
391
392             switch (idl->state) {
393             case IDL_S_SCHEMA_REQUESTED:
394                 /* Reply to our "get_schema" request. */
395                 idl->schema = json_clone(msg->result);
396                 ovsdb_idl_send_monitor2_request(idl);
397                 idl->state = IDL_S_MONITOR2_REQUESTED;
398                 break;
399
400             case IDL_S_MONITOR_REQUESTED:
401             case IDL_S_MONITOR2_REQUESTED:
402                 /* Reply to our "monitor" or "monitor2" request. */
403                 idl->change_seqno++;
404                 ovsdb_idl_clear(idl);
405                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
406                     idl->state = IDL_S_MONITORING;
407                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
408                 } else { /* IDL_S_MONITOR2_REQUESTED. */
409                     idl->state = IDL_S_MONITORING2;
410                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
411                 }
412
413                 /* Schema is not useful after monitor request is accepted
414                  * by the server.  */
415                 json_destroy(idl->schema);
416                 idl->schema = NULL;
417                 break;
418
419             case IDL_S_MONITORING:
420             case IDL_S_MONITORING2:
421             case IDL_S_NO_SCHEMA:
422             default:
423                 OVS_NOT_REACHED();
424             }
425         } else if (msg->type == JSONRPC_NOTIFY
426             && !strcmp(msg->method, "update")
427             && msg->params->type == JSON_ARRAY
428             && msg->params->u.array.n == 2
429             && msg->params->u.array.elems[0]->type == JSON_NULL) {
430             /* Database contents changed. */
431             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
432                                    OVSDB_UPDATE);
433         } else if (msg->type == JSONRPC_REPLY
434                    && idl->lock_request_id
435                    && json_equal(idl->lock_request_id, msg->id)) {
436             /* Reply to our "lock" request. */
437             ovsdb_idl_parse_lock_reply(idl, msg->result);
438         } else if (msg->type == JSONRPC_NOTIFY
439                    && !strcmp(msg->method, "locked")) {
440             /* We got our lock. */
441             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
442         } else if (msg->type == JSONRPC_NOTIFY
443                    && !strcmp(msg->method, "stolen")) {
444             /* Someone else stole our lock. */
445             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
446         } else if (msg->type == JSONRPC_ERROR
447                    && idl->state == IDL_S_MONITOR2_REQUESTED
448                    && idl->request_id
449                    && json_equal(idl->request_id, msg->id)) {
450             if (msg->error && !strcmp(json_string(msg->error),
451                                       "unknown method")) {
452                 /* Fall back to using "monitor" method.  */
453                 json_destroy(idl->request_id);
454                 idl->request_id = NULL;
455                 ovsdb_idl_send_monitor_request(idl);
456                 idl->state = IDL_S_MONITOR_REQUESTED;
457             }
458         } else if (msg->type == JSONRPC_ERROR
459                    && idl->state == IDL_S_SCHEMA_REQUESTED
460                    && idl->request_id
461                    && json_equal(idl->request_id, msg->id)) {
462                 json_destroy(idl->request_id);
463                 idl->request_id = NULL;
464                 VLOG_ERR("%s: requested schema not found",
465                          jsonrpc_session_get_name(idl->session));
466                 idl->state = IDL_S_NO_SCHEMA;
467         } else if ((msg->type == JSONRPC_ERROR
468                     || msg->type == JSONRPC_REPLY)
469                    && ovsdb_idl_txn_process_reply(idl, msg)) {
470             /* ovsdb_idl_txn_process_reply() did everything needful. */
471         } else {
472             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
473              * a transaction before we receive the reply, so keep the log level
474              * low. */
475             VLOG_DBG("%s: received unexpected %s message",
476                      jsonrpc_session_get_name(idl->session),
477                      jsonrpc_msg_type_to_string(msg->type));
478         }
479         jsonrpc_msg_destroy(msg);
480     }
481     ovsdb_idl_row_destroy_postprocess(idl);
482 }
483
484 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
485  * do or when activity occurs on a transaction on 'idl'. */
486 void
487 ovsdb_idl_wait(struct ovsdb_idl *idl)
488 {
489     jsonrpc_session_wait(idl->session);
490     jsonrpc_session_recv_wait(idl->session);
491 }
492
493 /* Returns a "sequence number" that represents the state of 'idl'.  When
494  * ovsdb_idl_run() changes the database, the sequence number changes.  The
495  * initial fetch of the entire contents of the remote database is considered to
496  * be one kind of change.  Successfully acquiring a lock, if one has been
497  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
498  *
499  * As long as the sequence number does not change, the client may continue to
500  * use any data structures it obtains from 'idl'.  But when it changes, the
501  * client must not access any of these data structures again, because they
502  * could have freed or reused for other purposes.
503  *
504  * The sequence number can occasionally change even if the database does not.
505  * This happens if the connection to the database drops and reconnects, which
506  * causes the database contents to be reloaded even if they didn't change.  (It
507  * could also happen if the database server sends out a "change" that reflects
508  * what the IDL already thought was in the database.  The database server is
509  * not supposed to do that, but bugs could in theory cause it to do so.) */
510 unsigned int
511 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
512 {
513     return idl->change_seqno;
514 }
515
516 /* Returns true if 'idl' successfully connected to the remote database and
517  * retrieved its contents (even if the connection subsequently dropped and is
518  * in the process of reconnecting).  If so, then 'idl' contains an atomic
519  * snapshot of the database's contents (but it might be arbitrarily old if the
520  * connection dropped).
521  *
522  * Returns false if 'idl' has never connected or retrieved the database's
523  * contents.  If so, 'idl' is empty. */
524 bool
525 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
526 {
527     return ovsdb_idl_get_seqno(idl) != 0;
528 }
529
530 /* Reconfigures 'idl' so that it would reconnect to the database, if
531  * connection was dropped. */
532 void
533 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
534 {
535     jsonrpc_session_enable_reconnect(idl->session);
536 }
537
538 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
539  * meantime, the contents of 'idl' will not change. */
540 void
541 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
542 {
543     jsonrpc_session_force_reconnect(idl->session);
544 }
545
546 /* Some IDL users should only write to write-only columns.  Furthermore,
547  * writing to a column which is not write-only can cause serious performance
548  * degradations for these users.  This function causes 'idl' to reject writes
549  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
550 void
551 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
552 {
553     idl->verify_write_only = true;
554 }
555
556 /* Returns true if 'idl' is currently connected or trying to connect
557  * and a negative response to a schema request has not been received */
558 bool
559 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
560 {
561     return jsonrpc_session_is_alive(idl->session) &&
562            idl->state != IDL_S_NO_SCHEMA;
563 }
564
565 /* Returns the last error reported on a connection by 'idl'.  The return value
566  * is 0 only if no connection made by 'idl' has ever encountered an error and
567  * a negative response to a schema request has never been received. See
568  * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
569  * interpretation. */
570 int
571 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
572 {
573     int err;
574
575     err = jsonrpc_session_get_last_error(idl->session);
576
577     if (err) {
578         return err;
579     } else if (idl->state == IDL_S_NO_SCHEMA) {
580         return ENOENT;
581     } else {
582         return 0;
583     }
584 }
585
586 /* Sets the "probe interval" for 'idl->session' to 'probe_interval', in
587  * milliseconds.
588  */
589 void
590 ovsdb_idl_set_probe_interval(const struct ovsdb_idl *idl, int probe_interval)
591 {
592     jsonrpc_session_set_probe_interval(idl->session, probe_interval);
593 }
594 \f
595 static unsigned char *
596 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
597                    const struct ovsdb_idl_column *column)
598 {
599     size_t i;
600
601     ovs_assert(!idl->change_seqno);
602
603     for (i = 0; i < idl->class->n_tables; i++) {
604         const struct ovsdb_idl_table *table = &idl->tables[i];
605         const struct ovsdb_idl_table_class *tc = table->class;
606
607         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
608             return &table->modes[column - tc->columns];
609         }
610     }
611
612     OVS_NOT_REACHED();
613 }
614
615 static void
616 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
617 {
618     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
619         struct ovsdb_idl_table *table;
620
621         table = shash_find_data(&idl->table_by_name,
622                                 base->u.uuid.refTableName);
623         if (table) {
624             table->need_table = true;
625         } else {
626             VLOG_WARN("%s IDL class missing referenced table %s",
627                       idl->class->database, base->u.uuid.refTableName);
628         }
629     }
630 }
631
632 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
633  * ensures that any tables referenced by 'column' will be replicated, even if
634  * no columns in that table are selected for replication (see
635  * ovsdb_idl_add_table() for more information).
636  *
637  * This function is only useful if 'monitor_everything_by_default' was false in
638  * the call to ovsdb_idl_create().  This function should be called between
639  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
640  */
641 void
642 ovsdb_idl_add_column(struct ovsdb_idl *idl,
643                      const struct ovsdb_idl_column *column)
644 {
645     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
646     add_ref_table(idl, &column->type.key);
647     add_ref_table(idl, &column->type.value);
648 }
649
650 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
651  * no columns are selected for replication. Just the necessary data for table
652  * references will be replicated (the UUID of the rows, for instance), any
653  * columns not selected for replication will remain unreplicated.
654  * This can be useful because it allows 'idl' to keep track of what rows in the
655  * table actually exist, which in turn allows columns that reference the table
656  * to have accurate contents. (The IDL presents the database with references to
657  * rows that do not exist removed.)
658  *
659  * This function is only useful if 'monitor_everything_by_default' was false in
660  * the call to ovsdb_idl_create().  This function should be called between
661  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
662  */
663 void
664 ovsdb_idl_add_table(struct ovsdb_idl *idl,
665                     const struct ovsdb_idl_table_class *tc)
666 {
667     size_t i;
668
669     for (i = 0; i < idl->class->n_tables; i++) {
670         struct ovsdb_idl_table *table = &idl->tables[i];
671
672         if (table->class == tc) {
673             table->need_table = true;
674             return;
675         }
676     }
677
678     OVS_NOT_REACHED();
679 }
680
681 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
682  *
683  * This function should be called between ovsdb_idl_create() and the first call
684  * to ovsdb_idl_run().
685  */
686 void
687 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
688                      const struct ovsdb_idl_column *column)
689 {
690     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
691 }
692
693 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
694  * OVSDB_IDL_MONITOR for details.
695  *
696  * This function should be called between ovsdb_idl_create() and the first call
697  * to ovsdb_idl_run().
698  */
699 void
700 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
701 {
702     *ovsdb_idl_get_mode(idl, column) = 0;
703 }
704
705 /* Returns the most recent IDL change sequence number that caused a
706  * insert, modify or delete update to the table with class 'table_class'.
707  */
708 unsigned int
709 ovsdb_idl_table_get_seqno(const struct ovsdb_idl *idl,
710                           const struct ovsdb_idl_table_class *table_class)
711 {
712     struct ovsdb_idl_table *table
713         = ovsdb_idl_table_from_class(idl, table_class);
714     unsigned int max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_INSERT];
715
716     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]) {
717         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY];
718     }
719     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_DELETE]) {
720         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_DELETE];
721     }
722     return max_seqno;
723 }
724
725 /* For each row that contains tracked columns, IDL stores the most
726  * recent IDL change sequence numbers associateed with insert, modify
727  * and delete updates to the table.
728  */
729 unsigned int
730 ovsdb_idl_row_get_seqno(const struct ovsdb_idl_row *row,
731                         enum ovsdb_idl_change change)
732 {
733     return row->change_seqno[change];
734 }
735
736 /* Turns on OVSDB_IDL_TRACK for 'column' in 'idl', ensuring that
737  * all rows whose 'column' is modified are traced. Similarly, insert
738  * or delete of rows having 'column' are tracked. Clients are able
739  * to retrive the tracked rows with the ovsdb_idl_track_get_*()
740  * functions.
741  *
742  * This function should be called between ovsdb_idl_create() and
743  * the first call to ovsdb_idl_run(). The column to be tracked
744  * should have OVSDB_IDL_ALERT turned on.
745  */
746 void
747 ovsdb_idl_track_add_column(struct ovsdb_idl *idl,
748                            const struct ovsdb_idl_column *column)
749 {
750     if (!(*ovsdb_idl_get_mode(idl, column) & OVSDB_IDL_ALERT)) {
751         ovsdb_idl_add_column(idl, column);
752     }
753     *ovsdb_idl_get_mode(idl, column) |= OVSDB_IDL_TRACK;
754 }
755
756 void
757 ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
758 {
759     size_t i, j;
760
761     for (i = 0; i < idl->class->n_tables; i++) {
762         const struct ovsdb_idl_table_class *tc = &idl->class->tables[i];
763
764         for (j = 0; j < tc->n_columns; j++) {
765             const struct ovsdb_idl_column *column = &tc->columns[j];
766             ovsdb_idl_track_add_column(idl, column);
767         }
768     }
769 }
770
771 /* Returns true if 'table' has any tracked column. */
772 static bool
773 ovsdb_idl_track_is_set(struct ovsdb_idl_table *table)
774 {
775     size_t i;
776
777     for (i = 0; i < table->class->n_columns; i++) {
778         if (table->modes[i] & OVSDB_IDL_TRACK) {
779             return true;
780         }
781     }
782    return false;
783 }
784
785 /* Returns the first tracked row in table with class 'table_class'
786  * for the specified 'idl'. Returns NULL if there are no tracked rows */
787 const struct ovsdb_idl_row *
788 ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
789                           const struct ovsdb_idl_table_class *table_class)
790 {
791     struct ovsdb_idl_table *table
792         = ovsdb_idl_table_from_class(idl, table_class);
793
794     if (!ovs_list_is_empty(&table->track_list)) {
795         return CONTAINER_OF(ovs_list_front(&table->track_list), struct ovsdb_idl_row, track_node);
796     }
797     return NULL;
798 }
799
800 /* Returns the next tracked row in table after the specified 'row'
801  * (in no particular order). Returns NULL if there are no tracked rows */
802 const struct ovsdb_idl_row *
803 ovsdb_idl_track_get_next(const struct ovsdb_idl_row *row)
804 {
805     if (row->track_node.next != &row->table->track_list) {
806         return CONTAINER_OF(row->track_node.next, struct ovsdb_idl_row, track_node);
807     }
808
809     return NULL;
810 }
811
812 /* Returns true if a tracked 'column' in 'row' was updated by IDL, false
813  * otherwise. The tracking data is cleared by ovsdb_idl_track_clear()
814  *
815  * Function returns false if 'column' is not tracked (see
816  * ovsdb_idl_track_add_column()).
817  */
818 bool
819 ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
820                            const struct ovsdb_idl_column *column)
821 {
822     const struct ovsdb_idl_table_class *class;
823     size_t column_idx;
824
825     class = row->table->class;
826     column_idx = column - class->columns;
827
828     if (row->updated && bitmap_is_set(row->updated, column_idx)) {
829         return true;
830     } else {
831         return false;
832     }
833 }
834
835 /* Flushes the tracked rows. Client calls this function after calling
836  * ovsdb_idl_run() and read all tracked rows with the ovsdb_idl_track_get_*()
837  * functions. This is usually done at the end of the client's processing
838  * loop when it is ready to do ovsdb_idl_run() again.
839  */
840 void
841 ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
842 {
843     size_t i;
844
845     for (i = 0; i < idl->class->n_tables; i++) {
846         struct ovsdb_idl_table *table = &idl->tables[i];
847
848         if (!ovs_list_is_empty(&table->track_list)) {
849             struct ovsdb_idl_row *row, *next;
850
851             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
852                 if (row->updated) {
853                     free(row->updated);
854                     row->updated = NULL;
855                 }
856                 ovs_list_remove(&row->track_node);
857                 ovs_list_init(&row->track_node);
858                 if (ovsdb_idl_row_is_orphan(row)) {
859                     ovsdb_idl_row_clear_old(row);
860                     free(row);
861                 }
862             }
863         }
864     }
865 }
866
867 \f
868 static void
869 ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
870 {
871     struct jsonrpc_msg *msg;
872
873     json_destroy(idl->request_id);
874     msg = jsonrpc_create_request(
875         "get_schema",
876         json_array_create_1(json_string_create(idl->class->database)),
877         &idl->request_id);
878     jsonrpc_session_send(idl->session, msg);
879 }
880
881 static void
882 log_error(struct ovsdb_error *error)
883 {
884     char *s = ovsdb_error_to_string(error);
885     VLOG_WARN("error parsing database schema: %s", s);
886     free(s);
887     ovsdb_error_destroy(error);
888 }
889
890 /* Frees 'schema', which is in the format returned by parse_schema(). */
891 static void
892 free_schema(struct shash *schema)
893 {
894     if (schema) {
895         struct shash_node *node, *next;
896
897         SHASH_FOR_EACH_SAFE (node, next, schema) {
898             struct sset *sset = node->data;
899             sset_destroy(sset);
900             free(sset);
901             shash_delete(schema, node);
902         }
903         shash_destroy(schema);
904         free(schema);
905     }
906 }
907
908 /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
909  * 7047, to obtain the names of its rows and columns.  If successful, returns
910  * an shash whose keys are table names and whose values are ssets, where each
911  * sset contains the names of its table's columns.  On failure (due to a parse
912  * error), returns NULL.
913  *
914  * It would also be possible to use the general-purpose OVSDB schema parser in
915  * ovsdb-server, but that's overkill, possibly too strict for the current use
916  * case, and would require restructuring ovsdb-server to separate the schema
917  * code from the rest. */
918 static struct shash *
919 parse_schema(const struct json *schema_json)
920 {
921     struct ovsdb_parser parser;
922     const struct json *tables_json;
923     struct ovsdb_error *error;
924     struct shash_node *node;
925     struct shash *schema;
926
927     ovsdb_parser_init(&parser, schema_json, "database schema");
928     tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
929     error = ovsdb_parser_destroy(&parser);
930     if (error) {
931         log_error(error);
932         return NULL;
933     }
934
935     schema = xmalloc(sizeof *schema);
936     shash_init(schema);
937     SHASH_FOR_EACH (node, json_object(tables_json)) {
938         const char *table_name = node->name;
939         const struct json *json = node->data;
940         const struct json *columns_json;
941
942         ovsdb_parser_init(&parser, json, "table schema for table %s",
943                           table_name);
944         columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
945         error = ovsdb_parser_destroy(&parser);
946         if (error) {
947             log_error(error);
948             free_schema(schema);
949             return NULL;
950         }
951
952         struct sset *columns = xmalloc(sizeof *columns);
953         sset_init(columns);
954
955         struct shash_node *node2;
956         SHASH_FOR_EACH (node2, json_object(columns_json)) {
957             const char *column_name = node2->name;
958             sset_add(columns, column_name);
959         }
960         shash_add(schema, table_name, columns);
961     }
962     return schema;
963 }
964
965 static void
966 ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
967                                  const char *method)
968 {
969     struct shash *schema;
970     struct json *monitor_requests;
971     struct jsonrpc_msg *msg;
972     size_t i;
973
974     schema = parse_schema(idl->schema);
975     monitor_requests = json_object_create();
976     for (i = 0; i < idl->class->n_tables; i++) {
977         const struct ovsdb_idl_table *table = &idl->tables[i];
978         const struct ovsdb_idl_table_class *tc = table->class;
979         struct json *monitor_request, *columns;
980         const struct sset *table_schema;
981         size_t j;
982
983         table_schema = (schema
984                         ? shash_find_data(schema, table->class->name)
985                         : NULL);
986
987         columns = table->need_table ? json_array_create_empty() : NULL;
988         for (j = 0; j < tc->n_columns; j++) {
989             const struct ovsdb_idl_column *column = &tc->columns[j];
990             if (table->modes[j] & OVSDB_IDL_MONITOR) {
991                 if (table_schema
992                     && !sset_contains(table_schema, column->name)) {
993                     VLOG_WARN("%s table in %s database lacks %s column "
994                               "(database needs upgrade?)",
995                               table->class->name, idl->class->database,
996                               column->name);
997                     continue;
998                 }
999                 if (!columns) {
1000                     columns = json_array_create_empty();
1001                 }
1002                 json_array_add(columns, json_string_create(column->name));
1003             }
1004         }
1005
1006         if (columns) {
1007             if (schema && !table_schema) {
1008                 VLOG_WARN("%s database lacks %s table "
1009                           "(database needs upgrade?)",
1010                           idl->class->database, table->class->name);
1011                 json_destroy(columns);
1012                 continue;
1013             }
1014
1015             monitor_request = json_object_create();
1016             json_object_put(monitor_request, "columns", columns);
1017             json_object_put(monitor_requests, tc->name, monitor_request);
1018         }
1019     }
1020     free_schema(schema);
1021
1022     json_destroy(idl->request_id);
1023     msg = jsonrpc_create_request(
1024         method,
1025         json_array_create_3(json_string_create(idl->class->database),
1026                             json_null_create(), monitor_requests),
1027         &idl->request_id);
1028     jsonrpc_session_send(idl->session, msg);
1029 }
1030
1031 static void
1032 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
1033 {
1034     ovsdb_idl_send_monitor_request__(idl, "monitor");
1035 }
1036
1037 static void
1038 log_parse_update_error(struct ovsdb_error *error)
1039 {
1040         if (!VLOG_DROP_WARN(&syntax_rl)) {
1041             char *s = ovsdb_error_to_string(error);
1042             VLOG_WARN_RL(&syntax_rl, "%s", s);
1043             free(s);
1044         }
1045         ovsdb_error_destroy(error);
1046 }
1047
1048 static void
1049 ovsdb_idl_send_monitor2_request(struct ovsdb_idl *idl)
1050 {
1051     ovsdb_idl_send_monitor_request__(idl, "monitor2");
1052 }
1053
1054 static void
1055 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates,
1056                        enum ovsdb_update_version version)
1057 {
1058     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates,
1059                                                          version);
1060     if (error) {
1061         log_parse_update_error(error);
1062     }
1063 }
1064
1065 static struct ovsdb_error *
1066 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
1067                          const struct json *table_updates,
1068                          enum ovsdb_update_version version)
1069 {
1070     const struct shash_node *tables_node;
1071     const char *table_updates_name = table_updates_names[version];
1072     const char *table_update_name = table_update_names[version];
1073     const char *row_update_name = row_update_names[version];
1074
1075     if (table_updates->type != JSON_OBJECT) {
1076         return ovsdb_syntax_error(table_updates, NULL,
1077                                   "<%s> is not an object",
1078                                   table_updates_name);
1079     }
1080
1081     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
1082         const struct json *table_update = tables_node->data;
1083         const struct shash_node *table_node;
1084         struct ovsdb_idl_table *table;
1085
1086         table = shash_find_data(&idl->table_by_name, tables_node->name);
1087         if (!table) {
1088             return ovsdb_syntax_error(
1089                 table_updates, NULL,
1090                 "<%s> includes unknown table \"%s\"",
1091                 table_updates_name,
1092                 tables_node->name);
1093         }
1094
1095         if (table_update->type != JSON_OBJECT) {
1096             return ovsdb_syntax_error(table_update, NULL,
1097                                       "<%s> for table \"%s\" is "
1098                                       "not an object",
1099                                       table_update_name,
1100                                       table->class->name);
1101         }
1102         SHASH_FOR_EACH (table_node, json_object(table_update)) {
1103             const struct json *row_update = table_node->data;
1104             const struct json *old_json, *new_json;
1105             struct uuid uuid;
1106
1107             if (!uuid_from_string(&uuid, table_node->name)) {
1108                 return ovsdb_syntax_error(table_update, NULL,
1109                                           "<%s> for table \"%s\" "
1110                                           "contains bad UUID "
1111                                           "\"%s\" as member name",
1112                                           table_update_name,
1113                                           table->class->name,
1114                                           table_node->name);
1115             }
1116             if (row_update->type != JSON_OBJECT) {
1117                 return ovsdb_syntax_error(row_update, NULL,
1118                                           "<%s> for table \"%s\" "
1119                                           "contains <%s> for %s that "
1120                                           "is not an object",
1121                                           table_update_name,
1122                                           table->class->name,
1123                                           row_update_name,
1124                                           table_node->name);
1125             }
1126
1127             switch(version) {
1128             case OVSDB_UPDATE:
1129                 old_json = shash_find_data(json_object(row_update), "old");
1130                 new_json = shash_find_data(json_object(row_update), "new");
1131                 if (old_json && old_json->type != JSON_OBJECT) {
1132                     return ovsdb_syntax_error(old_json, NULL,
1133                                               "\"old\" <row> is not object");
1134                 } else if (new_json && new_json->type != JSON_OBJECT) {
1135                     return ovsdb_syntax_error(new_json, NULL,
1136                                               "\"new\" <row> is not object");
1137                 } else if ((old_json != NULL) + (new_json != NULL)
1138                            != shash_count(json_object(row_update))) {
1139                     return ovsdb_syntax_error(row_update, NULL,
1140                                               "<row-update> contains "
1141                                               "unexpected member");
1142                 } else if (!old_json && !new_json) {
1143                     return ovsdb_syntax_error(row_update, NULL,
1144                                               "<row-update> missing \"old\" "
1145                                               "and \"new\" members");
1146                 }
1147
1148                 if (ovsdb_idl_process_update(table, &uuid, old_json,
1149                                              new_json)) {
1150                     idl->change_seqno++;
1151                 }
1152                 break;
1153
1154             case OVSDB_UPDATE2: {
1155                 const char *ops[] = {"modify", "insert", "delete", "initial"};
1156                 const char *operation;
1157                 const struct json *row;
1158                 int i;
1159
1160                 for (i = 0; i < ARRAY_SIZE(ops); i++) {
1161                     operation = ops[i];
1162                     row = shash_find_data(json_object(row_update), operation);
1163
1164                     if (row)  {
1165                         if (ovsdb_idl_process_update2(table, &uuid, operation,
1166                                                       row)) {
1167                             idl->change_seqno++;
1168                         }
1169                         break;
1170                     }
1171                 }
1172
1173                 /* row_update2 should contain one of the objects */
1174                 if (i == ARRAY_SIZE(ops)) {
1175                     return ovsdb_syntax_error(row_update, NULL,
1176                                               "<row_update2> includes unknown "
1177                                               "object");
1178                 }
1179                 break;
1180             }
1181
1182             default:
1183                 OVS_NOT_REACHED();
1184             }
1185         }
1186     }
1187
1188     return NULL;
1189 }
1190
1191 static struct ovsdb_idl_row *
1192 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
1193 {
1194     struct ovsdb_idl_row *row;
1195
1196     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
1197         if (uuid_equals(&row->uuid, uuid)) {
1198             return row;
1199         }
1200     }
1201     return NULL;
1202 }
1203
1204 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1205  * otherwise. */
1206 static bool
1207 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
1208                          const struct uuid *uuid, const struct json *old,
1209                          const struct json *new)
1210 {
1211     struct ovsdb_idl_row *row;
1212
1213     row = ovsdb_idl_get_row(table, uuid);
1214     if (!new) {
1215         /* Delete row. */
1216         if (row && !ovsdb_idl_row_is_orphan(row)) {
1217             /* XXX perhaps we should check the 'old' values? */
1218             ovsdb_idl_delete_row(row);
1219         } else {
1220             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1221                          "from table %s",
1222                          UUID_ARGS(uuid), table->class->name);
1223             return false;
1224         }
1225     } else if (!old) {
1226         /* Insert row. */
1227         if (!row) {
1228             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1229         } else if (ovsdb_idl_row_is_orphan(row)) {
1230             ovsdb_idl_insert_row(row, new);
1231         } else {
1232             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1233                          "table %s", UUID_ARGS(uuid), table->class->name);
1234             return ovsdb_idl_modify_row(row, new);
1235         }
1236     } else {
1237         /* Modify row. */
1238         if (row) {
1239             /* XXX perhaps we should check the 'old' values? */
1240             if (!ovsdb_idl_row_is_orphan(row)) {
1241                 return ovsdb_idl_modify_row(row, new);
1242             } else {
1243                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1244                              "referenced row "UUID_FMT" in table %s",
1245                              UUID_ARGS(uuid), table->class->name);
1246                 ovsdb_idl_insert_row(row, new);
1247             }
1248         } else {
1249             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1250                          "in table %s", UUID_ARGS(uuid), table->class->name);
1251             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1252         }
1253     }
1254
1255     return true;
1256 }
1257
1258 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1259  * otherwise. */
1260 static bool
1261 ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
1262                           const struct uuid *uuid,
1263                           const char *operation,
1264                           const struct json *json_row)
1265 {
1266     struct ovsdb_idl_row *row;
1267
1268     row = ovsdb_idl_get_row(table, uuid);
1269     if (!strcmp(operation, "delete")) {
1270         /* Delete row. */
1271         if (row && !ovsdb_idl_row_is_orphan(row)) {
1272             ovsdb_idl_delete_row(row);
1273         } else {
1274             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1275                          "from table %s",
1276                          UUID_ARGS(uuid), table->class->name);
1277             return false;
1278         }
1279     } else if (!strcmp(operation, "insert") || !strcmp(operation, "initial")) {
1280         /* Insert row. */
1281         if (!row) {
1282             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), json_row);
1283         } else if (ovsdb_idl_row_is_orphan(row)) {
1284             ovsdb_idl_insert_row(row, json_row);
1285         } else {
1286             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1287                          "table %s", UUID_ARGS(uuid), table->class->name);
1288             ovsdb_idl_delete_row(row);
1289             ovsdb_idl_insert_row(row, json_row);
1290         }
1291     } else if (!strcmp(operation, "modify")) {
1292         /* Modify row. */
1293         if (row) {
1294             if (!ovsdb_idl_row_is_orphan(row)) {
1295                 return ovsdb_idl_modify_row_by_diff(row, json_row);
1296             } else {
1297                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1298                              "referenced row "UUID_FMT" in table %s",
1299                              UUID_ARGS(uuid), table->class->name);
1300                 return false;
1301             }
1302         } else {
1303             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1304                          "in table %s", UUID_ARGS(uuid), table->class->name);
1305             return false;
1306         }
1307     } else {
1308             VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
1309                          "table %s", operation, table->class->name);
1310             return false;
1311     }
1312
1313     return true;
1314 }
1315
1316 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1317  * otherwise.
1318  *
1319  * Change 'row' either with the content of 'row_json' or by apply 'diff'.
1320  * Caller needs to provide either valid 'row_json' or 'diff', but not
1321  * both.  */
1322 static bool
1323 ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
1324                        const struct json *diff_json,
1325                        enum ovsdb_idl_change change)
1326 {
1327     struct ovsdb_idl_table *table = row->table;
1328     const struct ovsdb_idl_table_class *class = table->class;
1329     struct shash_node *node;
1330     bool changed = false;
1331     bool apply_diff = diff_json != NULL;
1332     const struct json *json = apply_diff ? diff_json : row_json;
1333
1334     SHASH_FOR_EACH (node, json_object(json)) {
1335         const char *column_name = node->name;
1336         const struct ovsdb_idl_column *column;
1337         struct ovsdb_datum datum;
1338         struct ovsdb_error *error;
1339         unsigned int column_idx;
1340         struct ovsdb_datum *old;
1341
1342         column = shash_find_data(&table->columns, column_name);
1343         if (!column) {
1344             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
1345                          column_name, UUID_ARGS(&row->uuid));
1346             continue;
1347         }
1348
1349         column_idx = column - table->class->columns;
1350         old = &row->old[column_idx];
1351
1352         error = NULL;
1353         if (apply_diff) {
1354             struct ovsdb_datum diff;
1355
1356             ovs_assert(!row_json);
1357             error = ovsdb_transient_datum_from_json(&diff, &column->type,
1358                                                     node->data);
1359             if (!error) {
1360                 error = ovsdb_datum_apply_diff(&datum, old, &diff,
1361                                                &column->type);
1362                 ovsdb_datum_destroy(&diff, &column->type);
1363             }
1364         } else {
1365             ovs_assert(!diff_json);
1366             error = ovsdb_datum_from_json(&datum, &column->type, node->data,
1367                                           NULL);
1368         }
1369
1370         if (!error) {
1371             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
1372                 ovsdb_datum_swap(old, &datum);
1373                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
1374                     changed = true;
1375                     row->change_seqno[change]
1376                         = row->table->change_seqno[change]
1377                         = row->table->idl->change_seqno + 1;
1378                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
1379                         if (!ovs_list_is_empty(&row->track_node)) {
1380                             ovs_list_remove(&row->track_node);
1381                         }
1382                         ovs_list_push_back(&row->table->track_list,
1383                                        &row->track_node);
1384                         if (!row->updated) {
1385                             row->updated = bitmap_allocate(class->n_columns);
1386                         }
1387                         bitmap_set1(row->updated, column_idx);
1388                     }
1389                 }
1390             } else {
1391                 /* Didn't really change but the OVSDB monitor protocol always
1392                  * includes every value in a row. */
1393             }
1394
1395             ovsdb_datum_destroy(&datum, &column->type);
1396         } else {
1397             char *s = ovsdb_error_to_string(error);
1398             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
1399                          " in table %s: %s", column_name,
1400                          UUID_ARGS(&row->uuid), table->class->name, s);
1401             free(s);
1402             ovsdb_error_destroy(error);
1403         }
1404     }
1405     return changed;
1406 }
1407
1408 static bool
1409 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json,
1410                      enum ovsdb_idl_change change)
1411 {
1412     return ovsdb_idl_row_change__(row, row_json, NULL, change);
1413 }
1414
1415 static bool
1416 ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
1417                          const struct json *diff_json,
1418                          enum ovsdb_idl_change change)
1419 {
1420     return ovsdb_idl_row_change__(row, NULL, diff_json, change);
1421 }
1422
1423 /* When a row A refers to row B through a column with a "refTable" constraint,
1424  * but row B does not exist, row B is called an "orphan row".  Orphan rows
1425  * should not persist, because the database enforces referential integrity, but
1426  * they can appear transiently as changes from the database are received (the
1427  * database doesn't try to topologically sort them and circular references mean
1428  * it isn't always possible anyhow).
1429  *
1430  * This function returns true if 'row' is an orphan row, otherwise false.
1431  */
1432 static bool
1433 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
1434 {
1435     return !row->old && !row->new;
1436 }
1437
1438 /* Returns true if 'row' is conceptually part of the database as modified by
1439  * the current transaction (if any), false otherwise.
1440  *
1441  * This function will return true if 'row' is not an orphan (see the comment on
1442  * ovsdb_idl_row_is_orphan()) and:
1443  *
1444  *   - 'row' exists in the database and has not been deleted within the
1445  *     current transaction (if any).
1446  *
1447  *   - 'row' was inserted within the current transaction and has not been
1448  *     deleted.  (In the latter case you should not have passed 'row' in at
1449  *     all, because ovsdb_idl_txn_delete() freed it.)
1450  *
1451  * This function will return false if 'row' is an orphan or if 'row' was
1452  * deleted within the current transaction.
1453  */
1454 static bool
1455 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
1456 {
1457     return row->new != NULL;
1458 }
1459
1460 static void
1461 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
1462 {
1463     const struct ovsdb_idl_table_class *class = row->table->class;
1464     size_t i;
1465
1466     for (i = 0; i < class->n_columns; i++) {
1467         const struct ovsdb_idl_column *c = &class->columns[i];
1468         (c->parse)(row, &row->old[i]);
1469     }
1470 }
1471
1472 static void
1473 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
1474 {
1475     const struct ovsdb_idl_table_class *class = row->table->class;
1476     size_t i;
1477
1478     for (i = 0; i < class->n_columns; i++) {
1479         const struct ovsdb_idl_column *c = &class->columns[i];
1480         (c->unparse)(row);
1481     }
1482 }
1483
1484 static void
1485 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
1486 {
1487     ovs_assert(row->old == row->new);
1488     if (!ovsdb_idl_row_is_orphan(row)) {
1489         const struct ovsdb_idl_table_class *class = row->table->class;
1490         size_t i;
1491
1492         for (i = 0; i < class->n_columns; i++) {
1493             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
1494         }
1495         free(row->old);
1496         row->old = row->new = NULL;
1497     }
1498 }
1499
1500 static void
1501 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
1502 {
1503     if (row->old != row->new) {
1504         if (row->new) {
1505             const struct ovsdb_idl_table_class *class = row->table->class;
1506             size_t i;
1507
1508             if (row->written) {
1509                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
1510                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
1511                 }
1512             }
1513             free(row->new);
1514             free(row->written);
1515             row->written = NULL;
1516         }
1517         row->new = row->old;
1518     }
1519 }
1520
1521 static void
1522 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
1523 {
1524     struct ovsdb_idl_arc *arc, *next;
1525
1526     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
1527      * that this causes to be unreferenced, if tracking is not enabled.
1528      * If tracking is enabled, orphaned nodes are removed from hmap but not
1529      * freed.
1530      */
1531     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
1532         ovs_list_remove(&arc->dst_node);
1533         if (destroy_dsts
1534             && ovsdb_idl_row_is_orphan(arc->dst)
1535             && ovs_list_is_empty(&arc->dst->dst_arcs)) {
1536             ovsdb_idl_row_destroy(arc->dst);
1537         }
1538         free(arc);
1539     }
1540     ovs_list_init(&row->src_arcs);
1541 }
1542
1543 /* Force nodes that reference 'row' to reparse. */
1544 static void
1545 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
1546 {
1547     struct ovsdb_idl_arc *arc, *next;
1548
1549     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
1550      * 'arc', so we need to use the "safe" variant of list traversal.  However,
1551      * calling an ovsdb_idl_column's 'parse' function will add an arc
1552      * equivalent to 'arc' to row->arcs.  That could be a problem for
1553      * traversal, but it adds it at the beginning of the list to prevent us
1554      * from stumbling upon it again.
1555      *
1556      * (If duplicate arcs were possible then we would need to make sure that
1557      * 'next' didn't also point into 'arc''s destination, but we forbid
1558      * duplicate arcs.) */
1559     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
1560         struct ovsdb_idl_row *ref = arc->src;
1561
1562         ovsdb_idl_row_unparse(ref);
1563         ovsdb_idl_row_clear_arcs(ref, false);
1564         ovsdb_idl_row_parse(ref);
1565     }
1566 }
1567
1568 static struct ovsdb_idl_row *
1569 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
1570 {
1571     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
1572     class->row_init(row);
1573     ovs_list_init(&row->src_arcs);
1574     ovs_list_init(&row->dst_arcs);
1575     hmap_node_nullify(&row->txn_node);
1576     ovs_list_init(&row->track_node);
1577     return row;
1578 }
1579
1580 static struct ovsdb_idl_row *
1581 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
1582 {
1583     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
1584     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
1585     row->uuid = *uuid;
1586     row->table = table;
1587     return row;
1588 }
1589
1590 static void
1591 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
1592 {
1593     if (row) {
1594         ovsdb_idl_row_clear_old(row);
1595         hmap_remove(&row->table->rows, &row->hmap_node);
1596         if (ovsdb_idl_track_is_set(row->table)) {
1597             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1598                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1599                 = row->table->idl->change_seqno + 1;
1600         }
1601         if (!ovs_list_is_empty(&row->track_node)) {
1602             ovs_list_remove(&row->track_node);
1603         }
1604         ovs_list_push_back(&row->table->track_list, &row->track_node);
1605     }
1606 }
1607
1608 static void
1609 ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
1610 {
1611     size_t i;
1612
1613     for (i = 0; i < idl->class->n_tables; i++) {
1614         struct ovsdb_idl_table *table = &idl->tables[i];
1615
1616         if (!ovs_list_is_empty(&table->track_list)) {
1617             struct ovsdb_idl_row *row, *next;
1618
1619             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
1620                 if (!ovsdb_idl_track_is_set(row->table)) {
1621                     ovs_list_remove(&row->track_node);
1622                     free(row);
1623                 }
1624             }
1625         }
1626     }
1627 }
1628
1629 static void
1630 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
1631 {
1632     const struct ovsdb_idl_table_class *class = row->table->class;
1633     size_t i;
1634
1635     ovs_assert(!row->old && !row->new);
1636     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
1637     for (i = 0; i < class->n_columns; i++) {
1638         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
1639     }
1640     ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_INSERT);
1641     ovsdb_idl_row_parse(row);
1642
1643     ovsdb_idl_row_reparse_backrefs(row);
1644 }
1645
1646 static void
1647 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
1648 {
1649     ovsdb_idl_row_unparse(row);
1650     ovsdb_idl_row_clear_arcs(row, true);
1651     ovsdb_idl_row_clear_old(row);
1652     if (ovs_list_is_empty(&row->dst_arcs)) {
1653         ovsdb_idl_row_destroy(row);
1654     } else {
1655         ovsdb_idl_row_reparse_backrefs(row);
1656     }
1657 }
1658
1659 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1660  * otherwise. */
1661 static bool
1662 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1663 {
1664     bool changed;
1665
1666     ovsdb_idl_row_unparse(row);
1667     ovsdb_idl_row_clear_arcs(row, true);
1668     changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY);
1669     ovsdb_idl_row_parse(row);
1670
1671     return changed;
1672 }
1673
1674 static bool
1675 ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *row,
1676                              const struct json *diff_json)
1677 {
1678     bool changed;
1679
1680     ovsdb_idl_row_unparse(row);
1681     ovsdb_idl_row_clear_arcs(row, true);
1682     changed = ovsdb_idl_row_apply_diff(row, diff_json,
1683                                        OVSDB_IDL_CHANGE_MODIFY);
1684     ovsdb_idl_row_parse(row);
1685
1686     return changed;
1687 }
1688
1689 static bool
1690 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1691 {
1692     const struct ovsdb_idl_arc *arc;
1693
1694     /* No self-arcs. */
1695     if (src == dst) {
1696         return false;
1697     }
1698
1699     /* No duplicate arcs.
1700      *
1701      * We only need to test whether the first arc in dst->dst_arcs originates
1702      * at 'src', since we add all of the arcs from a given source in a clump
1703      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1704      * added at the front of the dst_arcs list. */
1705     if (ovs_list_is_empty(&dst->dst_arcs)) {
1706         return true;
1707     }
1708     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1709     return arc->src != src;
1710 }
1711
1712 static struct ovsdb_idl_table *
1713 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1714                            const struct ovsdb_idl_table_class *table_class)
1715 {
1716     return &idl->tables[table_class - idl->class->tables];
1717 }
1718
1719 /* Called by ovsdb-idlc generated code. */
1720 struct ovsdb_idl_row *
1721 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1722                       struct ovsdb_idl_table_class *dst_table_class,
1723                       const struct uuid *dst_uuid)
1724 {
1725     struct ovsdb_idl *idl = src->table->idl;
1726     struct ovsdb_idl_table *dst_table;
1727     struct ovsdb_idl_arc *arc;
1728     struct ovsdb_idl_row *dst;
1729
1730     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1731     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1732     if (idl->txn) {
1733         /* We're being called from ovsdb_idl_txn_write().  We must not update
1734          * any arcs, because the transaction will be backed out at commit or
1735          * abort time and we don't want our graph screwed up.
1736          *
1737          * Just return the destination row, if there is one and it has not been
1738          * deleted. */
1739         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1740             return dst;
1741         }
1742         return NULL;
1743     } else {
1744         /* We're being called from some other context.  Update the graph. */
1745         if (!dst) {
1746             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1747         }
1748
1749         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
1750         if (may_add_arc(src, dst)) {
1751             /* The arc *must* be added at the front of the dst_arcs list.  See
1752              * ovsdb_idl_row_reparse_backrefs() for details. */
1753             arc = xmalloc(sizeof *arc);
1754             ovs_list_push_front(&src->src_arcs, &arc->src_node);
1755             ovs_list_push_front(&dst->dst_arcs, &arc->dst_node);
1756             arc->src = src;
1757             arc->dst = dst;
1758         }
1759
1760         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
1761     }
1762 }
1763
1764 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
1765  * pointer to the row if there is one, otherwise a null pointer.  */
1766 const struct ovsdb_idl_row *
1767 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
1768                            const struct ovsdb_idl_table_class *tc,
1769                            const struct uuid *uuid)
1770 {
1771     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
1772 }
1773
1774 static struct ovsdb_idl_row *
1775 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
1776 {
1777     for (; node; node = hmap_next(&table->rows, node)) {
1778         struct ovsdb_idl_row *row;
1779
1780         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
1781         if (ovsdb_idl_row_exists(row)) {
1782             return row;
1783         }
1784     }
1785     return NULL;
1786 }
1787
1788 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
1789  * table is empty.
1790  *
1791  * Database tables are internally maintained as hash tables, so adding or
1792  * removing rows while traversing the same table can cause some rows to be
1793  * visited twice or not at apply. */
1794 const struct ovsdb_idl_row *
1795 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
1796                     const struct ovsdb_idl_table_class *table_class)
1797 {
1798     struct ovsdb_idl_table *table
1799         = ovsdb_idl_table_from_class(idl, table_class);
1800     return next_real_row(table, hmap_first(&table->rows));
1801 }
1802
1803 /* Returns a row following 'row' within its table, or a null pointer if 'row'
1804  * is the last row in its table. */
1805 const struct ovsdb_idl_row *
1806 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
1807 {
1808     struct ovsdb_idl_table *table = row->table;
1809
1810     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
1811 }
1812
1813 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
1814  * transaction has changed 'column''s value, the modified value is returned.
1815  *
1816  * The caller must not modify or free the returned value.
1817  *
1818  * Various kinds of changes can invalidate the returned value: writing to the
1819  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
1820  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
1821  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
1822  * returned value is needed for a long time, it is best to make a copy of it
1823  * with ovsdb_datum_clone(). */
1824 const struct ovsdb_datum *
1825 ovsdb_idl_read(const struct ovsdb_idl_row *row,
1826                const struct ovsdb_idl_column *column)
1827 {
1828     const struct ovsdb_idl_table_class *class;
1829     size_t column_idx;
1830
1831     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
1832
1833     class = row->table->class;
1834     column_idx = column - class->columns;
1835
1836     ovs_assert(row->new != NULL);
1837     ovs_assert(column_idx < class->n_columns);
1838
1839     if (row->written && bitmap_is_set(row->written, column_idx)) {
1840         return &row->new[column_idx];
1841     } else if (row->old) {
1842         return &row->old[column_idx];
1843     } else {
1844         return ovsdb_datum_default(&column->type);
1845     }
1846 }
1847
1848 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
1849  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
1850  * have a value type of OVSDB_TYPE_VOID.)
1851  *
1852  * This is useful in code that "knows" that a particular column has a given
1853  * type, so that it will abort if someone changes the column's type without
1854  * updating the code that uses it. */
1855 const struct ovsdb_datum *
1856 ovsdb_idl_get(const struct ovsdb_idl_row *row,
1857               const struct ovsdb_idl_column *column,
1858               enum ovsdb_atomic_type key_type OVS_UNUSED,
1859               enum ovsdb_atomic_type value_type OVS_UNUSED)
1860 {
1861     ovs_assert(column->type.key.type == key_type);
1862     ovs_assert(column->type.value.type == value_type);
1863
1864     return ovsdb_idl_read(row, column);
1865 }
1866
1867 /* Returns true if the field represented by 'column' in 'row' may be modified,
1868  * false if it is immutable.
1869  *
1870  * Normally, whether a field is mutable is controlled by its column's schema.
1871  * However, an immutable column can be set to any initial value at the time of
1872  * insertion, so if 'row' is a new row (one that is being added as part of the
1873  * current transaction, supposing that a transaction is in progress) then even
1874  * its "immutable" fields are actually mutable. */
1875 bool
1876 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
1877                      const struct ovsdb_idl_column *column)
1878 {
1879     return column->mutable || (row->new && !row->old);
1880 }
1881
1882 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
1883  * to all-zero-bits by some other entity.  If 'row' was set up some other way
1884  * then the return value is indeterminate. */
1885 bool
1886 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
1887 {
1888     return row->table == NULL;
1889 }
1890 \f
1891 /* Transactions. */
1892
1893 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1894                                    enum ovsdb_idl_txn_status);
1895
1896 /* Returns a string representation of 'status'.  The caller must not modify or
1897  * free the returned string.
1898  *
1899  * The return value is probably useful only for debug log messages and unit
1900  * tests. */
1901 const char *
1902 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
1903 {
1904     switch (status) {
1905     case TXN_UNCOMMITTED:
1906         return "uncommitted";
1907     case TXN_UNCHANGED:
1908         return "unchanged";
1909     case TXN_INCOMPLETE:
1910         return "incomplete";
1911     case TXN_ABORTED:
1912         return "aborted";
1913     case TXN_SUCCESS:
1914         return "success";
1915     case TXN_TRY_AGAIN:
1916         return "try again";
1917     case TXN_NOT_LOCKED:
1918         return "not locked";
1919     case TXN_ERROR:
1920         return "error";
1921     }
1922     return "<unknown>";
1923 }
1924
1925 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
1926  * active transaction at a time.  See the large comment in ovsdb-idl.h for
1927  * general information on transactions. */
1928 struct ovsdb_idl_txn *
1929 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
1930 {
1931     struct ovsdb_idl_txn *txn;
1932
1933     ovs_assert(!idl->txn);
1934     idl->txn = txn = xmalloc(sizeof *txn);
1935     txn->request_id = NULL;
1936     txn->idl = idl;
1937     hmap_init(&txn->txn_rows);
1938     txn->status = TXN_UNCOMMITTED;
1939     txn->error = NULL;
1940     txn->dry_run = false;
1941     ds_init(&txn->comment);
1942
1943     txn->inc_table = NULL;
1944     txn->inc_column = NULL;
1945
1946     hmap_init(&txn->inserted_rows);
1947
1948     return txn;
1949 }
1950
1951 /* Appends 's', which is treated as a printf()-type format string, to the
1952  * comments that will be passed to the OVSDB server when 'txn' is committed.
1953  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
1954  * show-log" can print in a relatively human-readable form.) */
1955 void
1956 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
1957 {
1958     va_list args;
1959
1960     if (txn->comment.length) {
1961         ds_put_char(&txn->comment, '\n');
1962     }
1963
1964     va_start(args, s);
1965     ds_put_format_valist(&txn->comment, s, args);
1966     va_end(args);
1967 }
1968
1969 /* Marks 'txn' as a transaction that will not actually modify the database.  In
1970  * almost every way, the transaction is treated like other transactions.  It
1971  * must be committed or aborted like other transactions, it will be sent to the
1972  * database server like other transactions, and so on.  The only difference is
1973  * that the operations sent to the database server will include, as the last
1974  * step, an "abort" operation, so that any changes made by the transaction will
1975  * not actually take effect. */
1976 void
1977 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
1978 {
1979     txn->dry_run = true;
1980 }
1981
1982 /* Causes 'txn', when committed, to increment the value of 'column' within
1983  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
1984  * successfully, the client may retrieve the final (incremented) value of
1985  * 'column' with ovsdb_idl_txn_get_increment_new_value().
1986  *
1987  * The client could accomplish something similar with ovsdb_idl_read(),
1988  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
1989  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
1990  * will never (by itself) fail because of a verify error.
1991  *
1992  * The intended use is for incrementing the "next_cfg" column in the
1993  * Open_vSwitch table. */
1994 void
1995 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
1996                         const struct ovsdb_idl_row *row,
1997                         const struct ovsdb_idl_column *column)
1998 {
1999     ovs_assert(!txn->inc_table);
2000     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
2001     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
2002
2003     txn->inc_table = row->table->class->name;
2004     txn->inc_column = column->name;
2005     txn->inc_row = row->uuid;
2006 }
2007
2008 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
2009  * has been called for 'txn' but the commit is still incomplete (that is, the
2010  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
2011  * end up committing at the database server, but the client will not be able to
2012  * get any further status information back. */
2013 void
2014 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
2015 {
2016     struct ovsdb_idl_txn_insert *insert, *next;
2017
2018     json_destroy(txn->request_id);
2019     if (txn->status == TXN_INCOMPLETE) {
2020         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2021     }
2022     ovsdb_idl_txn_abort(txn);
2023     ds_destroy(&txn->comment);
2024     free(txn->error);
2025     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
2026         free(insert);
2027     }
2028     hmap_destroy(&txn->inserted_rows);
2029     free(txn);
2030 }
2031
2032 /* Causes poll_block() to wake up if 'txn' has completed committing. */
2033 void
2034 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
2035 {
2036     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
2037         poll_immediate_wake();
2038     }
2039 }
2040
2041 static struct json *
2042 where_uuid_equals(const struct uuid *uuid)
2043 {
2044     return
2045         json_array_create_1(
2046             json_array_create_3(
2047                 json_string_create("_uuid"),
2048                 json_string_create("=="),
2049                 json_array_create_2(
2050                     json_string_create("uuid"),
2051                     json_string_create_nocopy(
2052                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
2053 }
2054
2055 static char *
2056 uuid_name_from_uuid(const struct uuid *uuid)
2057 {
2058     char *name;
2059     char *p;
2060
2061     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
2062     for (p = name; *p != '\0'; p++) {
2063         if (*p == '-') {
2064             *p = '_';
2065         }
2066     }
2067
2068     return name;
2069 }
2070
2071 static const struct ovsdb_idl_row *
2072 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
2073 {
2074     const struct ovsdb_idl_row *row;
2075
2076     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
2077         if (uuid_equals(&row->uuid, uuid)) {
2078             return row;
2079         }
2080     }
2081     return NULL;
2082 }
2083
2084 /* XXX there must be a cleaner way to do this */
2085 static struct json *
2086 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
2087 {
2088     if (json->type == JSON_ARRAY) {
2089         struct uuid uuid;
2090         size_t i;
2091
2092         if (json->u.array.n == 2
2093             && json->u.array.elems[0]->type == JSON_STRING
2094             && json->u.array.elems[1]->type == JSON_STRING
2095             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
2096             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
2097             const struct ovsdb_idl_row *row;
2098
2099             row = ovsdb_idl_txn_get_row(txn, &uuid);
2100             if (row && !row->old && row->new) {
2101                 json_destroy(json);
2102
2103                 return json_array_create_2(
2104                     json_string_create("named-uuid"),
2105                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
2106             }
2107         }
2108
2109         for (i = 0; i < json->u.array.n; i++) {
2110             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
2111                                                       txn);
2112         }
2113     } else if (json->type == JSON_OBJECT) {
2114         struct shash_node *node;
2115
2116         SHASH_FOR_EACH (node, json_object(json)) {
2117             node->data = substitute_uuids(node->data, txn);
2118         }
2119     }
2120     return json;
2121 }
2122
2123 static void
2124 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
2125 {
2126     struct ovsdb_idl_row *row, *next;
2127
2128     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
2129      * ovsdb_idl_column's 'parse' function, which will call
2130      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
2131      * transaction and fail to update the graph.  */
2132     txn->idl->txn = NULL;
2133
2134     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
2135         if (row->old) {
2136             if (row->written) {
2137                 ovsdb_idl_row_unparse(row);
2138                 ovsdb_idl_row_clear_arcs(row, false);
2139                 ovsdb_idl_row_parse(row);
2140             }
2141         } else {
2142             ovsdb_idl_row_unparse(row);
2143         }
2144         ovsdb_idl_row_clear_new(row);
2145
2146         free(row->prereqs);
2147         row->prereqs = NULL;
2148
2149         free(row->written);
2150         row->written = NULL;
2151
2152         hmap_remove(&txn->txn_rows, &row->txn_node);
2153         hmap_node_nullify(&row->txn_node);
2154         if (!row->old) {
2155             hmap_remove(&row->table->rows, &row->hmap_node);
2156             free(row);
2157         }
2158     }
2159     hmap_destroy(&txn->txn_rows);
2160     hmap_init(&txn->txn_rows);
2161 }
2162
2163 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
2164  * of the following TXN_* constants:
2165  *
2166  *   TXN_INCOMPLETE:
2167  *
2168  *       The transaction is in progress, but not yet complete.  The caller
2169  *       should call again later, after calling ovsdb_idl_run() to let the IDL
2170  *       do OVSDB protocol processing.
2171  *
2172  *   TXN_UNCHANGED:
2173  *
2174  *       The transaction is complete.  (It didn't actually change the database,
2175  *       so the IDL didn't send any request to the database server.)
2176  *
2177  *   TXN_ABORTED:
2178  *
2179  *       The caller previously called ovsdb_idl_txn_abort().
2180  *
2181  *   TXN_SUCCESS:
2182  *
2183  *       The transaction was successful.  The update made by the transaction
2184  *       (and possibly other changes made by other database clients) should
2185  *       already be visible in the IDL.
2186  *
2187  *   TXN_TRY_AGAIN:
2188  *
2189  *       The transaction failed for some transient reason, e.g. because a
2190  *       "verify" operation reported an inconsistency or due to a network
2191  *       problem.  The caller should wait for a change to the database, then
2192  *       compose a new transaction, and commit the new transaction.
2193  *
2194  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
2195  *       the database.  It is important to use its return value *before* the
2196  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
2197  *       purpose, because the change that one should wait for can happen after
2198  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
2199  *       using some other baseline value in that situation could cause an
2200  *       indefinite wait if the database rarely changes.
2201  *
2202  *   TXN_NOT_LOCKED:
2203  *
2204  *       The transaction failed because the IDL has been configured to require
2205  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
2206  *       has already lost it.
2207  *
2208  * Committing a transaction rolls back all of the changes that it made to the
2209  * IDL's copy of the database.  If the transaction commits successfully, then
2210  * the database server will send an update and, thus, the IDL will be updated
2211  * with the committed changes. */
2212 enum ovsdb_idl_txn_status
2213 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
2214 {
2215     struct ovsdb_idl_row *row;
2216     struct json *operations;
2217     bool any_updates;
2218
2219     if (txn != txn->idl->txn) {
2220         goto coverage_out;
2221     }
2222
2223     /* If we need a lock but don't have it, give up quickly. */
2224     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
2225         txn->status = TXN_NOT_LOCKED;
2226         goto disassemble_out;
2227     }
2228
2229     operations = json_array_create_1(
2230         json_string_create(txn->idl->class->database));
2231
2232     /* Assert that we have the required lock (avoiding a race). */
2233     if (txn->idl->lock_name) {
2234         struct json *op = json_object_create();
2235         json_array_add(operations, op);
2236         json_object_put_string(op, "op", "assert");
2237         json_object_put_string(op, "lock", txn->idl->lock_name);
2238     }
2239
2240     /* Add prerequisites and declarations of new rows. */
2241     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2242         /* XXX check that deleted rows exist even if no prereqs? */
2243         if (row->prereqs) {
2244             const struct ovsdb_idl_table_class *class = row->table->class;
2245             size_t n_columns = class->n_columns;
2246             struct json *op, *columns, *row_json;
2247             size_t idx;
2248
2249             op = json_object_create();
2250             json_array_add(operations, op);
2251             json_object_put_string(op, "op", "wait");
2252             json_object_put_string(op, "table", class->name);
2253             json_object_put(op, "timeout", json_integer_create(0));
2254             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2255             json_object_put_string(op, "until", "==");
2256             columns = json_array_create_empty();
2257             json_object_put(op, "columns", columns);
2258             row_json = json_object_create();
2259             json_object_put(op, "rows", json_array_create_1(row_json));
2260
2261             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
2262                 const struct ovsdb_idl_column *column = &class->columns[idx];
2263                 json_array_add(columns, json_string_create(column->name));
2264                 json_object_put(row_json, column->name,
2265                                 ovsdb_datum_to_json(&row->old[idx],
2266                                                     &column->type));
2267             }
2268         }
2269     }
2270
2271     /* Add updates. */
2272     any_updates = false;
2273     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2274         const struct ovsdb_idl_table_class *class = row->table->class;
2275
2276         if (!row->new) {
2277             if (class->is_root) {
2278                 struct json *op = json_object_create();
2279                 json_object_put_string(op, "op", "delete");
2280                 json_object_put_string(op, "table", class->name);
2281                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2282                 json_array_add(operations, op);
2283                 any_updates = true;
2284             } else {
2285                 /* Let ovsdb-server decide whether to really delete it. */
2286             }
2287         } else if (row->old != row->new) {
2288             struct json *row_json;
2289             struct json *op;
2290             size_t idx;
2291
2292             op = json_object_create();
2293             json_object_put_string(op, "op", row->old ? "update" : "insert");
2294             json_object_put_string(op, "table", class->name);
2295             if (row->old) {
2296                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2297             } else {
2298                 struct ovsdb_idl_txn_insert *insert;
2299
2300                 any_updates = true;
2301
2302                 json_object_put(op, "uuid-name",
2303                                 json_string_create_nocopy(
2304                                     uuid_name_from_uuid(&row->uuid)));
2305
2306                 insert = xmalloc(sizeof *insert);
2307                 insert->dummy = row->uuid;
2308                 insert->op_index = operations->u.array.n - 1;
2309                 uuid_zero(&insert->real);
2310                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
2311                             uuid_hash(&insert->dummy));
2312             }
2313             row_json = json_object_create();
2314             json_object_put(op, "row", row_json);
2315
2316             if (row->written) {
2317                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
2318                     const struct ovsdb_idl_column *column =
2319                                                         &class->columns[idx];
2320
2321                     if (row->old
2322                         || !ovsdb_datum_is_default(&row->new[idx],
2323                                                   &column->type)) {
2324                         json_object_put(row_json, column->name,
2325                                         substitute_uuids(
2326                                             ovsdb_datum_to_json(&row->new[idx],
2327                                                                 &column->type),
2328                                             txn));
2329
2330                         /* If anything really changed, consider it an update.
2331                          * We can't suppress not-really-changed values earlier
2332                          * or transactions would become nonatomic (see the big
2333                          * comment inside ovsdb_idl_txn_write()). */
2334                         if (!any_updates && row->old &&
2335                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
2336                                                 &column->type)) {
2337                             any_updates = true;
2338                         }
2339                     }
2340                 }
2341             }
2342
2343             if (!row->old || !shash_is_empty(json_object(row_json))) {
2344                 json_array_add(operations, op);
2345             } else {
2346                 json_destroy(op);
2347             }
2348         }
2349     }
2350
2351     /* Add increment. */
2352     if (txn->inc_table && any_updates) {
2353         struct json *op;
2354
2355         txn->inc_index = operations->u.array.n - 1;
2356
2357         op = json_object_create();
2358         json_object_put_string(op, "op", "mutate");
2359         json_object_put_string(op, "table", txn->inc_table);
2360         json_object_put(op, "where",
2361                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2362                                          txn));
2363         json_object_put(op, "mutations",
2364                         json_array_create_1(
2365                             json_array_create_3(
2366                                 json_string_create(txn->inc_column),
2367                                 json_string_create("+="),
2368                                 json_integer_create(1))));
2369         json_array_add(operations, op);
2370
2371         op = json_object_create();
2372         json_object_put_string(op, "op", "select");
2373         json_object_put_string(op, "table", txn->inc_table);
2374         json_object_put(op, "where",
2375                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2376                                          txn));
2377         json_object_put(op, "columns",
2378                         json_array_create_1(json_string_create(
2379                                                 txn->inc_column)));
2380         json_array_add(operations, op);
2381     }
2382
2383     if (txn->comment.length) {
2384         struct json *op = json_object_create();
2385         json_object_put_string(op, "op", "comment");
2386         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
2387         json_array_add(operations, op);
2388     }
2389
2390     if (txn->dry_run) {
2391         struct json *op = json_object_create();
2392         json_object_put_string(op, "op", "abort");
2393         json_array_add(operations, op);
2394     }
2395
2396     if (!any_updates) {
2397         txn->status = TXN_UNCHANGED;
2398         json_destroy(operations);
2399     } else if (!jsonrpc_session_send(
2400                    txn->idl->session,
2401                    jsonrpc_create_request(
2402                        "transact", operations, &txn->request_id))) {
2403         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
2404                     json_hash(txn->request_id, 0));
2405         txn->status = TXN_INCOMPLETE;
2406     } else {
2407         txn->status = TXN_TRY_AGAIN;
2408     }
2409
2410 disassemble_out:
2411     ovsdb_idl_txn_disassemble(txn);
2412 coverage_out:
2413     switch (txn->status) {
2414     case TXN_UNCOMMITTED:   COVERAGE_INC(txn_uncommitted);    break;
2415     case TXN_UNCHANGED:     COVERAGE_INC(txn_unchanged);      break;
2416     case TXN_INCOMPLETE:    COVERAGE_INC(txn_incomplete);     break;
2417     case TXN_ABORTED:       COVERAGE_INC(txn_aborted);        break;
2418     case TXN_SUCCESS:       COVERAGE_INC(txn_success);        break;
2419     case TXN_TRY_AGAIN:     COVERAGE_INC(txn_try_again);      break;
2420     case TXN_NOT_LOCKED:    COVERAGE_INC(txn_not_locked);     break;
2421     case TXN_ERROR:         COVERAGE_INC(txn_error);          break;
2422     }
2423
2424     return txn->status;
2425 }
2426
2427 /* Attempts to commit 'txn', blocking until the commit either succeeds or
2428  * fails.  Returns the final commit status, which may be any TXN_* value other
2429  * than TXN_INCOMPLETE.
2430  *
2431  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
2432  * return value of ovsdb_idl_get_seqno() to change. */
2433 enum ovsdb_idl_txn_status
2434 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
2435 {
2436     enum ovsdb_idl_txn_status status;
2437
2438     fatal_signal_run();
2439     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
2440         ovsdb_idl_run(txn->idl);
2441         ovsdb_idl_wait(txn->idl);
2442         ovsdb_idl_txn_wait(txn);
2443         poll_block();
2444     }
2445     return status;
2446 }
2447
2448 /* Returns the final (incremented) value of the column in 'txn' that was set to
2449  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
2450  * successfully. */
2451 int64_t
2452 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
2453 {
2454     ovs_assert(txn->status == TXN_SUCCESS);
2455     return txn->inc_new_value;
2456 }
2457
2458 /* Aborts 'txn' without sending it to the database server.  This is effective
2459  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
2460  * Otherwise, it has no effect.
2461  *
2462  * Aborting a transaction doesn't free its memory.  Use
2463  * ovsdb_idl_txn_destroy() to do that. */
2464 void
2465 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
2466 {
2467     ovsdb_idl_txn_disassemble(txn);
2468     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
2469         txn->status = TXN_ABORTED;
2470     }
2471 }
2472
2473 /* Returns a string that reports the error status for 'txn'.  The caller must
2474  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
2475  * for 'txn' may free the returned string.
2476  *
2477  * The return value is ordinarily one of the strings that
2478  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
2479  * due to an error reported by the database server, the return value is that
2480  * error. */
2481 const char *
2482 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
2483 {
2484     if (txn->status != TXN_ERROR) {
2485         return ovsdb_idl_txn_status_to_string(txn->status);
2486     } else if (txn->error) {
2487         return txn->error;
2488     } else {
2489         return "no error details available";
2490     }
2491 }
2492
2493 static void
2494 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
2495                              const struct json *json)
2496 {
2497     if (txn->error == NULL) {
2498         txn->error = json_to_string(json, JSSF_SORT);
2499     }
2500 }
2501
2502 /* For transaction 'txn' that completed successfully, finds and returns the
2503  * permanent UUID that the database assigned to a newly inserted row, given the
2504  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
2505  *
2506  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
2507  * if it was assigned by that function and then deleted by
2508  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
2509  * and then deleted within a single transaction are never sent to the database
2510  * server, so it never assigns them a permanent UUID.) */
2511 const struct uuid *
2512 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
2513                               const struct uuid *uuid)
2514 {
2515     const struct ovsdb_idl_txn_insert *insert;
2516
2517     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
2518     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
2519                              uuid_hash(uuid), &txn->inserted_rows) {
2520         if (uuid_equals(uuid, &insert->dummy)) {
2521             return &insert->real;
2522         }
2523     }
2524     return NULL;
2525 }
2526
2527 static void
2528 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2529                        enum ovsdb_idl_txn_status status)
2530 {
2531     txn->status = status;
2532     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2533 }
2534
2535 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
2536  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
2537  * ovs-vswitchd).
2538  *
2539  * 'datum' must have the correct type for its column.  The IDL does not check
2540  * that it meets schema constraints, but ovsdb-server will do so at commit time
2541  * so it had better be correct.
2542  *
2543  * A transaction must be in progress.  Replication of 'column' must not have
2544  * been disabled (by calling ovsdb_idl_omit()).
2545  *
2546  * Usually this function is used indirectly through one of the "set" functions
2547  * generated by ovsdb-idlc.
2548  *
2549  * Takes ownership of what 'datum' points to (and in some cases destroys that
2550  * data before returning) but makes a copy of 'datum' itself.  (Commonly
2551  * 'datum' is on the caller's stack.) */
2552 static void
2553 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
2554                       const struct ovsdb_idl_column *column,
2555                       struct ovsdb_datum *datum, bool owns_datum)
2556 {
2557     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2558     const struct ovsdb_idl_table_class *class;
2559     size_t column_idx;
2560     bool write_only;
2561
2562     if (ovsdb_idl_row_is_synthetic(row)) {
2563         goto discard_datum;
2564     }
2565
2566     class = row->table->class;
2567     column_idx = column - class->columns;
2568     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
2569
2570     ovs_assert(row->new != NULL);
2571     ovs_assert(column_idx < class->n_columns);
2572     ovs_assert(row->old == NULL ||
2573                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2574
2575     if (row->table->idl->verify_write_only && !write_only) {
2576         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
2577                  " explicitly configured not to.", class->name, column->name);
2578         goto discard_datum;
2579     }
2580
2581     /* If this is a write-only column and the datum being written is the same
2582      * as the one already there, just skip the update entirely.  This is worth
2583      * optimizing because we have a lot of columns that get periodically
2584      * refreshed into the database but don't actually change that often.
2585      *
2586      * We don't do this for read/write columns because that would break
2587      * atomicity of transactions--some other client might have written a
2588      * different value in that column since we read it.  (But if a whole
2589      * transaction only does writes of existing values, without making any real
2590      * changes, we will drop the whole transaction later in
2591      * ovsdb_idl_txn_commit().) */
2592     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
2593                                          datum, &column->type)) {
2594         goto discard_datum;
2595     }
2596
2597     if (hmap_node_is_null(&row->txn_node)) {
2598         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2599                     uuid_hash(&row->uuid));
2600     }
2601     if (row->old == row->new) {
2602         row->new = xmalloc(class->n_columns * sizeof *row->new);
2603     }
2604     if (!row->written) {
2605         row->written = bitmap_allocate(class->n_columns);
2606     }
2607     if (bitmap_is_set(row->written, column_idx)) {
2608         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
2609     } else {
2610         bitmap_set1(row->written, column_idx);
2611     }
2612     if (owns_datum) {
2613         row->new[column_idx] = *datum;
2614     } else {
2615         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
2616     }
2617     (column->unparse)(row);
2618     (column->parse)(row, &row->new[column_idx]);
2619     return;
2620
2621 discard_datum:
2622     if (owns_datum) {
2623         ovsdb_datum_destroy(datum, &column->type);
2624     }
2625 }
2626
2627 void
2628 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
2629                     const struct ovsdb_idl_column *column,
2630                     struct ovsdb_datum *datum)
2631 {
2632     ovsdb_idl_txn_write__(row, column, datum, true);
2633 }
2634
2635 void
2636 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
2637                           const struct ovsdb_idl_column *column,
2638                           const struct ovsdb_datum *datum)
2639 {
2640     ovsdb_idl_txn_write__(row, column,
2641                           CONST_CAST(struct ovsdb_datum *, datum), false);
2642 }
2643
2644 /* Causes the original contents of 'column' in 'row_' to be verified as a
2645  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
2646  * changed (or if 'row_' was deleted) between the time that the IDL originally
2647  * read its contents and the time that the transaction commits, then the
2648  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
2649  * TXN_AGAIN_NOW (depending on whether the database change has already been
2650  * received).
2651  *
2652  * The intention is that, to ensure that no transaction commits based on dirty
2653  * reads, an application should call ovsdb_idl_txn_verify() on each data item
2654  * read as part of a read-modify-write operation.
2655  *
2656  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
2657  * value of 'column' is already known:
2658  *
2659  *   - If 'row_' is a row created by the current transaction (returned by
2660  *     ovsdb_idl_txn_insert()).
2661  *
2662  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
2663  *     within the current transaction.
2664  *
2665  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
2666  * ovsdb_idl_txn_write() for a given read-modify-write.
2667  *
2668  * A transaction must be in progress.
2669  *
2670  * Usually this function is used indirectly through one of the "verify"
2671  * functions generated by ovsdb-idlc. */
2672 void
2673 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
2674                      const struct ovsdb_idl_column *column)
2675 {
2676     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2677     const struct ovsdb_idl_table_class *class;
2678     size_t column_idx;
2679
2680     if (ovsdb_idl_row_is_synthetic(row)) {
2681         return;
2682     }
2683
2684     class = row->table->class;
2685     column_idx = column - class->columns;
2686
2687     ovs_assert(row->new != NULL);
2688     ovs_assert(row->old == NULL ||
2689                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2690     if (!row->old
2691         || (row->written && bitmap_is_set(row->written, column_idx))) {
2692         return;
2693     }
2694
2695     if (hmap_node_is_null(&row->txn_node)) {
2696         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2697                     uuid_hash(&row->uuid));
2698     }
2699     if (!row->prereqs) {
2700         row->prereqs = bitmap_allocate(class->n_columns);
2701     }
2702     bitmap_set1(row->prereqs, column_idx);
2703 }
2704
2705 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
2706  * accessed afterward.
2707  *
2708  * A transaction must be in progress.
2709  *
2710  * Usually this function is used indirectly through one of the "delete"
2711  * functions generated by ovsdb-idlc. */
2712 void
2713 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
2714 {
2715     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2716
2717     if (ovsdb_idl_row_is_synthetic(row)) {
2718         return;
2719     }
2720
2721     ovs_assert(row->new != NULL);
2722     if (!row->old) {
2723         ovsdb_idl_row_unparse(row);
2724         ovsdb_idl_row_clear_new(row);
2725         ovs_assert(!row->prereqs);
2726         hmap_remove(&row->table->rows, &row->hmap_node);
2727         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
2728         free(row);
2729         return;
2730     }
2731     if (hmap_node_is_null(&row->txn_node)) {
2732         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2733                     uuid_hash(&row->uuid));
2734     }
2735     ovsdb_idl_row_clear_new(row);
2736     row->new = NULL;
2737 }
2738
2739 /* Inserts and returns a new row in the table with the specified 'class' in the
2740  * database with open transaction 'txn'.
2741  *
2742  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
2743  * randomly generated; otherwise 'uuid' should specify a randomly generated
2744  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
2745  * 'txn' is committed, but the IDL will replace any uses of the provisional
2746  * UUID in the data to be to be committed by the UUID assigned by
2747  * ovsdb-server.
2748  *
2749  * Usually this function is used indirectly through one of the "insert"
2750  * functions generated by ovsdb-idlc. */
2751 const struct ovsdb_idl_row *
2752 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
2753                      const struct ovsdb_idl_table_class *class,
2754                      const struct uuid *uuid)
2755 {
2756     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
2757
2758     if (uuid) {
2759         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
2760         row->uuid = *uuid;
2761     } else {
2762         uuid_generate(&row->uuid);
2763     }
2764
2765     row->table = ovsdb_idl_table_from_class(txn->idl, class);
2766     row->new = xmalloc(class->n_columns * sizeof *row->new);
2767     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
2768     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
2769     return row;
2770 }
2771
2772 static void
2773 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
2774 {
2775     struct ovsdb_idl_txn *txn;
2776
2777     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
2778         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
2779     }
2780 }
2781
2782 static struct ovsdb_idl_txn *
2783 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
2784 {
2785     struct ovsdb_idl_txn *txn;
2786
2787     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
2788                              json_hash(id, 0), &idl->outstanding_txns) {
2789         if (json_equal(id, txn->request_id)) {
2790             return txn;
2791         }
2792     }
2793     return NULL;
2794 }
2795
2796 static bool
2797 check_json_type(const struct json *json, enum json_type type, const char *name)
2798 {
2799     if (!json) {
2800         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
2801         return false;
2802     } else if (json->type != type) {
2803         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
2804                      name, json_type_to_string(json->type),
2805                      json_type_to_string(type));
2806         return false;
2807     } else {
2808         return true;
2809     }
2810 }
2811
2812 static bool
2813 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
2814                                 const struct json_array *results)
2815 {
2816     struct json *count, *rows, *row, *column;
2817     struct shash *mutate, *select;
2818
2819     if (txn->inc_index + 2 > results->n) {
2820         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2821                      "for increment (has %"PRIuSIZE", needs %u)",
2822                      results->n, txn->inc_index + 2);
2823         return false;
2824     }
2825
2826     /* We know that this is a JSON object because the loop in
2827      * ovsdb_idl_txn_process_reply() checked. */
2828     mutate = json_object(results->elems[txn->inc_index]);
2829     count = shash_find_data(mutate, "count");
2830     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
2831         return false;
2832     }
2833     if (count->u.integer != 1) {
2834         VLOG_WARN_RL(&syntax_rl,
2835                      "\"mutate\" reply \"count\" is %lld instead of 1",
2836                      count->u.integer);
2837         return false;
2838     }
2839
2840     select = json_object(results->elems[txn->inc_index + 1]);
2841     rows = shash_find_data(select, "rows");
2842     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
2843         return false;
2844     }
2845     if (rows->u.array.n != 1) {
2846         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
2847                      "instead of 1",
2848                      rows->u.array.n);
2849         return false;
2850     }
2851     row = rows->u.array.elems[0];
2852     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
2853         return false;
2854     }
2855     column = shash_find_data(json_object(row), txn->inc_column);
2856     if (!check_json_type(column, JSON_INTEGER,
2857                          "\"select\" reply inc column")) {
2858         return false;
2859     }
2860     txn->inc_new_value = column->u.integer;
2861     return true;
2862 }
2863
2864 static bool
2865 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
2866                                    const struct json_array *results)
2867 {
2868     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
2869     struct ovsdb_error *error;
2870     struct json *json_uuid;
2871     union ovsdb_atom uuid;
2872     struct shash *reply;
2873
2874     if (insert->op_index >= results->n) {
2875         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2876                      "for insert (has %"PRIuSIZE", needs %u)",
2877                      results->n, insert->op_index);
2878         return false;
2879     }
2880
2881     /* We know that this is a JSON object because the loop in
2882      * ovsdb_idl_txn_process_reply() checked. */
2883     reply = json_object(results->elems[insert->op_index]);
2884     json_uuid = shash_find_data(reply, "uuid");
2885     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
2886         return false;
2887     }
2888
2889     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
2890     if (error) {
2891         char *s = ovsdb_error_to_string(error);
2892         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
2893                      "UUID: %s", s);
2894         free(s);
2895         ovsdb_error_destroy(error);
2896         return false;
2897     }
2898
2899     insert->real = uuid.uuid;
2900
2901     return true;
2902 }
2903
2904 static bool
2905 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
2906                             const struct jsonrpc_msg *msg)
2907 {
2908     struct ovsdb_idl_txn *txn;
2909     enum ovsdb_idl_txn_status status;
2910
2911     txn = ovsdb_idl_txn_find(idl, msg->id);
2912     if (!txn) {
2913         return false;
2914     }
2915
2916     if (msg->type == JSONRPC_ERROR) {
2917         status = TXN_ERROR;
2918     } else if (msg->result->type != JSON_ARRAY) {
2919         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
2920         status = TXN_ERROR;
2921     } else {
2922         struct json_array *ops = &msg->result->u.array;
2923         int hard_errors = 0;
2924         int soft_errors = 0;
2925         int lock_errors = 0;
2926         size_t i;
2927
2928         for (i = 0; i < ops->n; i++) {
2929             struct json *op = ops->elems[i];
2930
2931             if (op->type == JSON_NULL) {
2932                 /* This isn't an error in itself but indicates that some prior
2933                  * operation failed, so make sure that we know about it. */
2934                 soft_errors++;
2935             } else if (op->type == JSON_OBJECT) {
2936                 struct json *error;
2937
2938                 error = shash_find_data(json_object(op), "error");
2939                 if (error) {
2940                     if (error->type == JSON_STRING) {
2941                         if (!strcmp(error->u.string, "timed out")) {
2942                             soft_errors++;
2943                         } else if (!strcmp(error->u.string, "not owner")) {
2944                             lock_errors++;
2945                         } else if (strcmp(error->u.string, "aborted")) {
2946                             hard_errors++;
2947                             ovsdb_idl_txn_set_error_json(txn, op);
2948                         }
2949                     } else {
2950                         hard_errors++;
2951                         ovsdb_idl_txn_set_error_json(txn, op);
2952                         VLOG_WARN_RL(&syntax_rl,
2953                                      "\"error\" in reply is not JSON string");
2954                     }
2955                 }
2956             } else {
2957                 hard_errors++;
2958                 ovsdb_idl_txn_set_error_json(txn, op);
2959                 VLOG_WARN_RL(&syntax_rl,
2960                              "operation reply is not JSON null or object");
2961             }
2962         }
2963
2964         if (!soft_errors && !hard_errors && !lock_errors) {
2965             struct ovsdb_idl_txn_insert *insert;
2966
2967             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
2968                 hard_errors++;
2969             }
2970
2971             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
2972                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
2973                     hard_errors++;
2974                 }
2975             }
2976         }
2977
2978         status = (hard_errors ? TXN_ERROR
2979                   : lock_errors ? TXN_NOT_LOCKED
2980                   : soft_errors ? TXN_TRY_AGAIN
2981                   : TXN_SUCCESS);
2982     }
2983
2984     ovsdb_idl_txn_complete(txn, status);
2985     return true;
2986 }
2987
2988 /* Returns the transaction currently active for 'row''s IDL.  A transaction
2989  * must currently be active. */
2990 struct ovsdb_idl_txn *
2991 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
2992 {
2993     struct ovsdb_idl_txn *txn = row->table->idl->txn;
2994     ovs_assert(txn != NULL);
2995     return txn;
2996 }
2997
2998 /* Returns the IDL on which 'txn' acts. */
2999 struct ovsdb_idl *
3000 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
3001 {
3002     return txn->idl;
3003 }
3004
3005 /* Blocks until 'idl' successfully connects to the remote database and
3006  * retrieves its contents. */
3007 void
3008 ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
3009 {
3010     while (1) {
3011         ovsdb_idl_run(idl);
3012         if (ovsdb_idl_has_ever_connected(idl)) {
3013             return;
3014         }
3015         ovsdb_idl_wait(idl);
3016         poll_block();
3017     }
3018 }
3019 \f
3020 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
3021  * the database server and to avoid modifying the database when the lock cannot
3022  * be acquired (that is, when another client has the same lock).
3023  *
3024  * If 'lock_name' is NULL, drops the locking requirement and releases the
3025  * lock. */
3026 void
3027 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
3028 {
3029     ovs_assert(!idl->txn);
3030     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
3031
3032     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
3033         /* Release previous lock. */
3034         ovsdb_idl_send_unlock_request(idl);
3035         free(idl->lock_name);
3036         idl->lock_name = NULL;
3037         idl->is_lock_contended = false;
3038     }
3039
3040     if (lock_name && !idl->lock_name) {
3041         /* Acquire new lock. */
3042         idl->lock_name = xstrdup(lock_name);
3043         ovsdb_idl_send_lock_request(idl);
3044     }
3045 }
3046
3047 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
3048  *
3049  * Locking and unlocking happens asynchronously from the database client's
3050  * point of view, so the information is only useful for optimization (e.g. if
3051  * the client doesn't have the lock then there's no point in trying to write to
3052  * the database). */
3053 bool
3054 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
3055 {
3056     return idl->has_lock;
3057 }
3058
3059 /* Returns true if 'idl' is configured to obtain a lock but the database server
3060  * has indicated that some other client already owns the requested lock. */
3061 bool
3062 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
3063 {
3064     return idl->is_lock_contended;
3065 }
3066
3067 static void
3068 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
3069 {
3070     if (new_has_lock && !idl->has_lock) {
3071         if (idl->state == IDL_S_MONITORING ||
3072             idl->state == IDL_S_MONITORING2) {
3073             idl->change_seqno++;
3074         } else {
3075             /* We're setting up a session, so don't signal that the database
3076              * changed.  Finalizing the session will increment change_seqno
3077              * anyhow. */
3078         }
3079         idl->is_lock_contended = false;
3080     }
3081     idl->has_lock = new_has_lock;
3082 }
3083
3084 static void
3085 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
3086                               struct json **idp)
3087 {
3088     ovsdb_idl_update_has_lock(idl, false);
3089
3090     json_destroy(idl->lock_request_id);
3091     idl->lock_request_id = NULL;
3092
3093     if (jsonrpc_session_is_connected(idl->session)) {
3094         struct json *params;
3095
3096         params = json_array_create_1(json_string_create(idl->lock_name));
3097         jsonrpc_session_send(idl->session,
3098                              jsonrpc_create_request(method, params, idp));
3099     }
3100 }
3101
3102 static void
3103 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
3104 {
3105     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
3106 }
3107
3108 static void
3109 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
3110 {
3111     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
3112 }
3113
3114 static void
3115 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
3116 {
3117     bool got_lock;
3118
3119     json_destroy(idl->lock_request_id);
3120     idl->lock_request_id = NULL;
3121
3122     if (result->type == JSON_OBJECT) {
3123         const struct json *locked;
3124
3125         locked = shash_find_data(json_object(result), "locked");
3126         got_lock = locked && locked->type == JSON_TRUE;
3127     } else {
3128         got_lock = false;
3129     }
3130
3131     ovsdb_idl_update_has_lock(idl, got_lock);
3132     if (!got_lock) {
3133         idl->is_lock_contended = true;
3134     }
3135 }
3136
3137 static void
3138 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
3139                             const struct json *params,
3140                             bool new_has_lock)
3141 {
3142     if (idl->lock_name
3143         && params->type == JSON_ARRAY
3144         && json_array(params)->n > 0
3145         && json_array(params)->elems[0]->type == JSON_STRING) {
3146         const char *lock_name = json_string(json_array(params)->elems[0]);
3147
3148         if (!strcmp(idl->lock_name, lock_name)) {
3149             ovsdb_idl_update_has_lock(idl, new_has_lock);
3150             if (!new_has_lock) {
3151                 idl->is_lock_contended = true;
3152             }
3153         }
3154     }
3155 }
3156
3157 void
3158 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
3159 {
3160     if (loop) {
3161         ovsdb_idl_destroy(loop->idl);
3162     }
3163 }
3164
3165 struct ovsdb_idl_txn *
3166 ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
3167 {
3168     ovsdb_idl_run(loop->idl);
3169     loop->open_txn = (loop->committing_txn
3170                       || ovsdb_idl_get_seqno(loop->idl) == loop->skip_seqno
3171                       ? NULL
3172                       : ovsdb_idl_txn_create(loop->idl));
3173     return loop->open_txn;
3174 }
3175
3176 void
3177 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
3178 {
3179     if (loop->open_txn) {
3180         loop->committing_txn = loop->open_txn;
3181         loop->open_txn = NULL;
3182
3183         loop->precommit_seqno = ovsdb_idl_get_seqno(loop->idl);
3184     }
3185
3186     struct ovsdb_idl_txn *txn = loop->committing_txn;
3187     if (txn) {
3188         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
3189         if (status != TXN_INCOMPLETE) {
3190             switch (status) {
3191             case TXN_TRY_AGAIN:
3192                 /* We want to re-evaluate the database when it's changed from
3193                  * the contents that it had when we started the commit.  (That
3194                  * might have already happened.) */
3195                 loop->skip_seqno = loop->precommit_seqno;
3196                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
3197                     poll_immediate_wake();
3198                 }
3199                 break;
3200
3201             case TXN_SUCCESS:
3202                 /* If the database has already changed since we started the
3203                  * commit, re-evaluate it immediately to avoid missing a change
3204                  * for a while. */
3205                 if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
3206                     poll_immediate_wake();
3207                 }
3208                 break;
3209
3210             case TXN_UNCHANGED:
3211             case TXN_ABORTED:
3212             case TXN_NOT_LOCKED:
3213             case TXN_ERROR:
3214                 break;
3215
3216             case TXN_UNCOMMITTED:
3217             case TXN_INCOMPLETE:
3218                 OVS_NOT_REACHED();
3219
3220             }
3221             ovsdb_idl_txn_destroy(txn);
3222             loop->committing_txn = NULL;
3223         }
3224     }
3225
3226     ovsdb_idl_wait(loop->idl);
3227 }