openvswitch-switch.init: Redirect error to /dev/null.
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "coverage.h"
27 #include "dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "json.h"
30 #include "jsonrpc.h"
31 #include "ovsdb/ovsdb.h"
32 #include "ovsdb/table.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-idl-provider.h"
36 #include "ovsdb-parser.h"
37 #include "poll-loop.h"
38 #include "shash.h"
39 #include "sset.h"
40 #include "util.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
44
45 COVERAGE_DEFINE(txn_uncommitted);
46 COVERAGE_DEFINE(txn_unchanged);
47 COVERAGE_DEFINE(txn_incomplete);
48 COVERAGE_DEFINE(txn_aborted);
49 COVERAGE_DEFINE(txn_success);
50 COVERAGE_DEFINE(txn_try_again);
51 COVERAGE_DEFINE(txn_not_locked);
52 COVERAGE_DEFINE(txn_error);
53
54 /* An arc from one idl_row to another.  When row A contains a UUID that
55  * references row B, this is represented by an arc from A (the source) to B
56  * (the destination).
57  *
58  * Arcs from a row to itself are omitted, that is, src and dst are always
59  * different.
60  *
61  * Arcs are never duplicated, that is, even if there are multiple references
62  * from A to B, there is only a single arc from A to B.
63  *
64  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
65  * A.  Both an arc and its converse may both be present, if each row refers
66  * to the other circularly.
67  *
68  * The source and destination row may be in the same table or in different
69  * tables.
70  */
71 struct ovsdb_idl_arc {
72     struct ovs_list src_node;   /* In src->src_arcs list. */
73     struct ovs_list dst_node;   /* In dst->dst_arcs list. */
74     struct ovsdb_idl_row *src;  /* Source row. */
75     struct ovsdb_idl_row *dst;  /* Destination row. */
76 };
77
78 enum ovsdb_idl_state {
79     IDL_S_SCHEMA_REQUESTED,
80     IDL_S_MONITOR_REQUESTED,
81     IDL_S_MONITORING,
82     IDL_S_MONITOR2_REQUESTED,
83     IDL_S_MONITORING2
84 };
85
86 struct ovsdb_idl {
87     const struct ovsdb_idl_class *class;
88     struct jsonrpc_session *session;
89     struct shash table_by_name;
90     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
91     unsigned int change_seqno;
92     bool verify_write_only;
93
94     /* Session state. */
95     unsigned int state_seqno;
96     enum ovsdb_idl_state state;
97     struct json *request_id;
98     struct json *schema;
99
100     /* Database locking. */
101     char *lock_name;            /* Name of lock we need, NULL if none. */
102     bool has_lock;              /* Has db server told us we have the lock? */
103     bool is_lock_contended;     /* Has db server told us we can't get lock? */
104     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
105
106     /* Transaction support. */
107     struct ovsdb_idl_txn *txn;
108     struct hmap outstanding_txns;
109 };
110
111 struct ovsdb_idl_txn {
112     struct hmap_node hmap_node;
113     struct json *request_id;
114     struct ovsdb_idl *idl;
115     struct hmap txn_rows;
116     enum ovsdb_idl_txn_status status;
117     char *error;
118     bool dry_run;
119     struct ds comment;
120
121     /* Increments. */
122     const char *inc_table;
123     const char *inc_column;
124     struct uuid inc_row;
125     unsigned int inc_index;
126     int64_t inc_new_value;
127
128     /* Inserted rows. */
129     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
130 };
131
132 struct ovsdb_idl_txn_insert {
133     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
134     struct uuid dummy;          /* Dummy UUID used locally. */
135     int op_index;               /* Index into transaction's operation array. */
136     struct uuid real;           /* Real UUID used by database server. */
137 };
138
139 enum ovsdb_update_version {
140     OVSDB_UPDATE,               /* RFC 7047 "update" method. */
141     OVSDB_UPDATE2               /* "update2" Extension to RFC 7047.
142                                    See ovsdb-server(1) for more information. */
143 };
144
145 /* Name arrays indexed by 'enum ovsdb_update_version'. */
146 static const char *table_updates_names[] = {"table_updates", "table_updates2"};
147 static const char *table_update_names[] = {"table_update", "table_update2"};
148 static const char *row_update_names[] = {"row_update", "row_update2"};
149
150 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
151 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
152
153 static void ovsdb_idl_clear(struct ovsdb_idl *);
154 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
155 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
156 static void ovsdb_idl_send_monitor2_request(struct ovsdb_idl *);
157 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
158                                    enum ovsdb_update_version);
159 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
160                                                     const struct json *,
161                                                     enum ovsdb_update_version);
162 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
163                                      const struct uuid *,
164                                      const struct json *old,
165                                      const struct json *new);
166 static bool ovsdb_idl_process_update2(struct ovsdb_idl_table *,
167                                       const struct uuid *,
168                                       const char *operation,
169                                       const struct json *row);
170 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
171 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
172 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
173 static bool ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *,
174                                          const struct json *);
175
176 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
177 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
178     const struct ovsdb_idl_table_class *);
179 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
180                                                   const struct uuid *);
181 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
182 static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
183
184 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
185 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
186 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
187 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
188 static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
189
190 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
191 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
192                                         const struct jsonrpc_msg *msg);
193
194 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
195 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
196 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
197                                        const struct json *);
198 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
199                                         const struct json *params,
200                                         bool new_has_lock);
201 static struct ovsdb_idl_table *
202 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
203                            const struct ovsdb_idl_table_class *);
204 static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table);
205
206 /* Creates and returns a connection to database 'remote', which should be in a
207  * form acceptable to jsonrpc_session_open().  The connection will maintain an
208  * in-memory replica of the remote database whose schema is described by
209  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
210  * by ovsdb-idlc.)
211  *
212  * Passes 'retry' to jsonrpc_session_open().  See that function for
213  * documentation.
214  *
215  * If 'monitor_everything_by_default' is true, then everything in the remote
216  * database will be replicated by default.  ovsdb_idl_omit() and
217  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
218  * monitoring.
219  *
220  * If 'monitor_everything_by_default' is false, then no columns or tables will
221  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
222  * must be used to choose some columns or tables to replicate.
223  */
224 struct ovsdb_idl *
225 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
226                  bool monitor_everything_by_default, bool retry)
227 {
228     struct ovsdb_idl *idl;
229     uint8_t default_mode;
230     size_t i;
231
232     default_mode = (monitor_everything_by_default
233                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
234                     : 0);
235
236     idl = xzalloc(sizeof *idl);
237     idl->class = class;
238     idl->session = jsonrpc_session_open(remote, retry);
239     shash_init(&idl->table_by_name);
240     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
241     for (i = 0; i < class->n_tables; i++) {
242         const struct ovsdb_idl_table_class *tc = &class->tables[i];
243         struct ovsdb_idl_table *table = &idl->tables[i];
244         size_t j;
245
246         shash_add_assert(&idl->table_by_name, tc->name, table);
247         table->class = tc;
248         table->modes = xmalloc(tc->n_columns);
249         memset(table->modes, default_mode, tc->n_columns);
250         table->need_table = false;
251         shash_init(&table->columns);
252         for (j = 0; j < tc->n_columns; j++) {
253             const struct ovsdb_idl_column *column = &tc->columns[j];
254
255             shash_add_assert(&table->columns, column->name, column);
256         }
257         hmap_init(&table->rows);
258         list_init(&table->track_list);
259         table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
260             = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
261             = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
262         table->idl = idl;
263     }
264
265     idl->state_seqno = UINT_MAX;
266     idl->request_id = NULL;
267     idl->schema = NULL;
268
269     hmap_init(&idl->outstanding_txns);
270
271     return idl;
272 }
273
274 /* Destroys 'idl' and all of the data structures that it manages. */
275 void
276 ovsdb_idl_destroy(struct ovsdb_idl *idl)
277 {
278     if (idl) {
279         size_t i;
280
281         ovs_assert(!idl->txn);
282         ovsdb_idl_clear(idl);
283         jsonrpc_session_close(idl->session);
284
285         for (i = 0; i < idl->class->n_tables; i++) {
286             struct ovsdb_idl_table *table = &idl->tables[i];
287             shash_destroy(&table->columns);
288             hmap_destroy(&table->rows);
289             free(table->modes);
290         }
291         shash_destroy(&idl->table_by_name);
292         free(idl->tables);
293         json_destroy(idl->request_id);
294         free(idl->lock_name);
295         json_destroy(idl->lock_request_id);
296         json_destroy(idl->schema);
297         hmap_destroy(&idl->outstanding_txns);
298         free(idl);
299     }
300 }
301
302 static void
303 ovsdb_idl_clear(struct ovsdb_idl *idl)
304 {
305     bool changed = false;
306     size_t i;
307
308     for (i = 0; i < idl->class->n_tables; i++) {
309         struct ovsdb_idl_table *table = &idl->tables[i];
310         struct ovsdb_idl_row *row, *next_row;
311
312         if (hmap_is_empty(&table->rows)) {
313             continue;
314         }
315
316         changed = true;
317         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
318             struct ovsdb_idl_arc *arc, *next_arc;
319
320             if (!ovsdb_idl_row_is_orphan(row)) {
321                 ovsdb_idl_row_unparse(row);
322             }
323             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
324                 free(arc);
325             }
326             /* No need to do anything with dst_arcs: some node has those arcs
327              * as forward arcs and will destroy them itself. */
328
329             if (!list_is_empty(&row->track_node)) {
330                 list_remove(&row->track_node);
331             }
332
333             ovsdb_idl_row_destroy(row);
334         }
335     }
336
337     ovsdb_idl_track_clear(idl);
338
339     if (changed) {
340         idl->change_seqno++;
341     }
342 }
343
344 /* Processes a batch of messages from the database server on 'idl'.  This may
345  * cause the IDL's contents to change.  The client may check for that with
346  * ovsdb_idl_get_seqno(). */
347 void
348 ovsdb_idl_run(struct ovsdb_idl *idl)
349 {
350     int i;
351
352     ovs_assert(!idl->txn);
353     jsonrpc_session_run(idl->session);
354     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
355         struct jsonrpc_msg *msg;
356         unsigned int seqno;
357
358         seqno = jsonrpc_session_get_seqno(idl->session);
359         if (idl->state_seqno != seqno) {
360             idl->state_seqno = seqno;
361             json_destroy(idl->request_id);
362             idl->request_id = NULL;
363             ovsdb_idl_txn_abort_all(idl);
364
365             ovsdb_idl_send_schema_request(idl);
366             idl->state = IDL_S_SCHEMA_REQUESTED;
367             if (idl->lock_name) {
368                 ovsdb_idl_send_lock_request(idl);
369             }
370         }
371
372         msg = jsonrpc_session_recv(idl->session);
373         if (!msg) {
374             break;
375         }
376
377         if (msg->type == JSONRPC_NOTIFY
378             && !strcmp(msg->method, "update2")
379             && msg->params->type == JSON_ARRAY
380             && msg->params->u.array.n == 2
381             && msg->params->u.array.elems[0]->type == JSON_NULL) {
382             /* Database contents changed. */
383             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
384                                    OVSDB_UPDATE2);
385         } else if (msg->type == JSONRPC_REPLY
386                    && idl->request_id
387                    && json_equal(idl->request_id, msg->id)) {
388             json_destroy(idl->request_id);
389             idl->request_id = NULL;
390
391             switch (idl->state) {
392             case IDL_S_SCHEMA_REQUESTED:
393                 /* Reply to our "get_schema" request. */
394                 idl->schema = json_clone(msg->result);
395                 ovsdb_idl_send_monitor2_request(idl);
396                 idl->state = IDL_S_MONITOR2_REQUESTED;
397                 break;
398
399             case IDL_S_MONITOR_REQUESTED:
400             case IDL_S_MONITOR2_REQUESTED:
401                 /* Reply to our "monitor" or "monitor2" request. */
402                 idl->change_seqno++;
403                 ovsdb_idl_clear(idl);
404                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
405                     idl->state = IDL_S_MONITORING;
406                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
407                 } else { /* IDL_S_MONITOR2_REQUESTED. */
408                     idl->state = IDL_S_MONITORING2;
409                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
410                 }
411
412                 /* Schema is not useful after monitor request is accepted
413                  * by the server.  */
414                 json_destroy(idl->schema);
415                 idl->schema = NULL;
416                 break;
417
418             case IDL_S_MONITORING:
419             case IDL_S_MONITORING2:
420             default:
421                 OVS_NOT_REACHED();
422             }
423         } else if (msg->type == JSONRPC_NOTIFY
424             && !strcmp(msg->method, "update")
425             && msg->params->type == JSON_ARRAY
426             && msg->params->u.array.n == 2
427             && msg->params->u.array.elems[0]->type == JSON_NULL) {
428             /* Database contents changed. */
429             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
430                                    OVSDB_UPDATE);
431         } else if (msg->type == JSONRPC_REPLY
432                    && idl->lock_request_id
433                    && json_equal(idl->lock_request_id, msg->id)) {
434             /* Reply to our "lock" request. */
435             ovsdb_idl_parse_lock_reply(idl, msg->result);
436         } else if (msg->type == JSONRPC_NOTIFY
437                    && !strcmp(msg->method, "locked")) {
438             /* We got our lock. */
439             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
440         } else if (msg->type == JSONRPC_NOTIFY
441                    && !strcmp(msg->method, "stolen")) {
442             /* Someone else stole our lock. */
443             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
444         } else if (msg->type == JSONRPC_ERROR
445                    && idl->state == IDL_S_MONITOR2_REQUESTED
446                    && idl->request_id
447                    && json_equal(idl->request_id, msg->id)) {
448             if (msg->error && !strcmp(json_string(msg->error),
449                                       "unknown method")) {
450                 /* Fall back to using "monitor" method.  */
451                 json_destroy(idl->request_id);
452                 idl->request_id = NULL;
453                 ovsdb_idl_send_monitor_request(idl);
454                 idl->state = IDL_S_MONITOR_REQUESTED;
455             }
456         } else if ((msg->type == JSONRPC_ERROR
457                     || msg->type == JSONRPC_REPLY)
458                    && ovsdb_idl_txn_process_reply(idl, msg)) {
459             /* ovsdb_idl_txn_process_reply() did everything needful. */
460         } else {
461             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
462              * a transaction before we receive the reply, so keep the log level
463              * low. */
464             VLOG_DBG("%s: received unexpected %s message",
465                      jsonrpc_session_get_name(idl->session),
466                      jsonrpc_msg_type_to_string(msg->type));
467         }
468         jsonrpc_msg_destroy(msg);
469     }
470     ovsdb_idl_row_destroy_postprocess(idl);
471 }
472
473 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
474  * do or when activity occurs on a transaction on 'idl'. */
475 void
476 ovsdb_idl_wait(struct ovsdb_idl *idl)
477 {
478     jsonrpc_session_wait(idl->session);
479     jsonrpc_session_recv_wait(idl->session);
480 }
481
482 /* Returns a "sequence number" that represents the state of 'idl'.  When
483  * ovsdb_idl_run() changes the database, the sequence number changes.  The
484  * initial fetch of the entire contents of the remote database is considered to
485  * be one kind of change.  Successfully acquiring a lock, if one has been
486  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
487  *
488  * As long as the sequence number does not change, the client may continue to
489  * use any data structures it obtains from 'idl'.  But when it changes, the
490  * client must not access any of these data structures again, because they
491  * could have freed or reused for other purposes.
492  *
493  * The sequence number can occasionally change even if the database does not.
494  * This happens if the connection to the database drops and reconnects, which
495  * causes the database contents to be reloaded even if they didn't change.  (It
496  * could also happen if the database server sends out a "change" that reflects
497  * what the IDL already thought was in the database.  The database server is
498  * not supposed to do that, but bugs could in theory cause it to do so.) */
499 unsigned int
500 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
501 {
502     return idl->change_seqno;
503 }
504
505 /* Returns true if 'idl' successfully connected to the remote database and
506  * retrieved its contents (even if the connection subsequently dropped and is
507  * in the process of reconnecting).  If so, then 'idl' contains an atomic
508  * snapshot of the database's contents (but it might be arbitrarily old if the
509  * connection dropped).
510  *
511  * Returns false if 'idl' has never connected or retrieved the database's
512  * contents.  If so, 'idl' is empty. */
513 bool
514 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
515 {
516     return ovsdb_idl_get_seqno(idl) != 0;
517 }
518
519 /* Reconfigures 'idl' so that it would reconnect to the database, if
520  * connection was dropped. */
521 void
522 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
523 {
524     jsonrpc_session_enable_reconnect(idl->session);
525 }
526
527 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
528  * meantime, the contents of 'idl' will not change. */
529 void
530 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
531 {
532     jsonrpc_session_force_reconnect(idl->session);
533 }
534
535 /* Some IDL users should only write to write-only columns.  Furthermore,
536  * writing to a column which is not write-only can cause serious performance
537  * degradations for these users.  This function causes 'idl' to reject writes
538  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
539 void
540 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
541 {
542     idl->verify_write_only = true;
543 }
544
545 /* Returns true if 'idl' is currently connected or trying to connect. */
546 bool
547 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
548 {
549     return jsonrpc_session_is_alive(idl->session);
550 }
551
552 /* Returns the last error reported on a connection by 'idl'.  The return value
553  * is 0 only if no connection made by 'idl' has ever encountered an error.  See
554  * jsonrpc_get_status() for return value interpretation. */
555 int
556 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
557 {
558     return jsonrpc_session_get_last_error(idl->session);
559 }
560 \f
561 static unsigned char *
562 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
563                    const struct ovsdb_idl_column *column)
564 {
565     size_t i;
566
567     ovs_assert(!idl->change_seqno);
568
569     for (i = 0; i < idl->class->n_tables; i++) {
570         const struct ovsdb_idl_table *table = &idl->tables[i];
571         const struct ovsdb_idl_table_class *tc = table->class;
572
573         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
574             return &table->modes[column - tc->columns];
575         }
576     }
577
578     OVS_NOT_REACHED();
579 }
580
581 static void
582 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
583 {
584     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
585         struct ovsdb_idl_table *table;
586
587         table = shash_find_data(&idl->table_by_name,
588                                 base->u.uuid.refTableName);
589         if (table) {
590             table->need_table = true;
591         } else {
592             VLOG_WARN("%s IDL class missing referenced table %s",
593                       idl->class->database, base->u.uuid.refTableName);
594         }
595     }
596 }
597
598 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
599  * ensures that any tables referenced by 'column' will be replicated, even if
600  * no columns in that table are selected for replication (see
601  * ovsdb_idl_add_table() for more information).
602  *
603  * This function is only useful if 'monitor_everything_by_default' was false in
604  * the call to ovsdb_idl_create().  This function should be called between
605  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
606  */
607 void
608 ovsdb_idl_add_column(struct ovsdb_idl *idl,
609                      const struct ovsdb_idl_column *column)
610 {
611     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
612     add_ref_table(idl, &column->type.key);
613     add_ref_table(idl, &column->type.value);
614 }
615
616 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
617  * no columns are selected for replication. Just the necessary data for table
618  * references will be replicated (the UUID of the rows, for instance), any
619  * columns not selected for replication will remain unreplicated.
620  * This can be useful because it allows 'idl' to keep track of what rows in the
621  * table actually exist, which in turn allows columns that reference the table
622  * to have accurate contents. (The IDL presents the database with references to
623  * rows that do not exist removed.)
624  *
625  * This function is only useful if 'monitor_everything_by_default' was false in
626  * the call to ovsdb_idl_create().  This function should be called between
627  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
628  */
629 void
630 ovsdb_idl_add_table(struct ovsdb_idl *idl,
631                     const struct ovsdb_idl_table_class *tc)
632 {
633     size_t i;
634
635     for (i = 0; i < idl->class->n_tables; i++) {
636         struct ovsdb_idl_table *table = &idl->tables[i];
637
638         if (table->class == tc) {
639             table->need_table = true;
640             return;
641         }
642     }
643
644     OVS_NOT_REACHED();
645 }
646
647 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
648  *
649  * This function should be called between ovsdb_idl_create() and the first call
650  * to ovsdb_idl_run().
651  */
652 void
653 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
654                      const struct ovsdb_idl_column *column)
655 {
656     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
657 }
658
659 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
660  * OVSDB_IDL_MONITOR for details.
661  *
662  * This function should be called between ovsdb_idl_create() and the first call
663  * to ovsdb_idl_run().
664  */
665 void
666 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
667 {
668     *ovsdb_idl_get_mode(idl, column) = 0;
669 }
670
671 /* Returns the most recent IDL change sequence number that caused a
672  * insert, modify or delete update to the table with class 'table_class'.
673  */
674 unsigned int
675 ovsdb_idl_table_get_seqno(const struct ovsdb_idl *idl,
676                           const struct ovsdb_idl_table_class *table_class)
677 {
678     struct ovsdb_idl_table *table
679         = ovsdb_idl_table_from_class(idl, table_class);
680     unsigned int max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_INSERT];
681
682     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]) {
683         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY];
684     }
685     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_DELETE]) {
686         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_DELETE];
687     }
688     return max_seqno;
689 }
690
691 /* For each row that contains tracked columns, IDL stores the most
692  * recent IDL change sequence numbers associateed with insert, modify
693  * and delete updates to the table.
694  */
695 unsigned int
696 ovsdb_idl_row_get_seqno(const struct ovsdb_idl_row *row,
697                         enum ovsdb_idl_change change)
698 {
699     return row->change_seqno[change];
700 }
701
702 /* Turns on OVSDB_IDL_TRACK for 'column' in 'idl', ensuring that
703  * all rows whose 'column' is modified are traced. Similarly, insert
704  * or delete of rows having 'column' are tracked. Clients are able
705  * to retrive the tracked rows with the ovsdb_idl_track_get_*()
706  * functions.
707  *
708  * This function should be called between ovsdb_idl_create() and
709  * the first call to ovsdb_idl_run(). The column to be tracked
710  * should have OVSDB_IDL_ALERT turned on.
711  */
712 void
713 ovsdb_idl_track_add_column(struct ovsdb_idl *idl,
714                            const struct ovsdb_idl_column *column)
715 {
716     if (!(*ovsdb_idl_get_mode(idl, column) & OVSDB_IDL_ALERT)) {
717         ovsdb_idl_add_column(idl, column);
718     }
719     *ovsdb_idl_get_mode(idl, column) |= OVSDB_IDL_TRACK;
720 }
721
722 void
723 ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
724 {
725     size_t i, j;
726
727     for (i = 0; i < idl->class->n_tables; i++) {
728         const struct ovsdb_idl_table_class *tc = &idl->class->tables[i];
729
730         for (j = 0; j < tc->n_columns; j++) {
731             const struct ovsdb_idl_column *column = &tc->columns[j];
732             ovsdb_idl_track_add_column(idl, column);
733         }
734     }
735 }
736
737 /* Returns true if 'table' has any tracked column. */
738 static bool
739 ovsdb_idl_track_is_set(struct ovsdb_idl_table *table)
740 {
741     size_t i;
742
743     for (i = 0; i < table->class->n_columns; i++) {
744         if (table->modes[i] & OVSDB_IDL_TRACK) {
745             return true;
746         }
747     }
748    return false;
749 }
750
751 /* Returns the first tracked row in table with class 'table_class'
752  * for the specified 'idl'. Returns NULL if there are no tracked rows */
753 const struct ovsdb_idl_row *
754 ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
755                           const struct ovsdb_idl_table_class *table_class)
756 {
757     struct ovsdb_idl_table *table
758         = ovsdb_idl_table_from_class(idl, table_class);
759
760     if (!list_is_empty(&table->track_list)) {
761         return CONTAINER_OF(list_front(&table->track_list), struct ovsdb_idl_row, track_node);
762     }
763     return NULL;
764 }
765
766 /* Returns the next tracked row in table after the specified 'row'
767  * (in no particular order). Returns NULL if there are no tracked rows */
768 const struct ovsdb_idl_row *
769 ovsdb_idl_track_get_next(const struct ovsdb_idl_row *row)
770 {
771     if (row->track_node.next != &row->table->track_list) {
772         return CONTAINER_OF(row->track_node.next, struct ovsdb_idl_row, track_node);
773     }
774
775     return NULL;
776 }
777
778 /* Flushes the tracked rows. Client calls this function after calling
779  * ovsdb_idl_run() and read all tracked rows with the ovsdb_idl_track_get_*()
780  * functions. This is usually done at the end of the client's processing
781  * loop when it is ready to do ovsdb_idl_run() again.
782  */
783 void
784 ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
785 {
786     size_t i;
787
788     for (i = 0; i < idl->class->n_tables; i++) {
789         struct ovsdb_idl_table *table = &idl->tables[i];
790
791         if (!list_is_empty(&table->track_list)) {
792             struct ovsdb_idl_row *row, *next;
793
794             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
795                 list_remove(&row->track_node);
796                 list_init(&row->track_node);
797                 if (ovsdb_idl_row_is_orphan(row)) {
798                     ovsdb_idl_row_clear_old(row);
799                     free(row);
800                 }
801             }
802         }
803     }
804 }
805
806 \f
807 static void
808 ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
809 {
810     struct jsonrpc_msg *msg;
811
812     json_destroy(idl->request_id);
813     msg = jsonrpc_create_request(
814         "get_schema",
815         json_array_create_1(json_string_create(idl->class->database)),
816         &idl->request_id);
817     jsonrpc_session_send(idl->session, msg);
818 }
819
820 static void
821 log_error(struct ovsdb_error *error)
822 {
823     char *s = ovsdb_error_to_string(error);
824     VLOG_WARN("error parsing database schema: %s", s);
825     free(s);
826     ovsdb_error_destroy(error);
827 }
828
829 /* Frees 'schema', which is in the format returned by parse_schema(). */
830 static void
831 free_schema(struct shash *schema)
832 {
833     if (schema) {
834         struct shash_node *node, *next;
835
836         SHASH_FOR_EACH_SAFE (node, next, schema) {
837             struct sset *sset = node->data;
838             sset_destroy(sset);
839             free(sset);
840             shash_delete(schema, node);
841         }
842         shash_destroy(schema);
843         free(schema);
844     }
845 }
846
847 /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
848  * 7047, to obtain the names of its rows and columns.  If successful, returns
849  * an shash whose keys are table names and whose values are ssets, where each
850  * sset contains the names of its table's columns.  On failure (due to a parse
851  * error), returns NULL.
852  *
853  * It would also be possible to use the general-purpose OVSDB schema parser in
854  * ovsdb-server, but that's overkill, possibly too strict for the current use
855  * case, and would require restructuring ovsdb-server to separate the schema
856  * code from the rest. */
857 static struct shash *
858 parse_schema(const struct json *schema_json)
859 {
860     struct ovsdb_parser parser;
861     const struct json *tables_json;
862     struct ovsdb_error *error;
863     struct shash_node *node;
864     struct shash *schema;
865
866     ovsdb_parser_init(&parser, schema_json, "database schema");
867     tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
868     error = ovsdb_parser_destroy(&parser);
869     if (error) {
870         log_error(error);
871         return NULL;
872     }
873
874     schema = xmalloc(sizeof *schema);
875     shash_init(schema);
876     SHASH_FOR_EACH (node, json_object(tables_json)) {
877         const char *table_name = node->name;
878         const struct json *json = node->data;
879         const struct json *columns_json;
880
881         ovsdb_parser_init(&parser, json, "table schema for table %s",
882                           table_name);
883         columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
884         error = ovsdb_parser_destroy(&parser);
885         if (error) {
886             log_error(error);
887             free_schema(schema);
888             return NULL;
889         }
890
891         struct sset *columns = xmalloc(sizeof *columns);
892         sset_init(columns);
893
894         struct shash_node *node2;
895         SHASH_FOR_EACH (node2, json_object(columns_json)) {
896             const char *column_name = node2->name;
897             sset_add(columns, column_name);
898         }
899         shash_add(schema, table_name, columns);
900     }
901     return schema;
902 }
903
904 static void
905 ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
906                                  const char *method)
907 {
908     struct shash *schema;
909     struct json *monitor_requests;
910     struct jsonrpc_msg *msg;
911     size_t i;
912
913     schema = parse_schema(idl->schema);
914     monitor_requests = json_object_create();
915     for (i = 0; i < idl->class->n_tables; i++) {
916         const struct ovsdb_idl_table *table = &idl->tables[i];
917         const struct ovsdb_idl_table_class *tc = table->class;
918         struct json *monitor_request, *columns;
919         const struct sset *table_schema;
920         size_t j;
921
922         table_schema = (schema
923                         ? shash_find_data(schema, table->class->name)
924                         : NULL);
925
926         columns = table->need_table ? json_array_create_empty() : NULL;
927         for (j = 0; j < tc->n_columns; j++) {
928             const struct ovsdb_idl_column *column = &tc->columns[j];
929             if (table->modes[j] & OVSDB_IDL_MONITOR) {
930                 if (table_schema
931                     && !sset_contains(table_schema, column->name)) {
932                     VLOG_WARN("%s table in %s database lacks %s column "
933                               "(database needs upgrade?)",
934                               table->class->name, idl->class->database,
935                               column->name);
936                     continue;
937                 }
938                 if (!columns) {
939                     columns = json_array_create_empty();
940                 }
941                 json_array_add(columns, json_string_create(column->name));
942             }
943         }
944
945         if (columns) {
946             if (schema && !table_schema) {
947                 VLOG_WARN("%s database lacks %s table "
948                           "(database needs upgrade?)",
949                           idl->class->database, table->class->name);
950                 json_destroy(columns);
951                 continue;
952             }
953
954             monitor_request = json_object_create();
955             json_object_put(monitor_request, "columns", columns);
956             json_object_put(monitor_requests, tc->name, monitor_request);
957         }
958     }
959     free_schema(schema);
960
961     json_destroy(idl->request_id);
962     msg = jsonrpc_create_request(
963         method,
964         json_array_create_3(json_string_create(idl->class->database),
965                             json_null_create(), monitor_requests),
966         &idl->request_id);
967     jsonrpc_session_send(idl->session, msg);
968 }
969
970 static void
971 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
972 {
973     ovsdb_idl_send_monitor_request__(idl, "monitor");
974 }
975
976 static void
977 log_parse_update_error(struct ovsdb_error *error)
978 {
979         if (!VLOG_DROP_WARN(&syntax_rl)) {
980             char *s = ovsdb_error_to_string(error);
981             VLOG_WARN_RL(&syntax_rl, "%s", s);
982             free(s);
983         }
984         ovsdb_error_destroy(error);
985 }
986
987 static void
988 ovsdb_idl_send_monitor2_request(struct ovsdb_idl *idl)
989 {
990     ovsdb_idl_send_monitor_request__(idl, "monitor2");
991 }
992
993 static void
994 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates,
995                        enum ovsdb_update_version version)
996 {
997     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates,
998                                                          version);
999     if (error) {
1000         log_parse_update_error(error);
1001     }
1002 }
1003
1004 static struct ovsdb_error *
1005 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
1006                          const struct json *table_updates,
1007                          enum ovsdb_update_version version)
1008 {
1009     const struct shash_node *tables_node;
1010     const char *table_updates_name = table_updates_names[version];
1011     const char *table_update_name = table_update_names[version];
1012     const char *row_update_name = row_update_names[version];
1013
1014     if (table_updates->type != JSON_OBJECT) {
1015         return ovsdb_syntax_error(table_updates, NULL,
1016                                   "<%s> is not an object",
1017                                   table_updates_name);
1018     }
1019
1020     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
1021         const struct json *table_update = tables_node->data;
1022         const struct shash_node *table_node;
1023         struct ovsdb_idl_table *table;
1024
1025         table = shash_find_data(&idl->table_by_name, tables_node->name);
1026         if (!table) {
1027             return ovsdb_syntax_error(
1028                 table_updates, NULL,
1029                 "<%s> includes unknown table \"%s\"",
1030                 table_updates_name,
1031                 tables_node->name);
1032         }
1033
1034         if (table_update->type != JSON_OBJECT) {
1035             return ovsdb_syntax_error(table_update, NULL,
1036                                       "<%s> for table \"%s\" is "
1037                                       "not an object",
1038                                       table_update_name,
1039                                       table->class->name);
1040         }
1041         SHASH_FOR_EACH (table_node, json_object(table_update)) {
1042             const struct json *row_update = table_node->data;
1043             const struct json *old_json, *new_json;
1044             struct uuid uuid;
1045
1046             if (!uuid_from_string(&uuid, table_node->name)) {
1047                 return ovsdb_syntax_error(table_update, NULL,
1048                                           "<%s> for table \"%s\" "
1049                                           "contains bad UUID "
1050                                           "\"%s\" as member name",
1051                                           table_update_name,
1052                                           table->class->name,
1053                                           table_node->name);
1054             }
1055             if (row_update->type != JSON_OBJECT) {
1056                 return ovsdb_syntax_error(row_update, NULL,
1057                                           "<%s> for table \"%s\" "
1058                                           "contains <%s> for %s that "
1059                                           "is not an object",
1060                                           table_update_name,
1061                                           table->class->name,
1062                                           row_update_name,
1063                                           table_node->name);
1064             }
1065
1066             switch(version) {
1067             case OVSDB_UPDATE:
1068                 old_json = shash_find_data(json_object(row_update), "old");
1069                 new_json = shash_find_data(json_object(row_update), "new");
1070                 if (old_json && old_json->type != JSON_OBJECT) {
1071                     return ovsdb_syntax_error(old_json, NULL,
1072                                               "\"old\" <row> is not object");
1073                 } else if (new_json && new_json->type != JSON_OBJECT) {
1074                     return ovsdb_syntax_error(new_json, NULL,
1075                                               "\"new\" <row> is not object");
1076                 } else if ((old_json != NULL) + (new_json != NULL)
1077                            != shash_count(json_object(row_update))) {
1078                     return ovsdb_syntax_error(row_update, NULL,
1079                                               "<row-update> contains "
1080                                               "unexpected member");
1081                 } else if (!old_json && !new_json) {
1082                     return ovsdb_syntax_error(row_update, NULL,
1083                                               "<row-update> missing \"old\" "
1084                                               "and \"new\" members");
1085                 }
1086
1087                 if (ovsdb_idl_process_update(table, &uuid, old_json,
1088                                              new_json)) {
1089                     idl->change_seqno++;
1090                 }
1091                 break;
1092
1093             case OVSDB_UPDATE2: {
1094                 const char *ops[] = {"modify", "insert", "delete", "initial"};
1095                 const char *operation;
1096                 const struct json *row;
1097                 int i;
1098
1099                 for (i = 0; i < ARRAY_SIZE(ops); i++) {
1100                     operation = ops[i];
1101                     row = shash_find_data(json_object(row_update), operation);
1102
1103                     if (row)  {
1104                         if (ovsdb_idl_process_update2(table, &uuid, operation,
1105                                                       row)) {
1106                             idl->change_seqno++;
1107                         }
1108                         break;
1109                     }
1110                 }
1111
1112                 /* row_update2 should contain one of the objects */
1113                 if (i == ARRAY_SIZE(ops)) {
1114                     return ovsdb_syntax_error(row_update, NULL,
1115                                               "<row_update2> includes unknown "
1116                                               "object");
1117                 }
1118                 break;
1119             }
1120
1121             default:
1122                 OVS_NOT_REACHED();
1123             }
1124         }
1125     }
1126
1127     return NULL;
1128 }
1129
1130 static struct ovsdb_idl_row *
1131 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
1132 {
1133     struct ovsdb_idl_row *row;
1134
1135     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
1136         if (uuid_equals(&row->uuid, uuid)) {
1137             return row;
1138         }
1139     }
1140     return NULL;
1141 }
1142
1143 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1144  * otherwise. */
1145 static bool
1146 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
1147                          const struct uuid *uuid, const struct json *old,
1148                          const struct json *new)
1149 {
1150     struct ovsdb_idl_row *row;
1151
1152     row = ovsdb_idl_get_row(table, uuid);
1153     if (!new) {
1154         /* Delete row. */
1155         if (row && !ovsdb_idl_row_is_orphan(row)) {
1156             /* XXX perhaps we should check the 'old' values? */
1157             ovsdb_idl_delete_row(row);
1158         } else {
1159             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1160                          "from table %s",
1161                          UUID_ARGS(uuid), table->class->name);
1162             return false;
1163         }
1164     } else if (!old) {
1165         /* Insert row. */
1166         if (!row) {
1167             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1168         } else if (ovsdb_idl_row_is_orphan(row)) {
1169             ovsdb_idl_insert_row(row, new);
1170         } else {
1171             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1172                          "table %s", UUID_ARGS(uuid), table->class->name);
1173             return ovsdb_idl_modify_row(row, new);
1174         }
1175     } else {
1176         /* Modify row. */
1177         if (row) {
1178             /* XXX perhaps we should check the 'old' values? */
1179             if (!ovsdb_idl_row_is_orphan(row)) {
1180                 return ovsdb_idl_modify_row(row, new);
1181             } else {
1182                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1183                              "referenced row "UUID_FMT" in table %s",
1184                              UUID_ARGS(uuid), table->class->name);
1185                 ovsdb_idl_insert_row(row, new);
1186             }
1187         } else {
1188             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1189                          "in table %s", UUID_ARGS(uuid), table->class->name);
1190             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1191         }
1192     }
1193
1194     return true;
1195 }
1196
1197 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1198  * otherwise. */
1199 static bool
1200 ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
1201                           const struct uuid *uuid,
1202                           const char *operation,
1203                           const struct json *json_row)
1204 {
1205     struct ovsdb_idl_row *row;
1206
1207     row = ovsdb_idl_get_row(table, uuid);
1208     if (!strcmp(operation, "delete")) {
1209         /* Delete row. */
1210         if (row && !ovsdb_idl_row_is_orphan(row)) {
1211             ovsdb_idl_delete_row(row);
1212         } else {
1213             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1214                          "from table %s",
1215                          UUID_ARGS(uuid), table->class->name);
1216             return false;
1217         }
1218     } else if (!strcmp(operation, "insert") || !strcmp(operation, "initial")) {
1219         /* Insert row. */
1220         if (!row) {
1221             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), json_row);
1222         } else if (ovsdb_idl_row_is_orphan(row)) {
1223             ovsdb_idl_insert_row(row, json_row);
1224         } else {
1225             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1226                          "table %s", UUID_ARGS(uuid), table->class->name);
1227             ovsdb_idl_delete_row(row);
1228             ovsdb_idl_insert_row(row, json_row);
1229         }
1230     } else if (!strcmp(operation, "modify")) {
1231         /* Modify row. */
1232         if (row) {
1233             if (!ovsdb_idl_row_is_orphan(row)) {
1234                 return ovsdb_idl_modify_row_by_diff(row, json_row);
1235             } else {
1236                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1237                              "referenced row "UUID_FMT" in table %s",
1238                              UUID_ARGS(uuid), table->class->name);
1239                 return false;
1240             }
1241         } else {
1242             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1243                          "in table %s", UUID_ARGS(uuid), table->class->name);
1244             return false;
1245         }
1246     } else {
1247             VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
1248                          "table %s", operation, table->class->name);
1249             return false;
1250     }
1251
1252     return true;
1253 }
1254
1255 #if 0
1256 ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
1257                          const struct json *diff_json)
1258 {
1259     struct ovsdb_idl_table *table = row->table;
1260     struct shash_node *node;
1261     bool changed = false;
1262
1263     SHASH_FOR_EACH (node, json_object(diff_json)) {
1264         const char *column_name = node->name;
1265         const struct ovsdb_idl_column *column;
1266         struct ovsdb_datum diff;
1267         struct ovsdb_error *error;
1268
1269         column = shash_find_data(&table->columns, column_name);
1270         if (!column) {
1271             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
1272                          column_name, UUID_ARGS(&row->uuid));
1273             continue;
1274         }
1275
1276         error = ovsdb_transient_datum_from_json(&diff, &column->type,
1277                                                 node->data);
1278         if (!error) {
1279             unsigned int column_idx = column - table->class->columns;
1280             struct ovsdb_datum *old = &row->old[column_idx];
1281             struct ovsdb_datum new;
1282             struct ovsdb_error *error;
1283
1284             error = ovsdb_datum_apply_diff(&new, old, &diff, &column->type);
1285             if (error) {
1286                 VLOG_WARN_RL(&syntax_rl, "update2 failed to modify column "
1287                              "%s row "UUID_FMT, column_name,
1288                              UUID_ARGS(&row->uuid));
1289                 ovsdb_error_destroy(error);
1290             } else {
1291                 ovsdb_datum_swap(old, &new);
1292                 ovsdb_datum_destroy(&new, &column->type);
1293                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
1294                     changed = true;
1295                 }
1296             }
1297             ovsdb_datum_destroy(&diff, &column->type);
1298         } else {
1299             char *s = ovsdb_error_to_string(error);
1300             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
1301                          " in table %s: %s", column_name,
1302                          UUID_ARGS(&row->uuid), table->class->name, s);
1303             free(s);
1304             ovsdb_error_destroy(error);
1305         }
1306     }
1307     return changed;
1308 }
1309 #endif
1310
1311 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1312  * otherwise.
1313  *
1314  * Change 'row' either with the content of 'row_json' or by apply 'diff'.
1315  * Caller needs to provide either valid 'row_json' or 'diff', but not
1316  * both.  */
1317 static bool
1318 ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
1319                        const struct json *diff_json,
1320                        enum ovsdb_idl_change change)
1321 {
1322     struct ovsdb_idl_table *table = row->table;
1323     struct shash_node *node;
1324     bool changed = false;
1325     bool apply_diff = diff_json != NULL;
1326     const struct json *json = apply_diff ? diff_json : row_json;
1327
1328     SHASH_FOR_EACH (node, json_object(json)) {
1329         const char *column_name = node->name;
1330         const struct ovsdb_idl_column *column;
1331         struct ovsdb_datum datum;
1332         struct ovsdb_error *error;
1333         unsigned int column_idx;
1334         struct ovsdb_datum *old;
1335
1336         column = shash_find_data(&table->columns, column_name);
1337         if (!column) {
1338             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
1339                          column_name, UUID_ARGS(&row->uuid));
1340             continue;
1341         }
1342
1343         column_idx = column - table->class->columns;
1344         old = &row->old[column_idx];
1345
1346         error = NULL;
1347         if (apply_diff) {
1348             struct ovsdb_datum diff;
1349
1350             ovs_assert(!row_json);
1351             error = ovsdb_transient_datum_from_json(&diff, &column->type,
1352                                                     node->data);
1353             if (!error) {
1354                 error = ovsdb_datum_apply_diff(&datum, old, &diff,
1355                                                &column->type);
1356                 ovsdb_datum_destroy(&diff, &column->type);
1357             }
1358         } else {
1359             ovs_assert(!diff_json);
1360             error = ovsdb_datum_from_json(&datum, &column->type, node->data,
1361                                           NULL);
1362         }
1363
1364         if (!error) {
1365             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
1366                 ovsdb_datum_swap(old, &datum);
1367                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
1368                     changed = true;
1369                     row->change_seqno[change]
1370                         = row->table->change_seqno[change]
1371                         = row->table->idl->change_seqno + 1;
1372                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
1373                         if (list_is_empty(&row->track_node)) {
1374                             list_push_front(&row->table->track_list,
1375                                             &row->track_node);
1376                         }
1377                     }
1378                 }
1379             } else {
1380                 /* Didn't really change but the OVSDB monitor protocol always
1381                  * includes every value in a row. */
1382             }
1383
1384             ovsdb_datum_destroy(&datum, &column->type);
1385         } else {
1386             char *s = ovsdb_error_to_string(error);
1387             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
1388                          " in table %s: %s", column_name,
1389                          UUID_ARGS(&row->uuid), table->class->name, s);
1390             free(s);
1391             ovsdb_error_destroy(error);
1392         }
1393     }
1394     return changed;
1395 }
1396
1397 static bool
1398 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json,
1399                      enum ovsdb_idl_change change)
1400 {
1401     return ovsdb_idl_row_change__(row, row_json, NULL, change);
1402 }
1403
1404 static bool
1405 ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
1406                          const struct json *diff_json,
1407                          enum ovsdb_idl_change change)
1408 {
1409     return ovsdb_idl_row_change__(row, NULL, diff_json, change);
1410 }
1411
1412 /* When a row A refers to row B through a column with a "refTable" constraint,
1413  * but row B does not exist, row B is called an "orphan row".  Orphan rows
1414  * should not persist, because the database enforces referential integrity, but
1415  * they can appear transiently as changes from the database are received (the
1416  * database doesn't try to topologically sort them and circular references mean
1417  * it isn't always possible anyhow).
1418  *
1419  * This function returns true if 'row' is an orphan row, otherwise false.
1420  */
1421 static bool
1422 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
1423 {
1424     return !row->old && !row->new;
1425 }
1426
1427 /* Returns true if 'row' is conceptually part of the database as modified by
1428  * the current transaction (if any), false otherwise.
1429  *
1430  * This function will return true if 'row' is not an orphan (see the comment on
1431  * ovsdb_idl_row_is_orphan()) and:
1432  *
1433  *   - 'row' exists in the database and has not been deleted within the
1434  *     current transaction (if any).
1435  *
1436  *   - 'row' was inserted within the current transaction and has not been
1437  *     deleted.  (In the latter case you should not have passed 'row' in at
1438  *     all, because ovsdb_idl_txn_delete() freed it.)
1439  *
1440  * This function will return false if 'row' is an orphan or if 'row' was
1441  * deleted within the current transaction.
1442  */
1443 static bool
1444 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
1445 {
1446     return row->new != NULL;
1447 }
1448
1449 static void
1450 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
1451 {
1452     const struct ovsdb_idl_table_class *class = row->table->class;
1453     size_t i;
1454
1455     for (i = 0; i < class->n_columns; i++) {
1456         const struct ovsdb_idl_column *c = &class->columns[i];
1457         (c->parse)(row, &row->old[i]);
1458     }
1459 }
1460
1461 static void
1462 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
1463 {
1464     const struct ovsdb_idl_table_class *class = row->table->class;
1465     size_t i;
1466
1467     for (i = 0; i < class->n_columns; i++) {
1468         const struct ovsdb_idl_column *c = &class->columns[i];
1469         (c->unparse)(row);
1470     }
1471 }
1472
1473 static void
1474 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
1475 {
1476     ovs_assert(row->old == row->new);
1477     if (!ovsdb_idl_row_is_orphan(row)) {
1478         const struct ovsdb_idl_table_class *class = row->table->class;
1479         size_t i;
1480
1481         for (i = 0; i < class->n_columns; i++) {
1482             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
1483         }
1484         free(row->old);
1485         row->old = row->new = NULL;
1486     }
1487 }
1488
1489 static void
1490 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
1491 {
1492     if (row->old != row->new) {
1493         if (row->new) {
1494             const struct ovsdb_idl_table_class *class = row->table->class;
1495             size_t i;
1496
1497             if (row->written) {
1498                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
1499                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
1500                 }
1501             }
1502             free(row->new);
1503             free(row->written);
1504             row->written = NULL;
1505         }
1506         row->new = row->old;
1507     }
1508 }
1509
1510 static void
1511 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
1512 {
1513     struct ovsdb_idl_arc *arc, *next;
1514
1515     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
1516      * that this causes to be unreferenced, if tracking is not enabled.
1517      * If tracking is enabled, orphaned nodes are removed from hmap but not
1518      * freed.
1519      */
1520     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
1521         list_remove(&arc->dst_node);
1522         if (destroy_dsts
1523             && ovsdb_idl_row_is_orphan(arc->dst)
1524             && list_is_empty(&arc->dst->dst_arcs)) {
1525             ovsdb_idl_row_destroy(arc->dst);
1526         }
1527         free(arc);
1528     }
1529     list_init(&row->src_arcs);
1530 }
1531
1532 /* Force nodes that reference 'row' to reparse. */
1533 static void
1534 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
1535 {
1536     struct ovsdb_idl_arc *arc, *next;
1537
1538     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
1539      * 'arc', so we need to use the "safe" variant of list traversal.  However,
1540      * calling an ovsdb_idl_column's 'parse' function will add an arc
1541      * equivalent to 'arc' to row->arcs.  That could be a problem for
1542      * traversal, but it adds it at the beginning of the list to prevent us
1543      * from stumbling upon it again.
1544      *
1545      * (If duplicate arcs were possible then we would need to make sure that
1546      * 'next' didn't also point into 'arc''s destination, but we forbid
1547      * duplicate arcs.) */
1548     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
1549         struct ovsdb_idl_row *ref = arc->src;
1550
1551         ovsdb_idl_row_unparse(ref);
1552         ovsdb_idl_row_clear_arcs(ref, false);
1553         ovsdb_idl_row_parse(ref);
1554     }
1555 }
1556
1557 static struct ovsdb_idl_row *
1558 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
1559 {
1560     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
1561     class->row_init(row);
1562     list_init(&row->src_arcs);
1563     list_init(&row->dst_arcs);
1564     hmap_node_nullify(&row->txn_node);
1565     list_init(&row->track_node);
1566     return row;
1567 }
1568
1569 static struct ovsdb_idl_row *
1570 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
1571 {
1572     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
1573     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
1574     row->uuid = *uuid;
1575     row->table = table;
1576     return row;
1577 }
1578
1579 static void
1580 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
1581 {
1582     if (row) {
1583         ovsdb_idl_row_clear_old(row);
1584         hmap_remove(&row->table->rows, &row->hmap_node);
1585         if (ovsdb_idl_track_is_set(row->table)) {
1586             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1587                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1588                 = row->table->idl->change_seqno + 1;
1589         }
1590         if (list_is_empty(&row->track_node)) {
1591             list_push_front(&row->table->track_list, &row->track_node);
1592         }
1593     }
1594 }
1595
1596 static void
1597 ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
1598 {
1599     size_t i;
1600
1601     for (i = 0; i < idl->class->n_tables; i++) {
1602         struct ovsdb_idl_table *table = &idl->tables[i];
1603
1604         if (!list_is_empty(&table->track_list)) {
1605             struct ovsdb_idl_row *row, *next;
1606
1607             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
1608                 if (!ovsdb_idl_track_is_set(row->table)) {
1609                     list_remove(&row->track_node);
1610                     free(row);
1611                 }
1612             }
1613         }
1614     }
1615 }
1616
1617 static void
1618 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
1619 {
1620     const struct ovsdb_idl_table_class *class = row->table->class;
1621     size_t i;
1622
1623     ovs_assert(!row->old && !row->new);
1624     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
1625     for (i = 0; i < class->n_columns; i++) {
1626         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
1627     }
1628     ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_INSERT);
1629     ovsdb_idl_row_parse(row);
1630
1631     ovsdb_idl_row_reparse_backrefs(row);
1632 }
1633
1634 static void
1635 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
1636 {
1637     ovsdb_idl_row_unparse(row);
1638     ovsdb_idl_row_clear_arcs(row, true);
1639     ovsdb_idl_row_clear_old(row);
1640     if (list_is_empty(&row->dst_arcs)) {
1641         ovsdb_idl_row_destroy(row);
1642     } else {
1643         ovsdb_idl_row_reparse_backrefs(row);
1644     }
1645 }
1646
1647 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1648  * otherwise. */
1649 static bool
1650 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1651 {
1652     bool changed;
1653
1654     ovsdb_idl_row_unparse(row);
1655     ovsdb_idl_row_clear_arcs(row, true);
1656     changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY);
1657     ovsdb_idl_row_parse(row);
1658
1659     return changed;
1660 }
1661
1662 static bool
1663 ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *row,
1664                              const struct json *diff_json)
1665 {
1666     bool changed;
1667
1668     ovsdb_idl_row_unparse(row);
1669     ovsdb_idl_row_clear_arcs(row, true);
1670     changed = ovsdb_idl_row_apply_diff(row, diff_json,
1671                                        OVSDB_IDL_CHANGE_MODIFY);
1672     ovsdb_idl_row_parse(row);
1673
1674     return changed;
1675 }
1676
1677 static bool
1678 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1679 {
1680     const struct ovsdb_idl_arc *arc;
1681
1682     /* No self-arcs. */
1683     if (src == dst) {
1684         return false;
1685     }
1686
1687     /* No duplicate arcs.
1688      *
1689      * We only need to test whether the first arc in dst->dst_arcs originates
1690      * at 'src', since we add all of the arcs from a given source in a clump
1691      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1692      * added at the front of the dst_arcs list. */
1693     if (list_is_empty(&dst->dst_arcs)) {
1694         return true;
1695     }
1696     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1697     return arc->src != src;
1698 }
1699
1700 static struct ovsdb_idl_table *
1701 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1702                            const struct ovsdb_idl_table_class *table_class)
1703 {
1704     return &idl->tables[table_class - idl->class->tables];
1705 }
1706
1707 /* Called by ovsdb-idlc generated code. */
1708 struct ovsdb_idl_row *
1709 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1710                       struct ovsdb_idl_table_class *dst_table_class,
1711                       const struct uuid *dst_uuid)
1712 {
1713     struct ovsdb_idl *idl = src->table->idl;
1714     struct ovsdb_idl_table *dst_table;
1715     struct ovsdb_idl_arc *arc;
1716     struct ovsdb_idl_row *dst;
1717
1718     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1719     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1720     if (idl->txn) {
1721         /* We're being called from ovsdb_idl_txn_write().  We must not update
1722          * any arcs, because the transaction will be backed out at commit or
1723          * abort time and we don't want our graph screwed up.
1724          *
1725          * Just return the destination row, if there is one and it has not been
1726          * deleted. */
1727         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1728             return dst;
1729         }
1730         return NULL;
1731     } else {
1732         /* We're being called from some other context.  Update the graph. */
1733         if (!dst) {
1734             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1735         }
1736
1737         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
1738         if (may_add_arc(src, dst)) {
1739             /* The arc *must* be added at the front of the dst_arcs list.  See
1740              * ovsdb_idl_row_reparse_backrefs() for details. */
1741             arc = xmalloc(sizeof *arc);
1742             list_push_front(&src->src_arcs, &arc->src_node);
1743             list_push_front(&dst->dst_arcs, &arc->dst_node);
1744             arc->src = src;
1745             arc->dst = dst;
1746         }
1747
1748         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
1749     }
1750 }
1751
1752 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
1753  * pointer to the row if there is one, otherwise a null pointer.  */
1754 const struct ovsdb_idl_row *
1755 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
1756                            const struct ovsdb_idl_table_class *tc,
1757                            const struct uuid *uuid)
1758 {
1759     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
1760 }
1761
1762 static struct ovsdb_idl_row *
1763 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
1764 {
1765     for (; node; node = hmap_next(&table->rows, node)) {
1766         struct ovsdb_idl_row *row;
1767
1768         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
1769         if (ovsdb_idl_row_exists(row)) {
1770             return row;
1771         }
1772     }
1773     return NULL;
1774 }
1775
1776 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
1777  * table is empty.
1778  *
1779  * Database tables are internally maintained as hash tables, so adding or
1780  * removing rows while traversing the same table can cause some rows to be
1781  * visited twice or not at apply. */
1782 const struct ovsdb_idl_row *
1783 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
1784                     const struct ovsdb_idl_table_class *table_class)
1785 {
1786     struct ovsdb_idl_table *table
1787         = ovsdb_idl_table_from_class(idl, table_class);
1788     return next_real_row(table, hmap_first(&table->rows));
1789 }
1790
1791 /* Returns a row following 'row' within its table, or a null pointer if 'row'
1792  * is the last row in its table. */
1793 const struct ovsdb_idl_row *
1794 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
1795 {
1796     struct ovsdb_idl_table *table = row->table;
1797
1798     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
1799 }
1800
1801 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
1802  * transaction has changed 'column''s value, the modified value is returned.
1803  *
1804  * The caller must not modify or free the returned value.
1805  *
1806  * Various kinds of changes can invalidate the returned value: writing to the
1807  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
1808  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
1809  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
1810  * returned value is needed for a long time, it is best to make a copy of it
1811  * with ovsdb_datum_clone(). */
1812 const struct ovsdb_datum *
1813 ovsdb_idl_read(const struct ovsdb_idl_row *row,
1814                const struct ovsdb_idl_column *column)
1815 {
1816     const struct ovsdb_idl_table_class *class;
1817     size_t column_idx;
1818
1819     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
1820
1821     class = row->table->class;
1822     column_idx = column - class->columns;
1823
1824     ovs_assert(row->new != NULL);
1825     ovs_assert(column_idx < class->n_columns);
1826
1827     if (row->written && bitmap_is_set(row->written, column_idx)) {
1828         return &row->new[column_idx];
1829     } else if (row->old) {
1830         return &row->old[column_idx];
1831     } else {
1832         return ovsdb_datum_default(&column->type);
1833     }
1834 }
1835
1836 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
1837  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
1838  * have a value type of OVSDB_TYPE_VOID.)
1839  *
1840  * This is useful in code that "knows" that a particular column has a given
1841  * type, so that it will abort if someone changes the column's type without
1842  * updating the code that uses it. */
1843 const struct ovsdb_datum *
1844 ovsdb_idl_get(const struct ovsdb_idl_row *row,
1845               const struct ovsdb_idl_column *column,
1846               enum ovsdb_atomic_type key_type OVS_UNUSED,
1847               enum ovsdb_atomic_type value_type OVS_UNUSED)
1848 {
1849     ovs_assert(column->type.key.type == key_type);
1850     ovs_assert(column->type.value.type == value_type);
1851
1852     return ovsdb_idl_read(row, column);
1853 }
1854
1855 /* Returns true if the field represented by 'column' in 'row' may be modified,
1856  * false if it is immutable.
1857  *
1858  * Normally, whether a field is mutable is controlled by its column's schema.
1859  * However, an immutable column can be set to any initial value at the time of
1860  * insertion, so if 'row' is a new row (one that is being added as part of the
1861  * current transaction, supposing that a transaction is in progress) then even
1862  * its "immutable" fields are actually mutable. */
1863 bool
1864 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
1865                      const struct ovsdb_idl_column *column)
1866 {
1867     return column->mutable || (row->new && !row->old);
1868 }
1869
1870 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
1871  * to all-zero-bits by some other entity.  If 'row' was set up some other way
1872  * then the return value is indeterminate. */
1873 bool
1874 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
1875 {
1876     return row->table == NULL;
1877 }
1878 \f
1879 /* Transactions. */
1880
1881 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1882                                    enum ovsdb_idl_txn_status);
1883
1884 /* Returns a string representation of 'status'.  The caller must not modify or
1885  * free the returned string.
1886  *
1887  * The return value is probably useful only for debug log messages and unit
1888  * tests. */
1889 const char *
1890 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
1891 {
1892     switch (status) {
1893     case TXN_UNCOMMITTED:
1894         return "uncommitted";
1895     case TXN_UNCHANGED:
1896         return "unchanged";
1897     case TXN_INCOMPLETE:
1898         return "incomplete";
1899     case TXN_ABORTED:
1900         return "aborted";
1901     case TXN_SUCCESS:
1902         return "success";
1903     case TXN_TRY_AGAIN:
1904         return "try again";
1905     case TXN_NOT_LOCKED:
1906         return "not locked";
1907     case TXN_ERROR:
1908         return "error";
1909     }
1910     return "<unknown>";
1911 }
1912
1913 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
1914  * active transaction at a time.  See the large comment in ovsdb-idl.h for
1915  * general information on transactions. */
1916 struct ovsdb_idl_txn *
1917 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
1918 {
1919     struct ovsdb_idl_txn *txn;
1920
1921     ovs_assert(!idl->txn);
1922     idl->txn = txn = xmalloc(sizeof *txn);
1923     txn->request_id = NULL;
1924     txn->idl = idl;
1925     hmap_init(&txn->txn_rows);
1926     txn->status = TXN_UNCOMMITTED;
1927     txn->error = NULL;
1928     txn->dry_run = false;
1929     ds_init(&txn->comment);
1930
1931     txn->inc_table = NULL;
1932     txn->inc_column = NULL;
1933
1934     hmap_init(&txn->inserted_rows);
1935
1936     return txn;
1937 }
1938
1939 /* Appends 's', which is treated as a printf()-type format string, to the
1940  * comments that will be passed to the OVSDB server when 'txn' is committed.
1941  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
1942  * show-log" can print in a relatively human-readable form.) */
1943 void
1944 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
1945 {
1946     va_list args;
1947
1948     if (txn->comment.length) {
1949         ds_put_char(&txn->comment, '\n');
1950     }
1951
1952     va_start(args, s);
1953     ds_put_format_valist(&txn->comment, s, args);
1954     va_end(args);
1955 }
1956
1957 /* Marks 'txn' as a transaction that will not actually modify the database.  In
1958  * almost every way, the transaction is treated like other transactions.  It
1959  * must be committed or aborted like other transactions, it will be sent to the
1960  * database server like other transactions, and so on.  The only difference is
1961  * that the operations sent to the database server will include, as the last
1962  * step, an "abort" operation, so that any changes made by the transaction will
1963  * not actually take effect. */
1964 void
1965 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
1966 {
1967     txn->dry_run = true;
1968 }
1969
1970 /* Causes 'txn', when committed, to increment the value of 'column' within
1971  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
1972  * successfully, the client may retrieve the final (incremented) value of
1973  * 'column' with ovsdb_idl_txn_get_increment_new_value().
1974  *
1975  * The client could accomplish something similar with ovsdb_idl_read(),
1976  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
1977  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
1978  * will never (by itself) fail because of a verify error.
1979  *
1980  * The intended use is for incrementing the "next_cfg" column in the
1981  * Open_vSwitch table. */
1982 void
1983 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
1984                         const struct ovsdb_idl_row *row,
1985                         const struct ovsdb_idl_column *column)
1986 {
1987     ovs_assert(!txn->inc_table);
1988     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
1989     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
1990
1991     txn->inc_table = row->table->class->name;
1992     txn->inc_column = column->name;
1993     txn->inc_row = row->uuid;
1994 }
1995
1996 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
1997  * has been called for 'txn' but the commit is still incomplete (that is, the
1998  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
1999  * end up committing at the database server, but the client will not be able to
2000  * get any further status information back. */
2001 void
2002 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
2003 {
2004     struct ovsdb_idl_txn_insert *insert, *next;
2005
2006     json_destroy(txn->request_id);
2007     if (txn->status == TXN_INCOMPLETE) {
2008         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2009     }
2010     ovsdb_idl_txn_abort(txn);
2011     ds_destroy(&txn->comment);
2012     free(txn->error);
2013     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
2014         free(insert);
2015     }
2016     hmap_destroy(&txn->inserted_rows);
2017     free(txn);
2018 }
2019
2020 /* Causes poll_block() to wake up if 'txn' has completed committing. */
2021 void
2022 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
2023 {
2024     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
2025         poll_immediate_wake();
2026     }
2027 }
2028
2029 static struct json *
2030 where_uuid_equals(const struct uuid *uuid)
2031 {
2032     return
2033         json_array_create_1(
2034             json_array_create_3(
2035                 json_string_create("_uuid"),
2036                 json_string_create("=="),
2037                 json_array_create_2(
2038                     json_string_create("uuid"),
2039                     json_string_create_nocopy(
2040                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
2041 }
2042
2043 static char *
2044 uuid_name_from_uuid(const struct uuid *uuid)
2045 {
2046     char *name;
2047     char *p;
2048
2049     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
2050     for (p = name; *p != '\0'; p++) {
2051         if (*p == '-') {
2052             *p = '_';
2053         }
2054     }
2055
2056     return name;
2057 }
2058
2059 static const struct ovsdb_idl_row *
2060 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
2061 {
2062     const struct ovsdb_idl_row *row;
2063
2064     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
2065         if (uuid_equals(&row->uuid, uuid)) {
2066             return row;
2067         }
2068     }
2069     return NULL;
2070 }
2071
2072 /* XXX there must be a cleaner way to do this */
2073 static struct json *
2074 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
2075 {
2076     if (json->type == JSON_ARRAY) {
2077         struct uuid uuid;
2078         size_t i;
2079
2080         if (json->u.array.n == 2
2081             && json->u.array.elems[0]->type == JSON_STRING
2082             && json->u.array.elems[1]->type == JSON_STRING
2083             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
2084             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
2085             const struct ovsdb_idl_row *row;
2086
2087             row = ovsdb_idl_txn_get_row(txn, &uuid);
2088             if (row && !row->old && row->new) {
2089                 json_destroy(json);
2090
2091                 return json_array_create_2(
2092                     json_string_create("named-uuid"),
2093                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
2094             }
2095         }
2096
2097         for (i = 0; i < json->u.array.n; i++) {
2098             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
2099                                                       txn);
2100         }
2101     } else if (json->type == JSON_OBJECT) {
2102         struct shash_node *node;
2103
2104         SHASH_FOR_EACH (node, json_object(json)) {
2105             node->data = substitute_uuids(node->data, txn);
2106         }
2107     }
2108     return json;
2109 }
2110
2111 static void
2112 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
2113 {
2114     struct ovsdb_idl_row *row, *next;
2115
2116     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
2117      * ovsdb_idl_column's 'parse' function, which will call
2118      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
2119      * transaction and fail to update the graph.  */
2120     txn->idl->txn = NULL;
2121
2122     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
2123         if (row->old) {
2124             if (row->written) {
2125                 ovsdb_idl_row_unparse(row);
2126                 ovsdb_idl_row_clear_arcs(row, false);
2127                 ovsdb_idl_row_parse(row);
2128             }
2129         } else {
2130             ovsdb_idl_row_unparse(row);
2131         }
2132         ovsdb_idl_row_clear_new(row);
2133
2134         free(row->prereqs);
2135         row->prereqs = NULL;
2136
2137         free(row->written);
2138         row->written = NULL;
2139
2140         hmap_remove(&txn->txn_rows, &row->txn_node);
2141         hmap_node_nullify(&row->txn_node);
2142         if (!row->old) {
2143             hmap_remove(&row->table->rows, &row->hmap_node);
2144             free(row);
2145         }
2146     }
2147     hmap_destroy(&txn->txn_rows);
2148     hmap_init(&txn->txn_rows);
2149 }
2150
2151 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
2152  * of the following TXN_* constants:
2153  *
2154  *   TXN_INCOMPLETE:
2155  *
2156  *       The transaction is in progress, but not yet complete.  The caller
2157  *       should call again later, after calling ovsdb_idl_run() to let the IDL
2158  *       do OVSDB protocol processing.
2159  *
2160  *   TXN_UNCHANGED:
2161  *
2162  *       The transaction is complete.  (It didn't actually change the database,
2163  *       so the IDL didn't send any request to the database server.)
2164  *
2165  *   TXN_ABORTED:
2166  *
2167  *       The caller previously called ovsdb_idl_txn_abort().
2168  *
2169  *   TXN_SUCCESS:
2170  *
2171  *       The transaction was successful.  The update made by the transaction
2172  *       (and possibly other changes made by other database clients) should
2173  *       already be visible in the IDL.
2174  *
2175  *   TXN_TRY_AGAIN:
2176  *
2177  *       The transaction failed for some transient reason, e.g. because a
2178  *       "verify" operation reported an inconsistency or due to a network
2179  *       problem.  The caller should wait for a change to the database, then
2180  *       compose a new transaction, and commit the new transaction.
2181  *
2182  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
2183  *       the database.  It is important to use its return value *before* the
2184  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
2185  *       purpose, because the change that one should wait for can happen after
2186  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
2187  *       using some other baseline value in that situation could cause an
2188  *       indefinite wait if the database rarely changes.
2189  *
2190  *   TXN_NOT_LOCKED:
2191  *
2192  *       The transaction failed because the IDL has been configured to require
2193  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
2194  *       has already lost it.
2195  *
2196  * Committing a transaction rolls back all of the changes that it made to the
2197  * IDL's copy of the database.  If the transaction commits successfully, then
2198  * the database server will send an update and, thus, the IDL will be updated
2199  * with the committed changes. */
2200 enum ovsdb_idl_txn_status
2201 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
2202 {
2203     struct ovsdb_idl_row *row;
2204     struct json *operations;
2205     bool any_updates;
2206
2207     if (txn != txn->idl->txn) {
2208         goto coverage_out;
2209     }
2210
2211     /* If we need a lock but don't have it, give up quickly. */
2212     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
2213         txn->status = TXN_NOT_LOCKED;
2214         goto disassemble_out;
2215     }
2216
2217     operations = json_array_create_1(
2218         json_string_create(txn->idl->class->database));
2219
2220     /* Assert that we have the required lock (avoiding a race). */
2221     if (txn->idl->lock_name) {
2222         struct json *op = json_object_create();
2223         json_array_add(operations, op);
2224         json_object_put_string(op, "op", "assert");
2225         json_object_put_string(op, "lock", txn->idl->lock_name);
2226     }
2227
2228     /* Add prerequisites and declarations of new rows. */
2229     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2230         /* XXX check that deleted rows exist even if no prereqs? */
2231         if (row->prereqs) {
2232             const struct ovsdb_idl_table_class *class = row->table->class;
2233             size_t n_columns = class->n_columns;
2234             struct json *op, *columns, *row_json;
2235             size_t idx;
2236
2237             op = json_object_create();
2238             json_array_add(operations, op);
2239             json_object_put_string(op, "op", "wait");
2240             json_object_put_string(op, "table", class->name);
2241             json_object_put(op, "timeout", json_integer_create(0));
2242             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2243             json_object_put_string(op, "until", "==");
2244             columns = json_array_create_empty();
2245             json_object_put(op, "columns", columns);
2246             row_json = json_object_create();
2247             json_object_put(op, "rows", json_array_create_1(row_json));
2248
2249             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
2250                 const struct ovsdb_idl_column *column = &class->columns[idx];
2251                 json_array_add(columns, json_string_create(column->name));
2252                 json_object_put(row_json, column->name,
2253                                 ovsdb_datum_to_json(&row->old[idx],
2254                                                     &column->type));
2255             }
2256         }
2257     }
2258
2259     /* Add updates. */
2260     any_updates = false;
2261     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2262         const struct ovsdb_idl_table_class *class = row->table->class;
2263
2264         if (!row->new) {
2265             if (class->is_root) {
2266                 struct json *op = json_object_create();
2267                 json_object_put_string(op, "op", "delete");
2268                 json_object_put_string(op, "table", class->name);
2269                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2270                 json_array_add(operations, op);
2271                 any_updates = true;
2272             } else {
2273                 /* Let ovsdb-server decide whether to really delete it. */
2274             }
2275         } else if (row->old != row->new) {
2276             struct json *row_json;
2277             struct json *op;
2278             size_t idx;
2279
2280             op = json_object_create();
2281             json_object_put_string(op, "op", row->old ? "update" : "insert");
2282             json_object_put_string(op, "table", class->name);
2283             if (row->old) {
2284                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2285             } else {
2286                 struct ovsdb_idl_txn_insert *insert;
2287
2288                 any_updates = true;
2289
2290                 json_object_put(op, "uuid-name",
2291                                 json_string_create_nocopy(
2292                                     uuid_name_from_uuid(&row->uuid)));
2293
2294                 insert = xmalloc(sizeof *insert);
2295                 insert->dummy = row->uuid;
2296                 insert->op_index = operations->u.array.n - 1;
2297                 uuid_zero(&insert->real);
2298                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
2299                             uuid_hash(&insert->dummy));
2300             }
2301             row_json = json_object_create();
2302             json_object_put(op, "row", row_json);
2303
2304             if (row->written) {
2305                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
2306                     const struct ovsdb_idl_column *column =
2307                                                         &class->columns[idx];
2308
2309                     if (row->old
2310                         || !ovsdb_datum_is_default(&row->new[idx],
2311                                                   &column->type)) {
2312                         json_object_put(row_json, column->name,
2313                                         substitute_uuids(
2314                                             ovsdb_datum_to_json(&row->new[idx],
2315                                                                 &column->type),
2316                                             txn));
2317
2318                         /* If anything really changed, consider it an update.
2319                          * We can't suppress not-really-changed values earlier
2320                          * or transactions would become nonatomic (see the big
2321                          * comment inside ovsdb_idl_txn_write()). */
2322                         if (!any_updates && row->old &&
2323                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
2324                                                 &column->type)) {
2325                             any_updates = true;
2326                         }
2327                     }
2328                 }
2329             }
2330
2331             if (!row->old || !shash_is_empty(json_object(row_json))) {
2332                 json_array_add(operations, op);
2333             } else {
2334                 json_destroy(op);
2335             }
2336         }
2337     }
2338
2339     /* Add increment. */
2340     if (txn->inc_table && any_updates) {
2341         struct json *op;
2342
2343         txn->inc_index = operations->u.array.n - 1;
2344
2345         op = json_object_create();
2346         json_object_put_string(op, "op", "mutate");
2347         json_object_put_string(op, "table", txn->inc_table);
2348         json_object_put(op, "where",
2349                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2350                                          txn));
2351         json_object_put(op, "mutations",
2352                         json_array_create_1(
2353                             json_array_create_3(
2354                                 json_string_create(txn->inc_column),
2355                                 json_string_create("+="),
2356                                 json_integer_create(1))));
2357         json_array_add(operations, op);
2358
2359         op = json_object_create();
2360         json_object_put_string(op, "op", "select");
2361         json_object_put_string(op, "table", txn->inc_table);
2362         json_object_put(op, "where",
2363                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2364                                          txn));
2365         json_object_put(op, "columns",
2366                         json_array_create_1(json_string_create(
2367                                                 txn->inc_column)));
2368         json_array_add(operations, op);
2369     }
2370
2371     if (txn->comment.length) {
2372         struct json *op = json_object_create();
2373         json_object_put_string(op, "op", "comment");
2374         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
2375         json_array_add(operations, op);
2376     }
2377
2378     if (txn->dry_run) {
2379         struct json *op = json_object_create();
2380         json_object_put_string(op, "op", "abort");
2381         json_array_add(operations, op);
2382     }
2383
2384     if (!any_updates) {
2385         txn->status = TXN_UNCHANGED;
2386         json_destroy(operations);
2387     } else if (!jsonrpc_session_send(
2388                    txn->idl->session,
2389                    jsonrpc_create_request(
2390                        "transact", operations, &txn->request_id))) {
2391         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
2392                     json_hash(txn->request_id, 0));
2393         txn->status = TXN_INCOMPLETE;
2394     } else {
2395         txn->status = TXN_TRY_AGAIN;
2396     }
2397
2398 disassemble_out:
2399     ovsdb_idl_txn_disassemble(txn);
2400 coverage_out:
2401     switch (txn->status) {
2402     case TXN_UNCOMMITTED:   COVERAGE_INC(txn_uncommitted);    break;
2403     case TXN_UNCHANGED:     COVERAGE_INC(txn_unchanged);      break;
2404     case TXN_INCOMPLETE:    COVERAGE_INC(txn_incomplete);     break;
2405     case TXN_ABORTED:       COVERAGE_INC(txn_aborted);        break;
2406     case TXN_SUCCESS:       COVERAGE_INC(txn_success);        break;
2407     case TXN_TRY_AGAIN:     COVERAGE_INC(txn_try_again);      break;
2408     case TXN_NOT_LOCKED:    COVERAGE_INC(txn_not_locked);     break;
2409     case TXN_ERROR:         COVERAGE_INC(txn_error);          break;
2410     }
2411
2412     return txn->status;
2413 }
2414
2415 /* Attempts to commit 'txn', blocking until the commit either succeeds or
2416  * fails.  Returns the final commit status, which may be any TXN_* value other
2417  * than TXN_INCOMPLETE.
2418  *
2419  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
2420  * return value of ovsdb_idl_get_seqno() to change. */
2421 enum ovsdb_idl_txn_status
2422 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
2423 {
2424     enum ovsdb_idl_txn_status status;
2425
2426     fatal_signal_run();
2427     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
2428         ovsdb_idl_run(txn->idl);
2429         ovsdb_idl_wait(txn->idl);
2430         ovsdb_idl_txn_wait(txn);
2431         poll_block();
2432     }
2433     return status;
2434 }
2435
2436 /* Returns the final (incremented) value of the column in 'txn' that was set to
2437  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
2438  * successfully. */
2439 int64_t
2440 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
2441 {
2442     ovs_assert(txn->status == TXN_SUCCESS);
2443     return txn->inc_new_value;
2444 }
2445
2446 /* Aborts 'txn' without sending it to the database server.  This is effective
2447  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
2448  * Otherwise, it has no effect.
2449  *
2450  * Aborting a transaction doesn't free its memory.  Use
2451  * ovsdb_idl_txn_destroy() to do that. */
2452 void
2453 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
2454 {
2455     ovsdb_idl_txn_disassemble(txn);
2456     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
2457         txn->status = TXN_ABORTED;
2458     }
2459 }
2460
2461 /* Returns a string that reports the error status for 'txn'.  The caller must
2462  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
2463  * for 'txn' may free the returned string.
2464  *
2465  * The return value is ordinarily one of the strings that
2466  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
2467  * due to an error reported by the database server, the return value is that
2468  * error. */
2469 const char *
2470 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
2471 {
2472     if (txn->status != TXN_ERROR) {
2473         return ovsdb_idl_txn_status_to_string(txn->status);
2474     } else if (txn->error) {
2475         return txn->error;
2476     } else {
2477         return "no error details available";
2478     }
2479 }
2480
2481 static void
2482 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
2483                              const struct json *json)
2484 {
2485     if (txn->error == NULL) {
2486         txn->error = json_to_string(json, JSSF_SORT);
2487     }
2488 }
2489
2490 /* For transaction 'txn' that completed successfully, finds and returns the
2491  * permanent UUID that the database assigned to a newly inserted row, given the
2492  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
2493  *
2494  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
2495  * if it was assigned by that function and then deleted by
2496  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
2497  * and then deleted within a single transaction are never sent to the database
2498  * server, so it never assigns them a permanent UUID.) */
2499 const struct uuid *
2500 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
2501                               const struct uuid *uuid)
2502 {
2503     const struct ovsdb_idl_txn_insert *insert;
2504
2505     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
2506     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
2507                              uuid_hash(uuid), &txn->inserted_rows) {
2508         if (uuid_equals(uuid, &insert->dummy)) {
2509             return &insert->real;
2510         }
2511     }
2512     return NULL;
2513 }
2514
2515 static void
2516 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2517                        enum ovsdb_idl_txn_status status)
2518 {
2519     txn->status = status;
2520     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2521 }
2522
2523 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
2524  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
2525  * ovs-vswitchd).
2526  *
2527  * 'datum' must have the correct type for its column.  The IDL does not check
2528  * that it meets schema constraints, but ovsdb-server will do so at commit time
2529  * so it had better be correct.
2530  *
2531  * A transaction must be in progress.  Replication of 'column' must not have
2532  * been disabled (by calling ovsdb_idl_omit()).
2533  *
2534  * Usually this function is used indirectly through one of the "set" functions
2535  * generated by ovsdb-idlc.
2536  *
2537  * Takes ownership of what 'datum' points to (and in some cases destroys that
2538  * data before returning) but makes a copy of 'datum' itself.  (Commonly
2539  * 'datum' is on the caller's stack.) */
2540 static void
2541 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
2542                       const struct ovsdb_idl_column *column,
2543                       struct ovsdb_datum *datum, bool owns_datum)
2544 {
2545     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2546     const struct ovsdb_idl_table_class *class;
2547     size_t column_idx;
2548     bool write_only;
2549
2550     if (ovsdb_idl_row_is_synthetic(row)) {
2551         goto discard_datum;
2552     }
2553
2554     class = row->table->class;
2555     column_idx = column - class->columns;
2556     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
2557
2558     ovs_assert(row->new != NULL);
2559     ovs_assert(column_idx < class->n_columns);
2560     ovs_assert(row->old == NULL ||
2561                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2562
2563     if (row->table->idl->verify_write_only && !write_only) {
2564         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
2565                  " explicitly configured not to.", class->name, column->name);
2566         goto discard_datum;
2567     }
2568
2569     /* If this is a write-only column and the datum being written is the same
2570      * as the one already there, just skip the update entirely.  This is worth
2571      * optimizing because we have a lot of columns that get periodically
2572      * refreshed into the database but don't actually change that often.
2573      *
2574      * We don't do this for read/write columns because that would break
2575      * atomicity of transactions--some other client might have written a
2576      * different value in that column since we read it.  (But if a whole
2577      * transaction only does writes of existing values, without making any real
2578      * changes, we will drop the whole transaction later in
2579      * ovsdb_idl_txn_commit().) */
2580     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
2581                                          datum, &column->type)) {
2582         goto discard_datum;
2583     }
2584
2585     if (hmap_node_is_null(&row->txn_node)) {
2586         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2587                     uuid_hash(&row->uuid));
2588     }
2589     if (row->old == row->new) {
2590         row->new = xmalloc(class->n_columns * sizeof *row->new);
2591     }
2592     if (!row->written) {
2593         row->written = bitmap_allocate(class->n_columns);
2594     }
2595     if (bitmap_is_set(row->written, column_idx)) {
2596         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
2597     } else {
2598         bitmap_set1(row->written, column_idx);
2599     }
2600     if (owns_datum) {
2601         row->new[column_idx] = *datum;
2602     } else {
2603         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
2604     }
2605     (column->unparse)(row);
2606     (column->parse)(row, &row->new[column_idx]);
2607     return;
2608
2609 discard_datum:
2610     if (owns_datum) {
2611         ovsdb_datum_destroy(datum, &column->type);
2612     }
2613 }
2614
2615 void
2616 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
2617                     const struct ovsdb_idl_column *column,
2618                     struct ovsdb_datum *datum)
2619 {
2620     ovsdb_idl_txn_write__(row, column, datum, true);
2621 }
2622
2623 void
2624 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
2625                           const struct ovsdb_idl_column *column,
2626                           const struct ovsdb_datum *datum)
2627 {
2628     ovsdb_idl_txn_write__(row, column,
2629                           CONST_CAST(struct ovsdb_datum *, datum), false);
2630 }
2631
2632 /* Causes the original contents of 'column' in 'row_' to be verified as a
2633  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
2634  * changed (or if 'row_' was deleted) between the time that the IDL originally
2635  * read its contents and the time that the transaction commits, then the
2636  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
2637  * TXN_AGAIN_NOW (depending on whether the database change has already been
2638  * received).
2639  *
2640  * The intention is that, to ensure that no transaction commits based on dirty
2641  * reads, an application should call ovsdb_idl_txn_verify() on each data item
2642  * read as part of a read-modify-write operation.
2643  *
2644  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
2645  * value of 'column' is already known:
2646  *
2647  *   - If 'row_' is a row created by the current transaction (returned by
2648  *     ovsdb_idl_txn_insert()).
2649  *
2650  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
2651  *     within the current transaction.
2652  *
2653  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
2654  * ovsdb_idl_txn_write() for a given read-modify-write.
2655  *
2656  * A transaction must be in progress.
2657  *
2658  * Usually this function is used indirectly through one of the "verify"
2659  * functions generated by ovsdb-idlc. */
2660 void
2661 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
2662                      const struct ovsdb_idl_column *column)
2663 {
2664     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2665     const struct ovsdb_idl_table_class *class;
2666     size_t column_idx;
2667
2668     if (ovsdb_idl_row_is_synthetic(row)) {
2669         return;
2670     }
2671
2672     class = row->table->class;
2673     column_idx = column - class->columns;
2674
2675     ovs_assert(row->new != NULL);
2676     ovs_assert(row->old == NULL ||
2677                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2678     if (!row->old
2679         || (row->written && bitmap_is_set(row->written, column_idx))) {
2680         return;
2681     }
2682
2683     if (hmap_node_is_null(&row->txn_node)) {
2684         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2685                     uuid_hash(&row->uuid));
2686     }
2687     if (!row->prereqs) {
2688         row->prereqs = bitmap_allocate(class->n_columns);
2689     }
2690     bitmap_set1(row->prereqs, column_idx);
2691 }
2692
2693 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
2694  * accessed afterward.
2695  *
2696  * A transaction must be in progress.
2697  *
2698  * Usually this function is used indirectly through one of the "delete"
2699  * functions generated by ovsdb-idlc. */
2700 void
2701 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
2702 {
2703     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2704
2705     if (ovsdb_idl_row_is_synthetic(row)) {
2706         return;
2707     }
2708
2709     ovs_assert(row->new != NULL);
2710     if (!row->old) {
2711         ovsdb_idl_row_unparse(row);
2712         ovsdb_idl_row_clear_new(row);
2713         ovs_assert(!row->prereqs);
2714         hmap_remove(&row->table->rows, &row->hmap_node);
2715         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
2716         free(row);
2717         return;
2718     }
2719     if (hmap_node_is_null(&row->txn_node)) {
2720         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2721                     uuid_hash(&row->uuid));
2722     }
2723     ovsdb_idl_row_clear_new(row);
2724     row->new = NULL;
2725 }
2726
2727 /* Inserts and returns a new row in the table with the specified 'class' in the
2728  * database with open transaction 'txn'.
2729  *
2730  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
2731  * randomly generated; otherwise 'uuid' should specify a randomly generated
2732  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
2733  * 'txn' is committed, but the IDL will replace any uses of the provisional
2734  * UUID in the data to be to be committed by the UUID assigned by
2735  * ovsdb-server.
2736  *
2737  * Usually this function is used indirectly through one of the "insert"
2738  * functions generated by ovsdb-idlc. */
2739 const struct ovsdb_idl_row *
2740 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
2741                      const struct ovsdb_idl_table_class *class,
2742                      const struct uuid *uuid)
2743 {
2744     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
2745
2746     if (uuid) {
2747         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
2748         row->uuid = *uuid;
2749     } else {
2750         uuid_generate(&row->uuid);
2751     }
2752
2753     row->table = ovsdb_idl_table_from_class(txn->idl, class);
2754     row->new = xmalloc(class->n_columns * sizeof *row->new);
2755     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
2756     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
2757     return row;
2758 }
2759
2760 static void
2761 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
2762 {
2763     struct ovsdb_idl_txn *txn;
2764
2765     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
2766         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
2767     }
2768 }
2769
2770 static struct ovsdb_idl_txn *
2771 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
2772 {
2773     struct ovsdb_idl_txn *txn;
2774
2775     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
2776                              json_hash(id, 0), &idl->outstanding_txns) {
2777         if (json_equal(id, txn->request_id)) {
2778             return txn;
2779         }
2780     }
2781     return NULL;
2782 }
2783
2784 static bool
2785 check_json_type(const struct json *json, enum json_type type, const char *name)
2786 {
2787     if (!json) {
2788         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
2789         return false;
2790     } else if (json->type != type) {
2791         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
2792                      name, json_type_to_string(json->type),
2793                      json_type_to_string(type));
2794         return false;
2795     } else {
2796         return true;
2797     }
2798 }
2799
2800 static bool
2801 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
2802                                 const struct json_array *results)
2803 {
2804     struct json *count, *rows, *row, *column;
2805     struct shash *mutate, *select;
2806
2807     if (txn->inc_index + 2 > results->n) {
2808         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2809                      "for increment (has %"PRIuSIZE", needs %u)",
2810                      results->n, txn->inc_index + 2);
2811         return false;
2812     }
2813
2814     /* We know that this is a JSON object because the loop in
2815      * ovsdb_idl_txn_process_reply() checked. */
2816     mutate = json_object(results->elems[txn->inc_index]);
2817     count = shash_find_data(mutate, "count");
2818     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
2819         return false;
2820     }
2821     if (count->u.integer != 1) {
2822         VLOG_WARN_RL(&syntax_rl,
2823                      "\"mutate\" reply \"count\" is %lld instead of 1",
2824                      count->u.integer);
2825         return false;
2826     }
2827
2828     select = json_object(results->elems[txn->inc_index + 1]);
2829     rows = shash_find_data(select, "rows");
2830     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
2831         return false;
2832     }
2833     if (rows->u.array.n != 1) {
2834         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
2835                      "instead of 1",
2836                      rows->u.array.n);
2837         return false;
2838     }
2839     row = rows->u.array.elems[0];
2840     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
2841         return false;
2842     }
2843     column = shash_find_data(json_object(row), txn->inc_column);
2844     if (!check_json_type(column, JSON_INTEGER,
2845                          "\"select\" reply inc column")) {
2846         return false;
2847     }
2848     txn->inc_new_value = column->u.integer;
2849     return true;
2850 }
2851
2852 static bool
2853 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
2854                                    const struct json_array *results)
2855 {
2856     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
2857     struct ovsdb_error *error;
2858     struct json *json_uuid;
2859     union ovsdb_atom uuid;
2860     struct shash *reply;
2861
2862     if (insert->op_index >= results->n) {
2863         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2864                      "for insert (has %"PRIuSIZE", needs %u)",
2865                      results->n, insert->op_index);
2866         return false;
2867     }
2868
2869     /* We know that this is a JSON object because the loop in
2870      * ovsdb_idl_txn_process_reply() checked. */
2871     reply = json_object(results->elems[insert->op_index]);
2872     json_uuid = shash_find_data(reply, "uuid");
2873     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
2874         return false;
2875     }
2876
2877     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
2878     if (error) {
2879         char *s = ovsdb_error_to_string(error);
2880         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
2881                      "UUID: %s", s);
2882         free(s);
2883         ovsdb_error_destroy(error);
2884         return false;
2885     }
2886
2887     insert->real = uuid.uuid;
2888
2889     return true;
2890 }
2891
2892 static bool
2893 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
2894                             const struct jsonrpc_msg *msg)
2895 {
2896     struct ovsdb_idl_txn *txn;
2897     enum ovsdb_idl_txn_status status;
2898
2899     txn = ovsdb_idl_txn_find(idl, msg->id);
2900     if (!txn) {
2901         return false;
2902     }
2903
2904     if (msg->type == JSONRPC_ERROR) {
2905         status = TXN_ERROR;
2906     } else if (msg->result->type != JSON_ARRAY) {
2907         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
2908         status = TXN_ERROR;
2909     } else {
2910         struct json_array *ops = &msg->result->u.array;
2911         int hard_errors = 0;
2912         int soft_errors = 0;
2913         int lock_errors = 0;
2914         size_t i;
2915
2916         for (i = 0; i < ops->n; i++) {
2917             struct json *op = ops->elems[i];
2918
2919             if (op->type == JSON_NULL) {
2920                 /* This isn't an error in itself but indicates that some prior
2921                  * operation failed, so make sure that we know about it. */
2922                 soft_errors++;
2923             } else if (op->type == JSON_OBJECT) {
2924                 struct json *error;
2925
2926                 error = shash_find_data(json_object(op), "error");
2927                 if (error) {
2928                     if (error->type == JSON_STRING) {
2929                         if (!strcmp(error->u.string, "timed out")) {
2930                             soft_errors++;
2931                         } else if (!strcmp(error->u.string, "not owner")) {
2932                             lock_errors++;
2933                         } else if (strcmp(error->u.string, "aborted")) {
2934                             hard_errors++;
2935                             ovsdb_idl_txn_set_error_json(txn, op);
2936                         }
2937                     } else {
2938                         hard_errors++;
2939                         ovsdb_idl_txn_set_error_json(txn, op);
2940                         VLOG_WARN_RL(&syntax_rl,
2941                                      "\"error\" in reply is not JSON string");
2942                     }
2943                 }
2944             } else {
2945                 hard_errors++;
2946                 ovsdb_idl_txn_set_error_json(txn, op);
2947                 VLOG_WARN_RL(&syntax_rl,
2948                              "operation reply is not JSON null or object");
2949             }
2950         }
2951
2952         if (!soft_errors && !hard_errors && !lock_errors) {
2953             struct ovsdb_idl_txn_insert *insert;
2954
2955             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
2956                 hard_errors++;
2957             }
2958
2959             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
2960                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
2961                     hard_errors++;
2962                 }
2963             }
2964         }
2965
2966         status = (hard_errors ? TXN_ERROR
2967                   : lock_errors ? TXN_NOT_LOCKED
2968                   : soft_errors ? TXN_TRY_AGAIN
2969                   : TXN_SUCCESS);
2970     }
2971
2972     ovsdb_idl_txn_complete(txn, status);
2973     return true;
2974 }
2975
2976 /* Returns the transaction currently active for 'row''s IDL.  A transaction
2977  * must currently be active. */
2978 struct ovsdb_idl_txn *
2979 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
2980 {
2981     struct ovsdb_idl_txn *txn = row->table->idl->txn;
2982     ovs_assert(txn != NULL);
2983     return txn;
2984 }
2985
2986 /* Returns the IDL on which 'txn' acts. */
2987 struct ovsdb_idl *
2988 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
2989 {
2990     return txn->idl;
2991 }
2992
2993 /* Blocks until 'idl' successfully connects to the remote database and
2994  * retrieves its contents. */
2995 void
2996 ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
2997 {
2998     while (1) {
2999         ovsdb_idl_run(idl);
3000         if (ovsdb_idl_has_ever_connected(idl)) {
3001             return;
3002         }
3003         ovsdb_idl_wait(idl);
3004         poll_block();
3005     }
3006 }
3007 \f
3008 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
3009  * the database server and to avoid modifying the database when the lock cannot
3010  * be acquired (that is, when another client has the same lock).
3011  *
3012  * If 'lock_name' is NULL, drops the locking requirement and releases the
3013  * lock. */
3014 void
3015 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
3016 {
3017     ovs_assert(!idl->txn);
3018     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
3019
3020     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
3021         /* Release previous lock. */
3022         ovsdb_idl_send_unlock_request(idl);
3023         free(idl->lock_name);
3024         idl->lock_name = NULL;
3025         idl->is_lock_contended = false;
3026     }
3027
3028     if (lock_name && !idl->lock_name) {
3029         /* Acquire new lock. */
3030         idl->lock_name = xstrdup(lock_name);
3031         ovsdb_idl_send_lock_request(idl);
3032     }
3033 }
3034
3035 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
3036  *
3037  * Locking and unlocking happens asynchronously from the database client's
3038  * point of view, so the information is only useful for optimization (e.g. if
3039  * the client doesn't have the lock then there's no point in trying to write to
3040  * the database). */
3041 bool
3042 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
3043 {
3044     return idl->has_lock;
3045 }
3046
3047 /* Returns true if 'idl' is configured to obtain a lock but the database server
3048  * has indicated that some other client already owns the requested lock. */
3049 bool
3050 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
3051 {
3052     return idl->is_lock_contended;
3053 }
3054
3055 static void
3056 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
3057 {
3058     if (new_has_lock && !idl->has_lock) {
3059         if (idl->state == IDL_S_MONITORING ||
3060             idl->state == IDL_S_MONITORING2) {
3061             idl->change_seqno++;
3062         } else {
3063             /* We're setting up a session, so don't signal that the database
3064              * changed.  Finalizing the session will increment change_seqno
3065              * anyhow. */
3066         }
3067         idl->is_lock_contended = false;
3068     }
3069     idl->has_lock = new_has_lock;
3070 }
3071
3072 static void
3073 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
3074                               struct json **idp)
3075 {
3076     ovsdb_idl_update_has_lock(idl, false);
3077
3078     json_destroy(idl->lock_request_id);
3079     idl->lock_request_id = NULL;
3080
3081     if (jsonrpc_session_is_connected(idl->session)) {
3082         struct json *params;
3083
3084         params = json_array_create_1(json_string_create(idl->lock_name));
3085         jsonrpc_session_send(idl->session,
3086                              jsonrpc_create_request(method, params, idp));
3087     }
3088 }
3089
3090 static void
3091 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
3092 {
3093     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
3094 }
3095
3096 static void
3097 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
3098 {
3099     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
3100 }
3101
3102 static void
3103 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
3104 {
3105     bool got_lock;
3106
3107     json_destroy(idl->lock_request_id);
3108     idl->lock_request_id = NULL;
3109
3110     if (result->type == JSON_OBJECT) {
3111         const struct json *locked;
3112
3113         locked = shash_find_data(json_object(result), "locked");
3114         got_lock = locked && locked->type == JSON_TRUE;
3115     } else {
3116         got_lock = false;
3117     }
3118
3119     ovsdb_idl_update_has_lock(idl, got_lock);
3120     if (!got_lock) {
3121         idl->is_lock_contended = true;
3122     }
3123 }
3124
3125 static void
3126 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
3127                             const struct json *params,
3128                             bool new_has_lock)
3129 {
3130     if (idl->lock_name
3131         && params->type == JSON_ARRAY
3132         && json_array(params)->n > 0
3133         && json_array(params)->elems[0]->type == JSON_STRING) {
3134         const char *lock_name = json_string(json_array(params)->elems[0]);
3135
3136         if (!strcmp(idl->lock_name, lock_name)) {
3137             ovsdb_idl_update_has_lock(idl, new_has_lock);
3138             if (!new_has_lock) {
3139                 idl->is_lock_contended = true;
3140             }
3141         }
3142     }
3143 }
3144
3145 void
3146 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
3147 {
3148     if (loop) {
3149         ovsdb_idl_destroy(loop->idl);
3150     }
3151 }
3152
3153 struct ovsdb_idl_txn *
3154 ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
3155 {
3156     ovsdb_idl_run(loop->idl);
3157     loop->open_txn = (loop->committing_txn
3158                       || ovsdb_idl_get_seqno(loop->idl) == loop->skip_seqno
3159                       ? NULL
3160                       : ovsdb_idl_txn_create(loop->idl));
3161     return loop->open_txn;
3162 }
3163
3164 void
3165 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
3166 {
3167     if (loop->open_txn) {
3168         loop->committing_txn = loop->open_txn;
3169         loop->open_txn = NULL;
3170
3171         loop->precommit_seqno = ovsdb_idl_get_seqno(loop->idl);
3172     }
3173
3174     struct ovsdb_idl_txn *txn = loop->committing_txn;
3175     if (txn) {
3176         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
3177         if (status != TXN_INCOMPLETE) {
3178             switch (status) {
3179             case TXN_TRY_AGAIN:
3180                 /* We want to re-evaluate the database when it's changed from
3181                  * the contents that it had when we started the commit.  (That
3182                  * might have already happened.) */
3183                 loop->skip_seqno = loop->precommit_seqno;
3184                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
3185                     poll_immediate_wake();
3186                 }
3187                 break;
3188
3189             case TXN_SUCCESS:
3190                 /* If the database has already changed since we started the
3191                  * commit, re-evaluate it immediately to avoid missing a change
3192                  * for a while. */
3193                 if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
3194                     poll_immediate_wake();
3195                 }
3196                 break;
3197
3198             case TXN_UNCHANGED:
3199             case TXN_ABORTED:
3200             case TXN_NOT_LOCKED:
3201             case TXN_ERROR:
3202                 break;
3203
3204             case TXN_UNCOMMITTED:
3205             case TXN_INCOMPLETE:
3206                 OVS_NOT_REACHED();
3207
3208             }
3209             ovsdb_idl_txn_destroy(txn);
3210             loop->committing_txn = NULL;
3211         }
3212     }
3213
3214     ovsdb_idl_wait(loop->idl);
3215 }