7e84138d78dc3d9cd7bced7c454ccb74a39410a9
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "coverage.h"
27 #include "dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "json.h"
30 #include "jsonrpc.h"
31 #include "ovsdb/ovsdb.h"
32 #include "ovsdb/table.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-idl-provider.h"
36 #include "ovsdb-parser.h"
37 #include "poll-loop.h"
38 #include "shash.h"
39 #include "sset.h"
40 #include "util.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
44
45 COVERAGE_DEFINE(txn_uncommitted);
46 COVERAGE_DEFINE(txn_unchanged);
47 COVERAGE_DEFINE(txn_incomplete);
48 COVERAGE_DEFINE(txn_aborted);
49 COVERAGE_DEFINE(txn_success);
50 COVERAGE_DEFINE(txn_try_again);
51 COVERAGE_DEFINE(txn_not_locked);
52 COVERAGE_DEFINE(txn_error);
53
54 /* An arc from one idl_row to another.  When row A contains a UUID that
55  * references row B, this is represented by an arc from A (the source) to B
56  * (the destination).
57  *
58  * Arcs from a row to itself are omitted, that is, src and dst are always
59  * different.
60  *
61  * Arcs are never duplicated, that is, even if there are multiple references
62  * from A to B, there is only a single arc from A to B.
63  *
64  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
65  * A.  Both an arc and its converse may both be present, if each row refers
66  * to the other circularly.
67  *
68  * The source and destination row may be in the same table or in different
69  * tables.
70  */
71 struct ovsdb_idl_arc {
72     struct ovs_list src_node;   /* In src->src_arcs list. */
73     struct ovs_list dst_node;   /* In dst->dst_arcs list. */
74     struct ovsdb_idl_row *src;  /* Source row. */
75     struct ovsdb_idl_row *dst;  /* Destination row. */
76 };
77
78 enum ovsdb_idl_state {
79     IDL_S_SCHEMA_REQUESTED,
80     IDL_S_MONITOR_REQUESTED,
81     IDL_S_MONITORING,
82     IDL_S_MONITOR2_REQUESTED,
83     IDL_S_MONITORING2
84 };
85
86 struct ovsdb_idl {
87     const struct ovsdb_idl_class *class;
88     struct jsonrpc_session *session;
89     struct shash table_by_name;
90     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
91     unsigned int change_seqno;
92     bool verify_write_only;
93
94     /* Session state. */
95     unsigned int state_seqno;
96     enum ovsdb_idl_state state;
97     struct json *request_id;
98     struct json *schema;
99
100     /* Database locking. */
101     char *lock_name;            /* Name of lock we need, NULL if none. */
102     bool has_lock;              /* Has db server told us we have the lock? */
103     bool is_lock_contended;     /* Has db server told us we can't get lock? */
104     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
105
106     /* Transaction support. */
107     struct ovsdb_idl_txn *txn;
108     struct hmap outstanding_txns;
109 };
110
111 struct ovsdb_idl_txn {
112     struct hmap_node hmap_node;
113     struct json *request_id;
114     struct ovsdb_idl *idl;
115     struct hmap txn_rows;
116     enum ovsdb_idl_txn_status status;
117     char *error;
118     bool dry_run;
119     struct ds comment;
120
121     /* Increments. */
122     const char *inc_table;
123     const char *inc_column;
124     struct uuid inc_row;
125     unsigned int inc_index;
126     int64_t inc_new_value;
127
128     /* Inserted rows. */
129     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
130 };
131
132 struct ovsdb_idl_txn_insert {
133     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
134     struct uuid dummy;          /* Dummy UUID used locally. */
135     int op_index;               /* Index into transaction's operation array. */
136     struct uuid real;           /* Real UUID used by database server. */
137 };
138
139 enum ovsdb_update_version {
140     OVSDB_UPDATE,               /* RFC 7047 "update" method. */
141     OVSDB_UPDATE2               /* "update2" Extension to RFC 7047.
142                                    See ovsdb-server(1) for more information. */
143 };
144
145 /* Name arrays indexed by 'enum ovsdb_update_version'. */
146 static const char *table_updates_names[] = {"table_updates", "table_updates2"};
147 static const char *table_update_names[] = {"table_update", "table_update2"};
148 static const char *row_update_names[] = {"row_update", "row_update2"};
149
150 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
151 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
152
153 static void ovsdb_idl_clear(struct ovsdb_idl *);
154 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
155 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
156 static void ovsdb_idl_send_monitor2_request(struct ovsdb_idl *);
157 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
158                                    enum ovsdb_update_version);
159 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
160                                                     const struct json *,
161                                                     enum ovsdb_update_version);
162 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
163                                      const struct uuid *,
164                                      const struct json *old,
165                                      const struct json *new);
166 static bool ovsdb_idl_process_update2(struct ovsdb_idl_table *,
167                                       const struct uuid *,
168                                       const char *operation,
169                                       const struct json *row);
170 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
171 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
172 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
173 static bool ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *,
174                                          const struct json *);
175
176 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
177 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
178     const struct ovsdb_idl_table_class *);
179 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
180                                                   const struct uuid *);
181 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
182 static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
183
184 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
185 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
186 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
187 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
188 static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
189
190 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
191 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
192                                         const struct jsonrpc_msg *msg);
193
194 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
195 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
196 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
197                                        const struct json *);
198 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
199                                         const struct json *params,
200                                         bool new_has_lock);
201 static struct ovsdb_idl_table *
202 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
203                            const struct ovsdb_idl_table_class *);
204 static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table);
205
206 /* Creates and returns a connection to database 'remote', which should be in a
207  * form acceptable to jsonrpc_session_open().  The connection will maintain an
208  * in-memory replica of the remote database whose schema is described by
209  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
210  * by ovsdb-idlc.)
211  *
212  * Passes 'retry' to jsonrpc_session_open().  See that function for
213  * documentation.
214  *
215  * If 'monitor_everything_by_default' is true, then everything in the remote
216  * database will be replicated by default.  ovsdb_idl_omit() and
217  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
218  * monitoring.
219  *
220  * If 'monitor_everything_by_default' is false, then no columns or tables will
221  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
222  * must be used to choose some columns or tables to replicate.
223  */
224 struct ovsdb_idl *
225 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
226                  bool monitor_everything_by_default, bool retry)
227 {
228     struct ovsdb_idl *idl;
229     uint8_t default_mode;
230     size_t i;
231
232     default_mode = (monitor_everything_by_default
233                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
234                     : 0);
235
236     idl = xzalloc(sizeof *idl);
237     idl->class = class;
238     idl->session = jsonrpc_session_open(remote, retry);
239     shash_init(&idl->table_by_name);
240     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
241     for (i = 0; i < class->n_tables; i++) {
242         const struct ovsdb_idl_table_class *tc = &class->tables[i];
243         struct ovsdb_idl_table *table = &idl->tables[i];
244         size_t j;
245
246         shash_add_assert(&idl->table_by_name, tc->name, table);
247         table->class = tc;
248         table->modes = xmalloc(tc->n_columns);
249         memset(table->modes, default_mode, tc->n_columns);
250         table->need_table = false;
251         shash_init(&table->columns);
252         for (j = 0; j < tc->n_columns; j++) {
253             const struct ovsdb_idl_column *column = &tc->columns[j];
254
255             shash_add_assert(&table->columns, column->name, column);
256         }
257         hmap_init(&table->rows);
258         list_init(&table->track_list);
259         table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
260             = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
261             = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
262         table->idl = idl;
263     }
264
265     idl->state_seqno = UINT_MAX;
266     idl->request_id = NULL;
267     idl->schema = NULL;
268
269     hmap_init(&idl->outstanding_txns);
270
271     return idl;
272 }
273
274 /* Destroys 'idl' and all of the data structures that it manages. */
275 void
276 ovsdb_idl_destroy(struct ovsdb_idl *idl)
277 {
278     if (idl) {
279         size_t i;
280
281         ovs_assert(!idl->txn);
282         ovsdb_idl_clear(idl);
283         jsonrpc_session_close(idl->session);
284
285         for (i = 0; i < idl->class->n_tables; i++) {
286             struct ovsdb_idl_table *table = &idl->tables[i];
287             shash_destroy(&table->columns);
288             hmap_destroy(&table->rows);
289             free(table->modes);
290         }
291         shash_destroy(&idl->table_by_name);
292         free(idl->tables);
293         json_destroy(idl->request_id);
294         free(idl->lock_name);
295         json_destroy(idl->lock_request_id);
296         json_destroy(idl->schema);
297         hmap_destroy(&idl->outstanding_txns);
298         free(idl);
299     }
300 }
301
302 static void
303 ovsdb_idl_clear(struct ovsdb_idl *idl)
304 {
305     bool changed = false;
306     size_t i;
307
308     for (i = 0; i < idl->class->n_tables; i++) {
309         struct ovsdb_idl_table *table = &idl->tables[i];
310         struct ovsdb_idl_row *row, *next_row;
311
312         if (hmap_is_empty(&table->rows)) {
313             continue;
314         }
315
316         changed = true;
317         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
318             struct ovsdb_idl_arc *arc, *next_arc;
319
320             if (!ovsdb_idl_row_is_orphan(row)) {
321                 ovsdb_idl_row_unparse(row);
322             }
323             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
324                 free(arc);
325             }
326             /* No need to do anything with dst_arcs: some node has those arcs
327              * as forward arcs and will destroy them itself. */
328
329             if (!list_is_empty(&row->track_node)) {
330                 list_remove(&row->track_node);
331             }
332
333             ovsdb_idl_row_destroy(row);
334         }
335     }
336
337     ovsdb_idl_track_clear(idl);
338
339     if (changed) {
340         idl->change_seqno++;
341     }
342 }
343
344 /* Processes a batch of messages from the database server on 'idl'.  This may
345  * cause the IDL's contents to change.  The client may check for that with
346  * ovsdb_idl_get_seqno(). */
347 void
348 ovsdb_idl_run(struct ovsdb_idl *idl)
349 {
350     int i;
351
352     ovs_assert(!idl->txn);
353     jsonrpc_session_run(idl->session);
354     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
355         struct jsonrpc_msg *msg;
356         unsigned int seqno;
357
358         seqno = jsonrpc_session_get_seqno(idl->session);
359         if (idl->state_seqno != seqno) {
360             idl->state_seqno = seqno;
361             json_destroy(idl->request_id);
362             idl->request_id = NULL;
363             ovsdb_idl_txn_abort_all(idl);
364
365             ovsdb_idl_send_schema_request(idl);
366             idl->state = IDL_S_SCHEMA_REQUESTED;
367             if (idl->lock_name) {
368                 ovsdb_idl_send_lock_request(idl);
369             }
370         }
371
372         msg = jsonrpc_session_recv(idl->session);
373         if (!msg) {
374             break;
375         }
376
377         if (msg->type == JSONRPC_NOTIFY
378             && !strcmp(msg->method, "update2")
379             && msg->params->type == JSON_ARRAY
380             && msg->params->u.array.n == 2
381             && msg->params->u.array.elems[0]->type == JSON_NULL) {
382             /* Database contents changed. */
383             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
384                                    OVSDB_UPDATE2);
385         } else if (msg->type == JSONRPC_REPLY
386                    && idl->request_id
387                    && json_equal(idl->request_id, msg->id)) {
388             json_destroy(idl->request_id);
389             idl->request_id = NULL;
390
391             switch (idl->state) {
392             case IDL_S_SCHEMA_REQUESTED:
393                 /* Reply to our "get_schema" request. */
394                 idl->schema = json_clone(msg->result);
395                 ovsdb_idl_send_monitor2_request(idl);
396                 idl->state = IDL_S_MONITOR2_REQUESTED;
397                 break;
398
399             case IDL_S_MONITOR_REQUESTED:
400             case IDL_S_MONITOR2_REQUESTED:
401                 /* Reply to our "monitor" or "monitor2" request. */
402                 idl->change_seqno++;
403                 ovsdb_idl_clear(idl);
404                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
405                     idl->state = IDL_S_MONITORING;
406                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
407                 } else { /* IDL_S_MONITOR2_REQUESTED. */
408                     idl->state = IDL_S_MONITORING2;
409                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
410                 }
411
412                 /* Schema is not useful after monitor request is accepted
413                  * by the server.  */
414                 json_destroy(idl->schema);
415                 idl->schema = NULL;
416                 break;
417
418             case IDL_S_MONITORING:
419             case IDL_S_MONITORING2:
420             default:
421                 OVS_NOT_REACHED();
422             }
423         } else if (msg->type == JSONRPC_NOTIFY
424             && !strcmp(msg->method, "update")
425             && msg->params->type == JSON_ARRAY
426             && msg->params->u.array.n == 2
427             && msg->params->u.array.elems[0]->type == JSON_NULL) {
428             /* Database contents changed. */
429             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
430                                    OVSDB_UPDATE);
431         } else if (msg->type == JSONRPC_REPLY
432                    && idl->lock_request_id
433                    && json_equal(idl->lock_request_id, msg->id)) {
434             /* Reply to our "lock" request. */
435             ovsdb_idl_parse_lock_reply(idl, msg->result);
436         } else if (msg->type == JSONRPC_NOTIFY
437                    && !strcmp(msg->method, "locked")) {
438             /* We got our lock. */
439             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
440         } else if (msg->type == JSONRPC_NOTIFY
441                    && !strcmp(msg->method, "stolen")) {
442             /* Someone else stole our lock. */
443             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
444         } else if (msg->type == JSONRPC_ERROR
445                    && idl->state == IDL_S_MONITOR2_REQUESTED
446                    && idl->request_id
447                    && json_equal(idl->request_id, msg->id)) {
448             if (msg->error && !strcmp(json_string(msg->error),
449                                       "unknown method")) {
450                 /* Fall back to using "monitor" method.  */
451                 json_destroy(idl->request_id);
452                 idl->request_id = NULL;
453                 ovsdb_idl_send_monitor_request(idl);
454                 idl->state = IDL_S_MONITOR_REQUESTED;
455             }
456         } else if ((msg->type == JSONRPC_ERROR
457                     || msg->type == JSONRPC_REPLY)
458                    && ovsdb_idl_txn_process_reply(idl, msg)) {
459             /* ovsdb_idl_txn_process_reply() did everything needful. */
460         } else {
461             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
462              * a transaction before we receive the reply, so keep the log level
463              * low. */
464             VLOG_DBG("%s: received unexpected %s message",
465                      jsonrpc_session_get_name(idl->session),
466                      jsonrpc_msg_type_to_string(msg->type));
467         }
468         jsonrpc_msg_destroy(msg);
469     }
470     ovsdb_idl_row_destroy_postprocess(idl);
471 }
472
473 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
474  * do or when activity occurs on a transaction on 'idl'. */
475 void
476 ovsdb_idl_wait(struct ovsdb_idl *idl)
477 {
478     jsonrpc_session_wait(idl->session);
479     jsonrpc_session_recv_wait(idl->session);
480 }
481
482 /* Returns a "sequence number" that represents the state of 'idl'.  When
483  * ovsdb_idl_run() changes the database, the sequence number changes.  The
484  * initial fetch of the entire contents of the remote database is considered to
485  * be one kind of change.  Successfully acquiring a lock, if one has been
486  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
487  *
488  * As long as the sequence number does not change, the client may continue to
489  * use any data structures it obtains from 'idl'.  But when it changes, the
490  * client must not access any of these data structures again, because they
491  * could have freed or reused for other purposes.
492  *
493  * The sequence number can occasionally change even if the database does not.
494  * This happens if the connection to the database drops and reconnects, which
495  * causes the database contents to be reloaded even if they didn't change.  (It
496  * could also happen if the database server sends out a "change" that reflects
497  * what the IDL already thought was in the database.  The database server is
498  * not supposed to do that, but bugs could in theory cause it to do so.) */
499 unsigned int
500 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
501 {
502     return idl->change_seqno;
503 }
504
505 /* Returns true if 'idl' successfully connected to the remote database and
506  * retrieved its contents (even if the connection subsequently dropped and is
507  * in the process of reconnecting).  If so, then 'idl' contains an atomic
508  * snapshot of the database's contents (but it might be arbitrarily old if the
509  * connection dropped).
510  *
511  * Returns false if 'idl' has never connected or retrieved the database's
512  * contents.  If so, 'idl' is empty. */
513 bool
514 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
515 {
516     return ovsdb_idl_get_seqno(idl) != 0;
517 }
518
519 /* Reconfigures 'idl' so that it would reconnect to the database, if
520  * connection was dropped. */
521 void
522 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
523 {
524     jsonrpc_session_enable_reconnect(idl->session);
525 }
526
527 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
528  * meantime, the contents of 'idl' will not change. */
529 void
530 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
531 {
532     jsonrpc_session_force_reconnect(idl->session);
533 }
534
535 /* Some IDL users should only write to write-only columns.  Furthermore,
536  * writing to a column which is not write-only can cause serious performance
537  * degradations for these users.  This function causes 'idl' to reject writes
538  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
539 void
540 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
541 {
542     idl->verify_write_only = true;
543 }
544
545 /* Returns true if 'idl' is currently connected or trying to connect. */
546 bool
547 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
548 {
549     return jsonrpc_session_is_alive(idl->session);
550 }
551
552 /* Returns the last error reported on a connection by 'idl'.  The return value
553  * is 0 only if no connection made by 'idl' has ever encountered an error.  See
554  * jsonrpc_get_status() for return value interpretation. */
555 int
556 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
557 {
558     return jsonrpc_session_get_last_error(idl->session);
559 }
560 \f
561 static unsigned char *
562 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
563                    const struct ovsdb_idl_column *column)
564 {
565     size_t i;
566
567     ovs_assert(!idl->change_seqno);
568
569     for (i = 0; i < idl->class->n_tables; i++) {
570         const struct ovsdb_idl_table *table = &idl->tables[i];
571         const struct ovsdb_idl_table_class *tc = table->class;
572
573         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
574             return &table->modes[column - tc->columns];
575         }
576     }
577
578     OVS_NOT_REACHED();
579 }
580
581 static void
582 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
583 {
584     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
585         struct ovsdb_idl_table *table;
586
587         table = shash_find_data(&idl->table_by_name,
588                                 base->u.uuid.refTableName);
589         if (table) {
590             table->need_table = true;
591         } else {
592             VLOG_WARN("%s IDL class missing referenced table %s",
593                       idl->class->database, base->u.uuid.refTableName);
594         }
595     }
596 }
597
598 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
599  * ensures that any tables referenced by 'column' will be replicated, even if
600  * no columns in that table are selected for replication (see
601  * ovsdb_idl_add_table() for more information).
602  *
603  * This function is only useful if 'monitor_everything_by_default' was false in
604  * the call to ovsdb_idl_create().  This function should be called between
605  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
606  */
607 void
608 ovsdb_idl_add_column(struct ovsdb_idl *idl,
609                      const struct ovsdb_idl_column *column)
610 {
611     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
612     add_ref_table(idl, &column->type.key);
613     add_ref_table(idl, &column->type.value);
614 }
615
616 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
617  * no columns are selected for replication. Just the necessary data for table
618  * references will be replicated (the UUID of the rows, for instance), any
619  * columns not selected for replication will remain unreplicated.
620  * This can be useful because it allows 'idl' to keep track of what rows in the
621  * table actually exist, which in turn allows columns that reference the table
622  * to have accurate contents. (The IDL presents the database with references to
623  * rows that do not exist removed.)
624  *
625  * This function is only useful if 'monitor_everything_by_default' was false in
626  * the call to ovsdb_idl_create().  This function should be called between
627  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
628  */
629 void
630 ovsdb_idl_add_table(struct ovsdb_idl *idl,
631                     const struct ovsdb_idl_table_class *tc)
632 {
633     size_t i;
634
635     for (i = 0; i < idl->class->n_tables; i++) {
636         struct ovsdb_idl_table *table = &idl->tables[i];
637
638         if (table->class == tc) {
639             table->need_table = true;
640             return;
641         }
642     }
643
644     OVS_NOT_REACHED();
645 }
646
647 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
648  *
649  * This function should be called between ovsdb_idl_create() and the first call
650  * to ovsdb_idl_run().
651  */
652 void
653 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
654                      const struct ovsdb_idl_column *column)
655 {
656     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
657 }
658
659 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
660  * OVSDB_IDL_MONITOR for details.
661  *
662  * This function should be called between ovsdb_idl_create() and the first call
663  * to ovsdb_idl_run().
664  */
665 void
666 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
667 {
668     *ovsdb_idl_get_mode(idl, column) = 0;
669 }
670
671 /* Returns the most recent IDL change sequence number that caused a
672  * insert, modify or delete update to the table with class 'table_class'.
673  */
674 unsigned int
675 ovsdb_idl_table_get_seqno(const struct ovsdb_idl *idl,
676                           const struct ovsdb_idl_table_class *table_class)
677 {
678     struct ovsdb_idl_table *table
679         = ovsdb_idl_table_from_class(idl, table_class);
680     unsigned int max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_INSERT];
681
682     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]) {
683         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY];
684     }
685     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_DELETE]) {
686         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_DELETE];
687     }
688     return max_seqno;
689 }
690
691 /* For each row that contains tracked columns, IDL stores the most
692  * recent IDL change sequence numbers associateed with insert, modify
693  * and delete updates to the table.
694  */
695 unsigned int
696 ovsdb_idl_row_get_seqno(const struct ovsdb_idl_row *row,
697                         enum ovsdb_idl_change change)
698 {
699     return row->change_seqno[change];
700 }
701
702 /* Turns on OVSDB_IDL_TRACK for 'column' in 'idl', ensuring that
703  * all rows whose 'column' is modified are traced. Similarly, insert
704  * or delete of rows having 'column' are tracked. Clients are able
705  * to retrive the tracked rows with the ovsdb_idl_track_get_*()
706  * functions.
707  *
708  * This function should be called between ovsdb_idl_create() and
709  * the first call to ovsdb_idl_run(). The column to be tracked
710  * should have OVSDB_IDL_ALERT turned on.
711  */
712 void
713 ovsdb_idl_track_add_column(struct ovsdb_idl *idl,
714                            const struct ovsdb_idl_column *column)
715 {
716     if (!(*ovsdb_idl_get_mode(idl, column) & OVSDB_IDL_ALERT)) {
717         ovsdb_idl_add_column(idl, column);
718     }
719     *ovsdb_idl_get_mode(idl, column) |= OVSDB_IDL_TRACK;
720 }
721
722 void
723 ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
724 {
725     size_t i, j;
726
727     for (i = 0; i < idl->class->n_tables; i++) {
728         const struct ovsdb_idl_table_class *tc = &idl->class->tables[i];
729
730         for (j = 0; j < tc->n_columns; j++) {
731             const struct ovsdb_idl_column *column = &tc->columns[j];
732             ovsdb_idl_track_add_column(idl, column);
733         }
734     }
735 }
736
737 /* Returns true if 'table' has any tracked column. */
738 static bool
739 ovsdb_idl_track_is_set(struct ovsdb_idl_table *table)
740 {
741     size_t i;
742
743     for (i = 0; i < table->class->n_columns; i++) {
744         if (table->modes[i] & OVSDB_IDL_TRACK) {
745             return true;
746         }
747     }
748    return false;
749 }
750
751 /* Returns the first tracked row in table with class 'table_class'
752  * for the specified 'idl'. Returns NULL if there are no tracked rows */
753 const struct ovsdb_idl_row *
754 ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
755                           const struct ovsdb_idl_table_class *table_class)
756 {
757     struct ovsdb_idl_table *table
758         = ovsdb_idl_table_from_class(idl, table_class);
759
760     if (!list_is_empty(&table->track_list)) {
761         return CONTAINER_OF(list_front(&table->track_list), struct ovsdb_idl_row, track_node);
762     }
763     return NULL;
764 }
765
766 /* Returns the next tracked row in table after the specified 'row'
767  * (in no particular order). Returns NULL if there are no tracked rows */
768 const struct ovsdb_idl_row *
769 ovsdb_idl_track_get_next(const struct ovsdb_idl_row *row)
770 {
771     if (row->track_node.next != &row->table->track_list) {
772         return CONTAINER_OF(row->track_node.next, struct ovsdb_idl_row, track_node);
773     }
774
775     return NULL;
776 }
777
778 /* Returns true if a tracked 'column' in 'row' was updated by IDL, false
779  * otherwise. The tracking data is cleared by ovsdb_idl_track_clear()
780  *
781  * Function returns false if 'column' is not tracked (see
782  * ovsdb_idl_track_add_column()).
783  */
784 bool
785 ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
786                            const struct ovsdb_idl_column *column)
787 {
788     const struct ovsdb_idl_table_class *class;
789     size_t column_idx;
790
791     class = row->table->class;
792     column_idx = column - class->columns;
793
794     if (row->updated && bitmap_is_set(row->updated, column_idx)) {
795         return true;
796     } else {
797         return false;
798     }
799 }
800
801 /* Flushes the tracked rows. Client calls this function after calling
802  * ovsdb_idl_run() and read all tracked rows with the ovsdb_idl_track_get_*()
803  * functions. This is usually done at the end of the client's processing
804  * loop when it is ready to do ovsdb_idl_run() again.
805  */
806 void
807 ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
808 {
809     size_t i;
810
811     for (i = 0; i < idl->class->n_tables; i++) {
812         struct ovsdb_idl_table *table = &idl->tables[i];
813
814         if (!list_is_empty(&table->track_list)) {
815             struct ovsdb_idl_row *row, *next;
816
817             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
818                 if (row->updated) {
819                     free(row->updated);
820                     row->updated = NULL;
821                 }
822                 list_remove(&row->track_node);
823                 list_init(&row->track_node);
824                 if (ovsdb_idl_row_is_orphan(row)) {
825                     ovsdb_idl_row_clear_old(row);
826                     free(row);
827                 }
828             }
829         }
830     }
831 }
832
833 \f
834 static void
835 ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
836 {
837     struct jsonrpc_msg *msg;
838
839     json_destroy(idl->request_id);
840     msg = jsonrpc_create_request(
841         "get_schema",
842         json_array_create_1(json_string_create(idl->class->database)),
843         &idl->request_id);
844     jsonrpc_session_send(idl->session, msg);
845 }
846
847 static void
848 log_error(struct ovsdb_error *error)
849 {
850     char *s = ovsdb_error_to_string(error);
851     VLOG_WARN("error parsing database schema: %s", s);
852     free(s);
853     ovsdb_error_destroy(error);
854 }
855
856 /* Frees 'schema', which is in the format returned by parse_schema(). */
857 static void
858 free_schema(struct shash *schema)
859 {
860     if (schema) {
861         struct shash_node *node, *next;
862
863         SHASH_FOR_EACH_SAFE (node, next, schema) {
864             struct sset *sset = node->data;
865             sset_destroy(sset);
866             free(sset);
867             shash_delete(schema, node);
868         }
869         shash_destroy(schema);
870         free(schema);
871     }
872 }
873
874 /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
875  * 7047, to obtain the names of its rows and columns.  If successful, returns
876  * an shash whose keys are table names and whose values are ssets, where each
877  * sset contains the names of its table's columns.  On failure (due to a parse
878  * error), returns NULL.
879  *
880  * It would also be possible to use the general-purpose OVSDB schema parser in
881  * ovsdb-server, but that's overkill, possibly too strict for the current use
882  * case, and would require restructuring ovsdb-server to separate the schema
883  * code from the rest. */
884 static struct shash *
885 parse_schema(const struct json *schema_json)
886 {
887     struct ovsdb_parser parser;
888     const struct json *tables_json;
889     struct ovsdb_error *error;
890     struct shash_node *node;
891     struct shash *schema;
892
893     ovsdb_parser_init(&parser, schema_json, "database schema");
894     tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
895     error = ovsdb_parser_destroy(&parser);
896     if (error) {
897         log_error(error);
898         return NULL;
899     }
900
901     schema = xmalloc(sizeof *schema);
902     shash_init(schema);
903     SHASH_FOR_EACH (node, json_object(tables_json)) {
904         const char *table_name = node->name;
905         const struct json *json = node->data;
906         const struct json *columns_json;
907
908         ovsdb_parser_init(&parser, json, "table schema for table %s",
909                           table_name);
910         columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
911         error = ovsdb_parser_destroy(&parser);
912         if (error) {
913             log_error(error);
914             free_schema(schema);
915             return NULL;
916         }
917
918         struct sset *columns = xmalloc(sizeof *columns);
919         sset_init(columns);
920
921         struct shash_node *node2;
922         SHASH_FOR_EACH (node2, json_object(columns_json)) {
923             const char *column_name = node2->name;
924             sset_add(columns, column_name);
925         }
926         shash_add(schema, table_name, columns);
927     }
928     return schema;
929 }
930
931 static void
932 ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
933                                  const char *method)
934 {
935     struct shash *schema;
936     struct json *monitor_requests;
937     struct jsonrpc_msg *msg;
938     size_t i;
939
940     schema = parse_schema(idl->schema);
941     monitor_requests = json_object_create();
942     for (i = 0; i < idl->class->n_tables; i++) {
943         const struct ovsdb_idl_table *table = &idl->tables[i];
944         const struct ovsdb_idl_table_class *tc = table->class;
945         struct json *monitor_request, *columns;
946         const struct sset *table_schema;
947         size_t j;
948
949         table_schema = (schema
950                         ? shash_find_data(schema, table->class->name)
951                         : NULL);
952
953         columns = table->need_table ? json_array_create_empty() : NULL;
954         for (j = 0; j < tc->n_columns; j++) {
955             const struct ovsdb_idl_column *column = &tc->columns[j];
956             if (table->modes[j] & OVSDB_IDL_MONITOR) {
957                 if (table_schema
958                     && !sset_contains(table_schema, column->name)) {
959                     VLOG_WARN("%s table in %s database lacks %s column "
960                               "(database needs upgrade?)",
961                               table->class->name, idl->class->database,
962                               column->name);
963                     continue;
964                 }
965                 if (!columns) {
966                     columns = json_array_create_empty();
967                 }
968                 json_array_add(columns, json_string_create(column->name));
969             }
970         }
971
972         if (columns) {
973             if (schema && !table_schema) {
974                 VLOG_WARN("%s database lacks %s table "
975                           "(database needs upgrade?)",
976                           idl->class->database, table->class->name);
977                 json_destroy(columns);
978                 continue;
979             }
980
981             monitor_request = json_object_create();
982             json_object_put(monitor_request, "columns", columns);
983             json_object_put(monitor_requests, tc->name, monitor_request);
984         }
985     }
986     free_schema(schema);
987
988     json_destroy(idl->request_id);
989     msg = jsonrpc_create_request(
990         method,
991         json_array_create_3(json_string_create(idl->class->database),
992                             json_null_create(), monitor_requests),
993         &idl->request_id);
994     jsonrpc_session_send(idl->session, msg);
995 }
996
997 static void
998 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
999 {
1000     ovsdb_idl_send_monitor_request__(idl, "monitor");
1001 }
1002
1003 static void
1004 log_parse_update_error(struct ovsdb_error *error)
1005 {
1006         if (!VLOG_DROP_WARN(&syntax_rl)) {
1007             char *s = ovsdb_error_to_string(error);
1008             VLOG_WARN_RL(&syntax_rl, "%s", s);
1009             free(s);
1010         }
1011         ovsdb_error_destroy(error);
1012 }
1013
1014 static void
1015 ovsdb_idl_send_monitor2_request(struct ovsdb_idl *idl)
1016 {
1017     ovsdb_idl_send_monitor_request__(idl, "monitor2");
1018 }
1019
1020 static void
1021 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates,
1022                        enum ovsdb_update_version version)
1023 {
1024     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates,
1025                                                          version);
1026     if (error) {
1027         log_parse_update_error(error);
1028     }
1029 }
1030
1031 static struct ovsdb_error *
1032 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
1033                          const struct json *table_updates,
1034                          enum ovsdb_update_version version)
1035 {
1036     const struct shash_node *tables_node;
1037     const char *table_updates_name = table_updates_names[version];
1038     const char *table_update_name = table_update_names[version];
1039     const char *row_update_name = row_update_names[version];
1040
1041     if (table_updates->type != JSON_OBJECT) {
1042         return ovsdb_syntax_error(table_updates, NULL,
1043                                   "<%s> is not an object",
1044                                   table_updates_name);
1045     }
1046
1047     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
1048         const struct json *table_update = tables_node->data;
1049         const struct shash_node *table_node;
1050         struct ovsdb_idl_table *table;
1051
1052         table = shash_find_data(&idl->table_by_name, tables_node->name);
1053         if (!table) {
1054             return ovsdb_syntax_error(
1055                 table_updates, NULL,
1056                 "<%s> includes unknown table \"%s\"",
1057                 table_updates_name,
1058                 tables_node->name);
1059         }
1060
1061         if (table_update->type != JSON_OBJECT) {
1062             return ovsdb_syntax_error(table_update, NULL,
1063                                       "<%s> for table \"%s\" is "
1064                                       "not an object",
1065                                       table_update_name,
1066                                       table->class->name);
1067         }
1068         SHASH_FOR_EACH (table_node, json_object(table_update)) {
1069             const struct json *row_update = table_node->data;
1070             const struct json *old_json, *new_json;
1071             struct uuid uuid;
1072
1073             if (!uuid_from_string(&uuid, table_node->name)) {
1074                 return ovsdb_syntax_error(table_update, NULL,
1075                                           "<%s> for table \"%s\" "
1076                                           "contains bad UUID "
1077                                           "\"%s\" as member name",
1078                                           table_update_name,
1079                                           table->class->name,
1080                                           table_node->name);
1081             }
1082             if (row_update->type != JSON_OBJECT) {
1083                 return ovsdb_syntax_error(row_update, NULL,
1084                                           "<%s> for table \"%s\" "
1085                                           "contains <%s> for %s that "
1086                                           "is not an object",
1087                                           table_update_name,
1088                                           table->class->name,
1089                                           row_update_name,
1090                                           table_node->name);
1091             }
1092
1093             switch(version) {
1094             case OVSDB_UPDATE:
1095                 old_json = shash_find_data(json_object(row_update), "old");
1096                 new_json = shash_find_data(json_object(row_update), "new");
1097                 if (old_json && old_json->type != JSON_OBJECT) {
1098                     return ovsdb_syntax_error(old_json, NULL,
1099                                               "\"old\" <row> is not object");
1100                 } else if (new_json && new_json->type != JSON_OBJECT) {
1101                     return ovsdb_syntax_error(new_json, NULL,
1102                                               "\"new\" <row> is not object");
1103                 } else if ((old_json != NULL) + (new_json != NULL)
1104                            != shash_count(json_object(row_update))) {
1105                     return ovsdb_syntax_error(row_update, NULL,
1106                                               "<row-update> contains "
1107                                               "unexpected member");
1108                 } else if (!old_json && !new_json) {
1109                     return ovsdb_syntax_error(row_update, NULL,
1110                                               "<row-update> missing \"old\" "
1111                                               "and \"new\" members");
1112                 }
1113
1114                 if (ovsdb_idl_process_update(table, &uuid, old_json,
1115                                              new_json)) {
1116                     idl->change_seqno++;
1117                 }
1118                 break;
1119
1120             case OVSDB_UPDATE2: {
1121                 const char *ops[] = {"modify", "insert", "delete", "initial"};
1122                 const char *operation;
1123                 const struct json *row;
1124                 int i;
1125
1126                 for (i = 0; i < ARRAY_SIZE(ops); i++) {
1127                     operation = ops[i];
1128                     row = shash_find_data(json_object(row_update), operation);
1129
1130                     if (row)  {
1131                         if (ovsdb_idl_process_update2(table, &uuid, operation,
1132                                                       row)) {
1133                             idl->change_seqno++;
1134                         }
1135                         break;
1136                     }
1137                 }
1138
1139                 /* row_update2 should contain one of the objects */
1140                 if (i == ARRAY_SIZE(ops)) {
1141                     return ovsdb_syntax_error(row_update, NULL,
1142                                               "<row_update2> includes unknown "
1143                                               "object");
1144                 }
1145                 break;
1146             }
1147
1148             default:
1149                 OVS_NOT_REACHED();
1150             }
1151         }
1152     }
1153
1154     return NULL;
1155 }
1156
1157 static struct ovsdb_idl_row *
1158 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
1159 {
1160     struct ovsdb_idl_row *row;
1161
1162     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
1163         if (uuid_equals(&row->uuid, uuid)) {
1164             return row;
1165         }
1166     }
1167     return NULL;
1168 }
1169
1170 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1171  * otherwise. */
1172 static bool
1173 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
1174                          const struct uuid *uuid, const struct json *old,
1175                          const struct json *new)
1176 {
1177     struct ovsdb_idl_row *row;
1178
1179     row = ovsdb_idl_get_row(table, uuid);
1180     if (!new) {
1181         /* Delete row. */
1182         if (row && !ovsdb_idl_row_is_orphan(row)) {
1183             /* XXX perhaps we should check the 'old' values? */
1184             ovsdb_idl_delete_row(row);
1185         } else {
1186             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1187                          "from table %s",
1188                          UUID_ARGS(uuid), table->class->name);
1189             return false;
1190         }
1191     } else if (!old) {
1192         /* Insert row. */
1193         if (!row) {
1194             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1195         } else if (ovsdb_idl_row_is_orphan(row)) {
1196             ovsdb_idl_insert_row(row, new);
1197         } else {
1198             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1199                          "table %s", UUID_ARGS(uuid), table->class->name);
1200             return ovsdb_idl_modify_row(row, new);
1201         }
1202     } else {
1203         /* Modify row. */
1204         if (row) {
1205             /* XXX perhaps we should check the 'old' values? */
1206             if (!ovsdb_idl_row_is_orphan(row)) {
1207                 return ovsdb_idl_modify_row(row, new);
1208             } else {
1209                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1210                              "referenced row "UUID_FMT" in table %s",
1211                              UUID_ARGS(uuid), table->class->name);
1212                 ovsdb_idl_insert_row(row, new);
1213             }
1214         } else {
1215             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1216                          "in table %s", UUID_ARGS(uuid), table->class->name);
1217             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1218         }
1219     }
1220
1221     return true;
1222 }
1223
1224 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1225  * otherwise. */
1226 static bool
1227 ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
1228                           const struct uuid *uuid,
1229                           const char *operation,
1230                           const struct json *json_row)
1231 {
1232     struct ovsdb_idl_row *row;
1233
1234     row = ovsdb_idl_get_row(table, uuid);
1235     if (!strcmp(operation, "delete")) {
1236         /* Delete row. */
1237         if (row && !ovsdb_idl_row_is_orphan(row)) {
1238             ovsdb_idl_delete_row(row);
1239         } else {
1240             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1241                          "from table %s",
1242                          UUID_ARGS(uuid), table->class->name);
1243             return false;
1244         }
1245     } else if (!strcmp(operation, "insert") || !strcmp(operation, "initial")) {
1246         /* Insert row. */
1247         if (!row) {
1248             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), json_row);
1249         } else if (ovsdb_idl_row_is_orphan(row)) {
1250             ovsdb_idl_insert_row(row, json_row);
1251         } else {
1252             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1253                          "table %s", UUID_ARGS(uuid), table->class->name);
1254             ovsdb_idl_delete_row(row);
1255             ovsdb_idl_insert_row(row, json_row);
1256         }
1257     } else if (!strcmp(operation, "modify")) {
1258         /* Modify row. */
1259         if (row) {
1260             if (!ovsdb_idl_row_is_orphan(row)) {
1261                 return ovsdb_idl_modify_row_by_diff(row, json_row);
1262             } else {
1263                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1264                              "referenced row "UUID_FMT" in table %s",
1265                              UUID_ARGS(uuid), table->class->name);
1266                 return false;
1267             }
1268         } else {
1269             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1270                          "in table %s", UUID_ARGS(uuid), table->class->name);
1271             return false;
1272         }
1273     } else {
1274             VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
1275                          "table %s", operation, table->class->name);
1276             return false;
1277     }
1278
1279     return true;
1280 }
1281
1282 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1283  * otherwise.
1284  *
1285  * Change 'row' either with the content of 'row_json' or by apply 'diff'.
1286  * Caller needs to provide either valid 'row_json' or 'diff', but not
1287  * both.  */
1288 static bool
1289 ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
1290                        const struct json *diff_json,
1291                        enum ovsdb_idl_change change)
1292 {
1293     struct ovsdb_idl_table *table = row->table;
1294     const struct ovsdb_idl_table_class *class = table->class;
1295     struct shash_node *node;
1296     bool changed = false;
1297     bool apply_diff = diff_json != NULL;
1298     const struct json *json = apply_diff ? diff_json : row_json;
1299
1300     SHASH_FOR_EACH (node, json_object(json)) {
1301         const char *column_name = node->name;
1302         const struct ovsdb_idl_column *column;
1303         struct ovsdb_datum datum;
1304         struct ovsdb_error *error;
1305         unsigned int column_idx;
1306         struct ovsdb_datum *old;
1307
1308         column = shash_find_data(&table->columns, column_name);
1309         if (!column) {
1310             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
1311                          column_name, UUID_ARGS(&row->uuid));
1312             continue;
1313         }
1314
1315         column_idx = column - table->class->columns;
1316         old = &row->old[column_idx];
1317
1318         error = NULL;
1319         if (apply_diff) {
1320             struct ovsdb_datum diff;
1321
1322             ovs_assert(!row_json);
1323             error = ovsdb_transient_datum_from_json(&diff, &column->type,
1324                                                     node->data);
1325             if (!error) {
1326                 error = ovsdb_datum_apply_diff(&datum, old, &diff,
1327                                                &column->type);
1328                 ovsdb_datum_destroy(&diff, &column->type);
1329             }
1330         } else {
1331             ovs_assert(!diff_json);
1332             error = ovsdb_datum_from_json(&datum, &column->type, node->data,
1333                                           NULL);
1334         }
1335
1336         if (!error) {
1337             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
1338                 ovsdb_datum_swap(old, &datum);
1339                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
1340                     changed = true;
1341                     row->change_seqno[change]
1342                         = row->table->change_seqno[change]
1343                         = row->table->idl->change_seqno + 1;
1344                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
1345                         if (list_is_empty(&row->track_node)) {
1346                             list_push_front(&row->table->track_list,
1347                                             &row->track_node);
1348                         }
1349                         if (!row->updated) {
1350                             row->updated = bitmap_allocate(class->n_columns);
1351                         }
1352                         bitmap_set1(row->updated, column_idx);
1353                     }
1354                 }
1355             } else {
1356                 /* Didn't really change but the OVSDB monitor protocol always
1357                  * includes every value in a row. */
1358             }
1359
1360             ovsdb_datum_destroy(&datum, &column->type);
1361         } else {
1362             char *s = ovsdb_error_to_string(error);
1363             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
1364                          " in table %s: %s", column_name,
1365                          UUID_ARGS(&row->uuid), table->class->name, s);
1366             free(s);
1367             ovsdb_error_destroy(error);
1368         }
1369     }
1370     return changed;
1371 }
1372
1373 static bool
1374 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json,
1375                      enum ovsdb_idl_change change)
1376 {
1377     return ovsdb_idl_row_change__(row, row_json, NULL, change);
1378 }
1379
1380 static bool
1381 ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
1382                          const struct json *diff_json,
1383                          enum ovsdb_idl_change change)
1384 {
1385     return ovsdb_idl_row_change__(row, NULL, diff_json, change);
1386 }
1387
1388 /* When a row A refers to row B through a column with a "refTable" constraint,
1389  * but row B does not exist, row B is called an "orphan row".  Orphan rows
1390  * should not persist, because the database enforces referential integrity, but
1391  * they can appear transiently as changes from the database are received (the
1392  * database doesn't try to topologically sort them and circular references mean
1393  * it isn't always possible anyhow).
1394  *
1395  * This function returns true if 'row' is an orphan row, otherwise false.
1396  */
1397 static bool
1398 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
1399 {
1400     return !row->old && !row->new;
1401 }
1402
1403 /* Returns true if 'row' is conceptually part of the database as modified by
1404  * the current transaction (if any), false otherwise.
1405  *
1406  * This function will return true if 'row' is not an orphan (see the comment on
1407  * ovsdb_idl_row_is_orphan()) and:
1408  *
1409  *   - 'row' exists in the database and has not been deleted within the
1410  *     current transaction (if any).
1411  *
1412  *   - 'row' was inserted within the current transaction and has not been
1413  *     deleted.  (In the latter case you should not have passed 'row' in at
1414  *     all, because ovsdb_idl_txn_delete() freed it.)
1415  *
1416  * This function will return false if 'row' is an orphan or if 'row' was
1417  * deleted within the current transaction.
1418  */
1419 static bool
1420 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
1421 {
1422     return row->new != NULL;
1423 }
1424
1425 static void
1426 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
1427 {
1428     const struct ovsdb_idl_table_class *class = row->table->class;
1429     size_t i;
1430
1431     for (i = 0; i < class->n_columns; i++) {
1432         const struct ovsdb_idl_column *c = &class->columns[i];
1433         (c->parse)(row, &row->old[i]);
1434     }
1435 }
1436
1437 static void
1438 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
1439 {
1440     const struct ovsdb_idl_table_class *class = row->table->class;
1441     size_t i;
1442
1443     for (i = 0; i < class->n_columns; i++) {
1444         const struct ovsdb_idl_column *c = &class->columns[i];
1445         (c->unparse)(row);
1446     }
1447 }
1448
1449 static void
1450 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
1451 {
1452     ovs_assert(row->old == row->new);
1453     if (!ovsdb_idl_row_is_orphan(row)) {
1454         const struct ovsdb_idl_table_class *class = row->table->class;
1455         size_t i;
1456
1457         for (i = 0; i < class->n_columns; i++) {
1458             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
1459         }
1460         free(row->old);
1461         row->old = row->new = NULL;
1462     }
1463 }
1464
1465 static void
1466 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
1467 {
1468     if (row->old != row->new) {
1469         if (row->new) {
1470             const struct ovsdb_idl_table_class *class = row->table->class;
1471             size_t i;
1472
1473             if (row->written) {
1474                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
1475                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
1476                 }
1477             }
1478             free(row->new);
1479             free(row->written);
1480             row->written = NULL;
1481         }
1482         row->new = row->old;
1483     }
1484 }
1485
1486 static void
1487 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
1488 {
1489     struct ovsdb_idl_arc *arc, *next;
1490
1491     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
1492      * that this causes to be unreferenced, if tracking is not enabled.
1493      * If tracking is enabled, orphaned nodes are removed from hmap but not
1494      * freed.
1495      */
1496     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
1497         list_remove(&arc->dst_node);
1498         if (destroy_dsts
1499             && ovsdb_idl_row_is_orphan(arc->dst)
1500             && list_is_empty(&arc->dst->dst_arcs)) {
1501             ovsdb_idl_row_destroy(arc->dst);
1502         }
1503         free(arc);
1504     }
1505     list_init(&row->src_arcs);
1506 }
1507
1508 /* Force nodes that reference 'row' to reparse. */
1509 static void
1510 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
1511 {
1512     struct ovsdb_idl_arc *arc, *next;
1513
1514     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
1515      * 'arc', so we need to use the "safe" variant of list traversal.  However,
1516      * calling an ovsdb_idl_column's 'parse' function will add an arc
1517      * equivalent to 'arc' to row->arcs.  That could be a problem for
1518      * traversal, but it adds it at the beginning of the list to prevent us
1519      * from stumbling upon it again.
1520      *
1521      * (If duplicate arcs were possible then we would need to make sure that
1522      * 'next' didn't also point into 'arc''s destination, but we forbid
1523      * duplicate arcs.) */
1524     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
1525         struct ovsdb_idl_row *ref = arc->src;
1526
1527         ovsdb_idl_row_unparse(ref);
1528         ovsdb_idl_row_clear_arcs(ref, false);
1529         ovsdb_idl_row_parse(ref);
1530     }
1531 }
1532
1533 static struct ovsdb_idl_row *
1534 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
1535 {
1536     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
1537     class->row_init(row);
1538     list_init(&row->src_arcs);
1539     list_init(&row->dst_arcs);
1540     hmap_node_nullify(&row->txn_node);
1541     list_init(&row->track_node);
1542     return row;
1543 }
1544
1545 static struct ovsdb_idl_row *
1546 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
1547 {
1548     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
1549     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
1550     row->uuid = *uuid;
1551     row->table = table;
1552     return row;
1553 }
1554
1555 static void
1556 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
1557 {
1558     if (row) {
1559         ovsdb_idl_row_clear_old(row);
1560         hmap_remove(&row->table->rows, &row->hmap_node);
1561         if (ovsdb_idl_track_is_set(row->table)) {
1562             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1563                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1564                 = row->table->idl->change_seqno + 1;
1565         }
1566         if (list_is_empty(&row->track_node)) {
1567             list_push_front(&row->table->track_list, &row->track_node);
1568         }
1569     }
1570 }
1571
1572 static void
1573 ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
1574 {
1575     size_t i;
1576
1577     for (i = 0; i < idl->class->n_tables; i++) {
1578         struct ovsdb_idl_table *table = &idl->tables[i];
1579
1580         if (!list_is_empty(&table->track_list)) {
1581             struct ovsdb_idl_row *row, *next;
1582
1583             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
1584                 if (!ovsdb_idl_track_is_set(row->table)) {
1585                     list_remove(&row->track_node);
1586                     free(row);
1587                 }
1588             }
1589         }
1590     }
1591 }
1592
1593 static void
1594 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
1595 {
1596     const struct ovsdb_idl_table_class *class = row->table->class;
1597     size_t i;
1598
1599     ovs_assert(!row->old && !row->new);
1600     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
1601     for (i = 0; i < class->n_columns; i++) {
1602         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
1603     }
1604     ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_INSERT);
1605     ovsdb_idl_row_parse(row);
1606
1607     ovsdb_idl_row_reparse_backrefs(row);
1608 }
1609
1610 static void
1611 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
1612 {
1613     ovsdb_idl_row_unparse(row);
1614     ovsdb_idl_row_clear_arcs(row, true);
1615     ovsdb_idl_row_clear_old(row);
1616     if (list_is_empty(&row->dst_arcs)) {
1617         ovsdb_idl_row_destroy(row);
1618     } else {
1619         ovsdb_idl_row_reparse_backrefs(row);
1620     }
1621 }
1622
1623 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1624  * otherwise. */
1625 static bool
1626 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1627 {
1628     bool changed;
1629
1630     ovsdb_idl_row_unparse(row);
1631     ovsdb_idl_row_clear_arcs(row, true);
1632     changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY);
1633     ovsdb_idl_row_parse(row);
1634
1635     return changed;
1636 }
1637
1638 static bool
1639 ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *row,
1640                              const struct json *diff_json)
1641 {
1642     bool changed;
1643
1644     ovsdb_idl_row_unparse(row);
1645     ovsdb_idl_row_clear_arcs(row, true);
1646     changed = ovsdb_idl_row_apply_diff(row, diff_json,
1647                                        OVSDB_IDL_CHANGE_MODIFY);
1648     ovsdb_idl_row_parse(row);
1649
1650     return changed;
1651 }
1652
1653 static bool
1654 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1655 {
1656     const struct ovsdb_idl_arc *arc;
1657
1658     /* No self-arcs. */
1659     if (src == dst) {
1660         return false;
1661     }
1662
1663     /* No duplicate arcs.
1664      *
1665      * We only need to test whether the first arc in dst->dst_arcs originates
1666      * at 'src', since we add all of the arcs from a given source in a clump
1667      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1668      * added at the front of the dst_arcs list. */
1669     if (list_is_empty(&dst->dst_arcs)) {
1670         return true;
1671     }
1672     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1673     return arc->src != src;
1674 }
1675
1676 static struct ovsdb_idl_table *
1677 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1678                            const struct ovsdb_idl_table_class *table_class)
1679 {
1680     return &idl->tables[table_class - idl->class->tables];
1681 }
1682
1683 /* Called by ovsdb-idlc generated code. */
1684 struct ovsdb_idl_row *
1685 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1686                       struct ovsdb_idl_table_class *dst_table_class,
1687                       const struct uuid *dst_uuid)
1688 {
1689     struct ovsdb_idl *idl = src->table->idl;
1690     struct ovsdb_idl_table *dst_table;
1691     struct ovsdb_idl_arc *arc;
1692     struct ovsdb_idl_row *dst;
1693
1694     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1695     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1696     if (idl->txn) {
1697         /* We're being called from ovsdb_idl_txn_write().  We must not update
1698          * any arcs, because the transaction will be backed out at commit or
1699          * abort time and we don't want our graph screwed up.
1700          *
1701          * Just return the destination row, if there is one and it has not been
1702          * deleted. */
1703         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1704             return dst;
1705         }
1706         return NULL;
1707     } else {
1708         /* We're being called from some other context.  Update the graph. */
1709         if (!dst) {
1710             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1711         }
1712
1713         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
1714         if (may_add_arc(src, dst)) {
1715             /* The arc *must* be added at the front of the dst_arcs list.  See
1716              * ovsdb_idl_row_reparse_backrefs() for details. */
1717             arc = xmalloc(sizeof *arc);
1718             list_push_front(&src->src_arcs, &arc->src_node);
1719             list_push_front(&dst->dst_arcs, &arc->dst_node);
1720             arc->src = src;
1721             arc->dst = dst;
1722         }
1723
1724         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
1725     }
1726 }
1727
1728 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
1729  * pointer to the row if there is one, otherwise a null pointer.  */
1730 const struct ovsdb_idl_row *
1731 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
1732                            const struct ovsdb_idl_table_class *tc,
1733                            const struct uuid *uuid)
1734 {
1735     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
1736 }
1737
1738 static struct ovsdb_idl_row *
1739 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
1740 {
1741     for (; node; node = hmap_next(&table->rows, node)) {
1742         struct ovsdb_idl_row *row;
1743
1744         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
1745         if (ovsdb_idl_row_exists(row)) {
1746             return row;
1747         }
1748     }
1749     return NULL;
1750 }
1751
1752 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
1753  * table is empty.
1754  *
1755  * Database tables are internally maintained as hash tables, so adding or
1756  * removing rows while traversing the same table can cause some rows to be
1757  * visited twice or not at apply. */
1758 const struct ovsdb_idl_row *
1759 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
1760                     const struct ovsdb_idl_table_class *table_class)
1761 {
1762     struct ovsdb_idl_table *table
1763         = ovsdb_idl_table_from_class(idl, table_class);
1764     return next_real_row(table, hmap_first(&table->rows));
1765 }
1766
1767 /* Returns a row following 'row' within its table, or a null pointer if 'row'
1768  * is the last row in its table. */
1769 const struct ovsdb_idl_row *
1770 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
1771 {
1772     struct ovsdb_idl_table *table = row->table;
1773
1774     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
1775 }
1776
1777 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
1778  * transaction has changed 'column''s value, the modified value is returned.
1779  *
1780  * The caller must not modify or free the returned value.
1781  *
1782  * Various kinds of changes can invalidate the returned value: writing to the
1783  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
1784  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
1785  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
1786  * returned value is needed for a long time, it is best to make a copy of it
1787  * with ovsdb_datum_clone(). */
1788 const struct ovsdb_datum *
1789 ovsdb_idl_read(const struct ovsdb_idl_row *row,
1790                const struct ovsdb_idl_column *column)
1791 {
1792     const struct ovsdb_idl_table_class *class;
1793     size_t column_idx;
1794
1795     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
1796
1797     class = row->table->class;
1798     column_idx = column - class->columns;
1799
1800     ovs_assert(row->new != NULL);
1801     ovs_assert(column_idx < class->n_columns);
1802
1803     if (row->written && bitmap_is_set(row->written, column_idx)) {
1804         return &row->new[column_idx];
1805     } else if (row->old) {
1806         return &row->old[column_idx];
1807     } else {
1808         return ovsdb_datum_default(&column->type);
1809     }
1810 }
1811
1812 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
1813  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
1814  * have a value type of OVSDB_TYPE_VOID.)
1815  *
1816  * This is useful in code that "knows" that a particular column has a given
1817  * type, so that it will abort if someone changes the column's type without
1818  * updating the code that uses it. */
1819 const struct ovsdb_datum *
1820 ovsdb_idl_get(const struct ovsdb_idl_row *row,
1821               const struct ovsdb_idl_column *column,
1822               enum ovsdb_atomic_type key_type OVS_UNUSED,
1823               enum ovsdb_atomic_type value_type OVS_UNUSED)
1824 {
1825     ovs_assert(column->type.key.type == key_type);
1826     ovs_assert(column->type.value.type == value_type);
1827
1828     return ovsdb_idl_read(row, column);
1829 }
1830
1831 /* Returns true if the field represented by 'column' in 'row' may be modified,
1832  * false if it is immutable.
1833  *
1834  * Normally, whether a field is mutable is controlled by its column's schema.
1835  * However, an immutable column can be set to any initial value at the time of
1836  * insertion, so if 'row' is a new row (one that is being added as part of the
1837  * current transaction, supposing that a transaction is in progress) then even
1838  * its "immutable" fields are actually mutable. */
1839 bool
1840 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
1841                      const struct ovsdb_idl_column *column)
1842 {
1843     return column->mutable || (row->new && !row->old);
1844 }
1845
1846 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
1847  * to all-zero-bits by some other entity.  If 'row' was set up some other way
1848  * then the return value is indeterminate. */
1849 bool
1850 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
1851 {
1852     return row->table == NULL;
1853 }
1854 \f
1855 /* Transactions. */
1856
1857 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1858                                    enum ovsdb_idl_txn_status);
1859
1860 /* Returns a string representation of 'status'.  The caller must not modify or
1861  * free the returned string.
1862  *
1863  * The return value is probably useful only for debug log messages and unit
1864  * tests. */
1865 const char *
1866 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
1867 {
1868     switch (status) {
1869     case TXN_UNCOMMITTED:
1870         return "uncommitted";
1871     case TXN_UNCHANGED:
1872         return "unchanged";
1873     case TXN_INCOMPLETE:
1874         return "incomplete";
1875     case TXN_ABORTED:
1876         return "aborted";
1877     case TXN_SUCCESS:
1878         return "success";
1879     case TXN_TRY_AGAIN:
1880         return "try again";
1881     case TXN_NOT_LOCKED:
1882         return "not locked";
1883     case TXN_ERROR:
1884         return "error";
1885     }
1886     return "<unknown>";
1887 }
1888
1889 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
1890  * active transaction at a time.  See the large comment in ovsdb-idl.h for
1891  * general information on transactions. */
1892 struct ovsdb_idl_txn *
1893 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
1894 {
1895     struct ovsdb_idl_txn *txn;
1896
1897     ovs_assert(!idl->txn);
1898     idl->txn = txn = xmalloc(sizeof *txn);
1899     txn->request_id = NULL;
1900     txn->idl = idl;
1901     hmap_init(&txn->txn_rows);
1902     txn->status = TXN_UNCOMMITTED;
1903     txn->error = NULL;
1904     txn->dry_run = false;
1905     ds_init(&txn->comment);
1906
1907     txn->inc_table = NULL;
1908     txn->inc_column = NULL;
1909
1910     hmap_init(&txn->inserted_rows);
1911
1912     return txn;
1913 }
1914
1915 /* Appends 's', which is treated as a printf()-type format string, to the
1916  * comments that will be passed to the OVSDB server when 'txn' is committed.
1917  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
1918  * show-log" can print in a relatively human-readable form.) */
1919 void
1920 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
1921 {
1922     va_list args;
1923
1924     if (txn->comment.length) {
1925         ds_put_char(&txn->comment, '\n');
1926     }
1927
1928     va_start(args, s);
1929     ds_put_format_valist(&txn->comment, s, args);
1930     va_end(args);
1931 }
1932
1933 /* Marks 'txn' as a transaction that will not actually modify the database.  In
1934  * almost every way, the transaction is treated like other transactions.  It
1935  * must be committed or aborted like other transactions, it will be sent to the
1936  * database server like other transactions, and so on.  The only difference is
1937  * that the operations sent to the database server will include, as the last
1938  * step, an "abort" operation, so that any changes made by the transaction will
1939  * not actually take effect. */
1940 void
1941 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
1942 {
1943     txn->dry_run = true;
1944 }
1945
1946 /* Causes 'txn', when committed, to increment the value of 'column' within
1947  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
1948  * successfully, the client may retrieve the final (incremented) value of
1949  * 'column' with ovsdb_idl_txn_get_increment_new_value().
1950  *
1951  * The client could accomplish something similar with ovsdb_idl_read(),
1952  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
1953  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
1954  * will never (by itself) fail because of a verify error.
1955  *
1956  * The intended use is for incrementing the "next_cfg" column in the
1957  * Open_vSwitch table. */
1958 void
1959 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
1960                         const struct ovsdb_idl_row *row,
1961                         const struct ovsdb_idl_column *column)
1962 {
1963     ovs_assert(!txn->inc_table);
1964     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
1965     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
1966
1967     txn->inc_table = row->table->class->name;
1968     txn->inc_column = column->name;
1969     txn->inc_row = row->uuid;
1970 }
1971
1972 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
1973  * has been called for 'txn' but the commit is still incomplete (that is, the
1974  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
1975  * end up committing at the database server, but the client will not be able to
1976  * get any further status information back. */
1977 void
1978 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
1979 {
1980     struct ovsdb_idl_txn_insert *insert, *next;
1981
1982     json_destroy(txn->request_id);
1983     if (txn->status == TXN_INCOMPLETE) {
1984         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1985     }
1986     ovsdb_idl_txn_abort(txn);
1987     ds_destroy(&txn->comment);
1988     free(txn->error);
1989     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
1990         free(insert);
1991     }
1992     hmap_destroy(&txn->inserted_rows);
1993     free(txn);
1994 }
1995
1996 /* Causes poll_block() to wake up if 'txn' has completed committing. */
1997 void
1998 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
1999 {
2000     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
2001         poll_immediate_wake();
2002     }
2003 }
2004
2005 static struct json *
2006 where_uuid_equals(const struct uuid *uuid)
2007 {
2008     return
2009         json_array_create_1(
2010             json_array_create_3(
2011                 json_string_create("_uuid"),
2012                 json_string_create("=="),
2013                 json_array_create_2(
2014                     json_string_create("uuid"),
2015                     json_string_create_nocopy(
2016                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
2017 }
2018
2019 static char *
2020 uuid_name_from_uuid(const struct uuid *uuid)
2021 {
2022     char *name;
2023     char *p;
2024
2025     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
2026     for (p = name; *p != '\0'; p++) {
2027         if (*p == '-') {
2028             *p = '_';
2029         }
2030     }
2031
2032     return name;
2033 }
2034
2035 static const struct ovsdb_idl_row *
2036 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
2037 {
2038     const struct ovsdb_idl_row *row;
2039
2040     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
2041         if (uuid_equals(&row->uuid, uuid)) {
2042             return row;
2043         }
2044     }
2045     return NULL;
2046 }
2047
2048 /* XXX there must be a cleaner way to do this */
2049 static struct json *
2050 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
2051 {
2052     if (json->type == JSON_ARRAY) {
2053         struct uuid uuid;
2054         size_t i;
2055
2056         if (json->u.array.n == 2
2057             && json->u.array.elems[0]->type == JSON_STRING
2058             && json->u.array.elems[1]->type == JSON_STRING
2059             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
2060             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
2061             const struct ovsdb_idl_row *row;
2062
2063             row = ovsdb_idl_txn_get_row(txn, &uuid);
2064             if (row && !row->old && row->new) {
2065                 json_destroy(json);
2066
2067                 return json_array_create_2(
2068                     json_string_create("named-uuid"),
2069                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
2070             }
2071         }
2072
2073         for (i = 0; i < json->u.array.n; i++) {
2074             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
2075                                                       txn);
2076         }
2077     } else if (json->type == JSON_OBJECT) {
2078         struct shash_node *node;
2079
2080         SHASH_FOR_EACH (node, json_object(json)) {
2081             node->data = substitute_uuids(node->data, txn);
2082         }
2083     }
2084     return json;
2085 }
2086
2087 static void
2088 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
2089 {
2090     struct ovsdb_idl_row *row, *next;
2091
2092     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
2093      * ovsdb_idl_column's 'parse' function, which will call
2094      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
2095      * transaction and fail to update the graph.  */
2096     txn->idl->txn = NULL;
2097
2098     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
2099         if (row->old) {
2100             if (row->written) {
2101                 ovsdb_idl_row_unparse(row);
2102                 ovsdb_idl_row_clear_arcs(row, false);
2103                 ovsdb_idl_row_parse(row);
2104             }
2105         } else {
2106             ovsdb_idl_row_unparse(row);
2107         }
2108         ovsdb_idl_row_clear_new(row);
2109
2110         free(row->prereqs);
2111         row->prereqs = NULL;
2112
2113         free(row->written);
2114         row->written = NULL;
2115
2116         hmap_remove(&txn->txn_rows, &row->txn_node);
2117         hmap_node_nullify(&row->txn_node);
2118         if (!row->old) {
2119             hmap_remove(&row->table->rows, &row->hmap_node);
2120             free(row);
2121         }
2122     }
2123     hmap_destroy(&txn->txn_rows);
2124     hmap_init(&txn->txn_rows);
2125 }
2126
2127 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
2128  * of the following TXN_* constants:
2129  *
2130  *   TXN_INCOMPLETE:
2131  *
2132  *       The transaction is in progress, but not yet complete.  The caller
2133  *       should call again later, after calling ovsdb_idl_run() to let the IDL
2134  *       do OVSDB protocol processing.
2135  *
2136  *   TXN_UNCHANGED:
2137  *
2138  *       The transaction is complete.  (It didn't actually change the database,
2139  *       so the IDL didn't send any request to the database server.)
2140  *
2141  *   TXN_ABORTED:
2142  *
2143  *       The caller previously called ovsdb_idl_txn_abort().
2144  *
2145  *   TXN_SUCCESS:
2146  *
2147  *       The transaction was successful.  The update made by the transaction
2148  *       (and possibly other changes made by other database clients) should
2149  *       already be visible in the IDL.
2150  *
2151  *   TXN_TRY_AGAIN:
2152  *
2153  *       The transaction failed for some transient reason, e.g. because a
2154  *       "verify" operation reported an inconsistency or due to a network
2155  *       problem.  The caller should wait for a change to the database, then
2156  *       compose a new transaction, and commit the new transaction.
2157  *
2158  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
2159  *       the database.  It is important to use its return value *before* the
2160  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
2161  *       purpose, because the change that one should wait for can happen after
2162  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
2163  *       using some other baseline value in that situation could cause an
2164  *       indefinite wait if the database rarely changes.
2165  *
2166  *   TXN_NOT_LOCKED:
2167  *
2168  *       The transaction failed because the IDL has been configured to require
2169  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
2170  *       has already lost it.
2171  *
2172  * Committing a transaction rolls back all of the changes that it made to the
2173  * IDL's copy of the database.  If the transaction commits successfully, then
2174  * the database server will send an update and, thus, the IDL will be updated
2175  * with the committed changes. */
2176 enum ovsdb_idl_txn_status
2177 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
2178 {
2179     struct ovsdb_idl_row *row;
2180     struct json *operations;
2181     bool any_updates;
2182
2183     if (txn != txn->idl->txn) {
2184         goto coverage_out;
2185     }
2186
2187     /* If we need a lock but don't have it, give up quickly. */
2188     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
2189         txn->status = TXN_NOT_LOCKED;
2190         goto disassemble_out;
2191     }
2192
2193     operations = json_array_create_1(
2194         json_string_create(txn->idl->class->database));
2195
2196     /* Assert that we have the required lock (avoiding a race). */
2197     if (txn->idl->lock_name) {
2198         struct json *op = json_object_create();
2199         json_array_add(operations, op);
2200         json_object_put_string(op, "op", "assert");
2201         json_object_put_string(op, "lock", txn->idl->lock_name);
2202     }
2203
2204     /* Add prerequisites and declarations of new rows. */
2205     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2206         /* XXX check that deleted rows exist even if no prereqs? */
2207         if (row->prereqs) {
2208             const struct ovsdb_idl_table_class *class = row->table->class;
2209             size_t n_columns = class->n_columns;
2210             struct json *op, *columns, *row_json;
2211             size_t idx;
2212
2213             op = json_object_create();
2214             json_array_add(operations, op);
2215             json_object_put_string(op, "op", "wait");
2216             json_object_put_string(op, "table", class->name);
2217             json_object_put(op, "timeout", json_integer_create(0));
2218             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2219             json_object_put_string(op, "until", "==");
2220             columns = json_array_create_empty();
2221             json_object_put(op, "columns", columns);
2222             row_json = json_object_create();
2223             json_object_put(op, "rows", json_array_create_1(row_json));
2224
2225             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
2226                 const struct ovsdb_idl_column *column = &class->columns[idx];
2227                 json_array_add(columns, json_string_create(column->name));
2228                 json_object_put(row_json, column->name,
2229                                 ovsdb_datum_to_json(&row->old[idx],
2230                                                     &column->type));
2231             }
2232         }
2233     }
2234
2235     /* Add updates. */
2236     any_updates = false;
2237     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2238         const struct ovsdb_idl_table_class *class = row->table->class;
2239
2240         if (!row->new) {
2241             if (class->is_root) {
2242                 struct json *op = json_object_create();
2243                 json_object_put_string(op, "op", "delete");
2244                 json_object_put_string(op, "table", class->name);
2245                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2246                 json_array_add(operations, op);
2247                 any_updates = true;
2248             } else {
2249                 /* Let ovsdb-server decide whether to really delete it. */
2250             }
2251         } else if (row->old != row->new) {
2252             struct json *row_json;
2253             struct json *op;
2254             size_t idx;
2255
2256             op = json_object_create();
2257             json_object_put_string(op, "op", row->old ? "update" : "insert");
2258             json_object_put_string(op, "table", class->name);
2259             if (row->old) {
2260                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2261             } else {
2262                 struct ovsdb_idl_txn_insert *insert;
2263
2264                 any_updates = true;
2265
2266                 json_object_put(op, "uuid-name",
2267                                 json_string_create_nocopy(
2268                                     uuid_name_from_uuid(&row->uuid)));
2269
2270                 insert = xmalloc(sizeof *insert);
2271                 insert->dummy = row->uuid;
2272                 insert->op_index = operations->u.array.n - 1;
2273                 uuid_zero(&insert->real);
2274                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
2275                             uuid_hash(&insert->dummy));
2276             }
2277             row_json = json_object_create();
2278             json_object_put(op, "row", row_json);
2279
2280             if (row->written) {
2281                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
2282                     const struct ovsdb_idl_column *column =
2283                                                         &class->columns[idx];
2284
2285                     if (row->old
2286                         || !ovsdb_datum_is_default(&row->new[idx],
2287                                                   &column->type)) {
2288                         json_object_put(row_json, column->name,
2289                                         substitute_uuids(
2290                                             ovsdb_datum_to_json(&row->new[idx],
2291                                                                 &column->type),
2292                                             txn));
2293
2294                         /* If anything really changed, consider it an update.
2295                          * We can't suppress not-really-changed values earlier
2296                          * or transactions would become nonatomic (see the big
2297                          * comment inside ovsdb_idl_txn_write()). */
2298                         if (!any_updates && row->old &&
2299                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
2300                                                 &column->type)) {
2301                             any_updates = true;
2302                         }
2303                     }
2304                 }
2305             }
2306
2307             if (!row->old || !shash_is_empty(json_object(row_json))) {
2308                 json_array_add(operations, op);
2309             } else {
2310                 json_destroy(op);
2311             }
2312         }
2313     }
2314
2315     /* Add increment. */
2316     if (txn->inc_table && any_updates) {
2317         struct json *op;
2318
2319         txn->inc_index = operations->u.array.n - 1;
2320
2321         op = json_object_create();
2322         json_object_put_string(op, "op", "mutate");
2323         json_object_put_string(op, "table", txn->inc_table);
2324         json_object_put(op, "where",
2325                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2326                                          txn));
2327         json_object_put(op, "mutations",
2328                         json_array_create_1(
2329                             json_array_create_3(
2330                                 json_string_create(txn->inc_column),
2331                                 json_string_create("+="),
2332                                 json_integer_create(1))));
2333         json_array_add(operations, op);
2334
2335         op = json_object_create();
2336         json_object_put_string(op, "op", "select");
2337         json_object_put_string(op, "table", txn->inc_table);
2338         json_object_put(op, "where",
2339                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2340                                          txn));
2341         json_object_put(op, "columns",
2342                         json_array_create_1(json_string_create(
2343                                                 txn->inc_column)));
2344         json_array_add(operations, op);
2345     }
2346
2347     if (txn->comment.length) {
2348         struct json *op = json_object_create();
2349         json_object_put_string(op, "op", "comment");
2350         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
2351         json_array_add(operations, op);
2352     }
2353
2354     if (txn->dry_run) {
2355         struct json *op = json_object_create();
2356         json_object_put_string(op, "op", "abort");
2357         json_array_add(operations, op);
2358     }
2359
2360     if (!any_updates) {
2361         txn->status = TXN_UNCHANGED;
2362         json_destroy(operations);
2363     } else if (!jsonrpc_session_send(
2364                    txn->idl->session,
2365                    jsonrpc_create_request(
2366                        "transact", operations, &txn->request_id))) {
2367         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
2368                     json_hash(txn->request_id, 0));
2369         txn->status = TXN_INCOMPLETE;
2370     } else {
2371         txn->status = TXN_TRY_AGAIN;
2372     }
2373
2374 disassemble_out:
2375     ovsdb_idl_txn_disassemble(txn);
2376 coverage_out:
2377     switch (txn->status) {
2378     case TXN_UNCOMMITTED:   COVERAGE_INC(txn_uncommitted);    break;
2379     case TXN_UNCHANGED:     COVERAGE_INC(txn_unchanged);      break;
2380     case TXN_INCOMPLETE:    COVERAGE_INC(txn_incomplete);     break;
2381     case TXN_ABORTED:       COVERAGE_INC(txn_aborted);        break;
2382     case TXN_SUCCESS:       COVERAGE_INC(txn_success);        break;
2383     case TXN_TRY_AGAIN:     COVERAGE_INC(txn_try_again);      break;
2384     case TXN_NOT_LOCKED:    COVERAGE_INC(txn_not_locked);     break;
2385     case TXN_ERROR:         COVERAGE_INC(txn_error);          break;
2386     }
2387
2388     return txn->status;
2389 }
2390
2391 /* Attempts to commit 'txn', blocking until the commit either succeeds or
2392  * fails.  Returns the final commit status, which may be any TXN_* value other
2393  * than TXN_INCOMPLETE.
2394  *
2395  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
2396  * return value of ovsdb_idl_get_seqno() to change. */
2397 enum ovsdb_idl_txn_status
2398 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
2399 {
2400     enum ovsdb_idl_txn_status status;
2401
2402     fatal_signal_run();
2403     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
2404         ovsdb_idl_run(txn->idl);
2405         ovsdb_idl_wait(txn->idl);
2406         ovsdb_idl_txn_wait(txn);
2407         poll_block();
2408     }
2409     return status;
2410 }
2411
2412 /* Returns the final (incremented) value of the column in 'txn' that was set to
2413  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
2414  * successfully. */
2415 int64_t
2416 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
2417 {
2418     ovs_assert(txn->status == TXN_SUCCESS);
2419     return txn->inc_new_value;
2420 }
2421
2422 /* Aborts 'txn' without sending it to the database server.  This is effective
2423  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
2424  * Otherwise, it has no effect.
2425  *
2426  * Aborting a transaction doesn't free its memory.  Use
2427  * ovsdb_idl_txn_destroy() to do that. */
2428 void
2429 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
2430 {
2431     ovsdb_idl_txn_disassemble(txn);
2432     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
2433         txn->status = TXN_ABORTED;
2434     }
2435 }
2436
2437 /* Returns a string that reports the error status for 'txn'.  The caller must
2438  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
2439  * for 'txn' may free the returned string.
2440  *
2441  * The return value is ordinarily one of the strings that
2442  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
2443  * due to an error reported by the database server, the return value is that
2444  * error. */
2445 const char *
2446 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
2447 {
2448     if (txn->status != TXN_ERROR) {
2449         return ovsdb_idl_txn_status_to_string(txn->status);
2450     } else if (txn->error) {
2451         return txn->error;
2452     } else {
2453         return "no error details available";
2454     }
2455 }
2456
2457 static void
2458 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
2459                              const struct json *json)
2460 {
2461     if (txn->error == NULL) {
2462         txn->error = json_to_string(json, JSSF_SORT);
2463     }
2464 }
2465
2466 /* For transaction 'txn' that completed successfully, finds and returns the
2467  * permanent UUID that the database assigned to a newly inserted row, given the
2468  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
2469  *
2470  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
2471  * if it was assigned by that function and then deleted by
2472  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
2473  * and then deleted within a single transaction are never sent to the database
2474  * server, so it never assigns them a permanent UUID.) */
2475 const struct uuid *
2476 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
2477                               const struct uuid *uuid)
2478 {
2479     const struct ovsdb_idl_txn_insert *insert;
2480
2481     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
2482     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
2483                              uuid_hash(uuid), &txn->inserted_rows) {
2484         if (uuid_equals(uuid, &insert->dummy)) {
2485             return &insert->real;
2486         }
2487     }
2488     return NULL;
2489 }
2490
2491 static void
2492 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2493                        enum ovsdb_idl_txn_status status)
2494 {
2495     txn->status = status;
2496     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2497 }
2498
2499 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
2500  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
2501  * ovs-vswitchd).
2502  *
2503  * 'datum' must have the correct type for its column.  The IDL does not check
2504  * that it meets schema constraints, but ovsdb-server will do so at commit time
2505  * so it had better be correct.
2506  *
2507  * A transaction must be in progress.  Replication of 'column' must not have
2508  * been disabled (by calling ovsdb_idl_omit()).
2509  *
2510  * Usually this function is used indirectly through one of the "set" functions
2511  * generated by ovsdb-idlc.
2512  *
2513  * Takes ownership of what 'datum' points to (and in some cases destroys that
2514  * data before returning) but makes a copy of 'datum' itself.  (Commonly
2515  * 'datum' is on the caller's stack.) */
2516 static void
2517 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
2518                       const struct ovsdb_idl_column *column,
2519                       struct ovsdb_datum *datum, bool owns_datum)
2520 {
2521     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2522     const struct ovsdb_idl_table_class *class;
2523     size_t column_idx;
2524     bool write_only;
2525
2526     if (ovsdb_idl_row_is_synthetic(row)) {
2527         goto discard_datum;
2528     }
2529
2530     class = row->table->class;
2531     column_idx = column - class->columns;
2532     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
2533
2534     ovs_assert(row->new != NULL);
2535     ovs_assert(column_idx < class->n_columns);
2536     ovs_assert(row->old == NULL ||
2537                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2538
2539     if (row->table->idl->verify_write_only && !write_only) {
2540         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
2541                  " explicitly configured not to.", class->name, column->name);
2542         goto discard_datum;
2543     }
2544
2545     /* If this is a write-only column and the datum being written is the same
2546      * as the one already there, just skip the update entirely.  This is worth
2547      * optimizing because we have a lot of columns that get periodically
2548      * refreshed into the database but don't actually change that often.
2549      *
2550      * We don't do this for read/write columns because that would break
2551      * atomicity of transactions--some other client might have written a
2552      * different value in that column since we read it.  (But if a whole
2553      * transaction only does writes of existing values, without making any real
2554      * changes, we will drop the whole transaction later in
2555      * ovsdb_idl_txn_commit().) */
2556     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
2557                                          datum, &column->type)) {
2558         goto discard_datum;
2559     }
2560
2561     if (hmap_node_is_null(&row->txn_node)) {
2562         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2563                     uuid_hash(&row->uuid));
2564     }
2565     if (row->old == row->new) {
2566         row->new = xmalloc(class->n_columns * sizeof *row->new);
2567     }
2568     if (!row->written) {
2569         row->written = bitmap_allocate(class->n_columns);
2570     }
2571     if (bitmap_is_set(row->written, column_idx)) {
2572         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
2573     } else {
2574         bitmap_set1(row->written, column_idx);
2575     }
2576     if (owns_datum) {
2577         row->new[column_idx] = *datum;
2578     } else {
2579         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
2580     }
2581     (column->unparse)(row);
2582     (column->parse)(row, &row->new[column_idx]);
2583     return;
2584
2585 discard_datum:
2586     if (owns_datum) {
2587         ovsdb_datum_destroy(datum, &column->type);
2588     }
2589 }
2590
2591 void
2592 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
2593                     const struct ovsdb_idl_column *column,
2594                     struct ovsdb_datum *datum)
2595 {
2596     ovsdb_idl_txn_write__(row, column, datum, true);
2597 }
2598
2599 void
2600 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
2601                           const struct ovsdb_idl_column *column,
2602                           const struct ovsdb_datum *datum)
2603 {
2604     ovsdb_idl_txn_write__(row, column,
2605                           CONST_CAST(struct ovsdb_datum *, datum), false);
2606 }
2607
2608 /* Causes the original contents of 'column' in 'row_' to be verified as a
2609  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
2610  * changed (or if 'row_' was deleted) between the time that the IDL originally
2611  * read its contents and the time that the transaction commits, then the
2612  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
2613  * TXN_AGAIN_NOW (depending on whether the database change has already been
2614  * received).
2615  *
2616  * The intention is that, to ensure that no transaction commits based on dirty
2617  * reads, an application should call ovsdb_idl_txn_verify() on each data item
2618  * read as part of a read-modify-write operation.
2619  *
2620  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
2621  * value of 'column' is already known:
2622  *
2623  *   - If 'row_' is a row created by the current transaction (returned by
2624  *     ovsdb_idl_txn_insert()).
2625  *
2626  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
2627  *     within the current transaction.
2628  *
2629  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
2630  * ovsdb_idl_txn_write() for a given read-modify-write.
2631  *
2632  * A transaction must be in progress.
2633  *
2634  * Usually this function is used indirectly through one of the "verify"
2635  * functions generated by ovsdb-idlc. */
2636 void
2637 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
2638                      const struct ovsdb_idl_column *column)
2639 {
2640     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2641     const struct ovsdb_idl_table_class *class;
2642     size_t column_idx;
2643
2644     if (ovsdb_idl_row_is_synthetic(row)) {
2645         return;
2646     }
2647
2648     class = row->table->class;
2649     column_idx = column - class->columns;
2650
2651     ovs_assert(row->new != NULL);
2652     ovs_assert(row->old == NULL ||
2653                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2654     if (!row->old
2655         || (row->written && bitmap_is_set(row->written, column_idx))) {
2656         return;
2657     }
2658
2659     if (hmap_node_is_null(&row->txn_node)) {
2660         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2661                     uuid_hash(&row->uuid));
2662     }
2663     if (!row->prereqs) {
2664         row->prereqs = bitmap_allocate(class->n_columns);
2665     }
2666     bitmap_set1(row->prereqs, column_idx);
2667 }
2668
2669 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
2670  * accessed afterward.
2671  *
2672  * A transaction must be in progress.
2673  *
2674  * Usually this function is used indirectly through one of the "delete"
2675  * functions generated by ovsdb-idlc. */
2676 void
2677 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
2678 {
2679     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2680
2681     if (ovsdb_idl_row_is_synthetic(row)) {
2682         return;
2683     }
2684
2685     ovs_assert(row->new != NULL);
2686     if (!row->old) {
2687         ovsdb_idl_row_unparse(row);
2688         ovsdb_idl_row_clear_new(row);
2689         ovs_assert(!row->prereqs);
2690         hmap_remove(&row->table->rows, &row->hmap_node);
2691         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
2692         free(row);
2693         return;
2694     }
2695     if (hmap_node_is_null(&row->txn_node)) {
2696         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2697                     uuid_hash(&row->uuid));
2698     }
2699     ovsdb_idl_row_clear_new(row);
2700     row->new = NULL;
2701 }
2702
2703 /* Inserts and returns a new row in the table with the specified 'class' in the
2704  * database with open transaction 'txn'.
2705  *
2706  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
2707  * randomly generated; otherwise 'uuid' should specify a randomly generated
2708  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
2709  * 'txn' is committed, but the IDL will replace any uses of the provisional
2710  * UUID in the data to be to be committed by the UUID assigned by
2711  * ovsdb-server.
2712  *
2713  * Usually this function is used indirectly through one of the "insert"
2714  * functions generated by ovsdb-idlc. */
2715 const struct ovsdb_idl_row *
2716 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
2717                      const struct ovsdb_idl_table_class *class,
2718                      const struct uuid *uuid)
2719 {
2720     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
2721
2722     if (uuid) {
2723         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
2724         row->uuid = *uuid;
2725     } else {
2726         uuid_generate(&row->uuid);
2727     }
2728
2729     row->table = ovsdb_idl_table_from_class(txn->idl, class);
2730     row->new = xmalloc(class->n_columns * sizeof *row->new);
2731     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
2732     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
2733     return row;
2734 }
2735
2736 static void
2737 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
2738 {
2739     struct ovsdb_idl_txn *txn;
2740
2741     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
2742         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
2743     }
2744 }
2745
2746 static struct ovsdb_idl_txn *
2747 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
2748 {
2749     struct ovsdb_idl_txn *txn;
2750
2751     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
2752                              json_hash(id, 0), &idl->outstanding_txns) {
2753         if (json_equal(id, txn->request_id)) {
2754             return txn;
2755         }
2756     }
2757     return NULL;
2758 }
2759
2760 static bool
2761 check_json_type(const struct json *json, enum json_type type, const char *name)
2762 {
2763     if (!json) {
2764         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
2765         return false;
2766     } else if (json->type != type) {
2767         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
2768                      name, json_type_to_string(json->type),
2769                      json_type_to_string(type));
2770         return false;
2771     } else {
2772         return true;
2773     }
2774 }
2775
2776 static bool
2777 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
2778                                 const struct json_array *results)
2779 {
2780     struct json *count, *rows, *row, *column;
2781     struct shash *mutate, *select;
2782
2783     if (txn->inc_index + 2 > results->n) {
2784         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2785                      "for increment (has %"PRIuSIZE", needs %u)",
2786                      results->n, txn->inc_index + 2);
2787         return false;
2788     }
2789
2790     /* We know that this is a JSON object because the loop in
2791      * ovsdb_idl_txn_process_reply() checked. */
2792     mutate = json_object(results->elems[txn->inc_index]);
2793     count = shash_find_data(mutate, "count");
2794     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
2795         return false;
2796     }
2797     if (count->u.integer != 1) {
2798         VLOG_WARN_RL(&syntax_rl,
2799                      "\"mutate\" reply \"count\" is %lld instead of 1",
2800                      count->u.integer);
2801         return false;
2802     }
2803
2804     select = json_object(results->elems[txn->inc_index + 1]);
2805     rows = shash_find_data(select, "rows");
2806     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
2807         return false;
2808     }
2809     if (rows->u.array.n != 1) {
2810         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
2811                      "instead of 1",
2812                      rows->u.array.n);
2813         return false;
2814     }
2815     row = rows->u.array.elems[0];
2816     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
2817         return false;
2818     }
2819     column = shash_find_data(json_object(row), txn->inc_column);
2820     if (!check_json_type(column, JSON_INTEGER,
2821                          "\"select\" reply inc column")) {
2822         return false;
2823     }
2824     txn->inc_new_value = column->u.integer;
2825     return true;
2826 }
2827
2828 static bool
2829 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
2830                                    const struct json_array *results)
2831 {
2832     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
2833     struct ovsdb_error *error;
2834     struct json *json_uuid;
2835     union ovsdb_atom uuid;
2836     struct shash *reply;
2837
2838     if (insert->op_index >= results->n) {
2839         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2840                      "for insert (has %"PRIuSIZE", needs %u)",
2841                      results->n, insert->op_index);
2842         return false;
2843     }
2844
2845     /* We know that this is a JSON object because the loop in
2846      * ovsdb_idl_txn_process_reply() checked. */
2847     reply = json_object(results->elems[insert->op_index]);
2848     json_uuid = shash_find_data(reply, "uuid");
2849     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
2850         return false;
2851     }
2852
2853     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
2854     if (error) {
2855         char *s = ovsdb_error_to_string(error);
2856         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
2857                      "UUID: %s", s);
2858         free(s);
2859         ovsdb_error_destroy(error);
2860         return false;
2861     }
2862
2863     insert->real = uuid.uuid;
2864
2865     return true;
2866 }
2867
2868 static bool
2869 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
2870                             const struct jsonrpc_msg *msg)
2871 {
2872     struct ovsdb_idl_txn *txn;
2873     enum ovsdb_idl_txn_status status;
2874
2875     txn = ovsdb_idl_txn_find(idl, msg->id);
2876     if (!txn) {
2877         return false;
2878     }
2879
2880     if (msg->type == JSONRPC_ERROR) {
2881         status = TXN_ERROR;
2882     } else if (msg->result->type != JSON_ARRAY) {
2883         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
2884         status = TXN_ERROR;
2885     } else {
2886         struct json_array *ops = &msg->result->u.array;
2887         int hard_errors = 0;
2888         int soft_errors = 0;
2889         int lock_errors = 0;
2890         size_t i;
2891
2892         for (i = 0; i < ops->n; i++) {
2893             struct json *op = ops->elems[i];
2894
2895             if (op->type == JSON_NULL) {
2896                 /* This isn't an error in itself but indicates that some prior
2897                  * operation failed, so make sure that we know about it. */
2898                 soft_errors++;
2899             } else if (op->type == JSON_OBJECT) {
2900                 struct json *error;
2901
2902                 error = shash_find_data(json_object(op), "error");
2903                 if (error) {
2904                     if (error->type == JSON_STRING) {
2905                         if (!strcmp(error->u.string, "timed out")) {
2906                             soft_errors++;
2907                         } else if (!strcmp(error->u.string, "not owner")) {
2908                             lock_errors++;
2909                         } else if (strcmp(error->u.string, "aborted")) {
2910                             hard_errors++;
2911                             ovsdb_idl_txn_set_error_json(txn, op);
2912                         }
2913                     } else {
2914                         hard_errors++;
2915                         ovsdb_idl_txn_set_error_json(txn, op);
2916                         VLOG_WARN_RL(&syntax_rl,
2917                                      "\"error\" in reply is not JSON string");
2918                     }
2919                 }
2920             } else {
2921                 hard_errors++;
2922                 ovsdb_idl_txn_set_error_json(txn, op);
2923                 VLOG_WARN_RL(&syntax_rl,
2924                              "operation reply is not JSON null or object");
2925             }
2926         }
2927
2928         if (!soft_errors && !hard_errors && !lock_errors) {
2929             struct ovsdb_idl_txn_insert *insert;
2930
2931             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
2932                 hard_errors++;
2933             }
2934
2935             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
2936                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
2937                     hard_errors++;
2938                 }
2939             }
2940         }
2941
2942         status = (hard_errors ? TXN_ERROR
2943                   : lock_errors ? TXN_NOT_LOCKED
2944                   : soft_errors ? TXN_TRY_AGAIN
2945                   : TXN_SUCCESS);
2946     }
2947
2948     ovsdb_idl_txn_complete(txn, status);
2949     return true;
2950 }
2951
2952 /* Returns the transaction currently active for 'row''s IDL.  A transaction
2953  * must currently be active. */
2954 struct ovsdb_idl_txn *
2955 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
2956 {
2957     struct ovsdb_idl_txn *txn = row->table->idl->txn;
2958     ovs_assert(txn != NULL);
2959     return txn;
2960 }
2961
2962 /* Returns the IDL on which 'txn' acts. */
2963 struct ovsdb_idl *
2964 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
2965 {
2966     return txn->idl;
2967 }
2968
2969 /* Blocks until 'idl' successfully connects to the remote database and
2970  * retrieves its contents. */
2971 void
2972 ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
2973 {
2974     while (1) {
2975         ovsdb_idl_run(idl);
2976         if (ovsdb_idl_has_ever_connected(idl)) {
2977             return;
2978         }
2979         ovsdb_idl_wait(idl);
2980         poll_block();
2981     }
2982 }
2983 \f
2984 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
2985  * the database server and to avoid modifying the database when the lock cannot
2986  * be acquired (that is, when another client has the same lock).
2987  *
2988  * If 'lock_name' is NULL, drops the locking requirement and releases the
2989  * lock. */
2990 void
2991 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
2992 {
2993     ovs_assert(!idl->txn);
2994     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
2995
2996     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
2997         /* Release previous lock. */
2998         ovsdb_idl_send_unlock_request(idl);
2999         free(idl->lock_name);
3000         idl->lock_name = NULL;
3001         idl->is_lock_contended = false;
3002     }
3003
3004     if (lock_name && !idl->lock_name) {
3005         /* Acquire new lock. */
3006         idl->lock_name = xstrdup(lock_name);
3007         ovsdb_idl_send_lock_request(idl);
3008     }
3009 }
3010
3011 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
3012  *
3013  * Locking and unlocking happens asynchronously from the database client's
3014  * point of view, so the information is only useful for optimization (e.g. if
3015  * the client doesn't have the lock then there's no point in trying to write to
3016  * the database). */
3017 bool
3018 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
3019 {
3020     return idl->has_lock;
3021 }
3022
3023 /* Returns true if 'idl' is configured to obtain a lock but the database server
3024  * has indicated that some other client already owns the requested lock. */
3025 bool
3026 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
3027 {
3028     return idl->is_lock_contended;
3029 }
3030
3031 static void
3032 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
3033 {
3034     if (new_has_lock && !idl->has_lock) {
3035         if (idl->state == IDL_S_MONITORING ||
3036             idl->state == IDL_S_MONITORING2) {
3037             idl->change_seqno++;
3038         } else {
3039             /* We're setting up a session, so don't signal that the database
3040              * changed.  Finalizing the session will increment change_seqno
3041              * anyhow. */
3042         }
3043         idl->is_lock_contended = false;
3044     }
3045     idl->has_lock = new_has_lock;
3046 }
3047
3048 static void
3049 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
3050                               struct json **idp)
3051 {
3052     ovsdb_idl_update_has_lock(idl, false);
3053
3054     json_destroy(idl->lock_request_id);
3055     idl->lock_request_id = NULL;
3056
3057     if (jsonrpc_session_is_connected(idl->session)) {
3058         struct json *params;
3059
3060         params = json_array_create_1(json_string_create(idl->lock_name));
3061         jsonrpc_session_send(idl->session,
3062                              jsonrpc_create_request(method, params, idp));
3063     }
3064 }
3065
3066 static void
3067 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
3068 {
3069     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
3070 }
3071
3072 static void
3073 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
3074 {
3075     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
3076 }
3077
3078 static void
3079 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
3080 {
3081     bool got_lock;
3082
3083     json_destroy(idl->lock_request_id);
3084     idl->lock_request_id = NULL;
3085
3086     if (result->type == JSON_OBJECT) {
3087         const struct json *locked;
3088
3089         locked = shash_find_data(json_object(result), "locked");
3090         got_lock = locked && locked->type == JSON_TRUE;
3091     } else {
3092         got_lock = false;
3093     }
3094
3095     ovsdb_idl_update_has_lock(idl, got_lock);
3096     if (!got_lock) {
3097         idl->is_lock_contended = true;
3098     }
3099 }
3100
3101 static void
3102 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
3103                             const struct json *params,
3104                             bool new_has_lock)
3105 {
3106     if (idl->lock_name
3107         && params->type == JSON_ARRAY
3108         && json_array(params)->n > 0
3109         && json_array(params)->elems[0]->type == JSON_STRING) {
3110         const char *lock_name = json_string(json_array(params)->elems[0]);
3111
3112         if (!strcmp(idl->lock_name, lock_name)) {
3113             ovsdb_idl_update_has_lock(idl, new_has_lock);
3114             if (!new_has_lock) {
3115                 idl->is_lock_contended = true;
3116             }
3117         }
3118     }
3119 }
3120
3121 void
3122 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
3123 {
3124     if (loop) {
3125         ovsdb_idl_destroy(loop->idl);
3126     }
3127 }
3128
3129 struct ovsdb_idl_txn *
3130 ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
3131 {
3132     ovsdb_idl_run(loop->idl);
3133     loop->open_txn = (loop->committing_txn
3134                       || ovsdb_idl_get_seqno(loop->idl) == loop->skip_seqno
3135                       ? NULL
3136                       : ovsdb_idl_txn_create(loop->idl));
3137     return loop->open_txn;
3138 }
3139
3140 void
3141 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
3142 {
3143     if (loop->open_txn) {
3144         loop->committing_txn = loop->open_txn;
3145         loop->open_txn = NULL;
3146
3147         loop->precommit_seqno = ovsdb_idl_get_seqno(loop->idl);
3148     }
3149
3150     struct ovsdb_idl_txn *txn = loop->committing_txn;
3151     if (txn) {
3152         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
3153         if (status != TXN_INCOMPLETE) {
3154             switch (status) {
3155             case TXN_TRY_AGAIN:
3156                 /* We want to re-evaluate the database when it's changed from
3157                  * the contents that it had when we started the commit.  (That
3158                  * might have already happened.) */
3159                 loop->skip_seqno = loop->precommit_seqno;
3160                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
3161                     poll_immediate_wake();
3162                 }
3163                 break;
3164
3165             case TXN_SUCCESS:
3166                 /* If the database has already changed since we started the
3167                  * commit, re-evaluate it immediately to avoid missing a change
3168                  * for a while. */
3169                 if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
3170                     poll_immediate_wake();
3171                 }
3172                 break;
3173
3174             case TXN_UNCHANGED:
3175             case TXN_ABORTED:
3176             case TXN_NOT_LOCKED:
3177             case TXN_ERROR:
3178                 break;
3179
3180             case TXN_UNCOMMITTED:
3181             case TXN_INCOMPLETE:
3182                 OVS_NOT_REACHED();
3183
3184             }
3185             ovsdb_idl_txn_destroy(txn);
3186             loop->committing_txn = NULL;
3187         }
3188     }
3189
3190     ovsdb_idl_wait(loop->idl);
3191 }