list: Rename all functions in list.h with ovs_ prefix.
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "coverage.h"
27 #include "openvswitch/dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "json.h"
30 #include "jsonrpc.h"
31 #include "ovsdb/ovsdb.h"
32 #include "ovsdb/table.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-idl-provider.h"
36 #include "ovsdb-parser.h"
37 #include "poll-loop.h"
38 #include "shash.h"
39 #include "sset.h"
40 #include "util.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
44
45 COVERAGE_DEFINE(txn_uncommitted);
46 COVERAGE_DEFINE(txn_unchanged);
47 COVERAGE_DEFINE(txn_incomplete);
48 COVERAGE_DEFINE(txn_aborted);
49 COVERAGE_DEFINE(txn_success);
50 COVERAGE_DEFINE(txn_try_again);
51 COVERAGE_DEFINE(txn_not_locked);
52 COVERAGE_DEFINE(txn_error);
53
54 /* An arc from one idl_row to another.  When row A contains a UUID that
55  * references row B, this is represented by an arc from A (the source) to B
56  * (the destination).
57  *
58  * Arcs from a row to itself are omitted, that is, src and dst are always
59  * different.
60  *
61  * Arcs are never duplicated, that is, even if there are multiple references
62  * from A to B, there is only a single arc from A to B.
63  *
64  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
65  * A.  Both an arc and its converse may both be present, if each row refers
66  * to the other circularly.
67  *
68  * The source and destination row may be in the same table or in different
69  * tables.
70  */
71 struct ovsdb_idl_arc {
72     struct ovs_list src_node;   /* In src->src_arcs list. */
73     struct ovs_list dst_node;   /* In dst->dst_arcs list. */
74     struct ovsdb_idl_row *src;  /* Source row. */
75     struct ovsdb_idl_row *dst;  /* Destination row. */
76 };
77
78 enum ovsdb_idl_state {
79     IDL_S_SCHEMA_REQUESTED,
80     IDL_S_MONITOR_REQUESTED,
81     IDL_S_MONITORING,
82     IDL_S_MONITOR2_REQUESTED,
83     IDL_S_MONITORING2,
84     IDL_S_NO_SCHEMA
85 };
86
87 struct ovsdb_idl {
88     const struct ovsdb_idl_class *class;
89     struct jsonrpc_session *session;
90     struct shash table_by_name;
91     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
92     unsigned int change_seqno;
93     bool verify_write_only;
94
95     /* Session state. */
96     unsigned int state_seqno;
97     enum ovsdb_idl_state state;
98     struct json *request_id;
99     struct json *schema;
100
101     /* Database locking. */
102     char *lock_name;            /* Name of lock we need, NULL if none. */
103     bool has_lock;              /* Has db server told us we have the lock? */
104     bool is_lock_contended;     /* Has db server told us we can't get lock? */
105     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
106
107     /* Transaction support. */
108     struct ovsdb_idl_txn *txn;
109     struct hmap outstanding_txns;
110 };
111
112 struct ovsdb_idl_txn {
113     struct hmap_node hmap_node;
114     struct json *request_id;
115     struct ovsdb_idl *idl;
116     struct hmap txn_rows;
117     enum ovsdb_idl_txn_status status;
118     char *error;
119     bool dry_run;
120     struct ds comment;
121
122     /* Increments. */
123     const char *inc_table;
124     const char *inc_column;
125     struct uuid inc_row;
126     unsigned int inc_index;
127     int64_t inc_new_value;
128
129     /* Inserted rows. */
130     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
131 };
132
133 struct ovsdb_idl_txn_insert {
134     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
135     struct uuid dummy;          /* Dummy UUID used locally. */
136     int op_index;               /* Index into transaction's operation array. */
137     struct uuid real;           /* Real UUID used by database server. */
138 };
139
140 enum ovsdb_update_version {
141     OVSDB_UPDATE,               /* RFC 7047 "update" method. */
142     OVSDB_UPDATE2               /* "update2" Extension to RFC 7047.
143                                    See ovsdb-server(1) for more information. */
144 };
145
146 /* Name arrays indexed by 'enum ovsdb_update_version'. */
147 static const char *table_updates_names[] = {"table_updates", "table_updates2"};
148 static const char *table_update_names[] = {"table_update", "table_update2"};
149 static const char *row_update_names[] = {"row_update", "row_update2"};
150
151 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
152 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
153
154 static void ovsdb_idl_clear(struct ovsdb_idl *);
155 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
156 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
157 static void ovsdb_idl_send_monitor2_request(struct ovsdb_idl *);
158 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
159                                    enum ovsdb_update_version);
160 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
161                                                     const struct json *,
162                                                     enum ovsdb_update_version);
163 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
164                                      const struct uuid *,
165                                      const struct json *old,
166                                      const struct json *new);
167 static bool ovsdb_idl_process_update2(struct ovsdb_idl_table *,
168                                       const struct uuid *,
169                                       const char *operation,
170                                       const struct json *row);
171 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
172 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
173 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
174 static bool ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *,
175                                          const struct json *);
176
177 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
178 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
179     const struct ovsdb_idl_table_class *);
180 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
181                                                   const struct uuid *);
182 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
183 static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
184
185 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
186 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
187 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
188 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
189 static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
190
191 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
192 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
193                                         const struct jsonrpc_msg *msg);
194
195 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
196 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
197 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
198                                        const struct json *);
199 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
200                                         const struct json *params,
201                                         bool new_has_lock);
202 static struct ovsdb_idl_table *
203 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
204                            const struct ovsdb_idl_table_class *);
205 static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table);
206
207 /* Creates and returns a connection to database 'remote', which should be in a
208  * form acceptable to jsonrpc_session_open().  The connection will maintain an
209  * in-memory replica of the remote database whose schema is described by
210  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
211  * by ovsdb-idlc.)
212  *
213  * Passes 'retry' to jsonrpc_session_open().  See that function for
214  * documentation.
215  *
216  * If 'monitor_everything_by_default' is true, then everything in the remote
217  * database will be replicated by default.  ovsdb_idl_omit() and
218  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
219  * monitoring.
220  *
221  * If 'monitor_everything_by_default' is false, then no columns or tables will
222  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
223  * must be used to choose some columns or tables to replicate.
224  */
225 struct ovsdb_idl *
226 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
227                  bool monitor_everything_by_default, bool retry)
228 {
229     struct ovsdb_idl *idl;
230     uint8_t default_mode;
231     size_t i;
232
233     default_mode = (monitor_everything_by_default
234                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
235                     : 0);
236
237     idl = xzalloc(sizeof *idl);
238     idl->class = class;
239     idl->session = jsonrpc_session_open(remote, retry);
240     shash_init(&idl->table_by_name);
241     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
242     for (i = 0; i < class->n_tables; i++) {
243         const struct ovsdb_idl_table_class *tc = &class->tables[i];
244         struct ovsdb_idl_table *table = &idl->tables[i];
245         size_t j;
246
247         shash_add_assert(&idl->table_by_name, tc->name, table);
248         table->class = tc;
249         table->modes = xmalloc(tc->n_columns);
250         memset(table->modes, default_mode, tc->n_columns);
251         table->need_table = false;
252         shash_init(&table->columns);
253         for (j = 0; j < tc->n_columns; j++) {
254             const struct ovsdb_idl_column *column = &tc->columns[j];
255
256             shash_add_assert(&table->columns, column->name, column);
257         }
258         hmap_init(&table->rows);
259         ovs_list_init(&table->track_list);
260         table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
261             = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
262             = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
263         table->idl = idl;
264     }
265
266     idl->state_seqno = UINT_MAX;
267     idl->request_id = NULL;
268     idl->schema = NULL;
269
270     hmap_init(&idl->outstanding_txns);
271
272     return idl;
273 }
274
275 /* Destroys 'idl' and all of the data structures that it manages. */
276 void
277 ovsdb_idl_destroy(struct ovsdb_idl *idl)
278 {
279     if (idl) {
280         size_t i;
281
282         ovs_assert(!idl->txn);
283         ovsdb_idl_clear(idl);
284         jsonrpc_session_close(idl->session);
285
286         for (i = 0; i < idl->class->n_tables; i++) {
287             struct ovsdb_idl_table *table = &idl->tables[i];
288             shash_destroy(&table->columns);
289             hmap_destroy(&table->rows);
290             free(table->modes);
291         }
292         shash_destroy(&idl->table_by_name);
293         free(idl->tables);
294         json_destroy(idl->request_id);
295         free(idl->lock_name);
296         json_destroy(idl->lock_request_id);
297         json_destroy(idl->schema);
298         hmap_destroy(&idl->outstanding_txns);
299         free(idl);
300     }
301 }
302
303 static void
304 ovsdb_idl_clear(struct ovsdb_idl *idl)
305 {
306     bool changed = false;
307     size_t i;
308
309     for (i = 0; i < idl->class->n_tables; i++) {
310         struct ovsdb_idl_table *table = &idl->tables[i];
311         struct ovsdb_idl_row *row, *next_row;
312
313         if (hmap_is_empty(&table->rows)) {
314             continue;
315         }
316
317         changed = true;
318         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
319             struct ovsdb_idl_arc *arc, *next_arc;
320
321             if (!ovsdb_idl_row_is_orphan(row)) {
322                 ovsdb_idl_row_unparse(row);
323             }
324             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
325                 free(arc);
326             }
327             /* No need to do anything with dst_arcs: some node has those arcs
328              * as forward arcs and will destroy them itself. */
329
330             if (!ovs_list_is_empty(&row->track_node)) {
331                 ovs_list_remove(&row->track_node);
332             }
333
334             ovsdb_idl_row_destroy(row);
335         }
336     }
337
338     ovsdb_idl_track_clear(idl);
339
340     if (changed) {
341         idl->change_seqno++;
342     }
343 }
344
345 /* Processes a batch of messages from the database server on 'idl'.  This may
346  * cause the IDL's contents to change.  The client may check for that with
347  * ovsdb_idl_get_seqno(). */
348 void
349 ovsdb_idl_run(struct ovsdb_idl *idl)
350 {
351     int i;
352
353     ovs_assert(!idl->txn);
354     jsonrpc_session_run(idl->session);
355     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
356         struct jsonrpc_msg *msg;
357         unsigned int seqno;
358
359         seqno = jsonrpc_session_get_seqno(idl->session);
360         if (idl->state_seqno != seqno) {
361             idl->state_seqno = seqno;
362             json_destroy(idl->request_id);
363             idl->request_id = NULL;
364             ovsdb_idl_txn_abort_all(idl);
365
366             ovsdb_idl_send_schema_request(idl);
367             idl->state = IDL_S_SCHEMA_REQUESTED;
368             if (idl->lock_name) {
369                 ovsdb_idl_send_lock_request(idl);
370             }
371         }
372
373         msg = jsonrpc_session_recv(idl->session);
374         if (!msg) {
375             break;
376         }
377
378         if (msg->type == JSONRPC_NOTIFY
379             && !strcmp(msg->method, "update2")
380             && msg->params->type == JSON_ARRAY
381             && msg->params->u.array.n == 2
382             && msg->params->u.array.elems[0]->type == JSON_NULL) {
383             /* Database contents changed. */
384             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
385                                    OVSDB_UPDATE2);
386         } else if (msg->type == JSONRPC_REPLY
387                    && idl->request_id
388                    && json_equal(idl->request_id, msg->id)) {
389             json_destroy(idl->request_id);
390             idl->request_id = NULL;
391
392             switch (idl->state) {
393             case IDL_S_SCHEMA_REQUESTED:
394                 /* Reply to our "get_schema" request. */
395                 idl->schema = json_clone(msg->result);
396                 ovsdb_idl_send_monitor2_request(idl);
397                 idl->state = IDL_S_MONITOR2_REQUESTED;
398                 break;
399
400             case IDL_S_MONITOR_REQUESTED:
401             case IDL_S_MONITOR2_REQUESTED:
402                 /* Reply to our "monitor" or "monitor2" request. */
403                 idl->change_seqno++;
404                 ovsdb_idl_clear(idl);
405                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
406                     idl->state = IDL_S_MONITORING;
407                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
408                 } else { /* IDL_S_MONITOR2_REQUESTED. */
409                     idl->state = IDL_S_MONITORING2;
410                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
411                 }
412
413                 /* Schema is not useful after monitor request is accepted
414                  * by the server.  */
415                 json_destroy(idl->schema);
416                 idl->schema = NULL;
417                 break;
418
419             case IDL_S_MONITORING:
420             case IDL_S_MONITORING2:
421             case IDL_S_NO_SCHEMA:
422             default:
423                 OVS_NOT_REACHED();
424             }
425         } else if (msg->type == JSONRPC_NOTIFY
426             && !strcmp(msg->method, "update")
427             && msg->params->type == JSON_ARRAY
428             && msg->params->u.array.n == 2
429             && msg->params->u.array.elems[0]->type == JSON_NULL) {
430             /* Database contents changed. */
431             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
432                                    OVSDB_UPDATE);
433         } else if (msg->type == JSONRPC_REPLY
434                    && idl->lock_request_id
435                    && json_equal(idl->lock_request_id, msg->id)) {
436             /* Reply to our "lock" request. */
437             ovsdb_idl_parse_lock_reply(idl, msg->result);
438         } else if (msg->type == JSONRPC_NOTIFY
439                    && !strcmp(msg->method, "locked")) {
440             /* We got our lock. */
441             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
442         } else if (msg->type == JSONRPC_NOTIFY
443                    && !strcmp(msg->method, "stolen")) {
444             /* Someone else stole our lock. */
445             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
446         } else if (msg->type == JSONRPC_ERROR
447                    && idl->state == IDL_S_MONITOR2_REQUESTED
448                    && idl->request_id
449                    && json_equal(idl->request_id, msg->id)) {
450             if (msg->error && !strcmp(json_string(msg->error),
451                                       "unknown method")) {
452                 /* Fall back to using "monitor" method.  */
453                 json_destroy(idl->request_id);
454                 idl->request_id = NULL;
455                 ovsdb_idl_send_monitor_request(idl);
456                 idl->state = IDL_S_MONITOR_REQUESTED;
457             }
458         } else if (msg->type == JSONRPC_ERROR
459                    && idl->state == IDL_S_SCHEMA_REQUESTED
460                    && idl->request_id
461                    && json_equal(idl->request_id, msg->id)) {
462                 json_destroy(idl->request_id);
463                 idl->request_id = NULL;
464                 VLOG_ERR("%s: requested schema not found",
465                          jsonrpc_session_get_name(idl->session));
466                 idl->state = IDL_S_NO_SCHEMA;
467         } else if ((msg->type == JSONRPC_ERROR
468                     || msg->type == JSONRPC_REPLY)
469                    && ovsdb_idl_txn_process_reply(idl, msg)) {
470             /* ovsdb_idl_txn_process_reply() did everything needful. */
471         } else {
472             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
473              * a transaction before we receive the reply, so keep the log level
474              * low. */
475             VLOG_DBG("%s: received unexpected %s message",
476                      jsonrpc_session_get_name(idl->session),
477                      jsonrpc_msg_type_to_string(msg->type));
478         }
479         jsonrpc_msg_destroy(msg);
480     }
481     ovsdb_idl_row_destroy_postprocess(idl);
482 }
483
484 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
485  * do or when activity occurs on a transaction on 'idl'. */
486 void
487 ovsdb_idl_wait(struct ovsdb_idl *idl)
488 {
489     jsonrpc_session_wait(idl->session);
490     jsonrpc_session_recv_wait(idl->session);
491 }
492
493 /* Returns a "sequence number" that represents the state of 'idl'.  When
494  * ovsdb_idl_run() changes the database, the sequence number changes.  The
495  * initial fetch of the entire contents of the remote database is considered to
496  * be one kind of change.  Successfully acquiring a lock, if one has been
497  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
498  *
499  * As long as the sequence number does not change, the client may continue to
500  * use any data structures it obtains from 'idl'.  But when it changes, the
501  * client must not access any of these data structures again, because they
502  * could have freed or reused for other purposes.
503  *
504  * The sequence number can occasionally change even if the database does not.
505  * This happens if the connection to the database drops and reconnects, which
506  * causes the database contents to be reloaded even if they didn't change.  (It
507  * could also happen if the database server sends out a "change" that reflects
508  * what the IDL already thought was in the database.  The database server is
509  * not supposed to do that, but bugs could in theory cause it to do so.) */
510 unsigned int
511 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
512 {
513     return idl->change_seqno;
514 }
515
516 /* Returns true if 'idl' successfully connected to the remote database and
517  * retrieved its contents (even if the connection subsequently dropped and is
518  * in the process of reconnecting).  If so, then 'idl' contains an atomic
519  * snapshot of the database's contents (but it might be arbitrarily old if the
520  * connection dropped).
521  *
522  * Returns false if 'idl' has never connected or retrieved the database's
523  * contents.  If so, 'idl' is empty. */
524 bool
525 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
526 {
527     return ovsdb_idl_get_seqno(idl) != 0;
528 }
529
530 /* Reconfigures 'idl' so that it would reconnect to the database, if
531  * connection was dropped. */
532 void
533 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
534 {
535     jsonrpc_session_enable_reconnect(idl->session);
536 }
537
538 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
539  * meantime, the contents of 'idl' will not change. */
540 void
541 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
542 {
543     jsonrpc_session_force_reconnect(idl->session);
544 }
545
546 /* Some IDL users should only write to write-only columns.  Furthermore,
547  * writing to a column which is not write-only can cause serious performance
548  * degradations for these users.  This function causes 'idl' to reject writes
549  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
550 void
551 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
552 {
553     idl->verify_write_only = true;
554 }
555
556 /* Returns true if 'idl' is currently connected or trying to connect
557  * and a negative response to a schema request has not been received */
558 bool
559 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
560 {
561     return jsonrpc_session_is_alive(idl->session) &&
562            idl->state != IDL_S_NO_SCHEMA;
563 }
564
565 /* Returns the last error reported on a connection by 'idl'.  The return value
566  * is 0 only if no connection made by 'idl' has ever encountered an error and
567  * a negative response to a schema request has never been received. See
568  * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
569  * interpretation. */
570 int
571 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
572 {
573     int err;
574
575     err = jsonrpc_session_get_last_error(idl->session);
576
577     if (err) {
578         return err;
579     } else if (idl->state == IDL_S_NO_SCHEMA) {
580         return ENOENT;
581     } else {
582         return 0;
583     }
584 }
585 \f
586 static unsigned char *
587 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
588                    const struct ovsdb_idl_column *column)
589 {
590     size_t i;
591
592     ovs_assert(!idl->change_seqno);
593
594     for (i = 0; i < idl->class->n_tables; i++) {
595         const struct ovsdb_idl_table *table = &idl->tables[i];
596         const struct ovsdb_idl_table_class *tc = table->class;
597
598         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
599             return &table->modes[column - tc->columns];
600         }
601     }
602
603     OVS_NOT_REACHED();
604 }
605
606 static void
607 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
608 {
609     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
610         struct ovsdb_idl_table *table;
611
612         table = shash_find_data(&idl->table_by_name,
613                                 base->u.uuid.refTableName);
614         if (table) {
615             table->need_table = true;
616         } else {
617             VLOG_WARN("%s IDL class missing referenced table %s",
618                       idl->class->database, base->u.uuid.refTableName);
619         }
620     }
621 }
622
623 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
624  * ensures that any tables referenced by 'column' will be replicated, even if
625  * no columns in that table are selected for replication (see
626  * ovsdb_idl_add_table() for more information).
627  *
628  * This function is only useful if 'monitor_everything_by_default' was false in
629  * the call to ovsdb_idl_create().  This function should be called between
630  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
631  */
632 void
633 ovsdb_idl_add_column(struct ovsdb_idl *idl,
634                      const struct ovsdb_idl_column *column)
635 {
636     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
637     add_ref_table(idl, &column->type.key);
638     add_ref_table(idl, &column->type.value);
639 }
640
641 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
642  * no columns are selected for replication. Just the necessary data for table
643  * references will be replicated (the UUID of the rows, for instance), any
644  * columns not selected for replication will remain unreplicated.
645  * This can be useful because it allows 'idl' to keep track of what rows in the
646  * table actually exist, which in turn allows columns that reference the table
647  * to have accurate contents. (The IDL presents the database with references to
648  * rows that do not exist removed.)
649  *
650  * This function is only useful if 'monitor_everything_by_default' was false in
651  * the call to ovsdb_idl_create().  This function should be called between
652  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
653  */
654 void
655 ovsdb_idl_add_table(struct ovsdb_idl *idl,
656                     const struct ovsdb_idl_table_class *tc)
657 {
658     size_t i;
659
660     for (i = 0; i < idl->class->n_tables; i++) {
661         struct ovsdb_idl_table *table = &idl->tables[i];
662
663         if (table->class == tc) {
664             table->need_table = true;
665             return;
666         }
667     }
668
669     OVS_NOT_REACHED();
670 }
671
672 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
673  *
674  * This function should be called between ovsdb_idl_create() and the first call
675  * to ovsdb_idl_run().
676  */
677 void
678 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
679                      const struct ovsdb_idl_column *column)
680 {
681     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
682 }
683
684 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
685  * OVSDB_IDL_MONITOR for details.
686  *
687  * This function should be called between ovsdb_idl_create() and the first call
688  * to ovsdb_idl_run().
689  */
690 void
691 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
692 {
693     *ovsdb_idl_get_mode(idl, column) = 0;
694 }
695
696 /* Returns the most recent IDL change sequence number that caused a
697  * insert, modify or delete update to the table with class 'table_class'.
698  */
699 unsigned int
700 ovsdb_idl_table_get_seqno(const struct ovsdb_idl *idl,
701                           const struct ovsdb_idl_table_class *table_class)
702 {
703     struct ovsdb_idl_table *table
704         = ovsdb_idl_table_from_class(idl, table_class);
705     unsigned int max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_INSERT];
706
707     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]) {
708         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY];
709     }
710     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_DELETE]) {
711         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_DELETE];
712     }
713     return max_seqno;
714 }
715
716 /* For each row that contains tracked columns, IDL stores the most
717  * recent IDL change sequence numbers associateed with insert, modify
718  * and delete updates to the table.
719  */
720 unsigned int
721 ovsdb_idl_row_get_seqno(const struct ovsdb_idl_row *row,
722                         enum ovsdb_idl_change change)
723 {
724     return row->change_seqno[change];
725 }
726
727 /* Turns on OVSDB_IDL_TRACK for 'column' in 'idl', ensuring that
728  * all rows whose 'column' is modified are traced. Similarly, insert
729  * or delete of rows having 'column' are tracked. Clients are able
730  * to retrive the tracked rows with the ovsdb_idl_track_get_*()
731  * functions.
732  *
733  * This function should be called between ovsdb_idl_create() and
734  * the first call to ovsdb_idl_run(). The column to be tracked
735  * should have OVSDB_IDL_ALERT turned on.
736  */
737 void
738 ovsdb_idl_track_add_column(struct ovsdb_idl *idl,
739                            const struct ovsdb_idl_column *column)
740 {
741     if (!(*ovsdb_idl_get_mode(idl, column) & OVSDB_IDL_ALERT)) {
742         ovsdb_idl_add_column(idl, column);
743     }
744     *ovsdb_idl_get_mode(idl, column) |= OVSDB_IDL_TRACK;
745 }
746
747 void
748 ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
749 {
750     size_t i, j;
751
752     for (i = 0; i < idl->class->n_tables; i++) {
753         const struct ovsdb_idl_table_class *tc = &idl->class->tables[i];
754
755         for (j = 0; j < tc->n_columns; j++) {
756             const struct ovsdb_idl_column *column = &tc->columns[j];
757             ovsdb_idl_track_add_column(idl, column);
758         }
759     }
760 }
761
762 /* Returns true if 'table' has any tracked column. */
763 static bool
764 ovsdb_idl_track_is_set(struct ovsdb_idl_table *table)
765 {
766     size_t i;
767
768     for (i = 0; i < table->class->n_columns; i++) {
769         if (table->modes[i] & OVSDB_IDL_TRACK) {
770             return true;
771         }
772     }
773    return false;
774 }
775
776 /* Returns the first tracked row in table with class 'table_class'
777  * for the specified 'idl'. Returns NULL if there are no tracked rows */
778 const struct ovsdb_idl_row *
779 ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
780                           const struct ovsdb_idl_table_class *table_class)
781 {
782     struct ovsdb_idl_table *table
783         = ovsdb_idl_table_from_class(idl, table_class);
784
785     if (!ovs_list_is_empty(&table->track_list)) {
786         return CONTAINER_OF(ovs_list_front(&table->track_list), struct ovsdb_idl_row, track_node);
787     }
788     return NULL;
789 }
790
791 /* Returns the next tracked row in table after the specified 'row'
792  * (in no particular order). Returns NULL if there are no tracked rows */
793 const struct ovsdb_idl_row *
794 ovsdb_idl_track_get_next(const struct ovsdb_idl_row *row)
795 {
796     if (row->track_node.next != &row->table->track_list) {
797         return CONTAINER_OF(row->track_node.next, struct ovsdb_idl_row, track_node);
798     }
799
800     return NULL;
801 }
802
803 /* Returns true if a tracked 'column' in 'row' was updated by IDL, false
804  * otherwise. The tracking data is cleared by ovsdb_idl_track_clear()
805  *
806  * Function returns false if 'column' is not tracked (see
807  * ovsdb_idl_track_add_column()).
808  */
809 bool
810 ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
811                            const struct ovsdb_idl_column *column)
812 {
813     const struct ovsdb_idl_table_class *class;
814     size_t column_idx;
815
816     class = row->table->class;
817     column_idx = column - class->columns;
818
819     if (row->updated && bitmap_is_set(row->updated, column_idx)) {
820         return true;
821     } else {
822         return false;
823     }
824 }
825
826 /* Flushes the tracked rows. Client calls this function after calling
827  * ovsdb_idl_run() and read all tracked rows with the ovsdb_idl_track_get_*()
828  * functions. This is usually done at the end of the client's processing
829  * loop when it is ready to do ovsdb_idl_run() again.
830  */
831 void
832 ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
833 {
834     size_t i;
835
836     for (i = 0; i < idl->class->n_tables; i++) {
837         struct ovsdb_idl_table *table = &idl->tables[i];
838
839         if (!ovs_list_is_empty(&table->track_list)) {
840             struct ovsdb_idl_row *row, *next;
841
842             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
843                 if (row->updated) {
844                     free(row->updated);
845                     row->updated = NULL;
846                 }
847                 ovs_list_remove(&row->track_node);
848                 ovs_list_init(&row->track_node);
849                 if (ovsdb_idl_row_is_orphan(row)) {
850                     ovsdb_idl_row_clear_old(row);
851                     free(row);
852                 }
853             }
854         }
855     }
856 }
857
858 \f
859 static void
860 ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
861 {
862     struct jsonrpc_msg *msg;
863
864     json_destroy(idl->request_id);
865     msg = jsonrpc_create_request(
866         "get_schema",
867         json_array_create_1(json_string_create(idl->class->database)),
868         &idl->request_id);
869     jsonrpc_session_send(idl->session, msg);
870 }
871
872 static void
873 log_error(struct ovsdb_error *error)
874 {
875     char *s = ovsdb_error_to_string(error);
876     VLOG_WARN("error parsing database schema: %s", s);
877     free(s);
878     ovsdb_error_destroy(error);
879 }
880
881 /* Frees 'schema', which is in the format returned by parse_schema(). */
882 static void
883 free_schema(struct shash *schema)
884 {
885     if (schema) {
886         struct shash_node *node, *next;
887
888         SHASH_FOR_EACH_SAFE (node, next, schema) {
889             struct sset *sset = node->data;
890             sset_destroy(sset);
891             free(sset);
892             shash_delete(schema, node);
893         }
894         shash_destroy(schema);
895         free(schema);
896     }
897 }
898
899 /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
900  * 7047, to obtain the names of its rows and columns.  If successful, returns
901  * an shash whose keys are table names and whose values are ssets, where each
902  * sset contains the names of its table's columns.  On failure (due to a parse
903  * error), returns NULL.
904  *
905  * It would also be possible to use the general-purpose OVSDB schema parser in
906  * ovsdb-server, but that's overkill, possibly too strict for the current use
907  * case, and would require restructuring ovsdb-server to separate the schema
908  * code from the rest. */
909 static struct shash *
910 parse_schema(const struct json *schema_json)
911 {
912     struct ovsdb_parser parser;
913     const struct json *tables_json;
914     struct ovsdb_error *error;
915     struct shash_node *node;
916     struct shash *schema;
917
918     ovsdb_parser_init(&parser, schema_json, "database schema");
919     tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
920     error = ovsdb_parser_destroy(&parser);
921     if (error) {
922         log_error(error);
923         return NULL;
924     }
925
926     schema = xmalloc(sizeof *schema);
927     shash_init(schema);
928     SHASH_FOR_EACH (node, json_object(tables_json)) {
929         const char *table_name = node->name;
930         const struct json *json = node->data;
931         const struct json *columns_json;
932
933         ovsdb_parser_init(&parser, json, "table schema for table %s",
934                           table_name);
935         columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
936         error = ovsdb_parser_destroy(&parser);
937         if (error) {
938             log_error(error);
939             free_schema(schema);
940             return NULL;
941         }
942
943         struct sset *columns = xmalloc(sizeof *columns);
944         sset_init(columns);
945
946         struct shash_node *node2;
947         SHASH_FOR_EACH (node2, json_object(columns_json)) {
948             const char *column_name = node2->name;
949             sset_add(columns, column_name);
950         }
951         shash_add(schema, table_name, columns);
952     }
953     return schema;
954 }
955
956 static void
957 ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
958                                  const char *method)
959 {
960     struct shash *schema;
961     struct json *monitor_requests;
962     struct jsonrpc_msg *msg;
963     size_t i;
964
965     schema = parse_schema(idl->schema);
966     monitor_requests = json_object_create();
967     for (i = 0; i < idl->class->n_tables; i++) {
968         const struct ovsdb_idl_table *table = &idl->tables[i];
969         const struct ovsdb_idl_table_class *tc = table->class;
970         struct json *monitor_request, *columns;
971         const struct sset *table_schema;
972         size_t j;
973
974         table_schema = (schema
975                         ? shash_find_data(schema, table->class->name)
976                         : NULL);
977
978         columns = table->need_table ? json_array_create_empty() : NULL;
979         for (j = 0; j < tc->n_columns; j++) {
980             const struct ovsdb_idl_column *column = &tc->columns[j];
981             if (table->modes[j] & OVSDB_IDL_MONITOR) {
982                 if (table_schema
983                     && !sset_contains(table_schema, column->name)) {
984                     VLOG_WARN("%s table in %s database lacks %s column "
985                               "(database needs upgrade?)",
986                               table->class->name, idl->class->database,
987                               column->name);
988                     continue;
989                 }
990                 if (!columns) {
991                     columns = json_array_create_empty();
992                 }
993                 json_array_add(columns, json_string_create(column->name));
994             }
995         }
996
997         if (columns) {
998             if (schema && !table_schema) {
999                 VLOG_WARN("%s database lacks %s table "
1000                           "(database needs upgrade?)",
1001                           idl->class->database, table->class->name);
1002                 json_destroy(columns);
1003                 continue;
1004             }
1005
1006             monitor_request = json_object_create();
1007             json_object_put(monitor_request, "columns", columns);
1008             json_object_put(monitor_requests, tc->name, monitor_request);
1009         }
1010     }
1011     free_schema(schema);
1012
1013     json_destroy(idl->request_id);
1014     msg = jsonrpc_create_request(
1015         method,
1016         json_array_create_3(json_string_create(idl->class->database),
1017                             json_null_create(), monitor_requests),
1018         &idl->request_id);
1019     jsonrpc_session_send(idl->session, msg);
1020 }
1021
1022 static void
1023 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
1024 {
1025     ovsdb_idl_send_monitor_request__(idl, "monitor");
1026 }
1027
1028 static void
1029 log_parse_update_error(struct ovsdb_error *error)
1030 {
1031         if (!VLOG_DROP_WARN(&syntax_rl)) {
1032             char *s = ovsdb_error_to_string(error);
1033             VLOG_WARN_RL(&syntax_rl, "%s", s);
1034             free(s);
1035         }
1036         ovsdb_error_destroy(error);
1037 }
1038
1039 static void
1040 ovsdb_idl_send_monitor2_request(struct ovsdb_idl *idl)
1041 {
1042     ovsdb_idl_send_monitor_request__(idl, "monitor2");
1043 }
1044
1045 static void
1046 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates,
1047                        enum ovsdb_update_version version)
1048 {
1049     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates,
1050                                                          version);
1051     if (error) {
1052         log_parse_update_error(error);
1053     }
1054 }
1055
1056 static struct ovsdb_error *
1057 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
1058                          const struct json *table_updates,
1059                          enum ovsdb_update_version version)
1060 {
1061     const struct shash_node *tables_node;
1062     const char *table_updates_name = table_updates_names[version];
1063     const char *table_update_name = table_update_names[version];
1064     const char *row_update_name = row_update_names[version];
1065
1066     if (table_updates->type != JSON_OBJECT) {
1067         return ovsdb_syntax_error(table_updates, NULL,
1068                                   "<%s> is not an object",
1069                                   table_updates_name);
1070     }
1071
1072     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
1073         const struct json *table_update = tables_node->data;
1074         const struct shash_node *table_node;
1075         struct ovsdb_idl_table *table;
1076
1077         table = shash_find_data(&idl->table_by_name, tables_node->name);
1078         if (!table) {
1079             return ovsdb_syntax_error(
1080                 table_updates, NULL,
1081                 "<%s> includes unknown table \"%s\"",
1082                 table_updates_name,
1083                 tables_node->name);
1084         }
1085
1086         if (table_update->type != JSON_OBJECT) {
1087             return ovsdb_syntax_error(table_update, NULL,
1088                                       "<%s> for table \"%s\" is "
1089                                       "not an object",
1090                                       table_update_name,
1091                                       table->class->name);
1092         }
1093         SHASH_FOR_EACH (table_node, json_object(table_update)) {
1094             const struct json *row_update = table_node->data;
1095             const struct json *old_json, *new_json;
1096             struct uuid uuid;
1097
1098             if (!uuid_from_string(&uuid, table_node->name)) {
1099                 return ovsdb_syntax_error(table_update, NULL,
1100                                           "<%s> for table \"%s\" "
1101                                           "contains bad UUID "
1102                                           "\"%s\" as member name",
1103                                           table_update_name,
1104                                           table->class->name,
1105                                           table_node->name);
1106             }
1107             if (row_update->type != JSON_OBJECT) {
1108                 return ovsdb_syntax_error(row_update, NULL,
1109                                           "<%s> for table \"%s\" "
1110                                           "contains <%s> for %s that "
1111                                           "is not an object",
1112                                           table_update_name,
1113                                           table->class->name,
1114                                           row_update_name,
1115                                           table_node->name);
1116             }
1117
1118             switch(version) {
1119             case OVSDB_UPDATE:
1120                 old_json = shash_find_data(json_object(row_update), "old");
1121                 new_json = shash_find_data(json_object(row_update), "new");
1122                 if (old_json && old_json->type != JSON_OBJECT) {
1123                     return ovsdb_syntax_error(old_json, NULL,
1124                                               "\"old\" <row> is not object");
1125                 } else if (new_json && new_json->type != JSON_OBJECT) {
1126                     return ovsdb_syntax_error(new_json, NULL,
1127                                               "\"new\" <row> is not object");
1128                 } else if ((old_json != NULL) + (new_json != NULL)
1129                            != shash_count(json_object(row_update))) {
1130                     return ovsdb_syntax_error(row_update, NULL,
1131                                               "<row-update> contains "
1132                                               "unexpected member");
1133                 } else if (!old_json && !new_json) {
1134                     return ovsdb_syntax_error(row_update, NULL,
1135                                               "<row-update> missing \"old\" "
1136                                               "and \"new\" members");
1137                 }
1138
1139                 if (ovsdb_idl_process_update(table, &uuid, old_json,
1140                                              new_json)) {
1141                     idl->change_seqno++;
1142                 }
1143                 break;
1144
1145             case OVSDB_UPDATE2: {
1146                 const char *ops[] = {"modify", "insert", "delete", "initial"};
1147                 const char *operation;
1148                 const struct json *row;
1149                 int i;
1150
1151                 for (i = 0; i < ARRAY_SIZE(ops); i++) {
1152                     operation = ops[i];
1153                     row = shash_find_data(json_object(row_update), operation);
1154
1155                     if (row)  {
1156                         if (ovsdb_idl_process_update2(table, &uuid, operation,
1157                                                       row)) {
1158                             idl->change_seqno++;
1159                         }
1160                         break;
1161                     }
1162                 }
1163
1164                 /* row_update2 should contain one of the objects */
1165                 if (i == ARRAY_SIZE(ops)) {
1166                     return ovsdb_syntax_error(row_update, NULL,
1167                                               "<row_update2> includes unknown "
1168                                               "object");
1169                 }
1170                 break;
1171             }
1172
1173             default:
1174                 OVS_NOT_REACHED();
1175             }
1176         }
1177     }
1178
1179     return NULL;
1180 }
1181
1182 static struct ovsdb_idl_row *
1183 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
1184 {
1185     struct ovsdb_idl_row *row;
1186
1187     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
1188         if (uuid_equals(&row->uuid, uuid)) {
1189             return row;
1190         }
1191     }
1192     return NULL;
1193 }
1194
1195 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1196  * otherwise. */
1197 static bool
1198 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
1199                          const struct uuid *uuid, const struct json *old,
1200                          const struct json *new)
1201 {
1202     struct ovsdb_idl_row *row;
1203
1204     row = ovsdb_idl_get_row(table, uuid);
1205     if (!new) {
1206         /* Delete row. */
1207         if (row && !ovsdb_idl_row_is_orphan(row)) {
1208             /* XXX perhaps we should check the 'old' values? */
1209             ovsdb_idl_delete_row(row);
1210         } else {
1211             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1212                          "from table %s",
1213                          UUID_ARGS(uuid), table->class->name);
1214             return false;
1215         }
1216     } else if (!old) {
1217         /* Insert row. */
1218         if (!row) {
1219             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1220         } else if (ovsdb_idl_row_is_orphan(row)) {
1221             ovsdb_idl_insert_row(row, new);
1222         } else {
1223             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1224                          "table %s", UUID_ARGS(uuid), table->class->name);
1225             return ovsdb_idl_modify_row(row, new);
1226         }
1227     } else {
1228         /* Modify row. */
1229         if (row) {
1230             /* XXX perhaps we should check the 'old' values? */
1231             if (!ovsdb_idl_row_is_orphan(row)) {
1232                 return ovsdb_idl_modify_row(row, new);
1233             } else {
1234                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1235                              "referenced row "UUID_FMT" in table %s",
1236                              UUID_ARGS(uuid), table->class->name);
1237                 ovsdb_idl_insert_row(row, new);
1238             }
1239         } else {
1240             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1241                          "in table %s", UUID_ARGS(uuid), table->class->name);
1242             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1243         }
1244     }
1245
1246     return true;
1247 }
1248
1249 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1250  * otherwise. */
1251 static bool
1252 ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
1253                           const struct uuid *uuid,
1254                           const char *operation,
1255                           const struct json *json_row)
1256 {
1257     struct ovsdb_idl_row *row;
1258
1259     row = ovsdb_idl_get_row(table, uuid);
1260     if (!strcmp(operation, "delete")) {
1261         /* Delete row. */
1262         if (row && !ovsdb_idl_row_is_orphan(row)) {
1263             ovsdb_idl_delete_row(row);
1264         } else {
1265             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1266                          "from table %s",
1267                          UUID_ARGS(uuid), table->class->name);
1268             return false;
1269         }
1270     } else if (!strcmp(operation, "insert") || !strcmp(operation, "initial")) {
1271         /* Insert row. */
1272         if (!row) {
1273             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), json_row);
1274         } else if (ovsdb_idl_row_is_orphan(row)) {
1275             ovsdb_idl_insert_row(row, json_row);
1276         } else {
1277             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1278                          "table %s", UUID_ARGS(uuid), table->class->name);
1279             ovsdb_idl_delete_row(row);
1280             ovsdb_idl_insert_row(row, json_row);
1281         }
1282     } else if (!strcmp(operation, "modify")) {
1283         /* Modify row. */
1284         if (row) {
1285             if (!ovsdb_idl_row_is_orphan(row)) {
1286                 return ovsdb_idl_modify_row_by_diff(row, json_row);
1287             } else {
1288                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1289                              "referenced row "UUID_FMT" in table %s",
1290                              UUID_ARGS(uuid), table->class->name);
1291                 return false;
1292             }
1293         } else {
1294             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1295                          "in table %s", UUID_ARGS(uuid), table->class->name);
1296             return false;
1297         }
1298     } else {
1299             VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
1300                          "table %s", operation, table->class->name);
1301             return false;
1302     }
1303
1304     return true;
1305 }
1306
1307 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1308  * otherwise.
1309  *
1310  * Change 'row' either with the content of 'row_json' or by apply 'diff'.
1311  * Caller needs to provide either valid 'row_json' or 'diff', but not
1312  * both.  */
1313 static bool
1314 ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
1315                        const struct json *diff_json,
1316                        enum ovsdb_idl_change change)
1317 {
1318     struct ovsdb_idl_table *table = row->table;
1319     const struct ovsdb_idl_table_class *class = table->class;
1320     struct shash_node *node;
1321     bool changed = false;
1322     bool apply_diff = diff_json != NULL;
1323     const struct json *json = apply_diff ? diff_json : row_json;
1324
1325     SHASH_FOR_EACH (node, json_object(json)) {
1326         const char *column_name = node->name;
1327         const struct ovsdb_idl_column *column;
1328         struct ovsdb_datum datum;
1329         struct ovsdb_error *error;
1330         unsigned int column_idx;
1331         struct ovsdb_datum *old;
1332
1333         column = shash_find_data(&table->columns, column_name);
1334         if (!column) {
1335             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
1336                          column_name, UUID_ARGS(&row->uuid));
1337             continue;
1338         }
1339
1340         column_idx = column - table->class->columns;
1341         old = &row->old[column_idx];
1342
1343         error = NULL;
1344         if (apply_diff) {
1345             struct ovsdb_datum diff;
1346
1347             ovs_assert(!row_json);
1348             error = ovsdb_transient_datum_from_json(&diff, &column->type,
1349                                                     node->data);
1350             if (!error) {
1351                 error = ovsdb_datum_apply_diff(&datum, old, &diff,
1352                                                &column->type);
1353                 ovsdb_datum_destroy(&diff, &column->type);
1354             }
1355         } else {
1356             ovs_assert(!diff_json);
1357             error = ovsdb_datum_from_json(&datum, &column->type, node->data,
1358                                           NULL);
1359         }
1360
1361         if (!error) {
1362             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
1363                 ovsdb_datum_swap(old, &datum);
1364                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
1365                     changed = true;
1366                     row->change_seqno[change]
1367                         = row->table->change_seqno[change]
1368                         = row->table->idl->change_seqno + 1;
1369                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
1370                         if (!ovs_list_is_empty(&row->track_node)) {
1371                             ovs_list_remove(&row->track_node);
1372                         }
1373                         ovs_list_push_back(&row->table->track_list,
1374                                        &row->track_node);
1375                         if (!row->updated) {
1376                             row->updated = bitmap_allocate(class->n_columns);
1377                         }
1378                         bitmap_set1(row->updated, column_idx);
1379                     }
1380                 }
1381             } else {
1382                 /* Didn't really change but the OVSDB monitor protocol always
1383                  * includes every value in a row. */
1384             }
1385
1386             ovsdb_datum_destroy(&datum, &column->type);
1387         } else {
1388             char *s = ovsdb_error_to_string(error);
1389             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
1390                          " in table %s: %s", column_name,
1391                          UUID_ARGS(&row->uuid), table->class->name, s);
1392             free(s);
1393             ovsdb_error_destroy(error);
1394         }
1395     }
1396     return changed;
1397 }
1398
1399 static bool
1400 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json,
1401                      enum ovsdb_idl_change change)
1402 {
1403     return ovsdb_idl_row_change__(row, row_json, NULL, change);
1404 }
1405
1406 static bool
1407 ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
1408                          const struct json *diff_json,
1409                          enum ovsdb_idl_change change)
1410 {
1411     return ovsdb_idl_row_change__(row, NULL, diff_json, change);
1412 }
1413
1414 /* When a row A refers to row B through a column with a "refTable" constraint,
1415  * but row B does not exist, row B is called an "orphan row".  Orphan rows
1416  * should not persist, because the database enforces referential integrity, but
1417  * they can appear transiently as changes from the database are received (the
1418  * database doesn't try to topologically sort them and circular references mean
1419  * it isn't always possible anyhow).
1420  *
1421  * This function returns true if 'row' is an orphan row, otherwise false.
1422  */
1423 static bool
1424 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
1425 {
1426     return !row->old && !row->new;
1427 }
1428
1429 /* Returns true if 'row' is conceptually part of the database as modified by
1430  * the current transaction (if any), false otherwise.
1431  *
1432  * This function will return true if 'row' is not an orphan (see the comment on
1433  * ovsdb_idl_row_is_orphan()) and:
1434  *
1435  *   - 'row' exists in the database and has not been deleted within the
1436  *     current transaction (if any).
1437  *
1438  *   - 'row' was inserted within the current transaction and has not been
1439  *     deleted.  (In the latter case you should not have passed 'row' in at
1440  *     all, because ovsdb_idl_txn_delete() freed it.)
1441  *
1442  * This function will return false if 'row' is an orphan or if 'row' was
1443  * deleted within the current transaction.
1444  */
1445 static bool
1446 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
1447 {
1448     return row->new != NULL;
1449 }
1450
1451 static void
1452 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
1453 {
1454     const struct ovsdb_idl_table_class *class = row->table->class;
1455     size_t i;
1456
1457     for (i = 0; i < class->n_columns; i++) {
1458         const struct ovsdb_idl_column *c = &class->columns[i];
1459         (c->parse)(row, &row->old[i]);
1460     }
1461 }
1462
1463 static void
1464 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
1465 {
1466     const struct ovsdb_idl_table_class *class = row->table->class;
1467     size_t i;
1468
1469     for (i = 0; i < class->n_columns; i++) {
1470         const struct ovsdb_idl_column *c = &class->columns[i];
1471         (c->unparse)(row);
1472     }
1473 }
1474
1475 static void
1476 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
1477 {
1478     ovs_assert(row->old == row->new);
1479     if (!ovsdb_idl_row_is_orphan(row)) {
1480         const struct ovsdb_idl_table_class *class = row->table->class;
1481         size_t i;
1482
1483         for (i = 0; i < class->n_columns; i++) {
1484             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
1485         }
1486         free(row->old);
1487         row->old = row->new = NULL;
1488     }
1489 }
1490
1491 static void
1492 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
1493 {
1494     if (row->old != row->new) {
1495         if (row->new) {
1496             const struct ovsdb_idl_table_class *class = row->table->class;
1497             size_t i;
1498
1499             if (row->written) {
1500                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
1501                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
1502                 }
1503             }
1504             free(row->new);
1505             free(row->written);
1506             row->written = NULL;
1507         }
1508         row->new = row->old;
1509     }
1510 }
1511
1512 static void
1513 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
1514 {
1515     struct ovsdb_idl_arc *arc, *next;
1516
1517     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
1518      * that this causes to be unreferenced, if tracking is not enabled.
1519      * If tracking is enabled, orphaned nodes are removed from hmap but not
1520      * freed.
1521      */
1522     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
1523         ovs_list_remove(&arc->dst_node);
1524         if (destroy_dsts
1525             && ovsdb_idl_row_is_orphan(arc->dst)
1526             && ovs_list_is_empty(&arc->dst->dst_arcs)) {
1527             ovsdb_idl_row_destroy(arc->dst);
1528         }
1529         free(arc);
1530     }
1531     ovs_list_init(&row->src_arcs);
1532 }
1533
1534 /* Force nodes that reference 'row' to reparse. */
1535 static void
1536 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
1537 {
1538     struct ovsdb_idl_arc *arc, *next;
1539
1540     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
1541      * 'arc', so we need to use the "safe" variant of list traversal.  However,
1542      * calling an ovsdb_idl_column's 'parse' function will add an arc
1543      * equivalent to 'arc' to row->arcs.  That could be a problem for
1544      * traversal, but it adds it at the beginning of the list to prevent us
1545      * from stumbling upon it again.
1546      *
1547      * (If duplicate arcs were possible then we would need to make sure that
1548      * 'next' didn't also point into 'arc''s destination, but we forbid
1549      * duplicate arcs.) */
1550     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
1551         struct ovsdb_idl_row *ref = arc->src;
1552
1553         ovsdb_idl_row_unparse(ref);
1554         ovsdb_idl_row_clear_arcs(ref, false);
1555         ovsdb_idl_row_parse(ref);
1556     }
1557 }
1558
1559 static struct ovsdb_idl_row *
1560 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
1561 {
1562     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
1563     class->row_init(row);
1564     ovs_list_init(&row->src_arcs);
1565     ovs_list_init(&row->dst_arcs);
1566     hmap_node_nullify(&row->txn_node);
1567     ovs_list_init(&row->track_node);
1568     return row;
1569 }
1570
1571 static struct ovsdb_idl_row *
1572 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
1573 {
1574     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
1575     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
1576     row->uuid = *uuid;
1577     row->table = table;
1578     return row;
1579 }
1580
1581 static void
1582 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
1583 {
1584     if (row) {
1585         ovsdb_idl_row_clear_old(row);
1586         hmap_remove(&row->table->rows, &row->hmap_node);
1587         if (ovsdb_idl_track_is_set(row->table)) {
1588             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1589                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1590                 = row->table->idl->change_seqno + 1;
1591         }
1592         if (ovs_list_is_empty(&row->track_node)) {
1593             ovs_list_push_back(&row->table->track_list, &row->track_node);
1594         }
1595     }
1596 }
1597
1598 static void
1599 ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
1600 {
1601     size_t i;
1602
1603     for (i = 0; i < idl->class->n_tables; i++) {
1604         struct ovsdb_idl_table *table = &idl->tables[i];
1605
1606         if (!ovs_list_is_empty(&table->track_list)) {
1607             struct ovsdb_idl_row *row, *next;
1608
1609             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
1610                 if (!ovsdb_idl_track_is_set(row->table)) {
1611                     ovs_list_remove(&row->track_node);
1612                     free(row);
1613                 }
1614             }
1615         }
1616     }
1617 }
1618
1619 static void
1620 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
1621 {
1622     const struct ovsdb_idl_table_class *class = row->table->class;
1623     size_t i;
1624
1625     ovs_assert(!row->old && !row->new);
1626     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
1627     for (i = 0; i < class->n_columns; i++) {
1628         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
1629     }
1630     ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_INSERT);
1631     ovsdb_idl_row_parse(row);
1632
1633     ovsdb_idl_row_reparse_backrefs(row);
1634 }
1635
1636 static void
1637 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
1638 {
1639     ovsdb_idl_row_unparse(row);
1640     ovsdb_idl_row_clear_arcs(row, true);
1641     ovsdb_idl_row_clear_old(row);
1642     if (ovs_list_is_empty(&row->dst_arcs)) {
1643         ovsdb_idl_row_destroy(row);
1644     } else {
1645         ovsdb_idl_row_reparse_backrefs(row);
1646     }
1647 }
1648
1649 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1650  * otherwise. */
1651 static bool
1652 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1653 {
1654     bool changed;
1655
1656     ovsdb_idl_row_unparse(row);
1657     ovsdb_idl_row_clear_arcs(row, true);
1658     changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY);
1659     ovsdb_idl_row_parse(row);
1660
1661     return changed;
1662 }
1663
1664 static bool
1665 ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *row,
1666                              const struct json *diff_json)
1667 {
1668     bool changed;
1669
1670     ovsdb_idl_row_unparse(row);
1671     ovsdb_idl_row_clear_arcs(row, true);
1672     changed = ovsdb_idl_row_apply_diff(row, diff_json,
1673                                        OVSDB_IDL_CHANGE_MODIFY);
1674     ovsdb_idl_row_parse(row);
1675
1676     return changed;
1677 }
1678
1679 static bool
1680 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1681 {
1682     const struct ovsdb_idl_arc *arc;
1683
1684     /* No self-arcs. */
1685     if (src == dst) {
1686         return false;
1687     }
1688
1689     /* No duplicate arcs.
1690      *
1691      * We only need to test whether the first arc in dst->dst_arcs originates
1692      * at 'src', since we add all of the arcs from a given source in a clump
1693      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1694      * added at the front of the dst_arcs list. */
1695     if (ovs_list_is_empty(&dst->dst_arcs)) {
1696         return true;
1697     }
1698     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1699     return arc->src != src;
1700 }
1701
1702 static struct ovsdb_idl_table *
1703 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1704                            const struct ovsdb_idl_table_class *table_class)
1705 {
1706     return &idl->tables[table_class - idl->class->tables];
1707 }
1708
1709 /* Called by ovsdb-idlc generated code. */
1710 struct ovsdb_idl_row *
1711 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1712                       struct ovsdb_idl_table_class *dst_table_class,
1713                       const struct uuid *dst_uuid)
1714 {
1715     struct ovsdb_idl *idl = src->table->idl;
1716     struct ovsdb_idl_table *dst_table;
1717     struct ovsdb_idl_arc *arc;
1718     struct ovsdb_idl_row *dst;
1719
1720     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1721     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1722     if (idl->txn) {
1723         /* We're being called from ovsdb_idl_txn_write().  We must not update
1724          * any arcs, because the transaction will be backed out at commit or
1725          * abort time and we don't want our graph screwed up.
1726          *
1727          * Just return the destination row, if there is one and it has not been
1728          * deleted. */
1729         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1730             return dst;
1731         }
1732         return NULL;
1733     } else {
1734         /* We're being called from some other context.  Update the graph. */
1735         if (!dst) {
1736             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1737         }
1738
1739         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
1740         if (may_add_arc(src, dst)) {
1741             /* The arc *must* be added at the front of the dst_arcs list.  See
1742              * ovsdb_idl_row_reparse_backrefs() for details. */
1743             arc = xmalloc(sizeof *arc);
1744             ovs_list_push_front(&src->src_arcs, &arc->src_node);
1745             ovs_list_push_front(&dst->dst_arcs, &arc->dst_node);
1746             arc->src = src;
1747             arc->dst = dst;
1748         }
1749
1750         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
1751     }
1752 }
1753
1754 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
1755  * pointer to the row if there is one, otherwise a null pointer.  */
1756 const struct ovsdb_idl_row *
1757 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
1758                            const struct ovsdb_idl_table_class *tc,
1759                            const struct uuid *uuid)
1760 {
1761     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
1762 }
1763
1764 static struct ovsdb_idl_row *
1765 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
1766 {
1767     for (; node; node = hmap_next(&table->rows, node)) {
1768         struct ovsdb_idl_row *row;
1769
1770         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
1771         if (ovsdb_idl_row_exists(row)) {
1772             return row;
1773         }
1774     }
1775     return NULL;
1776 }
1777
1778 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
1779  * table is empty.
1780  *
1781  * Database tables are internally maintained as hash tables, so adding or
1782  * removing rows while traversing the same table can cause some rows to be
1783  * visited twice or not at apply. */
1784 const struct ovsdb_idl_row *
1785 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
1786                     const struct ovsdb_idl_table_class *table_class)
1787 {
1788     struct ovsdb_idl_table *table
1789         = ovsdb_idl_table_from_class(idl, table_class);
1790     return next_real_row(table, hmap_first(&table->rows));
1791 }
1792
1793 /* Returns a row following 'row' within its table, or a null pointer if 'row'
1794  * is the last row in its table. */
1795 const struct ovsdb_idl_row *
1796 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
1797 {
1798     struct ovsdb_idl_table *table = row->table;
1799
1800     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
1801 }
1802
1803 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
1804  * transaction has changed 'column''s value, the modified value is returned.
1805  *
1806  * The caller must not modify or free the returned value.
1807  *
1808  * Various kinds of changes can invalidate the returned value: writing to the
1809  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
1810  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
1811  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
1812  * returned value is needed for a long time, it is best to make a copy of it
1813  * with ovsdb_datum_clone(). */
1814 const struct ovsdb_datum *
1815 ovsdb_idl_read(const struct ovsdb_idl_row *row,
1816                const struct ovsdb_idl_column *column)
1817 {
1818     const struct ovsdb_idl_table_class *class;
1819     size_t column_idx;
1820
1821     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
1822
1823     class = row->table->class;
1824     column_idx = column - class->columns;
1825
1826     ovs_assert(row->new != NULL);
1827     ovs_assert(column_idx < class->n_columns);
1828
1829     if (row->written && bitmap_is_set(row->written, column_idx)) {
1830         return &row->new[column_idx];
1831     } else if (row->old) {
1832         return &row->old[column_idx];
1833     } else {
1834         return ovsdb_datum_default(&column->type);
1835     }
1836 }
1837
1838 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
1839  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
1840  * have a value type of OVSDB_TYPE_VOID.)
1841  *
1842  * This is useful in code that "knows" that a particular column has a given
1843  * type, so that it will abort if someone changes the column's type without
1844  * updating the code that uses it. */
1845 const struct ovsdb_datum *
1846 ovsdb_idl_get(const struct ovsdb_idl_row *row,
1847               const struct ovsdb_idl_column *column,
1848               enum ovsdb_atomic_type key_type OVS_UNUSED,
1849               enum ovsdb_atomic_type value_type OVS_UNUSED)
1850 {
1851     ovs_assert(column->type.key.type == key_type);
1852     ovs_assert(column->type.value.type == value_type);
1853
1854     return ovsdb_idl_read(row, column);
1855 }
1856
1857 /* Returns true if the field represented by 'column' in 'row' may be modified,
1858  * false if it is immutable.
1859  *
1860  * Normally, whether a field is mutable is controlled by its column's schema.
1861  * However, an immutable column can be set to any initial value at the time of
1862  * insertion, so if 'row' is a new row (one that is being added as part of the
1863  * current transaction, supposing that a transaction is in progress) then even
1864  * its "immutable" fields are actually mutable. */
1865 bool
1866 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
1867                      const struct ovsdb_idl_column *column)
1868 {
1869     return column->mutable || (row->new && !row->old);
1870 }
1871
1872 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
1873  * to all-zero-bits by some other entity.  If 'row' was set up some other way
1874  * then the return value is indeterminate. */
1875 bool
1876 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
1877 {
1878     return row->table == NULL;
1879 }
1880 \f
1881 /* Transactions. */
1882
1883 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1884                                    enum ovsdb_idl_txn_status);
1885
1886 /* Returns a string representation of 'status'.  The caller must not modify or
1887  * free the returned string.
1888  *
1889  * The return value is probably useful only for debug log messages and unit
1890  * tests. */
1891 const char *
1892 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
1893 {
1894     switch (status) {
1895     case TXN_UNCOMMITTED:
1896         return "uncommitted";
1897     case TXN_UNCHANGED:
1898         return "unchanged";
1899     case TXN_INCOMPLETE:
1900         return "incomplete";
1901     case TXN_ABORTED:
1902         return "aborted";
1903     case TXN_SUCCESS:
1904         return "success";
1905     case TXN_TRY_AGAIN:
1906         return "try again";
1907     case TXN_NOT_LOCKED:
1908         return "not locked";
1909     case TXN_ERROR:
1910         return "error";
1911     }
1912     return "<unknown>";
1913 }
1914
1915 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
1916  * active transaction at a time.  See the large comment in ovsdb-idl.h for
1917  * general information on transactions. */
1918 struct ovsdb_idl_txn *
1919 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
1920 {
1921     struct ovsdb_idl_txn *txn;
1922
1923     ovs_assert(!idl->txn);
1924     idl->txn = txn = xmalloc(sizeof *txn);
1925     txn->request_id = NULL;
1926     txn->idl = idl;
1927     hmap_init(&txn->txn_rows);
1928     txn->status = TXN_UNCOMMITTED;
1929     txn->error = NULL;
1930     txn->dry_run = false;
1931     ds_init(&txn->comment);
1932
1933     txn->inc_table = NULL;
1934     txn->inc_column = NULL;
1935
1936     hmap_init(&txn->inserted_rows);
1937
1938     return txn;
1939 }
1940
1941 /* Appends 's', which is treated as a printf()-type format string, to the
1942  * comments that will be passed to the OVSDB server when 'txn' is committed.
1943  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
1944  * show-log" can print in a relatively human-readable form.) */
1945 void
1946 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
1947 {
1948     va_list args;
1949
1950     if (txn->comment.length) {
1951         ds_put_char(&txn->comment, '\n');
1952     }
1953
1954     va_start(args, s);
1955     ds_put_format_valist(&txn->comment, s, args);
1956     va_end(args);
1957 }
1958
1959 /* Marks 'txn' as a transaction that will not actually modify the database.  In
1960  * almost every way, the transaction is treated like other transactions.  It
1961  * must be committed or aborted like other transactions, it will be sent to the
1962  * database server like other transactions, and so on.  The only difference is
1963  * that the operations sent to the database server will include, as the last
1964  * step, an "abort" operation, so that any changes made by the transaction will
1965  * not actually take effect. */
1966 void
1967 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
1968 {
1969     txn->dry_run = true;
1970 }
1971
1972 /* Causes 'txn', when committed, to increment the value of 'column' within
1973  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
1974  * successfully, the client may retrieve the final (incremented) value of
1975  * 'column' with ovsdb_idl_txn_get_increment_new_value().
1976  *
1977  * The client could accomplish something similar with ovsdb_idl_read(),
1978  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
1979  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
1980  * will never (by itself) fail because of a verify error.
1981  *
1982  * The intended use is for incrementing the "next_cfg" column in the
1983  * Open_vSwitch table. */
1984 void
1985 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
1986                         const struct ovsdb_idl_row *row,
1987                         const struct ovsdb_idl_column *column)
1988 {
1989     ovs_assert(!txn->inc_table);
1990     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
1991     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
1992
1993     txn->inc_table = row->table->class->name;
1994     txn->inc_column = column->name;
1995     txn->inc_row = row->uuid;
1996 }
1997
1998 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
1999  * has been called for 'txn' but the commit is still incomplete (that is, the
2000  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
2001  * end up committing at the database server, but the client will not be able to
2002  * get any further status information back. */
2003 void
2004 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
2005 {
2006     struct ovsdb_idl_txn_insert *insert, *next;
2007
2008     json_destroy(txn->request_id);
2009     if (txn->status == TXN_INCOMPLETE) {
2010         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2011     }
2012     ovsdb_idl_txn_abort(txn);
2013     ds_destroy(&txn->comment);
2014     free(txn->error);
2015     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
2016         free(insert);
2017     }
2018     hmap_destroy(&txn->inserted_rows);
2019     free(txn);
2020 }
2021
2022 /* Causes poll_block() to wake up if 'txn' has completed committing. */
2023 void
2024 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
2025 {
2026     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
2027         poll_immediate_wake();
2028     }
2029 }
2030
2031 static struct json *
2032 where_uuid_equals(const struct uuid *uuid)
2033 {
2034     return
2035         json_array_create_1(
2036             json_array_create_3(
2037                 json_string_create("_uuid"),
2038                 json_string_create("=="),
2039                 json_array_create_2(
2040                     json_string_create("uuid"),
2041                     json_string_create_nocopy(
2042                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
2043 }
2044
2045 static char *
2046 uuid_name_from_uuid(const struct uuid *uuid)
2047 {
2048     char *name;
2049     char *p;
2050
2051     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
2052     for (p = name; *p != '\0'; p++) {
2053         if (*p == '-') {
2054             *p = '_';
2055         }
2056     }
2057
2058     return name;
2059 }
2060
2061 static const struct ovsdb_idl_row *
2062 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
2063 {
2064     const struct ovsdb_idl_row *row;
2065
2066     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
2067         if (uuid_equals(&row->uuid, uuid)) {
2068             return row;
2069         }
2070     }
2071     return NULL;
2072 }
2073
2074 /* XXX there must be a cleaner way to do this */
2075 static struct json *
2076 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
2077 {
2078     if (json->type == JSON_ARRAY) {
2079         struct uuid uuid;
2080         size_t i;
2081
2082         if (json->u.array.n == 2
2083             && json->u.array.elems[0]->type == JSON_STRING
2084             && json->u.array.elems[1]->type == JSON_STRING
2085             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
2086             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
2087             const struct ovsdb_idl_row *row;
2088
2089             row = ovsdb_idl_txn_get_row(txn, &uuid);
2090             if (row && !row->old && row->new) {
2091                 json_destroy(json);
2092
2093                 return json_array_create_2(
2094                     json_string_create("named-uuid"),
2095                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
2096             }
2097         }
2098
2099         for (i = 0; i < json->u.array.n; i++) {
2100             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
2101                                                       txn);
2102         }
2103     } else if (json->type == JSON_OBJECT) {
2104         struct shash_node *node;
2105
2106         SHASH_FOR_EACH (node, json_object(json)) {
2107             node->data = substitute_uuids(node->data, txn);
2108         }
2109     }
2110     return json;
2111 }
2112
2113 static void
2114 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
2115 {
2116     struct ovsdb_idl_row *row, *next;
2117
2118     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
2119      * ovsdb_idl_column's 'parse' function, which will call
2120      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
2121      * transaction and fail to update the graph.  */
2122     txn->idl->txn = NULL;
2123
2124     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
2125         if (row->old) {
2126             if (row->written) {
2127                 ovsdb_idl_row_unparse(row);
2128                 ovsdb_idl_row_clear_arcs(row, false);
2129                 ovsdb_idl_row_parse(row);
2130             }
2131         } else {
2132             ovsdb_idl_row_unparse(row);
2133         }
2134         ovsdb_idl_row_clear_new(row);
2135
2136         free(row->prereqs);
2137         row->prereqs = NULL;
2138
2139         free(row->written);
2140         row->written = NULL;
2141
2142         hmap_remove(&txn->txn_rows, &row->txn_node);
2143         hmap_node_nullify(&row->txn_node);
2144         if (!row->old) {
2145             hmap_remove(&row->table->rows, &row->hmap_node);
2146             free(row);
2147         }
2148     }
2149     hmap_destroy(&txn->txn_rows);
2150     hmap_init(&txn->txn_rows);
2151 }
2152
2153 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
2154  * of the following TXN_* constants:
2155  *
2156  *   TXN_INCOMPLETE:
2157  *
2158  *       The transaction is in progress, but not yet complete.  The caller
2159  *       should call again later, after calling ovsdb_idl_run() to let the IDL
2160  *       do OVSDB protocol processing.
2161  *
2162  *   TXN_UNCHANGED:
2163  *
2164  *       The transaction is complete.  (It didn't actually change the database,
2165  *       so the IDL didn't send any request to the database server.)
2166  *
2167  *   TXN_ABORTED:
2168  *
2169  *       The caller previously called ovsdb_idl_txn_abort().
2170  *
2171  *   TXN_SUCCESS:
2172  *
2173  *       The transaction was successful.  The update made by the transaction
2174  *       (and possibly other changes made by other database clients) should
2175  *       already be visible in the IDL.
2176  *
2177  *   TXN_TRY_AGAIN:
2178  *
2179  *       The transaction failed for some transient reason, e.g. because a
2180  *       "verify" operation reported an inconsistency or due to a network
2181  *       problem.  The caller should wait for a change to the database, then
2182  *       compose a new transaction, and commit the new transaction.
2183  *
2184  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
2185  *       the database.  It is important to use its return value *before* the
2186  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
2187  *       purpose, because the change that one should wait for can happen after
2188  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
2189  *       using some other baseline value in that situation could cause an
2190  *       indefinite wait if the database rarely changes.
2191  *
2192  *   TXN_NOT_LOCKED:
2193  *
2194  *       The transaction failed because the IDL has been configured to require
2195  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
2196  *       has already lost it.
2197  *
2198  * Committing a transaction rolls back all of the changes that it made to the
2199  * IDL's copy of the database.  If the transaction commits successfully, then
2200  * the database server will send an update and, thus, the IDL will be updated
2201  * with the committed changes. */
2202 enum ovsdb_idl_txn_status
2203 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
2204 {
2205     struct ovsdb_idl_row *row;
2206     struct json *operations;
2207     bool any_updates;
2208
2209     if (txn != txn->idl->txn) {
2210         goto coverage_out;
2211     }
2212
2213     /* If we need a lock but don't have it, give up quickly. */
2214     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
2215         txn->status = TXN_NOT_LOCKED;
2216         goto disassemble_out;
2217     }
2218
2219     operations = json_array_create_1(
2220         json_string_create(txn->idl->class->database));
2221
2222     /* Assert that we have the required lock (avoiding a race). */
2223     if (txn->idl->lock_name) {
2224         struct json *op = json_object_create();
2225         json_array_add(operations, op);
2226         json_object_put_string(op, "op", "assert");
2227         json_object_put_string(op, "lock", txn->idl->lock_name);
2228     }
2229
2230     /* Add prerequisites and declarations of new rows. */
2231     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2232         /* XXX check that deleted rows exist even if no prereqs? */
2233         if (row->prereqs) {
2234             const struct ovsdb_idl_table_class *class = row->table->class;
2235             size_t n_columns = class->n_columns;
2236             struct json *op, *columns, *row_json;
2237             size_t idx;
2238
2239             op = json_object_create();
2240             json_array_add(operations, op);
2241             json_object_put_string(op, "op", "wait");
2242             json_object_put_string(op, "table", class->name);
2243             json_object_put(op, "timeout", json_integer_create(0));
2244             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2245             json_object_put_string(op, "until", "==");
2246             columns = json_array_create_empty();
2247             json_object_put(op, "columns", columns);
2248             row_json = json_object_create();
2249             json_object_put(op, "rows", json_array_create_1(row_json));
2250
2251             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
2252                 const struct ovsdb_idl_column *column = &class->columns[idx];
2253                 json_array_add(columns, json_string_create(column->name));
2254                 json_object_put(row_json, column->name,
2255                                 ovsdb_datum_to_json(&row->old[idx],
2256                                                     &column->type));
2257             }
2258         }
2259     }
2260
2261     /* Add updates. */
2262     any_updates = false;
2263     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2264         const struct ovsdb_idl_table_class *class = row->table->class;
2265
2266         if (!row->new) {
2267             if (class->is_root) {
2268                 struct json *op = json_object_create();
2269                 json_object_put_string(op, "op", "delete");
2270                 json_object_put_string(op, "table", class->name);
2271                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2272                 json_array_add(operations, op);
2273                 any_updates = true;
2274             } else {
2275                 /* Let ovsdb-server decide whether to really delete it. */
2276             }
2277         } else if (row->old != row->new) {
2278             struct json *row_json;
2279             struct json *op;
2280             size_t idx;
2281
2282             op = json_object_create();
2283             json_object_put_string(op, "op", row->old ? "update" : "insert");
2284             json_object_put_string(op, "table", class->name);
2285             if (row->old) {
2286                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2287             } else {
2288                 struct ovsdb_idl_txn_insert *insert;
2289
2290                 any_updates = true;
2291
2292                 json_object_put(op, "uuid-name",
2293                                 json_string_create_nocopy(
2294                                     uuid_name_from_uuid(&row->uuid)));
2295
2296                 insert = xmalloc(sizeof *insert);
2297                 insert->dummy = row->uuid;
2298                 insert->op_index = operations->u.array.n - 1;
2299                 uuid_zero(&insert->real);
2300                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
2301                             uuid_hash(&insert->dummy));
2302             }
2303             row_json = json_object_create();
2304             json_object_put(op, "row", row_json);
2305
2306             if (row->written) {
2307                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
2308                     const struct ovsdb_idl_column *column =
2309                                                         &class->columns[idx];
2310
2311                     if (row->old
2312                         || !ovsdb_datum_is_default(&row->new[idx],
2313                                                   &column->type)) {
2314                         json_object_put(row_json, column->name,
2315                                         substitute_uuids(
2316                                             ovsdb_datum_to_json(&row->new[idx],
2317                                                                 &column->type),
2318                                             txn));
2319
2320                         /* If anything really changed, consider it an update.
2321                          * We can't suppress not-really-changed values earlier
2322                          * or transactions would become nonatomic (see the big
2323                          * comment inside ovsdb_idl_txn_write()). */
2324                         if (!any_updates && row->old &&
2325                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
2326                                                 &column->type)) {
2327                             any_updates = true;
2328                         }
2329                     }
2330                 }
2331             }
2332
2333             if (!row->old || !shash_is_empty(json_object(row_json))) {
2334                 json_array_add(operations, op);
2335             } else {
2336                 json_destroy(op);
2337             }
2338         }
2339     }
2340
2341     /* Add increment. */
2342     if (txn->inc_table && any_updates) {
2343         struct json *op;
2344
2345         txn->inc_index = operations->u.array.n - 1;
2346
2347         op = json_object_create();
2348         json_object_put_string(op, "op", "mutate");
2349         json_object_put_string(op, "table", txn->inc_table);
2350         json_object_put(op, "where",
2351                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2352                                          txn));
2353         json_object_put(op, "mutations",
2354                         json_array_create_1(
2355                             json_array_create_3(
2356                                 json_string_create(txn->inc_column),
2357                                 json_string_create("+="),
2358                                 json_integer_create(1))));
2359         json_array_add(operations, op);
2360
2361         op = json_object_create();
2362         json_object_put_string(op, "op", "select");
2363         json_object_put_string(op, "table", txn->inc_table);
2364         json_object_put(op, "where",
2365                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2366                                          txn));
2367         json_object_put(op, "columns",
2368                         json_array_create_1(json_string_create(
2369                                                 txn->inc_column)));
2370         json_array_add(operations, op);
2371     }
2372
2373     if (txn->comment.length) {
2374         struct json *op = json_object_create();
2375         json_object_put_string(op, "op", "comment");
2376         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
2377         json_array_add(operations, op);
2378     }
2379
2380     if (txn->dry_run) {
2381         struct json *op = json_object_create();
2382         json_object_put_string(op, "op", "abort");
2383         json_array_add(operations, op);
2384     }
2385
2386     if (!any_updates) {
2387         txn->status = TXN_UNCHANGED;
2388         json_destroy(operations);
2389     } else if (!jsonrpc_session_send(
2390                    txn->idl->session,
2391                    jsonrpc_create_request(
2392                        "transact", operations, &txn->request_id))) {
2393         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
2394                     json_hash(txn->request_id, 0));
2395         txn->status = TXN_INCOMPLETE;
2396     } else {
2397         txn->status = TXN_TRY_AGAIN;
2398     }
2399
2400 disassemble_out:
2401     ovsdb_idl_txn_disassemble(txn);
2402 coverage_out:
2403     switch (txn->status) {
2404     case TXN_UNCOMMITTED:   COVERAGE_INC(txn_uncommitted);    break;
2405     case TXN_UNCHANGED:     COVERAGE_INC(txn_unchanged);      break;
2406     case TXN_INCOMPLETE:    COVERAGE_INC(txn_incomplete);     break;
2407     case TXN_ABORTED:       COVERAGE_INC(txn_aborted);        break;
2408     case TXN_SUCCESS:       COVERAGE_INC(txn_success);        break;
2409     case TXN_TRY_AGAIN:     COVERAGE_INC(txn_try_again);      break;
2410     case TXN_NOT_LOCKED:    COVERAGE_INC(txn_not_locked);     break;
2411     case TXN_ERROR:         COVERAGE_INC(txn_error);          break;
2412     }
2413
2414     return txn->status;
2415 }
2416
2417 /* Attempts to commit 'txn', blocking until the commit either succeeds or
2418  * fails.  Returns the final commit status, which may be any TXN_* value other
2419  * than TXN_INCOMPLETE.
2420  *
2421  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
2422  * return value of ovsdb_idl_get_seqno() to change. */
2423 enum ovsdb_idl_txn_status
2424 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
2425 {
2426     enum ovsdb_idl_txn_status status;
2427
2428     fatal_signal_run();
2429     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
2430         ovsdb_idl_run(txn->idl);
2431         ovsdb_idl_wait(txn->idl);
2432         ovsdb_idl_txn_wait(txn);
2433         poll_block();
2434     }
2435     return status;
2436 }
2437
2438 /* Returns the final (incremented) value of the column in 'txn' that was set to
2439  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
2440  * successfully. */
2441 int64_t
2442 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
2443 {
2444     ovs_assert(txn->status == TXN_SUCCESS);
2445     return txn->inc_new_value;
2446 }
2447
2448 /* Aborts 'txn' without sending it to the database server.  This is effective
2449  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
2450  * Otherwise, it has no effect.
2451  *
2452  * Aborting a transaction doesn't free its memory.  Use
2453  * ovsdb_idl_txn_destroy() to do that. */
2454 void
2455 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
2456 {
2457     ovsdb_idl_txn_disassemble(txn);
2458     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
2459         txn->status = TXN_ABORTED;
2460     }
2461 }
2462
2463 /* Returns a string that reports the error status for 'txn'.  The caller must
2464  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
2465  * for 'txn' may free the returned string.
2466  *
2467  * The return value is ordinarily one of the strings that
2468  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
2469  * due to an error reported by the database server, the return value is that
2470  * error. */
2471 const char *
2472 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
2473 {
2474     if (txn->status != TXN_ERROR) {
2475         return ovsdb_idl_txn_status_to_string(txn->status);
2476     } else if (txn->error) {
2477         return txn->error;
2478     } else {
2479         return "no error details available";
2480     }
2481 }
2482
2483 static void
2484 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
2485                              const struct json *json)
2486 {
2487     if (txn->error == NULL) {
2488         txn->error = json_to_string(json, JSSF_SORT);
2489     }
2490 }
2491
2492 /* For transaction 'txn' that completed successfully, finds and returns the
2493  * permanent UUID that the database assigned to a newly inserted row, given the
2494  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
2495  *
2496  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
2497  * if it was assigned by that function and then deleted by
2498  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
2499  * and then deleted within a single transaction are never sent to the database
2500  * server, so it never assigns them a permanent UUID.) */
2501 const struct uuid *
2502 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
2503                               const struct uuid *uuid)
2504 {
2505     const struct ovsdb_idl_txn_insert *insert;
2506
2507     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
2508     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
2509                              uuid_hash(uuid), &txn->inserted_rows) {
2510         if (uuid_equals(uuid, &insert->dummy)) {
2511             return &insert->real;
2512         }
2513     }
2514     return NULL;
2515 }
2516
2517 static void
2518 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2519                        enum ovsdb_idl_txn_status status)
2520 {
2521     txn->status = status;
2522     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2523 }
2524
2525 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
2526  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
2527  * ovs-vswitchd).
2528  *
2529  * 'datum' must have the correct type for its column.  The IDL does not check
2530  * that it meets schema constraints, but ovsdb-server will do so at commit time
2531  * so it had better be correct.
2532  *
2533  * A transaction must be in progress.  Replication of 'column' must not have
2534  * been disabled (by calling ovsdb_idl_omit()).
2535  *
2536  * Usually this function is used indirectly through one of the "set" functions
2537  * generated by ovsdb-idlc.
2538  *
2539  * Takes ownership of what 'datum' points to (and in some cases destroys that
2540  * data before returning) but makes a copy of 'datum' itself.  (Commonly
2541  * 'datum' is on the caller's stack.) */
2542 static void
2543 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
2544                       const struct ovsdb_idl_column *column,
2545                       struct ovsdb_datum *datum, bool owns_datum)
2546 {
2547     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2548     const struct ovsdb_idl_table_class *class;
2549     size_t column_idx;
2550     bool write_only;
2551
2552     if (ovsdb_idl_row_is_synthetic(row)) {
2553         goto discard_datum;
2554     }
2555
2556     class = row->table->class;
2557     column_idx = column - class->columns;
2558     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
2559
2560     ovs_assert(row->new != NULL);
2561     ovs_assert(column_idx < class->n_columns);
2562     ovs_assert(row->old == NULL ||
2563                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2564
2565     if (row->table->idl->verify_write_only && !write_only) {
2566         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
2567                  " explicitly configured not to.", class->name, column->name);
2568         goto discard_datum;
2569     }
2570
2571     /* If this is a write-only column and the datum being written is the same
2572      * as the one already there, just skip the update entirely.  This is worth
2573      * optimizing because we have a lot of columns that get periodically
2574      * refreshed into the database but don't actually change that often.
2575      *
2576      * We don't do this for read/write columns because that would break
2577      * atomicity of transactions--some other client might have written a
2578      * different value in that column since we read it.  (But if a whole
2579      * transaction only does writes of existing values, without making any real
2580      * changes, we will drop the whole transaction later in
2581      * ovsdb_idl_txn_commit().) */
2582     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
2583                                          datum, &column->type)) {
2584         goto discard_datum;
2585     }
2586
2587     if (hmap_node_is_null(&row->txn_node)) {
2588         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2589                     uuid_hash(&row->uuid));
2590     }
2591     if (row->old == row->new) {
2592         row->new = xmalloc(class->n_columns * sizeof *row->new);
2593     }
2594     if (!row->written) {
2595         row->written = bitmap_allocate(class->n_columns);
2596     }
2597     if (bitmap_is_set(row->written, column_idx)) {
2598         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
2599     } else {
2600         bitmap_set1(row->written, column_idx);
2601     }
2602     if (owns_datum) {
2603         row->new[column_idx] = *datum;
2604     } else {
2605         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
2606     }
2607     (column->unparse)(row);
2608     (column->parse)(row, &row->new[column_idx]);
2609     return;
2610
2611 discard_datum:
2612     if (owns_datum) {
2613         ovsdb_datum_destroy(datum, &column->type);
2614     }
2615 }
2616
2617 void
2618 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
2619                     const struct ovsdb_idl_column *column,
2620                     struct ovsdb_datum *datum)
2621 {
2622     ovsdb_idl_txn_write__(row, column, datum, true);
2623 }
2624
2625 void
2626 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
2627                           const struct ovsdb_idl_column *column,
2628                           const struct ovsdb_datum *datum)
2629 {
2630     ovsdb_idl_txn_write__(row, column,
2631                           CONST_CAST(struct ovsdb_datum *, datum), false);
2632 }
2633
2634 /* Causes the original contents of 'column' in 'row_' to be verified as a
2635  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
2636  * changed (or if 'row_' was deleted) between the time that the IDL originally
2637  * read its contents and the time that the transaction commits, then the
2638  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
2639  * TXN_AGAIN_NOW (depending on whether the database change has already been
2640  * received).
2641  *
2642  * The intention is that, to ensure that no transaction commits based on dirty
2643  * reads, an application should call ovsdb_idl_txn_verify() on each data item
2644  * read as part of a read-modify-write operation.
2645  *
2646  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
2647  * value of 'column' is already known:
2648  *
2649  *   - If 'row_' is a row created by the current transaction (returned by
2650  *     ovsdb_idl_txn_insert()).
2651  *
2652  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
2653  *     within the current transaction.
2654  *
2655  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
2656  * ovsdb_idl_txn_write() for a given read-modify-write.
2657  *
2658  * A transaction must be in progress.
2659  *
2660  * Usually this function is used indirectly through one of the "verify"
2661  * functions generated by ovsdb-idlc. */
2662 void
2663 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
2664                      const struct ovsdb_idl_column *column)
2665 {
2666     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2667     const struct ovsdb_idl_table_class *class;
2668     size_t column_idx;
2669
2670     if (ovsdb_idl_row_is_synthetic(row)) {
2671         return;
2672     }
2673
2674     class = row->table->class;
2675     column_idx = column - class->columns;
2676
2677     ovs_assert(row->new != NULL);
2678     ovs_assert(row->old == NULL ||
2679                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2680     if (!row->old
2681         || (row->written && bitmap_is_set(row->written, column_idx))) {
2682         return;
2683     }
2684
2685     if (hmap_node_is_null(&row->txn_node)) {
2686         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2687                     uuid_hash(&row->uuid));
2688     }
2689     if (!row->prereqs) {
2690         row->prereqs = bitmap_allocate(class->n_columns);
2691     }
2692     bitmap_set1(row->prereqs, column_idx);
2693 }
2694
2695 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
2696  * accessed afterward.
2697  *
2698  * A transaction must be in progress.
2699  *
2700  * Usually this function is used indirectly through one of the "delete"
2701  * functions generated by ovsdb-idlc. */
2702 void
2703 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
2704 {
2705     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2706
2707     if (ovsdb_idl_row_is_synthetic(row)) {
2708         return;
2709     }
2710
2711     ovs_assert(row->new != NULL);
2712     if (!row->old) {
2713         ovsdb_idl_row_unparse(row);
2714         ovsdb_idl_row_clear_new(row);
2715         ovs_assert(!row->prereqs);
2716         hmap_remove(&row->table->rows, &row->hmap_node);
2717         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
2718         free(row);
2719         return;
2720     }
2721     if (hmap_node_is_null(&row->txn_node)) {
2722         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2723                     uuid_hash(&row->uuid));
2724     }
2725     ovsdb_idl_row_clear_new(row);
2726     row->new = NULL;
2727 }
2728
2729 /* Inserts and returns a new row in the table with the specified 'class' in the
2730  * database with open transaction 'txn'.
2731  *
2732  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
2733  * randomly generated; otherwise 'uuid' should specify a randomly generated
2734  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
2735  * 'txn' is committed, but the IDL will replace any uses of the provisional
2736  * UUID in the data to be to be committed by the UUID assigned by
2737  * ovsdb-server.
2738  *
2739  * Usually this function is used indirectly through one of the "insert"
2740  * functions generated by ovsdb-idlc. */
2741 const struct ovsdb_idl_row *
2742 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
2743                      const struct ovsdb_idl_table_class *class,
2744                      const struct uuid *uuid)
2745 {
2746     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
2747
2748     if (uuid) {
2749         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
2750         row->uuid = *uuid;
2751     } else {
2752         uuid_generate(&row->uuid);
2753     }
2754
2755     row->table = ovsdb_idl_table_from_class(txn->idl, class);
2756     row->new = xmalloc(class->n_columns * sizeof *row->new);
2757     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
2758     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
2759     return row;
2760 }
2761
2762 static void
2763 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
2764 {
2765     struct ovsdb_idl_txn *txn;
2766
2767     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
2768         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
2769     }
2770 }
2771
2772 static struct ovsdb_idl_txn *
2773 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
2774 {
2775     struct ovsdb_idl_txn *txn;
2776
2777     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
2778                              json_hash(id, 0), &idl->outstanding_txns) {
2779         if (json_equal(id, txn->request_id)) {
2780             return txn;
2781         }
2782     }
2783     return NULL;
2784 }
2785
2786 static bool
2787 check_json_type(const struct json *json, enum json_type type, const char *name)
2788 {
2789     if (!json) {
2790         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
2791         return false;
2792     } else if (json->type != type) {
2793         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
2794                      name, json_type_to_string(json->type),
2795                      json_type_to_string(type));
2796         return false;
2797     } else {
2798         return true;
2799     }
2800 }
2801
2802 static bool
2803 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
2804                                 const struct json_array *results)
2805 {
2806     struct json *count, *rows, *row, *column;
2807     struct shash *mutate, *select;
2808
2809     if (txn->inc_index + 2 > results->n) {
2810         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2811                      "for increment (has %"PRIuSIZE", needs %u)",
2812                      results->n, txn->inc_index + 2);
2813         return false;
2814     }
2815
2816     /* We know that this is a JSON object because the loop in
2817      * ovsdb_idl_txn_process_reply() checked. */
2818     mutate = json_object(results->elems[txn->inc_index]);
2819     count = shash_find_data(mutate, "count");
2820     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
2821         return false;
2822     }
2823     if (count->u.integer != 1) {
2824         VLOG_WARN_RL(&syntax_rl,
2825                      "\"mutate\" reply \"count\" is %lld instead of 1",
2826                      count->u.integer);
2827         return false;
2828     }
2829
2830     select = json_object(results->elems[txn->inc_index + 1]);
2831     rows = shash_find_data(select, "rows");
2832     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
2833         return false;
2834     }
2835     if (rows->u.array.n != 1) {
2836         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
2837                      "instead of 1",
2838                      rows->u.array.n);
2839         return false;
2840     }
2841     row = rows->u.array.elems[0];
2842     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
2843         return false;
2844     }
2845     column = shash_find_data(json_object(row), txn->inc_column);
2846     if (!check_json_type(column, JSON_INTEGER,
2847                          "\"select\" reply inc column")) {
2848         return false;
2849     }
2850     txn->inc_new_value = column->u.integer;
2851     return true;
2852 }
2853
2854 static bool
2855 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
2856                                    const struct json_array *results)
2857 {
2858     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
2859     struct ovsdb_error *error;
2860     struct json *json_uuid;
2861     union ovsdb_atom uuid;
2862     struct shash *reply;
2863
2864     if (insert->op_index >= results->n) {
2865         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2866                      "for insert (has %"PRIuSIZE", needs %u)",
2867                      results->n, insert->op_index);
2868         return false;
2869     }
2870
2871     /* We know that this is a JSON object because the loop in
2872      * ovsdb_idl_txn_process_reply() checked. */
2873     reply = json_object(results->elems[insert->op_index]);
2874     json_uuid = shash_find_data(reply, "uuid");
2875     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
2876         return false;
2877     }
2878
2879     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
2880     if (error) {
2881         char *s = ovsdb_error_to_string(error);
2882         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
2883                      "UUID: %s", s);
2884         free(s);
2885         ovsdb_error_destroy(error);
2886         return false;
2887     }
2888
2889     insert->real = uuid.uuid;
2890
2891     return true;
2892 }
2893
2894 static bool
2895 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
2896                             const struct jsonrpc_msg *msg)
2897 {
2898     struct ovsdb_idl_txn *txn;
2899     enum ovsdb_idl_txn_status status;
2900
2901     txn = ovsdb_idl_txn_find(idl, msg->id);
2902     if (!txn) {
2903         return false;
2904     }
2905
2906     if (msg->type == JSONRPC_ERROR) {
2907         status = TXN_ERROR;
2908     } else if (msg->result->type != JSON_ARRAY) {
2909         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
2910         status = TXN_ERROR;
2911     } else {
2912         struct json_array *ops = &msg->result->u.array;
2913         int hard_errors = 0;
2914         int soft_errors = 0;
2915         int lock_errors = 0;
2916         size_t i;
2917
2918         for (i = 0; i < ops->n; i++) {
2919             struct json *op = ops->elems[i];
2920
2921             if (op->type == JSON_NULL) {
2922                 /* This isn't an error in itself but indicates that some prior
2923                  * operation failed, so make sure that we know about it. */
2924                 soft_errors++;
2925             } else if (op->type == JSON_OBJECT) {
2926                 struct json *error;
2927
2928                 error = shash_find_data(json_object(op), "error");
2929                 if (error) {
2930                     if (error->type == JSON_STRING) {
2931                         if (!strcmp(error->u.string, "timed out")) {
2932                             soft_errors++;
2933                         } else if (!strcmp(error->u.string, "not owner")) {
2934                             lock_errors++;
2935                         } else if (strcmp(error->u.string, "aborted")) {
2936                             hard_errors++;
2937                             ovsdb_idl_txn_set_error_json(txn, op);
2938                         }
2939                     } else {
2940                         hard_errors++;
2941                         ovsdb_idl_txn_set_error_json(txn, op);
2942                         VLOG_WARN_RL(&syntax_rl,
2943                                      "\"error\" in reply is not JSON string");
2944                     }
2945                 }
2946             } else {
2947                 hard_errors++;
2948                 ovsdb_idl_txn_set_error_json(txn, op);
2949                 VLOG_WARN_RL(&syntax_rl,
2950                              "operation reply is not JSON null or object");
2951             }
2952         }
2953
2954         if (!soft_errors && !hard_errors && !lock_errors) {
2955             struct ovsdb_idl_txn_insert *insert;
2956
2957             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
2958                 hard_errors++;
2959             }
2960
2961             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
2962                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
2963                     hard_errors++;
2964                 }
2965             }
2966         }
2967
2968         status = (hard_errors ? TXN_ERROR
2969                   : lock_errors ? TXN_NOT_LOCKED
2970                   : soft_errors ? TXN_TRY_AGAIN
2971                   : TXN_SUCCESS);
2972     }
2973
2974     ovsdb_idl_txn_complete(txn, status);
2975     return true;
2976 }
2977
2978 /* Returns the transaction currently active for 'row''s IDL.  A transaction
2979  * must currently be active. */
2980 struct ovsdb_idl_txn *
2981 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
2982 {
2983     struct ovsdb_idl_txn *txn = row->table->idl->txn;
2984     ovs_assert(txn != NULL);
2985     return txn;
2986 }
2987
2988 /* Returns the IDL on which 'txn' acts. */
2989 struct ovsdb_idl *
2990 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
2991 {
2992     return txn->idl;
2993 }
2994
2995 /* Blocks until 'idl' successfully connects to the remote database and
2996  * retrieves its contents. */
2997 void
2998 ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
2999 {
3000     while (1) {
3001         ovsdb_idl_run(idl);
3002         if (ovsdb_idl_has_ever_connected(idl)) {
3003             return;
3004         }
3005         ovsdb_idl_wait(idl);
3006         poll_block();
3007     }
3008 }
3009 \f
3010 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
3011  * the database server and to avoid modifying the database when the lock cannot
3012  * be acquired (that is, when another client has the same lock).
3013  *
3014  * If 'lock_name' is NULL, drops the locking requirement and releases the
3015  * lock. */
3016 void
3017 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
3018 {
3019     ovs_assert(!idl->txn);
3020     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
3021
3022     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
3023         /* Release previous lock. */
3024         ovsdb_idl_send_unlock_request(idl);
3025         free(idl->lock_name);
3026         idl->lock_name = NULL;
3027         idl->is_lock_contended = false;
3028     }
3029
3030     if (lock_name && !idl->lock_name) {
3031         /* Acquire new lock. */
3032         idl->lock_name = xstrdup(lock_name);
3033         ovsdb_idl_send_lock_request(idl);
3034     }
3035 }
3036
3037 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
3038  *
3039  * Locking and unlocking happens asynchronously from the database client's
3040  * point of view, so the information is only useful for optimization (e.g. if
3041  * the client doesn't have the lock then there's no point in trying to write to
3042  * the database). */
3043 bool
3044 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
3045 {
3046     return idl->has_lock;
3047 }
3048
3049 /* Returns true if 'idl' is configured to obtain a lock but the database server
3050  * has indicated that some other client already owns the requested lock. */
3051 bool
3052 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
3053 {
3054     return idl->is_lock_contended;
3055 }
3056
3057 static void
3058 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
3059 {
3060     if (new_has_lock && !idl->has_lock) {
3061         if (idl->state == IDL_S_MONITORING ||
3062             idl->state == IDL_S_MONITORING2) {
3063             idl->change_seqno++;
3064         } else {
3065             /* We're setting up a session, so don't signal that the database
3066              * changed.  Finalizing the session will increment change_seqno
3067              * anyhow. */
3068         }
3069         idl->is_lock_contended = false;
3070     }
3071     idl->has_lock = new_has_lock;
3072 }
3073
3074 static void
3075 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
3076                               struct json **idp)
3077 {
3078     ovsdb_idl_update_has_lock(idl, false);
3079
3080     json_destroy(idl->lock_request_id);
3081     idl->lock_request_id = NULL;
3082
3083     if (jsonrpc_session_is_connected(idl->session)) {
3084         struct json *params;
3085
3086         params = json_array_create_1(json_string_create(idl->lock_name));
3087         jsonrpc_session_send(idl->session,
3088                              jsonrpc_create_request(method, params, idp));
3089     }
3090 }
3091
3092 static void
3093 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
3094 {
3095     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
3096 }
3097
3098 static void
3099 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
3100 {
3101     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
3102 }
3103
3104 static void
3105 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
3106 {
3107     bool got_lock;
3108
3109     json_destroy(idl->lock_request_id);
3110     idl->lock_request_id = NULL;
3111
3112     if (result->type == JSON_OBJECT) {
3113         const struct json *locked;
3114
3115         locked = shash_find_data(json_object(result), "locked");
3116         got_lock = locked && locked->type == JSON_TRUE;
3117     } else {
3118         got_lock = false;
3119     }
3120
3121     ovsdb_idl_update_has_lock(idl, got_lock);
3122     if (!got_lock) {
3123         idl->is_lock_contended = true;
3124     }
3125 }
3126
3127 static void
3128 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
3129                             const struct json *params,
3130                             bool new_has_lock)
3131 {
3132     if (idl->lock_name
3133         && params->type == JSON_ARRAY
3134         && json_array(params)->n > 0
3135         && json_array(params)->elems[0]->type == JSON_STRING) {
3136         const char *lock_name = json_string(json_array(params)->elems[0]);
3137
3138         if (!strcmp(idl->lock_name, lock_name)) {
3139             ovsdb_idl_update_has_lock(idl, new_has_lock);
3140             if (!new_has_lock) {
3141                 idl->is_lock_contended = true;
3142             }
3143         }
3144     }
3145 }
3146
3147 void
3148 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
3149 {
3150     if (loop) {
3151         ovsdb_idl_destroy(loop->idl);
3152     }
3153 }
3154
3155 struct ovsdb_idl_txn *
3156 ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
3157 {
3158     ovsdb_idl_run(loop->idl);
3159     loop->open_txn = (loop->committing_txn
3160                       || ovsdb_idl_get_seqno(loop->idl) == loop->skip_seqno
3161                       ? NULL
3162                       : ovsdb_idl_txn_create(loop->idl));
3163     return loop->open_txn;
3164 }
3165
3166 void
3167 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
3168 {
3169     if (loop->open_txn) {
3170         loop->committing_txn = loop->open_txn;
3171         loop->open_txn = NULL;
3172
3173         loop->precommit_seqno = ovsdb_idl_get_seqno(loop->idl);
3174     }
3175
3176     struct ovsdb_idl_txn *txn = loop->committing_txn;
3177     if (txn) {
3178         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
3179         if (status != TXN_INCOMPLETE) {
3180             switch (status) {
3181             case TXN_TRY_AGAIN:
3182                 /* We want to re-evaluate the database when it's changed from
3183                  * the contents that it had when we started the commit.  (That
3184                  * might have already happened.) */
3185                 loop->skip_seqno = loop->precommit_seqno;
3186                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
3187                     poll_immediate_wake();
3188                 }
3189                 break;
3190
3191             case TXN_SUCCESS:
3192                 /* If the database has already changed since we started the
3193                  * commit, re-evaluate it immediately to avoid missing a change
3194                  * for a while. */
3195                 if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
3196                     poll_immediate_wake();
3197                 }
3198                 break;
3199
3200             case TXN_UNCHANGED:
3201             case TXN_ABORTED:
3202             case TXN_NOT_LOCKED:
3203             case TXN_ERROR:
3204                 break;
3205
3206             case TXN_UNCOMMITTED:
3207             case TXN_INCOMPLETE:
3208                 OVS_NOT_REACHED();
3209
3210             }
3211             ovsdb_idl_txn_destroy(txn);
3212             loop->committing_txn = NULL;
3213         }
3214     }
3215
3216     ovsdb_idl_wait(loop->idl);
3217 }