00b900d2c539d3ba47b520440bb33e679d8ec517
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "coverage.h"
27 #include "dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "json.h"
30 #include "jsonrpc.h"
31 #include "ovsdb/ovsdb.h"
32 #include "ovsdb/table.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-idl-provider.h"
36 #include "ovsdb-parser.h"
37 #include "poll-loop.h"
38 #include "shash.h"
39 #include "sset.h"
40 #include "util.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
44
45 COVERAGE_DEFINE(txn_uncommitted);
46 COVERAGE_DEFINE(txn_unchanged);
47 COVERAGE_DEFINE(txn_incomplete);
48 COVERAGE_DEFINE(txn_aborted);
49 COVERAGE_DEFINE(txn_success);
50 COVERAGE_DEFINE(txn_try_again);
51 COVERAGE_DEFINE(txn_not_locked);
52 COVERAGE_DEFINE(txn_error);
53
54 /* An arc from one idl_row to another.  When row A contains a UUID that
55  * references row B, this is represented by an arc from A (the source) to B
56  * (the destination).
57  *
58  * Arcs from a row to itself are omitted, that is, src and dst are always
59  * different.
60  *
61  * Arcs are never duplicated, that is, even if there are multiple references
62  * from A to B, there is only a single arc from A to B.
63  *
64  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
65  * A.  Both an arc and its converse may both be present, if each row refers
66  * to the other circularly.
67  *
68  * The source and destination row may be in the same table or in different
69  * tables.
70  */
71 struct ovsdb_idl_arc {
72     struct ovs_list src_node;   /* In src->src_arcs list. */
73     struct ovs_list dst_node;   /* In dst->dst_arcs list. */
74     struct ovsdb_idl_row *src;  /* Source row. */
75     struct ovsdb_idl_row *dst;  /* Destination row. */
76 };
77
78 enum ovsdb_idl_state {
79     IDL_S_SCHEMA_REQUESTED,
80     IDL_S_MONITOR_REQUESTED,
81     IDL_S_MONITORING
82 };
83
84 struct ovsdb_idl {
85     const struct ovsdb_idl_class *class;
86     struct jsonrpc_session *session;
87     struct shash table_by_name;
88     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
89     unsigned int change_seqno;
90     bool verify_write_only;
91
92     /* Session state. */
93     unsigned int state_seqno;
94     enum ovsdb_idl_state state;
95     struct json *request_id;
96
97     /* Database locking. */
98     char *lock_name;            /* Name of lock we need, NULL if none. */
99     bool has_lock;              /* Has db server told us we have the lock? */
100     bool is_lock_contended;     /* Has db server told us we can't get lock? */
101     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
102
103     /* Transaction support. */
104     struct ovsdb_idl_txn *txn;
105     struct hmap outstanding_txns;
106 };
107
108 struct ovsdb_idl_txn {
109     struct hmap_node hmap_node;
110     struct json *request_id;
111     struct ovsdb_idl *idl;
112     struct hmap txn_rows;
113     enum ovsdb_idl_txn_status status;
114     char *error;
115     bool dry_run;
116     struct ds comment;
117
118     /* Increments. */
119     const char *inc_table;
120     const char *inc_column;
121     struct uuid inc_row;
122     unsigned int inc_index;
123     int64_t inc_new_value;
124
125     /* Inserted rows. */
126     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
127 };
128
129 struct ovsdb_idl_txn_insert {
130     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
131     struct uuid dummy;          /* Dummy UUID used locally. */
132     int op_index;               /* Index into transaction's operation array. */
133     struct uuid real;           /* Real UUID used by database server. */
134 };
135
136 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
137 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
138
139 static void ovsdb_idl_clear(struct ovsdb_idl *);
140 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
141 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *,
142                                            const struct json *schema_json);
143 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
144 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
145                                                     const struct json *);
146 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
147                                      const struct uuid *,
148                                      const struct json *old,
149                                      const struct json *new);
150 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
151 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
152 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
153
154 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
155 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
156     const struct ovsdb_idl_table_class *);
157 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
158                                                   const struct uuid *);
159 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
160
161 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
162 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
163 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
164 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
165
166 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
167 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
168                                         const struct jsonrpc_msg *msg);
169
170 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
171 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
172 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
173                                        const struct json *);
174 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
175                                         const struct json *params,
176                                         bool new_has_lock);
177
178 /* Creates and returns a connection to database 'remote', which should be in a
179  * form acceptable to jsonrpc_session_open().  The connection will maintain an
180  * in-memory replica of the remote database whose schema is described by
181  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
182  * by ovsdb-idlc.)
183  *
184  * Passes 'retry' to jsonrpc_session_open().  See that function for
185  * documentation.
186  *
187  * If 'monitor_everything_by_default' is true, then everything in the remote
188  * database will be replicated by default.  ovsdb_idl_omit() and
189  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
190  * monitoring.
191  *
192  * If 'monitor_everything_by_default' is false, then no columns or tables will
193  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
194  * must be used to choose some columns or tables to replicate.
195  */
196 struct ovsdb_idl *
197 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
198                  bool monitor_everything_by_default, bool retry)
199 {
200     struct ovsdb_idl *idl;
201     uint8_t default_mode;
202     size_t i;
203
204     default_mode = (monitor_everything_by_default
205                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
206                     : 0);
207
208     idl = xzalloc(sizeof *idl);
209     idl->class = class;
210     idl->session = jsonrpc_session_open(remote, retry);
211     shash_init(&idl->table_by_name);
212     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
213     for (i = 0; i < class->n_tables; i++) {
214         const struct ovsdb_idl_table_class *tc = &class->tables[i];
215         struct ovsdb_idl_table *table = &idl->tables[i];
216         size_t j;
217
218         shash_add_assert(&idl->table_by_name, tc->name, table);
219         table->class = tc;
220         table->modes = xmalloc(tc->n_columns);
221         memset(table->modes, default_mode, tc->n_columns);
222         table->need_table = false;
223         shash_init(&table->columns);
224         for (j = 0; j < tc->n_columns; j++) {
225             const struct ovsdb_idl_column *column = &tc->columns[j];
226
227             shash_add_assert(&table->columns, column->name, column);
228         }
229         hmap_init(&table->rows);
230         table->idl = idl;
231     }
232
233     idl->state_seqno = UINT_MAX;
234     idl->request_id = NULL;
235
236     hmap_init(&idl->outstanding_txns);
237
238     return idl;
239 }
240
241 /* Destroys 'idl' and all of the data structures that it manages. */
242 void
243 ovsdb_idl_destroy(struct ovsdb_idl *idl)
244 {
245     if (idl) {
246         size_t i;
247
248         ovs_assert(!idl->txn);
249         ovsdb_idl_clear(idl);
250         jsonrpc_session_close(idl->session);
251
252         for (i = 0; i < idl->class->n_tables; i++) {
253             struct ovsdb_idl_table *table = &idl->tables[i];
254             shash_destroy(&table->columns);
255             hmap_destroy(&table->rows);
256             free(table->modes);
257         }
258         shash_destroy(&idl->table_by_name);
259         free(idl->tables);
260         json_destroy(idl->request_id);
261         free(idl->lock_name);
262         json_destroy(idl->lock_request_id);
263         hmap_destroy(&idl->outstanding_txns);
264         free(idl);
265     }
266 }
267
268 static void
269 ovsdb_idl_clear(struct ovsdb_idl *idl)
270 {
271     bool changed = false;
272     size_t i;
273
274     for (i = 0; i < idl->class->n_tables; i++) {
275         struct ovsdb_idl_table *table = &idl->tables[i];
276         struct ovsdb_idl_row *row, *next_row;
277
278         if (hmap_is_empty(&table->rows)) {
279             continue;
280         }
281
282         changed = true;
283         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
284             struct ovsdb_idl_arc *arc, *next_arc;
285
286             if (!ovsdb_idl_row_is_orphan(row)) {
287                 ovsdb_idl_row_unparse(row);
288             }
289             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
290                 free(arc);
291             }
292             /* No need to do anything with dst_arcs: some node has those arcs
293              * as forward arcs and will destroy them itself. */
294
295             ovsdb_idl_row_destroy(row);
296         }
297     }
298
299     if (changed) {
300         idl->change_seqno++;
301     }
302 }
303
304 /* Processes a batch of messages from the database server on 'idl'.  This may
305  * cause the IDL's contents to change.  The client may check for that with
306  * ovsdb_idl_get_seqno(). */
307 void
308 ovsdb_idl_run(struct ovsdb_idl *idl)
309 {
310     int i;
311
312     ovs_assert(!idl->txn);
313     jsonrpc_session_run(idl->session);
314     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
315         struct jsonrpc_msg *msg;
316         unsigned int seqno;
317
318         seqno = jsonrpc_session_get_seqno(idl->session);
319         if (idl->state_seqno != seqno) {
320             idl->state_seqno = seqno;
321             json_destroy(idl->request_id);
322             idl->request_id = NULL;
323             ovsdb_idl_txn_abort_all(idl);
324
325             ovsdb_idl_send_schema_request(idl);
326             idl->state = IDL_S_SCHEMA_REQUESTED;
327             if (idl->lock_name) {
328                 ovsdb_idl_send_lock_request(idl);
329             }
330         }
331
332         msg = jsonrpc_session_recv(idl->session);
333         if (!msg) {
334             break;
335         }
336
337         if (msg->type == JSONRPC_NOTIFY
338             && !strcmp(msg->method, "update")
339             && msg->params->type == JSON_ARRAY
340             && msg->params->u.array.n == 2
341             && msg->params->u.array.elems[0]->type == JSON_NULL) {
342             /* Database contents changed. */
343             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
344         } else if (msg->type == JSONRPC_REPLY
345                    && idl->request_id
346                    && json_equal(idl->request_id, msg->id)) {
347             switch (idl->state) {
348             case IDL_S_SCHEMA_REQUESTED:
349                 /* Reply to our "get_schema" request. */
350                 json_destroy(idl->request_id);
351                 idl->request_id = NULL;
352                 ovsdb_idl_send_monitor_request(idl, msg->result);
353                 idl->state = IDL_S_MONITOR_REQUESTED;
354                 break;
355
356             case IDL_S_MONITOR_REQUESTED:
357                 /* Reply to our "monitor" request. */
358                 idl->change_seqno++;
359                 json_destroy(idl->request_id);
360                 idl->request_id = NULL;
361                 idl->state = IDL_S_MONITORING;
362                 ovsdb_idl_clear(idl);
363                 ovsdb_idl_parse_update(idl, msg->result);
364                 break;
365
366             case IDL_S_MONITORING:
367             default:
368                 OVS_NOT_REACHED();
369             }
370         } else if (msg->type == JSONRPC_REPLY
371                    && idl->lock_request_id
372                    && json_equal(idl->lock_request_id, msg->id)) {
373             /* Reply to our "lock" request. */
374             ovsdb_idl_parse_lock_reply(idl, msg->result);
375         } else if (msg->type == JSONRPC_NOTIFY
376                    && !strcmp(msg->method, "locked")) {
377             /* We got our lock. */
378             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
379         } else if (msg->type == JSONRPC_NOTIFY
380                    && !strcmp(msg->method, "stolen")) {
381             /* Someone else stole our lock. */
382             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
383         } else if ((msg->type == JSONRPC_ERROR
384                     || msg->type == JSONRPC_REPLY)
385                    && ovsdb_idl_txn_process_reply(idl, msg)) {
386             /* ovsdb_idl_txn_process_reply() did everything needful. */
387         } else {
388             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
389              * a transaction before we receive the reply, so keep the log level
390              * low. */
391             VLOG_DBG("%s: received unexpected %s message",
392                      jsonrpc_session_get_name(idl->session),
393                      jsonrpc_msg_type_to_string(msg->type));
394         }
395         jsonrpc_msg_destroy(msg);
396     }
397 }
398
399 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
400  * do or when activity occurs on a transaction on 'idl'. */
401 void
402 ovsdb_idl_wait(struct ovsdb_idl *idl)
403 {
404     jsonrpc_session_wait(idl->session);
405     jsonrpc_session_recv_wait(idl->session);
406 }
407
408 /* Returns a "sequence number" that represents the state of 'idl'.  When
409  * ovsdb_idl_run() changes the database, the sequence number changes.  The
410  * initial fetch of the entire contents of the remote database is considered to
411  * be one kind of change.  Successfully acquiring a lock, if one has been
412  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
413  *
414  * As long as the sequence number does not change, the client may continue to
415  * use any data structures it obtains from 'idl'.  But when it changes, the
416  * client must not access any of these data structures again, because they
417  * could have freed or reused for other purposes.
418  *
419  * The sequence number can occasionally change even if the database does not.
420  * This happens if the connection to the database drops and reconnects, which
421  * causes the database contents to be reloaded even if they didn't change.  (It
422  * could also happen if the database server sends out a "change" that reflects
423  * what the IDL already thought was in the database.  The database server is
424  * not supposed to do that, but bugs could in theory cause it to do so.) */
425 unsigned int
426 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
427 {
428     return idl->change_seqno;
429 }
430
431 /* Returns true if 'idl' successfully connected to the remote database and
432  * retrieved its contents (even if the connection subsequently dropped and is
433  * in the process of reconnecting).  If so, then 'idl' contains an atomic
434  * snapshot of the database's contents (but it might be arbitrarily old if the
435  * connection dropped).
436  *
437  * Returns false if 'idl' has never connected or retrieved the database's
438  * contents.  If so, 'idl' is empty. */
439 bool
440 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
441 {
442     return ovsdb_idl_get_seqno(idl) != 0;
443 }
444
445 /* Reconfigures 'idl' so that it would reconnect to the database, if
446  * connection was dropped. */
447 void
448 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
449 {
450     jsonrpc_session_enable_reconnect(idl->session);
451 }
452
453 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
454  * meantime, the contents of 'idl' will not change. */
455 void
456 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
457 {
458     jsonrpc_session_force_reconnect(idl->session);
459 }
460
461 /* Some IDL users should only write to write-only columns.  Furthermore,
462  * writing to a column which is not write-only can cause serious performance
463  * degradations for these users.  This function causes 'idl' to reject writes
464  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
465 void
466 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
467 {
468     idl->verify_write_only = true;
469 }
470
471 /* Returns true if 'idl' is currently connected or trying to connect. */
472 bool
473 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
474 {
475     return jsonrpc_session_is_alive(idl->session);
476 }
477
478 /* Returns the last error reported on a connection by 'idl'.  The return value
479  * is 0 only if no connection made by 'idl' has ever encountered an error.  See
480  * jsonrpc_get_status() for return value interpretation. */
481 int
482 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
483 {
484     return jsonrpc_session_get_last_error(idl->session);
485 }
486 \f
487 static unsigned char *
488 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
489                    const struct ovsdb_idl_column *column)
490 {
491     size_t i;
492
493     ovs_assert(!idl->change_seqno);
494
495     for (i = 0; i < idl->class->n_tables; i++) {
496         const struct ovsdb_idl_table *table = &idl->tables[i];
497         const struct ovsdb_idl_table_class *tc = table->class;
498
499         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
500             return &table->modes[column - tc->columns];
501         }
502     }
503
504     OVS_NOT_REACHED();
505 }
506
507 static void
508 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
509 {
510     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
511         struct ovsdb_idl_table *table;
512
513         table = shash_find_data(&idl->table_by_name,
514                                 base->u.uuid.refTableName);
515         if (table) {
516             table->need_table = true;
517         } else {
518             VLOG_WARN("%s IDL class missing referenced table %s",
519                       idl->class->database, base->u.uuid.refTableName);
520         }
521     }
522 }
523
524 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
525  * ensures that any tables referenced by 'column' will be replicated, even if
526  * no columns in that table are selected for replication (see
527  * ovsdb_idl_add_table() for more information).
528  *
529  * This function is only useful if 'monitor_everything_by_default' was false in
530  * the call to ovsdb_idl_create().  This function should be called between
531  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
532  */
533 void
534 ovsdb_idl_add_column(struct ovsdb_idl *idl,
535                      const struct ovsdb_idl_column *column)
536 {
537     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
538     add_ref_table(idl, &column->type.key);
539     add_ref_table(idl, &column->type.value);
540 }
541
542 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
543  * no columns are selected for replication.  This can be useful because it
544  * allows 'idl' to keep track of what rows in the table actually exist, which
545  * in turn allows columns that reference the table to have accurate contents.
546  * (The IDL presents the database with references to rows that do not exist
547  * removed.)
548  *
549  * This function is only useful if 'monitor_everything_by_default' was false in
550  * the call to ovsdb_idl_create().  This function should be called between
551  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
552  */
553 void
554 ovsdb_idl_add_table(struct ovsdb_idl *idl,
555                     const struct ovsdb_idl_table_class *tc)
556 {
557     size_t i;
558
559     for (i = 0; i < idl->class->n_tables; i++) {
560         struct ovsdb_idl_table *table = &idl->tables[i];
561
562         if (table->class == tc) {
563             table->need_table = true;
564             return;
565         }
566     }
567
568     OVS_NOT_REACHED();
569 }
570
571 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
572  *
573  * This function should be called between ovsdb_idl_create() and the first call
574  * to ovsdb_idl_run().
575  */
576 void
577 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
578                      const struct ovsdb_idl_column *column)
579 {
580     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
581 }
582
583 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
584  * OVSDB_IDL_MONITOR for details.
585  *
586  * This function should be called between ovsdb_idl_create() and the first call
587  * to ovsdb_idl_run().
588  */
589 void
590 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
591 {
592     *ovsdb_idl_get_mode(idl, column) = 0;
593 }
594 \f
595 static void
596 ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
597 {
598     struct jsonrpc_msg *msg;
599
600     json_destroy(idl->request_id);
601     msg = jsonrpc_create_request(
602         "get_schema",
603         json_array_create_1(json_string_create(idl->class->database)),
604         &idl->request_id);
605     jsonrpc_session_send(idl->session, msg);
606 }
607
608 static void
609 log_error(struct ovsdb_error *error)
610 {
611     char *s = ovsdb_error_to_string(error);
612     VLOG_WARN("error parsing database schema: %s", s);
613     free(s);
614     ovsdb_error_destroy(error);
615 }
616
617 /* Frees 'schema', which is in the format returned by parse_schema(). */
618 static void
619 free_schema(struct shash *schema)
620 {
621     if (schema) {
622         struct shash_node *node, *next;
623
624         SHASH_FOR_EACH_SAFE (node, next, schema) {
625             struct sset *sset = node->data;
626             sset_destroy(sset);
627             free(sset);
628             shash_delete(schema, node);
629         }
630         shash_destroy(schema);
631         free(schema);
632     }
633 }
634
635 /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
636  * 7047, to obtain the names of its rows and columns.  If successful, returns
637  * an shash whose keys are table names and whose values are ssets, where each
638  * sset contains the names of its table's columns.  On failure (due to a parse
639  * error), returns NULL.
640  *
641  * It would also be possible to use the general-purpose OVSDB schema parser in
642  * ovsdb-server, but that's overkill, possibly too strict for the current use
643  * case, and would require restructuring ovsdb-server to separate the schema
644  * code from the rest. */
645 static struct shash *
646 parse_schema(const struct json *schema_json)
647 {
648     struct ovsdb_parser parser;
649     const struct json *tables_json;
650     struct ovsdb_error *error;
651     struct shash_node *node;
652     struct shash *schema;
653
654     ovsdb_parser_init(&parser, schema_json, "database schema");
655     tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
656     error = ovsdb_parser_destroy(&parser);
657     if (error) {
658         log_error(error);
659         return NULL;
660     }
661
662     schema = xmalloc(sizeof *schema);
663     shash_init(schema);
664     SHASH_FOR_EACH (node, json_object(tables_json)) {
665         const char *table_name = node->name;
666         const struct json *json = node->data;
667         const struct json *columns_json;
668
669         ovsdb_parser_init(&parser, json, "table schema for table %s",
670                           table_name);
671         columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
672         error = ovsdb_parser_destroy(&parser);
673         if (error) {
674             log_error(error);
675             free_schema(schema);
676             return NULL;
677         }
678
679         struct sset *columns = xmalloc(sizeof *columns);
680         sset_init(columns);
681
682         struct shash_node *node2;
683         SHASH_FOR_EACH (node2, json_object(columns_json)) {
684             const char *column_name = node2->name;
685             sset_add(columns, column_name);
686         }
687         shash_add(schema, table_name, columns);
688     }
689     return schema;
690 }
691
692 static void
693 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl,
694                                const struct json *schema_json)
695 {
696     struct shash *schema = parse_schema(schema_json);
697     struct json *monitor_requests;
698     struct jsonrpc_msg *msg;
699     size_t i;
700
701     monitor_requests = json_object_create();
702     for (i = 0; i < idl->class->n_tables; i++) {
703         const struct ovsdb_idl_table *table = &idl->tables[i];
704         const struct ovsdb_idl_table_class *tc = table->class;
705         struct json *monitor_request, *columns;
706         const struct sset *table_schema;
707         size_t j;
708
709         table_schema = (schema
710                         ? shash_find_data(schema, table->class->name)
711                         : NULL);
712
713         columns = table->need_table ? json_array_create_empty() : NULL;
714         for (j = 0; j < tc->n_columns; j++) {
715             const struct ovsdb_idl_column *column = &tc->columns[j];
716             if (table->modes[j] & OVSDB_IDL_MONITOR) {
717                 if (table_schema
718                     && !sset_contains(table_schema, column->name)) {
719                     VLOG_WARN("%s table in %s database lacks %s column "
720                               "(database needs upgrade?)",
721                               table->class->name, idl->class->database,
722                               column->name);
723                     continue;
724                 }
725                 if (!columns) {
726                     columns = json_array_create_empty();
727                 }
728                 json_array_add(columns, json_string_create(column->name));
729             }
730         }
731
732         if (columns) {
733             if (schema && !table_schema) {
734                 VLOG_WARN("%s database lacks %s table "
735                           "(database needs upgrade?)",
736                           idl->class->database, table->class->name);
737                 json_destroy(columns);
738                 continue;
739             }
740
741             monitor_request = json_object_create();
742             json_object_put(monitor_request, "columns", columns);
743             json_object_put(monitor_requests, tc->name, monitor_request);
744         }
745     }
746     free_schema(schema);
747
748     json_destroy(idl->request_id);
749     msg = jsonrpc_create_request(
750         "monitor",
751         json_array_create_3(json_string_create(idl->class->database),
752                             json_null_create(), monitor_requests),
753         &idl->request_id);
754     jsonrpc_session_send(idl->session, msg);
755 }
756
757 static void
758 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
759 {
760     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates);
761     if (error) {
762         if (!VLOG_DROP_WARN(&syntax_rl)) {
763             char *s = ovsdb_error_to_string(error);
764             VLOG_WARN_RL(&syntax_rl, "%s", s);
765             free(s);
766         }
767         ovsdb_error_destroy(error);
768     }
769 }
770
771 static struct ovsdb_error *
772 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
773                          const struct json *table_updates)
774 {
775     const struct shash_node *tables_node;
776
777     if (table_updates->type != JSON_OBJECT) {
778         return ovsdb_syntax_error(table_updates, NULL,
779                                   "<table-updates> is not an object");
780     }
781     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
782         const struct json *table_update = tables_node->data;
783         const struct shash_node *table_node;
784         struct ovsdb_idl_table *table;
785
786         table = shash_find_data(&idl->table_by_name, tables_node->name);
787         if (!table) {
788             return ovsdb_syntax_error(
789                 table_updates, NULL,
790                 "<table-updates> includes unknown table \"%s\"",
791                 tables_node->name);
792         }
793
794         if (table_update->type != JSON_OBJECT) {
795             return ovsdb_syntax_error(table_update, NULL,
796                                       "<table-update> for table \"%s\" is "
797                                       "not an object", table->class->name);
798         }
799         SHASH_FOR_EACH (table_node, json_object(table_update)) {
800             const struct json *row_update = table_node->data;
801             const struct json *old_json, *new_json;
802             struct uuid uuid;
803
804             if (!uuid_from_string(&uuid, table_node->name)) {
805                 return ovsdb_syntax_error(table_update, NULL,
806                                           "<table-update> for table \"%s\" "
807                                           "contains bad UUID "
808                                           "\"%s\" as member name",
809                                           table->class->name,
810                                           table_node->name);
811             }
812             if (row_update->type != JSON_OBJECT) {
813                 return ovsdb_syntax_error(row_update, NULL,
814                                           "<table-update> for table \"%s\" "
815                                           "contains <row-update> for %s that "
816                                           "is not an object",
817                                           table->class->name,
818                                           table_node->name);
819             }
820
821             old_json = shash_find_data(json_object(row_update), "old");
822             new_json = shash_find_data(json_object(row_update), "new");
823             if (old_json && old_json->type != JSON_OBJECT) {
824                 return ovsdb_syntax_error(old_json, NULL,
825                                           "\"old\" <row> is not object");
826             } else if (new_json && new_json->type != JSON_OBJECT) {
827                 return ovsdb_syntax_error(new_json, NULL,
828                                           "\"new\" <row> is not object");
829             } else if ((old_json != NULL) + (new_json != NULL)
830                        != shash_count(json_object(row_update))) {
831                 return ovsdb_syntax_error(row_update, NULL,
832                                           "<row-update> contains unexpected "
833                                           "member");
834             } else if (!old_json && !new_json) {
835                 return ovsdb_syntax_error(row_update, NULL,
836                                           "<row-update> missing \"old\" "
837                                           "and \"new\" members");
838             }
839
840             if (ovsdb_idl_process_update(table, &uuid, old_json, new_json)) {
841                 idl->change_seqno++;
842             }
843         }
844     }
845
846     return NULL;
847 }
848
849 static struct ovsdb_idl_row *
850 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
851 {
852     struct ovsdb_idl_row *row;
853
854     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
855         if (uuid_equals(&row->uuid, uuid)) {
856             return row;
857         }
858     }
859     return NULL;
860 }
861
862 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
863  * otherwise. */
864 static bool
865 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
866                          const struct uuid *uuid, const struct json *old,
867                          const struct json *new)
868 {
869     struct ovsdb_idl_row *row;
870
871     row = ovsdb_idl_get_row(table, uuid);
872     if (!new) {
873         /* Delete row. */
874         if (row && !ovsdb_idl_row_is_orphan(row)) {
875             /* XXX perhaps we should check the 'old' values? */
876             ovsdb_idl_delete_row(row);
877         } else {
878             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
879                          "from table %s",
880                          UUID_ARGS(uuid), table->class->name);
881             return false;
882         }
883     } else if (!old) {
884         /* Insert row. */
885         if (!row) {
886             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
887         } else if (ovsdb_idl_row_is_orphan(row)) {
888             ovsdb_idl_insert_row(row, new);
889         } else {
890             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
891                          "table %s", UUID_ARGS(uuid), table->class->name);
892             return ovsdb_idl_modify_row(row, new);
893         }
894     } else {
895         /* Modify row. */
896         if (row) {
897             /* XXX perhaps we should check the 'old' values? */
898             if (!ovsdb_idl_row_is_orphan(row)) {
899                 return ovsdb_idl_modify_row(row, new);
900             } else {
901                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
902                              "referenced row "UUID_FMT" in table %s",
903                              UUID_ARGS(uuid), table->class->name);
904                 ovsdb_idl_insert_row(row, new);
905             }
906         } else {
907             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
908                          "in table %s", UUID_ARGS(uuid), table->class->name);
909             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
910         }
911     }
912
913     return true;
914 }
915
916 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
917  * otherwise. */
918 static bool
919 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
920 {
921     struct ovsdb_idl_table *table = row->table;
922     struct shash_node *node;
923     bool changed = false;
924
925     SHASH_FOR_EACH (node, json_object(row_json)) {
926         const char *column_name = node->name;
927         const struct ovsdb_idl_column *column;
928         struct ovsdb_datum datum;
929         struct ovsdb_error *error;
930
931         column = shash_find_data(&table->columns, column_name);
932         if (!column) {
933             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
934                          column_name, UUID_ARGS(&row->uuid));
935             continue;
936         }
937
938         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
939         if (!error) {
940             unsigned int column_idx = column - table->class->columns;
941             struct ovsdb_datum *old = &row->old[column_idx];
942
943             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
944                 ovsdb_datum_swap(old, &datum);
945                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
946                     changed = true;
947                 }
948             } else {
949                 /* Didn't really change but the OVSDB monitor protocol always
950                  * includes every value in a row. */
951             }
952
953             ovsdb_datum_destroy(&datum, &column->type);
954         } else {
955             char *s = ovsdb_error_to_string(error);
956             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
957                          " in table %s: %s", column_name,
958                          UUID_ARGS(&row->uuid), table->class->name, s);
959             free(s);
960             ovsdb_error_destroy(error);
961         }
962     }
963     return changed;
964 }
965
966 /* When a row A refers to row B through a column with a "refTable" constraint,
967  * but row B does not exist, row B is called an "orphan row".  Orphan rows
968  * should not persist, because the database enforces referential integrity, but
969  * they can appear transiently as changes from the database are received (the
970  * database doesn't try to topologically sort them and circular references mean
971  * it isn't always possible anyhow).
972  *
973  * This function returns true if 'row' is an orphan row, otherwise false.
974  */
975 static bool
976 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
977 {
978     return !row->old && !row->new;
979 }
980
981 /* Returns true if 'row' is conceptually part of the database as modified by
982  * the current transaction (if any), false otherwise.
983  *
984  * This function will return true if 'row' is not an orphan (see the comment on
985  * ovsdb_idl_row_is_orphan()) and:
986  *
987  *   - 'row' exists in the database and has not been deleted within the
988  *     current transaction (if any).
989  *
990  *   - 'row' was inserted within the current transaction and has not been
991  *     deleted.  (In the latter case you should not have passed 'row' in at
992  *     all, because ovsdb_idl_txn_delete() freed it.)
993  *
994  * This function will return false if 'row' is an orphan or if 'row' was
995  * deleted within the current transaction.
996  */
997 static bool
998 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
999 {
1000     return row->new != NULL;
1001 }
1002
1003 static void
1004 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
1005 {
1006     const struct ovsdb_idl_table_class *class = row->table->class;
1007     size_t i;
1008
1009     for (i = 0; i < class->n_columns; i++) {
1010         const struct ovsdb_idl_column *c = &class->columns[i];
1011         (c->parse)(row, &row->old[i]);
1012     }
1013 }
1014
1015 static void
1016 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
1017 {
1018     const struct ovsdb_idl_table_class *class = row->table->class;
1019     size_t i;
1020
1021     for (i = 0; i < class->n_columns; i++) {
1022         const struct ovsdb_idl_column *c = &class->columns[i];
1023         (c->unparse)(row);
1024     }
1025 }
1026
1027 static void
1028 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
1029 {
1030     ovs_assert(row->old == row->new);
1031     if (!ovsdb_idl_row_is_orphan(row)) {
1032         const struct ovsdb_idl_table_class *class = row->table->class;
1033         size_t i;
1034
1035         for (i = 0; i < class->n_columns; i++) {
1036             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
1037         }
1038         free(row->old);
1039         row->old = row->new = NULL;
1040     }
1041 }
1042
1043 static void
1044 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
1045 {
1046     if (row->old != row->new) {
1047         if (row->new) {
1048             const struct ovsdb_idl_table_class *class = row->table->class;
1049             size_t i;
1050
1051             if (row->written) {
1052                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
1053                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
1054                 }
1055             }
1056             free(row->new);
1057             free(row->written);
1058             row->written = NULL;
1059         }
1060         row->new = row->old;
1061     }
1062 }
1063
1064 static void
1065 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
1066 {
1067     struct ovsdb_idl_arc *arc, *next;
1068
1069     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
1070      * that this causes to be unreferenced. */
1071     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
1072         list_remove(&arc->dst_node);
1073         if (destroy_dsts
1074             && ovsdb_idl_row_is_orphan(arc->dst)
1075             && list_is_empty(&arc->dst->dst_arcs)) {
1076             ovsdb_idl_row_destroy(arc->dst);
1077         }
1078         free(arc);
1079     }
1080     list_init(&row->src_arcs);
1081 }
1082
1083 /* Force nodes that reference 'row' to reparse. */
1084 static void
1085 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
1086 {
1087     struct ovsdb_idl_arc *arc, *next;
1088
1089     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
1090      * 'arc', so we need to use the "safe" variant of list traversal.  However,
1091      * calling an ovsdb_idl_column's 'parse' function will add an arc
1092      * equivalent to 'arc' to row->arcs.  That could be a problem for
1093      * traversal, but it adds it at the beginning of the list to prevent us
1094      * from stumbling upon it again.
1095      *
1096      * (If duplicate arcs were possible then we would need to make sure that
1097      * 'next' didn't also point into 'arc''s destination, but we forbid
1098      * duplicate arcs.) */
1099     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
1100         struct ovsdb_idl_row *ref = arc->src;
1101
1102         ovsdb_idl_row_unparse(ref);
1103         ovsdb_idl_row_clear_arcs(ref, false);
1104         ovsdb_idl_row_parse(ref);
1105     }
1106 }
1107
1108 static struct ovsdb_idl_row *
1109 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
1110 {
1111     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
1112     class->row_init(row);
1113     list_init(&row->src_arcs);
1114     list_init(&row->dst_arcs);
1115     hmap_node_nullify(&row->txn_node);
1116     return row;
1117 }
1118
1119 static struct ovsdb_idl_row *
1120 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
1121 {
1122     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
1123     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
1124     row->uuid = *uuid;
1125     row->table = table;
1126     return row;
1127 }
1128
1129 static void
1130 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
1131 {
1132     if (row) {
1133         ovsdb_idl_row_clear_old(row);
1134         hmap_remove(&row->table->rows, &row->hmap_node);
1135         free(row);
1136     }
1137 }
1138
1139 static void
1140 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
1141 {
1142     const struct ovsdb_idl_table_class *class = row->table->class;
1143     size_t i;
1144
1145     ovs_assert(!row->old && !row->new);
1146     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
1147     for (i = 0; i < class->n_columns; i++) {
1148         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
1149     }
1150     ovsdb_idl_row_update(row, row_json);
1151     ovsdb_idl_row_parse(row);
1152
1153     ovsdb_idl_row_reparse_backrefs(row);
1154 }
1155
1156 static void
1157 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
1158 {
1159     ovsdb_idl_row_unparse(row);
1160     ovsdb_idl_row_clear_arcs(row, true);
1161     ovsdb_idl_row_clear_old(row);
1162     if (list_is_empty(&row->dst_arcs)) {
1163         ovsdb_idl_row_destroy(row);
1164     } else {
1165         ovsdb_idl_row_reparse_backrefs(row);
1166     }
1167 }
1168
1169 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1170  * otherwise. */
1171 static bool
1172 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1173 {
1174     bool changed;
1175
1176     ovsdb_idl_row_unparse(row);
1177     ovsdb_idl_row_clear_arcs(row, true);
1178     changed = ovsdb_idl_row_update(row, row_json);
1179     ovsdb_idl_row_parse(row);
1180
1181     return changed;
1182 }
1183
1184 static bool
1185 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1186 {
1187     const struct ovsdb_idl_arc *arc;
1188
1189     /* No self-arcs. */
1190     if (src == dst) {
1191         return false;
1192     }
1193
1194     /* No duplicate arcs.
1195      *
1196      * We only need to test whether the first arc in dst->dst_arcs originates
1197      * at 'src', since we add all of the arcs from a given source in a clump
1198      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1199      * added at the front of the dst_arcs list. */
1200     if (list_is_empty(&dst->dst_arcs)) {
1201         return true;
1202     }
1203     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1204     return arc->src != src;
1205 }
1206
1207 static struct ovsdb_idl_table *
1208 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1209                            const struct ovsdb_idl_table_class *table_class)
1210 {
1211     return &idl->tables[table_class - idl->class->tables];
1212 }
1213
1214 /* Called by ovsdb-idlc generated code. */
1215 struct ovsdb_idl_row *
1216 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1217                       struct ovsdb_idl_table_class *dst_table_class,
1218                       const struct uuid *dst_uuid)
1219 {
1220     struct ovsdb_idl *idl = src->table->idl;
1221     struct ovsdb_idl_table *dst_table;
1222     struct ovsdb_idl_arc *arc;
1223     struct ovsdb_idl_row *dst;
1224
1225     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1226     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1227     if (idl->txn) {
1228         /* We're being called from ovsdb_idl_txn_write().  We must not update
1229          * any arcs, because the transaction will be backed out at commit or
1230          * abort time and we don't want our graph screwed up.
1231          *
1232          * Just return the destination row, if there is one and it has not been
1233          * deleted. */
1234         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1235             return dst;
1236         }
1237         return NULL;
1238     } else {
1239         /* We're being called from some other context.  Update the graph. */
1240         if (!dst) {
1241             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1242         }
1243
1244         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
1245         if (may_add_arc(src, dst)) {
1246             /* The arc *must* be added at the front of the dst_arcs list.  See
1247              * ovsdb_idl_row_reparse_backrefs() for details. */
1248             arc = xmalloc(sizeof *arc);
1249             list_push_front(&src->src_arcs, &arc->src_node);
1250             list_push_front(&dst->dst_arcs, &arc->dst_node);
1251             arc->src = src;
1252             arc->dst = dst;
1253         }
1254
1255         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
1256     }
1257 }
1258
1259 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
1260  * pointer to the row if there is one, otherwise a null pointer.  */
1261 const struct ovsdb_idl_row *
1262 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
1263                            const struct ovsdb_idl_table_class *tc,
1264                            const struct uuid *uuid)
1265 {
1266     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
1267 }
1268
1269 static struct ovsdb_idl_row *
1270 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
1271 {
1272     for (; node; node = hmap_next(&table->rows, node)) {
1273         struct ovsdb_idl_row *row;
1274
1275         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
1276         if (ovsdb_idl_row_exists(row)) {
1277             return row;
1278         }
1279     }
1280     return NULL;
1281 }
1282
1283 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
1284  * table is empty.
1285  *
1286  * Database tables are internally maintained as hash tables, so adding or
1287  * removing rows while traversing the same table can cause some rows to be
1288  * visited twice or not at apply. */
1289 const struct ovsdb_idl_row *
1290 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
1291                     const struct ovsdb_idl_table_class *table_class)
1292 {
1293     struct ovsdb_idl_table *table
1294         = ovsdb_idl_table_from_class(idl, table_class);
1295     return next_real_row(table, hmap_first(&table->rows));
1296 }
1297
1298 /* Returns a row following 'row' within its table, or a null pointer if 'row'
1299  * is the last row in its table. */
1300 const struct ovsdb_idl_row *
1301 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
1302 {
1303     struct ovsdb_idl_table *table = row->table;
1304
1305     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
1306 }
1307
1308 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
1309  * transaction has changed 'column''s value, the modified value is returned.
1310  *
1311  * The caller must not modify or free the returned value.
1312  *
1313  * Various kinds of changes can invalidate the returned value: writing to the
1314  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
1315  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
1316  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
1317  * returned value is needed for a long time, it is best to make a copy of it
1318  * with ovsdb_datum_clone(). */
1319 const struct ovsdb_datum *
1320 ovsdb_idl_read(const struct ovsdb_idl_row *row,
1321                const struct ovsdb_idl_column *column)
1322 {
1323     const struct ovsdb_idl_table_class *class;
1324     size_t column_idx;
1325
1326     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
1327
1328     class = row->table->class;
1329     column_idx = column - class->columns;
1330
1331     ovs_assert(row->new != NULL);
1332     ovs_assert(column_idx < class->n_columns);
1333
1334     if (row->written && bitmap_is_set(row->written, column_idx)) {
1335         return &row->new[column_idx];
1336     } else if (row->old) {
1337         return &row->old[column_idx];
1338     } else {
1339         return ovsdb_datum_default(&column->type);
1340     }
1341 }
1342
1343 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
1344  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
1345  * have a value type of OVSDB_TYPE_VOID.)
1346  *
1347  * This is useful in code that "knows" that a particular column has a given
1348  * type, so that it will abort if someone changes the column's type without
1349  * updating the code that uses it. */
1350 const struct ovsdb_datum *
1351 ovsdb_idl_get(const struct ovsdb_idl_row *row,
1352               const struct ovsdb_idl_column *column,
1353               enum ovsdb_atomic_type key_type OVS_UNUSED,
1354               enum ovsdb_atomic_type value_type OVS_UNUSED)
1355 {
1356     ovs_assert(column->type.key.type == key_type);
1357     ovs_assert(column->type.value.type == value_type);
1358
1359     return ovsdb_idl_read(row, column);
1360 }
1361
1362 /* Returns true if the field represented by 'column' in 'row' may be modified,
1363  * false if it is immutable.
1364  *
1365  * Normally, whether a field is mutable is controlled by its column's schema.
1366  * However, an immutable column can be set to any initial value at the time of
1367  * insertion, so if 'row' is a new row (one that is being added as part of the
1368  * current transaction, supposing that a transaction is in progress) then even
1369  * its "immutable" fields are actually mutable. */
1370 bool
1371 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
1372                      const struct ovsdb_idl_column *column)
1373 {
1374     return column->mutable || (row->new && !row->old);
1375 }
1376
1377 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
1378  * to all-zero-bits by some other entity.  If 'row' was set up some other way
1379  * then the return value is indeterminate. */
1380 bool
1381 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
1382 {
1383     return row->table == NULL;
1384 }
1385 \f
1386 /* Transactions. */
1387
1388 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1389                                    enum ovsdb_idl_txn_status);
1390
1391 /* Returns a string representation of 'status'.  The caller must not modify or
1392  * free the returned string.
1393  *
1394  * The return value is probably useful only for debug log messages and unit
1395  * tests. */
1396 const char *
1397 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
1398 {
1399     switch (status) {
1400     case TXN_UNCOMMITTED:
1401         return "uncommitted";
1402     case TXN_UNCHANGED:
1403         return "unchanged";
1404     case TXN_INCOMPLETE:
1405         return "incomplete";
1406     case TXN_ABORTED:
1407         return "aborted";
1408     case TXN_SUCCESS:
1409         return "success";
1410     case TXN_TRY_AGAIN:
1411         return "try again";
1412     case TXN_NOT_LOCKED:
1413         return "not locked";
1414     case TXN_ERROR:
1415         return "error";
1416     }
1417     return "<unknown>";
1418 }
1419
1420 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
1421  * active transaction at a time.  See the large comment in ovsdb-idl.h for
1422  * general information on transactions. */
1423 struct ovsdb_idl_txn *
1424 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
1425 {
1426     struct ovsdb_idl_txn *txn;
1427
1428     ovs_assert(!idl->txn);
1429     idl->txn = txn = xmalloc(sizeof *txn);
1430     txn->request_id = NULL;
1431     txn->idl = idl;
1432     hmap_init(&txn->txn_rows);
1433     txn->status = TXN_UNCOMMITTED;
1434     txn->error = NULL;
1435     txn->dry_run = false;
1436     ds_init(&txn->comment);
1437
1438     txn->inc_table = NULL;
1439     txn->inc_column = NULL;
1440
1441     hmap_init(&txn->inserted_rows);
1442
1443     return txn;
1444 }
1445
1446 /* Appends 's', which is treated as a printf()-type format string, to the
1447  * comments that will be passed to the OVSDB server when 'txn' is committed.
1448  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
1449  * show-log" can print in a relatively human-readable form.) */
1450 void
1451 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
1452 {
1453     va_list args;
1454
1455     if (txn->comment.length) {
1456         ds_put_char(&txn->comment, '\n');
1457     }
1458
1459     va_start(args, s);
1460     ds_put_format_valist(&txn->comment, s, args);
1461     va_end(args);
1462 }
1463
1464 /* Marks 'txn' as a transaction that will not actually modify the database.  In
1465  * almost every way, the transaction is treated like other transactions.  It
1466  * must be committed or aborted like other transactions, it will be sent to the
1467  * database server like other transactions, and so on.  The only difference is
1468  * that the operations sent to the database server will include, as the last
1469  * step, an "abort" operation, so that any changes made by the transaction will
1470  * not actually take effect. */
1471 void
1472 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
1473 {
1474     txn->dry_run = true;
1475 }
1476
1477 /* Causes 'txn', when committed, to increment the value of 'column' within
1478  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
1479  * successfully, the client may retrieve the final (incremented) value of
1480  * 'column' with ovsdb_idl_txn_get_increment_new_value().
1481  *
1482  * The client could accomplish something similar with ovsdb_idl_read(),
1483  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
1484  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
1485  * will never (by itself) fail because of a verify error.
1486  *
1487  * The intended use is for incrementing the "next_cfg" column in the
1488  * Open_vSwitch table. */
1489 void
1490 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
1491                         const struct ovsdb_idl_row *row,
1492                         const struct ovsdb_idl_column *column)
1493 {
1494     ovs_assert(!txn->inc_table);
1495     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
1496     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
1497
1498     txn->inc_table = row->table->class->name;
1499     txn->inc_column = column->name;
1500     txn->inc_row = row->uuid;
1501 }
1502
1503 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
1504  * has been called for 'txn' but the commit is still incomplete (that is, the
1505  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
1506  * end up committing at the database server, but the client will not be able to
1507  * get any further status information back. */
1508 void
1509 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
1510 {
1511     struct ovsdb_idl_txn_insert *insert, *next;
1512
1513     json_destroy(txn->request_id);
1514     if (txn->status == TXN_INCOMPLETE) {
1515         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1516     }
1517     ovsdb_idl_txn_abort(txn);
1518     ds_destroy(&txn->comment);
1519     free(txn->error);
1520     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
1521         free(insert);
1522     }
1523     hmap_destroy(&txn->inserted_rows);
1524     free(txn);
1525 }
1526
1527 /* Causes poll_block() to wake up if 'txn' has completed committing. */
1528 void
1529 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
1530 {
1531     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
1532         poll_immediate_wake();
1533     }
1534 }
1535
1536 static struct json *
1537 where_uuid_equals(const struct uuid *uuid)
1538 {
1539     return
1540         json_array_create_1(
1541             json_array_create_3(
1542                 json_string_create("_uuid"),
1543                 json_string_create("=="),
1544                 json_array_create_2(
1545                     json_string_create("uuid"),
1546                     json_string_create_nocopy(
1547                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
1548 }
1549
1550 static char *
1551 uuid_name_from_uuid(const struct uuid *uuid)
1552 {
1553     char *name;
1554     char *p;
1555
1556     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
1557     for (p = name; *p != '\0'; p++) {
1558         if (*p == '-') {
1559             *p = '_';
1560         }
1561     }
1562
1563     return name;
1564 }
1565
1566 static const struct ovsdb_idl_row *
1567 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
1568 {
1569     const struct ovsdb_idl_row *row;
1570
1571     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
1572         if (uuid_equals(&row->uuid, uuid)) {
1573             return row;
1574         }
1575     }
1576     return NULL;
1577 }
1578
1579 /* XXX there must be a cleaner way to do this */
1580 static struct json *
1581 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
1582 {
1583     if (json->type == JSON_ARRAY) {
1584         struct uuid uuid;
1585         size_t i;
1586
1587         if (json->u.array.n == 2
1588             && json->u.array.elems[0]->type == JSON_STRING
1589             && json->u.array.elems[1]->type == JSON_STRING
1590             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
1591             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
1592             const struct ovsdb_idl_row *row;
1593
1594             row = ovsdb_idl_txn_get_row(txn, &uuid);
1595             if (row && !row->old && row->new) {
1596                 json_destroy(json);
1597
1598                 return json_array_create_2(
1599                     json_string_create("named-uuid"),
1600                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
1601             }
1602         }
1603
1604         for (i = 0; i < json->u.array.n; i++) {
1605             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
1606                                                       txn);
1607         }
1608     } else if (json->type == JSON_OBJECT) {
1609         struct shash_node *node;
1610
1611         SHASH_FOR_EACH (node, json_object(json)) {
1612             node->data = substitute_uuids(node->data, txn);
1613         }
1614     }
1615     return json;
1616 }
1617
1618 static void
1619 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
1620 {
1621     struct ovsdb_idl_row *row, *next;
1622
1623     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
1624      * ovsdb_idl_column's 'parse' function, which will call
1625      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
1626      * transaction and fail to update the graph.  */
1627     txn->idl->txn = NULL;
1628
1629     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
1630         if (row->old) {
1631             if (row->written) {
1632                 ovsdb_idl_row_unparse(row);
1633                 ovsdb_idl_row_clear_arcs(row, false);
1634                 ovsdb_idl_row_parse(row);
1635             }
1636         } else {
1637             ovsdb_idl_row_unparse(row);
1638         }
1639         ovsdb_idl_row_clear_new(row);
1640
1641         free(row->prereqs);
1642         row->prereqs = NULL;
1643
1644         free(row->written);
1645         row->written = NULL;
1646
1647         hmap_remove(&txn->txn_rows, &row->txn_node);
1648         hmap_node_nullify(&row->txn_node);
1649         if (!row->old) {
1650             hmap_remove(&row->table->rows, &row->hmap_node);
1651             free(row);
1652         }
1653     }
1654     hmap_destroy(&txn->txn_rows);
1655     hmap_init(&txn->txn_rows);
1656 }
1657
1658 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
1659  * of the following TXN_* constants:
1660  *
1661  *   TXN_INCOMPLETE:
1662  *
1663  *       The transaction is in progress, but not yet complete.  The caller
1664  *       should call again later, after calling ovsdb_idl_run() to let the IDL
1665  *       do OVSDB protocol processing.
1666  *
1667  *   TXN_UNCHANGED:
1668  *
1669  *       The transaction is complete.  (It didn't actually change the database,
1670  *       so the IDL didn't send any request to the database server.)
1671  *
1672  *   TXN_ABORTED:
1673  *
1674  *       The caller previously called ovsdb_idl_txn_abort().
1675  *
1676  *   TXN_SUCCESS:
1677  *
1678  *       The transaction was successful.  The update made by the transaction
1679  *       (and possibly other changes made by other database clients) should
1680  *       already be visible in the IDL.
1681  *
1682  *   TXN_TRY_AGAIN:
1683  *
1684  *       The transaction failed for some transient reason, e.g. because a
1685  *       "verify" operation reported an inconsistency or due to a network
1686  *       problem.  The caller should wait for a change to the database, then
1687  *       compose a new transaction, and commit the new transaction.
1688  *
1689  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
1690  *       the database.  It is important to use its return value *before* the
1691  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
1692  *       purpose, because the change that one should wait for can happen after
1693  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
1694  *       using some other baseline value in that situation could cause an
1695  *       indefinite wait if the database rarely changes.
1696  *
1697  *   TXN_NOT_LOCKED:
1698  *
1699  *       The transaction failed because the IDL has been configured to require
1700  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
1701  *       has already lost it.
1702  *
1703  * Committing a transaction rolls back all of the changes that it made to the
1704  * IDL's copy of the database.  If the transaction commits successfully, then
1705  * the database server will send an update and, thus, the IDL will be updated
1706  * with the committed changes. */
1707 enum ovsdb_idl_txn_status
1708 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
1709 {
1710     struct ovsdb_idl_row *row;
1711     struct json *operations;
1712     bool any_updates;
1713
1714     if (txn != txn->idl->txn) {
1715         goto coverage_out;
1716     }
1717
1718     /* If we need a lock but don't have it, give up quickly. */
1719     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
1720         txn->status = TXN_NOT_LOCKED;
1721         goto disassemble_out;
1722     }
1723
1724     operations = json_array_create_1(
1725         json_string_create(txn->idl->class->database));
1726
1727     /* Assert that we have the required lock (avoiding a race). */
1728     if (txn->idl->lock_name) {
1729         struct json *op = json_object_create();
1730         json_array_add(operations, op);
1731         json_object_put_string(op, "op", "assert");
1732         json_object_put_string(op, "lock", txn->idl->lock_name);
1733     }
1734
1735     /* Add prerequisites and declarations of new rows. */
1736     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
1737         /* XXX check that deleted rows exist even if no prereqs? */
1738         if (row->prereqs) {
1739             const struct ovsdb_idl_table_class *class = row->table->class;
1740             size_t n_columns = class->n_columns;
1741             struct json *op, *columns, *row_json;
1742             size_t idx;
1743
1744             op = json_object_create();
1745             json_array_add(operations, op);
1746             json_object_put_string(op, "op", "wait");
1747             json_object_put_string(op, "table", class->name);
1748             json_object_put(op, "timeout", json_integer_create(0));
1749             json_object_put(op, "where", where_uuid_equals(&row->uuid));
1750             json_object_put_string(op, "until", "==");
1751             columns = json_array_create_empty();
1752             json_object_put(op, "columns", columns);
1753             row_json = json_object_create();
1754             json_object_put(op, "rows", json_array_create_1(row_json));
1755
1756             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
1757                 const struct ovsdb_idl_column *column = &class->columns[idx];
1758                 json_array_add(columns, json_string_create(column->name));
1759                 json_object_put(row_json, column->name,
1760                                 ovsdb_datum_to_json(&row->old[idx],
1761                                                     &column->type));
1762             }
1763         }
1764     }
1765
1766     /* Add updates. */
1767     any_updates = false;
1768     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
1769         const struct ovsdb_idl_table_class *class = row->table->class;
1770
1771         if (!row->new) {
1772             if (class->is_root) {
1773                 struct json *op = json_object_create();
1774                 json_object_put_string(op, "op", "delete");
1775                 json_object_put_string(op, "table", class->name);
1776                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
1777                 json_array_add(operations, op);
1778                 any_updates = true;
1779             } else {
1780                 /* Let ovsdb-server decide whether to really delete it. */
1781             }
1782         } else if (row->old != row->new) {
1783             struct json *row_json;
1784             struct json *op;
1785             size_t idx;
1786
1787             op = json_object_create();
1788             json_object_put_string(op, "op", row->old ? "update" : "insert");
1789             json_object_put_string(op, "table", class->name);
1790             if (row->old) {
1791                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
1792             } else {
1793                 struct ovsdb_idl_txn_insert *insert;
1794
1795                 any_updates = true;
1796
1797                 json_object_put(op, "uuid-name",
1798                                 json_string_create_nocopy(
1799                                     uuid_name_from_uuid(&row->uuid)));
1800
1801                 insert = xmalloc(sizeof *insert);
1802                 insert->dummy = row->uuid;
1803                 insert->op_index = operations->u.array.n - 1;
1804                 uuid_zero(&insert->real);
1805                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
1806                             uuid_hash(&insert->dummy));
1807             }
1808             row_json = json_object_create();
1809             json_object_put(op, "row", row_json);
1810
1811             if (row->written) {
1812                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
1813                     const struct ovsdb_idl_column *column =
1814                                                         &class->columns[idx];
1815
1816                     if (row->old
1817                         || !ovsdb_datum_is_default(&row->new[idx],
1818                                                   &column->type)) {
1819                         json_object_put(row_json, column->name,
1820                                         substitute_uuids(
1821                                             ovsdb_datum_to_json(&row->new[idx],
1822                                                                 &column->type),
1823                                             txn));
1824
1825                         /* If anything really changed, consider it an update.
1826                          * We can't suppress not-really-changed values earlier
1827                          * or transactions would become nonatomic (see the big
1828                          * comment inside ovsdb_idl_txn_write()). */
1829                         if (!any_updates && row->old &&
1830                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
1831                                                 &column->type)) {
1832                             any_updates = true;
1833                         }
1834                     }
1835                 }
1836             }
1837
1838             if (!row->old || !shash_is_empty(json_object(row_json))) {
1839                 json_array_add(operations, op);
1840             } else {
1841                 json_destroy(op);
1842             }
1843         }
1844     }
1845
1846     /* Add increment. */
1847     if (txn->inc_table && any_updates) {
1848         struct json *op;
1849
1850         txn->inc_index = operations->u.array.n - 1;
1851
1852         op = json_object_create();
1853         json_object_put_string(op, "op", "mutate");
1854         json_object_put_string(op, "table", txn->inc_table);
1855         json_object_put(op, "where",
1856                         substitute_uuids(where_uuid_equals(&txn->inc_row),
1857                                          txn));
1858         json_object_put(op, "mutations",
1859                         json_array_create_1(
1860                             json_array_create_3(
1861                                 json_string_create(txn->inc_column),
1862                                 json_string_create("+="),
1863                                 json_integer_create(1))));
1864         json_array_add(operations, op);
1865
1866         op = json_object_create();
1867         json_object_put_string(op, "op", "select");
1868         json_object_put_string(op, "table", txn->inc_table);
1869         json_object_put(op, "where",
1870                         substitute_uuids(where_uuid_equals(&txn->inc_row),
1871                                          txn));
1872         json_object_put(op, "columns",
1873                         json_array_create_1(json_string_create(
1874                                                 txn->inc_column)));
1875         json_array_add(operations, op);
1876     }
1877
1878     if (txn->comment.length) {
1879         struct json *op = json_object_create();
1880         json_object_put_string(op, "op", "comment");
1881         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
1882         json_array_add(operations, op);
1883     }
1884
1885     if (txn->dry_run) {
1886         struct json *op = json_object_create();
1887         json_object_put_string(op, "op", "abort");
1888         json_array_add(operations, op);
1889     }
1890
1891     if (!any_updates) {
1892         txn->status = TXN_UNCHANGED;
1893         json_destroy(operations);
1894     } else if (!jsonrpc_session_send(
1895                    txn->idl->session,
1896                    jsonrpc_create_request(
1897                        "transact", operations, &txn->request_id))) {
1898         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
1899                     json_hash(txn->request_id, 0));
1900         txn->status = TXN_INCOMPLETE;
1901     } else {
1902         txn->status = TXN_TRY_AGAIN;
1903     }
1904
1905 disassemble_out:
1906     ovsdb_idl_txn_disassemble(txn);
1907 coverage_out:
1908     switch (txn->status) {
1909     case TXN_UNCOMMITTED:   COVERAGE_INC(txn_uncommitted);    break;
1910     case TXN_UNCHANGED:     COVERAGE_INC(txn_unchanged);      break;
1911     case TXN_INCOMPLETE:    COVERAGE_INC(txn_incomplete);     break;
1912     case TXN_ABORTED:       COVERAGE_INC(txn_aborted);        break;
1913     case TXN_SUCCESS:       COVERAGE_INC(txn_success);        break;
1914     case TXN_TRY_AGAIN:     COVERAGE_INC(txn_try_again);      break;
1915     case TXN_NOT_LOCKED:    COVERAGE_INC(txn_not_locked);     break;
1916     case TXN_ERROR:         COVERAGE_INC(txn_error);          break;
1917     }
1918
1919     return txn->status;
1920 }
1921
1922 /* Attempts to commit 'txn', blocking until the commit either succeeds or
1923  * fails.  Returns the final commit status, which may be any TXN_* value other
1924  * than TXN_INCOMPLETE.
1925  *
1926  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
1927  * return value of ovsdb_idl_get_seqno() to change. */
1928 enum ovsdb_idl_txn_status
1929 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
1930 {
1931     enum ovsdb_idl_txn_status status;
1932
1933     fatal_signal_run();
1934     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
1935         ovsdb_idl_run(txn->idl);
1936         ovsdb_idl_wait(txn->idl);
1937         ovsdb_idl_txn_wait(txn);
1938         poll_block();
1939     }
1940     return status;
1941 }
1942
1943 /* Returns the final (incremented) value of the column in 'txn' that was set to
1944  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
1945  * successfully. */
1946 int64_t
1947 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
1948 {
1949     ovs_assert(txn->status == TXN_SUCCESS);
1950     return txn->inc_new_value;
1951 }
1952
1953 /* Aborts 'txn' without sending it to the database server.  This is effective
1954  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
1955  * Otherwise, it has no effect.
1956  *
1957  * Aborting a transaction doesn't free its memory.  Use
1958  * ovsdb_idl_txn_destroy() to do that. */
1959 void
1960 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
1961 {
1962     ovsdb_idl_txn_disassemble(txn);
1963     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
1964         txn->status = TXN_ABORTED;
1965     }
1966 }
1967
1968 /* Returns a string that reports the error status for 'txn'.  The caller must
1969  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
1970  * for 'txn' may free the returned string.
1971  *
1972  * The return value is ordinarily one of the strings that
1973  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
1974  * due to an error reported by the database server, the return value is that
1975  * error. */
1976 const char *
1977 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
1978 {
1979     if (txn->status != TXN_ERROR) {
1980         return ovsdb_idl_txn_status_to_string(txn->status);
1981     } else if (txn->error) {
1982         return txn->error;
1983     } else {
1984         return "no error details available";
1985     }
1986 }
1987
1988 static void
1989 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
1990                              const struct json *json)
1991 {
1992     if (txn->error == NULL) {
1993         txn->error = json_to_string(json, JSSF_SORT);
1994     }
1995 }
1996
1997 /* For transaction 'txn' that completed successfully, finds and returns the
1998  * permanent UUID that the database assigned to a newly inserted row, given the
1999  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
2000  *
2001  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
2002  * if it was assigned by that function and then deleted by
2003  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
2004  * and then deleted within a single transaction are never sent to the database
2005  * server, so it never assigns them a permanent UUID.) */
2006 const struct uuid *
2007 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
2008                               const struct uuid *uuid)
2009 {
2010     const struct ovsdb_idl_txn_insert *insert;
2011
2012     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
2013     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
2014                              uuid_hash(uuid), &txn->inserted_rows) {
2015         if (uuid_equals(uuid, &insert->dummy)) {
2016             return &insert->real;
2017         }
2018     }
2019     return NULL;
2020 }
2021
2022 static void
2023 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2024                        enum ovsdb_idl_txn_status status)
2025 {
2026     txn->status = status;
2027     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2028 }
2029
2030 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
2031  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
2032  * ovs-vswitchd).
2033  *
2034  * 'datum' must have the correct type for its column.  The IDL does not check
2035  * that it meets schema constraints, but ovsdb-server will do so at commit time
2036  * so it had better be correct.
2037  *
2038  * A transaction must be in progress.  Replication of 'column' must not have
2039  * been disabled (by calling ovsdb_idl_omit()).
2040  *
2041  * Usually this function is used indirectly through one of the "set" functions
2042  * generated by ovsdb-idlc.
2043  *
2044  * Takes ownership of what 'datum' points to (and in some cases destroys that
2045  * data before returning) but makes a copy of 'datum' itself.  (Commonly
2046  * 'datum' is on the caller's stack.) */
2047 static void
2048 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
2049                       const struct ovsdb_idl_column *column,
2050                       struct ovsdb_datum *datum, bool owns_datum)
2051 {
2052     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2053     const struct ovsdb_idl_table_class *class;
2054     size_t column_idx;
2055     bool write_only;
2056
2057     if (ovsdb_idl_row_is_synthetic(row)) {
2058         goto discard_datum;
2059     }
2060
2061     class = row->table->class;
2062     column_idx = column - class->columns;
2063     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
2064
2065     ovs_assert(row->new != NULL);
2066     ovs_assert(column_idx < class->n_columns);
2067     ovs_assert(row->old == NULL ||
2068                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2069
2070     if (row->table->idl->verify_write_only && !write_only) {
2071         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
2072                  " explicitly configured not to.", class->name, column->name);
2073         goto discard_datum;
2074     }
2075
2076     /* If this is a write-only column and the datum being written is the same
2077      * as the one already there, just skip the update entirely.  This is worth
2078      * optimizing because we have a lot of columns that get periodically
2079      * refreshed into the database but don't actually change that often.
2080      *
2081      * We don't do this for read/write columns because that would break
2082      * atomicity of transactions--some other client might have written a
2083      * different value in that column since we read it.  (But if a whole
2084      * transaction only does writes of existing values, without making any real
2085      * changes, we will drop the whole transaction later in
2086      * ovsdb_idl_txn_commit().) */
2087     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
2088                                          datum, &column->type)) {
2089         goto discard_datum;
2090     }
2091
2092     if (hmap_node_is_null(&row->txn_node)) {
2093         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2094                     uuid_hash(&row->uuid));
2095     }
2096     if (row->old == row->new) {
2097         row->new = xmalloc(class->n_columns * sizeof *row->new);
2098     }
2099     if (!row->written) {
2100         row->written = bitmap_allocate(class->n_columns);
2101     }
2102     if (bitmap_is_set(row->written, column_idx)) {
2103         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
2104     } else {
2105         bitmap_set1(row->written, column_idx);
2106     }
2107     if (owns_datum) {
2108         row->new[column_idx] = *datum;
2109     } else {
2110         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
2111     }
2112     (column->unparse)(row);
2113     (column->parse)(row, &row->new[column_idx]);
2114     return;
2115
2116 discard_datum:
2117     if (owns_datum) {
2118         ovsdb_datum_destroy(datum, &column->type);
2119     }
2120 }
2121
2122 void
2123 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
2124                     const struct ovsdb_idl_column *column,
2125                     struct ovsdb_datum *datum)
2126 {
2127     ovsdb_idl_txn_write__(row, column, datum, true);
2128 }
2129
2130 void
2131 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
2132                           const struct ovsdb_idl_column *column,
2133                           const struct ovsdb_datum *datum)
2134 {
2135     ovsdb_idl_txn_write__(row, column,
2136                           CONST_CAST(struct ovsdb_datum *, datum), false);
2137 }
2138
2139 /* Causes the original contents of 'column' in 'row_' to be verified as a
2140  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
2141  * changed (or if 'row_' was deleted) between the time that the IDL originally
2142  * read its contents and the time that the transaction commits, then the
2143  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
2144  * TXN_AGAIN_NOW (depending on whether the database change has already been
2145  * received).
2146  *
2147  * The intention is that, to ensure that no transaction commits based on dirty
2148  * reads, an application should call ovsdb_idl_txn_verify() on each data item
2149  * read as part of a read-modify-write operation.
2150  *
2151  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
2152  * value of 'column' is already known:
2153  *
2154  *   - If 'row_' is a row created by the current transaction (returned by
2155  *     ovsdb_idl_txn_insert()).
2156  *
2157  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
2158  *     within the current transaction.
2159  *
2160  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
2161  * ovsdb_idl_txn_write() for a given read-modify-write.
2162  *
2163  * A transaction must be in progress.
2164  *
2165  * Usually this function is used indirectly through one of the "verify"
2166  * functions generated by ovsdb-idlc. */
2167 void
2168 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
2169                      const struct ovsdb_idl_column *column)
2170 {
2171     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2172     const struct ovsdb_idl_table_class *class;
2173     size_t column_idx;
2174
2175     if (ovsdb_idl_row_is_synthetic(row)) {
2176         return;
2177     }
2178
2179     class = row->table->class;
2180     column_idx = column - class->columns;
2181
2182     ovs_assert(row->new != NULL);
2183     ovs_assert(row->old == NULL ||
2184                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2185     if (!row->old
2186         || (row->written && bitmap_is_set(row->written, column_idx))) {
2187         return;
2188     }
2189
2190     if (hmap_node_is_null(&row->txn_node)) {
2191         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2192                     uuid_hash(&row->uuid));
2193     }
2194     if (!row->prereqs) {
2195         row->prereqs = bitmap_allocate(class->n_columns);
2196     }
2197     bitmap_set1(row->prereqs, column_idx);
2198 }
2199
2200 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
2201  * accessed afterward.
2202  *
2203  * A transaction must be in progress.
2204  *
2205  * Usually this function is used indirectly through one of the "delete"
2206  * functions generated by ovsdb-idlc. */
2207 void
2208 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
2209 {
2210     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2211
2212     if (ovsdb_idl_row_is_synthetic(row)) {
2213         return;
2214     }
2215
2216     ovs_assert(row->new != NULL);
2217     if (!row->old) {
2218         ovsdb_idl_row_unparse(row);
2219         ovsdb_idl_row_clear_new(row);
2220         ovs_assert(!row->prereqs);
2221         hmap_remove(&row->table->rows, &row->hmap_node);
2222         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
2223         free(row);
2224         return;
2225     }
2226     if (hmap_node_is_null(&row->txn_node)) {
2227         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2228                     uuid_hash(&row->uuid));
2229     }
2230     ovsdb_idl_row_clear_new(row);
2231     row->new = NULL;
2232 }
2233
2234 /* Inserts and returns a new row in the table with the specified 'class' in the
2235  * database with open transaction 'txn'.
2236  *
2237  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
2238  * randomly generated; otherwise 'uuid' should specify a randomly generated
2239  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
2240  * 'txn' is committed, but the IDL will replace any uses of the provisional
2241  * UUID in the data to be to be committed by the UUID assigned by
2242  * ovsdb-server.
2243  *
2244  * Usually this function is used indirectly through one of the "insert"
2245  * functions generated by ovsdb-idlc. */
2246 const struct ovsdb_idl_row *
2247 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
2248                      const struct ovsdb_idl_table_class *class,
2249                      const struct uuid *uuid)
2250 {
2251     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
2252
2253     if (uuid) {
2254         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
2255         row->uuid = *uuid;
2256     } else {
2257         uuid_generate(&row->uuid);
2258     }
2259
2260     row->table = ovsdb_idl_table_from_class(txn->idl, class);
2261     row->new = xmalloc(class->n_columns * sizeof *row->new);
2262     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
2263     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
2264     return row;
2265 }
2266
2267 static void
2268 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
2269 {
2270     struct ovsdb_idl_txn *txn;
2271
2272     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
2273         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
2274     }
2275 }
2276
2277 static struct ovsdb_idl_txn *
2278 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
2279 {
2280     struct ovsdb_idl_txn *txn;
2281
2282     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
2283                              json_hash(id, 0), &idl->outstanding_txns) {
2284         if (json_equal(id, txn->request_id)) {
2285             return txn;
2286         }
2287     }
2288     return NULL;
2289 }
2290
2291 static bool
2292 check_json_type(const struct json *json, enum json_type type, const char *name)
2293 {
2294     if (!json) {
2295         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
2296         return false;
2297     } else if (json->type != type) {
2298         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
2299                      name, json_type_to_string(json->type),
2300                      json_type_to_string(type));
2301         return false;
2302     } else {
2303         return true;
2304     }
2305 }
2306
2307 static bool
2308 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
2309                                 const struct json_array *results)
2310 {
2311     struct json *count, *rows, *row, *column;
2312     struct shash *mutate, *select;
2313
2314     if (txn->inc_index + 2 > results->n) {
2315         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2316                      "for increment (has %"PRIuSIZE", needs %u)",
2317                      results->n, txn->inc_index + 2);
2318         return false;
2319     }
2320
2321     /* We know that this is a JSON object because the loop in
2322      * ovsdb_idl_txn_process_reply() checked. */
2323     mutate = json_object(results->elems[txn->inc_index]);
2324     count = shash_find_data(mutate, "count");
2325     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
2326         return false;
2327     }
2328     if (count->u.integer != 1) {
2329         VLOG_WARN_RL(&syntax_rl,
2330                      "\"mutate\" reply \"count\" is %lld instead of 1",
2331                      count->u.integer);
2332         return false;
2333     }
2334
2335     select = json_object(results->elems[txn->inc_index + 1]);
2336     rows = shash_find_data(select, "rows");
2337     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
2338         return false;
2339     }
2340     if (rows->u.array.n != 1) {
2341         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
2342                      "instead of 1",
2343                      rows->u.array.n);
2344         return false;
2345     }
2346     row = rows->u.array.elems[0];
2347     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
2348         return false;
2349     }
2350     column = shash_find_data(json_object(row), txn->inc_column);
2351     if (!check_json_type(column, JSON_INTEGER,
2352                          "\"select\" reply inc column")) {
2353         return false;
2354     }
2355     txn->inc_new_value = column->u.integer;
2356     return true;
2357 }
2358
2359 static bool
2360 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
2361                                    const struct json_array *results)
2362 {
2363     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
2364     struct ovsdb_error *error;
2365     struct json *json_uuid;
2366     union ovsdb_atom uuid;
2367     struct shash *reply;
2368
2369     if (insert->op_index >= results->n) {
2370         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2371                      "for insert (has %"PRIuSIZE", needs %u)",
2372                      results->n, insert->op_index);
2373         return false;
2374     }
2375
2376     /* We know that this is a JSON object because the loop in
2377      * ovsdb_idl_txn_process_reply() checked. */
2378     reply = json_object(results->elems[insert->op_index]);
2379     json_uuid = shash_find_data(reply, "uuid");
2380     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
2381         return false;
2382     }
2383
2384     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
2385     if (error) {
2386         char *s = ovsdb_error_to_string(error);
2387         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
2388                      "UUID: %s", s);
2389         free(s);
2390         ovsdb_error_destroy(error);
2391         return false;
2392     }
2393
2394     insert->real = uuid.uuid;
2395
2396     return true;
2397 }
2398
2399 static bool
2400 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
2401                             const struct jsonrpc_msg *msg)
2402 {
2403     struct ovsdb_idl_txn *txn;
2404     enum ovsdb_idl_txn_status status;
2405
2406     txn = ovsdb_idl_txn_find(idl, msg->id);
2407     if (!txn) {
2408         return false;
2409     }
2410
2411     if (msg->type == JSONRPC_ERROR) {
2412         status = TXN_ERROR;
2413     } else if (msg->result->type != JSON_ARRAY) {
2414         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
2415         status = TXN_ERROR;
2416     } else {
2417         struct json_array *ops = &msg->result->u.array;
2418         int hard_errors = 0;
2419         int soft_errors = 0;
2420         int lock_errors = 0;
2421         size_t i;
2422
2423         for (i = 0; i < ops->n; i++) {
2424             struct json *op = ops->elems[i];
2425
2426             if (op->type == JSON_NULL) {
2427                 /* This isn't an error in itself but indicates that some prior
2428                  * operation failed, so make sure that we know about it. */
2429                 soft_errors++;
2430             } else if (op->type == JSON_OBJECT) {
2431                 struct json *error;
2432
2433                 error = shash_find_data(json_object(op), "error");
2434                 if (error) {
2435                     if (error->type == JSON_STRING) {
2436                         if (!strcmp(error->u.string, "timed out")) {
2437                             soft_errors++;
2438                         } else if (!strcmp(error->u.string, "not owner")) {
2439                             lock_errors++;
2440                         } else if (strcmp(error->u.string, "aborted")) {
2441                             hard_errors++;
2442                             ovsdb_idl_txn_set_error_json(txn, op);
2443                         }
2444                     } else {
2445                         hard_errors++;
2446                         ovsdb_idl_txn_set_error_json(txn, op);
2447                         VLOG_WARN_RL(&syntax_rl,
2448                                      "\"error\" in reply is not JSON string");
2449                     }
2450                 }
2451             } else {
2452                 hard_errors++;
2453                 ovsdb_idl_txn_set_error_json(txn, op);
2454                 VLOG_WARN_RL(&syntax_rl,
2455                              "operation reply is not JSON null or object");
2456             }
2457         }
2458
2459         if (!soft_errors && !hard_errors && !lock_errors) {
2460             struct ovsdb_idl_txn_insert *insert;
2461
2462             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
2463                 hard_errors++;
2464             }
2465
2466             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
2467                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
2468                     hard_errors++;
2469                 }
2470             }
2471         }
2472
2473         status = (hard_errors ? TXN_ERROR
2474                   : lock_errors ? TXN_NOT_LOCKED
2475                   : soft_errors ? TXN_TRY_AGAIN
2476                   : TXN_SUCCESS);
2477     }
2478
2479     ovsdb_idl_txn_complete(txn, status);
2480     return true;
2481 }
2482
2483 /* Returns the transaction currently active for 'row''s IDL.  A transaction
2484  * must currently be active. */
2485 struct ovsdb_idl_txn *
2486 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
2487 {
2488     struct ovsdb_idl_txn *txn = row->table->idl->txn;
2489     ovs_assert(txn != NULL);
2490     return txn;
2491 }
2492
2493 /* Returns the IDL on which 'txn' acts. */
2494 struct ovsdb_idl *
2495 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
2496 {
2497     return txn->idl;
2498 }
2499
2500 /* Blocks until 'idl' successfully connects to the remote database and
2501  * retrieves its contents. */
2502 void
2503 ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
2504 {
2505     while (1) {
2506         ovsdb_idl_run(idl);
2507         if (ovsdb_idl_has_ever_connected(idl)) {
2508             return;
2509         }
2510         ovsdb_idl_wait(idl);
2511         poll_block();
2512     }
2513 }
2514 \f
2515 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
2516  * the database server and to avoid modifying the database when the lock cannot
2517  * be acquired (that is, when another client has the same lock).
2518  *
2519  * If 'lock_name' is NULL, drops the locking requirement and releases the
2520  * lock. */
2521 void
2522 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
2523 {
2524     ovs_assert(!idl->txn);
2525     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
2526
2527     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
2528         /* Release previous lock. */
2529         ovsdb_idl_send_unlock_request(idl);
2530         free(idl->lock_name);
2531         idl->lock_name = NULL;
2532         idl->is_lock_contended = false;
2533     }
2534
2535     if (lock_name && !idl->lock_name) {
2536         /* Acquire new lock. */
2537         idl->lock_name = xstrdup(lock_name);
2538         ovsdb_idl_send_lock_request(idl);
2539     }
2540 }
2541
2542 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
2543  *
2544  * Locking and unlocking happens asynchronously from the database client's
2545  * point of view, so the information is only useful for optimization (e.g. if
2546  * the client doesn't have the lock then there's no point in trying to write to
2547  * the database). */
2548 bool
2549 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
2550 {
2551     return idl->has_lock;
2552 }
2553
2554 /* Returns true if 'idl' is configured to obtain a lock but the database server
2555  * has indicated that some other client already owns the requested lock. */
2556 bool
2557 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
2558 {
2559     return idl->is_lock_contended;
2560 }
2561
2562 static void
2563 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
2564 {
2565     if (new_has_lock && !idl->has_lock) {
2566         if (idl->state == IDL_S_MONITORING) {
2567             idl->change_seqno++;
2568         } else {
2569             /* We're setting up a session, so don't signal that the database
2570              * changed.  Finalizing the session will increment change_seqno
2571              * anyhow. */
2572         }
2573         idl->is_lock_contended = false;
2574     }
2575     idl->has_lock = new_has_lock;
2576 }
2577
2578 static void
2579 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
2580                               struct json **idp)
2581 {
2582     ovsdb_idl_update_has_lock(idl, false);
2583
2584     json_destroy(idl->lock_request_id);
2585     idl->lock_request_id = NULL;
2586
2587     if (jsonrpc_session_is_connected(idl->session)) {
2588         struct json *params;
2589
2590         params = json_array_create_1(json_string_create(idl->lock_name));
2591         jsonrpc_session_send(idl->session,
2592                              jsonrpc_create_request(method, params, idp));
2593     }
2594 }
2595
2596 static void
2597 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
2598 {
2599     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
2600 }
2601
2602 static void
2603 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
2604 {
2605     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
2606 }
2607
2608 static void
2609 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
2610 {
2611     bool got_lock;
2612
2613     json_destroy(idl->lock_request_id);
2614     idl->lock_request_id = NULL;
2615
2616     if (result->type == JSON_OBJECT) {
2617         const struct json *locked;
2618
2619         locked = shash_find_data(json_object(result), "locked");
2620         got_lock = locked && locked->type == JSON_TRUE;
2621     } else {
2622         got_lock = false;
2623     }
2624
2625     ovsdb_idl_update_has_lock(idl, got_lock);
2626     if (!got_lock) {
2627         idl->is_lock_contended = true;
2628     }
2629 }
2630
2631 static void
2632 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
2633                             const struct json *params,
2634                             bool new_has_lock)
2635 {
2636     if (idl->lock_name
2637         && params->type == JSON_ARRAY
2638         && json_array(params)->n > 0
2639         && json_array(params)->elems[0]->type == JSON_STRING) {
2640         const char *lock_name = json_string(json_array(params)->elems[0]);
2641
2642         if (!strcmp(idl->lock_name, lock_name)) {
2643             ovsdb_idl_update_has_lock(idl, new_has_lock);
2644             if (!new_has_lock) {
2645                 idl->is_lock_contended = true;
2646             }
2647         }
2648     }
2649 }
2650
2651 void
2652 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
2653 {
2654     if (loop) {
2655         ovsdb_idl_destroy(loop->idl);
2656     }
2657 }
2658
2659 struct ovsdb_idl_txn *
2660 ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
2661 {
2662     ovsdb_idl_run(loop->idl);
2663     loop->open_txn = (loop->committing_txn
2664                       || ovsdb_idl_get_seqno(loop->idl) == loop->skip_seqno
2665                       ? NULL
2666                       : ovsdb_idl_txn_create(loop->idl));
2667     return loop->open_txn;
2668 }
2669
2670 void
2671 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
2672 {
2673     if (loop->open_txn) {
2674         loop->committing_txn = loop->open_txn;
2675         loop->open_txn = NULL;
2676
2677         loop->precommit_seqno = ovsdb_idl_get_seqno(loop->idl);
2678     }
2679
2680     struct ovsdb_idl_txn *txn = loop->committing_txn;
2681     if (txn) {
2682         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
2683         if (status != TXN_INCOMPLETE) {
2684             switch (status) {
2685             case TXN_TRY_AGAIN:
2686                 /* We want to re-evaluate the database when it's changed from
2687                  * the contents that it had when we started the commit.  (That
2688                  * might have already happened.) */
2689                 loop->skip_seqno = loop->precommit_seqno;
2690                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
2691                     poll_immediate_wake();
2692                 }
2693                 break;
2694
2695             case TXN_SUCCESS:
2696                 /* If the database has already changed since we started the
2697                  * commit, re-evaluate it immediately to avoid missing a change
2698                  * for a while. */
2699                 if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
2700                     poll_immediate_wake();
2701                 }
2702                 break;
2703
2704             case TXN_UNCHANGED:
2705             case TXN_ABORTED:
2706             case TXN_NOT_LOCKED:
2707             case TXN_ERROR:
2708                 break;
2709
2710             case TXN_UNCOMMITTED:
2711             case TXN_INCOMPLETE:
2712                 OVS_NOT_REACHED();
2713
2714             }
2715             ovsdb_idl_txn_destroy(txn);
2716             loop->committing_txn = NULL;
2717         }
2718     }
2719
2720     ovsdb_idl_wait(loop->idl);
2721 }