Move lib/dynamic-string.h to include/openvswitch directory
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "coverage.h"
27 #include "openvswitch/dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "json.h"
30 #include "jsonrpc.h"
31 #include "ovsdb/ovsdb.h"
32 #include "ovsdb/table.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-idl-provider.h"
36 #include "ovsdb-parser.h"
37 #include "poll-loop.h"
38 #include "shash.h"
39 #include "sset.h"
40 #include "util.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
44
45 COVERAGE_DEFINE(txn_uncommitted);
46 COVERAGE_DEFINE(txn_unchanged);
47 COVERAGE_DEFINE(txn_incomplete);
48 COVERAGE_DEFINE(txn_aborted);
49 COVERAGE_DEFINE(txn_success);
50 COVERAGE_DEFINE(txn_try_again);
51 COVERAGE_DEFINE(txn_not_locked);
52 COVERAGE_DEFINE(txn_error);
53
54 /* An arc from one idl_row to another.  When row A contains a UUID that
55  * references row B, this is represented by an arc from A (the source) to B
56  * (the destination).
57  *
58  * Arcs from a row to itself are omitted, that is, src and dst are always
59  * different.
60  *
61  * Arcs are never duplicated, that is, even if there are multiple references
62  * from A to B, there is only a single arc from A to B.
63  *
64  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
65  * A.  Both an arc and its converse may both be present, if each row refers
66  * to the other circularly.
67  *
68  * The source and destination row may be in the same table or in different
69  * tables.
70  */
71 struct ovsdb_idl_arc {
72     struct ovs_list src_node;   /* In src->src_arcs list. */
73     struct ovs_list dst_node;   /* In dst->dst_arcs list. */
74     struct ovsdb_idl_row *src;  /* Source row. */
75     struct ovsdb_idl_row *dst;  /* Destination row. */
76 };
77
78 enum ovsdb_idl_state {
79     IDL_S_SCHEMA_REQUESTED,
80     IDL_S_MONITOR_REQUESTED,
81     IDL_S_MONITORING,
82     IDL_S_MONITOR2_REQUESTED,
83     IDL_S_MONITORING2,
84     IDL_S_NO_SCHEMA
85 };
86
87 struct ovsdb_idl {
88     const struct ovsdb_idl_class *class;
89     struct jsonrpc_session *session;
90     struct shash table_by_name;
91     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
92     unsigned int change_seqno;
93     bool verify_write_only;
94
95     /* Session state. */
96     unsigned int state_seqno;
97     enum ovsdb_idl_state state;
98     struct json *request_id;
99     struct json *schema;
100
101     /* Database locking. */
102     char *lock_name;            /* Name of lock we need, NULL if none. */
103     bool has_lock;              /* Has db server told us we have the lock? */
104     bool is_lock_contended;     /* Has db server told us we can't get lock? */
105     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
106
107     /* Transaction support. */
108     struct ovsdb_idl_txn *txn;
109     struct hmap outstanding_txns;
110 };
111
112 struct ovsdb_idl_txn {
113     struct hmap_node hmap_node;
114     struct json *request_id;
115     struct ovsdb_idl *idl;
116     struct hmap txn_rows;
117     enum ovsdb_idl_txn_status status;
118     char *error;
119     bool dry_run;
120     struct ds comment;
121
122     /* Increments. */
123     const char *inc_table;
124     const char *inc_column;
125     struct uuid inc_row;
126     unsigned int inc_index;
127     int64_t inc_new_value;
128
129     /* Inserted rows. */
130     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
131 };
132
133 struct ovsdb_idl_txn_insert {
134     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
135     struct uuid dummy;          /* Dummy UUID used locally. */
136     int op_index;               /* Index into transaction's operation array. */
137     struct uuid real;           /* Real UUID used by database server. */
138 };
139
140 enum ovsdb_update_version {
141     OVSDB_UPDATE,               /* RFC 7047 "update" method. */
142     OVSDB_UPDATE2               /* "update2" Extension to RFC 7047.
143                                    See ovsdb-server(1) for more information. */
144 };
145
146 /* Name arrays indexed by 'enum ovsdb_update_version'. */
147 static const char *table_updates_names[] = {"table_updates", "table_updates2"};
148 static const char *table_update_names[] = {"table_update", "table_update2"};
149 static const char *row_update_names[] = {"row_update", "row_update2"};
150
151 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
152 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
153
154 static void ovsdb_idl_clear(struct ovsdb_idl *);
155 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
156 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
157 static void ovsdb_idl_send_monitor2_request(struct ovsdb_idl *);
158 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
159                                    enum ovsdb_update_version);
160 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
161                                                     const struct json *,
162                                                     enum ovsdb_update_version);
163 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
164                                      const struct uuid *,
165                                      const struct json *old,
166                                      const struct json *new);
167 static bool ovsdb_idl_process_update2(struct ovsdb_idl_table *,
168                                       const struct uuid *,
169                                       const char *operation,
170                                       const struct json *row);
171 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
172 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
173 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
174 static bool ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *,
175                                          const struct json *);
176
177 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
178 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
179     const struct ovsdb_idl_table_class *);
180 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
181                                                   const struct uuid *);
182 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
183 static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
184
185 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
186 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
187 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
188 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
189 static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
190
191 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
192 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
193                                         const struct jsonrpc_msg *msg);
194
195 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
196 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
197 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
198                                        const struct json *);
199 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
200                                         const struct json *params,
201                                         bool new_has_lock);
202 static struct ovsdb_idl_table *
203 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
204                            const struct ovsdb_idl_table_class *);
205 static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table);
206
207 /* Creates and returns a connection to database 'remote', which should be in a
208  * form acceptable to jsonrpc_session_open().  The connection will maintain an
209  * in-memory replica of the remote database whose schema is described by
210  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
211  * by ovsdb-idlc.)
212  *
213  * Passes 'retry' to jsonrpc_session_open().  See that function for
214  * documentation.
215  *
216  * If 'monitor_everything_by_default' is true, then everything in the remote
217  * database will be replicated by default.  ovsdb_idl_omit() and
218  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
219  * monitoring.
220  *
221  * If 'monitor_everything_by_default' is false, then no columns or tables will
222  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
223  * must be used to choose some columns or tables to replicate.
224  */
225 struct ovsdb_idl *
226 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
227                  bool monitor_everything_by_default, bool retry)
228 {
229     struct ovsdb_idl *idl;
230     uint8_t default_mode;
231     size_t i;
232
233     default_mode = (monitor_everything_by_default
234                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
235                     : 0);
236
237     idl = xzalloc(sizeof *idl);
238     idl->class = class;
239     idl->session = jsonrpc_session_open(remote, retry);
240     shash_init(&idl->table_by_name);
241     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
242     for (i = 0; i < class->n_tables; i++) {
243         const struct ovsdb_idl_table_class *tc = &class->tables[i];
244         struct ovsdb_idl_table *table = &idl->tables[i];
245         size_t j;
246
247         shash_add_assert(&idl->table_by_name, tc->name, table);
248         table->class = tc;
249         table->modes = xmalloc(tc->n_columns);
250         memset(table->modes, default_mode, tc->n_columns);
251         table->need_table = false;
252         shash_init(&table->columns);
253         for (j = 0; j < tc->n_columns; j++) {
254             const struct ovsdb_idl_column *column = &tc->columns[j];
255
256             shash_add_assert(&table->columns, column->name, column);
257         }
258         hmap_init(&table->rows);
259         list_init(&table->track_list);
260         table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
261             = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
262             = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
263         table->idl = idl;
264     }
265
266     idl->state_seqno = UINT_MAX;
267     idl->request_id = NULL;
268     idl->schema = NULL;
269
270     hmap_init(&idl->outstanding_txns);
271
272     return idl;
273 }
274
275 /* Destroys 'idl' and all of the data structures that it manages. */
276 void
277 ovsdb_idl_destroy(struct ovsdb_idl *idl)
278 {
279     if (idl) {
280         size_t i;
281
282         ovs_assert(!idl->txn);
283         ovsdb_idl_clear(idl);
284         jsonrpc_session_close(idl->session);
285
286         for (i = 0; i < idl->class->n_tables; i++) {
287             struct ovsdb_idl_table *table = &idl->tables[i];
288             shash_destroy(&table->columns);
289             hmap_destroy(&table->rows);
290             free(table->modes);
291         }
292         shash_destroy(&idl->table_by_name);
293         free(idl->tables);
294         json_destroy(idl->request_id);
295         free(idl->lock_name);
296         json_destroy(idl->lock_request_id);
297         json_destroy(idl->schema);
298         hmap_destroy(&idl->outstanding_txns);
299         free(idl);
300     }
301 }
302
303 static void
304 ovsdb_idl_clear(struct ovsdb_idl *idl)
305 {
306     bool changed = false;
307     size_t i;
308
309     for (i = 0; i < idl->class->n_tables; i++) {
310         struct ovsdb_idl_table *table = &idl->tables[i];
311         struct ovsdb_idl_row *row, *next_row;
312
313         if (hmap_is_empty(&table->rows)) {
314             continue;
315         }
316
317         changed = true;
318         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
319             struct ovsdb_idl_arc *arc, *next_arc;
320
321             if (!ovsdb_idl_row_is_orphan(row)) {
322                 ovsdb_idl_row_unparse(row);
323             }
324             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
325                 free(arc);
326             }
327             /* No need to do anything with dst_arcs: some node has those arcs
328              * as forward arcs and will destroy them itself. */
329
330             if (!list_is_empty(&row->track_node)) {
331                 list_remove(&row->track_node);
332             }
333
334             ovsdb_idl_row_destroy(row);
335         }
336     }
337
338     ovsdb_idl_track_clear(idl);
339
340     if (changed) {
341         idl->change_seqno++;
342     }
343 }
344
345 /* Processes a batch of messages from the database server on 'idl'.  This may
346  * cause the IDL's contents to change.  The client may check for that with
347  * ovsdb_idl_get_seqno(). */
348 void
349 ovsdb_idl_run(struct ovsdb_idl *idl)
350 {
351     int i;
352
353     ovs_assert(!idl->txn);
354     jsonrpc_session_run(idl->session);
355     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
356         struct jsonrpc_msg *msg;
357         unsigned int seqno;
358
359         seqno = jsonrpc_session_get_seqno(idl->session);
360         if (idl->state_seqno != seqno) {
361             idl->state_seqno = seqno;
362             json_destroy(idl->request_id);
363             idl->request_id = NULL;
364             ovsdb_idl_txn_abort_all(idl);
365
366             ovsdb_idl_send_schema_request(idl);
367             idl->state = IDL_S_SCHEMA_REQUESTED;
368             if (idl->lock_name) {
369                 ovsdb_idl_send_lock_request(idl);
370             }
371         }
372
373         msg = jsonrpc_session_recv(idl->session);
374         if (!msg) {
375             break;
376         }
377
378         if (msg->type == JSONRPC_NOTIFY
379             && !strcmp(msg->method, "update2")
380             && msg->params->type == JSON_ARRAY
381             && msg->params->u.array.n == 2
382             && msg->params->u.array.elems[0]->type == JSON_NULL) {
383             /* Database contents changed. */
384             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
385                                    OVSDB_UPDATE2);
386         } else if (msg->type == JSONRPC_REPLY
387                    && idl->request_id
388                    && json_equal(idl->request_id, msg->id)) {
389             json_destroy(idl->request_id);
390             idl->request_id = NULL;
391
392             switch (idl->state) {
393             case IDL_S_SCHEMA_REQUESTED:
394                 /* Reply to our "get_schema" request. */
395                 idl->schema = json_clone(msg->result);
396                 ovsdb_idl_send_monitor2_request(idl);
397                 idl->state = IDL_S_MONITOR2_REQUESTED;
398                 break;
399
400             case IDL_S_MONITOR_REQUESTED:
401             case IDL_S_MONITOR2_REQUESTED:
402                 /* Reply to our "monitor" or "monitor2" request. */
403                 idl->change_seqno++;
404                 ovsdb_idl_clear(idl);
405                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
406                     idl->state = IDL_S_MONITORING;
407                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
408                 } else { /* IDL_S_MONITOR2_REQUESTED. */
409                     idl->state = IDL_S_MONITORING2;
410                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
411                 }
412
413                 /* Schema is not useful after monitor request is accepted
414                  * by the server.  */
415                 json_destroy(idl->schema);
416                 idl->schema = NULL;
417                 break;
418
419             case IDL_S_MONITORING:
420             case IDL_S_MONITORING2:
421             case IDL_S_NO_SCHEMA:
422             default:
423                 OVS_NOT_REACHED();
424             }
425         } else if (msg->type == JSONRPC_NOTIFY
426             && !strcmp(msg->method, "update")
427             && msg->params->type == JSON_ARRAY
428             && msg->params->u.array.n == 2
429             && msg->params->u.array.elems[0]->type == JSON_NULL) {
430             /* Database contents changed. */
431             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
432                                    OVSDB_UPDATE);
433         } else if (msg->type == JSONRPC_REPLY
434                    && idl->lock_request_id
435                    && json_equal(idl->lock_request_id, msg->id)) {
436             /* Reply to our "lock" request. */
437             ovsdb_idl_parse_lock_reply(idl, msg->result);
438         } else if (msg->type == JSONRPC_NOTIFY
439                    && !strcmp(msg->method, "locked")) {
440             /* We got our lock. */
441             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
442         } else if (msg->type == JSONRPC_NOTIFY
443                    && !strcmp(msg->method, "stolen")) {
444             /* Someone else stole our lock. */
445             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
446         } else if (msg->type == JSONRPC_ERROR
447                    && idl->state == IDL_S_MONITOR2_REQUESTED
448                    && idl->request_id
449                    && json_equal(idl->request_id, msg->id)) {
450             if (msg->error && !strcmp(json_string(msg->error),
451                                       "unknown method")) {
452                 /* Fall back to using "monitor" method.  */
453                 json_destroy(idl->request_id);
454                 idl->request_id = NULL;
455                 ovsdb_idl_send_monitor_request(idl);
456                 idl->state = IDL_S_MONITOR_REQUESTED;
457             }
458         } else if (msg->type == JSONRPC_ERROR
459                    && idl->state == IDL_S_SCHEMA_REQUESTED
460                    && idl->request_id
461                    && json_equal(idl->request_id, msg->id)) {
462                 json_destroy(idl->request_id);
463                 idl->request_id = NULL;
464                 VLOG_ERR("%s: requested schema not found",
465                          jsonrpc_session_get_name(idl->session));
466                 idl->state = IDL_S_NO_SCHEMA;
467         } else if ((msg->type == JSONRPC_ERROR
468                     || msg->type == JSONRPC_REPLY)
469                    && ovsdb_idl_txn_process_reply(idl, msg)) {
470             /* ovsdb_idl_txn_process_reply() did everything needful. */
471         } else {
472             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
473              * a transaction before we receive the reply, so keep the log level
474              * low. */
475             VLOG_DBG("%s: received unexpected %s message",
476                      jsonrpc_session_get_name(idl->session),
477                      jsonrpc_msg_type_to_string(msg->type));
478         }
479         jsonrpc_msg_destroy(msg);
480     }
481     ovsdb_idl_row_destroy_postprocess(idl);
482 }
483
484 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
485  * do or when activity occurs on a transaction on 'idl'. */
486 void
487 ovsdb_idl_wait(struct ovsdb_idl *idl)
488 {
489     jsonrpc_session_wait(idl->session);
490     jsonrpc_session_recv_wait(idl->session);
491 }
492
493 /* Returns a "sequence number" that represents the state of 'idl'.  When
494  * ovsdb_idl_run() changes the database, the sequence number changes.  The
495  * initial fetch of the entire contents of the remote database is considered to
496  * be one kind of change.  Successfully acquiring a lock, if one has been
497  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
498  *
499  * As long as the sequence number does not change, the client may continue to
500  * use any data structures it obtains from 'idl'.  But when it changes, the
501  * client must not access any of these data structures again, because they
502  * could have freed or reused for other purposes.
503  *
504  * The sequence number can occasionally change even if the database does not.
505  * This happens if the connection to the database drops and reconnects, which
506  * causes the database contents to be reloaded even if they didn't change.  (It
507  * could also happen if the database server sends out a "change" that reflects
508  * what the IDL already thought was in the database.  The database server is
509  * not supposed to do that, but bugs could in theory cause it to do so.) */
510 unsigned int
511 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
512 {
513     return idl->change_seqno;
514 }
515
516 /* Returns true if 'idl' successfully connected to the remote database and
517  * retrieved its contents (even if the connection subsequently dropped and is
518  * in the process of reconnecting).  If so, then 'idl' contains an atomic
519  * snapshot of the database's contents (but it might be arbitrarily old if the
520  * connection dropped).
521  *
522  * Returns false if 'idl' has never connected or retrieved the database's
523  * contents.  If so, 'idl' is empty. */
524 bool
525 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
526 {
527     return ovsdb_idl_get_seqno(idl) != 0;
528 }
529
530 /* Reconfigures 'idl' so that it would reconnect to the database, if
531  * connection was dropped. */
532 void
533 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
534 {
535     jsonrpc_session_enable_reconnect(idl->session);
536 }
537
538 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
539  * meantime, the contents of 'idl' will not change. */
540 void
541 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
542 {
543     jsonrpc_session_force_reconnect(idl->session);
544 }
545
546 /* Some IDL users should only write to write-only columns.  Furthermore,
547  * writing to a column which is not write-only can cause serious performance
548  * degradations for these users.  This function causes 'idl' to reject writes
549  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
550 void
551 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
552 {
553     idl->verify_write_only = true;
554 }
555
556 /* Returns true if 'idl' is currently connected or trying to connect
557  * and a negative response to a schema request has not been received */
558 bool
559 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
560 {
561     return jsonrpc_session_is_alive(idl->session) &&
562            idl->state != IDL_S_NO_SCHEMA;
563 }
564
565 /* Returns the last error reported on a connection by 'idl'.  The return value
566  * is 0 only if no connection made by 'idl' has ever encountered an error and
567  * a negative response to a schema request has never been received. See
568  * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
569  * interpretation. */
570 int
571 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
572 {
573     int err;
574
575     err = jsonrpc_session_get_last_error(idl->session);
576
577     if (err) {
578         return err;
579     } else if (idl->state == IDL_S_NO_SCHEMA) {
580         return ENOENT;
581     } else {
582         return 0;
583     }
584 }
585 \f
586 static unsigned char *
587 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
588                    const struct ovsdb_idl_column *column)
589 {
590     size_t i;
591
592     ovs_assert(!idl->change_seqno);
593
594     for (i = 0; i < idl->class->n_tables; i++) {
595         const struct ovsdb_idl_table *table = &idl->tables[i];
596         const struct ovsdb_idl_table_class *tc = table->class;
597
598         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
599             return &table->modes[column - tc->columns];
600         }
601     }
602
603     OVS_NOT_REACHED();
604 }
605
606 static void
607 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
608 {
609     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
610         struct ovsdb_idl_table *table;
611
612         table = shash_find_data(&idl->table_by_name,
613                                 base->u.uuid.refTableName);
614         if (table) {
615             table->need_table = true;
616         } else {
617             VLOG_WARN("%s IDL class missing referenced table %s",
618                       idl->class->database, base->u.uuid.refTableName);
619         }
620     }
621 }
622
623 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
624  * ensures that any tables referenced by 'column' will be replicated, even if
625  * no columns in that table are selected for replication (see
626  * ovsdb_idl_add_table() for more information).
627  *
628  * This function is only useful if 'monitor_everything_by_default' was false in
629  * the call to ovsdb_idl_create().  This function should be called between
630  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
631  */
632 void
633 ovsdb_idl_add_column(struct ovsdb_idl *idl,
634                      const struct ovsdb_idl_column *column)
635 {
636     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
637     add_ref_table(idl, &column->type.key);
638     add_ref_table(idl, &column->type.value);
639 }
640
641 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
642  * no columns are selected for replication. Just the necessary data for table
643  * references will be replicated (the UUID of the rows, for instance), any
644  * columns not selected for replication will remain unreplicated.
645  * This can be useful because it allows 'idl' to keep track of what rows in the
646  * table actually exist, which in turn allows columns that reference the table
647  * to have accurate contents. (The IDL presents the database with references to
648  * rows that do not exist removed.)
649  *
650  * This function is only useful if 'monitor_everything_by_default' was false in
651  * the call to ovsdb_idl_create().  This function should be called between
652  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
653  */
654 void
655 ovsdb_idl_add_table(struct ovsdb_idl *idl,
656                     const struct ovsdb_idl_table_class *tc)
657 {
658     size_t i;
659
660     for (i = 0; i < idl->class->n_tables; i++) {
661         struct ovsdb_idl_table *table = &idl->tables[i];
662
663         if (table->class == tc) {
664             table->need_table = true;
665             return;
666         }
667     }
668
669     OVS_NOT_REACHED();
670 }
671
672 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
673  *
674  * This function should be called between ovsdb_idl_create() and the first call
675  * to ovsdb_idl_run().
676  */
677 void
678 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
679                      const struct ovsdb_idl_column *column)
680 {
681     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
682 }
683
684 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
685  * OVSDB_IDL_MONITOR for details.
686  *
687  * This function should be called between ovsdb_idl_create() and the first call
688  * to ovsdb_idl_run().
689  */
690 void
691 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
692 {
693     *ovsdb_idl_get_mode(idl, column) = 0;
694 }
695
696 /* Returns the most recent IDL change sequence number that caused a
697  * insert, modify or delete update to the table with class 'table_class'.
698  */
699 unsigned int
700 ovsdb_idl_table_get_seqno(const struct ovsdb_idl *idl,
701                           const struct ovsdb_idl_table_class *table_class)
702 {
703     struct ovsdb_idl_table *table
704         = ovsdb_idl_table_from_class(idl, table_class);
705     unsigned int max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_INSERT];
706
707     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]) {
708         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY];
709     }
710     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_DELETE]) {
711         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_DELETE];
712     }
713     return max_seqno;
714 }
715
716 /* For each row that contains tracked columns, IDL stores the most
717  * recent IDL change sequence numbers associateed with insert, modify
718  * and delete updates to the table.
719  */
720 unsigned int
721 ovsdb_idl_row_get_seqno(const struct ovsdb_idl_row *row,
722                         enum ovsdb_idl_change change)
723 {
724     return row->change_seqno[change];
725 }
726
727 /* Turns on OVSDB_IDL_TRACK for 'column' in 'idl', ensuring that
728  * all rows whose 'column' is modified are traced. Similarly, insert
729  * or delete of rows having 'column' are tracked. Clients are able
730  * to retrive the tracked rows with the ovsdb_idl_track_get_*()
731  * functions.
732  *
733  * This function should be called between ovsdb_idl_create() and
734  * the first call to ovsdb_idl_run(). The column to be tracked
735  * should have OVSDB_IDL_ALERT turned on.
736  */
737 void
738 ovsdb_idl_track_add_column(struct ovsdb_idl *idl,
739                            const struct ovsdb_idl_column *column)
740 {
741     if (!(*ovsdb_idl_get_mode(idl, column) & OVSDB_IDL_ALERT)) {
742         ovsdb_idl_add_column(idl, column);
743     }
744     *ovsdb_idl_get_mode(idl, column) |= OVSDB_IDL_TRACK;
745 }
746
747 void
748 ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
749 {
750     size_t i, j;
751
752     for (i = 0; i < idl->class->n_tables; i++) {
753         const struct ovsdb_idl_table_class *tc = &idl->class->tables[i];
754
755         for (j = 0; j < tc->n_columns; j++) {
756             const struct ovsdb_idl_column *column = &tc->columns[j];
757             ovsdb_idl_track_add_column(idl, column);
758         }
759     }
760 }
761
762 /* Returns true if 'table' has any tracked column. */
763 static bool
764 ovsdb_idl_track_is_set(struct ovsdb_idl_table *table)
765 {
766     size_t i;
767
768     for (i = 0; i < table->class->n_columns; i++) {
769         if (table->modes[i] & OVSDB_IDL_TRACK) {
770             return true;
771         }
772     }
773    return false;
774 }
775
776 /* Returns the first tracked row in table with class 'table_class'
777  * for the specified 'idl'. Returns NULL if there are no tracked rows */
778 const struct ovsdb_idl_row *
779 ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
780                           const struct ovsdb_idl_table_class *table_class)
781 {
782     struct ovsdb_idl_table *table
783         = ovsdb_idl_table_from_class(idl, table_class);
784
785     if (!list_is_empty(&table->track_list)) {
786         return CONTAINER_OF(list_front(&table->track_list), struct ovsdb_idl_row, track_node);
787     }
788     return NULL;
789 }
790
791 /* Returns the next tracked row in table after the specified 'row'
792  * (in no particular order). Returns NULL if there are no tracked rows */
793 const struct ovsdb_idl_row *
794 ovsdb_idl_track_get_next(const struct ovsdb_idl_row *row)
795 {
796     if (row->track_node.next != &row->table->track_list) {
797         return CONTAINER_OF(row->track_node.next, struct ovsdb_idl_row, track_node);
798     }
799
800     return NULL;
801 }
802
803 /* Returns true if a tracked 'column' in 'row' was updated by IDL, false
804  * otherwise. The tracking data is cleared by ovsdb_idl_track_clear()
805  *
806  * Function returns false if 'column' is not tracked (see
807  * ovsdb_idl_track_add_column()).
808  */
809 bool
810 ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
811                            const struct ovsdb_idl_column *column)
812 {
813     const struct ovsdb_idl_table_class *class;
814     size_t column_idx;
815
816     class = row->table->class;
817     column_idx = column - class->columns;
818
819     if (row->updated && bitmap_is_set(row->updated, column_idx)) {
820         return true;
821     } else {
822         return false;
823     }
824 }
825
826 /* Flushes the tracked rows. Client calls this function after calling
827  * ovsdb_idl_run() and read all tracked rows with the ovsdb_idl_track_get_*()
828  * functions. This is usually done at the end of the client's processing
829  * loop when it is ready to do ovsdb_idl_run() again.
830  */
831 void
832 ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
833 {
834     size_t i;
835
836     for (i = 0; i < idl->class->n_tables; i++) {
837         struct ovsdb_idl_table *table = &idl->tables[i];
838
839         if (!list_is_empty(&table->track_list)) {
840             struct ovsdb_idl_row *row, *next;
841
842             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
843                 if (row->updated) {
844                     free(row->updated);
845                     row->updated = NULL;
846                 }
847                 list_remove(&row->track_node);
848                 list_init(&row->track_node);
849                 if (ovsdb_idl_row_is_orphan(row)) {
850                     ovsdb_idl_row_clear_old(row);
851                     free(row);
852                 }
853             }
854         }
855     }
856 }
857
858 \f
859 static void
860 ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
861 {
862     struct jsonrpc_msg *msg;
863
864     json_destroy(idl->request_id);
865     msg = jsonrpc_create_request(
866         "get_schema",
867         json_array_create_1(json_string_create(idl->class->database)),
868         &idl->request_id);
869     jsonrpc_session_send(idl->session, msg);
870 }
871
872 static void
873 log_error(struct ovsdb_error *error)
874 {
875     char *s = ovsdb_error_to_string(error);
876     VLOG_WARN("error parsing database schema: %s", s);
877     free(s);
878     ovsdb_error_destroy(error);
879 }
880
881 /* Frees 'schema', which is in the format returned by parse_schema(). */
882 static void
883 free_schema(struct shash *schema)
884 {
885     if (schema) {
886         struct shash_node *node, *next;
887
888         SHASH_FOR_EACH_SAFE (node, next, schema) {
889             struct sset *sset = node->data;
890             sset_destroy(sset);
891             free(sset);
892             shash_delete(schema, node);
893         }
894         shash_destroy(schema);
895         free(schema);
896     }
897 }
898
899 /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
900  * 7047, to obtain the names of its rows and columns.  If successful, returns
901  * an shash whose keys are table names and whose values are ssets, where each
902  * sset contains the names of its table's columns.  On failure (due to a parse
903  * error), returns NULL.
904  *
905  * It would also be possible to use the general-purpose OVSDB schema parser in
906  * ovsdb-server, but that's overkill, possibly too strict for the current use
907  * case, and would require restructuring ovsdb-server to separate the schema
908  * code from the rest. */
909 static struct shash *
910 parse_schema(const struct json *schema_json)
911 {
912     struct ovsdb_parser parser;
913     const struct json *tables_json;
914     struct ovsdb_error *error;
915     struct shash_node *node;
916     struct shash *schema;
917
918     ovsdb_parser_init(&parser, schema_json, "database schema");
919     tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
920     error = ovsdb_parser_destroy(&parser);
921     if (error) {
922         log_error(error);
923         return NULL;
924     }
925
926     schema = xmalloc(sizeof *schema);
927     shash_init(schema);
928     SHASH_FOR_EACH (node, json_object(tables_json)) {
929         const char *table_name = node->name;
930         const struct json *json = node->data;
931         const struct json *columns_json;
932
933         ovsdb_parser_init(&parser, json, "table schema for table %s",
934                           table_name);
935         columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
936         error = ovsdb_parser_destroy(&parser);
937         if (error) {
938             log_error(error);
939             free_schema(schema);
940             return NULL;
941         }
942
943         struct sset *columns = xmalloc(sizeof *columns);
944         sset_init(columns);
945
946         struct shash_node *node2;
947         SHASH_FOR_EACH (node2, json_object(columns_json)) {
948             const char *column_name = node2->name;
949             sset_add(columns, column_name);
950         }
951         shash_add(schema, table_name, columns);
952     }
953     return schema;
954 }
955
956 static void
957 ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
958                                  const char *method)
959 {
960     struct shash *schema;
961     struct json *monitor_requests;
962     struct jsonrpc_msg *msg;
963     size_t i;
964
965     schema = parse_schema(idl->schema);
966     monitor_requests = json_object_create();
967     for (i = 0; i < idl->class->n_tables; i++) {
968         const struct ovsdb_idl_table *table = &idl->tables[i];
969         const struct ovsdb_idl_table_class *tc = table->class;
970         struct json *monitor_request, *columns;
971         const struct sset *table_schema;
972         size_t j;
973
974         table_schema = (schema
975                         ? shash_find_data(schema, table->class->name)
976                         : NULL);
977
978         columns = table->need_table ? json_array_create_empty() : NULL;
979         for (j = 0; j < tc->n_columns; j++) {
980             const struct ovsdb_idl_column *column = &tc->columns[j];
981             if (table->modes[j] & OVSDB_IDL_MONITOR) {
982                 if (table_schema
983                     && !sset_contains(table_schema, column->name)) {
984                     VLOG_WARN("%s table in %s database lacks %s column "
985                               "(database needs upgrade?)",
986                               table->class->name, idl->class->database,
987                               column->name);
988                     continue;
989                 }
990                 if (!columns) {
991                     columns = json_array_create_empty();
992                 }
993                 json_array_add(columns, json_string_create(column->name));
994             }
995         }
996
997         if (columns) {
998             if (schema && !table_schema) {
999                 VLOG_WARN("%s database lacks %s table "
1000                           "(database needs upgrade?)",
1001                           idl->class->database, table->class->name);
1002                 json_destroy(columns);
1003                 continue;
1004             }
1005
1006             monitor_request = json_object_create();
1007             json_object_put(monitor_request, "columns", columns);
1008             json_object_put(monitor_requests, tc->name, monitor_request);
1009         }
1010     }
1011     free_schema(schema);
1012
1013     json_destroy(idl->request_id);
1014     msg = jsonrpc_create_request(
1015         method,
1016         json_array_create_3(json_string_create(idl->class->database),
1017                             json_null_create(), monitor_requests),
1018         &idl->request_id);
1019     jsonrpc_session_send(idl->session, msg);
1020 }
1021
1022 static void
1023 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
1024 {
1025     ovsdb_idl_send_monitor_request__(idl, "monitor");
1026 }
1027
1028 static void
1029 log_parse_update_error(struct ovsdb_error *error)
1030 {
1031         if (!VLOG_DROP_WARN(&syntax_rl)) {
1032             char *s = ovsdb_error_to_string(error);
1033             VLOG_WARN_RL(&syntax_rl, "%s", s);
1034             free(s);
1035         }
1036         ovsdb_error_destroy(error);
1037 }
1038
1039 static void
1040 ovsdb_idl_send_monitor2_request(struct ovsdb_idl *idl)
1041 {
1042     ovsdb_idl_send_monitor_request__(idl, "monitor2");
1043 }
1044
1045 static void
1046 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates,
1047                        enum ovsdb_update_version version)
1048 {
1049     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates,
1050                                                          version);
1051     if (error) {
1052         log_parse_update_error(error);
1053     }
1054 }
1055
1056 static struct ovsdb_error *
1057 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
1058                          const struct json *table_updates,
1059                          enum ovsdb_update_version version)
1060 {
1061     const struct shash_node *tables_node;
1062     const char *table_updates_name = table_updates_names[version];
1063     const char *table_update_name = table_update_names[version];
1064     const char *row_update_name = row_update_names[version];
1065
1066     if (table_updates->type != JSON_OBJECT) {
1067         return ovsdb_syntax_error(table_updates, NULL,
1068                                   "<%s> is not an object",
1069                                   table_updates_name);
1070     }
1071
1072     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
1073         const struct json *table_update = tables_node->data;
1074         const struct shash_node *table_node;
1075         struct ovsdb_idl_table *table;
1076
1077         table = shash_find_data(&idl->table_by_name, tables_node->name);
1078         if (!table) {
1079             return ovsdb_syntax_error(
1080                 table_updates, NULL,
1081                 "<%s> includes unknown table \"%s\"",
1082                 table_updates_name,
1083                 tables_node->name);
1084         }
1085
1086         if (table_update->type != JSON_OBJECT) {
1087             return ovsdb_syntax_error(table_update, NULL,
1088                                       "<%s> for table \"%s\" is "
1089                                       "not an object",
1090                                       table_update_name,
1091                                       table->class->name);
1092         }
1093         SHASH_FOR_EACH (table_node, json_object(table_update)) {
1094             const struct json *row_update = table_node->data;
1095             const struct json *old_json, *new_json;
1096             struct uuid uuid;
1097
1098             if (!uuid_from_string(&uuid, table_node->name)) {
1099                 return ovsdb_syntax_error(table_update, NULL,
1100                                           "<%s> for table \"%s\" "
1101                                           "contains bad UUID "
1102                                           "\"%s\" as member name",
1103                                           table_update_name,
1104                                           table->class->name,
1105                                           table_node->name);
1106             }
1107             if (row_update->type != JSON_OBJECT) {
1108                 return ovsdb_syntax_error(row_update, NULL,
1109                                           "<%s> for table \"%s\" "
1110                                           "contains <%s> for %s that "
1111                                           "is not an object",
1112                                           table_update_name,
1113                                           table->class->name,
1114                                           row_update_name,
1115                                           table_node->name);
1116             }
1117
1118             switch(version) {
1119             case OVSDB_UPDATE:
1120                 old_json = shash_find_data(json_object(row_update), "old");
1121                 new_json = shash_find_data(json_object(row_update), "new");
1122                 if (old_json && old_json->type != JSON_OBJECT) {
1123                     return ovsdb_syntax_error(old_json, NULL,
1124                                               "\"old\" <row> is not object");
1125                 } else if (new_json && new_json->type != JSON_OBJECT) {
1126                     return ovsdb_syntax_error(new_json, NULL,
1127                                               "\"new\" <row> is not object");
1128                 } else if ((old_json != NULL) + (new_json != NULL)
1129                            != shash_count(json_object(row_update))) {
1130                     return ovsdb_syntax_error(row_update, NULL,
1131                                               "<row-update> contains "
1132                                               "unexpected member");
1133                 } else if (!old_json && !new_json) {
1134                     return ovsdb_syntax_error(row_update, NULL,
1135                                               "<row-update> missing \"old\" "
1136                                               "and \"new\" members");
1137                 }
1138
1139                 if (ovsdb_idl_process_update(table, &uuid, old_json,
1140                                              new_json)) {
1141                     idl->change_seqno++;
1142                 }
1143                 break;
1144
1145             case OVSDB_UPDATE2: {
1146                 const char *ops[] = {"modify", "insert", "delete", "initial"};
1147                 const char *operation;
1148                 const struct json *row;
1149                 int i;
1150
1151                 for (i = 0; i < ARRAY_SIZE(ops); i++) {
1152                     operation = ops[i];
1153                     row = shash_find_data(json_object(row_update), operation);
1154
1155                     if (row)  {
1156                         if (ovsdb_idl_process_update2(table, &uuid, operation,
1157                                                       row)) {
1158                             idl->change_seqno++;
1159                         }
1160                         break;
1161                     }
1162                 }
1163
1164                 /* row_update2 should contain one of the objects */
1165                 if (i == ARRAY_SIZE(ops)) {
1166                     return ovsdb_syntax_error(row_update, NULL,
1167                                               "<row_update2> includes unknown "
1168                                               "object");
1169                 }
1170                 break;
1171             }
1172
1173             default:
1174                 OVS_NOT_REACHED();
1175             }
1176         }
1177     }
1178
1179     return NULL;
1180 }
1181
1182 static struct ovsdb_idl_row *
1183 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
1184 {
1185     struct ovsdb_idl_row *row;
1186
1187     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
1188         if (uuid_equals(&row->uuid, uuid)) {
1189             return row;
1190         }
1191     }
1192     return NULL;
1193 }
1194
1195 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1196  * otherwise. */
1197 static bool
1198 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
1199                          const struct uuid *uuid, const struct json *old,
1200                          const struct json *new)
1201 {
1202     struct ovsdb_idl_row *row;
1203
1204     row = ovsdb_idl_get_row(table, uuid);
1205     if (!new) {
1206         /* Delete row. */
1207         if (row && !ovsdb_idl_row_is_orphan(row)) {
1208             /* XXX perhaps we should check the 'old' values? */
1209             ovsdb_idl_delete_row(row);
1210         } else {
1211             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1212                          "from table %s",
1213                          UUID_ARGS(uuid), table->class->name);
1214             return false;
1215         }
1216     } else if (!old) {
1217         /* Insert row. */
1218         if (!row) {
1219             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1220         } else if (ovsdb_idl_row_is_orphan(row)) {
1221             ovsdb_idl_insert_row(row, new);
1222         } else {
1223             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1224                          "table %s", UUID_ARGS(uuid), table->class->name);
1225             return ovsdb_idl_modify_row(row, new);
1226         }
1227     } else {
1228         /* Modify row. */
1229         if (row) {
1230             /* XXX perhaps we should check the 'old' values? */
1231             if (!ovsdb_idl_row_is_orphan(row)) {
1232                 return ovsdb_idl_modify_row(row, new);
1233             } else {
1234                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1235                              "referenced row "UUID_FMT" in table %s",
1236                              UUID_ARGS(uuid), table->class->name);
1237                 ovsdb_idl_insert_row(row, new);
1238             }
1239         } else {
1240             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1241                          "in table %s", UUID_ARGS(uuid), table->class->name);
1242             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1243         }
1244     }
1245
1246     return true;
1247 }
1248
1249 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1250  * otherwise. */
1251 static bool
1252 ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
1253                           const struct uuid *uuid,
1254                           const char *operation,
1255                           const struct json *json_row)
1256 {
1257     struct ovsdb_idl_row *row;
1258
1259     row = ovsdb_idl_get_row(table, uuid);
1260     if (!strcmp(operation, "delete")) {
1261         /* Delete row. */
1262         if (row && !ovsdb_idl_row_is_orphan(row)) {
1263             ovsdb_idl_delete_row(row);
1264         } else {
1265             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1266                          "from table %s",
1267                          UUID_ARGS(uuid), table->class->name);
1268             return false;
1269         }
1270     } else if (!strcmp(operation, "insert") || !strcmp(operation, "initial")) {
1271         /* Insert row. */
1272         if (!row) {
1273             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), json_row);
1274         } else if (ovsdb_idl_row_is_orphan(row)) {
1275             ovsdb_idl_insert_row(row, json_row);
1276         } else {
1277             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1278                          "table %s", UUID_ARGS(uuid), table->class->name);
1279             ovsdb_idl_delete_row(row);
1280             ovsdb_idl_insert_row(row, json_row);
1281         }
1282     } else if (!strcmp(operation, "modify")) {
1283         /* Modify row. */
1284         if (row) {
1285             if (!ovsdb_idl_row_is_orphan(row)) {
1286                 return ovsdb_idl_modify_row_by_diff(row, json_row);
1287             } else {
1288                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1289                              "referenced row "UUID_FMT" in table %s",
1290                              UUID_ARGS(uuid), table->class->name);
1291                 return false;
1292             }
1293         } else {
1294             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1295                          "in table %s", UUID_ARGS(uuid), table->class->name);
1296             return false;
1297         }
1298     } else {
1299             VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
1300                          "table %s", operation, table->class->name);
1301             return false;
1302     }
1303
1304     return true;
1305 }
1306
1307 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1308  * otherwise.
1309  *
1310  * Change 'row' either with the content of 'row_json' or by apply 'diff'.
1311  * Caller needs to provide either valid 'row_json' or 'diff', but not
1312  * both.  */
1313 static bool
1314 ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
1315                        const struct json *diff_json,
1316                        enum ovsdb_idl_change change)
1317 {
1318     struct ovsdb_idl_table *table = row->table;
1319     const struct ovsdb_idl_table_class *class = table->class;
1320     struct shash_node *node;
1321     bool changed = false;
1322     bool apply_diff = diff_json != NULL;
1323     const struct json *json = apply_diff ? diff_json : row_json;
1324
1325     SHASH_FOR_EACH (node, json_object(json)) {
1326         const char *column_name = node->name;
1327         const struct ovsdb_idl_column *column;
1328         struct ovsdb_datum datum;
1329         struct ovsdb_error *error;
1330         unsigned int column_idx;
1331         struct ovsdb_datum *old;
1332
1333         column = shash_find_data(&table->columns, column_name);
1334         if (!column) {
1335             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
1336                          column_name, UUID_ARGS(&row->uuid));
1337             continue;
1338         }
1339
1340         column_idx = column - table->class->columns;
1341         old = &row->old[column_idx];
1342
1343         error = NULL;
1344         if (apply_diff) {
1345             struct ovsdb_datum diff;
1346
1347             ovs_assert(!row_json);
1348             error = ovsdb_transient_datum_from_json(&diff, &column->type,
1349                                                     node->data);
1350             if (!error) {
1351                 error = ovsdb_datum_apply_diff(&datum, old, &diff,
1352                                                &column->type);
1353                 ovsdb_datum_destroy(&diff, &column->type);
1354             }
1355         } else {
1356             ovs_assert(!diff_json);
1357             error = ovsdb_datum_from_json(&datum, &column->type, node->data,
1358                                           NULL);
1359         }
1360
1361         if (!error) {
1362             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
1363                 ovsdb_datum_swap(old, &datum);
1364                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
1365                     changed = true;
1366                     row->change_seqno[change]
1367                         = row->table->change_seqno[change]
1368                         = row->table->idl->change_seqno + 1;
1369                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
1370                         if (list_is_empty(&row->track_node)) {
1371                             list_push_front(&row->table->track_list,
1372                                             &row->track_node);
1373                         }
1374                         if (!row->updated) {
1375                             row->updated = bitmap_allocate(class->n_columns);
1376                         }
1377                         bitmap_set1(row->updated, column_idx);
1378                     }
1379                 }
1380             } else {
1381                 /* Didn't really change but the OVSDB monitor protocol always
1382                  * includes every value in a row. */
1383             }
1384
1385             ovsdb_datum_destroy(&datum, &column->type);
1386         } else {
1387             char *s = ovsdb_error_to_string(error);
1388             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
1389                          " in table %s: %s", column_name,
1390                          UUID_ARGS(&row->uuid), table->class->name, s);
1391             free(s);
1392             ovsdb_error_destroy(error);
1393         }
1394     }
1395     return changed;
1396 }
1397
1398 static bool
1399 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json,
1400                      enum ovsdb_idl_change change)
1401 {
1402     return ovsdb_idl_row_change__(row, row_json, NULL, change);
1403 }
1404
1405 static bool
1406 ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
1407                          const struct json *diff_json,
1408                          enum ovsdb_idl_change change)
1409 {
1410     return ovsdb_idl_row_change__(row, NULL, diff_json, change);
1411 }
1412
1413 /* When a row A refers to row B through a column with a "refTable" constraint,
1414  * but row B does not exist, row B is called an "orphan row".  Orphan rows
1415  * should not persist, because the database enforces referential integrity, but
1416  * they can appear transiently as changes from the database are received (the
1417  * database doesn't try to topologically sort them and circular references mean
1418  * it isn't always possible anyhow).
1419  *
1420  * This function returns true if 'row' is an orphan row, otherwise false.
1421  */
1422 static bool
1423 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
1424 {
1425     return !row->old && !row->new;
1426 }
1427
1428 /* Returns true if 'row' is conceptually part of the database as modified by
1429  * the current transaction (if any), false otherwise.
1430  *
1431  * This function will return true if 'row' is not an orphan (see the comment on
1432  * ovsdb_idl_row_is_orphan()) and:
1433  *
1434  *   - 'row' exists in the database and has not been deleted within the
1435  *     current transaction (if any).
1436  *
1437  *   - 'row' was inserted within the current transaction and has not been
1438  *     deleted.  (In the latter case you should not have passed 'row' in at
1439  *     all, because ovsdb_idl_txn_delete() freed it.)
1440  *
1441  * This function will return false if 'row' is an orphan or if 'row' was
1442  * deleted within the current transaction.
1443  */
1444 static bool
1445 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
1446 {
1447     return row->new != NULL;
1448 }
1449
1450 static void
1451 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
1452 {
1453     const struct ovsdb_idl_table_class *class = row->table->class;
1454     size_t i;
1455
1456     for (i = 0; i < class->n_columns; i++) {
1457         const struct ovsdb_idl_column *c = &class->columns[i];
1458         (c->parse)(row, &row->old[i]);
1459     }
1460 }
1461
1462 static void
1463 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
1464 {
1465     const struct ovsdb_idl_table_class *class = row->table->class;
1466     size_t i;
1467
1468     for (i = 0; i < class->n_columns; i++) {
1469         const struct ovsdb_idl_column *c = &class->columns[i];
1470         (c->unparse)(row);
1471     }
1472 }
1473
1474 static void
1475 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
1476 {
1477     ovs_assert(row->old == row->new);
1478     if (!ovsdb_idl_row_is_orphan(row)) {
1479         const struct ovsdb_idl_table_class *class = row->table->class;
1480         size_t i;
1481
1482         for (i = 0; i < class->n_columns; i++) {
1483             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
1484         }
1485         free(row->old);
1486         row->old = row->new = NULL;
1487     }
1488 }
1489
1490 static void
1491 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
1492 {
1493     if (row->old != row->new) {
1494         if (row->new) {
1495             const struct ovsdb_idl_table_class *class = row->table->class;
1496             size_t i;
1497
1498             if (row->written) {
1499                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
1500                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
1501                 }
1502             }
1503             free(row->new);
1504             free(row->written);
1505             row->written = NULL;
1506         }
1507         row->new = row->old;
1508     }
1509 }
1510
1511 static void
1512 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
1513 {
1514     struct ovsdb_idl_arc *arc, *next;
1515
1516     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
1517      * that this causes to be unreferenced, if tracking is not enabled.
1518      * If tracking is enabled, orphaned nodes are removed from hmap but not
1519      * freed.
1520      */
1521     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
1522         list_remove(&arc->dst_node);
1523         if (destroy_dsts
1524             && ovsdb_idl_row_is_orphan(arc->dst)
1525             && list_is_empty(&arc->dst->dst_arcs)) {
1526             ovsdb_idl_row_destroy(arc->dst);
1527         }
1528         free(arc);
1529     }
1530     list_init(&row->src_arcs);
1531 }
1532
1533 /* Force nodes that reference 'row' to reparse. */
1534 static void
1535 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
1536 {
1537     struct ovsdb_idl_arc *arc, *next;
1538
1539     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
1540      * 'arc', so we need to use the "safe" variant of list traversal.  However,
1541      * calling an ovsdb_idl_column's 'parse' function will add an arc
1542      * equivalent to 'arc' to row->arcs.  That could be a problem for
1543      * traversal, but it adds it at the beginning of the list to prevent us
1544      * from stumbling upon it again.
1545      *
1546      * (If duplicate arcs were possible then we would need to make sure that
1547      * 'next' didn't also point into 'arc''s destination, but we forbid
1548      * duplicate arcs.) */
1549     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
1550         struct ovsdb_idl_row *ref = arc->src;
1551
1552         ovsdb_idl_row_unparse(ref);
1553         ovsdb_idl_row_clear_arcs(ref, false);
1554         ovsdb_idl_row_parse(ref);
1555     }
1556 }
1557
1558 static struct ovsdb_idl_row *
1559 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
1560 {
1561     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
1562     class->row_init(row);
1563     list_init(&row->src_arcs);
1564     list_init(&row->dst_arcs);
1565     hmap_node_nullify(&row->txn_node);
1566     list_init(&row->track_node);
1567     return row;
1568 }
1569
1570 static struct ovsdb_idl_row *
1571 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
1572 {
1573     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
1574     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
1575     row->uuid = *uuid;
1576     row->table = table;
1577     return row;
1578 }
1579
1580 static void
1581 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
1582 {
1583     if (row) {
1584         ovsdb_idl_row_clear_old(row);
1585         hmap_remove(&row->table->rows, &row->hmap_node);
1586         if (ovsdb_idl_track_is_set(row->table)) {
1587             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1588                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1589                 = row->table->idl->change_seqno + 1;
1590         }
1591         if (list_is_empty(&row->track_node)) {
1592             list_push_front(&row->table->track_list, &row->track_node);
1593         }
1594     }
1595 }
1596
1597 static void
1598 ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
1599 {
1600     size_t i;
1601
1602     for (i = 0; i < idl->class->n_tables; i++) {
1603         struct ovsdb_idl_table *table = &idl->tables[i];
1604
1605         if (!list_is_empty(&table->track_list)) {
1606             struct ovsdb_idl_row *row, *next;
1607
1608             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
1609                 if (!ovsdb_idl_track_is_set(row->table)) {
1610                     list_remove(&row->track_node);
1611                     free(row);
1612                 }
1613             }
1614         }
1615     }
1616 }
1617
1618 static void
1619 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
1620 {
1621     const struct ovsdb_idl_table_class *class = row->table->class;
1622     size_t i;
1623
1624     ovs_assert(!row->old && !row->new);
1625     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
1626     for (i = 0; i < class->n_columns; i++) {
1627         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
1628     }
1629     ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_INSERT);
1630     ovsdb_idl_row_parse(row);
1631
1632     ovsdb_idl_row_reparse_backrefs(row);
1633 }
1634
1635 static void
1636 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
1637 {
1638     ovsdb_idl_row_unparse(row);
1639     ovsdb_idl_row_clear_arcs(row, true);
1640     ovsdb_idl_row_clear_old(row);
1641     if (list_is_empty(&row->dst_arcs)) {
1642         ovsdb_idl_row_destroy(row);
1643     } else {
1644         ovsdb_idl_row_reparse_backrefs(row);
1645     }
1646 }
1647
1648 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1649  * otherwise. */
1650 static bool
1651 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1652 {
1653     bool changed;
1654
1655     ovsdb_idl_row_unparse(row);
1656     ovsdb_idl_row_clear_arcs(row, true);
1657     changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY);
1658     ovsdb_idl_row_parse(row);
1659
1660     return changed;
1661 }
1662
1663 static bool
1664 ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *row,
1665                              const struct json *diff_json)
1666 {
1667     bool changed;
1668
1669     ovsdb_idl_row_unparse(row);
1670     ovsdb_idl_row_clear_arcs(row, true);
1671     changed = ovsdb_idl_row_apply_diff(row, diff_json,
1672                                        OVSDB_IDL_CHANGE_MODIFY);
1673     ovsdb_idl_row_parse(row);
1674
1675     return changed;
1676 }
1677
1678 static bool
1679 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1680 {
1681     const struct ovsdb_idl_arc *arc;
1682
1683     /* No self-arcs. */
1684     if (src == dst) {
1685         return false;
1686     }
1687
1688     /* No duplicate arcs.
1689      *
1690      * We only need to test whether the first arc in dst->dst_arcs originates
1691      * at 'src', since we add all of the arcs from a given source in a clump
1692      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1693      * added at the front of the dst_arcs list. */
1694     if (list_is_empty(&dst->dst_arcs)) {
1695         return true;
1696     }
1697     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1698     return arc->src != src;
1699 }
1700
1701 static struct ovsdb_idl_table *
1702 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1703                            const struct ovsdb_idl_table_class *table_class)
1704 {
1705     return &idl->tables[table_class - idl->class->tables];
1706 }
1707
1708 /* Called by ovsdb-idlc generated code. */
1709 struct ovsdb_idl_row *
1710 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1711                       struct ovsdb_idl_table_class *dst_table_class,
1712                       const struct uuid *dst_uuid)
1713 {
1714     struct ovsdb_idl *idl = src->table->idl;
1715     struct ovsdb_idl_table *dst_table;
1716     struct ovsdb_idl_arc *arc;
1717     struct ovsdb_idl_row *dst;
1718
1719     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1720     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1721     if (idl->txn) {
1722         /* We're being called from ovsdb_idl_txn_write().  We must not update
1723          * any arcs, because the transaction will be backed out at commit or
1724          * abort time and we don't want our graph screwed up.
1725          *
1726          * Just return the destination row, if there is one and it has not been
1727          * deleted. */
1728         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1729             return dst;
1730         }
1731         return NULL;
1732     } else {
1733         /* We're being called from some other context.  Update the graph. */
1734         if (!dst) {
1735             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1736         }
1737
1738         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
1739         if (may_add_arc(src, dst)) {
1740             /* The arc *must* be added at the front of the dst_arcs list.  See
1741              * ovsdb_idl_row_reparse_backrefs() for details. */
1742             arc = xmalloc(sizeof *arc);
1743             list_push_front(&src->src_arcs, &arc->src_node);
1744             list_push_front(&dst->dst_arcs, &arc->dst_node);
1745             arc->src = src;
1746             arc->dst = dst;
1747         }
1748
1749         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
1750     }
1751 }
1752
1753 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
1754  * pointer to the row if there is one, otherwise a null pointer.  */
1755 const struct ovsdb_idl_row *
1756 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
1757                            const struct ovsdb_idl_table_class *tc,
1758                            const struct uuid *uuid)
1759 {
1760     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
1761 }
1762
1763 static struct ovsdb_idl_row *
1764 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
1765 {
1766     for (; node; node = hmap_next(&table->rows, node)) {
1767         struct ovsdb_idl_row *row;
1768
1769         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
1770         if (ovsdb_idl_row_exists(row)) {
1771             return row;
1772         }
1773     }
1774     return NULL;
1775 }
1776
1777 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
1778  * table is empty.
1779  *
1780  * Database tables are internally maintained as hash tables, so adding or
1781  * removing rows while traversing the same table can cause some rows to be
1782  * visited twice or not at apply. */
1783 const struct ovsdb_idl_row *
1784 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
1785                     const struct ovsdb_idl_table_class *table_class)
1786 {
1787     struct ovsdb_idl_table *table
1788         = ovsdb_idl_table_from_class(idl, table_class);
1789     return next_real_row(table, hmap_first(&table->rows));
1790 }
1791
1792 /* Returns a row following 'row' within its table, or a null pointer if 'row'
1793  * is the last row in its table. */
1794 const struct ovsdb_idl_row *
1795 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
1796 {
1797     struct ovsdb_idl_table *table = row->table;
1798
1799     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
1800 }
1801
1802 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
1803  * transaction has changed 'column''s value, the modified value is returned.
1804  *
1805  * The caller must not modify or free the returned value.
1806  *
1807  * Various kinds of changes can invalidate the returned value: writing to the
1808  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
1809  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
1810  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
1811  * returned value is needed for a long time, it is best to make a copy of it
1812  * with ovsdb_datum_clone(). */
1813 const struct ovsdb_datum *
1814 ovsdb_idl_read(const struct ovsdb_idl_row *row,
1815                const struct ovsdb_idl_column *column)
1816 {
1817     const struct ovsdb_idl_table_class *class;
1818     size_t column_idx;
1819
1820     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
1821
1822     class = row->table->class;
1823     column_idx = column - class->columns;
1824
1825     ovs_assert(row->new != NULL);
1826     ovs_assert(column_idx < class->n_columns);
1827
1828     if (row->written && bitmap_is_set(row->written, column_idx)) {
1829         return &row->new[column_idx];
1830     } else if (row->old) {
1831         return &row->old[column_idx];
1832     } else {
1833         return ovsdb_datum_default(&column->type);
1834     }
1835 }
1836
1837 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
1838  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
1839  * have a value type of OVSDB_TYPE_VOID.)
1840  *
1841  * This is useful in code that "knows" that a particular column has a given
1842  * type, so that it will abort if someone changes the column's type without
1843  * updating the code that uses it. */
1844 const struct ovsdb_datum *
1845 ovsdb_idl_get(const struct ovsdb_idl_row *row,
1846               const struct ovsdb_idl_column *column,
1847               enum ovsdb_atomic_type key_type OVS_UNUSED,
1848               enum ovsdb_atomic_type value_type OVS_UNUSED)
1849 {
1850     ovs_assert(column->type.key.type == key_type);
1851     ovs_assert(column->type.value.type == value_type);
1852
1853     return ovsdb_idl_read(row, column);
1854 }
1855
1856 /* Returns true if the field represented by 'column' in 'row' may be modified,
1857  * false if it is immutable.
1858  *
1859  * Normally, whether a field is mutable is controlled by its column's schema.
1860  * However, an immutable column can be set to any initial value at the time of
1861  * insertion, so if 'row' is a new row (one that is being added as part of the
1862  * current transaction, supposing that a transaction is in progress) then even
1863  * its "immutable" fields are actually mutable. */
1864 bool
1865 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
1866                      const struct ovsdb_idl_column *column)
1867 {
1868     return column->mutable || (row->new && !row->old);
1869 }
1870
1871 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
1872  * to all-zero-bits by some other entity.  If 'row' was set up some other way
1873  * then the return value is indeterminate. */
1874 bool
1875 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
1876 {
1877     return row->table == NULL;
1878 }
1879 \f
1880 /* Transactions. */
1881
1882 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1883                                    enum ovsdb_idl_txn_status);
1884
1885 /* Returns a string representation of 'status'.  The caller must not modify or
1886  * free the returned string.
1887  *
1888  * The return value is probably useful only for debug log messages and unit
1889  * tests. */
1890 const char *
1891 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
1892 {
1893     switch (status) {
1894     case TXN_UNCOMMITTED:
1895         return "uncommitted";
1896     case TXN_UNCHANGED:
1897         return "unchanged";
1898     case TXN_INCOMPLETE:
1899         return "incomplete";
1900     case TXN_ABORTED:
1901         return "aborted";
1902     case TXN_SUCCESS:
1903         return "success";
1904     case TXN_TRY_AGAIN:
1905         return "try again";
1906     case TXN_NOT_LOCKED:
1907         return "not locked";
1908     case TXN_ERROR:
1909         return "error";
1910     }
1911     return "<unknown>";
1912 }
1913
1914 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
1915  * active transaction at a time.  See the large comment in ovsdb-idl.h for
1916  * general information on transactions. */
1917 struct ovsdb_idl_txn *
1918 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
1919 {
1920     struct ovsdb_idl_txn *txn;
1921
1922     ovs_assert(!idl->txn);
1923     idl->txn = txn = xmalloc(sizeof *txn);
1924     txn->request_id = NULL;
1925     txn->idl = idl;
1926     hmap_init(&txn->txn_rows);
1927     txn->status = TXN_UNCOMMITTED;
1928     txn->error = NULL;
1929     txn->dry_run = false;
1930     ds_init(&txn->comment);
1931
1932     txn->inc_table = NULL;
1933     txn->inc_column = NULL;
1934
1935     hmap_init(&txn->inserted_rows);
1936
1937     return txn;
1938 }
1939
1940 /* Appends 's', which is treated as a printf()-type format string, to the
1941  * comments that will be passed to the OVSDB server when 'txn' is committed.
1942  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
1943  * show-log" can print in a relatively human-readable form.) */
1944 void
1945 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
1946 {
1947     va_list args;
1948
1949     if (txn->comment.length) {
1950         ds_put_char(&txn->comment, '\n');
1951     }
1952
1953     va_start(args, s);
1954     ds_put_format_valist(&txn->comment, s, args);
1955     va_end(args);
1956 }
1957
1958 /* Marks 'txn' as a transaction that will not actually modify the database.  In
1959  * almost every way, the transaction is treated like other transactions.  It
1960  * must be committed or aborted like other transactions, it will be sent to the
1961  * database server like other transactions, and so on.  The only difference is
1962  * that the operations sent to the database server will include, as the last
1963  * step, an "abort" operation, so that any changes made by the transaction will
1964  * not actually take effect. */
1965 void
1966 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
1967 {
1968     txn->dry_run = true;
1969 }
1970
1971 /* Causes 'txn', when committed, to increment the value of 'column' within
1972  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
1973  * successfully, the client may retrieve the final (incremented) value of
1974  * 'column' with ovsdb_idl_txn_get_increment_new_value().
1975  *
1976  * The client could accomplish something similar with ovsdb_idl_read(),
1977  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
1978  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
1979  * will never (by itself) fail because of a verify error.
1980  *
1981  * The intended use is for incrementing the "next_cfg" column in the
1982  * Open_vSwitch table. */
1983 void
1984 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
1985                         const struct ovsdb_idl_row *row,
1986                         const struct ovsdb_idl_column *column)
1987 {
1988     ovs_assert(!txn->inc_table);
1989     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
1990     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
1991
1992     txn->inc_table = row->table->class->name;
1993     txn->inc_column = column->name;
1994     txn->inc_row = row->uuid;
1995 }
1996
1997 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
1998  * has been called for 'txn' but the commit is still incomplete (that is, the
1999  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
2000  * end up committing at the database server, but the client will not be able to
2001  * get any further status information back. */
2002 void
2003 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
2004 {
2005     struct ovsdb_idl_txn_insert *insert, *next;
2006
2007     json_destroy(txn->request_id);
2008     if (txn->status == TXN_INCOMPLETE) {
2009         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2010     }
2011     ovsdb_idl_txn_abort(txn);
2012     ds_destroy(&txn->comment);
2013     free(txn->error);
2014     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
2015         free(insert);
2016     }
2017     hmap_destroy(&txn->inserted_rows);
2018     free(txn);
2019 }
2020
2021 /* Causes poll_block() to wake up if 'txn' has completed committing. */
2022 void
2023 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
2024 {
2025     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
2026         poll_immediate_wake();
2027     }
2028 }
2029
2030 static struct json *
2031 where_uuid_equals(const struct uuid *uuid)
2032 {
2033     return
2034         json_array_create_1(
2035             json_array_create_3(
2036                 json_string_create("_uuid"),
2037                 json_string_create("=="),
2038                 json_array_create_2(
2039                     json_string_create("uuid"),
2040                     json_string_create_nocopy(
2041                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
2042 }
2043
2044 static char *
2045 uuid_name_from_uuid(const struct uuid *uuid)
2046 {
2047     char *name;
2048     char *p;
2049
2050     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
2051     for (p = name; *p != '\0'; p++) {
2052         if (*p == '-') {
2053             *p = '_';
2054         }
2055     }
2056
2057     return name;
2058 }
2059
2060 static const struct ovsdb_idl_row *
2061 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
2062 {
2063     const struct ovsdb_idl_row *row;
2064
2065     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
2066         if (uuid_equals(&row->uuid, uuid)) {
2067             return row;
2068         }
2069     }
2070     return NULL;
2071 }
2072
2073 /* XXX there must be a cleaner way to do this */
2074 static struct json *
2075 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
2076 {
2077     if (json->type == JSON_ARRAY) {
2078         struct uuid uuid;
2079         size_t i;
2080
2081         if (json->u.array.n == 2
2082             && json->u.array.elems[0]->type == JSON_STRING
2083             && json->u.array.elems[1]->type == JSON_STRING
2084             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
2085             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
2086             const struct ovsdb_idl_row *row;
2087
2088             row = ovsdb_idl_txn_get_row(txn, &uuid);
2089             if (row && !row->old && row->new) {
2090                 json_destroy(json);
2091
2092                 return json_array_create_2(
2093                     json_string_create("named-uuid"),
2094                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
2095             }
2096         }
2097
2098         for (i = 0; i < json->u.array.n; i++) {
2099             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
2100                                                       txn);
2101         }
2102     } else if (json->type == JSON_OBJECT) {
2103         struct shash_node *node;
2104
2105         SHASH_FOR_EACH (node, json_object(json)) {
2106             node->data = substitute_uuids(node->data, txn);
2107         }
2108     }
2109     return json;
2110 }
2111
2112 static void
2113 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
2114 {
2115     struct ovsdb_idl_row *row, *next;
2116
2117     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
2118      * ovsdb_idl_column's 'parse' function, which will call
2119      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
2120      * transaction and fail to update the graph.  */
2121     txn->idl->txn = NULL;
2122
2123     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
2124         if (row->old) {
2125             if (row->written) {
2126                 ovsdb_idl_row_unparse(row);
2127                 ovsdb_idl_row_clear_arcs(row, false);
2128                 ovsdb_idl_row_parse(row);
2129             }
2130         } else {
2131             ovsdb_idl_row_unparse(row);
2132         }
2133         ovsdb_idl_row_clear_new(row);
2134
2135         free(row->prereqs);
2136         row->prereqs = NULL;
2137
2138         free(row->written);
2139         row->written = NULL;
2140
2141         hmap_remove(&txn->txn_rows, &row->txn_node);
2142         hmap_node_nullify(&row->txn_node);
2143         if (!row->old) {
2144             hmap_remove(&row->table->rows, &row->hmap_node);
2145             free(row);
2146         }
2147     }
2148     hmap_destroy(&txn->txn_rows);
2149     hmap_init(&txn->txn_rows);
2150 }
2151
2152 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
2153  * of the following TXN_* constants:
2154  *
2155  *   TXN_INCOMPLETE:
2156  *
2157  *       The transaction is in progress, but not yet complete.  The caller
2158  *       should call again later, after calling ovsdb_idl_run() to let the IDL
2159  *       do OVSDB protocol processing.
2160  *
2161  *   TXN_UNCHANGED:
2162  *
2163  *       The transaction is complete.  (It didn't actually change the database,
2164  *       so the IDL didn't send any request to the database server.)
2165  *
2166  *   TXN_ABORTED:
2167  *
2168  *       The caller previously called ovsdb_idl_txn_abort().
2169  *
2170  *   TXN_SUCCESS:
2171  *
2172  *       The transaction was successful.  The update made by the transaction
2173  *       (and possibly other changes made by other database clients) should
2174  *       already be visible in the IDL.
2175  *
2176  *   TXN_TRY_AGAIN:
2177  *
2178  *       The transaction failed for some transient reason, e.g. because a
2179  *       "verify" operation reported an inconsistency or due to a network
2180  *       problem.  The caller should wait for a change to the database, then
2181  *       compose a new transaction, and commit the new transaction.
2182  *
2183  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
2184  *       the database.  It is important to use its return value *before* the
2185  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
2186  *       purpose, because the change that one should wait for can happen after
2187  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
2188  *       using some other baseline value in that situation could cause an
2189  *       indefinite wait if the database rarely changes.
2190  *
2191  *   TXN_NOT_LOCKED:
2192  *
2193  *       The transaction failed because the IDL has been configured to require
2194  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
2195  *       has already lost it.
2196  *
2197  * Committing a transaction rolls back all of the changes that it made to the
2198  * IDL's copy of the database.  If the transaction commits successfully, then
2199  * the database server will send an update and, thus, the IDL will be updated
2200  * with the committed changes. */
2201 enum ovsdb_idl_txn_status
2202 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
2203 {
2204     struct ovsdb_idl_row *row;
2205     struct json *operations;
2206     bool any_updates;
2207
2208     if (txn != txn->idl->txn) {
2209         goto coverage_out;
2210     }
2211
2212     /* If we need a lock but don't have it, give up quickly. */
2213     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
2214         txn->status = TXN_NOT_LOCKED;
2215         goto disassemble_out;
2216     }
2217
2218     operations = json_array_create_1(
2219         json_string_create(txn->idl->class->database));
2220
2221     /* Assert that we have the required lock (avoiding a race). */
2222     if (txn->idl->lock_name) {
2223         struct json *op = json_object_create();
2224         json_array_add(operations, op);
2225         json_object_put_string(op, "op", "assert");
2226         json_object_put_string(op, "lock", txn->idl->lock_name);
2227     }
2228
2229     /* Add prerequisites and declarations of new rows. */
2230     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2231         /* XXX check that deleted rows exist even if no prereqs? */
2232         if (row->prereqs) {
2233             const struct ovsdb_idl_table_class *class = row->table->class;
2234             size_t n_columns = class->n_columns;
2235             struct json *op, *columns, *row_json;
2236             size_t idx;
2237
2238             op = json_object_create();
2239             json_array_add(operations, op);
2240             json_object_put_string(op, "op", "wait");
2241             json_object_put_string(op, "table", class->name);
2242             json_object_put(op, "timeout", json_integer_create(0));
2243             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2244             json_object_put_string(op, "until", "==");
2245             columns = json_array_create_empty();
2246             json_object_put(op, "columns", columns);
2247             row_json = json_object_create();
2248             json_object_put(op, "rows", json_array_create_1(row_json));
2249
2250             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
2251                 const struct ovsdb_idl_column *column = &class->columns[idx];
2252                 json_array_add(columns, json_string_create(column->name));
2253                 json_object_put(row_json, column->name,
2254                                 ovsdb_datum_to_json(&row->old[idx],
2255                                                     &column->type));
2256             }
2257         }
2258     }
2259
2260     /* Add updates. */
2261     any_updates = false;
2262     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2263         const struct ovsdb_idl_table_class *class = row->table->class;
2264
2265         if (!row->new) {
2266             if (class->is_root) {
2267                 struct json *op = json_object_create();
2268                 json_object_put_string(op, "op", "delete");
2269                 json_object_put_string(op, "table", class->name);
2270                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2271                 json_array_add(operations, op);
2272                 any_updates = true;
2273             } else {
2274                 /* Let ovsdb-server decide whether to really delete it. */
2275             }
2276         } else if (row->old != row->new) {
2277             struct json *row_json;
2278             struct json *op;
2279             size_t idx;
2280
2281             op = json_object_create();
2282             json_object_put_string(op, "op", row->old ? "update" : "insert");
2283             json_object_put_string(op, "table", class->name);
2284             if (row->old) {
2285                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2286             } else {
2287                 struct ovsdb_idl_txn_insert *insert;
2288
2289                 any_updates = true;
2290
2291                 json_object_put(op, "uuid-name",
2292                                 json_string_create_nocopy(
2293                                     uuid_name_from_uuid(&row->uuid)));
2294
2295                 insert = xmalloc(sizeof *insert);
2296                 insert->dummy = row->uuid;
2297                 insert->op_index = operations->u.array.n - 1;
2298                 uuid_zero(&insert->real);
2299                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
2300                             uuid_hash(&insert->dummy));
2301             }
2302             row_json = json_object_create();
2303             json_object_put(op, "row", row_json);
2304
2305             if (row->written) {
2306                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
2307                     const struct ovsdb_idl_column *column =
2308                                                         &class->columns[idx];
2309
2310                     if (row->old
2311                         || !ovsdb_datum_is_default(&row->new[idx],
2312                                                   &column->type)) {
2313                         json_object_put(row_json, column->name,
2314                                         substitute_uuids(
2315                                             ovsdb_datum_to_json(&row->new[idx],
2316                                                                 &column->type),
2317                                             txn));
2318
2319                         /* If anything really changed, consider it an update.
2320                          * We can't suppress not-really-changed values earlier
2321                          * or transactions would become nonatomic (see the big
2322                          * comment inside ovsdb_idl_txn_write()). */
2323                         if (!any_updates && row->old &&
2324                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
2325                                                 &column->type)) {
2326                             any_updates = true;
2327                         }
2328                     }
2329                 }
2330             }
2331
2332             if (!row->old || !shash_is_empty(json_object(row_json))) {
2333                 json_array_add(operations, op);
2334             } else {
2335                 json_destroy(op);
2336             }
2337         }
2338     }
2339
2340     /* Add increment. */
2341     if (txn->inc_table && any_updates) {
2342         struct json *op;
2343
2344         txn->inc_index = operations->u.array.n - 1;
2345
2346         op = json_object_create();
2347         json_object_put_string(op, "op", "mutate");
2348         json_object_put_string(op, "table", txn->inc_table);
2349         json_object_put(op, "where",
2350                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2351                                          txn));
2352         json_object_put(op, "mutations",
2353                         json_array_create_1(
2354                             json_array_create_3(
2355                                 json_string_create(txn->inc_column),
2356                                 json_string_create("+="),
2357                                 json_integer_create(1))));
2358         json_array_add(operations, op);
2359
2360         op = json_object_create();
2361         json_object_put_string(op, "op", "select");
2362         json_object_put_string(op, "table", txn->inc_table);
2363         json_object_put(op, "where",
2364                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2365                                          txn));
2366         json_object_put(op, "columns",
2367                         json_array_create_1(json_string_create(
2368                                                 txn->inc_column)));
2369         json_array_add(operations, op);
2370     }
2371
2372     if (txn->comment.length) {
2373         struct json *op = json_object_create();
2374         json_object_put_string(op, "op", "comment");
2375         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
2376         json_array_add(operations, op);
2377     }
2378
2379     if (txn->dry_run) {
2380         struct json *op = json_object_create();
2381         json_object_put_string(op, "op", "abort");
2382         json_array_add(operations, op);
2383     }
2384
2385     if (!any_updates) {
2386         txn->status = TXN_UNCHANGED;
2387         json_destroy(operations);
2388     } else if (!jsonrpc_session_send(
2389                    txn->idl->session,
2390                    jsonrpc_create_request(
2391                        "transact", operations, &txn->request_id))) {
2392         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
2393                     json_hash(txn->request_id, 0));
2394         txn->status = TXN_INCOMPLETE;
2395     } else {
2396         txn->status = TXN_TRY_AGAIN;
2397     }
2398
2399 disassemble_out:
2400     ovsdb_idl_txn_disassemble(txn);
2401 coverage_out:
2402     switch (txn->status) {
2403     case TXN_UNCOMMITTED:   COVERAGE_INC(txn_uncommitted);    break;
2404     case TXN_UNCHANGED:     COVERAGE_INC(txn_unchanged);      break;
2405     case TXN_INCOMPLETE:    COVERAGE_INC(txn_incomplete);     break;
2406     case TXN_ABORTED:       COVERAGE_INC(txn_aborted);        break;
2407     case TXN_SUCCESS:       COVERAGE_INC(txn_success);        break;
2408     case TXN_TRY_AGAIN:     COVERAGE_INC(txn_try_again);      break;
2409     case TXN_NOT_LOCKED:    COVERAGE_INC(txn_not_locked);     break;
2410     case TXN_ERROR:         COVERAGE_INC(txn_error);          break;
2411     }
2412
2413     return txn->status;
2414 }
2415
2416 /* Attempts to commit 'txn', blocking until the commit either succeeds or
2417  * fails.  Returns the final commit status, which may be any TXN_* value other
2418  * than TXN_INCOMPLETE.
2419  *
2420  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
2421  * return value of ovsdb_idl_get_seqno() to change. */
2422 enum ovsdb_idl_txn_status
2423 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
2424 {
2425     enum ovsdb_idl_txn_status status;
2426
2427     fatal_signal_run();
2428     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
2429         ovsdb_idl_run(txn->idl);
2430         ovsdb_idl_wait(txn->idl);
2431         ovsdb_idl_txn_wait(txn);
2432         poll_block();
2433     }
2434     return status;
2435 }
2436
2437 /* Returns the final (incremented) value of the column in 'txn' that was set to
2438  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
2439  * successfully. */
2440 int64_t
2441 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
2442 {
2443     ovs_assert(txn->status == TXN_SUCCESS);
2444     return txn->inc_new_value;
2445 }
2446
2447 /* Aborts 'txn' without sending it to the database server.  This is effective
2448  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
2449  * Otherwise, it has no effect.
2450  *
2451  * Aborting a transaction doesn't free its memory.  Use
2452  * ovsdb_idl_txn_destroy() to do that. */
2453 void
2454 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
2455 {
2456     ovsdb_idl_txn_disassemble(txn);
2457     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
2458         txn->status = TXN_ABORTED;
2459     }
2460 }
2461
2462 /* Returns a string that reports the error status for 'txn'.  The caller must
2463  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
2464  * for 'txn' may free the returned string.
2465  *
2466  * The return value is ordinarily one of the strings that
2467  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
2468  * due to an error reported by the database server, the return value is that
2469  * error. */
2470 const char *
2471 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
2472 {
2473     if (txn->status != TXN_ERROR) {
2474         return ovsdb_idl_txn_status_to_string(txn->status);
2475     } else if (txn->error) {
2476         return txn->error;
2477     } else {
2478         return "no error details available";
2479     }
2480 }
2481
2482 static void
2483 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
2484                              const struct json *json)
2485 {
2486     if (txn->error == NULL) {
2487         txn->error = json_to_string(json, JSSF_SORT);
2488     }
2489 }
2490
2491 /* For transaction 'txn' that completed successfully, finds and returns the
2492  * permanent UUID that the database assigned to a newly inserted row, given the
2493  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
2494  *
2495  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
2496  * if it was assigned by that function and then deleted by
2497  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
2498  * and then deleted within a single transaction are never sent to the database
2499  * server, so it never assigns them a permanent UUID.) */
2500 const struct uuid *
2501 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
2502                               const struct uuid *uuid)
2503 {
2504     const struct ovsdb_idl_txn_insert *insert;
2505
2506     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
2507     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
2508                              uuid_hash(uuid), &txn->inserted_rows) {
2509         if (uuid_equals(uuid, &insert->dummy)) {
2510             return &insert->real;
2511         }
2512     }
2513     return NULL;
2514 }
2515
2516 static void
2517 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2518                        enum ovsdb_idl_txn_status status)
2519 {
2520     txn->status = status;
2521     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2522 }
2523
2524 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
2525  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
2526  * ovs-vswitchd).
2527  *
2528  * 'datum' must have the correct type for its column.  The IDL does not check
2529  * that it meets schema constraints, but ovsdb-server will do so at commit time
2530  * so it had better be correct.
2531  *
2532  * A transaction must be in progress.  Replication of 'column' must not have
2533  * been disabled (by calling ovsdb_idl_omit()).
2534  *
2535  * Usually this function is used indirectly through one of the "set" functions
2536  * generated by ovsdb-idlc.
2537  *
2538  * Takes ownership of what 'datum' points to (and in some cases destroys that
2539  * data before returning) but makes a copy of 'datum' itself.  (Commonly
2540  * 'datum' is on the caller's stack.) */
2541 static void
2542 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
2543                       const struct ovsdb_idl_column *column,
2544                       struct ovsdb_datum *datum, bool owns_datum)
2545 {
2546     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2547     const struct ovsdb_idl_table_class *class;
2548     size_t column_idx;
2549     bool write_only;
2550
2551     if (ovsdb_idl_row_is_synthetic(row)) {
2552         goto discard_datum;
2553     }
2554
2555     class = row->table->class;
2556     column_idx = column - class->columns;
2557     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
2558
2559     ovs_assert(row->new != NULL);
2560     ovs_assert(column_idx < class->n_columns);
2561     ovs_assert(row->old == NULL ||
2562                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2563
2564     if (row->table->idl->verify_write_only && !write_only) {
2565         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
2566                  " explicitly configured not to.", class->name, column->name);
2567         goto discard_datum;
2568     }
2569
2570     /* If this is a write-only column and the datum being written is the same
2571      * as the one already there, just skip the update entirely.  This is worth
2572      * optimizing because we have a lot of columns that get periodically
2573      * refreshed into the database but don't actually change that often.
2574      *
2575      * We don't do this for read/write columns because that would break
2576      * atomicity of transactions--some other client might have written a
2577      * different value in that column since we read it.  (But if a whole
2578      * transaction only does writes of existing values, without making any real
2579      * changes, we will drop the whole transaction later in
2580      * ovsdb_idl_txn_commit().) */
2581     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
2582                                          datum, &column->type)) {
2583         goto discard_datum;
2584     }
2585
2586     if (hmap_node_is_null(&row->txn_node)) {
2587         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2588                     uuid_hash(&row->uuid));
2589     }
2590     if (row->old == row->new) {
2591         row->new = xmalloc(class->n_columns * sizeof *row->new);
2592     }
2593     if (!row->written) {
2594         row->written = bitmap_allocate(class->n_columns);
2595     }
2596     if (bitmap_is_set(row->written, column_idx)) {
2597         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
2598     } else {
2599         bitmap_set1(row->written, column_idx);
2600     }
2601     if (owns_datum) {
2602         row->new[column_idx] = *datum;
2603     } else {
2604         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
2605     }
2606     (column->unparse)(row);
2607     (column->parse)(row, &row->new[column_idx]);
2608     return;
2609
2610 discard_datum:
2611     if (owns_datum) {
2612         ovsdb_datum_destroy(datum, &column->type);
2613     }
2614 }
2615
2616 void
2617 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
2618                     const struct ovsdb_idl_column *column,
2619                     struct ovsdb_datum *datum)
2620 {
2621     ovsdb_idl_txn_write__(row, column, datum, true);
2622 }
2623
2624 void
2625 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
2626                           const struct ovsdb_idl_column *column,
2627                           const struct ovsdb_datum *datum)
2628 {
2629     ovsdb_idl_txn_write__(row, column,
2630                           CONST_CAST(struct ovsdb_datum *, datum), false);
2631 }
2632
2633 /* Causes the original contents of 'column' in 'row_' to be verified as a
2634  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
2635  * changed (or if 'row_' was deleted) between the time that the IDL originally
2636  * read its contents and the time that the transaction commits, then the
2637  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
2638  * TXN_AGAIN_NOW (depending on whether the database change has already been
2639  * received).
2640  *
2641  * The intention is that, to ensure that no transaction commits based on dirty
2642  * reads, an application should call ovsdb_idl_txn_verify() on each data item
2643  * read as part of a read-modify-write operation.
2644  *
2645  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
2646  * value of 'column' is already known:
2647  *
2648  *   - If 'row_' is a row created by the current transaction (returned by
2649  *     ovsdb_idl_txn_insert()).
2650  *
2651  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
2652  *     within the current transaction.
2653  *
2654  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
2655  * ovsdb_idl_txn_write() for a given read-modify-write.
2656  *
2657  * A transaction must be in progress.
2658  *
2659  * Usually this function is used indirectly through one of the "verify"
2660  * functions generated by ovsdb-idlc. */
2661 void
2662 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
2663                      const struct ovsdb_idl_column *column)
2664 {
2665     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2666     const struct ovsdb_idl_table_class *class;
2667     size_t column_idx;
2668
2669     if (ovsdb_idl_row_is_synthetic(row)) {
2670         return;
2671     }
2672
2673     class = row->table->class;
2674     column_idx = column - class->columns;
2675
2676     ovs_assert(row->new != NULL);
2677     ovs_assert(row->old == NULL ||
2678                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2679     if (!row->old
2680         || (row->written && bitmap_is_set(row->written, column_idx))) {
2681         return;
2682     }
2683
2684     if (hmap_node_is_null(&row->txn_node)) {
2685         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2686                     uuid_hash(&row->uuid));
2687     }
2688     if (!row->prereqs) {
2689         row->prereqs = bitmap_allocate(class->n_columns);
2690     }
2691     bitmap_set1(row->prereqs, column_idx);
2692 }
2693
2694 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
2695  * accessed afterward.
2696  *
2697  * A transaction must be in progress.
2698  *
2699  * Usually this function is used indirectly through one of the "delete"
2700  * functions generated by ovsdb-idlc. */
2701 void
2702 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
2703 {
2704     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2705
2706     if (ovsdb_idl_row_is_synthetic(row)) {
2707         return;
2708     }
2709
2710     ovs_assert(row->new != NULL);
2711     if (!row->old) {
2712         ovsdb_idl_row_unparse(row);
2713         ovsdb_idl_row_clear_new(row);
2714         ovs_assert(!row->prereqs);
2715         hmap_remove(&row->table->rows, &row->hmap_node);
2716         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
2717         free(row);
2718         return;
2719     }
2720     if (hmap_node_is_null(&row->txn_node)) {
2721         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2722                     uuid_hash(&row->uuid));
2723     }
2724     ovsdb_idl_row_clear_new(row);
2725     row->new = NULL;
2726 }
2727
2728 /* Inserts and returns a new row in the table with the specified 'class' in the
2729  * database with open transaction 'txn'.
2730  *
2731  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
2732  * randomly generated; otherwise 'uuid' should specify a randomly generated
2733  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
2734  * 'txn' is committed, but the IDL will replace any uses of the provisional
2735  * UUID in the data to be to be committed by the UUID assigned by
2736  * ovsdb-server.
2737  *
2738  * Usually this function is used indirectly through one of the "insert"
2739  * functions generated by ovsdb-idlc. */
2740 const struct ovsdb_idl_row *
2741 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
2742                      const struct ovsdb_idl_table_class *class,
2743                      const struct uuid *uuid)
2744 {
2745     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
2746
2747     if (uuid) {
2748         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
2749         row->uuid = *uuid;
2750     } else {
2751         uuid_generate(&row->uuid);
2752     }
2753
2754     row->table = ovsdb_idl_table_from_class(txn->idl, class);
2755     row->new = xmalloc(class->n_columns * sizeof *row->new);
2756     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
2757     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
2758     return row;
2759 }
2760
2761 static void
2762 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
2763 {
2764     struct ovsdb_idl_txn *txn;
2765
2766     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
2767         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
2768     }
2769 }
2770
2771 static struct ovsdb_idl_txn *
2772 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
2773 {
2774     struct ovsdb_idl_txn *txn;
2775
2776     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
2777                              json_hash(id, 0), &idl->outstanding_txns) {
2778         if (json_equal(id, txn->request_id)) {
2779             return txn;
2780         }
2781     }
2782     return NULL;
2783 }
2784
2785 static bool
2786 check_json_type(const struct json *json, enum json_type type, const char *name)
2787 {
2788     if (!json) {
2789         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
2790         return false;
2791     } else if (json->type != type) {
2792         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
2793                      name, json_type_to_string(json->type),
2794                      json_type_to_string(type));
2795         return false;
2796     } else {
2797         return true;
2798     }
2799 }
2800
2801 static bool
2802 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
2803                                 const struct json_array *results)
2804 {
2805     struct json *count, *rows, *row, *column;
2806     struct shash *mutate, *select;
2807
2808     if (txn->inc_index + 2 > results->n) {
2809         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2810                      "for increment (has %"PRIuSIZE", needs %u)",
2811                      results->n, txn->inc_index + 2);
2812         return false;
2813     }
2814
2815     /* We know that this is a JSON object because the loop in
2816      * ovsdb_idl_txn_process_reply() checked. */
2817     mutate = json_object(results->elems[txn->inc_index]);
2818     count = shash_find_data(mutate, "count");
2819     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
2820         return false;
2821     }
2822     if (count->u.integer != 1) {
2823         VLOG_WARN_RL(&syntax_rl,
2824                      "\"mutate\" reply \"count\" is %lld instead of 1",
2825                      count->u.integer);
2826         return false;
2827     }
2828
2829     select = json_object(results->elems[txn->inc_index + 1]);
2830     rows = shash_find_data(select, "rows");
2831     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
2832         return false;
2833     }
2834     if (rows->u.array.n != 1) {
2835         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
2836                      "instead of 1",
2837                      rows->u.array.n);
2838         return false;
2839     }
2840     row = rows->u.array.elems[0];
2841     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
2842         return false;
2843     }
2844     column = shash_find_data(json_object(row), txn->inc_column);
2845     if (!check_json_type(column, JSON_INTEGER,
2846                          "\"select\" reply inc column")) {
2847         return false;
2848     }
2849     txn->inc_new_value = column->u.integer;
2850     return true;
2851 }
2852
2853 static bool
2854 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
2855                                    const struct json_array *results)
2856 {
2857     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
2858     struct ovsdb_error *error;
2859     struct json *json_uuid;
2860     union ovsdb_atom uuid;
2861     struct shash *reply;
2862
2863     if (insert->op_index >= results->n) {
2864         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2865                      "for insert (has %"PRIuSIZE", needs %u)",
2866                      results->n, insert->op_index);
2867         return false;
2868     }
2869
2870     /* We know that this is a JSON object because the loop in
2871      * ovsdb_idl_txn_process_reply() checked. */
2872     reply = json_object(results->elems[insert->op_index]);
2873     json_uuid = shash_find_data(reply, "uuid");
2874     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
2875         return false;
2876     }
2877
2878     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
2879     if (error) {
2880         char *s = ovsdb_error_to_string(error);
2881         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
2882                      "UUID: %s", s);
2883         free(s);
2884         ovsdb_error_destroy(error);
2885         return false;
2886     }
2887
2888     insert->real = uuid.uuid;
2889
2890     return true;
2891 }
2892
2893 static bool
2894 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
2895                             const struct jsonrpc_msg *msg)
2896 {
2897     struct ovsdb_idl_txn *txn;
2898     enum ovsdb_idl_txn_status status;
2899
2900     txn = ovsdb_idl_txn_find(idl, msg->id);
2901     if (!txn) {
2902         return false;
2903     }
2904
2905     if (msg->type == JSONRPC_ERROR) {
2906         status = TXN_ERROR;
2907     } else if (msg->result->type != JSON_ARRAY) {
2908         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
2909         status = TXN_ERROR;
2910     } else {
2911         struct json_array *ops = &msg->result->u.array;
2912         int hard_errors = 0;
2913         int soft_errors = 0;
2914         int lock_errors = 0;
2915         size_t i;
2916
2917         for (i = 0; i < ops->n; i++) {
2918             struct json *op = ops->elems[i];
2919
2920             if (op->type == JSON_NULL) {
2921                 /* This isn't an error in itself but indicates that some prior
2922                  * operation failed, so make sure that we know about it. */
2923                 soft_errors++;
2924             } else if (op->type == JSON_OBJECT) {
2925                 struct json *error;
2926
2927                 error = shash_find_data(json_object(op), "error");
2928                 if (error) {
2929                     if (error->type == JSON_STRING) {
2930                         if (!strcmp(error->u.string, "timed out")) {
2931                             soft_errors++;
2932                         } else if (!strcmp(error->u.string, "not owner")) {
2933                             lock_errors++;
2934                         } else if (strcmp(error->u.string, "aborted")) {
2935                             hard_errors++;
2936                             ovsdb_idl_txn_set_error_json(txn, op);
2937                         }
2938                     } else {
2939                         hard_errors++;
2940                         ovsdb_idl_txn_set_error_json(txn, op);
2941                         VLOG_WARN_RL(&syntax_rl,
2942                                      "\"error\" in reply is not JSON string");
2943                     }
2944                 }
2945             } else {
2946                 hard_errors++;
2947                 ovsdb_idl_txn_set_error_json(txn, op);
2948                 VLOG_WARN_RL(&syntax_rl,
2949                              "operation reply is not JSON null or object");
2950             }
2951         }
2952
2953         if (!soft_errors && !hard_errors && !lock_errors) {
2954             struct ovsdb_idl_txn_insert *insert;
2955
2956             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
2957                 hard_errors++;
2958             }
2959
2960             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
2961                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
2962                     hard_errors++;
2963                 }
2964             }
2965         }
2966
2967         status = (hard_errors ? TXN_ERROR
2968                   : lock_errors ? TXN_NOT_LOCKED
2969                   : soft_errors ? TXN_TRY_AGAIN
2970                   : TXN_SUCCESS);
2971     }
2972
2973     ovsdb_idl_txn_complete(txn, status);
2974     return true;
2975 }
2976
2977 /* Returns the transaction currently active for 'row''s IDL.  A transaction
2978  * must currently be active. */
2979 struct ovsdb_idl_txn *
2980 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
2981 {
2982     struct ovsdb_idl_txn *txn = row->table->idl->txn;
2983     ovs_assert(txn != NULL);
2984     return txn;
2985 }
2986
2987 /* Returns the IDL on which 'txn' acts. */
2988 struct ovsdb_idl *
2989 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
2990 {
2991     return txn->idl;
2992 }
2993
2994 /* Blocks until 'idl' successfully connects to the remote database and
2995  * retrieves its contents. */
2996 void
2997 ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
2998 {
2999     while (1) {
3000         ovsdb_idl_run(idl);
3001         if (ovsdb_idl_has_ever_connected(idl)) {
3002             return;
3003         }
3004         ovsdb_idl_wait(idl);
3005         poll_block();
3006     }
3007 }
3008 \f
3009 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
3010  * the database server and to avoid modifying the database when the lock cannot
3011  * be acquired (that is, when another client has the same lock).
3012  *
3013  * If 'lock_name' is NULL, drops the locking requirement and releases the
3014  * lock. */
3015 void
3016 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
3017 {
3018     ovs_assert(!idl->txn);
3019     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
3020
3021     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
3022         /* Release previous lock. */
3023         ovsdb_idl_send_unlock_request(idl);
3024         free(idl->lock_name);
3025         idl->lock_name = NULL;
3026         idl->is_lock_contended = false;
3027     }
3028
3029     if (lock_name && !idl->lock_name) {
3030         /* Acquire new lock. */
3031         idl->lock_name = xstrdup(lock_name);
3032         ovsdb_idl_send_lock_request(idl);
3033     }
3034 }
3035
3036 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
3037  *
3038  * Locking and unlocking happens asynchronously from the database client's
3039  * point of view, so the information is only useful for optimization (e.g. if
3040  * the client doesn't have the lock then there's no point in trying to write to
3041  * the database). */
3042 bool
3043 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
3044 {
3045     return idl->has_lock;
3046 }
3047
3048 /* Returns true if 'idl' is configured to obtain a lock but the database server
3049  * has indicated that some other client already owns the requested lock. */
3050 bool
3051 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
3052 {
3053     return idl->is_lock_contended;
3054 }
3055
3056 static void
3057 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
3058 {
3059     if (new_has_lock && !idl->has_lock) {
3060         if (idl->state == IDL_S_MONITORING ||
3061             idl->state == IDL_S_MONITORING2) {
3062             idl->change_seqno++;
3063         } else {
3064             /* We're setting up a session, so don't signal that the database
3065              * changed.  Finalizing the session will increment change_seqno
3066              * anyhow. */
3067         }
3068         idl->is_lock_contended = false;
3069     }
3070     idl->has_lock = new_has_lock;
3071 }
3072
3073 static void
3074 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
3075                               struct json **idp)
3076 {
3077     ovsdb_idl_update_has_lock(idl, false);
3078
3079     json_destroy(idl->lock_request_id);
3080     idl->lock_request_id = NULL;
3081
3082     if (jsonrpc_session_is_connected(idl->session)) {
3083         struct json *params;
3084
3085         params = json_array_create_1(json_string_create(idl->lock_name));
3086         jsonrpc_session_send(idl->session,
3087                              jsonrpc_create_request(method, params, idp));
3088     }
3089 }
3090
3091 static void
3092 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
3093 {
3094     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
3095 }
3096
3097 static void
3098 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
3099 {
3100     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
3101 }
3102
3103 static void
3104 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
3105 {
3106     bool got_lock;
3107
3108     json_destroy(idl->lock_request_id);
3109     idl->lock_request_id = NULL;
3110
3111     if (result->type == JSON_OBJECT) {
3112         const struct json *locked;
3113
3114         locked = shash_find_data(json_object(result), "locked");
3115         got_lock = locked && locked->type == JSON_TRUE;
3116     } else {
3117         got_lock = false;
3118     }
3119
3120     ovsdb_idl_update_has_lock(idl, got_lock);
3121     if (!got_lock) {
3122         idl->is_lock_contended = true;
3123     }
3124 }
3125
3126 static void
3127 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
3128                             const struct json *params,
3129                             bool new_has_lock)
3130 {
3131     if (idl->lock_name
3132         && params->type == JSON_ARRAY
3133         && json_array(params)->n > 0
3134         && json_array(params)->elems[0]->type == JSON_STRING) {
3135         const char *lock_name = json_string(json_array(params)->elems[0]);
3136
3137         if (!strcmp(idl->lock_name, lock_name)) {
3138             ovsdb_idl_update_has_lock(idl, new_has_lock);
3139             if (!new_has_lock) {
3140                 idl->is_lock_contended = true;
3141             }
3142         }
3143     }
3144 }
3145
3146 void
3147 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
3148 {
3149     if (loop) {
3150         ovsdb_idl_destroy(loop->idl);
3151     }
3152 }
3153
3154 struct ovsdb_idl_txn *
3155 ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
3156 {
3157     ovsdb_idl_run(loop->idl);
3158     loop->open_txn = (loop->committing_txn
3159                       || ovsdb_idl_get_seqno(loop->idl) == loop->skip_seqno
3160                       ? NULL
3161                       : ovsdb_idl_txn_create(loop->idl));
3162     return loop->open_txn;
3163 }
3164
3165 void
3166 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
3167 {
3168     if (loop->open_txn) {
3169         loop->committing_txn = loop->open_txn;
3170         loop->open_txn = NULL;
3171
3172         loop->precommit_seqno = ovsdb_idl_get_seqno(loop->idl);
3173     }
3174
3175     struct ovsdb_idl_txn *txn = loop->committing_txn;
3176     if (txn) {
3177         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
3178         if (status != TXN_INCOMPLETE) {
3179             switch (status) {
3180             case TXN_TRY_AGAIN:
3181                 /* We want to re-evaluate the database when it's changed from
3182                  * the contents that it had when we started the commit.  (That
3183                  * might have already happened.) */
3184                 loop->skip_seqno = loop->precommit_seqno;
3185                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
3186                     poll_immediate_wake();
3187                 }
3188                 break;
3189
3190             case TXN_SUCCESS:
3191                 /* If the database has already changed since we started the
3192                  * commit, re-evaluate it immediately to avoid missing a change
3193                  * for a while. */
3194                 if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
3195                     poll_immediate_wake();
3196                 }
3197                 break;
3198
3199             case TXN_UNCHANGED:
3200             case TXN_ABORTED:
3201             case TXN_NOT_LOCKED:
3202             case TXN_ERROR:
3203                 break;
3204
3205             case TXN_UNCOMMITTED:
3206             case TXN_INCOMPLETE:
3207                 OVS_NOT_REACHED();
3208
3209             }
3210             ovsdb_idl_txn_destroy(txn);
3211             loop->committing_txn = NULL;
3212         }
3213     }
3214
3215     ovsdb_idl_wait(loop->idl);
3216 }