netdev-dpdk: fix mbuf leaks
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "coverage.h"
27 #include "dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "json.h"
30 #include "jsonrpc.h"
31 #include "ovsdb/ovsdb.h"
32 #include "ovsdb/table.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-idl-provider.h"
36 #include "ovsdb-parser.h"
37 #include "poll-loop.h"
38 #include "shash.h"
39 #include "sset.h"
40 #include "util.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
44
45 COVERAGE_DEFINE(txn_uncommitted);
46 COVERAGE_DEFINE(txn_unchanged);
47 COVERAGE_DEFINE(txn_incomplete);
48 COVERAGE_DEFINE(txn_aborted);
49 COVERAGE_DEFINE(txn_success);
50 COVERAGE_DEFINE(txn_try_again);
51 COVERAGE_DEFINE(txn_not_locked);
52 COVERAGE_DEFINE(txn_error);
53
54 /* An arc from one idl_row to another.  When row A contains a UUID that
55  * references row B, this is represented by an arc from A (the source) to B
56  * (the destination).
57  *
58  * Arcs from a row to itself are omitted, that is, src and dst are always
59  * different.
60  *
61  * Arcs are never duplicated, that is, even if there are multiple references
62  * from A to B, there is only a single arc from A to B.
63  *
64  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
65  * A.  Both an arc and its converse may both be present, if each row refers
66  * to the other circularly.
67  *
68  * The source and destination row may be in the same table or in different
69  * tables.
70  */
71 struct ovsdb_idl_arc {
72     struct ovs_list src_node;   /* In src->src_arcs list. */
73     struct ovs_list dst_node;   /* In dst->dst_arcs list. */
74     struct ovsdb_idl_row *src;  /* Source row. */
75     struct ovsdb_idl_row *dst;  /* Destination row. */
76 };
77
78 enum ovsdb_idl_state {
79     IDL_S_SCHEMA_REQUESTED,
80     IDL_S_MONITOR_REQUESTED,
81     IDL_S_MONITORING,
82     IDL_S_MONITOR2_REQUESTED,
83     IDL_S_MONITORING2
84 };
85
86 struct ovsdb_idl {
87     const struct ovsdb_idl_class *class;
88     struct jsonrpc_session *session;
89     struct shash table_by_name;
90     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
91     unsigned int change_seqno;
92     bool verify_write_only;
93
94     /* Session state. */
95     unsigned int state_seqno;
96     enum ovsdb_idl_state state;
97     struct json *request_id;
98     struct json *schema;
99
100     /* Database locking. */
101     char *lock_name;            /* Name of lock we need, NULL if none. */
102     bool has_lock;              /* Has db server told us we have the lock? */
103     bool is_lock_contended;     /* Has db server told us we can't get lock? */
104     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
105
106     /* Transaction support. */
107     struct ovsdb_idl_txn *txn;
108     struct hmap outstanding_txns;
109 };
110
111 struct ovsdb_idl_txn {
112     struct hmap_node hmap_node;
113     struct json *request_id;
114     struct ovsdb_idl *idl;
115     struct hmap txn_rows;
116     enum ovsdb_idl_txn_status status;
117     char *error;
118     bool dry_run;
119     struct ds comment;
120
121     /* Increments. */
122     const char *inc_table;
123     const char *inc_column;
124     struct uuid inc_row;
125     unsigned int inc_index;
126     int64_t inc_new_value;
127
128     /* Inserted rows. */
129     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
130 };
131
132 struct ovsdb_idl_txn_insert {
133     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
134     struct uuid dummy;          /* Dummy UUID used locally. */
135     int op_index;               /* Index into transaction's operation array. */
136     struct uuid real;           /* Real UUID used by database server. */
137 };
138
139 enum ovsdb_update_version {
140     OVSDB_UPDATE,               /* RFC 7047 "update" method. */
141     OVSDB_UPDATE2               /* "update2" Extension to RFC 7047.
142                                    See ovsdb-server(1) for more information. */
143 };
144
145 /* Name arrays indexed by 'enum ovsdb_update_version'. */
146 static const char *table_updates_names[] = {"table_updates", "table_updates2"};
147 static const char *table_update_names[] = {"table_update", "table_update2"};
148 static const char *row_update_names[] = {"row_update", "row_update2"};
149
150 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
151 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
152
153 static void ovsdb_idl_clear(struct ovsdb_idl *);
154 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
155 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
156 static void ovsdb_idl_send_monitor2_request(struct ovsdb_idl *);
157 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
158                                    enum ovsdb_update_version);
159 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
160                                                     const struct json *,
161                                                     enum ovsdb_update_version);
162 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
163                                      const struct uuid *,
164                                      const struct json *old,
165                                      const struct json *new);
166 static bool ovsdb_idl_process_update2(struct ovsdb_idl_table *,
167                                       const struct uuid *,
168                                       const char *operation,
169                                       const struct json *row);
170 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
171 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
172 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
173 static bool ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *,
174                                          const struct json *);
175
176 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
177 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
178     const struct ovsdb_idl_table_class *);
179 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
180                                                   const struct uuid *);
181 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
182 static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
183
184 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
185 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
186 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
187 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
188 static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
189
190 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
191 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
192                                         const struct jsonrpc_msg *msg);
193
194 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
195 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
196 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
197                                        const struct json *);
198 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
199                                         const struct json *params,
200                                         bool new_has_lock);
201 static struct ovsdb_idl_table *
202 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
203                            const struct ovsdb_idl_table_class *);
204 static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table);
205
206 /* Creates and returns a connection to database 'remote', which should be in a
207  * form acceptable to jsonrpc_session_open().  The connection will maintain an
208  * in-memory replica of the remote database whose schema is described by
209  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
210  * by ovsdb-idlc.)
211  *
212  * Passes 'retry' to jsonrpc_session_open().  See that function for
213  * documentation.
214  *
215  * If 'monitor_everything_by_default' is true, then everything in the remote
216  * database will be replicated by default.  ovsdb_idl_omit() and
217  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
218  * monitoring.
219  *
220  * If 'monitor_everything_by_default' is false, then no columns or tables will
221  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
222  * must be used to choose some columns or tables to replicate.
223  */
224 struct ovsdb_idl *
225 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
226                  bool monitor_everything_by_default, bool retry)
227 {
228     struct ovsdb_idl *idl;
229     uint8_t default_mode;
230     size_t i;
231
232     default_mode = (monitor_everything_by_default
233                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
234                     : 0);
235
236     idl = xzalloc(sizeof *idl);
237     idl->class = class;
238     idl->session = jsonrpc_session_open(remote, retry);
239     shash_init(&idl->table_by_name);
240     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
241     for (i = 0; i < class->n_tables; i++) {
242         const struct ovsdb_idl_table_class *tc = &class->tables[i];
243         struct ovsdb_idl_table *table = &idl->tables[i];
244         size_t j;
245
246         shash_add_assert(&idl->table_by_name, tc->name, table);
247         table->class = tc;
248         table->modes = xmalloc(tc->n_columns);
249         memset(table->modes, default_mode, tc->n_columns);
250         table->need_table = false;
251         shash_init(&table->columns);
252         for (j = 0; j < tc->n_columns; j++) {
253             const struct ovsdb_idl_column *column = &tc->columns[j];
254
255             shash_add_assert(&table->columns, column->name, column);
256         }
257         hmap_init(&table->rows);
258         list_init(&table->track_list);
259         table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
260             = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
261             = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
262         table->idl = idl;
263     }
264
265     idl->state_seqno = UINT_MAX;
266     idl->request_id = NULL;
267     idl->schema = NULL;
268
269     hmap_init(&idl->outstanding_txns);
270
271     return idl;
272 }
273
274 /* Destroys 'idl' and all of the data structures that it manages. */
275 void
276 ovsdb_idl_destroy(struct ovsdb_idl *idl)
277 {
278     if (idl) {
279         size_t i;
280
281         ovs_assert(!idl->txn);
282         ovsdb_idl_clear(idl);
283         jsonrpc_session_close(idl->session);
284
285         for (i = 0; i < idl->class->n_tables; i++) {
286             struct ovsdb_idl_table *table = &idl->tables[i];
287             shash_destroy(&table->columns);
288             hmap_destroy(&table->rows);
289             free(table->modes);
290         }
291         shash_destroy(&idl->table_by_name);
292         free(idl->tables);
293         json_destroy(idl->request_id);
294         free(idl->lock_name);
295         json_destroy(idl->lock_request_id);
296         json_destroy(idl->schema);
297         hmap_destroy(&idl->outstanding_txns);
298         free(idl);
299     }
300 }
301
302 static void
303 ovsdb_idl_clear(struct ovsdb_idl *idl)
304 {
305     bool changed = false;
306     size_t i;
307
308     for (i = 0; i < idl->class->n_tables; i++) {
309         struct ovsdb_idl_table *table = &idl->tables[i];
310         struct ovsdb_idl_row *row, *next_row;
311
312         if (hmap_is_empty(&table->rows)) {
313             continue;
314         }
315
316         changed = true;
317         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
318             struct ovsdb_idl_arc *arc, *next_arc;
319
320             if (!ovsdb_idl_row_is_orphan(row)) {
321                 ovsdb_idl_row_unparse(row);
322             }
323             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
324                 free(arc);
325             }
326             /* No need to do anything with dst_arcs: some node has those arcs
327              * as forward arcs and will destroy them itself. */
328
329             if (!list_is_empty(&row->track_node)) {
330                 list_remove(&row->track_node);
331             }
332
333             ovsdb_idl_row_destroy(row);
334         }
335     }
336
337     ovsdb_idl_track_clear(idl);
338
339     if (changed) {
340         idl->change_seqno++;
341     }
342 }
343
344 /* Processes a batch of messages from the database server on 'idl'.  This may
345  * cause the IDL's contents to change.  The client may check for that with
346  * ovsdb_idl_get_seqno(). */
347 void
348 ovsdb_idl_run(struct ovsdb_idl *idl)
349 {
350     int i;
351
352     ovs_assert(!idl->txn);
353     jsonrpc_session_run(idl->session);
354     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
355         struct jsonrpc_msg *msg;
356         unsigned int seqno;
357
358         seqno = jsonrpc_session_get_seqno(idl->session);
359         if (idl->state_seqno != seqno) {
360             idl->state_seqno = seqno;
361             json_destroy(idl->request_id);
362             idl->request_id = NULL;
363             ovsdb_idl_txn_abort_all(idl);
364
365             ovsdb_idl_send_schema_request(idl);
366             idl->state = IDL_S_SCHEMA_REQUESTED;
367             if (idl->lock_name) {
368                 ovsdb_idl_send_lock_request(idl);
369             }
370         }
371
372         msg = jsonrpc_session_recv(idl->session);
373         if (!msg) {
374             break;
375         }
376
377         if (msg->type == JSONRPC_NOTIFY
378             && !strcmp(msg->method, "update2")
379             && msg->params->type == JSON_ARRAY
380             && msg->params->u.array.n == 2
381             && msg->params->u.array.elems[0]->type == JSON_NULL) {
382             /* Database contents changed. */
383             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
384                                    OVSDB_UPDATE2);
385         } else if (msg->type == JSONRPC_REPLY
386                    && idl->request_id
387                    && json_equal(idl->request_id, msg->id)) {
388             json_destroy(idl->request_id);
389             idl->request_id = NULL;
390
391             switch (idl->state) {
392             case IDL_S_SCHEMA_REQUESTED:
393                 /* Reply to our "get_schema" request. */
394                 idl->schema = json_clone(msg->result);
395                 ovsdb_idl_send_monitor2_request(idl);
396                 idl->state = IDL_S_MONITOR2_REQUESTED;
397                 break;
398
399             case IDL_S_MONITOR_REQUESTED:
400             case IDL_S_MONITOR2_REQUESTED:
401                 /* Reply to our "monitor" or "monitor2" request. */
402                 idl->change_seqno++;
403                 ovsdb_idl_clear(idl);
404                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
405                     idl->state = IDL_S_MONITORING;
406                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
407                 } else { /* IDL_S_MONITOR2_REQUESTED. */
408                     idl->state = IDL_S_MONITORING2;
409                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
410                 }
411
412                 /* Schema is not useful after monitor request is accepted
413                  * by the server.  */
414                 json_destroy(idl->schema);
415                 idl->schema = NULL;
416                 break;
417
418             case IDL_S_MONITORING:
419             case IDL_S_MONITORING2:
420             default:
421                 OVS_NOT_REACHED();
422             }
423         } else if (msg->type == JSONRPC_NOTIFY
424             && !strcmp(msg->method, "update")
425             && msg->params->type == JSON_ARRAY
426             && msg->params->u.array.n == 2
427             && msg->params->u.array.elems[0]->type == JSON_NULL) {
428             /* Database contents changed. */
429             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
430                                    OVSDB_UPDATE);
431         } else if (msg->type == JSONRPC_REPLY
432                    && idl->lock_request_id
433                    && json_equal(idl->lock_request_id, msg->id)) {
434             /* Reply to our "lock" request. */
435             ovsdb_idl_parse_lock_reply(idl, msg->result);
436         } else if (msg->type == JSONRPC_NOTIFY
437                    && !strcmp(msg->method, "locked")) {
438             /* We got our lock. */
439             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
440         } else if (msg->type == JSONRPC_NOTIFY
441                    && !strcmp(msg->method, "stolen")) {
442             /* Someone else stole our lock. */
443             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
444         } else if (msg->type == JSONRPC_ERROR
445                    && idl->state == IDL_S_MONITOR2_REQUESTED
446                    && idl->request_id
447                    && json_equal(idl->request_id, msg->id)) {
448             if (msg->error && !strcmp(json_string(msg->error),
449                                       "unknown method")) {
450                 /* Fall back to using "monitor" method.  */
451                 json_destroy(idl->request_id);
452                 idl->request_id = NULL;
453                 ovsdb_idl_send_monitor_request(idl);
454                 idl->state = IDL_S_MONITOR_REQUESTED;
455             }
456         } else if (msg->type == JSONRPC_ERROR
457                    && idl->state == IDL_S_SCHEMA_REQUESTED
458                    && idl->request_id
459                    && json_equal(idl->request_id, msg->id)) {
460                 json_destroy(idl->request_id);
461                 idl->request_id = NULL;
462                 VLOG_ERR("%s: requested schema not found",
463                          jsonrpc_session_get_name(idl->session));
464         } else if ((msg->type == JSONRPC_ERROR
465                     || msg->type == JSONRPC_REPLY)
466                    && ovsdb_idl_txn_process_reply(idl, msg)) {
467             /* ovsdb_idl_txn_process_reply() did everything needful. */
468         } else {
469             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
470              * a transaction before we receive the reply, so keep the log level
471              * low. */
472             VLOG_DBG("%s: received unexpected %s message",
473                      jsonrpc_session_get_name(idl->session),
474                      jsonrpc_msg_type_to_string(msg->type));
475         }
476         jsonrpc_msg_destroy(msg);
477     }
478     ovsdb_idl_row_destroy_postprocess(idl);
479 }
480
481 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
482  * do or when activity occurs on a transaction on 'idl'. */
483 void
484 ovsdb_idl_wait(struct ovsdb_idl *idl)
485 {
486     jsonrpc_session_wait(idl->session);
487     jsonrpc_session_recv_wait(idl->session);
488 }
489
490 /* Returns a "sequence number" that represents the state of 'idl'.  When
491  * ovsdb_idl_run() changes the database, the sequence number changes.  The
492  * initial fetch of the entire contents of the remote database is considered to
493  * be one kind of change.  Successfully acquiring a lock, if one has been
494  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
495  *
496  * As long as the sequence number does not change, the client may continue to
497  * use any data structures it obtains from 'idl'.  But when it changes, the
498  * client must not access any of these data structures again, because they
499  * could have freed or reused for other purposes.
500  *
501  * The sequence number can occasionally change even if the database does not.
502  * This happens if the connection to the database drops and reconnects, which
503  * causes the database contents to be reloaded even if they didn't change.  (It
504  * could also happen if the database server sends out a "change" that reflects
505  * what the IDL already thought was in the database.  The database server is
506  * not supposed to do that, but bugs could in theory cause it to do so.) */
507 unsigned int
508 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
509 {
510     return idl->change_seqno;
511 }
512
513 /* Returns true if 'idl' successfully connected to the remote database and
514  * retrieved its contents (even if the connection subsequently dropped and is
515  * in the process of reconnecting).  If so, then 'idl' contains an atomic
516  * snapshot of the database's contents (but it might be arbitrarily old if the
517  * connection dropped).
518  *
519  * Returns false if 'idl' has never connected or retrieved the database's
520  * contents.  If so, 'idl' is empty. */
521 bool
522 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
523 {
524     return ovsdb_idl_get_seqno(idl) != 0;
525 }
526
527 /* Reconfigures 'idl' so that it would reconnect to the database, if
528  * connection was dropped. */
529 void
530 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
531 {
532     jsonrpc_session_enable_reconnect(idl->session);
533 }
534
535 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
536  * meantime, the contents of 'idl' will not change. */
537 void
538 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
539 {
540     jsonrpc_session_force_reconnect(idl->session);
541 }
542
543 /* Some IDL users should only write to write-only columns.  Furthermore,
544  * writing to a column which is not write-only can cause serious performance
545  * degradations for these users.  This function causes 'idl' to reject writes
546  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
547 void
548 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
549 {
550     idl->verify_write_only = true;
551 }
552
553 /* Returns true if 'idl' is currently connected or trying to connect. */
554 bool
555 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
556 {
557     return jsonrpc_session_is_alive(idl->session);
558 }
559
560 /* Returns the last error reported on a connection by 'idl'.  The return value
561  * is 0 only if no connection made by 'idl' has ever encountered an error.  See
562  * jsonrpc_get_status() for return value interpretation. */
563 int
564 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
565 {
566     return jsonrpc_session_get_last_error(idl->session);
567 }
568 \f
569 static unsigned char *
570 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
571                    const struct ovsdb_idl_column *column)
572 {
573     size_t i;
574
575     ovs_assert(!idl->change_seqno);
576
577     for (i = 0; i < idl->class->n_tables; i++) {
578         const struct ovsdb_idl_table *table = &idl->tables[i];
579         const struct ovsdb_idl_table_class *tc = table->class;
580
581         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
582             return &table->modes[column - tc->columns];
583         }
584     }
585
586     OVS_NOT_REACHED();
587 }
588
589 static void
590 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
591 {
592     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
593         struct ovsdb_idl_table *table;
594
595         table = shash_find_data(&idl->table_by_name,
596                                 base->u.uuid.refTableName);
597         if (table) {
598             table->need_table = true;
599         } else {
600             VLOG_WARN("%s IDL class missing referenced table %s",
601                       idl->class->database, base->u.uuid.refTableName);
602         }
603     }
604 }
605
606 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
607  * ensures that any tables referenced by 'column' will be replicated, even if
608  * no columns in that table are selected for replication (see
609  * ovsdb_idl_add_table() for more information).
610  *
611  * This function is only useful if 'monitor_everything_by_default' was false in
612  * the call to ovsdb_idl_create().  This function should be called between
613  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
614  */
615 void
616 ovsdb_idl_add_column(struct ovsdb_idl *idl,
617                      const struct ovsdb_idl_column *column)
618 {
619     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
620     add_ref_table(idl, &column->type.key);
621     add_ref_table(idl, &column->type.value);
622 }
623
624 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
625  * no columns are selected for replication. Just the necessary data for table
626  * references will be replicated (the UUID of the rows, for instance), any
627  * columns not selected for replication will remain unreplicated.
628  * This can be useful because it allows 'idl' to keep track of what rows in the
629  * table actually exist, which in turn allows columns that reference the table
630  * to have accurate contents. (The IDL presents the database with references to
631  * rows that do not exist removed.)
632  *
633  * This function is only useful if 'monitor_everything_by_default' was false in
634  * the call to ovsdb_idl_create().  This function should be called between
635  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
636  */
637 void
638 ovsdb_idl_add_table(struct ovsdb_idl *idl,
639                     const struct ovsdb_idl_table_class *tc)
640 {
641     size_t i;
642
643     for (i = 0; i < idl->class->n_tables; i++) {
644         struct ovsdb_idl_table *table = &idl->tables[i];
645
646         if (table->class == tc) {
647             table->need_table = true;
648             return;
649         }
650     }
651
652     OVS_NOT_REACHED();
653 }
654
655 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
656  *
657  * This function should be called between ovsdb_idl_create() and the first call
658  * to ovsdb_idl_run().
659  */
660 void
661 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
662                      const struct ovsdb_idl_column *column)
663 {
664     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
665 }
666
667 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
668  * OVSDB_IDL_MONITOR for details.
669  *
670  * This function should be called between ovsdb_idl_create() and the first call
671  * to ovsdb_idl_run().
672  */
673 void
674 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
675 {
676     *ovsdb_idl_get_mode(idl, column) = 0;
677 }
678
679 /* Returns the most recent IDL change sequence number that caused a
680  * insert, modify or delete update to the table with class 'table_class'.
681  */
682 unsigned int
683 ovsdb_idl_table_get_seqno(const struct ovsdb_idl *idl,
684                           const struct ovsdb_idl_table_class *table_class)
685 {
686     struct ovsdb_idl_table *table
687         = ovsdb_idl_table_from_class(idl, table_class);
688     unsigned int max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_INSERT];
689
690     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]) {
691         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY];
692     }
693     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_DELETE]) {
694         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_DELETE];
695     }
696     return max_seqno;
697 }
698
699 /* For each row that contains tracked columns, IDL stores the most
700  * recent IDL change sequence numbers associateed with insert, modify
701  * and delete updates to the table.
702  */
703 unsigned int
704 ovsdb_idl_row_get_seqno(const struct ovsdb_idl_row *row,
705                         enum ovsdb_idl_change change)
706 {
707     return row->change_seqno[change];
708 }
709
710 /* Turns on OVSDB_IDL_TRACK for 'column' in 'idl', ensuring that
711  * all rows whose 'column' is modified are traced. Similarly, insert
712  * or delete of rows having 'column' are tracked. Clients are able
713  * to retrive the tracked rows with the ovsdb_idl_track_get_*()
714  * functions.
715  *
716  * This function should be called between ovsdb_idl_create() and
717  * the first call to ovsdb_idl_run(). The column to be tracked
718  * should have OVSDB_IDL_ALERT turned on.
719  */
720 void
721 ovsdb_idl_track_add_column(struct ovsdb_idl *idl,
722                            const struct ovsdb_idl_column *column)
723 {
724     if (!(*ovsdb_idl_get_mode(idl, column) & OVSDB_IDL_ALERT)) {
725         ovsdb_idl_add_column(idl, column);
726     }
727     *ovsdb_idl_get_mode(idl, column) |= OVSDB_IDL_TRACK;
728 }
729
730 void
731 ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
732 {
733     size_t i, j;
734
735     for (i = 0; i < idl->class->n_tables; i++) {
736         const struct ovsdb_idl_table_class *tc = &idl->class->tables[i];
737
738         for (j = 0; j < tc->n_columns; j++) {
739             const struct ovsdb_idl_column *column = &tc->columns[j];
740             ovsdb_idl_track_add_column(idl, column);
741         }
742     }
743 }
744
745 /* Returns true if 'table' has any tracked column. */
746 static bool
747 ovsdb_idl_track_is_set(struct ovsdb_idl_table *table)
748 {
749     size_t i;
750
751     for (i = 0; i < table->class->n_columns; i++) {
752         if (table->modes[i] & OVSDB_IDL_TRACK) {
753             return true;
754         }
755     }
756    return false;
757 }
758
759 /* Returns the first tracked row in table with class 'table_class'
760  * for the specified 'idl'. Returns NULL if there are no tracked rows */
761 const struct ovsdb_idl_row *
762 ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
763                           const struct ovsdb_idl_table_class *table_class)
764 {
765     struct ovsdb_idl_table *table
766         = ovsdb_idl_table_from_class(idl, table_class);
767
768     if (!list_is_empty(&table->track_list)) {
769         return CONTAINER_OF(list_front(&table->track_list), struct ovsdb_idl_row, track_node);
770     }
771     return NULL;
772 }
773
774 /* Returns the next tracked row in table after the specified 'row'
775  * (in no particular order). Returns NULL if there are no tracked rows */
776 const struct ovsdb_idl_row *
777 ovsdb_idl_track_get_next(const struct ovsdb_idl_row *row)
778 {
779     if (row->track_node.next != &row->table->track_list) {
780         return CONTAINER_OF(row->track_node.next, struct ovsdb_idl_row, track_node);
781     }
782
783     return NULL;
784 }
785
786 /* Returns true if a tracked 'column' in 'row' was updated by IDL, false
787  * otherwise. The tracking data is cleared by ovsdb_idl_track_clear()
788  *
789  * Function returns false if 'column' is not tracked (see
790  * ovsdb_idl_track_add_column()).
791  */
792 bool
793 ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
794                            const struct ovsdb_idl_column *column)
795 {
796     const struct ovsdb_idl_table_class *class;
797     size_t column_idx;
798
799     class = row->table->class;
800     column_idx = column - class->columns;
801
802     if (row->updated && bitmap_is_set(row->updated, column_idx)) {
803         return true;
804     } else {
805         return false;
806     }
807 }
808
809 /* Flushes the tracked rows. Client calls this function after calling
810  * ovsdb_idl_run() and read all tracked rows with the ovsdb_idl_track_get_*()
811  * functions. This is usually done at the end of the client's processing
812  * loop when it is ready to do ovsdb_idl_run() again.
813  */
814 void
815 ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
816 {
817     size_t i;
818
819     for (i = 0; i < idl->class->n_tables; i++) {
820         struct ovsdb_idl_table *table = &idl->tables[i];
821
822         if (!list_is_empty(&table->track_list)) {
823             struct ovsdb_idl_row *row, *next;
824
825             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
826                 if (row->updated) {
827                     free(row->updated);
828                     row->updated = NULL;
829                 }
830                 list_remove(&row->track_node);
831                 list_init(&row->track_node);
832                 if (ovsdb_idl_row_is_orphan(row)) {
833                     ovsdb_idl_row_clear_old(row);
834                     free(row);
835                 }
836             }
837         }
838     }
839 }
840
841 \f
842 static void
843 ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
844 {
845     struct jsonrpc_msg *msg;
846
847     json_destroy(idl->request_id);
848     msg = jsonrpc_create_request(
849         "get_schema",
850         json_array_create_1(json_string_create(idl->class->database)),
851         &idl->request_id);
852     jsonrpc_session_send(idl->session, msg);
853 }
854
855 static void
856 log_error(struct ovsdb_error *error)
857 {
858     char *s = ovsdb_error_to_string(error);
859     VLOG_WARN("error parsing database schema: %s", s);
860     free(s);
861     ovsdb_error_destroy(error);
862 }
863
864 /* Frees 'schema', which is in the format returned by parse_schema(). */
865 static void
866 free_schema(struct shash *schema)
867 {
868     if (schema) {
869         struct shash_node *node, *next;
870
871         SHASH_FOR_EACH_SAFE (node, next, schema) {
872             struct sset *sset = node->data;
873             sset_destroy(sset);
874             free(sset);
875             shash_delete(schema, node);
876         }
877         shash_destroy(schema);
878         free(schema);
879     }
880 }
881
882 /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
883  * 7047, to obtain the names of its rows and columns.  If successful, returns
884  * an shash whose keys are table names and whose values are ssets, where each
885  * sset contains the names of its table's columns.  On failure (due to a parse
886  * error), returns NULL.
887  *
888  * It would also be possible to use the general-purpose OVSDB schema parser in
889  * ovsdb-server, but that's overkill, possibly too strict for the current use
890  * case, and would require restructuring ovsdb-server to separate the schema
891  * code from the rest. */
892 static struct shash *
893 parse_schema(const struct json *schema_json)
894 {
895     struct ovsdb_parser parser;
896     const struct json *tables_json;
897     struct ovsdb_error *error;
898     struct shash_node *node;
899     struct shash *schema;
900
901     ovsdb_parser_init(&parser, schema_json, "database schema");
902     tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
903     error = ovsdb_parser_destroy(&parser);
904     if (error) {
905         log_error(error);
906         return NULL;
907     }
908
909     schema = xmalloc(sizeof *schema);
910     shash_init(schema);
911     SHASH_FOR_EACH (node, json_object(tables_json)) {
912         const char *table_name = node->name;
913         const struct json *json = node->data;
914         const struct json *columns_json;
915
916         ovsdb_parser_init(&parser, json, "table schema for table %s",
917                           table_name);
918         columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
919         error = ovsdb_parser_destroy(&parser);
920         if (error) {
921             log_error(error);
922             free_schema(schema);
923             return NULL;
924         }
925
926         struct sset *columns = xmalloc(sizeof *columns);
927         sset_init(columns);
928
929         struct shash_node *node2;
930         SHASH_FOR_EACH (node2, json_object(columns_json)) {
931             const char *column_name = node2->name;
932             sset_add(columns, column_name);
933         }
934         shash_add(schema, table_name, columns);
935     }
936     return schema;
937 }
938
939 static void
940 ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
941                                  const char *method)
942 {
943     struct shash *schema;
944     struct json *monitor_requests;
945     struct jsonrpc_msg *msg;
946     size_t i;
947
948     schema = parse_schema(idl->schema);
949     monitor_requests = json_object_create();
950     for (i = 0; i < idl->class->n_tables; i++) {
951         const struct ovsdb_idl_table *table = &idl->tables[i];
952         const struct ovsdb_idl_table_class *tc = table->class;
953         struct json *monitor_request, *columns;
954         const struct sset *table_schema;
955         size_t j;
956
957         table_schema = (schema
958                         ? shash_find_data(schema, table->class->name)
959                         : NULL);
960
961         columns = table->need_table ? json_array_create_empty() : NULL;
962         for (j = 0; j < tc->n_columns; j++) {
963             const struct ovsdb_idl_column *column = &tc->columns[j];
964             if (table->modes[j] & OVSDB_IDL_MONITOR) {
965                 if (table_schema
966                     && !sset_contains(table_schema, column->name)) {
967                     VLOG_WARN("%s table in %s database lacks %s column "
968                               "(database needs upgrade?)",
969                               table->class->name, idl->class->database,
970                               column->name);
971                     continue;
972                 }
973                 if (!columns) {
974                     columns = json_array_create_empty();
975                 }
976                 json_array_add(columns, json_string_create(column->name));
977             }
978         }
979
980         if (columns) {
981             if (schema && !table_schema) {
982                 VLOG_WARN("%s database lacks %s table "
983                           "(database needs upgrade?)",
984                           idl->class->database, table->class->name);
985                 json_destroy(columns);
986                 continue;
987             }
988
989             monitor_request = json_object_create();
990             json_object_put(monitor_request, "columns", columns);
991             json_object_put(monitor_requests, tc->name, monitor_request);
992         }
993     }
994     free_schema(schema);
995
996     json_destroy(idl->request_id);
997     msg = jsonrpc_create_request(
998         method,
999         json_array_create_3(json_string_create(idl->class->database),
1000                             json_null_create(), monitor_requests),
1001         &idl->request_id);
1002     jsonrpc_session_send(idl->session, msg);
1003 }
1004
1005 static void
1006 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
1007 {
1008     ovsdb_idl_send_monitor_request__(idl, "monitor");
1009 }
1010
1011 static void
1012 log_parse_update_error(struct ovsdb_error *error)
1013 {
1014         if (!VLOG_DROP_WARN(&syntax_rl)) {
1015             char *s = ovsdb_error_to_string(error);
1016             VLOG_WARN_RL(&syntax_rl, "%s", s);
1017             free(s);
1018         }
1019         ovsdb_error_destroy(error);
1020 }
1021
1022 static void
1023 ovsdb_idl_send_monitor2_request(struct ovsdb_idl *idl)
1024 {
1025     ovsdb_idl_send_monitor_request__(idl, "monitor2");
1026 }
1027
1028 static void
1029 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates,
1030                        enum ovsdb_update_version version)
1031 {
1032     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates,
1033                                                          version);
1034     if (error) {
1035         log_parse_update_error(error);
1036     }
1037 }
1038
1039 static struct ovsdb_error *
1040 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
1041                          const struct json *table_updates,
1042                          enum ovsdb_update_version version)
1043 {
1044     const struct shash_node *tables_node;
1045     const char *table_updates_name = table_updates_names[version];
1046     const char *table_update_name = table_update_names[version];
1047     const char *row_update_name = row_update_names[version];
1048
1049     if (table_updates->type != JSON_OBJECT) {
1050         return ovsdb_syntax_error(table_updates, NULL,
1051                                   "<%s> is not an object",
1052                                   table_updates_name);
1053     }
1054
1055     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
1056         const struct json *table_update = tables_node->data;
1057         const struct shash_node *table_node;
1058         struct ovsdb_idl_table *table;
1059
1060         table = shash_find_data(&idl->table_by_name, tables_node->name);
1061         if (!table) {
1062             return ovsdb_syntax_error(
1063                 table_updates, NULL,
1064                 "<%s> includes unknown table \"%s\"",
1065                 table_updates_name,
1066                 tables_node->name);
1067         }
1068
1069         if (table_update->type != JSON_OBJECT) {
1070             return ovsdb_syntax_error(table_update, NULL,
1071                                       "<%s> for table \"%s\" is "
1072                                       "not an object",
1073                                       table_update_name,
1074                                       table->class->name);
1075         }
1076         SHASH_FOR_EACH (table_node, json_object(table_update)) {
1077             const struct json *row_update = table_node->data;
1078             const struct json *old_json, *new_json;
1079             struct uuid uuid;
1080
1081             if (!uuid_from_string(&uuid, table_node->name)) {
1082                 return ovsdb_syntax_error(table_update, NULL,
1083                                           "<%s> for table \"%s\" "
1084                                           "contains bad UUID "
1085                                           "\"%s\" as member name",
1086                                           table_update_name,
1087                                           table->class->name,
1088                                           table_node->name);
1089             }
1090             if (row_update->type != JSON_OBJECT) {
1091                 return ovsdb_syntax_error(row_update, NULL,
1092                                           "<%s> for table \"%s\" "
1093                                           "contains <%s> for %s that "
1094                                           "is not an object",
1095                                           table_update_name,
1096                                           table->class->name,
1097                                           row_update_name,
1098                                           table_node->name);
1099             }
1100
1101             switch(version) {
1102             case OVSDB_UPDATE:
1103                 old_json = shash_find_data(json_object(row_update), "old");
1104                 new_json = shash_find_data(json_object(row_update), "new");
1105                 if (old_json && old_json->type != JSON_OBJECT) {
1106                     return ovsdb_syntax_error(old_json, NULL,
1107                                               "\"old\" <row> is not object");
1108                 } else if (new_json && new_json->type != JSON_OBJECT) {
1109                     return ovsdb_syntax_error(new_json, NULL,
1110                                               "\"new\" <row> is not object");
1111                 } else if ((old_json != NULL) + (new_json != NULL)
1112                            != shash_count(json_object(row_update))) {
1113                     return ovsdb_syntax_error(row_update, NULL,
1114                                               "<row-update> contains "
1115                                               "unexpected member");
1116                 } else if (!old_json && !new_json) {
1117                     return ovsdb_syntax_error(row_update, NULL,
1118                                               "<row-update> missing \"old\" "
1119                                               "and \"new\" members");
1120                 }
1121
1122                 if (ovsdb_idl_process_update(table, &uuid, old_json,
1123                                              new_json)) {
1124                     idl->change_seqno++;
1125                 }
1126                 break;
1127
1128             case OVSDB_UPDATE2: {
1129                 const char *ops[] = {"modify", "insert", "delete", "initial"};
1130                 const char *operation;
1131                 const struct json *row;
1132                 int i;
1133
1134                 for (i = 0; i < ARRAY_SIZE(ops); i++) {
1135                     operation = ops[i];
1136                     row = shash_find_data(json_object(row_update), operation);
1137
1138                     if (row)  {
1139                         if (ovsdb_idl_process_update2(table, &uuid, operation,
1140                                                       row)) {
1141                             idl->change_seqno++;
1142                         }
1143                         break;
1144                     }
1145                 }
1146
1147                 /* row_update2 should contain one of the objects */
1148                 if (i == ARRAY_SIZE(ops)) {
1149                     return ovsdb_syntax_error(row_update, NULL,
1150                                               "<row_update2> includes unknown "
1151                                               "object");
1152                 }
1153                 break;
1154             }
1155
1156             default:
1157                 OVS_NOT_REACHED();
1158             }
1159         }
1160     }
1161
1162     return NULL;
1163 }
1164
1165 static struct ovsdb_idl_row *
1166 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
1167 {
1168     struct ovsdb_idl_row *row;
1169
1170     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
1171         if (uuid_equals(&row->uuid, uuid)) {
1172             return row;
1173         }
1174     }
1175     return NULL;
1176 }
1177
1178 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1179  * otherwise. */
1180 static bool
1181 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
1182                          const struct uuid *uuid, const struct json *old,
1183                          const struct json *new)
1184 {
1185     struct ovsdb_idl_row *row;
1186
1187     row = ovsdb_idl_get_row(table, uuid);
1188     if (!new) {
1189         /* Delete row. */
1190         if (row && !ovsdb_idl_row_is_orphan(row)) {
1191             /* XXX perhaps we should check the 'old' values? */
1192             ovsdb_idl_delete_row(row);
1193         } else {
1194             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1195                          "from table %s",
1196                          UUID_ARGS(uuid), table->class->name);
1197             return false;
1198         }
1199     } else if (!old) {
1200         /* Insert row. */
1201         if (!row) {
1202             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1203         } else if (ovsdb_idl_row_is_orphan(row)) {
1204             ovsdb_idl_insert_row(row, new);
1205         } else {
1206             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1207                          "table %s", UUID_ARGS(uuid), table->class->name);
1208             return ovsdb_idl_modify_row(row, new);
1209         }
1210     } else {
1211         /* Modify row. */
1212         if (row) {
1213             /* XXX perhaps we should check the 'old' values? */
1214             if (!ovsdb_idl_row_is_orphan(row)) {
1215                 return ovsdb_idl_modify_row(row, new);
1216             } else {
1217                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1218                              "referenced row "UUID_FMT" in table %s",
1219                              UUID_ARGS(uuid), table->class->name);
1220                 ovsdb_idl_insert_row(row, new);
1221             }
1222         } else {
1223             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1224                          "in table %s", UUID_ARGS(uuid), table->class->name);
1225             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1226         }
1227     }
1228
1229     return true;
1230 }
1231
1232 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1233  * otherwise. */
1234 static bool
1235 ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
1236                           const struct uuid *uuid,
1237                           const char *operation,
1238                           const struct json *json_row)
1239 {
1240     struct ovsdb_idl_row *row;
1241
1242     row = ovsdb_idl_get_row(table, uuid);
1243     if (!strcmp(operation, "delete")) {
1244         /* Delete row. */
1245         if (row && !ovsdb_idl_row_is_orphan(row)) {
1246             ovsdb_idl_delete_row(row);
1247         } else {
1248             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1249                          "from table %s",
1250                          UUID_ARGS(uuid), table->class->name);
1251             return false;
1252         }
1253     } else if (!strcmp(operation, "insert") || !strcmp(operation, "initial")) {
1254         /* Insert row. */
1255         if (!row) {
1256             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), json_row);
1257         } else if (ovsdb_idl_row_is_orphan(row)) {
1258             ovsdb_idl_insert_row(row, json_row);
1259         } else {
1260             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1261                          "table %s", UUID_ARGS(uuid), table->class->name);
1262             ovsdb_idl_delete_row(row);
1263             ovsdb_idl_insert_row(row, json_row);
1264         }
1265     } else if (!strcmp(operation, "modify")) {
1266         /* Modify row. */
1267         if (row) {
1268             if (!ovsdb_idl_row_is_orphan(row)) {
1269                 return ovsdb_idl_modify_row_by_diff(row, json_row);
1270             } else {
1271                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1272                              "referenced row "UUID_FMT" in table %s",
1273                              UUID_ARGS(uuid), table->class->name);
1274                 return false;
1275             }
1276         } else {
1277             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1278                          "in table %s", UUID_ARGS(uuid), table->class->name);
1279             return false;
1280         }
1281     } else {
1282             VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
1283                          "table %s", operation, table->class->name);
1284             return false;
1285     }
1286
1287     return true;
1288 }
1289
1290 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1291  * otherwise.
1292  *
1293  * Change 'row' either with the content of 'row_json' or by apply 'diff'.
1294  * Caller needs to provide either valid 'row_json' or 'diff', but not
1295  * both.  */
1296 static bool
1297 ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
1298                        const struct json *diff_json,
1299                        enum ovsdb_idl_change change)
1300 {
1301     struct ovsdb_idl_table *table = row->table;
1302     const struct ovsdb_idl_table_class *class = table->class;
1303     struct shash_node *node;
1304     bool changed = false;
1305     bool apply_diff = diff_json != NULL;
1306     const struct json *json = apply_diff ? diff_json : row_json;
1307
1308     SHASH_FOR_EACH (node, json_object(json)) {
1309         const char *column_name = node->name;
1310         const struct ovsdb_idl_column *column;
1311         struct ovsdb_datum datum;
1312         struct ovsdb_error *error;
1313         unsigned int column_idx;
1314         struct ovsdb_datum *old;
1315
1316         column = shash_find_data(&table->columns, column_name);
1317         if (!column) {
1318             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
1319                          column_name, UUID_ARGS(&row->uuid));
1320             continue;
1321         }
1322
1323         column_idx = column - table->class->columns;
1324         old = &row->old[column_idx];
1325
1326         error = NULL;
1327         if (apply_diff) {
1328             struct ovsdb_datum diff;
1329
1330             ovs_assert(!row_json);
1331             error = ovsdb_transient_datum_from_json(&diff, &column->type,
1332                                                     node->data);
1333             if (!error) {
1334                 error = ovsdb_datum_apply_diff(&datum, old, &diff,
1335                                                &column->type);
1336                 ovsdb_datum_destroy(&diff, &column->type);
1337             }
1338         } else {
1339             ovs_assert(!diff_json);
1340             error = ovsdb_datum_from_json(&datum, &column->type, node->data,
1341                                           NULL);
1342         }
1343
1344         if (!error) {
1345             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
1346                 ovsdb_datum_swap(old, &datum);
1347                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
1348                     changed = true;
1349                     row->change_seqno[change]
1350                         = row->table->change_seqno[change]
1351                         = row->table->idl->change_seqno + 1;
1352                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
1353                         if (list_is_empty(&row->track_node)) {
1354                             list_push_front(&row->table->track_list,
1355                                             &row->track_node);
1356                         }
1357                         if (!row->updated) {
1358                             row->updated = bitmap_allocate(class->n_columns);
1359                         }
1360                         bitmap_set1(row->updated, column_idx);
1361                     }
1362                 }
1363             } else {
1364                 /* Didn't really change but the OVSDB monitor protocol always
1365                  * includes every value in a row. */
1366             }
1367
1368             ovsdb_datum_destroy(&datum, &column->type);
1369         } else {
1370             char *s = ovsdb_error_to_string(error);
1371             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
1372                          " in table %s: %s", column_name,
1373                          UUID_ARGS(&row->uuid), table->class->name, s);
1374             free(s);
1375             ovsdb_error_destroy(error);
1376         }
1377     }
1378     return changed;
1379 }
1380
1381 static bool
1382 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json,
1383                      enum ovsdb_idl_change change)
1384 {
1385     return ovsdb_idl_row_change__(row, row_json, NULL, change);
1386 }
1387
1388 static bool
1389 ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
1390                          const struct json *diff_json,
1391                          enum ovsdb_idl_change change)
1392 {
1393     return ovsdb_idl_row_change__(row, NULL, diff_json, change);
1394 }
1395
1396 /* When a row A refers to row B through a column with a "refTable" constraint,
1397  * but row B does not exist, row B is called an "orphan row".  Orphan rows
1398  * should not persist, because the database enforces referential integrity, but
1399  * they can appear transiently as changes from the database are received (the
1400  * database doesn't try to topologically sort them and circular references mean
1401  * it isn't always possible anyhow).
1402  *
1403  * This function returns true if 'row' is an orphan row, otherwise false.
1404  */
1405 static bool
1406 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
1407 {
1408     return !row->old && !row->new;
1409 }
1410
1411 /* Returns true if 'row' is conceptually part of the database as modified by
1412  * the current transaction (if any), false otherwise.
1413  *
1414  * This function will return true if 'row' is not an orphan (see the comment on
1415  * ovsdb_idl_row_is_orphan()) and:
1416  *
1417  *   - 'row' exists in the database and has not been deleted within the
1418  *     current transaction (if any).
1419  *
1420  *   - 'row' was inserted within the current transaction and has not been
1421  *     deleted.  (In the latter case you should not have passed 'row' in at
1422  *     all, because ovsdb_idl_txn_delete() freed it.)
1423  *
1424  * This function will return false if 'row' is an orphan or if 'row' was
1425  * deleted within the current transaction.
1426  */
1427 static bool
1428 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
1429 {
1430     return row->new != NULL;
1431 }
1432
1433 static void
1434 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
1435 {
1436     const struct ovsdb_idl_table_class *class = row->table->class;
1437     size_t i;
1438
1439     for (i = 0; i < class->n_columns; i++) {
1440         const struct ovsdb_idl_column *c = &class->columns[i];
1441         (c->parse)(row, &row->old[i]);
1442     }
1443 }
1444
1445 static void
1446 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
1447 {
1448     const struct ovsdb_idl_table_class *class = row->table->class;
1449     size_t i;
1450
1451     for (i = 0; i < class->n_columns; i++) {
1452         const struct ovsdb_idl_column *c = &class->columns[i];
1453         (c->unparse)(row);
1454     }
1455 }
1456
1457 static void
1458 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
1459 {
1460     ovs_assert(row->old == row->new);
1461     if (!ovsdb_idl_row_is_orphan(row)) {
1462         const struct ovsdb_idl_table_class *class = row->table->class;
1463         size_t i;
1464
1465         for (i = 0; i < class->n_columns; i++) {
1466             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
1467         }
1468         free(row->old);
1469         row->old = row->new = NULL;
1470     }
1471 }
1472
1473 static void
1474 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
1475 {
1476     if (row->old != row->new) {
1477         if (row->new) {
1478             const struct ovsdb_idl_table_class *class = row->table->class;
1479             size_t i;
1480
1481             if (row->written) {
1482                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
1483                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
1484                 }
1485             }
1486             free(row->new);
1487             free(row->written);
1488             row->written = NULL;
1489         }
1490         row->new = row->old;
1491     }
1492 }
1493
1494 static void
1495 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
1496 {
1497     struct ovsdb_idl_arc *arc, *next;
1498
1499     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
1500      * that this causes to be unreferenced, if tracking is not enabled.
1501      * If tracking is enabled, orphaned nodes are removed from hmap but not
1502      * freed.
1503      */
1504     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
1505         list_remove(&arc->dst_node);
1506         if (destroy_dsts
1507             && ovsdb_idl_row_is_orphan(arc->dst)
1508             && list_is_empty(&arc->dst->dst_arcs)) {
1509             ovsdb_idl_row_destroy(arc->dst);
1510         }
1511         free(arc);
1512     }
1513     list_init(&row->src_arcs);
1514 }
1515
1516 /* Force nodes that reference 'row' to reparse. */
1517 static void
1518 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
1519 {
1520     struct ovsdb_idl_arc *arc, *next;
1521
1522     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
1523      * 'arc', so we need to use the "safe" variant of list traversal.  However,
1524      * calling an ovsdb_idl_column's 'parse' function will add an arc
1525      * equivalent to 'arc' to row->arcs.  That could be a problem for
1526      * traversal, but it adds it at the beginning of the list to prevent us
1527      * from stumbling upon it again.
1528      *
1529      * (If duplicate arcs were possible then we would need to make sure that
1530      * 'next' didn't also point into 'arc''s destination, but we forbid
1531      * duplicate arcs.) */
1532     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
1533         struct ovsdb_idl_row *ref = arc->src;
1534
1535         ovsdb_idl_row_unparse(ref);
1536         ovsdb_idl_row_clear_arcs(ref, false);
1537         ovsdb_idl_row_parse(ref);
1538     }
1539 }
1540
1541 static struct ovsdb_idl_row *
1542 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
1543 {
1544     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
1545     class->row_init(row);
1546     list_init(&row->src_arcs);
1547     list_init(&row->dst_arcs);
1548     hmap_node_nullify(&row->txn_node);
1549     list_init(&row->track_node);
1550     return row;
1551 }
1552
1553 static struct ovsdb_idl_row *
1554 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
1555 {
1556     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
1557     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
1558     row->uuid = *uuid;
1559     row->table = table;
1560     return row;
1561 }
1562
1563 static void
1564 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
1565 {
1566     if (row) {
1567         ovsdb_idl_row_clear_old(row);
1568         hmap_remove(&row->table->rows, &row->hmap_node);
1569         if (ovsdb_idl_track_is_set(row->table)) {
1570             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1571                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1572                 = row->table->idl->change_seqno + 1;
1573         }
1574         if (list_is_empty(&row->track_node)) {
1575             list_push_front(&row->table->track_list, &row->track_node);
1576         }
1577     }
1578 }
1579
1580 static void
1581 ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
1582 {
1583     size_t i;
1584
1585     for (i = 0; i < idl->class->n_tables; i++) {
1586         struct ovsdb_idl_table *table = &idl->tables[i];
1587
1588         if (!list_is_empty(&table->track_list)) {
1589             struct ovsdb_idl_row *row, *next;
1590
1591             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
1592                 if (!ovsdb_idl_track_is_set(row->table)) {
1593                     list_remove(&row->track_node);
1594                     free(row);
1595                 }
1596             }
1597         }
1598     }
1599 }
1600
1601 static void
1602 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
1603 {
1604     const struct ovsdb_idl_table_class *class = row->table->class;
1605     size_t i;
1606
1607     ovs_assert(!row->old && !row->new);
1608     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
1609     for (i = 0; i < class->n_columns; i++) {
1610         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
1611     }
1612     ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_INSERT);
1613     ovsdb_idl_row_parse(row);
1614
1615     ovsdb_idl_row_reparse_backrefs(row);
1616 }
1617
1618 static void
1619 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
1620 {
1621     ovsdb_idl_row_unparse(row);
1622     ovsdb_idl_row_clear_arcs(row, true);
1623     ovsdb_idl_row_clear_old(row);
1624     if (list_is_empty(&row->dst_arcs)) {
1625         ovsdb_idl_row_destroy(row);
1626     } else {
1627         ovsdb_idl_row_reparse_backrefs(row);
1628     }
1629 }
1630
1631 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1632  * otherwise. */
1633 static bool
1634 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1635 {
1636     bool changed;
1637
1638     ovsdb_idl_row_unparse(row);
1639     ovsdb_idl_row_clear_arcs(row, true);
1640     changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY);
1641     ovsdb_idl_row_parse(row);
1642
1643     return changed;
1644 }
1645
1646 static bool
1647 ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *row,
1648                              const struct json *diff_json)
1649 {
1650     bool changed;
1651
1652     ovsdb_idl_row_unparse(row);
1653     ovsdb_idl_row_clear_arcs(row, true);
1654     changed = ovsdb_idl_row_apply_diff(row, diff_json,
1655                                        OVSDB_IDL_CHANGE_MODIFY);
1656     ovsdb_idl_row_parse(row);
1657
1658     return changed;
1659 }
1660
1661 static bool
1662 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1663 {
1664     const struct ovsdb_idl_arc *arc;
1665
1666     /* No self-arcs. */
1667     if (src == dst) {
1668         return false;
1669     }
1670
1671     /* No duplicate arcs.
1672      *
1673      * We only need to test whether the first arc in dst->dst_arcs originates
1674      * at 'src', since we add all of the arcs from a given source in a clump
1675      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1676      * added at the front of the dst_arcs list. */
1677     if (list_is_empty(&dst->dst_arcs)) {
1678         return true;
1679     }
1680     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1681     return arc->src != src;
1682 }
1683
1684 static struct ovsdb_idl_table *
1685 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1686                            const struct ovsdb_idl_table_class *table_class)
1687 {
1688     return &idl->tables[table_class - idl->class->tables];
1689 }
1690
1691 /* Called by ovsdb-idlc generated code. */
1692 struct ovsdb_idl_row *
1693 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1694                       struct ovsdb_idl_table_class *dst_table_class,
1695                       const struct uuid *dst_uuid)
1696 {
1697     struct ovsdb_idl *idl = src->table->idl;
1698     struct ovsdb_idl_table *dst_table;
1699     struct ovsdb_idl_arc *arc;
1700     struct ovsdb_idl_row *dst;
1701
1702     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1703     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1704     if (idl->txn) {
1705         /* We're being called from ovsdb_idl_txn_write().  We must not update
1706          * any arcs, because the transaction will be backed out at commit or
1707          * abort time and we don't want our graph screwed up.
1708          *
1709          * Just return the destination row, if there is one and it has not been
1710          * deleted. */
1711         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1712             return dst;
1713         }
1714         return NULL;
1715     } else {
1716         /* We're being called from some other context.  Update the graph. */
1717         if (!dst) {
1718             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1719         }
1720
1721         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
1722         if (may_add_arc(src, dst)) {
1723             /* The arc *must* be added at the front of the dst_arcs list.  See
1724              * ovsdb_idl_row_reparse_backrefs() for details. */
1725             arc = xmalloc(sizeof *arc);
1726             list_push_front(&src->src_arcs, &arc->src_node);
1727             list_push_front(&dst->dst_arcs, &arc->dst_node);
1728             arc->src = src;
1729             arc->dst = dst;
1730         }
1731
1732         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
1733     }
1734 }
1735
1736 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
1737  * pointer to the row if there is one, otherwise a null pointer.  */
1738 const struct ovsdb_idl_row *
1739 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
1740                            const struct ovsdb_idl_table_class *tc,
1741                            const struct uuid *uuid)
1742 {
1743     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
1744 }
1745
1746 static struct ovsdb_idl_row *
1747 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
1748 {
1749     for (; node; node = hmap_next(&table->rows, node)) {
1750         struct ovsdb_idl_row *row;
1751
1752         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
1753         if (ovsdb_idl_row_exists(row)) {
1754             return row;
1755         }
1756     }
1757     return NULL;
1758 }
1759
1760 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
1761  * table is empty.
1762  *
1763  * Database tables are internally maintained as hash tables, so adding or
1764  * removing rows while traversing the same table can cause some rows to be
1765  * visited twice or not at apply. */
1766 const struct ovsdb_idl_row *
1767 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
1768                     const struct ovsdb_idl_table_class *table_class)
1769 {
1770     struct ovsdb_idl_table *table
1771         = ovsdb_idl_table_from_class(idl, table_class);
1772     return next_real_row(table, hmap_first(&table->rows));
1773 }
1774
1775 /* Returns a row following 'row' within its table, or a null pointer if 'row'
1776  * is the last row in its table. */
1777 const struct ovsdb_idl_row *
1778 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
1779 {
1780     struct ovsdb_idl_table *table = row->table;
1781
1782     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
1783 }
1784
1785 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
1786  * transaction has changed 'column''s value, the modified value is returned.
1787  *
1788  * The caller must not modify or free the returned value.
1789  *
1790  * Various kinds of changes can invalidate the returned value: writing to the
1791  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
1792  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
1793  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
1794  * returned value is needed for a long time, it is best to make a copy of it
1795  * with ovsdb_datum_clone(). */
1796 const struct ovsdb_datum *
1797 ovsdb_idl_read(const struct ovsdb_idl_row *row,
1798                const struct ovsdb_idl_column *column)
1799 {
1800     const struct ovsdb_idl_table_class *class;
1801     size_t column_idx;
1802
1803     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
1804
1805     class = row->table->class;
1806     column_idx = column - class->columns;
1807
1808     ovs_assert(row->new != NULL);
1809     ovs_assert(column_idx < class->n_columns);
1810
1811     if (row->written && bitmap_is_set(row->written, column_idx)) {
1812         return &row->new[column_idx];
1813     } else if (row->old) {
1814         return &row->old[column_idx];
1815     } else {
1816         return ovsdb_datum_default(&column->type);
1817     }
1818 }
1819
1820 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
1821  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
1822  * have a value type of OVSDB_TYPE_VOID.)
1823  *
1824  * This is useful in code that "knows" that a particular column has a given
1825  * type, so that it will abort if someone changes the column's type without
1826  * updating the code that uses it. */
1827 const struct ovsdb_datum *
1828 ovsdb_idl_get(const struct ovsdb_idl_row *row,
1829               const struct ovsdb_idl_column *column,
1830               enum ovsdb_atomic_type key_type OVS_UNUSED,
1831               enum ovsdb_atomic_type value_type OVS_UNUSED)
1832 {
1833     ovs_assert(column->type.key.type == key_type);
1834     ovs_assert(column->type.value.type == value_type);
1835
1836     return ovsdb_idl_read(row, column);
1837 }
1838
1839 /* Returns true if the field represented by 'column' in 'row' may be modified,
1840  * false if it is immutable.
1841  *
1842  * Normally, whether a field is mutable is controlled by its column's schema.
1843  * However, an immutable column can be set to any initial value at the time of
1844  * insertion, so if 'row' is a new row (one that is being added as part of the
1845  * current transaction, supposing that a transaction is in progress) then even
1846  * its "immutable" fields are actually mutable. */
1847 bool
1848 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
1849                      const struct ovsdb_idl_column *column)
1850 {
1851     return column->mutable || (row->new && !row->old);
1852 }
1853
1854 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
1855  * to all-zero-bits by some other entity.  If 'row' was set up some other way
1856  * then the return value is indeterminate. */
1857 bool
1858 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
1859 {
1860     return row->table == NULL;
1861 }
1862 \f
1863 /* Transactions. */
1864
1865 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1866                                    enum ovsdb_idl_txn_status);
1867
1868 /* Returns a string representation of 'status'.  The caller must not modify or
1869  * free the returned string.
1870  *
1871  * The return value is probably useful only for debug log messages and unit
1872  * tests. */
1873 const char *
1874 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
1875 {
1876     switch (status) {
1877     case TXN_UNCOMMITTED:
1878         return "uncommitted";
1879     case TXN_UNCHANGED:
1880         return "unchanged";
1881     case TXN_INCOMPLETE:
1882         return "incomplete";
1883     case TXN_ABORTED:
1884         return "aborted";
1885     case TXN_SUCCESS:
1886         return "success";
1887     case TXN_TRY_AGAIN:
1888         return "try again";
1889     case TXN_NOT_LOCKED:
1890         return "not locked";
1891     case TXN_ERROR:
1892         return "error";
1893     }
1894     return "<unknown>";
1895 }
1896
1897 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
1898  * active transaction at a time.  See the large comment in ovsdb-idl.h for
1899  * general information on transactions. */
1900 struct ovsdb_idl_txn *
1901 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
1902 {
1903     struct ovsdb_idl_txn *txn;
1904
1905     ovs_assert(!idl->txn);
1906     idl->txn = txn = xmalloc(sizeof *txn);
1907     txn->request_id = NULL;
1908     txn->idl = idl;
1909     hmap_init(&txn->txn_rows);
1910     txn->status = TXN_UNCOMMITTED;
1911     txn->error = NULL;
1912     txn->dry_run = false;
1913     ds_init(&txn->comment);
1914
1915     txn->inc_table = NULL;
1916     txn->inc_column = NULL;
1917
1918     hmap_init(&txn->inserted_rows);
1919
1920     return txn;
1921 }
1922
1923 /* Appends 's', which is treated as a printf()-type format string, to the
1924  * comments that will be passed to the OVSDB server when 'txn' is committed.
1925  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
1926  * show-log" can print in a relatively human-readable form.) */
1927 void
1928 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
1929 {
1930     va_list args;
1931
1932     if (txn->comment.length) {
1933         ds_put_char(&txn->comment, '\n');
1934     }
1935
1936     va_start(args, s);
1937     ds_put_format_valist(&txn->comment, s, args);
1938     va_end(args);
1939 }
1940
1941 /* Marks 'txn' as a transaction that will not actually modify the database.  In
1942  * almost every way, the transaction is treated like other transactions.  It
1943  * must be committed or aborted like other transactions, it will be sent to the
1944  * database server like other transactions, and so on.  The only difference is
1945  * that the operations sent to the database server will include, as the last
1946  * step, an "abort" operation, so that any changes made by the transaction will
1947  * not actually take effect. */
1948 void
1949 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
1950 {
1951     txn->dry_run = true;
1952 }
1953
1954 /* Causes 'txn', when committed, to increment the value of 'column' within
1955  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
1956  * successfully, the client may retrieve the final (incremented) value of
1957  * 'column' with ovsdb_idl_txn_get_increment_new_value().
1958  *
1959  * The client could accomplish something similar with ovsdb_idl_read(),
1960  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
1961  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
1962  * will never (by itself) fail because of a verify error.
1963  *
1964  * The intended use is for incrementing the "next_cfg" column in the
1965  * Open_vSwitch table. */
1966 void
1967 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
1968                         const struct ovsdb_idl_row *row,
1969                         const struct ovsdb_idl_column *column)
1970 {
1971     ovs_assert(!txn->inc_table);
1972     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
1973     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
1974
1975     txn->inc_table = row->table->class->name;
1976     txn->inc_column = column->name;
1977     txn->inc_row = row->uuid;
1978 }
1979
1980 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
1981  * has been called for 'txn' but the commit is still incomplete (that is, the
1982  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
1983  * end up committing at the database server, but the client will not be able to
1984  * get any further status information back. */
1985 void
1986 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
1987 {
1988     struct ovsdb_idl_txn_insert *insert, *next;
1989
1990     json_destroy(txn->request_id);
1991     if (txn->status == TXN_INCOMPLETE) {
1992         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1993     }
1994     ovsdb_idl_txn_abort(txn);
1995     ds_destroy(&txn->comment);
1996     free(txn->error);
1997     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
1998         free(insert);
1999     }
2000     hmap_destroy(&txn->inserted_rows);
2001     free(txn);
2002 }
2003
2004 /* Causes poll_block() to wake up if 'txn' has completed committing. */
2005 void
2006 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
2007 {
2008     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
2009         poll_immediate_wake();
2010     }
2011 }
2012
2013 static struct json *
2014 where_uuid_equals(const struct uuid *uuid)
2015 {
2016     return
2017         json_array_create_1(
2018             json_array_create_3(
2019                 json_string_create("_uuid"),
2020                 json_string_create("=="),
2021                 json_array_create_2(
2022                     json_string_create("uuid"),
2023                     json_string_create_nocopy(
2024                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
2025 }
2026
2027 static char *
2028 uuid_name_from_uuid(const struct uuid *uuid)
2029 {
2030     char *name;
2031     char *p;
2032
2033     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
2034     for (p = name; *p != '\0'; p++) {
2035         if (*p == '-') {
2036             *p = '_';
2037         }
2038     }
2039
2040     return name;
2041 }
2042
2043 static const struct ovsdb_idl_row *
2044 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
2045 {
2046     const struct ovsdb_idl_row *row;
2047
2048     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
2049         if (uuid_equals(&row->uuid, uuid)) {
2050             return row;
2051         }
2052     }
2053     return NULL;
2054 }
2055
2056 /* XXX there must be a cleaner way to do this */
2057 static struct json *
2058 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
2059 {
2060     if (json->type == JSON_ARRAY) {
2061         struct uuid uuid;
2062         size_t i;
2063
2064         if (json->u.array.n == 2
2065             && json->u.array.elems[0]->type == JSON_STRING
2066             && json->u.array.elems[1]->type == JSON_STRING
2067             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
2068             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
2069             const struct ovsdb_idl_row *row;
2070
2071             row = ovsdb_idl_txn_get_row(txn, &uuid);
2072             if (row && !row->old && row->new) {
2073                 json_destroy(json);
2074
2075                 return json_array_create_2(
2076                     json_string_create("named-uuid"),
2077                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
2078             }
2079         }
2080
2081         for (i = 0; i < json->u.array.n; i++) {
2082             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
2083                                                       txn);
2084         }
2085     } else if (json->type == JSON_OBJECT) {
2086         struct shash_node *node;
2087
2088         SHASH_FOR_EACH (node, json_object(json)) {
2089             node->data = substitute_uuids(node->data, txn);
2090         }
2091     }
2092     return json;
2093 }
2094
2095 static void
2096 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
2097 {
2098     struct ovsdb_idl_row *row, *next;
2099
2100     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
2101      * ovsdb_idl_column's 'parse' function, which will call
2102      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
2103      * transaction and fail to update the graph.  */
2104     txn->idl->txn = NULL;
2105
2106     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
2107         if (row->old) {
2108             if (row->written) {
2109                 ovsdb_idl_row_unparse(row);
2110                 ovsdb_idl_row_clear_arcs(row, false);
2111                 ovsdb_idl_row_parse(row);
2112             }
2113         } else {
2114             ovsdb_idl_row_unparse(row);
2115         }
2116         ovsdb_idl_row_clear_new(row);
2117
2118         free(row->prereqs);
2119         row->prereqs = NULL;
2120
2121         free(row->written);
2122         row->written = NULL;
2123
2124         hmap_remove(&txn->txn_rows, &row->txn_node);
2125         hmap_node_nullify(&row->txn_node);
2126         if (!row->old) {
2127             hmap_remove(&row->table->rows, &row->hmap_node);
2128             free(row);
2129         }
2130     }
2131     hmap_destroy(&txn->txn_rows);
2132     hmap_init(&txn->txn_rows);
2133 }
2134
2135 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
2136  * of the following TXN_* constants:
2137  *
2138  *   TXN_INCOMPLETE:
2139  *
2140  *       The transaction is in progress, but not yet complete.  The caller
2141  *       should call again later, after calling ovsdb_idl_run() to let the IDL
2142  *       do OVSDB protocol processing.
2143  *
2144  *   TXN_UNCHANGED:
2145  *
2146  *       The transaction is complete.  (It didn't actually change the database,
2147  *       so the IDL didn't send any request to the database server.)
2148  *
2149  *   TXN_ABORTED:
2150  *
2151  *       The caller previously called ovsdb_idl_txn_abort().
2152  *
2153  *   TXN_SUCCESS:
2154  *
2155  *       The transaction was successful.  The update made by the transaction
2156  *       (and possibly other changes made by other database clients) should
2157  *       already be visible in the IDL.
2158  *
2159  *   TXN_TRY_AGAIN:
2160  *
2161  *       The transaction failed for some transient reason, e.g. because a
2162  *       "verify" operation reported an inconsistency or due to a network
2163  *       problem.  The caller should wait for a change to the database, then
2164  *       compose a new transaction, and commit the new transaction.
2165  *
2166  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
2167  *       the database.  It is important to use its return value *before* the
2168  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
2169  *       purpose, because the change that one should wait for can happen after
2170  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
2171  *       using some other baseline value in that situation could cause an
2172  *       indefinite wait if the database rarely changes.
2173  *
2174  *   TXN_NOT_LOCKED:
2175  *
2176  *       The transaction failed because the IDL has been configured to require
2177  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
2178  *       has already lost it.
2179  *
2180  * Committing a transaction rolls back all of the changes that it made to the
2181  * IDL's copy of the database.  If the transaction commits successfully, then
2182  * the database server will send an update and, thus, the IDL will be updated
2183  * with the committed changes. */
2184 enum ovsdb_idl_txn_status
2185 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
2186 {
2187     struct ovsdb_idl_row *row;
2188     struct json *operations;
2189     bool any_updates;
2190
2191     if (txn != txn->idl->txn) {
2192         goto coverage_out;
2193     }
2194
2195     /* If we need a lock but don't have it, give up quickly. */
2196     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
2197         txn->status = TXN_NOT_LOCKED;
2198         goto disassemble_out;
2199     }
2200
2201     operations = json_array_create_1(
2202         json_string_create(txn->idl->class->database));
2203
2204     /* Assert that we have the required lock (avoiding a race). */
2205     if (txn->idl->lock_name) {
2206         struct json *op = json_object_create();
2207         json_array_add(operations, op);
2208         json_object_put_string(op, "op", "assert");
2209         json_object_put_string(op, "lock", txn->idl->lock_name);
2210     }
2211
2212     /* Add prerequisites and declarations of new rows. */
2213     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2214         /* XXX check that deleted rows exist even if no prereqs? */
2215         if (row->prereqs) {
2216             const struct ovsdb_idl_table_class *class = row->table->class;
2217             size_t n_columns = class->n_columns;
2218             struct json *op, *columns, *row_json;
2219             size_t idx;
2220
2221             op = json_object_create();
2222             json_array_add(operations, op);
2223             json_object_put_string(op, "op", "wait");
2224             json_object_put_string(op, "table", class->name);
2225             json_object_put(op, "timeout", json_integer_create(0));
2226             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2227             json_object_put_string(op, "until", "==");
2228             columns = json_array_create_empty();
2229             json_object_put(op, "columns", columns);
2230             row_json = json_object_create();
2231             json_object_put(op, "rows", json_array_create_1(row_json));
2232
2233             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
2234                 const struct ovsdb_idl_column *column = &class->columns[idx];
2235                 json_array_add(columns, json_string_create(column->name));
2236                 json_object_put(row_json, column->name,
2237                                 ovsdb_datum_to_json(&row->old[idx],
2238                                                     &column->type));
2239             }
2240         }
2241     }
2242
2243     /* Add updates. */
2244     any_updates = false;
2245     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2246         const struct ovsdb_idl_table_class *class = row->table->class;
2247
2248         if (!row->new) {
2249             if (class->is_root) {
2250                 struct json *op = json_object_create();
2251                 json_object_put_string(op, "op", "delete");
2252                 json_object_put_string(op, "table", class->name);
2253                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2254                 json_array_add(operations, op);
2255                 any_updates = true;
2256             } else {
2257                 /* Let ovsdb-server decide whether to really delete it. */
2258             }
2259         } else if (row->old != row->new) {
2260             struct json *row_json;
2261             struct json *op;
2262             size_t idx;
2263
2264             op = json_object_create();
2265             json_object_put_string(op, "op", row->old ? "update" : "insert");
2266             json_object_put_string(op, "table", class->name);
2267             if (row->old) {
2268                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2269             } else {
2270                 struct ovsdb_idl_txn_insert *insert;
2271
2272                 any_updates = true;
2273
2274                 json_object_put(op, "uuid-name",
2275                                 json_string_create_nocopy(
2276                                     uuid_name_from_uuid(&row->uuid)));
2277
2278                 insert = xmalloc(sizeof *insert);
2279                 insert->dummy = row->uuid;
2280                 insert->op_index = operations->u.array.n - 1;
2281                 uuid_zero(&insert->real);
2282                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
2283                             uuid_hash(&insert->dummy));
2284             }
2285             row_json = json_object_create();
2286             json_object_put(op, "row", row_json);
2287
2288             if (row->written) {
2289                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
2290                     const struct ovsdb_idl_column *column =
2291                                                         &class->columns[idx];
2292
2293                     if (row->old
2294                         || !ovsdb_datum_is_default(&row->new[idx],
2295                                                   &column->type)) {
2296                         json_object_put(row_json, column->name,
2297                                         substitute_uuids(
2298                                             ovsdb_datum_to_json(&row->new[idx],
2299                                                                 &column->type),
2300                                             txn));
2301
2302                         /* If anything really changed, consider it an update.
2303                          * We can't suppress not-really-changed values earlier
2304                          * or transactions would become nonatomic (see the big
2305                          * comment inside ovsdb_idl_txn_write()). */
2306                         if (!any_updates && row->old &&
2307                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
2308                                                 &column->type)) {
2309                             any_updates = true;
2310                         }
2311                     }
2312                 }
2313             }
2314
2315             if (!row->old || !shash_is_empty(json_object(row_json))) {
2316                 json_array_add(operations, op);
2317             } else {
2318                 json_destroy(op);
2319             }
2320         }
2321     }
2322
2323     /* Add increment. */
2324     if (txn->inc_table && any_updates) {
2325         struct json *op;
2326
2327         txn->inc_index = operations->u.array.n - 1;
2328
2329         op = json_object_create();
2330         json_object_put_string(op, "op", "mutate");
2331         json_object_put_string(op, "table", txn->inc_table);
2332         json_object_put(op, "where",
2333                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2334                                          txn));
2335         json_object_put(op, "mutations",
2336                         json_array_create_1(
2337                             json_array_create_3(
2338                                 json_string_create(txn->inc_column),
2339                                 json_string_create("+="),
2340                                 json_integer_create(1))));
2341         json_array_add(operations, op);
2342
2343         op = json_object_create();
2344         json_object_put_string(op, "op", "select");
2345         json_object_put_string(op, "table", txn->inc_table);
2346         json_object_put(op, "where",
2347                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2348                                          txn));
2349         json_object_put(op, "columns",
2350                         json_array_create_1(json_string_create(
2351                                                 txn->inc_column)));
2352         json_array_add(operations, op);
2353     }
2354
2355     if (txn->comment.length) {
2356         struct json *op = json_object_create();
2357         json_object_put_string(op, "op", "comment");
2358         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
2359         json_array_add(operations, op);
2360     }
2361
2362     if (txn->dry_run) {
2363         struct json *op = json_object_create();
2364         json_object_put_string(op, "op", "abort");
2365         json_array_add(operations, op);
2366     }
2367
2368     if (!any_updates) {
2369         txn->status = TXN_UNCHANGED;
2370         json_destroy(operations);
2371     } else if (!jsonrpc_session_send(
2372                    txn->idl->session,
2373                    jsonrpc_create_request(
2374                        "transact", operations, &txn->request_id))) {
2375         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
2376                     json_hash(txn->request_id, 0));
2377         txn->status = TXN_INCOMPLETE;
2378     } else {
2379         txn->status = TXN_TRY_AGAIN;
2380     }
2381
2382 disassemble_out:
2383     ovsdb_idl_txn_disassemble(txn);
2384 coverage_out:
2385     switch (txn->status) {
2386     case TXN_UNCOMMITTED:   COVERAGE_INC(txn_uncommitted);    break;
2387     case TXN_UNCHANGED:     COVERAGE_INC(txn_unchanged);      break;
2388     case TXN_INCOMPLETE:    COVERAGE_INC(txn_incomplete);     break;
2389     case TXN_ABORTED:       COVERAGE_INC(txn_aborted);        break;
2390     case TXN_SUCCESS:       COVERAGE_INC(txn_success);        break;
2391     case TXN_TRY_AGAIN:     COVERAGE_INC(txn_try_again);      break;
2392     case TXN_NOT_LOCKED:    COVERAGE_INC(txn_not_locked);     break;
2393     case TXN_ERROR:         COVERAGE_INC(txn_error);          break;
2394     }
2395
2396     return txn->status;
2397 }
2398
2399 /* Attempts to commit 'txn', blocking until the commit either succeeds or
2400  * fails.  Returns the final commit status, which may be any TXN_* value other
2401  * than TXN_INCOMPLETE.
2402  *
2403  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
2404  * return value of ovsdb_idl_get_seqno() to change. */
2405 enum ovsdb_idl_txn_status
2406 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
2407 {
2408     enum ovsdb_idl_txn_status status;
2409
2410     fatal_signal_run();
2411     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
2412         ovsdb_idl_run(txn->idl);
2413         ovsdb_idl_wait(txn->idl);
2414         ovsdb_idl_txn_wait(txn);
2415         poll_block();
2416     }
2417     return status;
2418 }
2419
2420 /* Returns the final (incremented) value of the column in 'txn' that was set to
2421  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
2422  * successfully. */
2423 int64_t
2424 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
2425 {
2426     ovs_assert(txn->status == TXN_SUCCESS);
2427     return txn->inc_new_value;
2428 }
2429
2430 /* Aborts 'txn' without sending it to the database server.  This is effective
2431  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
2432  * Otherwise, it has no effect.
2433  *
2434  * Aborting a transaction doesn't free its memory.  Use
2435  * ovsdb_idl_txn_destroy() to do that. */
2436 void
2437 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
2438 {
2439     ovsdb_idl_txn_disassemble(txn);
2440     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
2441         txn->status = TXN_ABORTED;
2442     }
2443 }
2444
2445 /* Returns a string that reports the error status for 'txn'.  The caller must
2446  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
2447  * for 'txn' may free the returned string.
2448  *
2449  * The return value is ordinarily one of the strings that
2450  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
2451  * due to an error reported by the database server, the return value is that
2452  * error. */
2453 const char *
2454 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
2455 {
2456     if (txn->status != TXN_ERROR) {
2457         return ovsdb_idl_txn_status_to_string(txn->status);
2458     } else if (txn->error) {
2459         return txn->error;
2460     } else {
2461         return "no error details available";
2462     }
2463 }
2464
2465 static void
2466 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
2467                              const struct json *json)
2468 {
2469     if (txn->error == NULL) {
2470         txn->error = json_to_string(json, JSSF_SORT);
2471     }
2472 }
2473
2474 /* For transaction 'txn' that completed successfully, finds and returns the
2475  * permanent UUID that the database assigned to a newly inserted row, given the
2476  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
2477  *
2478  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
2479  * if it was assigned by that function and then deleted by
2480  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
2481  * and then deleted within a single transaction are never sent to the database
2482  * server, so it never assigns them a permanent UUID.) */
2483 const struct uuid *
2484 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
2485                               const struct uuid *uuid)
2486 {
2487     const struct ovsdb_idl_txn_insert *insert;
2488
2489     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
2490     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
2491                              uuid_hash(uuid), &txn->inserted_rows) {
2492         if (uuid_equals(uuid, &insert->dummy)) {
2493             return &insert->real;
2494         }
2495     }
2496     return NULL;
2497 }
2498
2499 static void
2500 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2501                        enum ovsdb_idl_txn_status status)
2502 {
2503     txn->status = status;
2504     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2505 }
2506
2507 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
2508  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
2509  * ovs-vswitchd).
2510  *
2511  * 'datum' must have the correct type for its column.  The IDL does not check
2512  * that it meets schema constraints, but ovsdb-server will do so at commit time
2513  * so it had better be correct.
2514  *
2515  * A transaction must be in progress.  Replication of 'column' must not have
2516  * been disabled (by calling ovsdb_idl_omit()).
2517  *
2518  * Usually this function is used indirectly through one of the "set" functions
2519  * generated by ovsdb-idlc.
2520  *
2521  * Takes ownership of what 'datum' points to (and in some cases destroys that
2522  * data before returning) but makes a copy of 'datum' itself.  (Commonly
2523  * 'datum' is on the caller's stack.) */
2524 static void
2525 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
2526                       const struct ovsdb_idl_column *column,
2527                       struct ovsdb_datum *datum, bool owns_datum)
2528 {
2529     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2530     const struct ovsdb_idl_table_class *class;
2531     size_t column_idx;
2532     bool write_only;
2533
2534     if (ovsdb_idl_row_is_synthetic(row)) {
2535         goto discard_datum;
2536     }
2537
2538     class = row->table->class;
2539     column_idx = column - class->columns;
2540     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
2541
2542     ovs_assert(row->new != NULL);
2543     ovs_assert(column_idx < class->n_columns);
2544     ovs_assert(row->old == NULL ||
2545                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2546
2547     if (row->table->idl->verify_write_only && !write_only) {
2548         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
2549                  " explicitly configured not to.", class->name, column->name);
2550         goto discard_datum;
2551     }
2552
2553     /* If this is a write-only column and the datum being written is the same
2554      * as the one already there, just skip the update entirely.  This is worth
2555      * optimizing because we have a lot of columns that get periodically
2556      * refreshed into the database but don't actually change that often.
2557      *
2558      * We don't do this for read/write columns because that would break
2559      * atomicity of transactions--some other client might have written a
2560      * different value in that column since we read it.  (But if a whole
2561      * transaction only does writes of existing values, without making any real
2562      * changes, we will drop the whole transaction later in
2563      * ovsdb_idl_txn_commit().) */
2564     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
2565                                          datum, &column->type)) {
2566         goto discard_datum;
2567     }
2568
2569     if (hmap_node_is_null(&row->txn_node)) {
2570         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2571                     uuid_hash(&row->uuid));
2572     }
2573     if (row->old == row->new) {
2574         row->new = xmalloc(class->n_columns * sizeof *row->new);
2575     }
2576     if (!row->written) {
2577         row->written = bitmap_allocate(class->n_columns);
2578     }
2579     if (bitmap_is_set(row->written, column_idx)) {
2580         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
2581     } else {
2582         bitmap_set1(row->written, column_idx);
2583     }
2584     if (owns_datum) {
2585         row->new[column_idx] = *datum;
2586     } else {
2587         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
2588     }
2589     (column->unparse)(row);
2590     (column->parse)(row, &row->new[column_idx]);
2591     return;
2592
2593 discard_datum:
2594     if (owns_datum) {
2595         ovsdb_datum_destroy(datum, &column->type);
2596     }
2597 }
2598
2599 void
2600 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
2601                     const struct ovsdb_idl_column *column,
2602                     struct ovsdb_datum *datum)
2603 {
2604     ovsdb_idl_txn_write__(row, column, datum, true);
2605 }
2606
2607 void
2608 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
2609                           const struct ovsdb_idl_column *column,
2610                           const struct ovsdb_datum *datum)
2611 {
2612     ovsdb_idl_txn_write__(row, column,
2613                           CONST_CAST(struct ovsdb_datum *, datum), false);
2614 }
2615
2616 /* Causes the original contents of 'column' in 'row_' to be verified as a
2617  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
2618  * changed (or if 'row_' was deleted) between the time that the IDL originally
2619  * read its contents and the time that the transaction commits, then the
2620  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
2621  * TXN_AGAIN_NOW (depending on whether the database change has already been
2622  * received).
2623  *
2624  * The intention is that, to ensure that no transaction commits based on dirty
2625  * reads, an application should call ovsdb_idl_txn_verify() on each data item
2626  * read as part of a read-modify-write operation.
2627  *
2628  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
2629  * value of 'column' is already known:
2630  *
2631  *   - If 'row_' is a row created by the current transaction (returned by
2632  *     ovsdb_idl_txn_insert()).
2633  *
2634  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
2635  *     within the current transaction.
2636  *
2637  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
2638  * ovsdb_idl_txn_write() for a given read-modify-write.
2639  *
2640  * A transaction must be in progress.
2641  *
2642  * Usually this function is used indirectly through one of the "verify"
2643  * functions generated by ovsdb-idlc. */
2644 void
2645 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
2646                      const struct ovsdb_idl_column *column)
2647 {
2648     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2649     const struct ovsdb_idl_table_class *class;
2650     size_t column_idx;
2651
2652     if (ovsdb_idl_row_is_synthetic(row)) {
2653         return;
2654     }
2655
2656     class = row->table->class;
2657     column_idx = column - class->columns;
2658
2659     ovs_assert(row->new != NULL);
2660     ovs_assert(row->old == NULL ||
2661                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2662     if (!row->old
2663         || (row->written && bitmap_is_set(row->written, column_idx))) {
2664         return;
2665     }
2666
2667     if (hmap_node_is_null(&row->txn_node)) {
2668         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2669                     uuid_hash(&row->uuid));
2670     }
2671     if (!row->prereqs) {
2672         row->prereqs = bitmap_allocate(class->n_columns);
2673     }
2674     bitmap_set1(row->prereqs, column_idx);
2675 }
2676
2677 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
2678  * accessed afterward.
2679  *
2680  * A transaction must be in progress.
2681  *
2682  * Usually this function is used indirectly through one of the "delete"
2683  * functions generated by ovsdb-idlc. */
2684 void
2685 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
2686 {
2687     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2688
2689     if (ovsdb_idl_row_is_synthetic(row)) {
2690         return;
2691     }
2692
2693     ovs_assert(row->new != NULL);
2694     if (!row->old) {
2695         ovsdb_idl_row_unparse(row);
2696         ovsdb_idl_row_clear_new(row);
2697         ovs_assert(!row->prereqs);
2698         hmap_remove(&row->table->rows, &row->hmap_node);
2699         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
2700         free(row);
2701         return;
2702     }
2703     if (hmap_node_is_null(&row->txn_node)) {
2704         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2705                     uuid_hash(&row->uuid));
2706     }
2707     ovsdb_idl_row_clear_new(row);
2708     row->new = NULL;
2709 }
2710
2711 /* Inserts and returns a new row in the table with the specified 'class' in the
2712  * database with open transaction 'txn'.
2713  *
2714  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
2715  * randomly generated; otherwise 'uuid' should specify a randomly generated
2716  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
2717  * 'txn' is committed, but the IDL will replace any uses of the provisional
2718  * UUID in the data to be to be committed by the UUID assigned by
2719  * ovsdb-server.
2720  *
2721  * Usually this function is used indirectly through one of the "insert"
2722  * functions generated by ovsdb-idlc. */
2723 const struct ovsdb_idl_row *
2724 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
2725                      const struct ovsdb_idl_table_class *class,
2726                      const struct uuid *uuid)
2727 {
2728     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
2729
2730     if (uuid) {
2731         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
2732         row->uuid = *uuid;
2733     } else {
2734         uuid_generate(&row->uuid);
2735     }
2736
2737     row->table = ovsdb_idl_table_from_class(txn->idl, class);
2738     row->new = xmalloc(class->n_columns * sizeof *row->new);
2739     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
2740     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
2741     return row;
2742 }
2743
2744 static void
2745 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
2746 {
2747     struct ovsdb_idl_txn *txn;
2748
2749     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
2750         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
2751     }
2752 }
2753
2754 static struct ovsdb_idl_txn *
2755 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
2756 {
2757     struct ovsdb_idl_txn *txn;
2758
2759     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
2760                              json_hash(id, 0), &idl->outstanding_txns) {
2761         if (json_equal(id, txn->request_id)) {
2762             return txn;
2763         }
2764     }
2765     return NULL;
2766 }
2767
2768 static bool
2769 check_json_type(const struct json *json, enum json_type type, const char *name)
2770 {
2771     if (!json) {
2772         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
2773         return false;
2774     } else if (json->type != type) {
2775         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
2776                      name, json_type_to_string(json->type),
2777                      json_type_to_string(type));
2778         return false;
2779     } else {
2780         return true;
2781     }
2782 }
2783
2784 static bool
2785 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
2786                                 const struct json_array *results)
2787 {
2788     struct json *count, *rows, *row, *column;
2789     struct shash *mutate, *select;
2790
2791     if (txn->inc_index + 2 > results->n) {
2792         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2793                      "for increment (has %"PRIuSIZE", needs %u)",
2794                      results->n, txn->inc_index + 2);
2795         return false;
2796     }
2797
2798     /* We know that this is a JSON object because the loop in
2799      * ovsdb_idl_txn_process_reply() checked. */
2800     mutate = json_object(results->elems[txn->inc_index]);
2801     count = shash_find_data(mutate, "count");
2802     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
2803         return false;
2804     }
2805     if (count->u.integer != 1) {
2806         VLOG_WARN_RL(&syntax_rl,
2807                      "\"mutate\" reply \"count\" is %lld instead of 1",
2808                      count->u.integer);
2809         return false;
2810     }
2811
2812     select = json_object(results->elems[txn->inc_index + 1]);
2813     rows = shash_find_data(select, "rows");
2814     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
2815         return false;
2816     }
2817     if (rows->u.array.n != 1) {
2818         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
2819                      "instead of 1",
2820                      rows->u.array.n);
2821         return false;
2822     }
2823     row = rows->u.array.elems[0];
2824     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
2825         return false;
2826     }
2827     column = shash_find_data(json_object(row), txn->inc_column);
2828     if (!check_json_type(column, JSON_INTEGER,
2829                          "\"select\" reply inc column")) {
2830         return false;
2831     }
2832     txn->inc_new_value = column->u.integer;
2833     return true;
2834 }
2835
2836 static bool
2837 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
2838                                    const struct json_array *results)
2839 {
2840     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
2841     struct ovsdb_error *error;
2842     struct json *json_uuid;
2843     union ovsdb_atom uuid;
2844     struct shash *reply;
2845
2846     if (insert->op_index >= results->n) {
2847         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2848                      "for insert (has %"PRIuSIZE", needs %u)",
2849                      results->n, insert->op_index);
2850         return false;
2851     }
2852
2853     /* We know that this is a JSON object because the loop in
2854      * ovsdb_idl_txn_process_reply() checked. */
2855     reply = json_object(results->elems[insert->op_index]);
2856     json_uuid = shash_find_data(reply, "uuid");
2857     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
2858         return false;
2859     }
2860
2861     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
2862     if (error) {
2863         char *s = ovsdb_error_to_string(error);
2864         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
2865                      "UUID: %s", s);
2866         free(s);
2867         ovsdb_error_destroy(error);
2868         return false;
2869     }
2870
2871     insert->real = uuid.uuid;
2872
2873     return true;
2874 }
2875
2876 static bool
2877 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
2878                             const struct jsonrpc_msg *msg)
2879 {
2880     struct ovsdb_idl_txn *txn;
2881     enum ovsdb_idl_txn_status status;
2882
2883     txn = ovsdb_idl_txn_find(idl, msg->id);
2884     if (!txn) {
2885         return false;
2886     }
2887
2888     if (msg->type == JSONRPC_ERROR) {
2889         status = TXN_ERROR;
2890     } else if (msg->result->type != JSON_ARRAY) {
2891         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
2892         status = TXN_ERROR;
2893     } else {
2894         struct json_array *ops = &msg->result->u.array;
2895         int hard_errors = 0;
2896         int soft_errors = 0;
2897         int lock_errors = 0;
2898         size_t i;
2899
2900         for (i = 0; i < ops->n; i++) {
2901             struct json *op = ops->elems[i];
2902
2903             if (op->type == JSON_NULL) {
2904                 /* This isn't an error in itself but indicates that some prior
2905                  * operation failed, so make sure that we know about it. */
2906                 soft_errors++;
2907             } else if (op->type == JSON_OBJECT) {
2908                 struct json *error;
2909
2910                 error = shash_find_data(json_object(op), "error");
2911                 if (error) {
2912                     if (error->type == JSON_STRING) {
2913                         if (!strcmp(error->u.string, "timed out")) {
2914                             soft_errors++;
2915                         } else if (!strcmp(error->u.string, "not owner")) {
2916                             lock_errors++;
2917                         } else if (strcmp(error->u.string, "aborted")) {
2918                             hard_errors++;
2919                             ovsdb_idl_txn_set_error_json(txn, op);
2920                         }
2921                     } else {
2922                         hard_errors++;
2923                         ovsdb_idl_txn_set_error_json(txn, op);
2924                         VLOG_WARN_RL(&syntax_rl,
2925                                      "\"error\" in reply is not JSON string");
2926                     }
2927                 }
2928             } else {
2929                 hard_errors++;
2930                 ovsdb_idl_txn_set_error_json(txn, op);
2931                 VLOG_WARN_RL(&syntax_rl,
2932                              "operation reply is not JSON null or object");
2933             }
2934         }
2935
2936         if (!soft_errors && !hard_errors && !lock_errors) {
2937             struct ovsdb_idl_txn_insert *insert;
2938
2939             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
2940                 hard_errors++;
2941             }
2942
2943             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
2944                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
2945                     hard_errors++;
2946                 }
2947             }
2948         }
2949
2950         status = (hard_errors ? TXN_ERROR
2951                   : lock_errors ? TXN_NOT_LOCKED
2952                   : soft_errors ? TXN_TRY_AGAIN
2953                   : TXN_SUCCESS);
2954     }
2955
2956     ovsdb_idl_txn_complete(txn, status);
2957     return true;
2958 }
2959
2960 /* Returns the transaction currently active for 'row''s IDL.  A transaction
2961  * must currently be active. */
2962 struct ovsdb_idl_txn *
2963 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
2964 {
2965     struct ovsdb_idl_txn *txn = row->table->idl->txn;
2966     ovs_assert(txn != NULL);
2967     return txn;
2968 }
2969
2970 /* Returns the IDL on which 'txn' acts. */
2971 struct ovsdb_idl *
2972 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
2973 {
2974     return txn->idl;
2975 }
2976
2977 /* Blocks until 'idl' successfully connects to the remote database and
2978  * retrieves its contents. */
2979 void
2980 ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
2981 {
2982     while (1) {
2983         ovsdb_idl_run(idl);
2984         if (ovsdb_idl_has_ever_connected(idl)) {
2985             return;
2986         }
2987         ovsdb_idl_wait(idl);
2988         poll_block();
2989     }
2990 }
2991 \f
2992 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
2993  * the database server and to avoid modifying the database when the lock cannot
2994  * be acquired (that is, when another client has the same lock).
2995  *
2996  * If 'lock_name' is NULL, drops the locking requirement and releases the
2997  * lock. */
2998 void
2999 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
3000 {
3001     ovs_assert(!idl->txn);
3002     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
3003
3004     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
3005         /* Release previous lock. */
3006         ovsdb_idl_send_unlock_request(idl);
3007         free(idl->lock_name);
3008         idl->lock_name = NULL;
3009         idl->is_lock_contended = false;
3010     }
3011
3012     if (lock_name && !idl->lock_name) {
3013         /* Acquire new lock. */
3014         idl->lock_name = xstrdup(lock_name);
3015         ovsdb_idl_send_lock_request(idl);
3016     }
3017 }
3018
3019 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
3020  *
3021  * Locking and unlocking happens asynchronously from the database client's
3022  * point of view, so the information is only useful for optimization (e.g. if
3023  * the client doesn't have the lock then there's no point in trying to write to
3024  * the database). */
3025 bool
3026 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
3027 {
3028     return idl->has_lock;
3029 }
3030
3031 /* Returns true if 'idl' is configured to obtain a lock but the database server
3032  * has indicated that some other client already owns the requested lock. */
3033 bool
3034 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
3035 {
3036     return idl->is_lock_contended;
3037 }
3038
3039 static void
3040 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
3041 {
3042     if (new_has_lock && !idl->has_lock) {
3043         if (idl->state == IDL_S_MONITORING ||
3044             idl->state == IDL_S_MONITORING2) {
3045             idl->change_seqno++;
3046         } else {
3047             /* We're setting up a session, so don't signal that the database
3048              * changed.  Finalizing the session will increment change_seqno
3049              * anyhow. */
3050         }
3051         idl->is_lock_contended = false;
3052     }
3053     idl->has_lock = new_has_lock;
3054 }
3055
3056 static void
3057 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
3058                               struct json **idp)
3059 {
3060     ovsdb_idl_update_has_lock(idl, false);
3061
3062     json_destroy(idl->lock_request_id);
3063     idl->lock_request_id = NULL;
3064
3065     if (jsonrpc_session_is_connected(idl->session)) {
3066         struct json *params;
3067
3068         params = json_array_create_1(json_string_create(idl->lock_name));
3069         jsonrpc_session_send(idl->session,
3070                              jsonrpc_create_request(method, params, idp));
3071     }
3072 }
3073
3074 static void
3075 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
3076 {
3077     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
3078 }
3079
3080 static void
3081 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
3082 {
3083     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
3084 }
3085
3086 static void
3087 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
3088 {
3089     bool got_lock;
3090
3091     json_destroy(idl->lock_request_id);
3092     idl->lock_request_id = NULL;
3093
3094     if (result->type == JSON_OBJECT) {
3095         const struct json *locked;
3096
3097         locked = shash_find_data(json_object(result), "locked");
3098         got_lock = locked && locked->type == JSON_TRUE;
3099     } else {
3100         got_lock = false;
3101     }
3102
3103     ovsdb_idl_update_has_lock(idl, got_lock);
3104     if (!got_lock) {
3105         idl->is_lock_contended = true;
3106     }
3107 }
3108
3109 static void
3110 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
3111                             const struct json *params,
3112                             bool new_has_lock)
3113 {
3114     if (idl->lock_name
3115         && params->type == JSON_ARRAY
3116         && json_array(params)->n > 0
3117         && json_array(params)->elems[0]->type == JSON_STRING) {
3118         const char *lock_name = json_string(json_array(params)->elems[0]);
3119
3120         if (!strcmp(idl->lock_name, lock_name)) {
3121             ovsdb_idl_update_has_lock(idl, new_has_lock);
3122             if (!new_has_lock) {
3123                 idl->is_lock_contended = true;
3124             }
3125         }
3126     }
3127 }
3128
3129 void
3130 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
3131 {
3132     if (loop) {
3133         ovsdb_idl_destroy(loop->idl);
3134     }
3135 }
3136
3137 struct ovsdb_idl_txn *
3138 ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
3139 {
3140     ovsdb_idl_run(loop->idl);
3141     loop->open_txn = (loop->committing_txn
3142                       || ovsdb_idl_get_seqno(loop->idl) == loop->skip_seqno
3143                       ? NULL
3144                       : ovsdb_idl_txn_create(loop->idl));
3145     return loop->open_txn;
3146 }
3147
3148 void
3149 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
3150 {
3151     if (loop->open_txn) {
3152         loop->committing_txn = loop->open_txn;
3153         loop->open_txn = NULL;
3154
3155         loop->precommit_seqno = ovsdb_idl_get_seqno(loop->idl);
3156     }
3157
3158     struct ovsdb_idl_txn *txn = loop->committing_txn;
3159     if (txn) {
3160         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
3161         if (status != TXN_INCOMPLETE) {
3162             switch (status) {
3163             case TXN_TRY_AGAIN:
3164                 /* We want to re-evaluate the database when it's changed from
3165                  * the contents that it had when we started the commit.  (That
3166                  * might have already happened.) */
3167                 loop->skip_seqno = loop->precommit_seqno;
3168                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
3169                     poll_immediate_wake();
3170                 }
3171                 break;
3172
3173             case TXN_SUCCESS:
3174                 /* If the database has already changed since we started the
3175                  * commit, re-evaluate it immediately to avoid missing a change
3176                  * for a while. */
3177                 if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
3178                     poll_immediate_wake();
3179                 }
3180                 break;
3181
3182             case TXN_UNCHANGED:
3183             case TXN_ABORTED:
3184             case TXN_NOT_LOCKED:
3185             case TXN_ERROR:
3186                 break;
3187
3188             case TXN_UNCOMMITTED:
3189             case TXN_INCOMPLETE:
3190                 OVS_NOT_REACHED();
3191
3192             }
3193             ovsdb_idl_txn_destroy(txn);
3194             loop->committing_txn = NULL;
3195         }
3196     }
3197
3198     ovsdb_idl_wait(loop->idl);
3199 }