tnl-neigh-cache: check for arp expiration.
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "coverage.h"
27 #include "openvswitch/dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "json.h"
30 #include "jsonrpc.h"
31 #include "ovsdb/ovsdb.h"
32 #include "ovsdb/table.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-idl-provider.h"
36 #include "ovsdb-parser.h"
37 #include "poll-loop.h"
38 #include "shash.h"
39 #include "sset.h"
40 #include "util.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
44
45 COVERAGE_DEFINE(txn_uncommitted);
46 COVERAGE_DEFINE(txn_unchanged);
47 COVERAGE_DEFINE(txn_incomplete);
48 COVERAGE_DEFINE(txn_aborted);
49 COVERAGE_DEFINE(txn_success);
50 COVERAGE_DEFINE(txn_try_again);
51 COVERAGE_DEFINE(txn_not_locked);
52 COVERAGE_DEFINE(txn_error);
53
54 /* An arc from one idl_row to another.  When row A contains a UUID that
55  * references row B, this is represented by an arc from A (the source) to B
56  * (the destination).
57  *
58  * Arcs from a row to itself are omitted, that is, src and dst are always
59  * different.
60  *
61  * Arcs are never duplicated, that is, even if there are multiple references
62  * from A to B, there is only a single arc from A to B.
63  *
64  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
65  * A.  Both an arc and its converse may both be present, if each row refers
66  * to the other circularly.
67  *
68  * The source and destination row may be in the same table or in different
69  * tables.
70  */
71 struct ovsdb_idl_arc {
72     struct ovs_list src_node;   /* In src->src_arcs list. */
73     struct ovs_list dst_node;   /* In dst->dst_arcs list. */
74     struct ovsdb_idl_row *src;  /* Source row. */
75     struct ovsdb_idl_row *dst;  /* Destination row. */
76 };
77
78 enum ovsdb_idl_state {
79     IDL_S_SCHEMA_REQUESTED,
80     IDL_S_MONITOR_REQUESTED,
81     IDL_S_MONITORING,
82     IDL_S_MONITOR2_REQUESTED,
83     IDL_S_MONITORING2,
84     IDL_S_NO_SCHEMA
85 };
86
87 struct ovsdb_idl {
88     const struct ovsdb_idl_class *class;
89     struct jsonrpc_session *session;
90     struct shash table_by_name;
91     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
92     unsigned int change_seqno;
93     bool verify_write_only;
94
95     /* Session state. */
96     unsigned int state_seqno;
97     enum ovsdb_idl_state state;
98     struct json *request_id;
99     struct json *schema;
100
101     /* Database locking. */
102     char *lock_name;            /* Name of lock we need, NULL if none. */
103     bool has_lock;              /* Has db server told us we have the lock? */
104     bool is_lock_contended;     /* Has db server told us we can't get lock? */
105     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
106
107     /* Transaction support. */
108     struct ovsdb_idl_txn *txn;
109     struct hmap outstanding_txns;
110 };
111
112 struct ovsdb_idl_txn {
113     struct hmap_node hmap_node;
114     struct json *request_id;
115     struct ovsdb_idl *idl;
116     struct hmap txn_rows;
117     enum ovsdb_idl_txn_status status;
118     char *error;
119     bool dry_run;
120     struct ds comment;
121
122     /* Increments. */
123     const char *inc_table;
124     const char *inc_column;
125     struct uuid inc_row;
126     unsigned int inc_index;
127     int64_t inc_new_value;
128
129     /* Inserted rows. */
130     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
131 };
132
133 struct ovsdb_idl_txn_insert {
134     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
135     struct uuid dummy;          /* Dummy UUID used locally. */
136     int op_index;               /* Index into transaction's operation array. */
137     struct uuid real;           /* Real UUID used by database server. */
138 };
139
140 enum ovsdb_update_version {
141     OVSDB_UPDATE,               /* RFC 7047 "update" method. */
142     OVSDB_UPDATE2               /* "update2" Extension to RFC 7047.
143                                    See ovsdb-server(1) for more information. */
144 };
145
146 /* Name arrays indexed by 'enum ovsdb_update_version'. */
147 static const char *table_updates_names[] = {"table_updates", "table_updates2"};
148 static const char *table_update_names[] = {"table_update", "table_update2"};
149 static const char *row_update_names[] = {"row_update", "row_update2"};
150
151 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
152 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
153
154 static void ovsdb_idl_clear(struct ovsdb_idl *);
155 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
156 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
157 static void ovsdb_idl_send_monitor2_request(struct ovsdb_idl *);
158 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
159                                    enum ovsdb_update_version);
160 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
161                                                     const struct json *,
162                                                     enum ovsdb_update_version);
163 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
164                                      const struct uuid *,
165                                      const struct json *old,
166                                      const struct json *new);
167 static bool ovsdb_idl_process_update2(struct ovsdb_idl_table *,
168                                       const struct uuid *,
169                                       const char *operation,
170                                       const struct json *row);
171 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
172 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
173 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
174 static bool ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *,
175                                          const struct json *);
176
177 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
178 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
179     const struct ovsdb_idl_table_class *);
180 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
181                                                   const struct uuid *);
182 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
183 static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
184
185 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
186 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
187 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
188 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
189 static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
190
191 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
192 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
193                                         const struct jsonrpc_msg *msg);
194
195 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
196 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
197 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
198                                        const struct json *);
199 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
200                                         const struct json *params,
201                                         bool new_has_lock);
202 static struct ovsdb_idl_table *
203 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
204                            const struct ovsdb_idl_table_class *);
205 static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table);
206
207 /* Creates and returns a connection to database 'remote', which should be in a
208  * form acceptable to jsonrpc_session_open().  The connection will maintain an
209  * in-memory replica of the remote database whose schema is described by
210  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
211  * by ovsdb-idlc.)
212  *
213  * Passes 'retry' to jsonrpc_session_open().  See that function for
214  * documentation.
215  *
216  * If 'monitor_everything_by_default' is true, then everything in the remote
217  * database will be replicated by default.  ovsdb_idl_omit() and
218  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
219  * monitoring.
220  *
221  * If 'monitor_everything_by_default' is false, then no columns or tables will
222  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
223  * must be used to choose some columns or tables to replicate.
224  */
225 struct ovsdb_idl *
226 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
227                  bool monitor_everything_by_default, bool retry)
228 {
229     struct ovsdb_idl *idl;
230     uint8_t default_mode;
231     size_t i;
232
233     default_mode = (monitor_everything_by_default
234                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
235                     : 0);
236
237     idl = xzalloc(sizeof *idl);
238     idl->class = class;
239     idl->session = jsonrpc_session_open(remote, retry);
240     shash_init(&idl->table_by_name);
241     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
242     for (i = 0; i < class->n_tables; i++) {
243         const struct ovsdb_idl_table_class *tc = &class->tables[i];
244         struct ovsdb_idl_table *table = &idl->tables[i];
245         size_t j;
246
247         shash_add_assert(&idl->table_by_name, tc->name, table);
248         table->class = tc;
249         table->modes = xmalloc(tc->n_columns);
250         memset(table->modes, default_mode, tc->n_columns);
251         table->need_table = false;
252         shash_init(&table->columns);
253         for (j = 0; j < tc->n_columns; j++) {
254             const struct ovsdb_idl_column *column = &tc->columns[j];
255
256             shash_add_assert(&table->columns, column->name, column);
257         }
258         hmap_init(&table->rows);
259         ovs_list_init(&table->track_list);
260         table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
261             = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
262             = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
263         table->idl = idl;
264     }
265
266     idl->state_seqno = UINT_MAX;
267     idl->request_id = NULL;
268     idl->schema = NULL;
269
270     hmap_init(&idl->outstanding_txns);
271
272     return idl;
273 }
274
275 /* Changes the remote and creates a new session. */
276 void
277 ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote,
278                      bool retry)
279 {
280     if (idl) {
281         ovs_assert(!idl->txn);
282         idl->session = jsonrpc_session_open(remote, retry);
283         idl->state_seqno = UINT_MAX;
284     }
285 }
286
287 /* Destroys 'idl' and all of the data structures that it manages. */
288 void
289 ovsdb_idl_destroy(struct ovsdb_idl *idl)
290 {
291     if (idl) {
292         size_t i;
293
294         ovs_assert(!idl->txn);
295         ovsdb_idl_clear(idl);
296         jsonrpc_session_close(idl->session);
297
298         for (i = 0; i < idl->class->n_tables; i++) {
299             struct ovsdb_idl_table *table = &idl->tables[i];
300             shash_destroy(&table->columns);
301             hmap_destroy(&table->rows);
302             free(table->modes);
303         }
304         shash_destroy(&idl->table_by_name);
305         free(idl->tables);
306         json_destroy(idl->request_id);
307         free(idl->lock_name);
308         json_destroy(idl->lock_request_id);
309         json_destroy(idl->schema);
310         hmap_destroy(&idl->outstanding_txns);
311         free(idl);
312     }
313 }
314
315 static void
316 ovsdb_idl_clear(struct ovsdb_idl *idl)
317 {
318     bool changed = false;
319     size_t i;
320
321     for (i = 0; i < idl->class->n_tables; i++) {
322         struct ovsdb_idl_table *table = &idl->tables[i];
323         struct ovsdb_idl_row *row, *next_row;
324
325         if (hmap_is_empty(&table->rows)) {
326             continue;
327         }
328
329         changed = true;
330         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
331             struct ovsdb_idl_arc *arc, *next_arc;
332
333             if (!ovsdb_idl_row_is_orphan(row)) {
334                 ovsdb_idl_row_unparse(row);
335             }
336             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
337                 free(arc);
338             }
339             /* No need to do anything with dst_arcs: some node has those arcs
340              * as forward arcs and will destroy them itself. */
341
342             if (!ovs_list_is_empty(&row->track_node)) {
343                 ovs_list_remove(&row->track_node);
344             }
345
346             ovsdb_idl_row_destroy(row);
347         }
348     }
349
350     ovsdb_idl_track_clear(idl);
351
352     if (changed) {
353         idl->change_seqno++;
354     }
355 }
356
357 /* Processes a batch of messages from the database server on 'idl'.  This may
358  * cause the IDL's contents to change.  The client may check for that with
359  * ovsdb_idl_get_seqno(). */
360 void
361 ovsdb_idl_run(struct ovsdb_idl *idl)
362 {
363     int i;
364
365     ovs_assert(!idl->txn);
366     jsonrpc_session_run(idl->session);
367     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
368         struct jsonrpc_msg *msg;
369         unsigned int seqno;
370
371         seqno = jsonrpc_session_get_seqno(idl->session);
372         if (idl->state_seqno != seqno) {
373             idl->state_seqno = seqno;
374             json_destroy(idl->request_id);
375             idl->request_id = NULL;
376             ovsdb_idl_txn_abort_all(idl);
377
378             ovsdb_idl_send_schema_request(idl);
379             idl->state = IDL_S_SCHEMA_REQUESTED;
380             if (idl->lock_name) {
381                 ovsdb_idl_send_lock_request(idl);
382             }
383         }
384
385         msg = jsonrpc_session_recv(idl->session);
386         if (!msg) {
387             break;
388         }
389
390         if (msg->type == JSONRPC_NOTIFY
391             && !strcmp(msg->method, "update2")
392             && msg->params->type == JSON_ARRAY
393             && msg->params->u.array.n == 2
394             && msg->params->u.array.elems[0]->type == JSON_NULL) {
395             /* Database contents changed. */
396             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
397                                    OVSDB_UPDATE2);
398         } else if (msg->type == JSONRPC_REPLY
399                    && idl->request_id
400                    && json_equal(idl->request_id, msg->id)) {
401             json_destroy(idl->request_id);
402             idl->request_id = NULL;
403
404             switch (idl->state) {
405             case IDL_S_SCHEMA_REQUESTED:
406                 /* Reply to our "get_schema" request. */
407                 idl->schema = json_clone(msg->result);
408                 ovsdb_idl_send_monitor2_request(idl);
409                 idl->state = IDL_S_MONITOR2_REQUESTED;
410                 break;
411
412             case IDL_S_MONITOR_REQUESTED:
413             case IDL_S_MONITOR2_REQUESTED:
414                 /* Reply to our "monitor" or "monitor2" request. */
415                 idl->change_seqno++;
416                 ovsdb_idl_clear(idl);
417                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
418                     idl->state = IDL_S_MONITORING;
419                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
420                 } else { /* IDL_S_MONITOR2_REQUESTED. */
421                     idl->state = IDL_S_MONITORING2;
422                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
423                 }
424
425                 /* Schema is not useful after monitor request is accepted
426                  * by the server.  */
427                 json_destroy(idl->schema);
428                 idl->schema = NULL;
429                 break;
430
431             case IDL_S_MONITORING:
432             case IDL_S_MONITORING2:
433             case IDL_S_NO_SCHEMA:
434             default:
435                 OVS_NOT_REACHED();
436             }
437         } else if (msg->type == JSONRPC_NOTIFY
438             && !strcmp(msg->method, "update")
439             && msg->params->type == JSON_ARRAY
440             && msg->params->u.array.n == 2
441             && msg->params->u.array.elems[0]->type == JSON_NULL) {
442             /* Database contents changed. */
443             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
444                                    OVSDB_UPDATE);
445         } else if (msg->type == JSONRPC_REPLY
446                    && idl->lock_request_id
447                    && json_equal(idl->lock_request_id, msg->id)) {
448             /* Reply to our "lock" request. */
449             ovsdb_idl_parse_lock_reply(idl, msg->result);
450         } else if (msg->type == JSONRPC_NOTIFY
451                    && !strcmp(msg->method, "locked")) {
452             /* We got our lock. */
453             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
454         } else if (msg->type == JSONRPC_NOTIFY
455                    && !strcmp(msg->method, "stolen")) {
456             /* Someone else stole our lock. */
457             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
458         } else if (msg->type == JSONRPC_ERROR
459                    && idl->state == IDL_S_MONITOR2_REQUESTED
460                    && idl->request_id
461                    && json_equal(idl->request_id, msg->id)) {
462             if (msg->error && !strcmp(json_string(msg->error),
463                                       "unknown method")) {
464                 /* Fall back to using "monitor" method.  */
465                 json_destroy(idl->request_id);
466                 idl->request_id = NULL;
467                 ovsdb_idl_send_monitor_request(idl);
468                 idl->state = IDL_S_MONITOR_REQUESTED;
469             }
470         } else if (msg->type == JSONRPC_ERROR
471                    && idl->state == IDL_S_SCHEMA_REQUESTED
472                    && idl->request_id
473                    && json_equal(idl->request_id, msg->id)) {
474                 json_destroy(idl->request_id);
475                 idl->request_id = NULL;
476                 VLOG_ERR("%s: requested schema not found",
477                          jsonrpc_session_get_name(idl->session));
478                 idl->state = IDL_S_NO_SCHEMA;
479         } else if ((msg->type == JSONRPC_ERROR
480                     || msg->type == JSONRPC_REPLY)
481                    && ovsdb_idl_txn_process_reply(idl, msg)) {
482             /* ovsdb_idl_txn_process_reply() did everything needful. */
483         } else {
484             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
485              * a transaction before we receive the reply, so keep the log level
486              * low. */
487             VLOG_DBG("%s: received unexpected %s message",
488                      jsonrpc_session_get_name(idl->session),
489                      jsonrpc_msg_type_to_string(msg->type));
490         }
491         jsonrpc_msg_destroy(msg);
492     }
493     ovsdb_idl_row_destroy_postprocess(idl);
494 }
495
496 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
497  * do or when activity occurs on a transaction on 'idl'. */
498 void
499 ovsdb_idl_wait(struct ovsdb_idl *idl)
500 {
501     jsonrpc_session_wait(idl->session);
502     jsonrpc_session_recv_wait(idl->session);
503 }
504
505 /* Returns a "sequence number" that represents the state of 'idl'.  When
506  * ovsdb_idl_run() changes the database, the sequence number changes.  The
507  * initial fetch of the entire contents of the remote database is considered to
508  * be one kind of change.  Successfully acquiring a lock, if one has been
509  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
510  *
511  * As long as the sequence number does not change, the client may continue to
512  * use any data structures it obtains from 'idl'.  But when it changes, the
513  * client must not access any of these data structures again, because they
514  * could have freed or reused for other purposes.
515  *
516  * The sequence number can occasionally change even if the database does not.
517  * This happens if the connection to the database drops and reconnects, which
518  * causes the database contents to be reloaded even if they didn't change.  (It
519  * could also happen if the database server sends out a "change" that reflects
520  * what the IDL already thought was in the database.  The database server is
521  * not supposed to do that, but bugs could in theory cause it to do so.) */
522 unsigned int
523 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
524 {
525     return idl->change_seqno;
526 }
527
528 /* Returns true if 'idl' successfully connected to the remote database and
529  * retrieved its contents (even if the connection subsequently dropped and is
530  * in the process of reconnecting).  If so, then 'idl' contains an atomic
531  * snapshot of the database's contents (but it might be arbitrarily old if the
532  * connection dropped).
533  *
534  * Returns false if 'idl' has never connected or retrieved the database's
535  * contents.  If so, 'idl' is empty. */
536 bool
537 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
538 {
539     return ovsdb_idl_get_seqno(idl) != 0;
540 }
541
542 /* Reconfigures 'idl' so that it would reconnect to the database, if
543  * connection was dropped. */
544 void
545 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
546 {
547     jsonrpc_session_enable_reconnect(idl->session);
548 }
549
550 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
551  * meantime, the contents of 'idl' will not change. */
552 void
553 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
554 {
555     jsonrpc_session_force_reconnect(idl->session);
556 }
557
558 /* Some IDL users should only write to write-only columns.  Furthermore,
559  * writing to a column which is not write-only can cause serious performance
560  * degradations for these users.  This function causes 'idl' to reject writes
561  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
562 void
563 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
564 {
565     idl->verify_write_only = true;
566 }
567
568 /* Returns true if 'idl' is currently connected or trying to connect
569  * and a negative response to a schema request has not been received */
570 bool
571 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
572 {
573     return jsonrpc_session_is_alive(idl->session) &&
574            idl->state != IDL_S_NO_SCHEMA;
575 }
576
577 /* Returns the last error reported on a connection by 'idl'.  The return value
578  * is 0 only if no connection made by 'idl' has ever encountered an error and
579  * a negative response to a schema request has never been received. See
580  * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
581  * interpretation. */
582 int
583 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
584 {
585     int err;
586
587     err = jsonrpc_session_get_last_error(idl->session);
588
589     if (err) {
590         return err;
591     } else if (idl->state == IDL_S_NO_SCHEMA) {
592         return ENOENT;
593     } else {
594         return 0;
595     }
596 }
597
598 /* Sets the "probe interval" for 'idl->session' to 'probe_interval', in
599  * milliseconds.
600  */
601 void
602 ovsdb_idl_set_probe_interval(const struct ovsdb_idl *idl, int probe_interval)
603 {
604     jsonrpc_session_set_probe_interval(idl->session, probe_interval);
605 }
606 \f
607 static unsigned char *
608 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
609                    const struct ovsdb_idl_column *column)
610 {
611     size_t i;
612
613     ovs_assert(!idl->change_seqno);
614
615     for (i = 0; i < idl->class->n_tables; i++) {
616         const struct ovsdb_idl_table *table = &idl->tables[i];
617         const struct ovsdb_idl_table_class *tc = table->class;
618
619         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
620             return &table->modes[column - tc->columns];
621         }
622     }
623
624     OVS_NOT_REACHED();
625 }
626
627 static void
628 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
629 {
630     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
631         struct ovsdb_idl_table *table;
632
633         table = shash_find_data(&idl->table_by_name,
634                                 base->u.uuid.refTableName);
635         if (table) {
636             table->need_table = true;
637         } else {
638             VLOG_WARN("%s IDL class missing referenced table %s",
639                       idl->class->database, base->u.uuid.refTableName);
640         }
641     }
642 }
643
644 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
645  * ensures that any tables referenced by 'column' will be replicated, even if
646  * no columns in that table are selected for replication (see
647  * ovsdb_idl_add_table() for more information).
648  *
649  * This function is only useful if 'monitor_everything_by_default' was false in
650  * the call to ovsdb_idl_create().  This function should be called between
651  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
652  */
653 void
654 ovsdb_idl_add_column(struct ovsdb_idl *idl,
655                      const struct ovsdb_idl_column *column)
656 {
657     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
658     add_ref_table(idl, &column->type.key);
659     add_ref_table(idl, &column->type.value);
660 }
661
662 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
663  * no columns are selected for replication. Just the necessary data for table
664  * references will be replicated (the UUID of the rows, for instance), any
665  * columns not selected for replication will remain unreplicated.
666  * This can be useful because it allows 'idl' to keep track of what rows in the
667  * table actually exist, which in turn allows columns that reference the table
668  * to have accurate contents. (The IDL presents the database with references to
669  * rows that do not exist removed.)
670  *
671  * This function is only useful if 'monitor_everything_by_default' was false in
672  * the call to ovsdb_idl_create().  This function should be called between
673  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
674  */
675 void
676 ovsdb_idl_add_table(struct ovsdb_idl *idl,
677                     const struct ovsdb_idl_table_class *tc)
678 {
679     size_t i;
680
681     for (i = 0; i < idl->class->n_tables; i++) {
682         struct ovsdb_idl_table *table = &idl->tables[i];
683
684         if (table->class == tc) {
685             table->need_table = true;
686             return;
687         }
688     }
689
690     OVS_NOT_REACHED();
691 }
692
693 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
694  *
695  * This function should be called between ovsdb_idl_create() and the first call
696  * to ovsdb_idl_run().
697  */
698 void
699 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
700                      const struct ovsdb_idl_column *column)
701 {
702     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
703 }
704
705 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
706  * OVSDB_IDL_MONITOR for details.
707  *
708  * This function should be called between ovsdb_idl_create() and the first call
709  * to ovsdb_idl_run().
710  */
711 void
712 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
713 {
714     *ovsdb_idl_get_mode(idl, column) = 0;
715 }
716
717 /* Returns the most recent IDL change sequence number that caused a
718  * insert, modify or delete update to the table with class 'table_class'.
719  */
720 unsigned int
721 ovsdb_idl_table_get_seqno(const struct ovsdb_idl *idl,
722                           const struct ovsdb_idl_table_class *table_class)
723 {
724     struct ovsdb_idl_table *table
725         = ovsdb_idl_table_from_class(idl, table_class);
726     unsigned int max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_INSERT];
727
728     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]) {
729         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY];
730     }
731     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_DELETE]) {
732         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_DELETE];
733     }
734     return max_seqno;
735 }
736
737 /* For each row that contains tracked columns, IDL stores the most
738  * recent IDL change sequence numbers associateed with insert, modify
739  * and delete updates to the table.
740  */
741 unsigned int
742 ovsdb_idl_row_get_seqno(const struct ovsdb_idl_row *row,
743                         enum ovsdb_idl_change change)
744 {
745     return row->change_seqno[change];
746 }
747
748 /* Turns on OVSDB_IDL_TRACK for 'column' in 'idl', ensuring that
749  * all rows whose 'column' is modified are traced. Similarly, insert
750  * or delete of rows having 'column' are tracked. Clients are able
751  * to retrive the tracked rows with the ovsdb_idl_track_get_*()
752  * functions.
753  *
754  * This function should be called between ovsdb_idl_create() and
755  * the first call to ovsdb_idl_run(). The column to be tracked
756  * should have OVSDB_IDL_ALERT turned on.
757  */
758 void
759 ovsdb_idl_track_add_column(struct ovsdb_idl *idl,
760                            const struct ovsdb_idl_column *column)
761 {
762     if (!(*ovsdb_idl_get_mode(idl, column) & OVSDB_IDL_ALERT)) {
763         ovsdb_idl_add_column(idl, column);
764     }
765     *ovsdb_idl_get_mode(idl, column) |= OVSDB_IDL_TRACK;
766 }
767
768 void
769 ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
770 {
771     size_t i, j;
772
773     for (i = 0; i < idl->class->n_tables; i++) {
774         const struct ovsdb_idl_table_class *tc = &idl->class->tables[i];
775
776         for (j = 0; j < tc->n_columns; j++) {
777             const struct ovsdb_idl_column *column = &tc->columns[j];
778             ovsdb_idl_track_add_column(idl, column);
779         }
780     }
781 }
782
783 /* Returns true if 'table' has any tracked column. */
784 static bool
785 ovsdb_idl_track_is_set(struct ovsdb_idl_table *table)
786 {
787     size_t i;
788
789     for (i = 0; i < table->class->n_columns; i++) {
790         if (table->modes[i] & OVSDB_IDL_TRACK) {
791             return true;
792         }
793     }
794    return false;
795 }
796
797 /* Returns the first tracked row in table with class 'table_class'
798  * for the specified 'idl'. Returns NULL if there are no tracked rows */
799 const struct ovsdb_idl_row *
800 ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
801                           const struct ovsdb_idl_table_class *table_class)
802 {
803     struct ovsdb_idl_table *table
804         = ovsdb_idl_table_from_class(idl, table_class);
805
806     if (!ovs_list_is_empty(&table->track_list)) {
807         return CONTAINER_OF(ovs_list_front(&table->track_list), struct ovsdb_idl_row, track_node);
808     }
809     return NULL;
810 }
811
812 /* Returns the next tracked row in table after the specified 'row'
813  * (in no particular order). Returns NULL if there are no tracked rows */
814 const struct ovsdb_idl_row *
815 ovsdb_idl_track_get_next(const struct ovsdb_idl_row *row)
816 {
817     if (row->track_node.next != &row->table->track_list) {
818         return CONTAINER_OF(row->track_node.next, struct ovsdb_idl_row, track_node);
819     }
820
821     return NULL;
822 }
823
824 /* Returns true if a tracked 'column' in 'row' was updated by IDL, false
825  * otherwise. The tracking data is cleared by ovsdb_idl_track_clear()
826  *
827  * Function returns false if 'column' is not tracked (see
828  * ovsdb_idl_track_add_column()).
829  */
830 bool
831 ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
832                            const struct ovsdb_idl_column *column)
833 {
834     const struct ovsdb_idl_table_class *class;
835     size_t column_idx;
836
837     class = row->table->class;
838     column_idx = column - class->columns;
839
840     if (row->updated && bitmap_is_set(row->updated, column_idx)) {
841         return true;
842     } else {
843         return false;
844     }
845 }
846
847 /* Flushes the tracked rows. Client calls this function after calling
848  * ovsdb_idl_run() and read all tracked rows with the ovsdb_idl_track_get_*()
849  * functions. This is usually done at the end of the client's processing
850  * loop when it is ready to do ovsdb_idl_run() again.
851  */
852 void
853 ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
854 {
855     size_t i;
856
857     for (i = 0; i < idl->class->n_tables; i++) {
858         struct ovsdb_idl_table *table = &idl->tables[i];
859
860         if (!ovs_list_is_empty(&table->track_list)) {
861             struct ovsdb_idl_row *row, *next;
862
863             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
864                 if (row->updated) {
865                     free(row->updated);
866                     row->updated = NULL;
867                 }
868                 ovs_list_remove(&row->track_node);
869                 ovs_list_init(&row->track_node);
870                 if (ovsdb_idl_row_is_orphan(row)) {
871                     ovsdb_idl_row_clear_old(row);
872                     free(row);
873                 }
874             }
875         }
876     }
877 }
878
879 \f
880 static void
881 ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
882 {
883     struct jsonrpc_msg *msg;
884
885     json_destroy(idl->request_id);
886     msg = jsonrpc_create_request(
887         "get_schema",
888         json_array_create_1(json_string_create(idl->class->database)),
889         &idl->request_id);
890     jsonrpc_session_send(idl->session, msg);
891 }
892
893 static void
894 log_error(struct ovsdb_error *error)
895 {
896     char *s = ovsdb_error_to_string(error);
897     VLOG_WARN("error parsing database schema: %s", s);
898     free(s);
899     ovsdb_error_destroy(error);
900 }
901
902 /* Frees 'schema', which is in the format returned by parse_schema(). */
903 static void
904 free_schema(struct shash *schema)
905 {
906     if (schema) {
907         struct shash_node *node, *next;
908
909         SHASH_FOR_EACH_SAFE (node, next, schema) {
910             struct sset *sset = node->data;
911             sset_destroy(sset);
912             free(sset);
913             shash_delete(schema, node);
914         }
915         shash_destroy(schema);
916         free(schema);
917     }
918 }
919
920 /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
921  * 7047, to obtain the names of its rows and columns.  If successful, returns
922  * an shash whose keys are table names and whose values are ssets, where each
923  * sset contains the names of its table's columns.  On failure (due to a parse
924  * error), returns NULL.
925  *
926  * It would also be possible to use the general-purpose OVSDB schema parser in
927  * ovsdb-server, but that's overkill, possibly too strict for the current use
928  * case, and would require restructuring ovsdb-server to separate the schema
929  * code from the rest. */
930 static struct shash *
931 parse_schema(const struct json *schema_json)
932 {
933     struct ovsdb_parser parser;
934     const struct json *tables_json;
935     struct ovsdb_error *error;
936     struct shash_node *node;
937     struct shash *schema;
938
939     ovsdb_parser_init(&parser, schema_json, "database schema");
940     tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
941     error = ovsdb_parser_destroy(&parser);
942     if (error) {
943         log_error(error);
944         return NULL;
945     }
946
947     schema = xmalloc(sizeof *schema);
948     shash_init(schema);
949     SHASH_FOR_EACH (node, json_object(tables_json)) {
950         const char *table_name = node->name;
951         const struct json *json = node->data;
952         const struct json *columns_json;
953
954         ovsdb_parser_init(&parser, json, "table schema for table %s",
955                           table_name);
956         columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
957         error = ovsdb_parser_destroy(&parser);
958         if (error) {
959             log_error(error);
960             free_schema(schema);
961             return NULL;
962         }
963
964         struct sset *columns = xmalloc(sizeof *columns);
965         sset_init(columns);
966
967         struct shash_node *node2;
968         SHASH_FOR_EACH (node2, json_object(columns_json)) {
969             const char *column_name = node2->name;
970             sset_add(columns, column_name);
971         }
972         shash_add(schema, table_name, columns);
973     }
974     return schema;
975 }
976
977 static void
978 ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
979                                  const char *method)
980 {
981     struct shash *schema;
982     struct json *monitor_requests;
983     struct jsonrpc_msg *msg;
984     size_t i;
985
986     schema = parse_schema(idl->schema);
987     monitor_requests = json_object_create();
988     for (i = 0; i < idl->class->n_tables; i++) {
989         const struct ovsdb_idl_table *table = &idl->tables[i];
990         const struct ovsdb_idl_table_class *tc = table->class;
991         struct json *monitor_request, *columns;
992         const struct sset *table_schema;
993         size_t j;
994
995         table_schema = (schema
996                         ? shash_find_data(schema, table->class->name)
997                         : NULL);
998
999         columns = table->need_table ? json_array_create_empty() : NULL;
1000         for (j = 0; j < tc->n_columns; j++) {
1001             const struct ovsdb_idl_column *column = &tc->columns[j];
1002             if (table->modes[j] & OVSDB_IDL_MONITOR) {
1003                 if (table_schema
1004                     && !sset_contains(table_schema, column->name)) {
1005                     VLOG_WARN("%s table in %s database lacks %s column "
1006                               "(database needs upgrade?)",
1007                               table->class->name, idl->class->database,
1008                               column->name);
1009                     continue;
1010                 }
1011                 if (!columns) {
1012                     columns = json_array_create_empty();
1013                 }
1014                 json_array_add(columns, json_string_create(column->name));
1015             }
1016         }
1017
1018         if (columns) {
1019             if (schema && !table_schema) {
1020                 VLOG_WARN("%s database lacks %s table "
1021                           "(database needs upgrade?)",
1022                           idl->class->database, table->class->name);
1023                 json_destroy(columns);
1024                 continue;
1025             }
1026
1027             monitor_request = json_object_create();
1028             json_object_put(monitor_request, "columns", columns);
1029             json_object_put(monitor_requests, tc->name, monitor_request);
1030         }
1031     }
1032     free_schema(schema);
1033
1034     json_destroy(idl->request_id);
1035     msg = jsonrpc_create_request(
1036         method,
1037         json_array_create_3(json_string_create(idl->class->database),
1038                             json_null_create(), monitor_requests),
1039         &idl->request_id);
1040     jsonrpc_session_send(idl->session, msg);
1041 }
1042
1043 static void
1044 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
1045 {
1046     ovsdb_idl_send_monitor_request__(idl, "monitor");
1047 }
1048
1049 static void
1050 log_parse_update_error(struct ovsdb_error *error)
1051 {
1052         if (!VLOG_DROP_WARN(&syntax_rl)) {
1053             char *s = ovsdb_error_to_string(error);
1054             VLOG_WARN_RL(&syntax_rl, "%s", s);
1055             free(s);
1056         }
1057         ovsdb_error_destroy(error);
1058 }
1059
1060 static void
1061 ovsdb_idl_send_monitor2_request(struct ovsdb_idl *idl)
1062 {
1063     ovsdb_idl_send_monitor_request__(idl, "monitor2");
1064 }
1065
1066 static void
1067 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates,
1068                        enum ovsdb_update_version version)
1069 {
1070     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates,
1071                                                          version);
1072     if (error) {
1073         log_parse_update_error(error);
1074     }
1075 }
1076
1077 static struct ovsdb_error *
1078 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
1079                          const struct json *table_updates,
1080                          enum ovsdb_update_version version)
1081 {
1082     const struct shash_node *tables_node;
1083     const char *table_updates_name = table_updates_names[version];
1084     const char *table_update_name = table_update_names[version];
1085     const char *row_update_name = row_update_names[version];
1086
1087     if (table_updates->type != JSON_OBJECT) {
1088         return ovsdb_syntax_error(table_updates, NULL,
1089                                   "<%s> is not an object",
1090                                   table_updates_name);
1091     }
1092
1093     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
1094         const struct json *table_update = tables_node->data;
1095         const struct shash_node *table_node;
1096         struct ovsdb_idl_table *table;
1097
1098         table = shash_find_data(&idl->table_by_name, tables_node->name);
1099         if (!table) {
1100             return ovsdb_syntax_error(
1101                 table_updates, NULL,
1102                 "<%s> includes unknown table \"%s\"",
1103                 table_updates_name,
1104                 tables_node->name);
1105         }
1106
1107         if (table_update->type != JSON_OBJECT) {
1108             return ovsdb_syntax_error(table_update, NULL,
1109                                       "<%s> for table \"%s\" is "
1110                                       "not an object",
1111                                       table_update_name,
1112                                       table->class->name);
1113         }
1114         SHASH_FOR_EACH (table_node, json_object(table_update)) {
1115             const struct json *row_update = table_node->data;
1116             const struct json *old_json, *new_json;
1117             struct uuid uuid;
1118
1119             if (!uuid_from_string(&uuid, table_node->name)) {
1120                 return ovsdb_syntax_error(table_update, NULL,
1121                                           "<%s> for table \"%s\" "
1122                                           "contains bad UUID "
1123                                           "\"%s\" as member name",
1124                                           table_update_name,
1125                                           table->class->name,
1126                                           table_node->name);
1127             }
1128             if (row_update->type != JSON_OBJECT) {
1129                 return ovsdb_syntax_error(row_update, NULL,
1130                                           "<%s> for table \"%s\" "
1131                                           "contains <%s> for %s that "
1132                                           "is not an object",
1133                                           table_update_name,
1134                                           table->class->name,
1135                                           row_update_name,
1136                                           table_node->name);
1137             }
1138
1139             switch(version) {
1140             case OVSDB_UPDATE:
1141                 old_json = shash_find_data(json_object(row_update), "old");
1142                 new_json = shash_find_data(json_object(row_update), "new");
1143                 if (old_json && old_json->type != JSON_OBJECT) {
1144                     return ovsdb_syntax_error(old_json, NULL,
1145                                               "\"old\" <row> is not object");
1146                 } else if (new_json && new_json->type != JSON_OBJECT) {
1147                     return ovsdb_syntax_error(new_json, NULL,
1148                                               "\"new\" <row> is not object");
1149                 } else if ((old_json != NULL) + (new_json != NULL)
1150                            != shash_count(json_object(row_update))) {
1151                     return ovsdb_syntax_error(row_update, NULL,
1152                                               "<row-update> contains "
1153                                               "unexpected member");
1154                 } else if (!old_json && !new_json) {
1155                     return ovsdb_syntax_error(row_update, NULL,
1156                                               "<row-update> missing \"old\" "
1157                                               "and \"new\" members");
1158                 }
1159
1160                 if (ovsdb_idl_process_update(table, &uuid, old_json,
1161                                              new_json)) {
1162                     idl->change_seqno++;
1163                 }
1164                 break;
1165
1166             case OVSDB_UPDATE2: {
1167                 const char *ops[] = {"modify", "insert", "delete", "initial"};
1168                 const char *operation;
1169                 const struct json *row;
1170                 int i;
1171
1172                 for (i = 0; i < ARRAY_SIZE(ops); i++) {
1173                     operation = ops[i];
1174                     row = shash_find_data(json_object(row_update), operation);
1175
1176                     if (row)  {
1177                         if (ovsdb_idl_process_update2(table, &uuid, operation,
1178                                                       row)) {
1179                             idl->change_seqno++;
1180                         }
1181                         break;
1182                     }
1183                 }
1184
1185                 /* row_update2 should contain one of the objects */
1186                 if (i == ARRAY_SIZE(ops)) {
1187                     return ovsdb_syntax_error(row_update, NULL,
1188                                               "<row_update2> includes unknown "
1189                                               "object");
1190                 }
1191                 break;
1192             }
1193
1194             default:
1195                 OVS_NOT_REACHED();
1196             }
1197         }
1198     }
1199
1200     return NULL;
1201 }
1202
1203 static struct ovsdb_idl_row *
1204 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
1205 {
1206     struct ovsdb_idl_row *row;
1207
1208     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
1209         if (uuid_equals(&row->uuid, uuid)) {
1210             return row;
1211         }
1212     }
1213     return NULL;
1214 }
1215
1216 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1217  * otherwise. */
1218 static bool
1219 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
1220                          const struct uuid *uuid, const struct json *old,
1221                          const struct json *new)
1222 {
1223     struct ovsdb_idl_row *row;
1224
1225     row = ovsdb_idl_get_row(table, uuid);
1226     if (!new) {
1227         /* Delete row. */
1228         if (row && !ovsdb_idl_row_is_orphan(row)) {
1229             /* XXX perhaps we should check the 'old' values? */
1230             ovsdb_idl_delete_row(row);
1231         } else {
1232             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1233                          "from table %s",
1234                          UUID_ARGS(uuid), table->class->name);
1235             return false;
1236         }
1237     } else if (!old) {
1238         /* Insert row. */
1239         if (!row) {
1240             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1241         } else if (ovsdb_idl_row_is_orphan(row)) {
1242             ovsdb_idl_insert_row(row, new);
1243         } else {
1244             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1245                          "table %s", UUID_ARGS(uuid), table->class->name);
1246             return ovsdb_idl_modify_row(row, new);
1247         }
1248     } else {
1249         /* Modify row. */
1250         if (row) {
1251             /* XXX perhaps we should check the 'old' values? */
1252             if (!ovsdb_idl_row_is_orphan(row)) {
1253                 return ovsdb_idl_modify_row(row, new);
1254             } else {
1255                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1256                              "referenced row "UUID_FMT" in table %s",
1257                              UUID_ARGS(uuid), table->class->name);
1258                 ovsdb_idl_insert_row(row, new);
1259             }
1260         } else {
1261             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1262                          "in table %s", UUID_ARGS(uuid), table->class->name);
1263             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1264         }
1265     }
1266
1267     return true;
1268 }
1269
1270 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1271  * otherwise. */
1272 static bool
1273 ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
1274                           const struct uuid *uuid,
1275                           const char *operation,
1276                           const struct json *json_row)
1277 {
1278     struct ovsdb_idl_row *row;
1279
1280     row = ovsdb_idl_get_row(table, uuid);
1281     if (!strcmp(operation, "delete")) {
1282         /* Delete row. */
1283         if (row && !ovsdb_idl_row_is_orphan(row)) {
1284             ovsdb_idl_delete_row(row);
1285         } else {
1286             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1287                          "from table %s",
1288                          UUID_ARGS(uuid), table->class->name);
1289             return false;
1290         }
1291     } else if (!strcmp(operation, "insert") || !strcmp(operation, "initial")) {
1292         /* Insert row. */
1293         if (!row) {
1294             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), json_row);
1295         } else if (ovsdb_idl_row_is_orphan(row)) {
1296             ovsdb_idl_insert_row(row, json_row);
1297         } else {
1298             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1299                          "table %s", UUID_ARGS(uuid), table->class->name);
1300             ovsdb_idl_delete_row(row);
1301             ovsdb_idl_insert_row(row, json_row);
1302         }
1303     } else if (!strcmp(operation, "modify")) {
1304         /* Modify row. */
1305         if (row) {
1306             if (!ovsdb_idl_row_is_orphan(row)) {
1307                 return ovsdb_idl_modify_row_by_diff(row, json_row);
1308             } else {
1309                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1310                              "referenced row "UUID_FMT" in table %s",
1311                              UUID_ARGS(uuid), table->class->name);
1312                 return false;
1313             }
1314         } else {
1315             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1316                          "in table %s", UUID_ARGS(uuid), table->class->name);
1317             return false;
1318         }
1319     } else {
1320             VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
1321                          "table %s", operation, table->class->name);
1322             return false;
1323     }
1324
1325     return true;
1326 }
1327
1328 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1329  * otherwise.
1330  *
1331  * Change 'row' either with the content of 'row_json' or by apply 'diff'.
1332  * Caller needs to provide either valid 'row_json' or 'diff', but not
1333  * both.  */
1334 static bool
1335 ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
1336                        const struct json *diff_json,
1337                        enum ovsdb_idl_change change)
1338 {
1339     struct ovsdb_idl_table *table = row->table;
1340     const struct ovsdb_idl_table_class *class = table->class;
1341     struct shash_node *node;
1342     bool changed = false;
1343     bool apply_diff = diff_json != NULL;
1344     const struct json *json = apply_diff ? diff_json : row_json;
1345
1346     SHASH_FOR_EACH (node, json_object(json)) {
1347         const char *column_name = node->name;
1348         const struct ovsdb_idl_column *column;
1349         struct ovsdb_datum datum;
1350         struct ovsdb_error *error;
1351         unsigned int column_idx;
1352         struct ovsdb_datum *old;
1353
1354         column = shash_find_data(&table->columns, column_name);
1355         if (!column) {
1356             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
1357                          column_name, UUID_ARGS(&row->uuid));
1358             continue;
1359         }
1360
1361         column_idx = column - table->class->columns;
1362         old = &row->old[column_idx];
1363
1364         error = NULL;
1365         if (apply_diff) {
1366             struct ovsdb_datum diff;
1367
1368             ovs_assert(!row_json);
1369             error = ovsdb_transient_datum_from_json(&diff, &column->type,
1370                                                     node->data);
1371             if (!error) {
1372                 error = ovsdb_datum_apply_diff(&datum, old, &diff,
1373                                                &column->type);
1374                 ovsdb_datum_destroy(&diff, &column->type);
1375             }
1376         } else {
1377             ovs_assert(!diff_json);
1378             error = ovsdb_datum_from_json(&datum, &column->type, node->data,
1379                                           NULL);
1380         }
1381
1382         if (!error) {
1383             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
1384                 ovsdb_datum_swap(old, &datum);
1385                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
1386                     changed = true;
1387                     row->change_seqno[change]
1388                         = row->table->change_seqno[change]
1389                         = row->table->idl->change_seqno + 1;
1390                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
1391                         if (!ovs_list_is_empty(&row->track_node)) {
1392                             ovs_list_remove(&row->track_node);
1393                         }
1394                         ovs_list_push_back(&row->table->track_list,
1395                                        &row->track_node);
1396                         if (!row->updated) {
1397                             row->updated = bitmap_allocate(class->n_columns);
1398                         }
1399                         bitmap_set1(row->updated, column_idx);
1400                     }
1401                 }
1402             } else {
1403                 /* Didn't really change but the OVSDB monitor protocol always
1404                  * includes every value in a row. */
1405             }
1406
1407             ovsdb_datum_destroy(&datum, &column->type);
1408         } else {
1409             char *s = ovsdb_error_to_string(error);
1410             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
1411                          " in table %s: %s", column_name,
1412                          UUID_ARGS(&row->uuid), table->class->name, s);
1413             free(s);
1414             ovsdb_error_destroy(error);
1415         }
1416     }
1417     return changed;
1418 }
1419
1420 static bool
1421 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json,
1422                      enum ovsdb_idl_change change)
1423 {
1424     return ovsdb_idl_row_change__(row, row_json, NULL, change);
1425 }
1426
1427 static bool
1428 ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
1429                          const struct json *diff_json,
1430                          enum ovsdb_idl_change change)
1431 {
1432     return ovsdb_idl_row_change__(row, NULL, diff_json, change);
1433 }
1434
1435 /* When a row A refers to row B through a column with a "refTable" constraint,
1436  * but row B does not exist, row B is called an "orphan row".  Orphan rows
1437  * should not persist, because the database enforces referential integrity, but
1438  * they can appear transiently as changes from the database are received (the
1439  * database doesn't try to topologically sort them and circular references mean
1440  * it isn't always possible anyhow).
1441  *
1442  * This function returns true if 'row' is an orphan row, otherwise false.
1443  */
1444 static bool
1445 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
1446 {
1447     return !row->old && !row->new;
1448 }
1449
1450 /* Returns true if 'row' is conceptually part of the database as modified by
1451  * the current transaction (if any), false otherwise.
1452  *
1453  * This function will return true if 'row' is not an orphan (see the comment on
1454  * ovsdb_idl_row_is_orphan()) and:
1455  *
1456  *   - 'row' exists in the database and has not been deleted within the
1457  *     current transaction (if any).
1458  *
1459  *   - 'row' was inserted within the current transaction and has not been
1460  *     deleted.  (In the latter case you should not have passed 'row' in at
1461  *     all, because ovsdb_idl_txn_delete() freed it.)
1462  *
1463  * This function will return false if 'row' is an orphan or if 'row' was
1464  * deleted within the current transaction.
1465  */
1466 static bool
1467 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
1468 {
1469     return row->new != NULL;
1470 }
1471
1472 static void
1473 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
1474 {
1475     const struct ovsdb_idl_table_class *class = row->table->class;
1476     size_t i;
1477
1478     for (i = 0; i < class->n_columns; i++) {
1479         const struct ovsdb_idl_column *c = &class->columns[i];
1480         (c->parse)(row, &row->old[i]);
1481     }
1482 }
1483
1484 static void
1485 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
1486 {
1487     const struct ovsdb_idl_table_class *class = row->table->class;
1488     size_t i;
1489
1490     for (i = 0; i < class->n_columns; i++) {
1491         const struct ovsdb_idl_column *c = &class->columns[i];
1492         (c->unparse)(row);
1493     }
1494 }
1495
1496 static void
1497 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
1498 {
1499     ovs_assert(row->old == row->new);
1500     if (!ovsdb_idl_row_is_orphan(row)) {
1501         const struct ovsdb_idl_table_class *class = row->table->class;
1502         size_t i;
1503
1504         for (i = 0; i < class->n_columns; i++) {
1505             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
1506         }
1507         free(row->old);
1508         row->old = row->new = NULL;
1509     }
1510 }
1511
1512 static void
1513 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
1514 {
1515     if (row->old != row->new) {
1516         if (row->new) {
1517             const struct ovsdb_idl_table_class *class = row->table->class;
1518             size_t i;
1519
1520             if (row->written) {
1521                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
1522                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
1523                 }
1524             }
1525             free(row->new);
1526             free(row->written);
1527             row->written = NULL;
1528         }
1529         row->new = row->old;
1530     }
1531 }
1532
1533 static void
1534 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
1535 {
1536     struct ovsdb_idl_arc *arc, *next;
1537
1538     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
1539      * that this causes to be unreferenced, if tracking is not enabled.
1540      * If tracking is enabled, orphaned nodes are removed from hmap but not
1541      * freed.
1542      */
1543     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
1544         ovs_list_remove(&arc->dst_node);
1545         if (destroy_dsts
1546             && ovsdb_idl_row_is_orphan(arc->dst)
1547             && ovs_list_is_empty(&arc->dst->dst_arcs)) {
1548             ovsdb_idl_row_destroy(arc->dst);
1549         }
1550         free(arc);
1551     }
1552     ovs_list_init(&row->src_arcs);
1553 }
1554
1555 /* Force nodes that reference 'row' to reparse. */
1556 static void
1557 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
1558 {
1559     struct ovsdb_idl_arc *arc, *next;
1560
1561     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
1562      * 'arc', so we need to use the "safe" variant of list traversal.  However,
1563      * calling an ovsdb_idl_column's 'parse' function will add an arc
1564      * equivalent to 'arc' to row->arcs.  That could be a problem for
1565      * traversal, but it adds it at the beginning of the list to prevent us
1566      * from stumbling upon it again.
1567      *
1568      * (If duplicate arcs were possible then we would need to make sure that
1569      * 'next' didn't also point into 'arc''s destination, but we forbid
1570      * duplicate arcs.) */
1571     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
1572         struct ovsdb_idl_row *ref = arc->src;
1573
1574         ovsdb_idl_row_unparse(ref);
1575         ovsdb_idl_row_clear_arcs(ref, false);
1576         ovsdb_idl_row_parse(ref);
1577     }
1578 }
1579
1580 static struct ovsdb_idl_row *
1581 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
1582 {
1583     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
1584     class->row_init(row);
1585     ovs_list_init(&row->src_arcs);
1586     ovs_list_init(&row->dst_arcs);
1587     hmap_node_nullify(&row->txn_node);
1588     ovs_list_init(&row->track_node);
1589     return row;
1590 }
1591
1592 static struct ovsdb_idl_row *
1593 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
1594 {
1595     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
1596     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
1597     row->uuid = *uuid;
1598     row->table = table;
1599     return row;
1600 }
1601
1602 static void
1603 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
1604 {
1605     if (row) {
1606         ovsdb_idl_row_clear_old(row);
1607         hmap_remove(&row->table->rows, &row->hmap_node);
1608         if (ovsdb_idl_track_is_set(row->table)) {
1609             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1610                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1611                 = row->table->idl->change_seqno + 1;
1612         }
1613         if (!ovs_list_is_empty(&row->track_node)) {
1614             ovs_list_remove(&row->track_node);
1615         }
1616         ovs_list_push_back(&row->table->track_list, &row->track_node);
1617     }
1618 }
1619
1620 static void
1621 ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
1622 {
1623     size_t i;
1624
1625     for (i = 0; i < idl->class->n_tables; i++) {
1626         struct ovsdb_idl_table *table = &idl->tables[i];
1627
1628         if (!ovs_list_is_empty(&table->track_list)) {
1629             struct ovsdb_idl_row *row, *next;
1630
1631             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
1632                 if (!ovsdb_idl_track_is_set(row->table)) {
1633                     ovs_list_remove(&row->track_node);
1634                     free(row);
1635                 }
1636             }
1637         }
1638     }
1639 }
1640
1641 static void
1642 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
1643 {
1644     const struct ovsdb_idl_table_class *class = row->table->class;
1645     size_t i;
1646
1647     ovs_assert(!row->old && !row->new);
1648     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
1649     for (i = 0; i < class->n_columns; i++) {
1650         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
1651     }
1652     ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_INSERT);
1653     ovsdb_idl_row_parse(row);
1654
1655     ovsdb_idl_row_reparse_backrefs(row);
1656 }
1657
1658 static void
1659 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
1660 {
1661     ovsdb_idl_row_unparse(row);
1662     ovsdb_idl_row_clear_arcs(row, true);
1663     ovsdb_idl_row_clear_old(row);
1664     if (ovs_list_is_empty(&row->dst_arcs)) {
1665         ovsdb_idl_row_destroy(row);
1666     } else {
1667         ovsdb_idl_row_reparse_backrefs(row);
1668     }
1669 }
1670
1671 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1672  * otherwise. */
1673 static bool
1674 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1675 {
1676     bool changed;
1677
1678     ovsdb_idl_row_unparse(row);
1679     ovsdb_idl_row_clear_arcs(row, true);
1680     changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY);
1681     ovsdb_idl_row_parse(row);
1682
1683     return changed;
1684 }
1685
1686 static bool
1687 ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *row,
1688                              const struct json *diff_json)
1689 {
1690     bool changed;
1691
1692     ovsdb_idl_row_unparse(row);
1693     ovsdb_idl_row_clear_arcs(row, true);
1694     changed = ovsdb_idl_row_apply_diff(row, diff_json,
1695                                        OVSDB_IDL_CHANGE_MODIFY);
1696     ovsdb_idl_row_parse(row);
1697
1698     return changed;
1699 }
1700
1701 static bool
1702 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1703 {
1704     const struct ovsdb_idl_arc *arc;
1705
1706     /* No self-arcs. */
1707     if (src == dst) {
1708         return false;
1709     }
1710
1711     /* No duplicate arcs.
1712      *
1713      * We only need to test whether the first arc in dst->dst_arcs originates
1714      * at 'src', since we add all of the arcs from a given source in a clump
1715      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1716      * added at the front of the dst_arcs list. */
1717     if (ovs_list_is_empty(&dst->dst_arcs)) {
1718         return true;
1719     }
1720     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1721     return arc->src != src;
1722 }
1723
1724 static struct ovsdb_idl_table *
1725 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1726                            const struct ovsdb_idl_table_class *table_class)
1727 {
1728     return &idl->tables[table_class - idl->class->tables];
1729 }
1730
1731 /* Called by ovsdb-idlc generated code. */
1732 struct ovsdb_idl_row *
1733 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1734                       struct ovsdb_idl_table_class *dst_table_class,
1735                       const struct uuid *dst_uuid)
1736 {
1737     struct ovsdb_idl *idl = src->table->idl;
1738     struct ovsdb_idl_table *dst_table;
1739     struct ovsdb_idl_arc *arc;
1740     struct ovsdb_idl_row *dst;
1741
1742     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1743     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1744     if (idl->txn) {
1745         /* We're being called from ovsdb_idl_txn_write().  We must not update
1746          * any arcs, because the transaction will be backed out at commit or
1747          * abort time and we don't want our graph screwed up.
1748          *
1749          * Just return the destination row, if there is one and it has not been
1750          * deleted. */
1751         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1752             return dst;
1753         }
1754         return NULL;
1755     } else {
1756         /* We're being called from some other context.  Update the graph. */
1757         if (!dst) {
1758             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1759         }
1760
1761         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
1762         if (may_add_arc(src, dst)) {
1763             /* The arc *must* be added at the front of the dst_arcs list.  See
1764              * ovsdb_idl_row_reparse_backrefs() for details. */
1765             arc = xmalloc(sizeof *arc);
1766             ovs_list_push_front(&src->src_arcs, &arc->src_node);
1767             ovs_list_push_front(&dst->dst_arcs, &arc->dst_node);
1768             arc->src = src;
1769             arc->dst = dst;
1770         }
1771
1772         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
1773     }
1774 }
1775
1776 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
1777  * pointer to the row if there is one, otherwise a null pointer.  */
1778 const struct ovsdb_idl_row *
1779 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
1780                            const struct ovsdb_idl_table_class *tc,
1781                            const struct uuid *uuid)
1782 {
1783     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
1784 }
1785
1786 static struct ovsdb_idl_row *
1787 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
1788 {
1789     for (; node; node = hmap_next(&table->rows, node)) {
1790         struct ovsdb_idl_row *row;
1791
1792         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
1793         if (ovsdb_idl_row_exists(row)) {
1794             return row;
1795         }
1796     }
1797     return NULL;
1798 }
1799
1800 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
1801  * table is empty.
1802  *
1803  * Database tables are internally maintained as hash tables, so adding or
1804  * removing rows while traversing the same table can cause some rows to be
1805  * visited twice or not at apply. */
1806 const struct ovsdb_idl_row *
1807 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
1808                     const struct ovsdb_idl_table_class *table_class)
1809 {
1810     struct ovsdb_idl_table *table
1811         = ovsdb_idl_table_from_class(idl, table_class);
1812     return next_real_row(table, hmap_first(&table->rows));
1813 }
1814
1815 /* Returns a row following 'row' within its table, or a null pointer if 'row'
1816  * is the last row in its table. */
1817 const struct ovsdb_idl_row *
1818 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
1819 {
1820     struct ovsdb_idl_table *table = row->table;
1821
1822     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
1823 }
1824
1825 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
1826  * transaction has changed 'column''s value, the modified value is returned.
1827  *
1828  * The caller must not modify or free the returned value.
1829  *
1830  * Various kinds of changes can invalidate the returned value: writing to the
1831  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
1832  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
1833  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
1834  * returned value is needed for a long time, it is best to make a copy of it
1835  * with ovsdb_datum_clone(). */
1836 const struct ovsdb_datum *
1837 ovsdb_idl_read(const struct ovsdb_idl_row *row,
1838                const struct ovsdb_idl_column *column)
1839 {
1840     const struct ovsdb_idl_table_class *class;
1841     size_t column_idx;
1842
1843     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
1844
1845     class = row->table->class;
1846     column_idx = column - class->columns;
1847
1848     ovs_assert(row->new != NULL);
1849     ovs_assert(column_idx < class->n_columns);
1850
1851     if (row->written && bitmap_is_set(row->written, column_idx)) {
1852         return &row->new[column_idx];
1853     } else if (row->old) {
1854         return &row->old[column_idx];
1855     } else {
1856         return ovsdb_datum_default(&column->type);
1857     }
1858 }
1859
1860 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
1861  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
1862  * have a value type of OVSDB_TYPE_VOID.)
1863  *
1864  * This is useful in code that "knows" that a particular column has a given
1865  * type, so that it will abort if someone changes the column's type without
1866  * updating the code that uses it. */
1867 const struct ovsdb_datum *
1868 ovsdb_idl_get(const struct ovsdb_idl_row *row,
1869               const struct ovsdb_idl_column *column,
1870               enum ovsdb_atomic_type key_type OVS_UNUSED,
1871               enum ovsdb_atomic_type value_type OVS_UNUSED)
1872 {
1873     ovs_assert(column->type.key.type == key_type);
1874     ovs_assert(column->type.value.type == value_type);
1875
1876     return ovsdb_idl_read(row, column);
1877 }
1878
1879 /* Returns true if the field represented by 'column' in 'row' may be modified,
1880  * false if it is immutable.
1881  *
1882  * Normally, whether a field is mutable is controlled by its column's schema.
1883  * However, an immutable column can be set to any initial value at the time of
1884  * insertion, so if 'row' is a new row (one that is being added as part of the
1885  * current transaction, supposing that a transaction is in progress) then even
1886  * its "immutable" fields are actually mutable. */
1887 bool
1888 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
1889                      const struct ovsdb_idl_column *column)
1890 {
1891     return column->mutable || (row->new && !row->old);
1892 }
1893
1894 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
1895  * to all-zero-bits by some other entity.  If 'row' was set up some other way
1896  * then the return value is indeterminate. */
1897 bool
1898 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
1899 {
1900     return row->table == NULL;
1901 }
1902 \f
1903 /* Transactions. */
1904
1905 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1906                                    enum ovsdb_idl_txn_status);
1907
1908 /* Returns a string representation of 'status'.  The caller must not modify or
1909  * free the returned string.
1910  *
1911  * The return value is probably useful only for debug log messages and unit
1912  * tests. */
1913 const char *
1914 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
1915 {
1916     switch (status) {
1917     case TXN_UNCOMMITTED:
1918         return "uncommitted";
1919     case TXN_UNCHANGED:
1920         return "unchanged";
1921     case TXN_INCOMPLETE:
1922         return "incomplete";
1923     case TXN_ABORTED:
1924         return "aborted";
1925     case TXN_SUCCESS:
1926         return "success";
1927     case TXN_TRY_AGAIN:
1928         return "try again";
1929     case TXN_NOT_LOCKED:
1930         return "not locked";
1931     case TXN_ERROR:
1932         return "error";
1933     }
1934     return "<unknown>";
1935 }
1936
1937 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
1938  * active transaction at a time.  See the large comment in ovsdb-idl.h for
1939  * general information on transactions. */
1940 struct ovsdb_idl_txn *
1941 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
1942 {
1943     struct ovsdb_idl_txn *txn;
1944
1945     ovs_assert(!idl->txn);
1946     idl->txn = txn = xmalloc(sizeof *txn);
1947     txn->request_id = NULL;
1948     txn->idl = idl;
1949     hmap_init(&txn->txn_rows);
1950     txn->status = TXN_UNCOMMITTED;
1951     txn->error = NULL;
1952     txn->dry_run = false;
1953     ds_init(&txn->comment);
1954
1955     txn->inc_table = NULL;
1956     txn->inc_column = NULL;
1957
1958     hmap_init(&txn->inserted_rows);
1959
1960     return txn;
1961 }
1962
1963 /* Appends 's', which is treated as a printf()-type format string, to the
1964  * comments that will be passed to the OVSDB server when 'txn' is committed.
1965  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
1966  * show-log" can print in a relatively human-readable form.) */
1967 void
1968 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
1969 {
1970     va_list args;
1971
1972     if (txn->comment.length) {
1973         ds_put_char(&txn->comment, '\n');
1974     }
1975
1976     va_start(args, s);
1977     ds_put_format_valist(&txn->comment, s, args);
1978     va_end(args);
1979 }
1980
1981 /* Marks 'txn' as a transaction that will not actually modify the database.  In
1982  * almost every way, the transaction is treated like other transactions.  It
1983  * must be committed or aborted like other transactions, it will be sent to the
1984  * database server like other transactions, and so on.  The only difference is
1985  * that the operations sent to the database server will include, as the last
1986  * step, an "abort" operation, so that any changes made by the transaction will
1987  * not actually take effect. */
1988 void
1989 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
1990 {
1991     txn->dry_run = true;
1992 }
1993
1994 /* Causes 'txn', when committed, to increment the value of 'column' within
1995  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
1996  * successfully, the client may retrieve the final (incremented) value of
1997  * 'column' with ovsdb_idl_txn_get_increment_new_value().
1998  *
1999  * The client could accomplish something similar with ovsdb_idl_read(),
2000  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
2001  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
2002  * will never (by itself) fail because of a verify error.
2003  *
2004  * The intended use is for incrementing the "next_cfg" column in the
2005  * Open_vSwitch table. */
2006 void
2007 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
2008                         const struct ovsdb_idl_row *row,
2009                         const struct ovsdb_idl_column *column)
2010 {
2011     ovs_assert(!txn->inc_table);
2012     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
2013     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
2014
2015     txn->inc_table = row->table->class->name;
2016     txn->inc_column = column->name;
2017     txn->inc_row = row->uuid;
2018 }
2019
2020 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
2021  * has been called for 'txn' but the commit is still incomplete (that is, the
2022  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
2023  * end up committing at the database server, but the client will not be able to
2024  * get any further status information back. */
2025 void
2026 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
2027 {
2028     struct ovsdb_idl_txn_insert *insert, *next;
2029
2030     json_destroy(txn->request_id);
2031     if (txn->status == TXN_INCOMPLETE) {
2032         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2033     }
2034     ovsdb_idl_txn_abort(txn);
2035     ds_destroy(&txn->comment);
2036     free(txn->error);
2037     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
2038         free(insert);
2039     }
2040     hmap_destroy(&txn->inserted_rows);
2041     free(txn);
2042 }
2043
2044 /* Causes poll_block() to wake up if 'txn' has completed committing. */
2045 void
2046 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
2047 {
2048     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
2049         poll_immediate_wake();
2050     }
2051 }
2052
2053 static struct json *
2054 where_uuid_equals(const struct uuid *uuid)
2055 {
2056     return
2057         json_array_create_1(
2058             json_array_create_3(
2059                 json_string_create("_uuid"),
2060                 json_string_create("=="),
2061                 json_array_create_2(
2062                     json_string_create("uuid"),
2063                     json_string_create_nocopy(
2064                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
2065 }
2066
2067 static char *
2068 uuid_name_from_uuid(const struct uuid *uuid)
2069 {
2070     char *name;
2071     char *p;
2072
2073     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
2074     for (p = name; *p != '\0'; p++) {
2075         if (*p == '-') {
2076             *p = '_';
2077         }
2078     }
2079
2080     return name;
2081 }
2082
2083 static const struct ovsdb_idl_row *
2084 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
2085 {
2086     const struct ovsdb_idl_row *row;
2087
2088     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
2089         if (uuid_equals(&row->uuid, uuid)) {
2090             return row;
2091         }
2092     }
2093     return NULL;
2094 }
2095
2096 /* XXX there must be a cleaner way to do this */
2097 static struct json *
2098 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
2099 {
2100     if (json->type == JSON_ARRAY) {
2101         struct uuid uuid;
2102         size_t i;
2103
2104         if (json->u.array.n == 2
2105             && json->u.array.elems[0]->type == JSON_STRING
2106             && json->u.array.elems[1]->type == JSON_STRING
2107             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
2108             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
2109             const struct ovsdb_idl_row *row;
2110
2111             row = ovsdb_idl_txn_get_row(txn, &uuid);
2112             if (row && !row->old && row->new) {
2113                 json_destroy(json);
2114
2115                 return json_array_create_2(
2116                     json_string_create("named-uuid"),
2117                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
2118             }
2119         }
2120
2121         for (i = 0; i < json->u.array.n; i++) {
2122             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
2123                                                       txn);
2124         }
2125     } else if (json->type == JSON_OBJECT) {
2126         struct shash_node *node;
2127
2128         SHASH_FOR_EACH (node, json_object(json)) {
2129             node->data = substitute_uuids(node->data, txn);
2130         }
2131     }
2132     return json;
2133 }
2134
2135 static void
2136 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
2137 {
2138     struct ovsdb_idl_row *row, *next;
2139
2140     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
2141      * ovsdb_idl_column's 'parse' function, which will call
2142      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
2143      * transaction and fail to update the graph.  */
2144     txn->idl->txn = NULL;
2145
2146     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
2147         if (row->old) {
2148             if (row->written) {
2149                 ovsdb_idl_row_unparse(row);
2150                 ovsdb_idl_row_clear_arcs(row, false);
2151                 ovsdb_idl_row_parse(row);
2152             }
2153         } else {
2154             ovsdb_idl_row_unparse(row);
2155         }
2156         ovsdb_idl_row_clear_new(row);
2157
2158         free(row->prereqs);
2159         row->prereqs = NULL;
2160
2161         free(row->written);
2162         row->written = NULL;
2163
2164         hmap_remove(&txn->txn_rows, &row->txn_node);
2165         hmap_node_nullify(&row->txn_node);
2166         if (!row->old) {
2167             hmap_remove(&row->table->rows, &row->hmap_node);
2168             free(row);
2169         }
2170     }
2171     hmap_destroy(&txn->txn_rows);
2172     hmap_init(&txn->txn_rows);
2173 }
2174
2175 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
2176  * of the following TXN_* constants:
2177  *
2178  *   TXN_INCOMPLETE:
2179  *
2180  *       The transaction is in progress, but not yet complete.  The caller
2181  *       should call again later, after calling ovsdb_idl_run() to let the IDL
2182  *       do OVSDB protocol processing.
2183  *
2184  *   TXN_UNCHANGED:
2185  *
2186  *       The transaction is complete.  (It didn't actually change the database,
2187  *       so the IDL didn't send any request to the database server.)
2188  *
2189  *   TXN_ABORTED:
2190  *
2191  *       The caller previously called ovsdb_idl_txn_abort().
2192  *
2193  *   TXN_SUCCESS:
2194  *
2195  *       The transaction was successful.  The update made by the transaction
2196  *       (and possibly other changes made by other database clients) should
2197  *       already be visible in the IDL.
2198  *
2199  *   TXN_TRY_AGAIN:
2200  *
2201  *       The transaction failed for some transient reason, e.g. because a
2202  *       "verify" operation reported an inconsistency or due to a network
2203  *       problem.  The caller should wait for a change to the database, then
2204  *       compose a new transaction, and commit the new transaction.
2205  *
2206  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
2207  *       the database.  It is important to use its return value *before* the
2208  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
2209  *       purpose, because the change that one should wait for can happen after
2210  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
2211  *       using some other baseline value in that situation could cause an
2212  *       indefinite wait if the database rarely changes.
2213  *
2214  *   TXN_NOT_LOCKED:
2215  *
2216  *       The transaction failed because the IDL has been configured to require
2217  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
2218  *       has already lost it.
2219  *
2220  * Committing a transaction rolls back all of the changes that it made to the
2221  * IDL's copy of the database.  If the transaction commits successfully, then
2222  * the database server will send an update and, thus, the IDL will be updated
2223  * with the committed changes. */
2224 enum ovsdb_idl_txn_status
2225 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
2226 {
2227     struct ovsdb_idl_row *row;
2228     struct json *operations;
2229     bool any_updates;
2230
2231     if (txn != txn->idl->txn) {
2232         goto coverage_out;
2233     }
2234
2235     /* If we need a lock but don't have it, give up quickly. */
2236     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
2237         txn->status = TXN_NOT_LOCKED;
2238         goto disassemble_out;
2239     }
2240
2241     operations = json_array_create_1(
2242         json_string_create(txn->idl->class->database));
2243
2244     /* Assert that we have the required lock (avoiding a race). */
2245     if (txn->idl->lock_name) {
2246         struct json *op = json_object_create();
2247         json_array_add(operations, op);
2248         json_object_put_string(op, "op", "assert");
2249         json_object_put_string(op, "lock", txn->idl->lock_name);
2250     }
2251
2252     /* Add prerequisites and declarations of new rows. */
2253     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2254         /* XXX check that deleted rows exist even if no prereqs? */
2255         if (row->prereqs) {
2256             const struct ovsdb_idl_table_class *class = row->table->class;
2257             size_t n_columns = class->n_columns;
2258             struct json *op, *columns, *row_json;
2259             size_t idx;
2260
2261             op = json_object_create();
2262             json_array_add(operations, op);
2263             json_object_put_string(op, "op", "wait");
2264             json_object_put_string(op, "table", class->name);
2265             json_object_put(op, "timeout", json_integer_create(0));
2266             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2267             json_object_put_string(op, "until", "==");
2268             columns = json_array_create_empty();
2269             json_object_put(op, "columns", columns);
2270             row_json = json_object_create();
2271             json_object_put(op, "rows", json_array_create_1(row_json));
2272
2273             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
2274                 const struct ovsdb_idl_column *column = &class->columns[idx];
2275                 json_array_add(columns, json_string_create(column->name));
2276                 json_object_put(row_json, column->name,
2277                                 ovsdb_datum_to_json(&row->old[idx],
2278                                                     &column->type));
2279             }
2280         }
2281     }
2282
2283     /* Add updates. */
2284     any_updates = false;
2285     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2286         const struct ovsdb_idl_table_class *class = row->table->class;
2287
2288         if (!row->new) {
2289             if (class->is_root) {
2290                 struct json *op = json_object_create();
2291                 json_object_put_string(op, "op", "delete");
2292                 json_object_put_string(op, "table", class->name);
2293                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2294                 json_array_add(operations, op);
2295                 any_updates = true;
2296             } else {
2297                 /* Let ovsdb-server decide whether to really delete it. */
2298             }
2299         } else if (row->old != row->new) {
2300             struct json *row_json;
2301             struct json *op;
2302             size_t idx;
2303
2304             op = json_object_create();
2305             json_object_put_string(op, "op", row->old ? "update" : "insert");
2306             json_object_put_string(op, "table", class->name);
2307             if (row->old) {
2308                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2309             } else {
2310                 struct ovsdb_idl_txn_insert *insert;
2311
2312                 any_updates = true;
2313
2314                 json_object_put(op, "uuid-name",
2315                                 json_string_create_nocopy(
2316                                     uuid_name_from_uuid(&row->uuid)));
2317
2318                 insert = xmalloc(sizeof *insert);
2319                 insert->dummy = row->uuid;
2320                 insert->op_index = operations->u.array.n - 1;
2321                 uuid_zero(&insert->real);
2322                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
2323                             uuid_hash(&insert->dummy));
2324             }
2325             row_json = json_object_create();
2326             json_object_put(op, "row", row_json);
2327
2328             if (row->written) {
2329                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
2330                     const struct ovsdb_idl_column *column =
2331                                                         &class->columns[idx];
2332
2333                     if (row->old
2334                         || !ovsdb_datum_is_default(&row->new[idx],
2335                                                   &column->type)) {
2336                         json_object_put(row_json, column->name,
2337                                         substitute_uuids(
2338                                             ovsdb_datum_to_json(&row->new[idx],
2339                                                                 &column->type),
2340                                             txn));
2341
2342                         /* If anything really changed, consider it an update.
2343                          * We can't suppress not-really-changed values earlier
2344                          * or transactions would become nonatomic (see the big
2345                          * comment inside ovsdb_idl_txn_write()). */
2346                         if (!any_updates && row->old &&
2347                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
2348                                                 &column->type)) {
2349                             any_updates = true;
2350                         }
2351                     }
2352                 }
2353             }
2354
2355             if (!row->old || !shash_is_empty(json_object(row_json))) {
2356                 json_array_add(operations, op);
2357             } else {
2358                 json_destroy(op);
2359             }
2360         }
2361     }
2362
2363     /* Add increment. */
2364     if (txn->inc_table && any_updates) {
2365         struct json *op;
2366
2367         txn->inc_index = operations->u.array.n - 1;
2368
2369         op = json_object_create();
2370         json_object_put_string(op, "op", "mutate");
2371         json_object_put_string(op, "table", txn->inc_table);
2372         json_object_put(op, "where",
2373                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2374                                          txn));
2375         json_object_put(op, "mutations",
2376                         json_array_create_1(
2377                             json_array_create_3(
2378                                 json_string_create(txn->inc_column),
2379                                 json_string_create("+="),
2380                                 json_integer_create(1))));
2381         json_array_add(operations, op);
2382
2383         op = json_object_create();
2384         json_object_put_string(op, "op", "select");
2385         json_object_put_string(op, "table", txn->inc_table);
2386         json_object_put(op, "where",
2387                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2388                                          txn));
2389         json_object_put(op, "columns",
2390                         json_array_create_1(json_string_create(
2391                                                 txn->inc_column)));
2392         json_array_add(operations, op);
2393     }
2394
2395     if (txn->comment.length) {
2396         struct json *op = json_object_create();
2397         json_object_put_string(op, "op", "comment");
2398         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
2399         json_array_add(operations, op);
2400     }
2401
2402     if (txn->dry_run) {
2403         struct json *op = json_object_create();
2404         json_object_put_string(op, "op", "abort");
2405         json_array_add(operations, op);
2406     }
2407
2408     if (!any_updates) {
2409         txn->status = TXN_UNCHANGED;
2410         json_destroy(operations);
2411     } else if (!jsonrpc_session_send(
2412                    txn->idl->session,
2413                    jsonrpc_create_request(
2414                        "transact", operations, &txn->request_id))) {
2415         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
2416                     json_hash(txn->request_id, 0));
2417         txn->status = TXN_INCOMPLETE;
2418     } else {
2419         txn->status = TXN_TRY_AGAIN;
2420     }
2421
2422 disassemble_out:
2423     ovsdb_idl_txn_disassemble(txn);
2424 coverage_out:
2425     switch (txn->status) {
2426     case TXN_UNCOMMITTED:   COVERAGE_INC(txn_uncommitted);    break;
2427     case TXN_UNCHANGED:     COVERAGE_INC(txn_unchanged);      break;
2428     case TXN_INCOMPLETE:    COVERAGE_INC(txn_incomplete);     break;
2429     case TXN_ABORTED:       COVERAGE_INC(txn_aborted);        break;
2430     case TXN_SUCCESS:       COVERAGE_INC(txn_success);        break;
2431     case TXN_TRY_AGAIN:     COVERAGE_INC(txn_try_again);      break;
2432     case TXN_NOT_LOCKED:    COVERAGE_INC(txn_not_locked);     break;
2433     case TXN_ERROR:         COVERAGE_INC(txn_error);          break;
2434     }
2435
2436     return txn->status;
2437 }
2438
2439 /* Attempts to commit 'txn', blocking until the commit either succeeds or
2440  * fails.  Returns the final commit status, which may be any TXN_* value other
2441  * than TXN_INCOMPLETE.
2442  *
2443  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
2444  * return value of ovsdb_idl_get_seqno() to change. */
2445 enum ovsdb_idl_txn_status
2446 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
2447 {
2448     enum ovsdb_idl_txn_status status;
2449
2450     fatal_signal_run();
2451     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
2452         ovsdb_idl_run(txn->idl);
2453         ovsdb_idl_wait(txn->idl);
2454         ovsdb_idl_txn_wait(txn);
2455         poll_block();
2456     }
2457     return status;
2458 }
2459
2460 /* Returns the final (incremented) value of the column in 'txn' that was set to
2461  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
2462  * successfully. */
2463 int64_t
2464 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
2465 {
2466     ovs_assert(txn->status == TXN_SUCCESS);
2467     return txn->inc_new_value;
2468 }
2469
2470 /* Aborts 'txn' without sending it to the database server.  This is effective
2471  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
2472  * Otherwise, it has no effect.
2473  *
2474  * Aborting a transaction doesn't free its memory.  Use
2475  * ovsdb_idl_txn_destroy() to do that. */
2476 void
2477 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
2478 {
2479     ovsdb_idl_txn_disassemble(txn);
2480     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
2481         txn->status = TXN_ABORTED;
2482     }
2483 }
2484
2485 /* Returns a string that reports the error status for 'txn'.  The caller must
2486  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
2487  * for 'txn' may free the returned string.
2488  *
2489  * The return value is ordinarily one of the strings that
2490  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
2491  * due to an error reported by the database server, the return value is that
2492  * error. */
2493 const char *
2494 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
2495 {
2496     if (txn->status != TXN_ERROR) {
2497         return ovsdb_idl_txn_status_to_string(txn->status);
2498     } else if (txn->error) {
2499         return txn->error;
2500     } else {
2501         return "no error details available";
2502     }
2503 }
2504
2505 static void
2506 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
2507                              const struct json *json)
2508 {
2509     if (txn->error == NULL) {
2510         txn->error = json_to_string(json, JSSF_SORT);
2511     }
2512 }
2513
2514 /* For transaction 'txn' that completed successfully, finds and returns the
2515  * permanent UUID that the database assigned to a newly inserted row, given the
2516  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
2517  *
2518  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
2519  * if it was assigned by that function and then deleted by
2520  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
2521  * and then deleted within a single transaction are never sent to the database
2522  * server, so it never assigns them a permanent UUID.) */
2523 const struct uuid *
2524 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
2525                               const struct uuid *uuid)
2526 {
2527     const struct ovsdb_idl_txn_insert *insert;
2528
2529     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
2530     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
2531                              uuid_hash(uuid), &txn->inserted_rows) {
2532         if (uuid_equals(uuid, &insert->dummy)) {
2533             return &insert->real;
2534         }
2535     }
2536     return NULL;
2537 }
2538
2539 static void
2540 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2541                        enum ovsdb_idl_txn_status status)
2542 {
2543     txn->status = status;
2544     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2545 }
2546
2547 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
2548  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
2549  * ovs-vswitchd).
2550  *
2551  * 'datum' must have the correct type for its column.  The IDL does not check
2552  * that it meets schema constraints, but ovsdb-server will do so at commit time
2553  * so it had better be correct.
2554  *
2555  * A transaction must be in progress.  Replication of 'column' must not have
2556  * been disabled (by calling ovsdb_idl_omit()).
2557  *
2558  * Usually this function is used indirectly through one of the "set" functions
2559  * generated by ovsdb-idlc.
2560  *
2561  * Takes ownership of what 'datum' points to (and in some cases destroys that
2562  * data before returning) but makes a copy of 'datum' itself.  (Commonly
2563  * 'datum' is on the caller's stack.) */
2564 static void
2565 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
2566                       const struct ovsdb_idl_column *column,
2567                       struct ovsdb_datum *datum, bool owns_datum)
2568 {
2569     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2570     const struct ovsdb_idl_table_class *class;
2571     size_t column_idx;
2572     bool write_only;
2573
2574     if (ovsdb_idl_row_is_synthetic(row)) {
2575         goto discard_datum;
2576     }
2577
2578     class = row->table->class;
2579     column_idx = column - class->columns;
2580     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
2581
2582     ovs_assert(row->new != NULL);
2583     ovs_assert(column_idx < class->n_columns);
2584     ovs_assert(row->old == NULL ||
2585                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2586
2587     if (row->table->idl->verify_write_only && !write_only) {
2588         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
2589                  " explicitly configured not to.", class->name, column->name);
2590         goto discard_datum;
2591     }
2592
2593     /* If this is a write-only column and the datum being written is the same
2594      * as the one already there, just skip the update entirely.  This is worth
2595      * optimizing because we have a lot of columns that get periodically
2596      * refreshed into the database but don't actually change that often.
2597      *
2598      * We don't do this for read/write columns because that would break
2599      * atomicity of transactions--some other client might have written a
2600      * different value in that column since we read it.  (But if a whole
2601      * transaction only does writes of existing values, without making any real
2602      * changes, we will drop the whole transaction later in
2603      * ovsdb_idl_txn_commit().) */
2604     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
2605                                          datum, &column->type)) {
2606         goto discard_datum;
2607     }
2608
2609     if (hmap_node_is_null(&row->txn_node)) {
2610         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2611                     uuid_hash(&row->uuid));
2612     }
2613     if (row->old == row->new) {
2614         row->new = xmalloc(class->n_columns * sizeof *row->new);
2615     }
2616     if (!row->written) {
2617         row->written = bitmap_allocate(class->n_columns);
2618     }
2619     if (bitmap_is_set(row->written, column_idx)) {
2620         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
2621     } else {
2622         bitmap_set1(row->written, column_idx);
2623     }
2624     if (owns_datum) {
2625         row->new[column_idx] = *datum;
2626     } else {
2627         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
2628     }
2629     (column->unparse)(row);
2630     (column->parse)(row, &row->new[column_idx]);
2631     return;
2632
2633 discard_datum:
2634     if (owns_datum) {
2635         ovsdb_datum_destroy(datum, &column->type);
2636     }
2637 }
2638
2639 void
2640 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
2641                     const struct ovsdb_idl_column *column,
2642                     struct ovsdb_datum *datum)
2643 {
2644     ovsdb_idl_txn_write__(row, column, datum, true);
2645 }
2646
2647 void
2648 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
2649                           const struct ovsdb_idl_column *column,
2650                           const struct ovsdb_datum *datum)
2651 {
2652     ovsdb_idl_txn_write__(row, column,
2653                           CONST_CAST(struct ovsdb_datum *, datum), false);
2654 }
2655
2656 /* Causes the original contents of 'column' in 'row_' to be verified as a
2657  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
2658  * changed (or if 'row_' was deleted) between the time that the IDL originally
2659  * read its contents and the time that the transaction commits, then the
2660  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
2661  * TXN_AGAIN_NOW (depending on whether the database change has already been
2662  * received).
2663  *
2664  * The intention is that, to ensure that no transaction commits based on dirty
2665  * reads, an application should call ovsdb_idl_txn_verify() on each data item
2666  * read as part of a read-modify-write operation.
2667  *
2668  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
2669  * value of 'column' is already known:
2670  *
2671  *   - If 'row_' is a row created by the current transaction (returned by
2672  *     ovsdb_idl_txn_insert()).
2673  *
2674  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
2675  *     within the current transaction.
2676  *
2677  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
2678  * ovsdb_idl_txn_write() for a given read-modify-write.
2679  *
2680  * A transaction must be in progress.
2681  *
2682  * Usually this function is used indirectly through one of the "verify"
2683  * functions generated by ovsdb-idlc. */
2684 void
2685 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
2686                      const struct ovsdb_idl_column *column)
2687 {
2688     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2689     const struct ovsdb_idl_table_class *class;
2690     size_t column_idx;
2691
2692     if (ovsdb_idl_row_is_synthetic(row)) {
2693         return;
2694     }
2695
2696     class = row->table->class;
2697     column_idx = column - class->columns;
2698
2699     ovs_assert(row->new != NULL);
2700     ovs_assert(row->old == NULL ||
2701                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2702     if (!row->old
2703         || (row->written && bitmap_is_set(row->written, column_idx))) {
2704         return;
2705     }
2706
2707     if (hmap_node_is_null(&row->txn_node)) {
2708         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2709                     uuid_hash(&row->uuid));
2710     }
2711     if (!row->prereqs) {
2712         row->prereqs = bitmap_allocate(class->n_columns);
2713     }
2714     bitmap_set1(row->prereqs, column_idx);
2715 }
2716
2717 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
2718  * accessed afterward.
2719  *
2720  * A transaction must be in progress.
2721  *
2722  * Usually this function is used indirectly through one of the "delete"
2723  * functions generated by ovsdb-idlc. */
2724 void
2725 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
2726 {
2727     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2728
2729     if (ovsdb_idl_row_is_synthetic(row)) {
2730         return;
2731     }
2732
2733     ovs_assert(row->new != NULL);
2734     if (!row->old) {
2735         ovsdb_idl_row_unparse(row);
2736         ovsdb_idl_row_clear_new(row);
2737         ovs_assert(!row->prereqs);
2738         hmap_remove(&row->table->rows, &row->hmap_node);
2739         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
2740         free(row);
2741         return;
2742     }
2743     if (hmap_node_is_null(&row->txn_node)) {
2744         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2745                     uuid_hash(&row->uuid));
2746     }
2747     ovsdb_idl_row_clear_new(row);
2748     row->new = NULL;
2749 }
2750
2751 /* Inserts and returns a new row in the table with the specified 'class' in the
2752  * database with open transaction 'txn'.
2753  *
2754  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
2755  * randomly generated; otherwise 'uuid' should specify a randomly generated
2756  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
2757  * 'txn' is committed, but the IDL will replace any uses of the provisional
2758  * UUID in the data to be to be committed by the UUID assigned by
2759  * ovsdb-server.
2760  *
2761  * Usually this function is used indirectly through one of the "insert"
2762  * functions generated by ovsdb-idlc. */
2763 const struct ovsdb_idl_row *
2764 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
2765                      const struct ovsdb_idl_table_class *class,
2766                      const struct uuid *uuid)
2767 {
2768     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
2769
2770     if (uuid) {
2771         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
2772         row->uuid = *uuid;
2773     } else {
2774         uuid_generate(&row->uuid);
2775     }
2776
2777     row->table = ovsdb_idl_table_from_class(txn->idl, class);
2778     row->new = xmalloc(class->n_columns * sizeof *row->new);
2779     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
2780     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
2781     return row;
2782 }
2783
2784 static void
2785 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
2786 {
2787     struct ovsdb_idl_txn *txn;
2788
2789     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
2790         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
2791     }
2792 }
2793
2794 static struct ovsdb_idl_txn *
2795 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
2796 {
2797     struct ovsdb_idl_txn *txn;
2798
2799     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
2800                              json_hash(id, 0), &idl->outstanding_txns) {
2801         if (json_equal(id, txn->request_id)) {
2802             return txn;
2803         }
2804     }
2805     return NULL;
2806 }
2807
2808 static bool
2809 check_json_type(const struct json *json, enum json_type type, const char *name)
2810 {
2811     if (!json) {
2812         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
2813         return false;
2814     } else if (json->type != type) {
2815         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
2816                      name, json_type_to_string(json->type),
2817                      json_type_to_string(type));
2818         return false;
2819     } else {
2820         return true;
2821     }
2822 }
2823
2824 static bool
2825 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
2826                                 const struct json_array *results)
2827 {
2828     struct json *count, *rows, *row, *column;
2829     struct shash *mutate, *select;
2830
2831     if (txn->inc_index + 2 > results->n) {
2832         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2833                      "for increment (has %"PRIuSIZE", needs %u)",
2834                      results->n, txn->inc_index + 2);
2835         return false;
2836     }
2837
2838     /* We know that this is a JSON object because the loop in
2839      * ovsdb_idl_txn_process_reply() checked. */
2840     mutate = json_object(results->elems[txn->inc_index]);
2841     count = shash_find_data(mutate, "count");
2842     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
2843         return false;
2844     }
2845     if (count->u.integer != 1) {
2846         VLOG_WARN_RL(&syntax_rl,
2847                      "\"mutate\" reply \"count\" is %lld instead of 1",
2848                      count->u.integer);
2849         return false;
2850     }
2851
2852     select = json_object(results->elems[txn->inc_index + 1]);
2853     rows = shash_find_data(select, "rows");
2854     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
2855         return false;
2856     }
2857     if (rows->u.array.n != 1) {
2858         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
2859                      "instead of 1",
2860                      rows->u.array.n);
2861         return false;
2862     }
2863     row = rows->u.array.elems[0];
2864     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
2865         return false;
2866     }
2867     column = shash_find_data(json_object(row), txn->inc_column);
2868     if (!check_json_type(column, JSON_INTEGER,
2869                          "\"select\" reply inc column")) {
2870         return false;
2871     }
2872     txn->inc_new_value = column->u.integer;
2873     return true;
2874 }
2875
2876 static bool
2877 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
2878                                    const struct json_array *results)
2879 {
2880     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
2881     struct ovsdb_error *error;
2882     struct json *json_uuid;
2883     union ovsdb_atom uuid;
2884     struct shash *reply;
2885
2886     if (insert->op_index >= results->n) {
2887         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2888                      "for insert (has %"PRIuSIZE", needs %u)",
2889                      results->n, insert->op_index);
2890         return false;
2891     }
2892
2893     /* We know that this is a JSON object because the loop in
2894      * ovsdb_idl_txn_process_reply() checked. */
2895     reply = json_object(results->elems[insert->op_index]);
2896     json_uuid = shash_find_data(reply, "uuid");
2897     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
2898         return false;
2899     }
2900
2901     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
2902     if (error) {
2903         char *s = ovsdb_error_to_string(error);
2904         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
2905                      "UUID: %s", s);
2906         free(s);
2907         ovsdb_error_destroy(error);
2908         return false;
2909     }
2910
2911     insert->real = uuid.uuid;
2912
2913     return true;
2914 }
2915
2916 static bool
2917 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
2918                             const struct jsonrpc_msg *msg)
2919 {
2920     struct ovsdb_idl_txn *txn;
2921     enum ovsdb_idl_txn_status status;
2922
2923     txn = ovsdb_idl_txn_find(idl, msg->id);
2924     if (!txn) {
2925         return false;
2926     }
2927
2928     if (msg->type == JSONRPC_ERROR) {
2929         status = TXN_ERROR;
2930     } else if (msg->result->type != JSON_ARRAY) {
2931         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
2932         status = TXN_ERROR;
2933     } else {
2934         struct json_array *ops = &msg->result->u.array;
2935         int hard_errors = 0;
2936         int soft_errors = 0;
2937         int lock_errors = 0;
2938         size_t i;
2939
2940         for (i = 0; i < ops->n; i++) {
2941             struct json *op = ops->elems[i];
2942
2943             if (op->type == JSON_NULL) {
2944                 /* This isn't an error in itself but indicates that some prior
2945                  * operation failed, so make sure that we know about it. */
2946                 soft_errors++;
2947             } else if (op->type == JSON_OBJECT) {
2948                 struct json *error;
2949
2950                 error = shash_find_data(json_object(op), "error");
2951                 if (error) {
2952                     if (error->type == JSON_STRING) {
2953                         if (!strcmp(error->u.string, "timed out")) {
2954                             soft_errors++;
2955                         } else if (!strcmp(error->u.string, "not owner")) {
2956                             lock_errors++;
2957                         } else if (strcmp(error->u.string, "aborted")) {
2958                             hard_errors++;
2959                             ovsdb_idl_txn_set_error_json(txn, op);
2960                         }
2961                     } else {
2962                         hard_errors++;
2963                         ovsdb_idl_txn_set_error_json(txn, op);
2964                         VLOG_WARN_RL(&syntax_rl,
2965                                      "\"error\" in reply is not JSON string");
2966                     }
2967                 }
2968             } else {
2969                 hard_errors++;
2970                 ovsdb_idl_txn_set_error_json(txn, op);
2971                 VLOG_WARN_RL(&syntax_rl,
2972                              "operation reply is not JSON null or object");
2973             }
2974         }
2975
2976         if (!soft_errors && !hard_errors && !lock_errors) {
2977             struct ovsdb_idl_txn_insert *insert;
2978
2979             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
2980                 hard_errors++;
2981             }
2982
2983             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
2984                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
2985                     hard_errors++;
2986                 }
2987             }
2988         }
2989
2990         status = (hard_errors ? TXN_ERROR
2991                   : lock_errors ? TXN_NOT_LOCKED
2992                   : soft_errors ? TXN_TRY_AGAIN
2993                   : TXN_SUCCESS);
2994     }
2995
2996     ovsdb_idl_txn_complete(txn, status);
2997     return true;
2998 }
2999
3000 /* Returns the transaction currently active for 'row''s IDL.  A transaction
3001  * must currently be active. */
3002 struct ovsdb_idl_txn *
3003 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
3004 {
3005     struct ovsdb_idl_txn *txn = row->table->idl->txn;
3006     ovs_assert(txn != NULL);
3007     return txn;
3008 }
3009
3010 /* Returns the IDL on which 'txn' acts. */
3011 struct ovsdb_idl *
3012 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
3013 {
3014     return txn->idl;
3015 }
3016
3017 /* Blocks until 'idl' successfully connects to the remote database and
3018  * retrieves its contents. */
3019 void
3020 ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
3021 {
3022     while (1) {
3023         ovsdb_idl_run(idl);
3024         if (ovsdb_idl_has_ever_connected(idl)) {
3025             return;
3026         }
3027         ovsdb_idl_wait(idl);
3028         poll_block();
3029     }
3030 }
3031 \f
3032 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
3033  * the database server and to avoid modifying the database when the lock cannot
3034  * be acquired (that is, when another client has the same lock).
3035  *
3036  * If 'lock_name' is NULL, drops the locking requirement and releases the
3037  * lock. */
3038 void
3039 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
3040 {
3041     ovs_assert(!idl->txn);
3042     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
3043
3044     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
3045         /* Release previous lock. */
3046         ovsdb_idl_send_unlock_request(idl);
3047         free(idl->lock_name);
3048         idl->lock_name = NULL;
3049         idl->is_lock_contended = false;
3050     }
3051
3052     if (lock_name && !idl->lock_name) {
3053         /* Acquire new lock. */
3054         idl->lock_name = xstrdup(lock_name);
3055         ovsdb_idl_send_lock_request(idl);
3056     }
3057 }
3058
3059 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
3060  *
3061  * Locking and unlocking happens asynchronously from the database client's
3062  * point of view, so the information is only useful for optimization (e.g. if
3063  * the client doesn't have the lock then there's no point in trying to write to
3064  * the database). */
3065 bool
3066 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
3067 {
3068     return idl->has_lock;
3069 }
3070
3071 /* Returns true if 'idl' is configured to obtain a lock but the database server
3072  * has indicated that some other client already owns the requested lock. */
3073 bool
3074 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
3075 {
3076     return idl->is_lock_contended;
3077 }
3078
3079 static void
3080 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
3081 {
3082     if (new_has_lock && !idl->has_lock) {
3083         if (idl->state == IDL_S_MONITORING ||
3084             idl->state == IDL_S_MONITORING2) {
3085             idl->change_seqno++;
3086         } else {
3087             /* We're setting up a session, so don't signal that the database
3088              * changed.  Finalizing the session will increment change_seqno
3089              * anyhow. */
3090         }
3091         idl->is_lock_contended = false;
3092     }
3093     idl->has_lock = new_has_lock;
3094 }
3095
3096 static void
3097 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
3098                               struct json **idp)
3099 {
3100     ovsdb_idl_update_has_lock(idl, false);
3101
3102     json_destroy(idl->lock_request_id);
3103     idl->lock_request_id = NULL;
3104
3105     if (jsonrpc_session_is_connected(idl->session)) {
3106         struct json *params;
3107
3108         params = json_array_create_1(json_string_create(idl->lock_name));
3109         jsonrpc_session_send(idl->session,
3110                              jsonrpc_create_request(method, params, idp));
3111     }
3112 }
3113
3114 static void
3115 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
3116 {
3117     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
3118 }
3119
3120 static void
3121 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
3122 {
3123     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
3124 }
3125
3126 static void
3127 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
3128 {
3129     bool got_lock;
3130
3131     json_destroy(idl->lock_request_id);
3132     idl->lock_request_id = NULL;
3133
3134     if (result->type == JSON_OBJECT) {
3135         const struct json *locked;
3136
3137         locked = shash_find_data(json_object(result), "locked");
3138         got_lock = locked && locked->type == JSON_TRUE;
3139     } else {
3140         got_lock = false;
3141     }
3142
3143     ovsdb_idl_update_has_lock(idl, got_lock);
3144     if (!got_lock) {
3145         idl->is_lock_contended = true;
3146     }
3147 }
3148
3149 static void
3150 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
3151                             const struct json *params,
3152                             bool new_has_lock)
3153 {
3154     if (idl->lock_name
3155         && params->type == JSON_ARRAY
3156         && json_array(params)->n > 0
3157         && json_array(params)->elems[0]->type == JSON_STRING) {
3158         const char *lock_name = json_string(json_array(params)->elems[0]);
3159
3160         if (!strcmp(idl->lock_name, lock_name)) {
3161             ovsdb_idl_update_has_lock(idl, new_has_lock);
3162             if (!new_has_lock) {
3163                 idl->is_lock_contended = true;
3164             }
3165         }
3166     }
3167 }
3168
3169 void
3170 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
3171 {
3172     if (loop) {
3173         ovsdb_idl_destroy(loop->idl);
3174     }
3175 }
3176
3177 struct ovsdb_idl_txn *
3178 ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
3179 {
3180     ovsdb_idl_run(loop->idl);
3181     loop->open_txn = (loop->committing_txn
3182                       || ovsdb_idl_get_seqno(loop->idl) == loop->skip_seqno
3183                       ? NULL
3184                       : ovsdb_idl_txn_create(loop->idl));
3185     return loop->open_txn;
3186 }
3187
3188 void
3189 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
3190 {
3191     if (loop->open_txn) {
3192         loop->committing_txn = loop->open_txn;
3193         loop->open_txn = NULL;
3194
3195         loop->precommit_seqno = ovsdb_idl_get_seqno(loop->idl);
3196     }
3197
3198     struct ovsdb_idl_txn *txn = loop->committing_txn;
3199     if (txn) {
3200         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
3201         if (status != TXN_INCOMPLETE) {
3202             switch (status) {
3203             case TXN_TRY_AGAIN:
3204                 /* We want to re-evaluate the database when it's changed from
3205                  * the contents that it had when we started the commit.  (That
3206                  * might have already happened.) */
3207                 loop->skip_seqno = loop->precommit_seqno;
3208                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
3209                     poll_immediate_wake();
3210                 }
3211                 break;
3212
3213             case TXN_SUCCESS:
3214                 /* If the database has already changed since we started the
3215                  * commit, re-evaluate it immediately to avoid missing a change
3216                  * for a while. */
3217                 if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
3218                     poll_immediate_wake();
3219                 }
3220                 break;
3221
3222             case TXN_UNCHANGED:
3223             case TXN_ABORTED:
3224             case TXN_NOT_LOCKED:
3225             case TXN_ERROR:
3226                 break;
3227
3228             case TXN_UNCOMMITTED:
3229             case TXN_INCOMPLETE:
3230                 OVS_NOT_REACHED();
3231
3232             }
3233             ovsdb_idl_txn_destroy(txn);
3234             loop->committing_txn = NULL;
3235         }
3236     }
3237
3238     ovsdb_idl_wait(loop->idl);
3239 }