ovs-numa: Introduce function to set current thread affinity.
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "coverage.h"
27 #include "openvswitch/dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "json.h"
30 #include "jsonrpc.h"
31 #include "ovsdb/ovsdb.h"
32 #include "ovsdb/table.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-idl-provider.h"
36 #include "ovsdb-parser.h"
37 #include "poll-loop.h"
38 #include "shash.h"
39 #include "sset.h"
40 #include "util.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
44
45 COVERAGE_DEFINE(txn_uncommitted);
46 COVERAGE_DEFINE(txn_unchanged);
47 COVERAGE_DEFINE(txn_incomplete);
48 COVERAGE_DEFINE(txn_aborted);
49 COVERAGE_DEFINE(txn_success);
50 COVERAGE_DEFINE(txn_try_again);
51 COVERAGE_DEFINE(txn_not_locked);
52 COVERAGE_DEFINE(txn_error);
53
54 /* An arc from one idl_row to another.  When row A contains a UUID that
55  * references row B, this is represented by an arc from A (the source) to B
56  * (the destination).
57  *
58  * Arcs from a row to itself are omitted, that is, src and dst are always
59  * different.
60  *
61  * Arcs are never duplicated, that is, even if there are multiple references
62  * from A to B, there is only a single arc from A to B.
63  *
64  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
65  * A.  Both an arc and its converse may both be present, if each row refers
66  * to the other circularly.
67  *
68  * The source and destination row may be in the same table or in different
69  * tables.
70  */
71 struct ovsdb_idl_arc {
72     struct ovs_list src_node;   /* In src->src_arcs list. */
73     struct ovs_list dst_node;   /* In dst->dst_arcs list. */
74     struct ovsdb_idl_row *src;  /* Source row. */
75     struct ovsdb_idl_row *dst;  /* Destination row. */
76 };
77
78 enum ovsdb_idl_state {
79     IDL_S_SCHEMA_REQUESTED,
80     IDL_S_MONITOR_REQUESTED,
81     IDL_S_MONITORING,
82     IDL_S_MONITOR2_REQUESTED,
83     IDL_S_MONITORING2,
84     IDL_S_NO_SCHEMA
85 };
86
87 struct ovsdb_idl {
88     const struct ovsdb_idl_class *class;
89     struct jsonrpc_session *session;
90     struct shash table_by_name;
91     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
92     unsigned int change_seqno;
93     bool verify_write_only;
94
95     /* Session state. */
96     unsigned int state_seqno;
97     enum ovsdb_idl_state state;
98     struct json *request_id;
99     struct json *schema;
100
101     /* Database locking. */
102     char *lock_name;            /* Name of lock we need, NULL if none. */
103     bool has_lock;              /* Has db server told us we have the lock? */
104     bool is_lock_contended;     /* Has db server told us we can't get lock? */
105     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
106
107     /* Transaction support. */
108     struct ovsdb_idl_txn *txn;
109     struct hmap outstanding_txns;
110 };
111
112 struct ovsdb_idl_txn {
113     struct hmap_node hmap_node;
114     struct json *request_id;
115     struct ovsdb_idl *idl;
116     struct hmap txn_rows;
117     enum ovsdb_idl_txn_status status;
118     char *error;
119     bool dry_run;
120     struct ds comment;
121
122     /* Increments. */
123     const char *inc_table;
124     const char *inc_column;
125     struct uuid inc_row;
126     unsigned int inc_index;
127     int64_t inc_new_value;
128
129     /* Inserted rows. */
130     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
131 };
132
133 struct ovsdb_idl_txn_insert {
134     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
135     struct uuid dummy;          /* Dummy UUID used locally. */
136     int op_index;               /* Index into transaction's operation array. */
137     struct uuid real;           /* Real UUID used by database server. */
138 };
139
140 enum ovsdb_update_version {
141     OVSDB_UPDATE,               /* RFC 7047 "update" method. */
142     OVSDB_UPDATE2               /* "update2" Extension to RFC 7047.
143                                    See ovsdb-server(1) for more information. */
144 };
145
146 /* Name arrays indexed by 'enum ovsdb_update_version'. */
147 static const char *table_updates_names[] = {"table_updates", "table_updates2"};
148 static const char *table_update_names[] = {"table_update", "table_update2"};
149 static const char *row_update_names[] = {"row_update", "row_update2"};
150
151 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
152 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
153
154 static void ovsdb_idl_clear(struct ovsdb_idl *);
155 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
156 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
157 static void ovsdb_idl_send_monitor2_request(struct ovsdb_idl *);
158 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
159                                    enum ovsdb_update_version);
160 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
161                                                     const struct json *,
162                                                     enum ovsdb_update_version);
163 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
164                                      const struct uuid *,
165                                      const struct json *old,
166                                      const struct json *new);
167 static bool ovsdb_idl_process_update2(struct ovsdb_idl_table *,
168                                       const struct uuid *,
169                                       const char *operation,
170                                       const struct json *row);
171 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
172 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
173 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
174 static bool ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *,
175                                          const struct json *);
176
177 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
178 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
179     const struct ovsdb_idl_table_class *);
180 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
181                                                   const struct uuid *);
182 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
183 static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
184 static void ovsdb_idl_destroy_all_map_op_lists(struct ovsdb_idl_row *);
185
186 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
187 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
188 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
189 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
190 static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
191
192 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
193 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
194                                         const struct jsonrpc_msg *msg);
195 static bool ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *,
196                                             struct json *);
197 static void ovsdb_idl_txn_add_map_op(struct ovsdb_idl_row *,
198                                      const struct ovsdb_idl_column *,
199                                      struct ovsdb_datum *,
200                                      enum map_op_type);
201
202 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
203 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
204 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
205                                        const struct json *);
206 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
207                                         const struct json *params,
208                                         bool new_has_lock);
209 static struct ovsdb_idl_table *
210 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
211                            const struct ovsdb_idl_table_class *);
212 static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table);
213
214 /* Creates and returns a connection to database 'remote', which should be in a
215  * form acceptable to jsonrpc_session_open().  The connection will maintain an
216  * in-memory replica of the remote database whose schema is described by
217  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
218  * by ovsdb-idlc.)
219  *
220  * Passes 'retry' to jsonrpc_session_open().  See that function for
221  * documentation.
222  *
223  * If 'monitor_everything_by_default' is true, then everything in the remote
224  * database will be replicated by default.  ovsdb_idl_omit() and
225  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
226  * monitoring.
227  *
228  * If 'monitor_everything_by_default' is false, then no columns or tables will
229  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
230  * must be used to choose some columns or tables to replicate.
231  */
232 struct ovsdb_idl *
233 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
234                  bool monitor_everything_by_default, bool retry)
235 {
236     struct ovsdb_idl *idl;
237     uint8_t default_mode;
238     size_t i;
239
240     default_mode = (monitor_everything_by_default
241                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
242                     : 0);
243
244     idl = xzalloc(sizeof *idl);
245     idl->class = class;
246     idl->session = jsonrpc_session_open(remote, retry);
247     shash_init(&idl->table_by_name);
248     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
249     for (i = 0; i < class->n_tables; i++) {
250         const struct ovsdb_idl_table_class *tc = &class->tables[i];
251         struct ovsdb_idl_table *table = &idl->tables[i];
252         size_t j;
253
254         shash_add_assert(&idl->table_by_name, tc->name, table);
255         table->class = tc;
256         table->modes = xmalloc(tc->n_columns);
257         memset(table->modes, default_mode, tc->n_columns);
258         table->need_table = false;
259         shash_init(&table->columns);
260         for (j = 0; j < tc->n_columns; j++) {
261             const struct ovsdb_idl_column *column = &tc->columns[j];
262
263             shash_add_assert(&table->columns, column->name, column);
264         }
265         hmap_init(&table->rows);
266         ovs_list_init(&table->track_list);
267         table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
268             = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
269             = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
270         table->idl = idl;
271     }
272
273     idl->state_seqno = UINT_MAX;
274     idl->request_id = NULL;
275     idl->schema = NULL;
276
277     hmap_init(&idl->outstanding_txns);
278
279     return idl;
280 }
281
282 /* Changes the remote and creates a new session. */
283 void
284 ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote,
285                      bool retry)
286 {
287     if (idl) {
288         ovs_assert(!idl->txn);
289         idl->session = jsonrpc_session_open(remote, retry);
290         idl->state_seqno = UINT_MAX;
291     }
292 }
293
294 /* Destroys 'idl' and all of the data structures that it manages. */
295 void
296 ovsdb_idl_destroy(struct ovsdb_idl *idl)
297 {
298     if (idl) {
299         size_t i;
300
301         ovs_assert(!idl->txn);
302         ovsdb_idl_clear(idl);
303         jsonrpc_session_close(idl->session);
304
305         for (i = 0; i < idl->class->n_tables; i++) {
306             struct ovsdb_idl_table *table = &idl->tables[i];
307             shash_destroy(&table->columns);
308             hmap_destroy(&table->rows);
309             free(table->modes);
310         }
311         shash_destroy(&idl->table_by_name);
312         free(idl->tables);
313         json_destroy(idl->request_id);
314         free(idl->lock_name);
315         json_destroy(idl->lock_request_id);
316         json_destroy(idl->schema);
317         hmap_destroy(&idl->outstanding_txns);
318         free(idl);
319     }
320 }
321
322 static void
323 ovsdb_idl_clear(struct ovsdb_idl *idl)
324 {
325     bool changed = false;
326     size_t i;
327
328     for (i = 0; i < idl->class->n_tables; i++) {
329         struct ovsdb_idl_table *table = &idl->tables[i];
330         struct ovsdb_idl_row *row, *next_row;
331
332         if (hmap_is_empty(&table->rows)) {
333             continue;
334         }
335
336         changed = true;
337         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
338             struct ovsdb_idl_arc *arc, *next_arc;
339
340             if (!ovsdb_idl_row_is_orphan(row)) {
341                 ovsdb_idl_row_unparse(row);
342             }
343             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
344                 free(arc);
345             }
346             /* No need to do anything with dst_arcs: some node has those arcs
347              * as forward arcs and will destroy them itself. */
348
349             if (!ovs_list_is_empty(&row->track_node)) {
350                 ovs_list_remove(&row->track_node);
351             }
352
353             ovsdb_idl_row_destroy(row);
354         }
355     }
356
357     ovsdb_idl_track_clear(idl);
358
359     if (changed) {
360         idl->change_seqno++;
361     }
362 }
363
364 /* Processes a batch of messages from the database server on 'idl'.  This may
365  * cause the IDL's contents to change.  The client may check for that with
366  * ovsdb_idl_get_seqno(). */
367 void
368 ovsdb_idl_run(struct ovsdb_idl *idl)
369 {
370     int i;
371
372     ovs_assert(!idl->txn);
373     jsonrpc_session_run(idl->session);
374     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
375         struct jsonrpc_msg *msg;
376         unsigned int seqno;
377
378         seqno = jsonrpc_session_get_seqno(idl->session);
379         if (idl->state_seqno != seqno) {
380             idl->state_seqno = seqno;
381             json_destroy(idl->request_id);
382             idl->request_id = NULL;
383             ovsdb_idl_txn_abort_all(idl);
384
385             ovsdb_idl_send_schema_request(idl);
386             idl->state = IDL_S_SCHEMA_REQUESTED;
387             if (idl->lock_name) {
388                 ovsdb_idl_send_lock_request(idl);
389             }
390         }
391
392         msg = jsonrpc_session_recv(idl->session);
393         if (!msg) {
394             break;
395         }
396
397         if (msg->type == JSONRPC_NOTIFY
398             && !strcmp(msg->method, "update2")
399             && msg->params->type == JSON_ARRAY
400             && msg->params->u.array.n == 2
401             && msg->params->u.array.elems[0]->type == JSON_NULL) {
402             /* Database contents changed. */
403             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
404                                    OVSDB_UPDATE2);
405         } else if (msg->type == JSONRPC_REPLY
406                    && idl->request_id
407                    && json_equal(idl->request_id, msg->id)) {
408             json_destroy(idl->request_id);
409             idl->request_id = NULL;
410
411             switch (idl->state) {
412             case IDL_S_SCHEMA_REQUESTED:
413                 /* Reply to our "get_schema" request. */
414                 idl->schema = json_clone(msg->result);
415                 ovsdb_idl_send_monitor2_request(idl);
416                 idl->state = IDL_S_MONITOR2_REQUESTED;
417                 break;
418
419             case IDL_S_MONITOR_REQUESTED:
420             case IDL_S_MONITOR2_REQUESTED:
421                 /* Reply to our "monitor" or "monitor2" request. */
422                 idl->change_seqno++;
423                 ovsdb_idl_clear(idl);
424                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
425                     idl->state = IDL_S_MONITORING;
426                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
427                 } else { /* IDL_S_MONITOR2_REQUESTED. */
428                     idl->state = IDL_S_MONITORING2;
429                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
430                 }
431
432                 /* Schema is not useful after monitor request is accepted
433                  * by the server.  */
434                 json_destroy(idl->schema);
435                 idl->schema = NULL;
436                 break;
437
438             case IDL_S_MONITORING:
439             case IDL_S_MONITORING2:
440             case IDL_S_NO_SCHEMA:
441             default:
442                 OVS_NOT_REACHED();
443             }
444         } else if (msg->type == JSONRPC_NOTIFY
445             && !strcmp(msg->method, "update")
446             && msg->params->type == JSON_ARRAY
447             && msg->params->u.array.n == 2
448             && msg->params->u.array.elems[0]->type == JSON_NULL) {
449             /* Database contents changed. */
450             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
451                                    OVSDB_UPDATE);
452         } else if (msg->type == JSONRPC_REPLY
453                    && idl->lock_request_id
454                    && json_equal(idl->lock_request_id, msg->id)) {
455             /* Reply to our "lock" request. */
456             ovsdb_idl_parse_lock_reply(idl, msg->result);
457         } else if (msg->type == JSONRPC_NOTIFY
458                    && !strcmp(msg->method, "locked")) {
459             /* We got our lock. */
460             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
461         } else if (msg->type == JSONRPC_NOTIFY
462                    && !strcmp(msg->method, "stolen")) {
463             /* Someone else stole our lock. */
464             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
465         } else if (msg->type == JSONRPC_ERROR
466                    && idl->state == IDL_S_MONITOR2_REQUESTED
467                    && idl->request_id
468                    && json_equal(idl->request_id, msg->id)) {
469             if (msg->error && !strcmp(json_string(msg->error),
470                                       "unknown method")) {
471                 /* Fall back to using "monitor" method.  */
472                 json_destroy(idl->request_id);
473                 idl->request_id = NULL;
474                 ovsdb_idl_send_monitor_request(idl);
475                 idl->state = IDL_S_MONITOR_REQUESTED;
476             }
477         } else if (msg->type == JSONRPC_ERROR
478                    && idl->state == IDL_S_SCHEMA_REQUESTED
479                    && idl->request_id
480                    && json_equal(idl->request_id, msg->id)) {
481                 json_destroy(idl->request_id);
482                 idl->request_id = NULL;
483                 VLOG_ERR("%s: requested schema not found",
484                          jsonrpc_session_get_name(idl->session));
485                 idl->state = IDL_S_NO_SCHEMA;
486         } else if ((msg->type == JSONRPC_ERROR
487                     || msg->type == JSONRPC_REPLY)
488                    && ovsdb_idl_txn_process_reply(idl, msg)) {
489             /* ovsdb_idl_txn_process_reply() did everything needful. */
490         } else {
491             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
492              * a transaction before we receive the reply, so keep the log level
493              * low. */
494             VLOG_DBG("%s: received unexpected %s message",
495                      jsonrpc_session_get_name(idl->session),
496                      jsonrpc_msg_type_to_string(msg->type));
497         }
498         jsonrpc_msg_destroy(msg);
499     }
500     ovsdb_idl_row_destroy_postprocess(idl);
501 }
502
503 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
504  * do or when activity occurs on a transaction on 'idl'. */
505 void
506 ovsdb_idl_wait(struct ovsdb_idl *idl)
507 {
508     jsonrpc_session_wait(idl->session);
509     jsonrpc_session_recv_wait(idl->session);
510 }
511
512 /* Returns a "sequence number" that represents the state of 'idl'.  When
513  * ovsdb_idl_run() changes the database, the sequence number changes.  The
514  * initial fetch of the entire contents of the remote database is considered to
515  * be one kind of change.  Successfully acquiring a lock, if one has been
516  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
517  *
518  * As long as the sequence number does not change, the client may continue to
519  * use any data structures it obtains from 'idl'.  But when it changes, the
520  * client must not access any of these data structures again, because they
521  * could have freed or reused for other purposes.
522  *
523  * The sequence number can occasionally change even if the database does not.
524  * This happens if the connection to the database drops and reconnects, which
525  * causes the database contents to be reloaded even if they didn't change.  (It
526  * could also happen if the database server sends out a "change" that reflects
527  * what the IDL already thought was in the database.  The database server is
528  * not supposed to do that, but bugs could in theory cause it to do so.) */
529 unsigned int
530 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
531 {
532     return idl->change_seqno;
533 }
534
535 /* Returns true if 'idl' successfully connected to the remote database and
536  * retrieved its contents (even if the connection subsequently dropped and is
537  * in the process of reconnecting).  If so, then 'idl' contains an atomic
538  * snapshot of the database's contents (but it might be arbitrarily old if the
539  * connection dropped).
540  *
541  * Returns false if 'idl' has never connected or retrieved the database's
542  * contents.  If so, 'idl' is empty. */
543 bool
544 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
545 {
546     return ovsdb_idl_get_seqno(idl) != 0;
547 }
548
549 /* Reconfigures 'idl' so that it would reconnect to the database, if
550  * connection was dropped. */
551 void
552 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
553 {
554     jsonrpc_session_enable_reconnect(idl->session);
555 }
556
557 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
558  * meantime, the contents of 'idl' will not change. */
559 void
560 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
561 {
562     jsonrpc_session_force_reconnect(idl->session);
563 }
564
565 /* Some IDL users should only write to write-only columns.  Furthermore,
566  * writing to a column which is not write-only can cause serious performance
567  * degradations for these users.  This function causes 'idl' to reject writes
568  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
569 void
570 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
571 {
572     idl->verify_write_only = true;
573 }
574
575 /* Returns true if 'idl' is currently connected or trying to connect
576  * and a negative response to a schema request has not been received */
577 bool
578 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
579 {
580     return jsonrpc_session_is_alive(idl->session) &&
581            idl->state != IDL_S_NO_SCHEMA;
582 }
583
584 /* Returns the last error reported on a connection by 'idl'.  The return value
585  * is 0 only if no connection made by 'idl' has ever encountered an error and
586  * a negative response to a schema request has never been received. See
587  * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
588  * interpretation. */
589 int
590 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
591 {
592     int err;
593
594     err = jsonrpc_session_get_last_error(idl->session);
595
596     if (err) {
597         return err;
598     } else if (idl->state == IDL_S_NO_SCHEMA) {
599         return ENOENT;
600     } else {
601         return 0;
602     }
603 }
604
605 /* Sets the "probe interval" for 'idl->session' to 'probe_interval', in
606  * milliseconds.
607  */
608 void
609 ovsdb_idl_set_probe_interval(const struct ovsdb_idl *idl, int probe_interval)
610 {
611     jsonrpc_session_set_probe_interval(idl->session, probe_interval);
612 }
613 \f
614 static unsigned char *
615 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
616                    const struct ovsdb_idl_column *column)
617 {
618     size_t i;
619
620     ovs_assert(!idl->change_seqno);
621
622     for (i = 0; i < idl->class->n_tables; i++) {
623         const struct ovsdb_idl_table *table = &idl->tables[i];
624         const struct ovsdb_idl_table_class *tc = table->class;
625
626         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
627             return &table->modes[column - tc->columns];
628         }
629     }
630
631     OVS_NOT_REACHED();
632 }
633
634 static void
635 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
636 {
637     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
638         struct ovsdb_idl_table *table;
639
640         table = shash_find_data(&idl->table_by_name,
641                                 base->u.uuid.refTableName);
642         if (table) {
643             table->need_table = true;
644         } else {
645             VLOG_WARN("%s IDL class missing referenced table %s",
646                       idl->class->database, base->u.uuid.refTableName);
647         }
648     }
649 }
650
651 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
652  * ensures that any tables referenced by 'column' will be replicated, even if
653  * no columns in that table are selected for replication (see
654  * ovsdb_idl_add_table() for more information).
655  *
656  * This function is only useful if 'monitor_everything_by_default' was false in
657  * the call to ovsdb_idl_create().  This function should be called between
658  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
659  */
660 void
661 ovsdb_idl_add_column(struct ovsdb_idl *idl,
662                      const struct ovsdb_idl_column *column)
663 {
664     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
665     add_ref_table(idl, &column->type.key);
666     add_ref_table(idl, &column->type.value);
667 }
668
669 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
670  * no columns are selected for replication. Just the necessary data for table
671  * references will be replicated (the UUID of the rows, for instance), any
672  * columns not selected for replication will remain unreplicated.
673  * This can be useful because it allows 'idl' to keep track of what rows in the
674  * table actually exist, which in turn allows columns that reference the table
675  * to have accurate contents. (The IDL presents the database with references to
676  * rows that do not exist removed.)
677  *
678  * This function is only useful if 'monitor_everything_by_default' was false in
679  * the call to ovsdb_idl_create().  This function should be called between
680  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
681  */
682 void
683 ovsdb_idl_add_table(struct ovsdb_idl *idl,
684                     const struct ovsdb_idl_table_class *tc)
685 {
686     size_t i;
687
688     for (i = 0; i < idl->class->n_tables; i++) {
689         struct ovsdb_idl_table *table = &idl->tables[i];
690
691         if (table->class == tc) {
692             table->need_table = true;
693             return;
694         }
695     }
696
697     OVS_NOT_REACHED();
698 }
699
700 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
701  *
702  * This function should be called between ovsdb_idl_create() and the first call
703  * to ovsdb_idl_run().
704  */
705 void
706 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
707                      const struct ovsdb_idl_column *column)
708 {
709     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
710 }
711
712 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
713  * OVSDB_IDL_MONITOR for details.
714  *
715  * This function should be called between ovsdb_idl_create() and the first call
716  * to ovsdb_idl_run().
717  */
718 void
719 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
720 {
721     *ovsdb_idl_get_mode(idl, column) = 0;
722 }
723
724 /* Returns the most recent IDL change sequence number that caused a
725  * insert, modify or delete update to the table with class 'table_class'.
726  */
727 unsigned int
728 ovsdb_idl_table_get_seqno(const struct ovsdb_idl *idl,
729                           const struct ovsdb_idl_table_class *table_class)
730 {
731     struct ovsdb_idl_table *table
732         = ovsdb_idl_table_from_class(idl, table_class);
733     unsigned int max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_INSERT];
734
735     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]) {
736         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY];
737     }
738     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_DELETE]) {
739         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_DELETE];
740     }
741     return max_seqno;
742 }
743
744 /* For each row that contains tracked columns, IDL stores the most
745  * recent IDL change sequence numbers associateed with insert, modify
746  * and delete updates to the table.
747  */
748 unsigned int
749 ovsdb_idl_row_get_seqno(const struct ovsdb_idl_row *row,
750                         enum ovsdb_idl_change change)
751 {
752     return row->change_seqno[change];
753 }
754
755 /* Turns on OVSDB_IDL_TRACK for 'column' in 'idl', ensuring that
756  * all rows whose 'column' is modified are traced. Similarly, insert
757  * or delete of rows having 'column' are tracked. Clients are able
758  * to retrive the tracked rows with the ovsdb_idl_track_get_*()
759  * functions.
760  *
761  * This function should be called between ovsdb_idl_create() and
762  * the first call to ovsdb_idl_run(). The column to be tracked
763  * should have OVSDB_IDL_ALERT turned on.
764  */
765 void
766 ovsdb_idl_track_add_column(struct ovsdb_idl *idl,
767                            const struct ovsdb_idl_column *column)
768 {
769     if (!(*ovsdb_idl_get_mode(idl, column) & OVSDB_IDL_ALERT)) {
770         ovsdb_idl_add_column(idl, column);
771     }
772     *ovsdb_idl_get_mode(idl, column) |= OVSDB_IDL_TRACK;
773 }
774
775 void
776 ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
777 {
778     size_t i, j;
779
780     for (i = 0; i < idl->class->n_tables; i++) {
781         const struct ovsdb_idl_table_class *tc = &idl->class->tables[i];
782
783         for (j = 0; j < tc->n_columns; j++) {
784             const struct ovsdb_idl_column *column = &tc->columns[j];
785             ovsdb_idl_track_add_column(idl, column);
786         }
787     }
788 }
789
790 /* Returns true if 'table' has any tracked column. */
791 static bool
792 ovsdb_idl_track_is_set(struct ovsdb_idl_table *table)
793 {
794     size_t i;
795
796     for (i = 0; i < table->class->n_columns; i++) {
797         if (table->modes[i] & OVSDB_IDL_TRACK) {
798             return true;
799         }
800     }
801    return false;
802 }
803
804 /* Returns the first tracked row in table with class 'table_class'
805  * for the specified 'idl'. Returns NULL if there are no tracked rows */
806 const struct ovsdb_idl_row *
807 ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
808                           const struct ovsdb_idl_table_class *table_class)
809 {
810     struct ovsdb_idl_table *table
811         = ovsdb_idl_table_from_class(idl, table_class);
812
813     if (!ovs_list_is_empty(&table->track_list)) {
814         return CONTAINER_OF(ovs_list_front(&table->track_list), struct ovsdb_idl_row, track_node);
815     }
816     return NULL;
817 }
818
819 /* Returns the next tracked row in table after the specified 'row'
820  * (in no particular order). Returns NULL if there are no tracked rows */
821 const struct ovsdb_idl_row *
822 ovsdb_idl_track_get_next(const struct ovsdb_idl_row *row)
823 {
824     if (row->track_node.next != &row->table->track_list) {
825         return CONTAINER_OF(row->track_node.next, struct ovsdb_idl_row, track_node);
826     }
827
828     return NULL;
829 }
830
831 /* Returns true if a tracked 'column' in 'row' was updated by IDL, false
832  * otherwise. The tracking data is cleared by ovsdb_idl_track_clear()
833  *
834  * Function returns false if 'column' is not tracked (see
835  * ovsdb_idl_track_add_column()).
836  */
837 bool
838 ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
839                            const struct ovsdb_idl_column *column)
840 {
841     const struct ovsdb_idl_table_class *class;
842     size_t column_idx;
843
844     class = row->table->class;
845     column_idx = column - class->columns;
846
847     if (row->updated && bitmap_is_set(row->updated, column_idx)) {
848         return true;
849     } else {
850         return false;
851     }
852 }
853
854 /* Flushes the tracked rows. Client calls this function after calling
855  * ovsdb_idl_run() and read all tracked rows with the ovsdb_idl_track_get_*()
856  * functions. This is usually done at the end of the client's processing
857  * loop when it is ready to do ovsdb_idl_run() again.
858  */
859 void
860 ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
861 {
862     size_t i;
863
864     for (i = 0; i < idl->class->n_tables; i++) {
865         struct ovsdb_idl_table *table = &idl->tables[i];
866
867         if (!ovs_list_is_empty(&table->track_list)) {
868             struct ovsdb_idl_row *row, *next;
869
870             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
871                 if (row->updated) {
872                     free(row->updated);
873                     row->updated = NULL;
874                 }
875                 ovs_list_remove(&row->track_node);
876                 ovs_list_init(&row->track_node);
877                 if (ovsdb_idl_row_is_orphan(row)) {
878                     ovsdb_idl_row_clear_old(row);
879                     free(row);
880                 }
881             }
882         }
883     }
884 }
885
886 \f
887 static void
888 ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
889 {
890     struct jsonrpc_msg *msg;
891
892     json_destroy(idl->request_id);
893     msg = jsonrpc_create_request(
894         "get_schema",
895         json_array_create_1(json_string_create(idl->class->database)),
896         &idl->request_id);
897     jsonrpc_session_send(idl->session, msg);
898 }
899
900 static void
901 log_error(struct ovsdb_error *error)
902 {
903     char *s = ovsdb_error_to_string(error);
904     VLOG_WARN("error parsing database schema: %s", s);
905     free(s);
906     ovsdb_error_destroy(error);
907 }
908
909 /* Frees 'schema', which is in the format returned by parse_schema(). */
910 static void
911 free_schema(struct shash *schema)
912 {
913     if (schema) {
914         struct shash_node *node, *next;
915
916         SHASH_FOR_EACH_SAFE (node, next, schema) {
917             struct sset *sset = node->data;
918             sset_destroy(sset);
919             free(sset);
920             shash_delete(schema, node);
921         }
922         shash_destroy(schema);
923         free(schema);
924     }
925 }
926
927 /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
928  * 7047, to obtain the names of its rows and columns.  If successful, returns
929  * an shash whose keys are table names and whose values are ssets, where each
930  * sset contains the names of its table's columns.  On failure (due to a parse
931  * error), returns NULL.
932  *
933  * It would also be possible to use the general-purpose OVSDB schema parser in
934  * ovsdb-server, but that's overkill, possibly too strict for the current use
935  * case, and would require restructuring ovsdb-server to separate the schema
936  * code from the rest. */
937 static struct shash *
938 parse_schema(const struct json *schema_json)
939 {
940     struct ovsdb_parser parser;
941     const struct json *tables_json;
942     struct ovsdb_error *error;
943     struct shash_node *node;
944     struct shash *schema;
945
946     ovsdb_parser_init(&parser, schema_json, "database schema");
947     tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
948     error = ovsdb_parser_destroy(&parser);
949     if (error) {
950         log_error(error);
951         return NULL;
952     }
953
954     schema = xmalloc(sizeof *schema);
955     shash_init(schema);
956     SHASH_FOR_EACH (node, json_object(tables_json)) {
957         const char *table_name = node->name;
958         const struct json *json = node->data;
959         const struct json *columns_json;
960
961         ovsdb_parser_init(&parser, json, "table schema for table %s",
962                           table_name);
963         columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
964         error = ovsdb_parser_destroy(&parser);
965         if (error) {
966             log_error(error);
967             free_schema(schema);
968             return NULL;
969         }
970
971         struct sset *columns = xmalloc(sizeof *columns);
972         sset_init(columns);
973
974         struct shash_node *node2;
975         SHASH_FOR_EACH (node2, json_object(columns_json)) {
976             const char *column_name = node2->name;
977             sset_add(columns, column_name);
978         }
979         shash_add(schema, table_name, columns);
980     }
981     return schema;
982 }
983
984 static void
985 ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
986                                  const char *method)
987 {
988     struct shash *schema;
989     struct json *monitor_requests;
990     struct jsonrpc_msg *msg;
991     size_t i;
992
993     schema = parse_schema(idl->schema);
994     monitor_requests = json_object_create();
995     for (i = 0; i < idl->class->n_tables; i++) {
996         const struct ovsdb_idl_table *table = &idl->tables[i];
997         const struct ovsdb_idl_table_class *tc = table->class;
998         struct json *monitor_request, *columns;
999         const struct sset *table_schema;
1000         size_t j;
1001
1002         table_schema = (schema
1003                         ? shash_find_data(schema, table->class->name)
1004                         : NULL);
1005
1006         columns = table->need_table ? json_array_create_empty() : NULL;
1007         for (j = 0; j < tc->n_columns; j++) {
1008             const struct ovsdb_idl_column *column = &tc->columns[j];
1009             if (table->modes[j] & OVSDB_IDL_MONITOR) {
1010                 if (table_schema
1011                     && !sset_contains(table_schema, column->name)) {
1012                     VLOG_WARN("%s table in %s database lacks %s column "
1013                               "(database needs upgrade?)",
1014                               table->class->name, idl->class->database,
1015                               column->name);
1016                     continue;
1017                 }
1018                 if (!columns) {
1019                     columns = json_array_create_empty();
1020                 }
1021                 json_array_add(columns, json_string_create(column->name));
1022             }
1023         }
1024
1025         if (columns) {
1026             if (schema && !table_schema) {
1027                 VLOG_WARN("%s database lacks %s table "
1028                           "(database needs upgrade?)",
1029                           idl->class->database, table->class->name);
1030                 json_destroy(columns);
1031                 continue;
1032             }
1033
1034             monitor_request = json_object_create();
1035             json_object_put(monitor_request, "columns", columns);
1036             json_object_put(monitor_requests, tc->name, monitor_request);
1037         }
1038     }
1039     free_schema(schema);
1040
1041     json_destroy(idl->request_id);
1042     msg = jsonrpc_create_request(
1043         method,
1044         json_array_create_3(json_string_create(idl->class->database),
1045                             json_null_create(), monitor_requests),
1046         &idl->request_id);
1047     jsonrpc_session_send(idl->session, msg);
1048 }
1049
1050 static void
1051 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
1052 {
1053     ovsdb_idl_send_monitor_request__(idl, "monitor");
1054 }
1055
1056 static void
1057 log_parse_update_error(struct ovsdb_error *error)
1058 {
1059         if (!VLOG_DROP_WARN(&syntax_rl)) {
1060             char *s = ovsdb_error_to_string(error);
1061             VLOG_WARN_RL(&syntax_rl, "%s", s);
1062             free(s);
1063         }
1064         ovsdb_error_destroy(error);
1065 }
1066
1067 static void
1068 ovsdb_idl_send_monitor2_request(struct ovsdb_idl *idl)
1069 {
1070     ovsdb_idl_send_monitor_request__(idl, "monitor2");
1071 }
1072
1073 static void
1074 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates,
1075                        enum ovsdb_update_version version)
1076 {
1077     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates,
1078                                                          version);
1079     if (error) {
1080         log_parse_update_error(error);
1081     }
1082 }
1083
1084 static struct ovsdb_error *
1085 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
1086                          const struct json *table_updates,
1087                          enum ovsdb_update_version version)
1088 {
1089     const struct shash_node *tables_node;
1090     const char *table_updates_name = table_updates_names[version];
1091     const char *table_update_name = table_update_names[version];
1092     const char *row_update_name = row_update_names[version];
1093
1094     if (table_updates->type != JSON_OBJECT) {
1095         return ovsdb_syntax_error(table_updates, NULL,
1096                                   "<%s> is not an object",
1097                                   table_updates_name);
1098     }
1099
1100     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
1101         const struct json *table_update = tables_node->data;
1102         const struct shash_node *table_node;
1103         struct ovsdb_idl_table *table;
1104
1105         table = shash_find_data(&idl->table_by_name, tables_node->name);
1106         if (!table) {
1107             return ovsdb_syntax_error(
1108                 table_updates, NULL,
1109                 "<%s> includes unknown table \"%s\"",
1110                 table_updates_name,
1111                 tables_node->name);
1112         }
1113
1114         if (table_update->type != JSON_OBJECT) {
1115             return ovsdb_syntax_error(table_update, NULL,
1116                                       "<%s> for table \"%s\" is "
1117                                       "not an object",
1118                                       table_update_name,
1119                                       table->class->name);
1120         }
1121         SHASH_FOR_EACH (table_node, json_object(table_update)) {
1122             const struct json *row_update = table_node->data;
1123             const struct json *old_json, *new_json;
1124             struct uuid uuid;
1125
1126             if (!uuid_from_string(&uuid, table_node->name)) {
1127                 return ovsdb_syntax_error(table_update, NULL,
1128                                           "<%s> for table \"%s\" "
1129                                           "contains bad UUID "
1130                                           "\"%s\" as member name",
1131                                           table_update_name,
1132                                           table->class->name,
1133                                           table_node->name);
1134             }
1135             if (row_update->type != JSON_OBJECT) {
1136                 return ovsdb_syntax_error(row_update, NULL,
1137                                           "<%s> for table \"%s\" "
1138                                           "contains <%s> for %s that "
1139                                           "is not an object",
1140                                           table_update_name,
1141                                           table->class->name,
1142                                           row_update_name,
1143                                           table_node->name);
1144             }
1145
1146             switch(version) {
1147             case OVSDB_UPDATE:
1148                 old_json = shash_find_data(json_object(row_update), "old");
1149                 new_json = shash_find_data(json_object(row_update), "new");
1150                 if (old_json && old_json->type != JSON_OBJECT) {
1151                     return ovsdb_syntax_error(old_json, NULL,
1152                                               "\"old\" <row> is not object");
1153                 } else if (new_json && new_json->type != JSON_OBJECT) {
1154                     return ovsdb_syntax_error(new_json, NULL,
1155                                               "\"new\" <row> is not object");
1156                 } else if ((old_json != NULL) + (new_json != NULL)
1157                            != shash_count(json_object(row_update))) {
1158                     return ovsdb_syntax_error(row_update, NULL,
1159                                               "<row-update> contains "
1160                                               "unexpected member");
1161                 } else if (!old_json && !new_json) {
1162                     return ovsdb_syntax_error(row_update, NULL,
1163                                               "<row-update> missing \"old\" "
1164                                               "and \"new\" members");
1165                 }
1166
1167                 if (ovsdb_idl_process_update(table, &uuid, old_json,
1168                                              new_json)) {
1169                     idl->change_seqno++;
1170                 }
1171                 break;
1172
1173             case OVSDB_UPDATE2: {
1174                 const char *ops[] = {"modify", "insert", "delete", "initial"};
1175                 const char *operation;
1176                 const struct json *row;
1177                 int i;
1178
1179                 for (i = 0; i < ARRAY_SIZE(ops); i++) {
1180                     operation = ops[i];
1181                     row = shash_find_data(json_object(row_update), operation);
1182
1183                     if (row)  {
1184                         if (ovsdb_idl_process_update2(table, &uuid, operation,
1185                                                       row)) {
1186                             idl->change_seqno++;
1187                         }
1188                         break;
1189                     }
1190                 }
1191
1192                 /* row_update2 should contain one of the objects */
1193                 if (i == ARRAY_SIZE(ops)) {
1194                     return ovsdb_syntax_error(row_update, NULL,
1195                                               "<row_update2> includes unknown "
1196                                               "object");
1197                 }
1198                 break;
1199             }
1200
1201             default:
1202                 OVS_NOT_REACHED();
1203             }
1204         }
1205     }
1206
1207     return NULL;
1208 }
1209
1210 static struct ovsdb_idl_row *
1211 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
1212 {
1213     struct ovsdb_idl_row *row;
1214
1215     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
1216         if (uuid_equals(&row->uuid, uuid)) {
1217             return row;
1218         }
1219     }
1220     return NULL;
1221 }
1222
1223 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1224  * otherwise. */
1225 static bool
1226 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
1227                          const struct uuid *uuid, const struct json *old,
1228                          const struct json *new)
1229 {
1230     struct ovsdb_idl_row *row;
1231
1232     row = ovsdb_idl_get_row(table, uuid);
1233     if (!new) {
1234         /* Delete row. */
1235         if (row && !ovsdb_idl_row_is_orphan(row)) {
1236             /* XXX perhaps we should check the 'old' values? */
1237             ovsdb_idl_delete_row(row);
1238         } else {
1239             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1240                          "from table %s",
1241                          UUID_ARGS(uuid), table->class->name);
1242             return false;
1243         }
1244     } else if (!old) {
1245         /* Insert row. */
1246         if (!row) {
1247             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1248         } else if (ovsdb_idl_row_is_orphan(row)) {
1249             ovsdb_idl_insert_row(row, new);
1250         } else {
1251             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1252                          "table %s", UUID_ARGS(uuid), table->class->name);
1253             return ovsdb_idl_modify_row(row, new);
1254         }
1255     } else {
1256         /* Modify row. */
1257         if (row) {
1258             /* XXX perhaps we should check the 'old' values? */
1259             if (!ovsdb_idl_row_is_orphan(row)) {
1260                 return ovsdb_idl_modify_row(row, new);
1261             } else {
1262                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1263                              "referenced row "UUID_FMT" in table %s",
1264                              UUID_ARGS(uuid), table->class->name);
1265                 ovsdb_idl_insert_row(row, new);
1266             }
1267         } else {
1268             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1269                          "in table %s", UUID_ARGS(uuid), table->class->name);
1270             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1271         }
1272     }
1273
1274     return true;
1275 }
1276
1277 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1278  * otherwise. */
1279 static bool
1280 ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
1281                           const struct uuid *uuid,
1282                           const char *operation,
1283                           const struct json *json_row)
1284 {
1285     struct ovsdb_idl_row *row;
1286
1287     row = ovsdb_idl_get_row(table, uuid);
1288     if (!strcmp(operation, "delete")) {
1289         /* Delete row. */
1290         if (row && !ovsdb_idl_row_is_orphan(row)) {
1291             ovsdb_idl_delete_row(row);
1292         } else {
1293             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1294                          "from table %s",
1295                          UUID_ARGS(uuid), table->class->name);
1296             return false;
1297         }
1298     } else if (!strcmp(operation, "insert") || !strcmp(operation, "initial")) {
1299         /* Insert row. */
1300         if (!row) {
1301             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), json_row);
1302         } else if (ovsdb_idl_row_is_orphan(row)) {
1303             ovsdb_idl_insert_row(row, json_row);
1304         } else {
1305             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1306                          "table %s", UUID_ARGS(uuid), table->class->name);
1307             ovsdb_idl_delete_row(row);
1308             ovsdb_idl_insert_row(row, json_row);
1309         }
1310     } else if (!strcmp(operation, "modify")) {
1311         /* Modify row. */
1312         if (row) {
1313             if (!ovsdb_idl_row_is_orphan(row)) {
1314                 return ovsdb_idl_modify_row_by_diff(row, json_row);
1315             } else {
1316                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1317                              "referenced row "UUID_FMT" in table %s",
1318                              UUID_ARGS(uuid), table->class->name);
1319                 return false;
1320             }
1321         } else {
1322             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1323                          "in table %s", UUID_ARGS(uuid), table->class->name);
1324             return false;
1325         }
1326     } else {
1327             VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
1328                          "table %s", operation, table->class->name);
1329             return false;
1330     }
1331
1332     return true;
1333 }
1334
1335 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1336  * otherwise.
1337  *
1338  * Change 'row' either with the content of 'row_json' or by apply 'diff'.
1339  * Caller needs to provide either valid 'row_json' or 'diff', but not
1340  * both.  */
1341 static bool
1342 ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
1343                        const struct json *diff_json,
1344                        enum ovsdb_idl_change change)
1345 {
1346     struct ovsdb_idl_table *table = row->table;
1347     const struct ovsdb_idl_table_class *class = table->class;
1348     struct shash_node *node;
1349     bool changed = false;
1350     bool apply_diff = diff_json != NULL;
1351     const struct json *json = apply_diff ? diff_json : row_json;
1352
1353     SHASH_FOR_EACH (node, json_object(json)) {
1354         const char *column_name = node->name;
1355         const struct ovsdb_idl_column *column;
1356         struct ovsdb_datum datum;
1357         struct ovsdb_error *error;
1358         unsigned int column_idx;
1359         struct ovsdb_datum *old;
1360
1361         column = shash_find_data(&table->columns, column_name);
1362         if (!column) {
1363             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
1364                          column_name, UUID_ARGS(&row->uuid));
1365             continue;
1366         }
1367
1368         column_idx = column - table->class->columns;
1369         old = &row->old[column_idx];
1370
1371         error = NULL;
1372         if (apply_diff) {
1373             struct ovsdb_datum diff;
1374
1375             ovs_assert(!row_json);
1376             error = ovsdb_transient_datum_from_json(&diff, &column->type,
1377                                                     node->data);
1378             if (!error) {
1379                 error = ovsdb_datum_apply_diff(&datum, old, &diff,
1380                                                &column->type);
1381                 ovsdb_datum_destroy(&diff, &column->type);
1382             }
1383         } else {
1384             ovs_assert(!diff_json);
1385             error = ovsdb_datum_from_json(&datum, &column->type, node->data,
1386                                           NULL);
1387         }
1388
1389         if (!error) {
1390             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
1391                 ovsdb_datum_swap(old, &datum);
1392                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
1393                     changed = true;
1394                     row->change_seqno[change]
1395                         = row->table->change_seqno[change]
1396                         = row->table->idl->change_seqno + 1;
1397                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
1398                         if (!ovs_list_is_empty(&row->track_node)) {
1399                             ovs_list_remove(&row->track_node);
1400                         }
1401                         ovs_list_push_back(&row->table->track_list,
1402                                        &row->track_node);
1403                         if (!row->updated) {
1404                             row->updated = bitmap_allocate(class->n_columns);
1405                         }
1406                         bitmap_set1(row->updated, column_idx);
1407                     }
1408                 }
1409             } else {
1410                 /* Didn't really change but the OVSDB monitor protocol always
1411                  * includes every value in a row. */
1412             }
1413
1414             ovsdb_datum_destroy(&datum, &column->type);
1415         } else {
1416             char *s = ovsdb_error_to_string(error);
1417             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
1418                          " in table %s: %s", column_name,
1419                          UUID_ARGS(&row->uuid), table->class->name, s);
1420             free(s);
1421             ovsdb_error_destroy(error);
1422         }
1423     }
1424     return changed;
1425 }
1426
1427 static bool
1428 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json,
1429                      enum ovsdb_idl_change change)
1430 {
1431     return ovsdb_idl_row_change__(row, row_json, NULL, change);
1432 }
1433
1434 static bool
1435 ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
1436                          const struct json *diff_json,
1437                          enum ovsdb_idl_change change)
1438 {
1439     return ovsdb_idl_row_change__(row, NULL, diff_json, change);
1440 }
1441
1442 /* When a row A refers to row B through a column with a "refTable" constraint,
1443  * but row B does not exist, row B is called an "orphan row".  Orphan rows
1444  * should not persist, because the database enforces referential integrity, but
1445  * they can appear transiently as changes from the database are received (the
1446  * database doesn't try to topologically sort them and circular references mean
1447  * it isn't always possible anyhow).
1448  *
1449  * This function returns true if 'row' is an orphan row, otherwise false.
1450  */
1451 static bool
1452 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
1453 {
1454     return !row->old && !row->new;
1455 }
1456
1457 /* Returns true if 'row' is conceptually part of the database as modified by
1458  * the current transaction (if any), false otherwise.
1459  *
1460  * This function will return true if 'row' is not an orphan (see the comment on
1461  * ovsdb_idl_row_is_orphan()) and:
1462  *
1463  *   - 'row' exists in the database and has not been deleted within the
1464  *     current transaction (if any).
1465  *
1466  *   - 'row' was inserted within the current transaction and has not been
1467  *     deleted.  (In the latter case you should not have passed 'row' in at
1468  *     all, because ovsdb_idl_txn_delete() freed it.)
1469  *
1470  * This function will return false if 'row' is an orphan or if 'row' was
1471  * deleted within the current transaction.
1472  */
1473 static bool
1474 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
1475 {
1476     return row->new != NULL;
1477 }
1478
1479 static void
1480 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
1481 {
1482     const struct ovsdb_idl_table_class *class = row->table->class;
1483     size_t i;
1484
1485     for (i = 0; i < class->n_columns; i++) {
1486         const struct ovsdb_idl_column *c = &class->columns[i];
1487         (c->parse)(row, &row->old[i]);
1488     }
1489 }
1490
1491 static void
1492 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
1493 {
1494     const struct ovsdb_idl_table_class *class = row->table->class;
1495     size_t i;
1496
1497     for (i = 0; i < class->n_columns; i++) {
1498         const struct ovsdb_idl_column *c = &class->columns[i];
1499         (c->unparse)(row);
1500     }
1501 }
1502
1503 static void
1504 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
1505 {
1506     ovs_assert(row->old == row->new);
1507     if (!ovsdb_idl_row_is_orphan(row)) {
1508         const struct ovsdb_idl_table_class *class = row->table->class;
1509         size_t i;
1510
1511         for (i = 0; i < class->n_columns; i++) {
1512             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
1513         }
1514         free(row->old);
1515         row->old = row->new = NULL;
1516     }
1517 }
1518
1519 static void
1520 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
1521 {
1522     if (row->old != row->new) {
1523         if (row->new) {
1524             const struct ovsdb_idl_table_class *class = row->table->class;
1525             size_t i;
1526
1527             if (row->written) {
1528                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
1529                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
1530                 }
1531             }
1532             free(row->new);
1533             free(row->written);
1534             row->written = NULL;
1535         }
1536         row->new = row->old;
1537     }
1538 }
1539
1540 static void
1541 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
1542 {
1543     struct ovsdb_idl_arc *arc, *next;
1544
1545     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
1546      * that this causes to be unreferenced, if tracking is not enabled.
1547      * If tracking is enabled, orphaned nodes are removed from hmap but not
1548      * freed.
1549      */
1550     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
1551         ovs_list_remove(&arc->dst_node);
1552         if (destroy_dsts
1553             && ovsdb_idl_row_is_orphan(arc->dst)
1554             && ovs_list_is_empty(&arc->dst->dst_arcs)) {
1555             ovsdb_idl_row_destroy(arc->dst);
1556         }
1557         free(arc);
1558     }
1559     ovs_list_init(&row->src_arcs);
1560 }
1561
1562 /* Force nodes that reference 'row' to reparse. */
1563 static void
1564 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
1565 {
1566     struct ovsdb_idl_arc *arc, *next;
1567
1568     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
1569      * 'arc', so we need to use the "safe" variant of list traversal.  However,
1570      * calling an ovsdb_idl_column's 'parse' function will add an arc
1571      * equivalent to 'arc' to row->arcs.  That could be a problem for
1572      * traversal, but it adds it at the beginning of the list to prevent us
1573      * from stumbling upon it again.
1574      *
1575      * (If duplicate arcs were possible then we would need to make sure that
1576      * 'next' didn't also point into 'arc''s destination, but we forbid
1577      * duplicate arcs.) */
1578     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
1579         struct ovsdb_idl_row *ref = arc->src;
1580
1581         ovsdb_idl_row_unparse(ref);
1582         ovsdb_idl_row_clear_arcs(ref, false);
1583         ovsdb_idl_row_parse(ref);
1584     }
1585 }
1586
1587 static struct ovsdb_idl_row *
1588 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
1589 {
1590     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
1591     class->row_init(row);
1592     ovs_list_init(&row->src_arcs);
1593     ovs_list_init(&row->dst_arcs);
1594     hmap_node_nullify(&row->txn_node);
1595     ovs_list_init(&row->track_node);
1596     return row;
1597 }
1598
1599 static struct ovsdb_idl_row *
1600 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
1601 {
1602     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
1603     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
1604     row->uuid = *uuid;
1605     row->table = table;
1606     row->map_op_written = NULL;
1607     row->map_op_lists = NULL;
1608     return row;
1609 }
1610
1611 static void
1612 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
1613 {
1614     if (row) {
1615         ovsdb_idl_row_clear_old(row);
1616         hmap_remove(&row->table->rows, &row->hmap_node);
1617         ovsdb_idl_destroy_all_map_op_lists(row);
1618         if (ovsdb_idl_track_is_set(row->table)) {
1619             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1620                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1621                 = row->table->idl->change_seqno + 1;
1622         }
1623         if (!ovs_list_is_empty(&row->track_node)) {
1624             ovs_list_remove(&row->track_node);
1625         }
1626         ovs_list_push_back(&row->table->track_list, &row->track_node);
1627     }
1628 }
1629
1630 static void
1631 ovsdb_idl_destroy_all_map_op_lists(struct ovsdb_idl_row *row)
1632 {
1633     if (row->map_op_written) {
1634         /* Clear Map Operation Lists */
1635         size_t idx, n_columns;
1636         const struct ovsdb_idl_column *columns;
1637         const struct ovsdb_type *type;
1638         n_columns = row->table->class->n_columns;
1639         columns = row->table->class->columns;
1640         BITMAP_FOR_EACH_1 (idx, n_columns, row->map_op_written) {
1641             type = &columns[idx].type;
1642             map_op_list_destroy(row->map_op_lists[idx], type);
1643         }
1644         free(row->map_op_lists);
1645         bitmap_free(row->map_op_written);
1646         row->map_op_lists = NULL;
1647         row->map_op_written = NULL;
1648     }
1649 }
1650
1651 static void
1652 ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
1653 {
1654     size_t i;
1655
1656     for (i = 0; i < idl->class->n_tables; i++) {
1657         struct ovsdb_idl_table *table = &idl->tables[i];
1658
1659         if (!ovs_list_is_empty(&table->track_list)) {
1660             struct ovsdb_idl_row *row, *next;
1661
1662             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
1663                 if (!ovsdb_idl_track_is_set(row->table)) {
1664                     ovs_list_remove(&row->track_node);
1665                     free(row);
1666                 }
1667             }
1668         }
1669     }
1670 }
1671
1672 static void
1673 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
1674 {
1675     const struct ovsdb_idl_table_class *class = row->table->class;
1676     size_t i;
1677
1678     ovs_assert(!row->old && !row->new);
1679     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
1680     for (i = 0; i < class->n_columns; i++) {
1681         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
1682     }
1683     ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_INSERT);
1684     ovsdb_idl_row_parse(row);
1685
1686     ovsdb_idl_row_reparse_backrefs(row);
1687 }
1688
1689 static void
1690 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
1691 {
1692     ovsdb_idl_row_unparse(row);
1693     ovsdb_idl_row_clear_arcs(row, true);
1694     ovsdb_idl_row_clear_old(row);
1695     if (ovs_list_is_empty(&row->dst_arcs)) {
1696         ovsdb_idl_row_destroy(row);
1697     } else {
1698         ovsdb_idl_row_reparse_backrefs(row);
1699     }
1700 }
1701
1702 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1703  * otherwise. */
1704 static bool
1705 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1706 {
1707     bool changed;
1708
1709     ovsdb_idl_row_unparse(row);
1710     ovsdb_idl_row_clear_arcs(row, true);
1711     changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY);
1712     ovsdb_idl_row_parse(row);
1713
1714     return changed;
1715 }
1716
1717 static bool
1718 ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *row,
1719                              const struct json *diff_json)
1720 {
1721     bool changed;
1722
1723     ovsdb_idl_row_unparse(row);
1724     ovsdb_idl_row_clear_arcs(row, true);
1725     changed = ovsdb_idl_row_apply_diff(row, diff_json,
1726                                        OVSDB_IDL_CHANGE_MODIFY);
1727     ovsdb_idl_row_parse(row);
1728
1729     return changed;
1730 }
1731
1732 static bool
1733 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1734 {
1735     const struct ovsdb_idl_arc *arc;
1736
1737     /* No self-arcs. */
1738     if (src == dst) {
1739         return false;
1740     }
1741
1742     /* No duplicate arcs.
1743      *
1744      * We only need to test whether the first arc in dst->dst_arcs originates
1745      * at 'src', since we add all of the arcs from a given source in a clump
1746      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1747      * added at the front of the dst_arcs list. */
1748     if (ovs_list_is_empty(&dst->dst_arcs)) {
1749         return true;
1750     }
1751     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1752     return arc->src != src;
1753 }
1754
1755 static struct ovsdb_idl_table *
1756 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1757                            const struct ovsdb_idl_table_class *table_class)
1758 {
1759     return &idl->tables[table_class - idl->class->tables];
1760 }
1761
1762 /* Called by ovsdb-idlc generated code. */
1763 struct ovsdb_idl_row *
1764 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1765                       struct ovsdb_idl_table_class *dst_table_class,
1766                       const struct uuid *dst_uuid)
1767 {
1768     struct ovsdb_idl *idl = src->table->idl;
1769     struct ovsdb_idl_table *dst_table;
1770     struct ovsdb_idl_arc *arc;
1771     struct ovsdb_idl_row *dst;
1772
1773     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1774     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1775     if (idl->txn) {
1776         /* We're being called from ovsdb_idl_txn_write().  We must not update
1777          * any arcs, because the transaction will be backed out at commit or
1778          * abort time and we don't want our graph screwed up.
1779          *
1780          * Just return the destination row, if there is one and it has not been
1781          * deleted. */
1782         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1783             return dst;
1784         }
1785         return NULL;
1786     } else {
1787         /* We're being called from some other context.  Update the graph. */
1788         if (!dst) {
1789             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1790         }
1791
1792         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
1793         if (may_add_arc(src, dst)) {
1794             /* The arc *must* be added at the front of the dst_arcs list.  See
1795              * ovsdb_idl_row_reparse_backrefs() for details. */
1796             arc = xmalloc(sizeof *arc);
1797             ovs_list_push_front(&src->src_arcs, &arc->src_node);
1798             ovs_list_push_front(&dst->dst_arcs, &arc->dst_node);
1799             arc->src = src;
1800             arc->dst = dst;
1801         }
1802
1803         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
1804     }
1805 }
1806
1807 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
1808  * pointer to the row if there is one, otherwise a null pointer.  */
1809 const struct ovsdb_idl_row *
1810 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
1811                            const struct ovsdb_idl_table_class *tc,
1812                            const struct uuid *uuid)
1813 {
1814     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
1815 }
1816
1817 static struct ovsdb_idl_row *
1818 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
1819 {
1820     for (; node; node = hmap_next(&table->rows, node)) {
1821         struct ovsdb_idl_row *row;
1822
1823         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
1824         if (ovsdb_idl_row_exists(row)) {
1825             return row;
1826         }
1827     }
1828     return NULL;
1829 }
1830
1831 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
1832  * table is empty.
1833  *
1834  * Database tables are internally maintained as hash tables, so adding or
1835  * removing rows while traversing the same table can cause some rows to be
1836  * visited twice or not at apply. */
1837 const struct ovsdb_idl_row *
1838 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
1839                     const struct ovsdb_idl_table_class *table_class)
1840 {
1841     struct ovsdb_idl_table *table
1842         = ovsdb_idl_table_from_class(idl, table_class);
1843     return next_real_row(table, hmap_first(&table->rows));
1844 }
1845
1846 /* Returns a row following 'row' within its table, or a null pointer if 'row'
1847  * is the last row in its table. */
1848 const struct ovsdb_idl_row *
1849 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
1850 {
1851     struct ovsdb_idl_table *table = row->table;
1852
1853     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
1854 }
1855
1856 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
1857  * transaction has changed 'column''s value, the modified value is returned.
1858  *
1859  * The caller must not modify or free the returned value.
1860  *
1861  * Various kinds of changes can invalidate the returned value: writing to the
1862  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
1863  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
1864  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
1865  * returned value is needed for a long time, it is best to make a copy of it
1866  * with ovsdb_datum_clone(). */
1867 const struct ovsdb_datum *
1868 ovsdb_idl_read(const struct ovsdb_idl_row *row,
1869                const struct ovsdb_idl_column *column)
1870 {
1871     const struct ovsdb_idl_table_class *class;
1872     size_t column_idx;
1873
1874     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
1875
1876     class = row->table->class;
1877     column_idx = column - class->columns;
1878
1879     ovs_assert(row->new != NULL);
1880     ovs_assert(column_idx < class->n_columns);
1881
1882     if (row->written && bitmap_is_set(row->written, column_idx)) {
1883         return &row->new[column_idx];
1884     } else if (row->old) {
1885         return &row->old[column_idx];
1886     } else {
1887         return ovsdb_datum_default(&column->type);
1888     }
1889 }
1890
1891 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
1892  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
1893  * have a value type of OVSDB_TYPE_VOID.)
1894  *
1895  * This is useful in code that "knows" that a particular column has a given
1896  * type, so that it will abort if someone changes the column's type without
1897  * updating the code that uses it. */
1898 const struct ovsdb_datum *
1899 ovsdb_idl_get(const struct ovsdb_idl_row *row,
1900               const struct ovsdb_idl_column *column,
1901               enum ovsdb_atomic_type key_type OVS_UNUSED,
1902               enum ovsdb_atomic_type value_type OVS_UNUSED)
1903 {
1904     ovs_assert(column->type.key.type == key_type);
1905     ovs_assert(column->type.value.type == value_type);
1906
1907     return ovsdb_idl_read(row, column);
1908 }
1909
1910 /* Returns true if the field represented by 'column' in 'row' may be modified,
1911  * false if it is immutable.
1912  *
1913  * Normally, whether a field is mutable is controlled by its column's schema.
1914  * However, an immutable column can be set to any initial value at the time of
1915  * insertion, so if 'row' is a new row (one that is being added as part of the
1916  * current transaction, supposing that a transaction is in progress) then even
1917  * its "immutable" fields are actually mutable. */
1918 bool
1919 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
1920                      const struct ovsdb_idl_column *column)
1921 {
1922     return column->mutable || (row->new && !row->old);
1923 }
1924
1925 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
1926  * to all-zero-bits by some other entity.  If 'row' was set up some other way
1927  * then the return value is indeterminate. */
1928 bool
1929 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
1930 {
1931     return row->table == NULL;
1932 }
1933 \f
1934 /* Transactions. */
1935
1936 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1937                                    enum ovsdb_idl_txn_status);
1938
1939 /* Returns a string representation of 'status'.  The caller must not modify or
1940  * free the returned string.
1941  *
1942  * The return value is probably useful only for debug log messages and unit
1943  * tests. */
1944 const char *
1945 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
1946 {
1947     switch (status) {
1948     case TXN_UNCOMMITTED:
1949         return "uncommitted";
1950     case TXN_UNCHANGED:
1951         return "unchanged";
1952     case TXN_INCOMPLETE:
1953         return "incomplete";
1954     case TXN_ABORTED:
1955         return "aborted";
1956     case TXN_SUCCESS:
1957         return "success";
1958     case TXN_TRY_AGAIN:
1959         return "try again";
1960     case TXN_NOT_LOCKED:
1961         return "not locked";
1962     case TXN_ERROR:
1963         return "error";
1964     }
1965     return "<unknown>";
1966 }
1967
1968 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
1969  * active transaction at a time.  See the large comment in ovsdb-idl.h for
1970  * general information on transactions. */
1971 struct ovsdb_idl_txn *
1972 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
1973 {
1974     struct ovsdb_idl_txn *txn;
1975
1976     ovs_assert(!idl->txn);
1977     idl->txn = txn = xmalloc(sizeof *txn);
1978     txn->request_id = NULL;
1979     txn->idl = idl;
1980     hmap_init(&txn->txn_rows);
1981     txn->status = TXN_UNCOMMITTED;
1982     txn->error = NULL;
1983     txn->dry_run = false;
1984     ds_init(&txn->comment);
1985
1986     txn->inc_table = NULL;
1987     txn->inc_column = NULL;
1988
1989     hmap_init(&txn->inserted_rows);
1990
1991     return txn;
1992 }
1993
1994 /* Appends 's', which is treated as a printf()-type format string, to the
1995  * comments that will be passed to the OVSDB server when 'txn' is committed.
1996  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
1997  * show-log" can print in a relatively human-readable form.) */
1998 void
1999 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
2000 {
2001     va_list args;
2002
2003     if (txn->comment.length) {
2004         ds_put_char(&txn->comment, '\n');
2005     }
2006
2007     va_start(args, s);
2008     ds_put_format_valist(&txn->comment, s, args);
2009     va_end(args);
2010 }
2011
2012 /* Marks 'txn' as a transaction that will not actually modify the database.  In
2013  * almost every way, the transaction is treated like other transactions.  It
2014  * must be committed or aborted like other transactions, it will be sent to the
2015  * database server like other transactions, and so on.  The only difference is
2016  * that the operations sent to the database server will include, as the last
2017  * step, an "abort" operation, so that any changes made by the transaction will
2018  * not actually take effect. */
2019 void
2020 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
2021 {
2022     txn->dry_run = true;
2023 }
2024
2025 /* Causes 'txn', when committed, to increment the value of 'column' within
2026  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
2027  * successfully, the client may retrieve the final (incremented) value of
2028  * 'column' with ovsdb_idl_txn_get_increment_new_value().
2029  *
2030  * The client could accomplish something similar with ovsdb_idl_read(),
2031  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
2032  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
2033  * will never (by itself) fail because of a verify error.
2034  *
2035  * The intended use is for incrementing the "next_cfg" column in the
2036  * Open_vSwitch table. */
2037 void
2038 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
2039                         const struct ovsdb_idl_row *row,
2040                         const struct ovsdb_idl_column *column)
2041 {
2042     ovs_assert(!txn->inc_table);
2043     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
2044     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
2045
2046     txn->inc_table = row->table->class->name;
2047     txn->inc_column = column->name;
2048     txn->inc_row = row->uuid;
2049 }
2050
2051 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
2052  * has been called for 'txn' but the commit is still incomplete (that is, the
2053  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
2054  * end up committing at the database server, but the client will not be able to
2055  * get any further status information back. */
2056 void
2057 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
2058 {
2059     struct ovsdb_idl_txn_insert *insert, *next;
2060
2061     json_destroy(txn->request_id);
2062     if (txn->status == TXN_INCOMPLETE) {
2063         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2064     }
2065     ovsdb_idl_txn_abort(txn);
2066     ds_destroy(&txn->comment);
2067     free(txn->error);
2068     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
2069         free(insert);
2070     }
2071     hmap_destroy(&txn->inserted_rows);
2072     free(txn);
2073 }
2074
2075 /* Causes poll_block() to wake up if 'txn' has completed committing. */
2076 void
2077 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
2078 {
2079     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
2080         poll_immediate_wake();
2081     }
2082 }
2083
2084 static struct json *
2085 where_uuid_equals(const struct uuid *uuid)
2086 {
2087     return
2088         json_array_create_1(
2089             json_array_create_3(
2090                 json_string_create("_uuid"),
2091                 json_string_create("=="),
2092                 json_array_create_2(
2093                     json_string_create("uuid"),
2094                     json_string_create_nocopy(
2095                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
2096 }
2097
2098 static char *
2099 uuid_name_from_uuid(const struct uuid *uuid)
2100 {
2101     char *name;
2102     char *p;
2103
2104     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
2105     for (p = name; *p != '\0'; p++) {
2106         if (*p == '-') {
2107             *p = '_';
2108         }
2109     }
2110
2111     return name;
2112 }
2113
2114 static const struct ovsdb_idl_row *
2115 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
2116 {
2117     const struct ovsdb_idl_row *row;
2118
2119     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
2120         if (uuid_equals(&row->uuid, uuid)) {
2121             return row;
2122         }
2123     }
2124     return NULL;
2125 }
2126
2127 /* XXX there must be a cleaner way to do this */
2128 static struct json *
2129 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
2130 {
2131     if (json->type == JSON_ARRAY) {
2132         struct uuid uuid;
2133         size_t i;
2134
2135         if (json->u.array.n == 2
2136             && json->u.array.elems[0]->type == JSON_STRING
2137             && json->u.array.elems[1]->type == JSON_STRING
2138             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
2139             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
2140             const struct ovsdb_idl_row *row;
2141
2142             row = ovsdb_idl_txn_get_row(txn, &uuid);
2143             if (row && !row->old && row->new) {
2144                 json_destroy(json);
2145
2146                 return json_array_create_2(
2147                     json_string_create("named-uuid"),
2148                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
2149             }
2150         }
2151
2152         for (i = 0; i < json->u.array.n; i++) {
2153             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
2154                                                       txn);
2155         }
2156     } else if (json->type == JSON_OBJECT) {
2157         struct shash_node *node;
2158
2159         SHASH_FOR_EACH (node, json_object(json)) {
2160             node->data = substitute_uuids(node->data, txn);
2161         }
2162     }
2163     return json;
2164 }
2165
2166 static void
2167 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
2168 {
2169     struct ovsdb_idl_row *row, *next;
2170
2171     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
2172      * ovsdb_idl_column's 'parse' function, which will call
2173      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
2174      * transaction and fail to update the graph.  */
2175     txn->idl->txn = NULL;
2176
2177     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
2178         ovsdb_idl_destroy_all_map_op_lists(row);
2179         if (row->old) {
2180             if (row->written) {
2181                 ovsdb_idl_row_unparse(row);
2182                 ovsdb_idl_row_clear_arcs(row, false);
2183                 ovsdb_idl_row_parse(row);
2184             }
2185         } else {
2186             ovsdb_idl_row_unparse(row);
2187         }
2188         ovsdb_idl_row_clear_new(row);
2189
2190         free(row->prereqs);
2191         row->prereqs = NULL;
2192
2193         free(row->written);
2194         row->written = NULL;
2195
2196         hmap_remove(&txn->txn_rows, &row->txn_node);
2197         hmap_node_nullify(&row->txn_node);
2198         if (!row->old) {
2199             hmap_remove(&row->table->rows, &row->hmap_node);
2200             free(row);
2201         }
2202     }
2203     hmap_destroy(&txn->txn_rows);
2204     hmap_init(&txn->txn_rows);
2205 }
2206
2207 static bool
2208 ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
2209                                 struct json *mutations)
2210 {
2211     const struct ovsdb_idl_table_class *class = row->table->class;
2212     size_t idx;
2213     bool any_mutations = false;
2214
2215     BITMAP_FOR_EACH_1(idx, class->n_columns, row->map_op_written) {
2216         struct map_op_list *map_op_list;
2217         const struct ovsdb_idl_column *column;
2218         const struct ovsdb_datum *old_datum;
2219         enum ovsdb_atomic_type key_type, value_type;
2220         struct json *mutation, *map, *col_name, *mutator;
2221         struct json *del_set, *ins_map;
2222         bool any_del, any_ins;
2223
2224         map_op_list = row->map_op_lists[idx];
2225         column = &class->columns[idx];
2226         key_type = column->type.key.type;
2227         value_type = column->type.value.type;
2228         old_datum = ovsdb_idl_read(row, column);
2229
2230         del_set = json_array_create_empty();
2231         ins_map = json_array_create_empty();
2232         any_del = false;
2233         any_ins = false;
2234
2235         for (struct map_op *map_op = map_op_list_first(map_op_list); map_op;
2236              map_op = map_op_list_next(map_op_list, map_op)) {
2237
2238             if (map_op_type(map_op) == MAP_OP_UPDATE) {
2239                 /* Find out if value really changed. */
2240                 struct ovsdb_datum *new_datum;
2241                 unsigned int pos;
2242                 new_datum = map_op_datum(map_op);
2243                 pos = ovsdb_datum_find_key(old_datum,
2244                                            &new_datum->keys[0],
2245                                            key_type);
2246                 if (ovsdb_atom_equals(&new_datum->values[0],
2247                                       &old_datum->values[pos],
2248                                       value_type)) {
2249                     /* No change in value. Move on to next update. */
2250                     continue;
2251                 }
2252             } else if (map_op_type(map_op) == MAP_OP_DELETE){
2253                 /* Verify that there is a key to delete. */
2254                 unsigned int pos;
2255                 pos = ovsdb_datum_find_key(old_datum,
2256                                            &map_op_datum(map_op)->keys[0],
2257                                            key_type);
2258                 if (pos == UINT_MAX) {
2259                     /* No key to delete.  Move on to next update. */
2260                     VLOG_WARN("Trying to delete a key that doesn't "
2261                               "exist in the map.");
2262                     continue;
2263                 }
2264             }
2265
2266             if (map_op_type(map_op) == MAP_OP_INSERT) {
2267                 map = json_array_create_2(
2268                     ovsdb_atom_to_json(&map_op_datum(map_op)->keys[0],
2269                                        key_type),
2270                     ovsdb_atom_to_json(&map_op_datum(map_op)->values[0],
2271                                        value_type));
2272                 json_array_add(ins_map, map);
2273                 any_ins = true;
2274             } else { /* MAP_OP_UPDATE or MAP_OP_DELETE */
2275                 map = ovsdb_atom_to_json(&map_op_datum(map_op)->keys[0],
2276                                          key_type);
2277                 json_array_add(del_set, map);
2278                 any_del = true;
2279             }
2280
2281             /* Generate an additional insert mutate for updates. */
2282             if (map_op_type(map_op) == MAP_OP_UPDATE) {
2283                 map = json_array_create_2(
2284                     ovsdb_atom_to_json(&map_op_datum(map_op)->keys[0],
2285                                        key_type),
2286                     ovsdb_atom_to_json(&map_op_datum(map_op)->values[0],
2287                                        value_type));
2288                 json_array_add(ins_map, map);
2289                 any_ins = true;
2290             }
2291         }
2292
2293         if (any_del) {
2294             col_name = json_string_create(column->name);
2295             mutator = json_string_create("delete");
2296             map = json_array_create_2(json_string_create("set"), del_set);
2297             mutation = json_array_create_3(col_name, mutator, map);
2298             json_array_add(mutations, mutation);
2299             any_mutations = true;
2300         } else {
2301             json_destroy(del_set);
2302         }
2303         if (any_ins) {
2304             col_name = json_string_create(column->name);
2305             mutator = json_string_create("insert");
2306             map = json_array_create_2(json_string_create("map"), ins_map);
2307             mutation = json_array_create_3(col_name, mutator, map);
2308             json_array_add(mutations, mutation);
2309             any_mutations = true;
2310         } else {
2311             json_destroy(ins_map);
2312         }
2313     }
2314     return any_mutations;
2315 }
2316
2317 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
2318  * of the following TXN_* constants:
2319  *
2320  *   TXN_INCOMPLETE:
2321  *
2322  *       The transaction is in progress, but not yet complete.  The caller
2323  *       should call again later, after calling ovsdb_idl_run() to let the IDL
2324  *       do OVSDB protocol processing.
2325  *
2326  *   TXN_UNCHANGED:
2327  *
2328  *       The transaction is complete.  (It didn't actually change the database,
2329  *       so the IDL didn't send any request to the database server.)
2330  *
2331  *   TXN_ABORTED:
2332  *
2333  *       The caller previously called ovsdb_idl_txn_abort().
2334  *
2335  *   TXN_SUCCESS:
2336  *
2337  *       The transaction was successful.  The update made by the transaction
2338  *       (and possibly other changes made by other database clients) should
2339  *       already be visible in the IDL.
2340  *
2341  *   TXN_TRY_AGAIN:
2342  *
2343  *       The transaction failed for some transient reason, e.g. because a
2344  *       "verify" operation reported an inconsistency or due to a network
2345  *       problem.  The caller should wait for a change to the database, then
2346  *       compose a new transaction, and commit the new transaction.
2347  *
2348  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
2349  *       the database.  It is important to use its return value *before* the
2350  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
2351  *       purpose, because the change that one should wait for can happen after
2352  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
2353  *       using some other baseline value in that situation could cause an
2354  *       indefinite wait if the database rarely changes.
2355  *
2356  *   TXN_NOT_LOCKED:
2357  *
2358  *       The transaction failed because the IDL has been configured to require
2359  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
2360  *       has already lost it.
2361  *
2362  * Committing a transaction rolls back all of the changes that it made to the
2363  * IDL's copy of the database.  If the transaction commits successfully, then
2364  * the database server will send an update and, thus, the IDL will be updated
2365  * with the committed changes. */
2366 enum ovsdb_idl_txn_status
2367 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
2368 {
2369     struct ovsdb_idl_row *row;
2370     struct json *operations;
2371     bool any_updates;
2372
2373     if (txn != txn->idl->txn) {
2374         goto coverage_out;
2375     }
2376
2377     /* If we need a lock but don't have it, give up quickly. */
2378     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
2379         txn->status = TXN_NOT_LOCKED;
2380         goto disassemble_out;
2381     }
2382
2383     operations = json_array_create_1(
2384         json_string_create(txn->idl->class->database));
2385
2386     /* Assert that we have the required lock (avoiding a race). */
2387     if (txn->idl->lock_name) {
2388         struct json *op = json_object_create();
2389         json_array_add(operations, op);
2390         json_object_put_string(op, "op", "assert");
2391         json_object_put_string(op, "lock", txn->idl->lock_name);
2392     }
2393
2394     /* Add prerequisites and declarations of new rows. */
2395     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2396         /* XXX check that deleted rows exist even if no prereqs? */
2397         if (row->prereqs) {
2398             const struct ovsdb_idl_table_class *class = row->table->class;
2399             size_t n_columns = class->n_columns;
2400             struct json *op, *columns, *row_json;
2401             size_t idx;
2402
2403             op = json_object_create();
2404             json_array_add(operations, op);
2405             json_object_put_string(op, "op", "wait");
2406             json_object_put_string(op, "table", class->name);
2407             json_object_put(op, "timeout", json_integer_create(0));
2408             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2409             json_object_put_string(op, "until", "==");
2410             columns = json_array_create_empty();
2411             json_object_put(op, "columns", columns);
2412             row_json = json_object_create();
2413             json_object_put(op, "rows", json_array_create_1(row_json));
2414
2415             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
2416                 const struct ovsdb_idl_column *column = &class->columns[idx];
2417                 json_array_add(columns, json_string_create(column->name));
2418                 json_object_put(row_json, column->name,
2419                                 ovsdb_datum_to_json(&row->old[idx],
2420                                                     &column->type));
2421             }
2422         }
2423     }
2424
2425     /* Add updates. */
2426     any_updates = false;
2427     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2428         const struct ovsdb_idl_table_class *class = row->table->class;
2429
2430         if (!row->new) {
2431             if (class->is_root) {
2432                 struct json *op = json_object_create();
2433                 json_object_put_string(op, "op", "delete");
2434                 json_object_put_string(op, "table", class->name);
2435                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2436                 json_array_add(operations, op);
2437                 any_updates = true;
2438             } else {
2439                 /* Let ovsdb-server decide whether to really delete it. */
2440             }
2441         } else if (row->old != row->new) {
2442             struct json *row_json;
2443             struct json *op;
2444             size_t idx;
2445
2446             op = json_object_create();
2447             json_object_put_string(op, "op", row->old ? "update" : "insert");
2448             json_object_put_string(op, "table", class->name);
2449             if (row->old) {
2450                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2451             } else {
2452                 struct ovsdb_idl_txn_insert *insert;
2453
2454                 any_updates = true;
2455
2456                 json_object_put(op, "uuid-name",
2457                                 json_string_create_nocopy(
2458                                     uuid_name_from_uuid(&row->uuid)));
2459
2460                 insert = xmalloc(sizeof *insert);
2461                 insert->dummy = row->uuid;
2462                 insert->op_index = operations->u.array.n - 1;
2463                 uuid_zero(&insert->real);
2464                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
2465                             uuid_hash(&insert->dummy));
2466             }
2467             row_json = json_object_create();
2468             json_object_put(op, "row", row_json);
2469
2470             if (row->written) {
2471                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
2472                     const struct ovsdb_idl_column *column =
2473                                                         &class->columns[idx];
2474
2475                     if (row->old
2476                         || !ovsdb_datum_is_default(&row->new[idx],
2477                                                   &column->type)) {
2478                         json_object_put(row_json, column->name,
2479                                         substitute_uuids(
2480                                             ovsdb_datum_to_json(&row->new[idx],
2481                                                                 &column->type),
2482                                             txn));
2483
2484                         /* If anything really changed, consider it an update.
2485                          * We can't suppress not-really-changed values earlier
2486                          * or transactions would become nonatomic (see the big
2487                          * comment inside ovsdb_idl_txn_write()). */
2488                         if (!any_updates && row->old &&
2489                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
2490                                                 &column->type)) {
2491                             any_updates = true;
2492                         }
2493                     }
2494                 }
2495             }
2496
2497             if (!row->old || !shash_is_empty(json_object(row_json))) {
2498                 json_array_add(operations, op);
2499             } else {
2500                 json_destroy(op);
2501             }
2502         }
2503
2504         /* Add mutate operation, for partial map updates. */
2505         if (row->map_op_written) {
2506             struct json *op, *mutations;
2507             bool any_mutations;
2508
2509             op = json_object_create();
2510             json_object_put_string(op, "op", "mutate");
2511             json_object_put_string(op, "table", class->name);
2512             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2513             mutations = json_array_create_empty();
2514             any_mutations = ovsdb_idl_txn_extract_mutations(row, mutations);
2515             json_object_put(op, "mutations", mutations);
2516
2517             if (any_mutations) {
2518                 op = substitute_uuids(op, txn);
2519                 json_array_add(operations, op);
2520                 any_updates = true;
2521             } else {
2522                 json_destroy(op);
2523             }
2524         }
2525     }
2526
2527     /* Add increment. */
2528     if (txn->inc_table && any_updates) {
2529         struct json *op;
2530
2531         txn->inc_index = operations->u.array.n - 1;
2532
2533         op = json_object_create();
2534         json_object_put_string(op, "op", "mutate");
2535         json_object_put_string(op, "table", txn->inc_table);
2536         json_object_put(op, "where",
2537                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2538                                          txn));
2539         json_object_put(op, "mutations",
2540                         json_array_create_1(
2541                             json_array_create_3(
2542                                 json_string_create(txn->inc_column),
2543                                 json_string_create("+="),
2544                                 json_integer_create(1))));
2545         json_array_add(operations, op);
2546
2547         op = json_object_create();
2548         json_object_put_string(op, "op", "select");
2549         json_object_put_string(op, "table", txn->inc_table);
2550         json_object_put(op, "where",
2551                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2552                                          txn));
2553         json_object_put(op, "columns",
2554                         json_array_create_1(json_string_create(
2555                                                 txn->inc_column)));
2556         json_array_add(operations, op);
2557     }
2558
2559     if (txn->comment.length) {
2560         struct json *op = json_object_create();
2561         json_object_put_string(op, "op", "comment");
2562         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
2563         json_array_add(operations, op);
2564     }
2565
2566     if (txn->dry_run) {
2567         struct json *op = json_object_create();
2568         json_object_put_string(op, "op", "abort");
2569         json_array_add(operations, op);
2570     }
2571
2572     if (!any_updates) {
2573         txn->status = TXN_UNCHANGED;
2574         json_destroy(operations);
2575     } else if (!jsonrpc_session_send(
2576                    txn->idl->session,
2577                    jsonrpc_create_request(
2578                        "transact", operations, &txn->request_id))) {
2579         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
2580                     json_hash(txn->request_id, 0));
2581         txn->status = TXN_INCOMPLETE;
2582     } else {
2583         txn->status = TXN_TRY_AGAIN;
2584     }
2585
2586 disassemble_out:
2587     ovsdb_idl_txn_disassemble(txn);
2588 coverage_out:
2589     switch (txn->status) {
2590     case TXN_UNCOMMITTED:   COVERAGE_INC(txn_uncommitted);    break;
2591     case TXN_UNCHANGED:     COVERAGE_INC(txn_unchanged);      break;
2592     case TXN_INCOMPLETE:    COVERAGE_INC(txn_incomplete);     break;
2593     case TXN_ABORTED:       COVERAGE_INC(txn_aborted);        break;
2594     case TXN_SUCCESS:       COVERAGE_INC(txn_success);        break;
2595     case TXN_TRY_AGAIN:     COVERAGE_INC(txn_try_again);      break;
2596     case TXN_NOT_LOCKED:    COVERAGE_INC(txn_not_locked);     break;
2597     case TXN_ERROR:         COVERAGE_INC(txn_error);          break;
2598     }
2599
2600     return txn->status;
2601 }
2602
2603 /* Attempts to commit 'txn', blocking until the commit either succeeds or
2604  * fails.  Returns the final commit status, which may be any TXN_* value other
2605  * than TXN_INCOMPLETE.
2606  *
2607  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
2608  * return value of ovsdb_idl_get_seqno() to change. */
2609 enum ovsdb_idl_txn_status
2610 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
2611 {
2612     enum ovsdb_idl_txn_status status;
2613
2614     fatal_signal_run();
2615     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
2616         ovsdb_idl_run(txn->idl);
2617         ovsdb_idl_wait(txn->idl);
2618         ovsdb_idl_txn_wait(txn);
2619         poll_block();
2620     }
2621     return status;
2622 }
2623
2624 /* Returns the final (incremented) value of the column in 'txn' that was set to
2625  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
2626  * successfully. */
2627 int64_t
2628 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
2629 {
2630     ovs_assert(txn->status == TXN_SUCCESS);
2631     return txn->inc_new_value;
2632 }
2633
2634 /* Aborts 'txn' without sending it to the database server.  This is effective
2635  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
2636  * Otherwise, it has no effect.
2637  *
2638  * Aborting a transaction doesn't free its memory.  Use
2639  * ovsdb_idl_txn_destroy() to do that. */
2640 void
2641 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
2642 {
2643     ovsdb_idl_txn_disassemble(txn);
2644     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
2645         txn->status = TXN_ABORTED;
2646     }
2647 }
2648
2649 /* Returns a string that reports the error status for 'txn'.  The caller must
2650  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
2651  * for 'txn' may free the returned string.
2652  *
2653  * The return value is ordinarily one of the strings that
2654  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
2655  * due to an error reported by the database server, the return value is that
2656  * error. */
2657 const char *
2658 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
2659 {
2660     if (txn->status != TXN_ERROR) {
2661         return ovsdb_idl_txn_status_to_string(txn->status);
2662     } else if (txn->error) {
2663         return txn->error;
2664     } else {
2665         return "no error details available";
2666     }
2667 }
2668
2669 static void
2670 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
2671                              const struct json *json)
2672 {
2673     if (txn->error == NULL) {
2674         txn->error = json_to_string(json, JSSF_SORT);
2675     }
2676 }
2677
2678 /* For transaction 'txn' that completed successfully, finds and returns the
2679  * permanent UUID that the database assigned to a newly inserted row, given the
2680  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
2681  *
2682  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
2683  * if it was assigned by that function and then deleted by
2684  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
2685  * and then deleted within a single transaction are never sent to the database
2686  * server, so it never assigns them a permanent UUID.) */
2687 const struct uuid *
2688 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
2689                               const struct uuid *uuid)
2690 {
2691     const struct ovsdb_idl_txn_insert *insert;
2692
2693     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
2694     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
2695                              uuid_hash(uuid), &txn->inserted_rows) {
2696         if (uuid_equals(uuid, &insert->dummy)) {
2697             return &insert->real;
2698         }
2699     }
2700     return NULL;
2701 }
2702
2703 static void
2704 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2705                        enum ovsdb_idl_txn_status status)
2706 {
2707     txn->status = status;
2708     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2709 }
2710
2711 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
2712  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
2713  * ovs-vswitchd).
2714  *
2715  * 'datum' must have the correct type for its column.  The IDL does not check
2716  * that it meets schema constraints, but ovsdb-server will do so at commit time
2717  * so it had better be correct.
2718  *
2719  * A transaction must be in progress.  Replication of 'column' must not have
2720  * been disabled (by calling ovsdb_idl_omit()).
2721  *
2722  * Usually this function is used indirectly through one of the "set" functions
2723  * generated by ovsdb-idlc.
2724  *
2725  * Takes ownership of what 'datum' points to (and in some cases destroys that
2726  * data before returning) but makes a copy of 'datum' itself.  (Commonly
2727  * 'datum' is on the caller's stack.) */
2728 static void
2729 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
2730                       const struct ovsdb_idl_column *column,
2731                       struct ovsdb_datum *datum, bool owns_datum)
2732 {
2733     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2734     const struct ovsdb_idl_table_class *class;
2735     size_t column_idx;
2736     bool write_only;
2737
2738     if (ovsdb_idl_row_is_synthetic(row)) {
2739         goto discard_datum;
2740     }
2741
2742     class = row->table->class;
2743     column_idx = column - class->columns;
2744     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
2745
2746     ovs_assert(row->new != NULL);
2747     ovs_assert(column_idx < class->n_columns);
2748     ovs_assert(row->old == NULL ||
2749                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2750
2751     if (row->table->idl->verify_write_only && !write_only) {
2752         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
2753                  " explicitly configured not to.", class->name, column->name);
2754         goto discard_datum;
2755     }
2756
2757     /* If this is a write-only column and the datum being written is the same
2758      * as the one already there, just skip the update entirely.  This is worth
2759      * optimizing because we have a lot of columns that get periodically
2760      * refreshed into the database but don't actually change that often.
2761      *
2762      * We don't do this for read/write columns because that would break
2763      * atomicity of transactions--some other client might have written a
2764      * different value in that column since we read it.  (But if a whole
2765      * transaction only does writes of existing values, without making any real
2766      * changes, we will drop the whole transaction later in
2767      * ovsdb_idl_txn_commit().) */
2768     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
2769                                          datum, &column->type)) {
2770         goto discard_datum;
2771     }
2772
2773     if (hmap_node_is_null(&row->txn_node)) {
2774         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2775                     uuid_hash(&row->uuid));
2776     }
2777     if (row->old == row->new) {
2778         row->new = xmalloc(class->n_columns * sizeof *row->new);
2779     }
2780     if (!row->written) {
2781         row->written = bitmap_allocate(class->n_columns);
2782     }
2783     if (bitmap_is_set(row->written, column_idx)) {
2784         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
2785     } else {
2786         bitmap_set1(row->written, column_idx);
2787     }
2788     if (owns_datum) {
2789         row->new[column_idx] = *datum;
2790     } else {
2791         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
2792     }
2793     (column->unparse)(row);
2794     (column->parse)(row, &row->new[column_idx]);
2795     return;
2796
2797 discard_datum:
2798     if (owns_datum) {
2799         ovsdb_datum_destroy(datum, &column->type);
2800     }
2801 }
2802
2803 void
2804 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
2805                     const struct ovsdb_idl_column *column,
2806                     struct ovsdb_datum *datum)
2807 {
2808     ovsdb_idl_txn_write__(row, column, datum, true);
2809 }
2810
2811 void
2812 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
2813                           const struct ovsdb_idl_column *column,
2814                           const struct ovsdb_datum *datum)
2815 {
2816     ovsdb_idl_txn_write__(row, column,
2817                           CONST_CAST(struct ovsdb_datum *, datum), false);
2818 }
2819
2820 /* Causes the original contents of 'column' in 'row_' to be verified as a
2821  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
2822  * changed (or if 'row_' was deleted) between the time that the IDL originally
2823  * read its contents and the time that the transaction commits, then the
2824  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
2825  * TXN_AGAIN_NOW (depending on whether the database change has already been
2826  * received).
2827  *
2828  * The intention is that, to ensure that no transaction commits based on dirty
2829  * reads, an application should call ovsdb_idl_txn_verify() on each data item
2830  * read as part of a read-modify-write operation.
2831  *
2832  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
2833  * value of 'column' is already known:
2834  *
2835  *   - If 'row_' is a row created by the current transaction (returned by
2836  *     ovsdb_idl_txn_insert()).
2837  *
2838  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
2839  *     within the current transaction.
2840  *
2841  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
2842  * ovsdb_idl_txn_write() for a given read-modify-write.
2843  *
2844  * A transaction must be in progress.
2845  *
2846  * Usually this function is used indirectly through one of the "verify"
2847  * functions generated by ovsdb-idlc. */
2848 void
2849 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
2850                      const struct ovsdb_idl_column *column)
2851 {
2852     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2853     const struct ovsdb_idl_table_class *class;
2854     size_t column_idx;
2855
2856     if (ovsdb_idl_row_is_synthetic(row)) {
2857         return;
2858     }
2859
2860     class = row->table->class;
2861     column_idx = column - class->columns;
2862
2863     ovs_assert(row->new != NULL);
2864     ovs_assert(row->old == NULL ||
2865                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2866     if (!row->old
2867         || (row->written && bitmap_is_set(row->written, column_idx))) {
2868         return;
2869     }
2870
2871     if (hmap_node_is_null(&row->txn_node)) {
2872         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2873                     uuid_hash(&row->uuid));
2874     }
2875     if (!row->prereqs) {
2876         row->prereqs = bitmap_allocate(class->n_columns);
2877     }
2878     bitmap_set1(row->prereqs, column_idx);
2879 }
2880
2881 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
2882  * accessed afterward.
2883  *
2884  * A transaction must be in progress.
2885  *
2886  * Usually this function is used indirectly through one of the "delete"
2887  * functions generated by ovsdb-idlc. */
2888 void
2889 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
2890 {
2891     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2892
2893     if (ovsdb_idl_row_is_synthetic(row)) {
2894         return;
2895     }
2896
2897     ovs_assert(row->new != NULL);
2898     if (!row->old) {
2899         ovsdb_idl_row_unparse(row);
2900         ovsdb_idl_row_clear_new(row);
2901         ovs_assert(!row->prereqs);
2902         hmap_remove(&row->table->rows, &row->hmap_node);
2903         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
2904         free(row);
2905         return;
2906     }
2907     if (hmap_node_is_null(&row->txn_node)) {
2908         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2909                     uuid_hash(&row->uuid));
2910     }
2911     ovsdb_idl_row_clear_new(row);
2912     row->new = NULL;
2913 }
2914
2915 /* Inserts and returns a new row in the table with the specified 'class' in the
2916  * database with open transaction 'txn'.
2917  *
2918  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
2919  * randomly generated; otherwise 'uuid' should specify a randomly generated
2920  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
2921  * 'txn' is committed, but the IDL will replace any uses of the provisional
2922  * UUID in the data to be to be committed by the UUID assigned by
2923  * ovsdb-server.
2924  *
2925  * Usually this function is used indirectly through one of the "insert"
2926  * functions generated by ovsdb-idlc. */
2927 const struct ovsdb_idl_row *
2928 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
2929                      const struct ovsdb_idl_table_class *class,
2930                      const struct uuid *uuid)
2931 {
2932     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
2933
2934     if (uuid) {
2935         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
2936         row->uuid = *uuid;
2937     } else {
2938         uuid_generate(&row->uuid);
2939     }
2940
2941     row->table = ovsdb_idl_table_from_class(txn->idl, class);
2942     row->new = xmalloc(class->n_columns * sizeof *row->new);
2943     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
2944     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
2945     return row;
2946 }
2947
2948 static void
2949 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
2950 {
2951     struct ovsdb_idl_txn *txn;
2952
2953     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
2954         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
2955     }
2956 }
2957
2958 static struct ovsdb_idl_txn *
2959 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
2960 {
2961     struct ovsdb_idl_txn *txn;
2962
2963     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
2964                              json_hash(id, 0), &idl->outstanding_txns) {
2965         if (json_equal(id, txn->request_id)) {
2966             return txn;
2967         }
2968     }
2969     return NULL;
2970 }
2971
2972 static bool
2973 check_json_type(const struct json *json, enum json_type type, const char *name)
2974 {
2975     if (!json) {
2976         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
2977         return false;
2978     } else if (json->type != type) {
2979         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
2980                      name, json_type_to_string(json->type),
2981                      json_type_to_string(type));
2982         return false;
2983     } else {
2984         return true;
2985     }
2986 }
2987
2988 static bool
2989 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
2990                                 const struct json_array *results)
2991 {
2992     struct json *count, *rows, *row, *column;
2993     struct shash *mutate, *select;
2994
2995     if (txn->inc_index + 2 > results->n) {
2996         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
2997                      "for increment (has %"PRIuSIZE", needs %u)",
2998                      results->n, txn->inc_index + 2);
2999         return false;
3000     }
3001
3002     /* We know that this is a JSON object because the loop in
3003      * ovsdb_idl_txn_process_reply() checked. */
3004     mutate = json_object(results->elems[txn->inc_index]);
3005     count = shash_find_data(mutate, "count");
3006     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
3007         return false;
3008     }
3009     if (count->u.integer != 1) {
3010         VLOG_WARN_RL(&syntax_rl,
3011                      "\"mutate\" reply \"count\" is %lld instead of 1",
3012                      count->u.integer);
3013         return false;
3014     }
3015
3016     select = json_object(results->elems[txn->inc_index + 1]);
3017     rows = shash_find_data(select, "rows");
3018     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
3019         return false;
3020     }
3021     if (rows->u.array.n != 1) {
3022         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
3023                      "instead of 1",
3024                      rows->u.array.n);
3025         return false;
3026     }
3027     row = rows->u.array.elems[0];
3028     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
3029         return false;
3030     }
3031     column = shash_find_data(json_object(row), txn->inc_column);
3032     if (!check_json_type(column, JSON_INTEGER,
3033                          "\"select\" reply inc column")) {
3034         return false;
3035     }
3036     txn->inc_new_value = column->u.integer;
3037     return true;
3038 }
3039
3040 static bool
3041 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
3042                                    const struct json_array *results)
3043 {
3044     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
3045     struct ovsdb_error *error;
3046     struct json *json_uuid;
3047     union ovsdb_atom uuid;
3048     struct shash *reply;
3049
3050     if (insert->op_index >= results->n) {
3051         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
3052                      "for insert (has %"PRIuSIZE", needs %u)",
3053                      results->n, insert->op_index);
3054         return false;
3055     }
3056
3057     /* We know that this is a JSON object because the loop in
3058      * ovsdb_idl_txn_process_reply() checked. */
3059     reply = json_object(results->elems[insert->op_index]);
3060     json_uuid = shash_find_data(reply, "uuid");
3061     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
3062         return false;
3063     }
3064
3065     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
3066     if (error) {
3067         char *s = ovsdb_error_to_string(error);
3068         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
3069                      "UUID: %s", s);
3070         free(s);
3071         ovsdb_error_destroy(error);
3072         return false;
3073     }
3074
3075     insert->real = uuid.uuid;
3076
3077     return true;
3078 }
3079
3080 static bool
3081 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
3082                             const struct jsonrpc_msg *msg)
3083 {
3084     struct ovsdb_idl_txn *txn;
3085     enum ovsdb_idl_txn_status status;
3086
3087     txn = ovsdb_idl_txn_find(idl, msg->id);
3088     if (!txn) {
3089         return false;
3090     }
3091
3092     if (msg->type == JSONRPC_ERROR) {
3093         status = TXN_ERROR;
3094     } else if (msg->result->type != JSON_ARRAY) {
3095         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
3096         status = TXN_ERROR;
3097     } else {
3098         struct json_array *ops = &msg->result->u.array;
3099         int hard_errors = 0;
3100         int soft_errors = 0;
3101         int lock_errors = 0;
3102         size_t i;
3103
3104         for (i = 0; i < ops->n; i++) {
3105             struct json *op = ops->elems[i];
3106
3107             if (op->type == JSON_NULL) {
3108                 /* This isn't an error in itself but indicates that some prior
3109                  * operation failed, so make sure that we know about it. */
3110                 soft_errors++;
3111             } else if (op->type == JSON_OBJECT) {
3112                 struct json *error;
3113
3114                 error = shash_find_data(json_object(op), "error");
3115                 if (error) {
3116                     if (error->type == JSON_STRING) {
3117                         if (!strcmp(error->u.string, "timed out")) {
3118                             soft_errors++;
3119                         } else if (!strcmp(error->u.string, "not owner")) {
3120                             lock_errors++;
3121                         } else if (strcmp(error->u.string, "aborted")) {
3122                             hard_errors++;
3123                             ovsdb_idl_txn_set_error_json(txn, op);
3124                         }
3125                     } else {
3126                         hard_errors++;
3127                         ovsdb_idl_txn_set_error_json(txn, op);
3128                         VLOG_WARN_RL(&syntax_rl,
3129                                      "\"error\" in reply is not JSON string");
3130                     }
3131                 }
3132             } else {
3133                 hard_errors++;
3134                 ovsdb_idl_txn_set_error_json(txn, op);
3135                 VLOG_WARN_RL(&syntax_rl,
3136                              "operation reply is not JSON null or object");
3137             }
3138         }
3139
3140         if (!soft_errors && !hard_errors && !lock_errors) {
3141             struct ovsdb_idl_txn_insert *insert;
3142
3143             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
3144                 hard_errors++;
3145             }
3146
3147             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
3148                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
3149                     hard_errors++;
3150                 }
3151             }
3152         }
3153
3154         status = (hard_errors ? TXN_ERROR
3155                   : lock_errors ? TXN_NOT_LOCKED
3156                   : soft_errors ? TXN_TRY_AGAIN
3157                   : TXN_SUCCESS);
3158     }
3159
3160     ovsdb_idl_txn_complete(txn, status);
3161     return true;
3162 }
3163
3164 /* Returns the transaction currently active for 'row''s IDL.  A transaction
3165  * must currently be active. */
3166 struct ovsdb_idl_txn *
3167 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
3168 {
3169     struct ovsdb_idl_txn *txn = row->table->idl->txn;
3170     ovs_assert(txn != NULL);
3171     return txn;
3172 }
3173
3174 /* Returns the IDL on which 'txn' acts. */
3175 struct ovsdb_idl *
3176 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
3177 {
3178     return txn->idl;
3179 }
3180
3181 /* Blocks until 'idl' successfully connects to the remote database and
3182  * retrieves its contents. */
3183 void
3184 ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
3185 {
3186     while (1) {
3187         ovsdb_idl_run(idl);
3188         if (ovsdb_idl_has_ever_connected(idl)) {
3189             return;
3190         }
3191         ovsdb_idl_wait(idl);
3192         poll_block();
3193     }
3194 }
3195 \f
3196 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
3197  * the database server and to avoid modifying the database when the lock cannot
3198  * be acquired (that is, when another client has the same lock).
3199  *
3200  * If 'lock_name' is NULL, drops the locking requirement and releases the
3201  * lock. */
3202 void
3203 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
3204 {
3205     ovs_assert(!idl->txn);
3206     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
3207
3208     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
3209         /* Release previous lock. */
3210         ovsdb_idl_send_unlock_request(idl);
3211         free(idl->lock_name);
3212         idl->lock_name = NULL;
3213         idl->is_lock_contended = false;
3214     }
3215
3216     if (lock_name && !idl->lock_name) {
3217         /* Acquire new lock. */
3218         idl->lock_name = xstrdup(lock_name);
3219         ovsdb_idl_send_lock_request(idl);
3220     }
3221 }
3222
3223 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
3224  *
3225  * Locking and unlocking happens asynchronously from the database client's
3226  * point of view, so the information is only useful for optimization (e.g. if
3227  * the client doesn't have the lock then there's no point in trying to write to
3228  * the database). */
3229 bool
3230 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
3231 {
3232     return idl->has_lock;
3233 }
3234
3235 /* Returns true if 'idl' is configured to obtain a lock but the database server
3236  * has indicated that some other client already owns the requested lock. */
3237 bool
3238 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
3239 {
3240     return idl->is_lock_contended;
3241 }
3242
3243 static void
3244 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
3245 {
3246     if (new_has_lock && !idl->has_lock) {
3247         if (idl->state == IDL_S_MONITORING ||
3248             idl->state == IDL_S_MONITORING2) {
3249             idl->change_seqno++;
3250         } else {
3251             /* We're setting up a session, so don't signal that the database
3252              * changed.  Finalizing the session will increment change_seqno
3253              * anyhow. */
3254         }
3255         idl->is_lock_contended = false;
3256     }
3257     idl->has_lock = new_has_lock;
3258 }
3259
3260 static void
3261 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
3262                               struct json **idp)
3263 {
3264     ovsdb_idl_update_has_lock(idl, false);
3265
3266     json_destroy(idl->lock_request_id);
3267     idl->lock_request_id = NULL;
3268
3269     if (jsonrpc_session_is_connected(idl->session)) {
3270         struct json *params;
3271
3272         params = json_array_create_1(json_string_create(idl->lock_name));
3273         jsonrpc_session_send(idl->session,
3274                              jsonrpc_create_request(method, params, idp));
3275     }
3276 }
3277
3278 static void
3279 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
3280 {
3281     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
3282 }
3283
3284 static void
3285 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
3286 {
3287     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
3288 }
3289
3290 static void
3291 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
3292 {
3293     bool got_lock;
3294
3295     json_destroy(idl->lock_request_id);
3296     idl->lock_request_id = NULL;
3297
3298     if (result->type == JSON_OBJECT) {
3299         const struct json *locked;
3300
3301         locked = shash_find_data(json_object(result), "locked");
3302         got_lock = locked && locked->type == JSON_TRUE;
3303     } else {
3304         got_lock = false;
3305     }
3306
3307     ovsdb_idl_update_has_lock(idl, got_lock);
3308     if (!got_lock) {
3309         idl->is_lock_contended = true;
3310     }
3311 }
3312
3313 static void
3314 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
3315                             const struct json *params,
3316                             bool new_has_lock)
3317 {
3318     if (idl->lock_name
3319         && params->type == JSON_ARRAY
3320         && json_array(params)->n > 0
3321         && json_array(params)->elems[0]->type == JSON_STRING) {
3322         const char *lock_name = json_string(json_array(params)->elems[0]);
3323
3324         if (!strcmp(idl->lock_name, lock_name)) {
3325             ovsdb_idl_update_has_lock(idl, new_has_lock);
3326             if (!new_has_lock) {
3327                 idl->is_lock_contended = true;
3328             }
3329         }
3330     }
3331 }
3332
3333 /* Inserts a new Map Operation into current transaction. */
3334 static void
3335 ovsdb_idl_txn_add_map_op(struct ovsdb_idl_row *row,
3336                          const struct ovsdb_idl_column *column,
3337                          struct ovsdb_datum *datum,
3338                          enum map_op_type op_type)
3339 {
3340     const struct ovsdb_idl_table_class *class;
3341     size_t column_idx;
3342     struct map_op *map_op;
3343
3344     class = row->table->class;
3345     column_idx = column - class->columns;
3346
3347     /* Check if a map operation list exists for this column. */
3348     if (!row->map_op_written) {
3349         row->map_op_written = bitmap_allocate(class->n_columns);
3350         row->map_op_lists = xzalloc(class->n_columns *
3351                                     sizeof *row->map_op_lists);
3352     }
3353     if (!row->map_op_lists[column_idx]) {
3354         row->map_op_lists[column_idx] = map_op_list_create();
3355     }
3356
3357     /* Add a map operation to the corresponding list. */
3358     map_op = map_op_create(datum, op_type);
3359     bitmap_set1(row->map_op_written, column_idx);
3360     map_op_list_add(row->map_op_lists[column_idx], map_op, &column->type);
3361
3362     /* Add this row to transaction's list of rows. */
3363     if (hmap_node_is_null(&row->txn_node)) {
3364         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
3365                     uuid_hash(&row->uuid));
3366     }
3367 }
3368
3369 static bool
3370 is_valid_partial_update(const struct ovsdb_idl_row *row,
3371                         const struct ovsdb_idl_column *column,
3372                         struct ovsdb_datum *datum)
3373 {
3374     /* Verify that this column is being monitored. */
3375     unsigned int column_idx = column - row->table->class->columns;
3376     if (!(row->table->modes[column_idx] & OVSDB_IDL_MONITOR)) {
3377         VLOG_WARN("cannot partially update non-monitored column");
3378         return false;
3379     }
3380
3381     /* Verify that the update affects a single element. */
3382     if (datum->n != 1) {
3383         VLOG_WARN("invalid datum for partial update");
3384         return false;
3385     }
3386
3387     return true;
3388 }
3389
3390 /* Inserts the key-value specified in 'datum' into the map in 'column' in
3391  * 'row_'. If the key already exist in 'column', then it's value is updated
3392  * with the value in 'datum'. The key-value in 'datum' must be of the same type
3393  * as the keys-values in 'column'. This function takes ownership of 'datum'.
3394  *
3395  * Usually this function is used indirectly through one of the "update"
3396  * functions generated by vswitch-idl. */
3397 void
3398 ovsdb_idl_txn_write_partial_map(const struct ovsdb_idl_row *row_,
3399                                 const struct ovsdb_idl_column *column,
3400                                 struct ovsdb_datum *datum)
3401 {
3402     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
3403     enum ovsdb_atomic_type key_type;
3404     enum map_op_type op_type;
3405     unsigned int pos;
3406     const struct ovsdb_datum *old_datum;
3407
3408     if (!is_valid_partial_update(row, column, datum)) {
3409         ovsdb_datum_destroy(datum, &column->type);
3410         return;
3411     }
3412
3413     /* Find out if this is an insert or an update. */
3414     key_type = column->type.key.type;
3415     old_datum = ovsdb_idl_read(row, column);
3416     pos = ovsdb_datum_find_key(old_datum, &datum->keys[0], key_type);
3417     op_type = pos == UINT_MAX ? MAP_OP_INSERT : MAP_OP_UPDATE;
3418
3419     ovsdb_idl_txn_add_map_op(row, column, datum, op_type);
3420 }
3421
3422 /* Deletes the key specified in 'datum' from the map in 'column' in 'row_'.
3423  * The key in 'datum' must be of the same type as the keys in 'column'.
3424  * The value in 'datum' must be NULL. This function takes ownership of
3425  * 'datum'.
3426  *
3427  * Usually this function is used indirectly through one of the "update"
3428  * functions generated by vswitch-idl. */
3429 void
3430 ovsdb_idl_txn_delete_partial_map(const struct ovsdb_idl_row *row_,
3431                                  const struct ovsdb_idl_column *column,
3432                                  struct ovsdb_datum *datum)
3433 {
3434     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
3435
3436     if (!is_valid_partial_update(row, column, datum)) {
3437         struct ovsdb_type type_ = column->type;
3438         type_.value.type = OVSDB_TYPE_VOID;
3439         ovsdb_datum_destroy(datum, &type_);
3440         return;
3441     }
3442     ovsdb_idl_txn_add_map_op(row, column, datum, MAP_OP_DELETE);
3443 }
3444
3445 void
3446 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
3447 {
3448     if (loop) {
3449         ovsdb_idl_destroy(loop->idl);
3450     }
3451 }
3452
3453 struct ovsdb_idl_txn *
3454 ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
3455 {
3456     ovsdb_idl_run(loop->idl);
3457     loop->open_txn = (loop->committing_txn
3458                       || ovsdb_idl_get_seqno(loop->idl) == loop->skip_seqno
3459                       ? NULL
3460                       : ovsdb_idl_txn_create(loop->idl));
3461     return loop->open_txn;
3462 }
3463
3464 void
3465 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
3466 {
3467     if (loop->open_txn) {
3468         loop->committing_txn = loop->open_txn;
3469         loop->open_txn = NULL;
3470
3471         loop->precommit_seqno = ovsdb_idl_get_seqno(loop->idl);
3472     }
3473
3474     struct ovsdb_idl_txn *txn = loop->committing_txn;
3475     if (txn) {
3476         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
3477         if (status != TXN_INCOMPLETE) {
3478             switch (status) {
3479             case TXN_TRY_AGAIN:
3480                 /* We want to re-evaluate the database when it's changed from
3481                  * the contents that it had when we started the commit.  (That
3482                  * might have already happened.) */
3483                 loop->skip_seqno = loop->precommit_seqno;
3484                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
3485                     poll_immediate_wake();
3486                 }
3487                 break;
3488
3489             case TXN_SUCCESS:
3490                 /* If the database has already changed since we started the
3491                  * commit, re-evaluate it immediately to avoid missing a change
3492                  * for a while. */
3493                 if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
3494                     poll_immediate_wake();
3495                 }
3496                 break;
3497
3498             case TXN_UNCHANGED:
3499             case TXN_ABORTED:
3500             case TXN_NOT_LOCKED:
3501             case TXN_ERROR:
3502                 break;
3503
3504             case TXN_UNCOMMITTED:
3505             case TXN_INCOMPLETE:
3506                 OVS_NOT_REACHED();
3507
3508             }
3509             ovsdb_idl_txn_destroy(txn);
3510             loop->committing_txn = NULL;
3511         }
3512     }
3513
3514     ovsdb_idl_wait(loop->idl);
3515 }