ovn: Make it possible for CMS to detect when the OVN system is up-to-date.
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "coverage.h"
27 #include "openvswitch/dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "openvswitch/json.h"
30 #include "jsonrpc.h"
31 #include "ovsdb/ovsdb.h"
32 #include "ovsdb/table.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-idl-provider.h"
36 #include "ovsdb-parser.h"
37 #include "poll-loop.h"
38 #include "openvswitch/shash.h"
39 #include "sset.h"
40 #include "util.h"
41 #include "openvswitch/vlog.h"
42
43 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
44
45 COVERAGE_DEFINE(txn_uncommitted);
46 COVERAGE_DEFINE(txn_unchanged);
47 COVERAGE_DEFINE(txn_incomplete);
48 COVERAGE_DEFINE(txn_aborted);
49 COVERAGE_DEFINE(txn_success);
50 COVERAGE_DEFINE(txn_try_again);
51 COVERAGE_DEFINE(txn_not_locked);
52 COVERAGE_DEFINE(txn_error);
53
54 /* An arc from one idl_row to another.  When row A contains a UUID that
55  * references row B, this is represented by an arc from A (the source) to B
56  * (the destination).
57  *
58  * Arcs from a row to itself are omitted, that is, src and dst are always
59  * different.
60  *
61  * Arcs are never duplicated, that is, even if there are multiple references
62  * from A to B, there is only a single arc from A to B.
63  *
64  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
65  * A.  Both an arc and its converse may both be present, if each row refers
66  * to the other circularly.
67  *
68  * The source and destination row may be in the same table or in different
69  * tables.
70  */
71 struct ovsdb_idl_arc {
72     struct ovs_list src_node;   /* In src->src_arcs list. */
73     struct ovs_list dst_node;   /* In dst->dst_arcs list. */
74     struct ovsdb_idl_row *src;  /* Source row. */
75     struct ovsdb_idl_row *dst;  /* Destination row. */
76 };
77
78 enum ovsdb_idl_state {
79     IDL_S_SCHEMA_REQUESTED,
80     IDL_S_MONITOR_REQUESTED,
81     IDL_S_MONITORING,
82     IDL_S_MONITOR_COND_REQUESTED,
83     IDL_S_MONITORING_COND,
84     IDL_S_NO_SCHEMA
85 };
86
87 struct ovsdb_idl {
88     const struct ovsdb_idl_class *class;
89     struct jsonrpc_session *session;
90     struct uuid uuid;
91     struct shash table_by_name;
92     struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
93     unsigned int change_seqno;
94     bool verify_write_only;
95
96     /* Session state. */
97     unsigned int state_seqno;
98     enum ovsdb_idl_state state;
99     struct json *request_id;
100     struct json *schema;
101
102     /* Database locking. */
103     char *lock_name;            /* Name of lock we need, NULL if none. */
104     bool has_lock;              /* Has db server told us we have the lock? */
105     bool is_lock_contended;     /* Has db server told us we can't get lock? */
106     struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
107
108     /* Transaction support. */
109     struct ovsdb_idl_txn *txn;
110     struct hmap outstanding_txns;
111     bool cond_changed;
112 };
113
114 struct ovsdb_idl_txn {
115     struct hmap_node hmap_node;
116     struct json *request_id;
117     struct ovsdb_idl *idl;
118     struct hmap txn_rows;
119     enum ovsdb_idl_txn_status status;
120     char *error;
121     bool dry_run;
122     struct ds comment;
123
124     /* Increments. */
125     const char *inc_table;
126     const char *inc_column;
127     struct uuid inc_row;
128     unsigned int inc_index;
129     int64_t inc_new_value;
130
131     /* Inserted rows. */
132     struct hmap inserted_rows;  /* Contains "struct ovsdb_idl_txn_insert"s. */
133 };
134
135 struct ovsdb_idl_txn_insert {
136     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
137     struct uuid dummy;          /* Dummy UUID used locally. */
138     int op_index;               /* Index into transaction's operation array. */
139     struct uuid real;           /* Real UUID used by database server. */
140 };
141
142 enum ovsdb_update_version {
143     OVSDB_UPDATE,               /* RFC 7047 "update" method. */
144     OVSDB_UPDATE2               /* "update2" Extension to RFC 7047.
145                                    See ovsdb-server(1) for more information. */
146 };
147
148 /* Name arrays indexed by 'enum ovsdb_update_version'. */
149 static const char *table_updates_names[] = {"table_updates", "table_updates2"};
150 static const char *table_update_names[] = {"table_update", "table_update2"};
151 static const char *row_update_names[] = {"row_update", "row_update2"};
152
153 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
154 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
155
156 static void ovsdb_idl_clear(struct ovsdb_idl *);
157 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
158 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
159 static void ovsdb_idl_send_monitor_cond_request(struct ovsdb_idl *);
160 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
161                                    enum ovsdb_update_version);
162 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
163                                                     const struct json *,
164                                                     enum ovsdb_update_version);
165 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
166                                      const struct uuid *,
167                                      const struct json *old,
168                                      const struct json *new);
169 static bool ovsdb_idl_process_update2(struct ovsdb_idl_table *,
170                                       const struct uuid *,
171                                       const char *operation,
172                                       const struct json *row);
173 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
174 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
175 static bool ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
176 static bool ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *,
177                                          const struct json *);
178
179 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
180 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
181     const struct ovsdb_idl_table_class *);
182 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
183                                                   const struct uuid *);
184 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
185 static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
186 static void ovsdb_idl_destroy_all_map_op_lists(struct ovsdb_idl_row *);
187
188 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
189 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
190 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
191 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
192 static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
193
194 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
195 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
196                                         const struct jsonrpc_msg *msg);
197 static bool ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *,
198                                             struct json *);
199 static void ovsdb_idl_txn_add_map_op(struct ovsdb_idl_row *,
200                                      const struct ovsdb_idl_column *,
201                                      struct ovsdb_datum *,
202                                      enum map_op_type);
203
204 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
205 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
206 static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
207                                        const struct json *);
208 static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
209                                         const struct json *params,
210                                         bool new_has_lock);
211 static struct ovsdb_idl_table *
212 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
213                            const struct ovsdb_idl_table_class *);
214 static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table);
215 static void ovsdb_idl_send_cond_change(struct ovsdb_idl *idl);
216 static void ovsdb_idl_condition_init(struct ovsdb_idl_condition *cnd,
217                                      const struct ovsdb_idl_table_class *tc);
218
219 /* Creates and returns a connection to database 'remote', which should be in a
220  * form acceptable to jsonrpc_session_open().  The connection will maintain an
221  * in-memory replica of the remote database whose schema is described by
222  * 'class'.  (Ordinarily 'class' is compiled from an OVSDB schema automatically
223  * by ovsdb-idlc.)
224  *
225  * Passes 'retry' to jsonrpc_session_open().  See that function for
226  * documentation.
227  *
228  * If 'monitor_everything_by_default' is true, then everything in the remote
229  * database will be replicated by default.  ovsdb_idl_omit() and
230  * ovsdb_idl_omit_alert() may be used to selectively drop some columns from
231  * monitoring.
232  *
233  * If 'monitor_everything_by_default' is false, then no columns or tables will
234  * be replicated by default.  ovsdb_idl_add_column() and ovsdb_idl_add_table()
235  * must be used to choose some columns or tables to replicate.
236  */
237 struct ovsdb_idl *
238 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
239                  bool monitor_everything_by_default, bool retry)
240 {
241     struct ovsdb_idl *idl;
242     uint8_t default_mode;
243     size_t i;
244
245     default_mode = (monitor_everything_by_default
246                     ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
247                     : 0);
248
249     idl = xzalloc(sizeof *idl);
250     idl->class = class;
251     idl->session = jsonrpc_session_open(remote, retry);
252     shash_init(&idl->table_by_name);
253     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
254     for (i = 0; i < class->n_tables; i++) {
255         const struct ovsdb_idl_table_class *tc = &class->tables[i];
256         struct ovsdb_idl_table *table = &idl->tables[i];
257         size_t j;
258
259         shash_add_assert(&idl->table_by_name, tc->name, table);
260         table->class = tc;
261         table->modes = xmalloc(tc->n_columns);
262         memset(table->modes, default_mode, tc->n_columns);
263         table->need_table = false;
264         shash_init(&table->columns);
265         for (j = 0; j < tc->n_columns; j++) {
266             const struct ovsdb_idl_column *column = &tc->columns[j];
267
268             shash_add_assert(&table->columns, column->name, column);
269         }
270         hmap_init(&table->rows);
271         ovs_list_init(&table->track_list);
272         table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
273             = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
274             = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
275         table->idl = idl;
276         ovsdb_idl_condition_init(&table->condition, tc);
277         table->cond_changed = false;
278     }
279
280     idl->cond_changed = false;
281     idl->state_seqno = UINT_MAX;
282     idl->request_id = NULL;
283     idl->schema = NULL;
284
285     hmap_init(&idl->outstanding_txns);
286     uuid_generate(&idl->uuid);
287
288     return idl;
289 }
290
291 /* Changes the remote and creates a new session. */
292 void
293 ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote,
294                      bool retry)
295 {
296     if (idl) {
297         ovs_assert(!idl->txn);
298         jsonrpc_session_close(idl->session);
299         idl->session = jsonrpc_session_open(remote, retry);
300         idl->state_seqno = UINT_MAX;
301     }
302 }
303
304 /* Destroys 'idl' and all of the data structures that it manages. */
305 void
306 ovsdb_idl_destroy(struct ovsdb_idl *idl)
307 {
308     if (idl) {
309         size_t i;
310
311         ovs_assert(!idl->txn);
312         ovsdb_idl_clear(idl);
313         jsonrpc_session_close(idl->session);
314
315         for (i = 0; i < idl->class->n_tables; i++) {
316             struct ovsdb_idl_table *table = &idl->tables[i];
317             shash_destroy(&table->columns);
318             hmap_destroy(&table->rows);
319             free(table->modes);
320         }
321         shash_destroy(&idl->table_by_name);
322         free(idl->tables);
323         json_destroy(idl->request_id);
324         free(idl->lock_name);
325         json_destroy(idl->lock_request_id);
326         json_destroy(idl->schema);
327         hmap_destroy(&idl->outstanding_txns);
328         free(idl);
329     }
330 }
331
332 static void
333 ovsdb_idl_clear(struct ovsdb_idl *idl)
334 {
335     bool changed = false;
336     size_t i;
337
338     for (i = 0; i < idl->class->n_tables; i++) {
339         struct ovsdb_idl_table *table = &idl->tables[i];
340         struct ovsdb_idl_row *row, *next_row;
341
342         if (hmap_is_empty(&table->rows)) {
343             continue;
344         }
345
346         changed = true;
347         HMAP_FOR_EACH_SAFE (row, next_row, hmap_node, &table->rows) {
348             struct ovsdb_idl_arc *arc, *next_arc;
349
350             if (!ovsdb_idl_row_is_orphan(row)) {
351                 ovsdb_idl_row_unparse(row);
352             }
353             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
354                 free(arc);
355             }
356             /* No need to do anything with dst_arcs: some node has those arcs
357              * as forward arcs and will destroy them itself. */
358
359             if (!ovs_list_is_empty(&row->track_node)) {
360                 ovs_list_remove(&row->track_node);
361             }
362
363             ovsdb_idl_row_destroy(row);
364         }
365     }
366
367     ovsdb_idl_track_clear(idl);
368
369     if (changed) {
370         idl->change_seqno++;
371     }
372 }
373
374 /* Processes a batch of messages from the database server on 'idl'.  This may
375  * cause the IDL's contents to change.  The client may check for that with
376  * ovsdb_idl_get_seqno(). */
377 void
378 ovsdb_idl_run(struct ovsdb_idl *idl)
379 {
380     int i;
381
382     ovs_assert(!idl->txn);
383
384     ovsdb_idl_send_cond_change(idl);
385
386     jsonrpc_session_run(idl->session);
387     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
388         struct jsonrpc_msg *msg;
389         unsigned int seqno;
390
391         seqno = jsonrpc_session_get_seqno(idl->session);
392         if (idl->state_seqno != seqno) {
393             idl->state_seqno = seqno;
394             json_destroy(idl->request_id);
395             idl->request_id = NULL;
396             ovsdb_idl_txn_abort_all(idl);
397
398             ovsdb_idl_send_schema_request(idl);
399             idl->state = IDL_S_SCHEMA_REQUESTED;
400             if (idl->lock_name) {
401                 ovsdb_idl_send_lock_request(idl);
402             }
403         }
404
405         msg = jsonrpc_session_recv(idl->session);
406         if (!msg) {
407             break;
408         }
409
410         if (msg->type == JSONRPC_NOTIFY
411             && !strcmp(msg->method, "update2")
412             && msg->params->type == JSON_ARRAY
413             && msg->params->u.array.n == 2
414             && msg->params->u.array.elems[0]->type == JSON_STRING) {
415             /* Database contents changed. */
416             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
417                                    OVSDB_UPDATE2);
418         } else if (msg->type == JSONRPC_REPLY
419                    && idl->request_id
420                    && json_equal(idl->request_id, msg->id)) {
421             json_destroy(idl->request_id);
422             idl->request_id = NULL;
423
424             switch (idl->state) {
425             case IDL_S_SCHEMA_REQUESTED:
426                 /* Reply to our "get_schema" request. */
427                 idl->schema = json_clone(msg->result);
428                 ovsdb_idl_send_monitor_cond_request(idl);
429                 idl->state = IDL_S_MONITOR_COND_REQUESTED;
430                 break;
431
432             case IDL_S_MONITOR_REQUESTED:
433             case IDL_S_MONITOR_COND_REQUESTED:
434                 /* Reply to our "monitor" or "monitor_cond" request. */
435                 idl->change_seqno++;
436                 ovsdb_idl_clear(idl);
437                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
438                     idl->state = IDL_S_MONITORING;
439                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
440                 } else { /* IDL_S_MONITOR_COND_REQUESTED. */
441                     idl->state = IDL_S_MONITORING_COND;
442                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
443                 }
444
445                 /* Schema is not useful after monitor request is accepted
446                  * by the server.  */
447                 json_destroy(idl->schema);
448                 idl->schema = NULL;
449                 break;
450
451             case IDL_S_MONITORING:
452             case IDL_S_MONITORING_COND:
453             case IDL_S_NO_SCHEMA:
454             default:
455                 OVS_NOT_REACHED();
456             }
457         } else if (msg->type == JSONRPC_NOTIFY
458                    && !strcmp(msg->method, "update")
459                    && msg->params->type == JSON_ARRAY
460                    && msg->params->u.array.n == 2
461                    && msg->params->u.array.elems[0]->type == JSON_STRING) {
462             /* Database contents changed. */
463             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
464                                    OVSDB_UPDATE);
465         } else if (msg->type == JSONRPC_REPLY
466                    && idl->lock_request_id
467                    && json_equal(idl->lock_request_id, msg->id)) {
468             /* Reply to our "lock" request. */
469             ovsdb_idl_parse_lock_reply(idl, msg->result);
470         } else if (msg->type == JSONRPC_NOTIFY
471                    && !strcmp(msg->method, "locked")) {
472             /* We got our lock. */
473             ovsdb_idl_parse_lock_notify(idl, msg->params, true);
474         } else if (msg->type == JSONRPC_NOTIFY
475                    && !strcmp(msg->method, "stolen")) {
476             /* Someone else stole our lock. */
477             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
478         } else if (msg->type == JSONRPC_ERROR
479                    && idl->state == IDL_S_MONITOR_COND_REQUESTED
480                    && idl->request_id
481                    && json_equal(idl->request_id, msg->id)) {
482             if (msg->error && !strcmp(json_string(msg->error),
483                                       "unknown method")) {
484                 /* Fall back to using "monitor" method.  */
485                 json_destroy(idl->request_id);
486                 idl->request_id = NULL;
487                 ovsdb_idl_send_monitor_request(idl);
488                 idl->state = IDL_S_MONITOR_REQUESTED;
489             }
490         } else if (msg->type == JSONRPC_ERROR
491                    && idl->state == IDL_S_SCHEMA_REQUESTED
492                    && idl->request_id
493                    && json_equal(idl->request_id, msg->id)) {
494                 json_destroy(idl->request_id);
495                 idl->request_id = NULL;
496                 VLOG_ERR("%s: requested schema not found",
497                          jsonrpc_session_get_name(idl->session));
498                 idl->state = IDL_S_NO_SCHEMA;
499         } else if ((msg->type == JSONRPC_ERROR
500                     || msg->type == JSONRPC_REPLY)
501                    && ovsdb_idl_txn_process_reply(idl, msg)) {
502             /* ovsdb_idl_txn_process_reply() did everything needful. */
503         } else {
504             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
505              * a transaction before we receive the reply, so keep the log level
506              * low. */
507             VLOG_DBG("%s: received unexpected %s message",
508                      jsonrpc_session_get_name(idl->session),
509                      jsonrpc_msg_type_to_string(msg->type));
510         }
511         jsonrpc_msg_destroy(msg);
512     }
513     ovsdb_idl_row_destroy_postprocess(idl);
514 }
515
516 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
517  * do or when activity occurs on a transaction on 'idl'. */
518 void
519 ovsdb_idl_wait(struct ovsdb_idl *idl)
520 {
521     jsonrpc_session_wait(idl->session);
522     jsonrpc_session_recv_wait(idl->session);
523 }
524
525 /* Returns a "sequence number" that represents the state of 'idl'.  When
526  * ovsdb_idl_run() changes the database, the sequence number changes.  The
527  * initial fetch of the entire contents of the remote database is considered to
528  * be one kind of change.  Successfully acquiring a lock, if one has been
529  * configured with ovsdb_idl_set_lock(), is also considered to be a change.
530  *
531  * As long as the sequence number does not change, the client may continue to
532  * use any data structures it obtains from 'idl'.  But when it changes, the
533  * client must not access any of these data structures again, because they
534  * could have freed or reused for other purposes.
535  *
536  * The sequence number can occasionally change even if the database does not.
537  * This happens if the connection to the database drops and reconnects, which
538  * causes the database contents to be reloaded even if they didn't change.  (It
539  * could also happen if the database server sends out a "change" that reflects
540  * what the IDL already thought was in the database.  The database server is
541  * not supposed to do that, but bugs could in theory cause it to do so.) */
542 unsigned int
543 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
544 {
545     return idl->change_seqno;
546 }
547
548 /* Returns true if 'idl' successfully connected to the remote database and
549  * retrieved its contents (even if the connection subsequently dropped and is
550  * in the process of reconnecting).  If so, then 'idl' contains an atomic
551  * snapshot of the database's contents (but it might be arbitrarily old if the
552  * connection dropped).
553  *
554  * Returns false if 'idl' has never connected or retrieved the database's
555  * contents.  If so, 'idl' is empty. */
556 bool
557 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
558 {
559     return ovsdb_idl_get_seqno(idl) != 0;
560 }
561
562 /* Reconfigures 'idl' so that it would reconnect to the database, if
563  * connection was dropped. */
564 void
565 ovsdb_idl_enable_reconnect(struct ovsdb_idl *idl)
566 {
567     jsonrpc_session_enable_reconnect(idl->session);
568 }
569
570 /* Forces 'idl' to drop its connection to the database and reconnect.  In the
571  * meantime, the contents of 'idl' will not change. */
572 void
573 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
574 {
575     jsonrpc_session_force_reconnect(idl->session);
576 }
577
578 /* Some IDL users should only write to write-only columns.  Furthermore,
579  * writing to a column which is not write-only can cause serious performance
580  * degradations for these users.  This function causes 'idl' to reject writes
581  * to columns which are not marked write only using ovsdb_idl_omit_alert(). */
582 void
583 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
584 {
585     idl->verify_write_only = true;
586 }
587
588 /* Returns true if 'idl' is currently connected or trying to connect
589  * and a negative response to a schema request has not been received */
590 bool
591 ovsdb_idl_is_alive(const struct ovsdb_idl *idl)
592 {
593     return jsonrpc_session_is_alive(idl->session) &&
594            idl->state != IDL_S_NO_SCHEMA;
595 }
596
597 /* Returns the last error reported on a connection by 'idl'.  The return value
598  * is 0 only if no connection made by 'idl' has ever encountered an error and
599  * a negative response to a schema request has never been received. See
600  * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
601  * interpretation. */
602 int
603 ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
604 {
605     int err;
606
607     err = jsonrpc_session_get_last_error(idl->session);
608
609     if (err) {
610         return err;
611     } else if (idl->state == IDL_S_NO_SCHEMA) {
612         return ENOENT;
613     } else {
614         return 0;
615     }
616 }
617
618 /* Sets the "probe interval" for 'idl->session' to 'probe_interval', in
619  * milliseconds.
620  */
621 void
622 ovsdb_idl_set_probe_interval(const struct ovsdb_idl *idl, int probe_interval)
623 {
624     jsonrpc_session_set_probe_interval(idl->session, probe_interval);
625 }
626 \f
627 static unsigned char *
628 ovsdb_idl_get_mode(struct ovsdb_idl *idl,
629                    const struct ovsdb_idl_column *column)
630 {
631     size_t i;
632
633     ovs_assert(!idl->change_seqno);
634
635     for (i = 0; i < idl->class->n_tables; i++) {
636         const struct ovsdb_idl_table *table = &idl->tables[i];
637         const struct ovsdb_idl_table_class *tc = table->class;
638
639         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
640             return &table->modes[column - tc->columns];
641         }
642     }
643
644     OVS_NOT_REACHED();
645 }
646
647 static void
648 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
649 {
650     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
651         struct ovsdb_idl_table *table;
652
653         table = shash_find_data(&idl->table_by_name,
654                                 base->u.uuid.refTableName);
655         if (table) {
656             table->need_table = true;
657         } else {
658             VLOG_WARN("%s IDL class missing referenced table %s",
659                       idl->class->database, base->u.uuid.refTableName);
660         }
661     }
662 }
663
664 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
665  * ensures that any tables referenced by 'column' will be replicated, even if
666  * no columns in that table are selected for replication (see
667  * ovsdb_idl_add_table() for more information).
668  *
669  * This function is only useful if 'monitor_everything_by_default' was false in
670  * the call to ovsdb_idl_create().  This function should be called between
671  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
672  */
673 void
674 ovsdb_idl_add_column(struct ovsdb_idl *idl,
675                      const struct ovsdb_idl_column *column)
676 {
677     *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
678     add_ref_table(idl, &column->type.key);
679     add_ref_table(idl, &column->type.value);
680 }
681
682 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
683  * no columns are selected for replication. Just the necessary data for table
684  * references will be replicated (the UUID of the rows, for instance), any
685  * columns not selected for replication will remain unreplicated.
686  * This can be useful because it allows 'idl' to keep track of what rows in the
687  * table actually exist, which in turn allows columns that reference the table
688  * to have accurate contents. (The IDL presents the database with references to
689  * rows that do not exist removed.)
690  *
691  * This function is only useful if 'monitor_everything_by_default' was false in
692  * the call to ovsdb_idl_create().  This function should be called between
693  * ovsdb_idl_create() and the first call to ovsdb_idl_run().
694  */
695 void
696 ovsdb_idl_add_table(struct ovsdb_idl *idl,
697                     const struct ovsdb_idl_table_class *tc)
698 {
699     size_t i;
700
701     for (i = 0; i < idl->class->n_tables; i++) {
702         struct ovsdb_idl_table *table = &idl->tables[i];
703
704         if (table->class == tc) {
705             table->need_table = true;
706             return;
707         }
708     }
709
710     OVS_NOT_REACHED();
711 }
712
713 static struct json *
714 ovsdb_idl_clause_to_json(const struct ovsdb_idl_clause *clause)
715 {
716     if (clause->function != OVSDB_F_TRUE &&
717         clause->function != OVSDB_F_FALSE) {
718         const char *function = ovsdb_function_to_string(clause->function);
719
720         return json_array_create_3(json_string_create(clause->column->name),
721                                    json_string_create(function),
722                                    ovsdb_datum_to_json(&clause->arg,
723                                                        &clause->column->type));
724     }
725
726     return json_boolean_create(clause->function == OVSDB_F_TRUE ?
727                                true : false);
728 }
729
730 static void
731 ovsdb_idl_clause_free(struct ovsdb_idl_clause *clause)
732 {
733     if (clause->function != OVSDB_F_TRUE &&
734         clause->function != OVSDB_F_FALSE) {
735         ovsdb_datum_destroy(&clause->arg, &clause->column->type);
736     }
737
738     ovs_list_remove(&clause->node);
739     free(clause);
740 }
741
742 void
743 ovsdb_idl_condition_reset(struct ovsdb_idl *idl,
744                           const struct ovsdb_idl_table_class *tc)
745 {
746     struct ovsdb_idl_clause *c, *next;
747     struct ovsdb_idl_table *table = ovsdb_idl_table_from_class(idl, tc);
748
749     LIST_FOR_EACH_SAFE (c, next, node, &table->condition.clauses) {
750         ovs_list_remove(&c->node);
751         ovsdb_idl_clause_free(c);
752     }
753     idl->cond_changed = table->cond_changed = true;
754 }
755
756 static void
757 ovsdb_idl_condition_init(struct ovsdb_idl_condition *cnd,
758                          const struct ovsdb_idl_table_class *tc)
759 {
760     cnd->tc = tc;
761     ovs_list_init(&cnd->clauses);
762 }
763
764 void ovsdb_idl_condition_add_clause(struct ovsdb_idl *idl,
765                                     const struct ovsdb_idl_table_class *tc,
766                                     enum ovsdb_function function,
767                                     const struct ovsdb_idl_column *column,
768                                     struct ovsdb_datum *arg)
769 {
770     struct ovsdb_idl_table *table = ovsdb_idl_table_from_class(idl, tc);
771     struct ovsdb_idl_clause *clause = xzalloc(sizeof *clause);
772     const struct ovsdb_type *type = NULL;
773     struct ovsdb_idl_clause *c;
774
775     LIST_FOR_EACH(c, node, &table->condition.clauses) {
776         if (c->function == function &&
777             (!column || (c->column == column &&
778                          ovsdb_datum_equals(&c->arg,
779                                              arg, &column->type)))) {
780             return;
781         }
782     }
783
784     ovs_list_init(&clause->node);
785     clause->function = function;
786     clause->column = column;
787     if (column) {
788         type = &column->type;
789     } else {
790         type = &ovsdb_type_boolean;
791     }
792     ovsdb_datum_clone(&clause->arg, arg, type);
793     ovs_list_push_back(&table->condition.clauses, &clause->node);
794     idl->cond_changed = table->cond_changed = true;
795     poll_immediate_wake();
796 }
797
798 void ovsdb_idl_condition_remove_clause(struct ovsdb_idl *idl,
799                                        const struct ovsdb_idl_table_class *tc,
800                                        enum ovsdb_function function,
801                                        const struct ovsdb_idl_column *column,
802                                        struct ovsdb_datum *arg)
803 {
804     struct ovsdb_idl_clause *c, *next;
805     struct ovsdb_idl_table *table = ovsdb_idl_table_from_class(idl, tc);
806
807     LIST_FOR_EACH_SAFE(c, next, node, &table->condition.clauses) {
808         if (c->function == function &&
809             (!column || (c->column == column &&
810                          ovsdb_datum_equals(&c->arg,
811                                              arg, &column->type)))) {
812             ovsdb_idl_clause_free(c);
813             idl->cond_changed = table->cond_changed = true;
814             return;
815         }
816     }
817 }
818
819 static struct json *
820 ovsdb_idl_condition_to_json(const struct ovsdb_idl_condition *cnd)
821 {
822     struct json **clauses;
823     size_t i = 0, n_clauses = ovs_list_size(&cnd->clauses);
824     struct ovsdb_idl_clause *c;
825
826     clauses = xmalloc(n_clauses * sizeof *clauses);
827     LIST_FOR_EACH (c, node, &cnd->clauses) {
828            clauses[i++] = ovsdb_idl_clause_to_json(c);
829     }
830
831     return json_array_create(clauses, n_clauses);
832 }
833
834 static struct json*
835 ovsdb_idl_create_cond_change_req(struct ovsdb_idl_table *table)
836 {
837     const struct ovsdb_idl_condition *cond = &table->condition;
838     struct json *monitor_cond_change_request = json_object_create();
839     struct json *cond_json = ovsdb_idl_condition_to_json(cond);
840
841     json_object_put(monitor_cond_change_request, "where", cond_json);
842
843     return monitor_cond_change_request;
844 }
845
846 static void
847 ovsdb_idl_send_cond_change(struct ovsdb_idl *idl)
848 {
849     int i;
850     char uuid[UUID_LEN + 1];
851     struct json *params, *json_uuid;
852     struct jsonrpc_msg *request;
853
854     if (!idl->cond_changed || !jsonrpc_session_is_connected(idl->session) ||
855         idl->state != IDL_S_MONITORING_COND) {
856         return;
857     }
858
859     struct json *monitor_cond_change_requests = NULL;
860
861     for (i = 0; i < idl->class->n_tables; i++) {
862         struct ovsdb_idl_table *table = &idl->tables[i];
863
864         if (table->cond_changed) {
865             struct json *req = ovsdb_idl_create_cond_change_req(table);
866             if (req) {
867                 if (!monitor_cond_change_requests) {
868                     monitor_cond_change_requests = json_object_create();
869                 }
870                 json_object_put(monitor_cond_change_requests,
871                              table->class->name,
872                              json_array_create_1(req));
873             }
874             table->cond_changed = false;
875         }
876     }
877
878     /* Send request if not empty. */
879     if (monitor_cond_change_requests) {
880         snprintf(uuid, sizeof uuid, UUID_FMT,
881                  UUID_ARGS(&idl->uuid));
882         json_uuid = json_string_create(uuid);
883
884         /* Create a new uuid */
885         uuid_generate(&idl->uuid);
886         snprintf(uuid, sizeof uuid, UUID_FMT,
887                  UUID_ARGS(&idl->uuid));
888         params = json_array_create_3(json_uuid, json_string_create(uuid),
889                                      monitor_cond_change_requests);
890
891         request = jsonrpc_create_request("monitor_cond_change", params, NULL);
892         jsonrpc_session_send(idl->session, request);
893     }
894     idl->cond_changed = false;
895 }
896
897 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
898  *
899  * This function should be called between ovsdb_idl_create() and the first call
900  * to ovsdb_idl_run().
901  */
902 void
903 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
904                      const struct ovsdb_idl_column *column)
905 {
906     *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
907 }
908
909 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
910  * OVSDB_IDL_MONITOR for details.
911  *
912  * This function should be called between ovsdb_idl_create() and the first call
913  * to ovsdb_idl_run().
914  */
915 void
916 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
917 {
918     *ovsdb_idl_get_mode(idl, column) = 0;
919 }
920
921 /* Returns the most recent IDL change sequence number that caused a
922  * insert, modify or delete update to the table with class 'table_class'.
923  */
924 unsigned int
925 ovsdb_idl_table_get_seqno(const struct ovsdb_idl *idl,
926                           const struct ovsdb_idl_table_class *table_class)
927 {
928     struct ovsdb_idl_table *table
929         = ovsdb_idl_table_from_class(idl, table_class);
930     unsigned int max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_INSERT];
931
932     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]) {
933         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY];
934     }
935     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_DELETE]) {
936         max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_DELETE];
937     }
938     return max_seqno;
939 }
940
941 /* For each row that contains tracked columns, IDL stores the most
942  * recent IDL change sequence numbers associateed with insert, modify
943  * and delete updates to the table.
944  */
945 unsigned int
946 ovsdb_idl_row_get_seqno(const struct ovsdb_idl_row *row,
947                         enum ovsdb_idl_change change)
948 {
949     return row->change_seqno[change];
950 }
951
952 /* Turns on OVSDB_IDL_TRACK for 'column' in 'idl', ensuring that
953  * all rows whose 'column' is modified are traced. Similarly, insert
954  * or delete of rows having 'column' are tracked. Clients are able
955  * to retrive the tracked rows with the ovsdb_idl_track_get_*()
956  * functions.
957  *
958  * This function should be called between ovsdb_idl_create() and
959  * the first call to ovsdb_idl_run(). The column to be tracked
960  * should have OVSDB_IDL_ALERT turned on.
961  */
962 void
963 ovsdb_idl_track_add_column(struct ovsdb_idl *idl,
964                            const struct ovsdb_idl_column *column)
965 {
966     if (!(*ovsdb_idl_get_mode(idl, column) & OVSDB_IDL_ALERT)) {
967         ovsdb_idl_add_column(idl, column);
968     }
969     *ovsdb_idl_get_mode(idl, column) |= OVSDB_IDL_TRACK;
970 }
971
972 void
973 ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
974 {
975     size_t i, j;
976
977     for (i = 0; i < idl->class->n_tables; i++) {
978         const struct ovsdb_idl_table_class *tc = &idl->class->tables[i];
979
980         for (j = 0; j < tc->n_columns; j++) {
981             const struct ovsdb_idl_column *column = &tc->columns[j];
982             ovsdb_idl_track_add_column(idl, column);
983         }
984     }
985 }
986
987 /* Returns true if 'table' has any tracked column. */
988 static bool
989 ovsdb_idl_track_is_set(struct ovsdb_idl_table *table)
990 {
991     size_t i;
992
993     for (i = 0; i < table->class->n_columns; i++) {
994         if (table->modes[i] & OVSDB_IDL_TRACK) {
995             return true;
996         }
997     }
998    return false;
999 }
1000
1001 /* Returns the first tracked row in table with class 'table_class'
1002  * for the specified 'idl'. Returns NULL if there are no tracked rows */
1003 const struct ovsdb_idl_row *
1004 ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
1005                           const struct ovsdb_idl_table_class *table_class)
1006 {
1007     struct ovsdb_idl_table *table
1008         = ovsdb_idl_table_from_class(idl, table_class);
1009
1010     if (!ovs_list_is_empty(&table->track_list)) {
1011         return CONTAINER_OF(ovs_list_front(&table->track_list), struct ovsdb_idl_row, track_node);
1012     }
1013     return NULL;
1014 }
1015
1016 /* Returns the next tracked row in table after the specified 'row'
1017  * (in no particular order). Returns NULL if there are no tracked rows */
1018 const struct ovsdb_idl_row *
1019 ovsdb_idl_track_get_next(const struct ovsdb_idl_row *row)
1020 {
1021     if (row->track_node.next != &row->table->track_list) {
1022         return CONTAINER_OF(row->track_node.next, struct ovsdb_idl_row, track_node);
1023     }
1024
1025     return NULL;
1026 }
1027
1028 /* Returns true if a tracked 'column' in 'row' was updated by IDL, false
1029  * otherwise. The tracking data is cleared by ovsdb_idl_track_clear()
1030  *
1031  * Function returns false if 'column' is not tracked (see
1032  * ovsdb_idl_track_add_column()).
1033  */
1034 bool
1035 ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
1036                            const struct ovsdb_idl_column *column)
1037 {
1038     const struct ovsdb_idl_table_class *class;
1039     size_t column_idx;
1040
1041     class = row->table->class;
1042     column_idx = column - class->columns;
1043
1044     if (row->updated && bitmap_is_set(row->updated, column_idx)) {
1045         return true;
1046     } else {
1047         return false;
1048     }
1049 }
1050
1051 /* Flushes the tracked rows. Client calls this function after calling
1052  * ovsdb_idl_run() and read all tracked rows with the ovsdb_idl_track_get_*()
1053  * functions. This is usually done at the end of the client's processing
1054  * loop when it is ready to do ovsdb_idl_run() again.
1055  */
1056 void
1057 ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
1058 {
1059     size_t i;
1060
1061     for (i = 0; i < idl->class->n_tables; i++) {
1062         struct ovsdb_idl_table *table = &idl->tables[i];
1063
1064         if (!ovs_list_is_empty(&table->track_list)) {
1065             struct ovsdb_idl_row *row, *next;
1066
1067             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
1068                 if (row->updated) {
1069                     free(row->updated);
1070                     row->updated = NULL;
1071                 }
1072                 ovs_list_remove(&row->track_node);
1073                 ovs_list_init(&row->track_node);
1074                 if (ovsdb_idl_row_is_orphan(row)) {
1075                     ovsdb_idl_row_clear_old(row);
1076                     free(row);
1077                 }
1078             }
1079         }
1080     }
1081 }
1082
1083 \f
1084 static void
1085 ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
1086 {
1087     struct jsonrpc_msg *msg;
1088
1089     json_destroy(idl->request_id);
1090     msg = jsonrpc_create_request(
1091         "get_schema",
1092         json_array_create_1(json_string_create(idl->class->database)),
1093         &idl->request_id);
1094     jsonrpc_session_send(idl->session, msg);
1095 }
1096
1097 static void
1098 log_error(struct ovsdb_error *error)
1099 {
1100     char *s = ovsdb_error_to_string(error);
1101     VLOG_WARN("error parsing database schema: %s", s);
1102     free(s);
1103     ovsdb_error_destroy(error);
1104 }
1105
1106 /* Frees 'schema', which is in the format returned by parse_schema(). */
1107 static void
1108 free_schema(struct shash *schema)
1109 {
1110     if (schema) {
1111         struct shash_node *node, *next;
1112
1113         SHASH_FOR_EACH_SAFE (node, next, schema) {
1114             struct sset *sset = node->data;
1115             sset_destroy(sset);
1116             free(sset);
1117             shash_delete(schema, node);
1118         }
1119         shash_destroy(schema);
1120         free(schema);
1121     }
1122 }
1123
1124 /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
1125  * 7047, to obtain the names of its rows and columns.  If successful, returns
1126  * an shash whose keys are table names and whose values are ssets, where each
1127  * sset contains the names of its table's columns.  On failure (due to a parse
1128  * error), returns NULL.
1129  *
1130  * It would also be possible to use the general-purpose OVSDB schema parser in
1131  * ovsdb-server, but that's overkill, possibly too strict for the current use
1132  * case, and would require restructuring ovsdb-server to separate the schema
1133  * code from the rest. */
1134 static struct shash *
1135 parse_schema(const struct json *schema_json)
1136 {
1137     struct ovsdb_parser parser;
1138     const struct json *tables_json;
1139     struct ovsdb_error *error;
1140     struct shash_node *node;
1141     struct shash *schema;
1142
1143     ovsdb_parser_init(&parser, schema_json, "database schema");
1144     tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
1145     error = ovsdb_parser_destroy(&parser);
1146     if (error) {
1147         log_error(error);
1148         return NULL;
1149     }
1150
1151     schema = xmalloc(sizeof *schema);
1152     shash_init(schema);
1153     SHASH_FOR_EACH (node, json_object(tables_json)) {
1154         const char *table_name = node->name;
1155         const struct json *json = node->data;
1156         const struct json *columns_json;
1157
1158         ovsdb_parser_init(&parser, json, "table schema for table %s",
1159                           table_name);
1160         columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
1161         error = ovsdb_parser_destroy(&parser);
1162         if (error) {
1163             log_error(error);
1164             free_schema(schema);
1165             return NULL;
1166         }
1167
1168         struct sset *columns = xmalloc(sizeof *columns);
1169         sset_init(columns);
1170
1171         struct shash_node *node2;
1172         SHASH_FOR_EACH (node2, json_object(columns_json)) {
1173             const char *column_name = node2->name;
1174             sset_add(columns, column_name);
1175         }
1176         shash_add(schema, table_name, columns);
1177     }
1178     return schema;
1179 }
1180
1181 static void
1182 ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
1183                                  const char *method)
1184 {
1185     struct shash *schema;
1186     struct json *monitor_requests;
1187     struct jsonrpc_msg *msg;
1188     char uuid[UUID_LEN + 1];
1189     size_t i;
1190
1191     schema = parse_schema(idl->schema);
1192     monitor_requests = json_object_create();
1193     for (i = 0; i < idl->class->n_tables; i++) {
1194         struct ovsdb_idl_table *table = &idl->tables[i];
1195         const struct ovsdb_idl_table_class *tc = table->class;
1196         struct json *monitor_request, *columns, *where;
1197         const struct sset *table_schema;
1198         size_t j;
1199
1200         table_schema = (schema
1201                         ? shash_find_data(schema, table->class->name)
1202                         : NULL);
1203
1204         columns = table->need_table ? json_array_create_empty() : NULL;
1205         for (j = 0; j < tc->n_columns; j++) {
1206             const struct ovsdb_idl_column *column = &tc->columns[j];
1207             if (table->modes[j] & OVSDB_IDL_MONITOR) {
1208                 if (table_schema
1209                     && !sset_contains(table_schema, column->name)) {
1210                     VLOG_WARN("%s table in %s database lacks %s column "
1211                               "(database needs upgrade?)",
1212                               table->class->name, idl->class->database,
1213                               column->name);
1214                     continue;
1215                 }
1216                 if (!columns) {
1217                     columns = json_array_create_empty();
1218                 }
1219                 json_array_add(columns, json_string_create(column->name));
1220             }
1221         }
1222
1223         if (columns) {
1224             if (schema && !table_schema) {
1225                 VLOG_WARN("%s database lacks %s table "
1226                           "(database needs upgrade?)",
1227                           idl->class->database, table->class->name);
1228                 json_destroy(columns);
1229                 continue;
1230             }
1231
1232             monitor_request = json_object_create();
1233             json_object_put(monitor_request, "columns", columns);
1234             if (!strcmp(method, "monitor_cond") && table->cond_changed &&
1235                 ovs_list_size(&table->condition.clauses) > 0) {
1236                 where = ovsdb_idl_condition_to_json(&table->condition);
1237                 json_object_put(monitor_request, "where", where);
1238                 table->cond_changed = false;
1239             }
1240             json_object_put(monitor_requests, tc->name, monitor_request);
1241         }
1242     }
1243     free_schema(schema);
1244
1245     json_destroy(idl->request_id);
1246
1247     snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&idl->uuid));
1248     msg = jsonrpc_create_request(
1249         method,
1250         json_array_create_3(json_string_create(idl->class->database),
1251                             json_string_create(uuid), monitor_requests),
1252         &idl->request_id);
1253     jsonrpc_session_send(idl->session, msg);
1254     idl->cond_changed = false;
1255 }
1256
1257 static void
1258 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
1259 {
1260     ovsdb_idl_send_monitor_request__(idl, "monitor");
1261 }
1262
1263 static void
1264 log_parse_update_error(struct ovsdb_error *error)
1265 {
1266         if (!VLOG_DROP_WARN(&syntax_rl)) {
1267             char *s = ovsdb_error_to_string(error);
1268             VLOG_WARN_RL(&syntax_rl, "%s", s);
1269             free(s);
1270         }
1271         ovsdb_error_destroy(error);
1272 }
1273
1274 static void
1275 ovsdb_idl_send_monitor_cond_request(struct ovsdb_idl *idl)
1276 {
1277     ovsdb_idl_send_monitor_request__(idl, "monitor_cond");
1278 }
1279
1280 static void
1281 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates,
1282                        enum ovsdb_update_version version)
1283 {
1284     struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates,
1285                                                          version);
1286     if (error) {
1287         log_parse_update_error(error);
1288     }
1289 }
1290
1291 static struct ovsdb_error *
1292 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
1293                          const struct json *table_updates,
1294                          enum ovsdb_update_version version)
1295 {
1296     const struct shash_node *tables_node;
1297     const char *table_updates_name = table_updates_names[version];
1298     const char *table_update_name = table_update_names[version];
1299     const char *row_update_name = row_update_names[version];
1300
1301     if (table_updates->type != JSON_OBJECT) {
1302         return ovsdb_syntax_error(table_updates, NULL,
1303                                   "<%s> is not an object",
1304                                   table_updates_name);
1305     }
1306
1307     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
1308         const struct json *table_update = tables_node->data;
1309         const struct shash_node *table_node;
1310         struct ovsdb_idl_table *table;
1311
1312         table = shash_find_data(&idl->table_by_name, tables_node->name);
1313         if (!table) {
1314             return ovsdb_syntax_error(
1315                 table_updates, NULL,
1316                 "<%s> includes unknown table \"%s\"",
1317                 table_updates_name,
1318                 tables_node->name);
1319         }
1320
1321         if (table_update->type != JSON_OBJECT) {
1322             return ovsdb_syntax_error(table_update, NULL,
1323                                       "<%s> for table \"%s\" is "
1324                                       "not an object",
1325                                       table_update_name,
1326                                       table->class->name);
1327         }
1328         SHASH_FOR_EACH (table_node, json_object(table_update)) {
1329             const struct json *row_update = table_node->data;
1330             const struct json *old_json, *new_json;
1331             struct uuid uuid;
1332
1333             if (!uuid_from_string(&uuid, table_node->name)) {
1334                 return ovsdb_syntax_error(table_update, NULL,
1335                                           "<%s> for table \"%s\" "
1336                                           "contains bad UUID "
1337                                           "\"%s\" as member name",
1338                                           table_update_name,
1339                                           table->class->name,
1340                                           table_node->name);
1341             }
1342             if (row_update->type != JSON_OBJECT) {
1343                 return ovsdb_syntax_error(row_update, NULL,
1344                                           "<%s> for table \"%s\" "
1345                                           "contains <%s> for %s that "
1346                                           "is not an object",
1347                                           table_update_name,
1348                                           table->class->name,
1349                                           row_update_name,
1350                                           table_node->name);
1351             }
1352
1353             switch(version) {
1354             case OVSDB_UPDATE:
1355                 old_json = shash_find_data(json_object(row_update), "old");
1356                 new_json = shash_find_data(json_object(row_update), "new");
1357                 if (old_json && old_json->type != JSON_OBJECT) {
1358                     return ovsdb_syntax_error(old_json, NULL,
1359                                               "\"old\" <row> is not object");
1360                 } else if (new_json && new_json->type != JSON_OBJECT) {
1361                     return ovsdb_syntax_error(new_json, NULL,
1362                                               "\"new\" <row> is not object");
1363                 } else if ((old_json != NULL) + (new_json != NULL)
1364                            != shash_count(json_object(row_update))) {
1365                     return ovsdb_syntax_error(row_update, NULL,
1366                                               "<row-update> contains "
1367                                               "unexpected member");
1368                 } else if (!old_json && !new_json) {
1369                     return ovsdb_syntax_error(row_update, NULL,
1370                                               "<row-update> missing \"old\" "
1371                                               "and \"new\" members");
1372                 }
1373
1374                 if (ovsdb_idl_process_update(table, &uuid, old_json,
1375                                              new_json)) {
1376                     idl->change_seqno++;
1377                 }
1378                 break;
1379
1380             case OVSDB_UPDATE2: {
1381                 const char *ops[] = {"modify", "insert", "delete", "initial"};
1382                 const char *operation;
1383                 const struct json *row;
1384                 int i;
1385
1386                 for (i = 0; i < ARRAY_SIZE(ops); i++) {
1387                     operation = ops[i];
1388                     row = shash_find_data(json_object(row_update), operation);
1389
1390                     if (row)  {
1391                         if (ovsdb_idl_process_update2(table, &uuid, operation,
1392                                                       row)) {
1393                             idl->change_seqno++;
1394                         }
1395                         break;
1396                     }
1397                 }
1398
1399                 /* row_update2 should contain one of the objects */
1400                 if (i == ARRAY_SIZE(ops)) {
1401                     return ovsdb_syntax_error(row_update, NULL,
1402                                               "<row_update2> includes unknown "
1403                                               "object");
1404                 }
1405                 break;
1406             }
1407
1408             default:
1409                 OVS_NOT_REACHED();
1410             }
1411         }
1412     }
1413
1414     return NULL;
1415 }
1416
1417 static struct ovsdb_idl_row *
1418 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
1419 {
1420     struct ovsdb_idl_row *row;
1421
1422     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &table->rows) {
1423         if (uuid_equals(&row->uuid, uuid)) {
1424             return row;
1425         }
1426     }
1427     return NULL;
1428 }
1429
1430 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1431  * otherwise. */
1432 static bool
1433 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
1434                          const struct uuid *uuid, const struct json *old,
1435                          const struct json *new)
1436 {
1437     struct ovsdb_idl_row *row;
1438
1439     row = ovsdb_idl_get_row(table, uuid);
1440     if (!new) {
1441         /* Delete row. */
1442         if (row && !ovsdb_idl_row_is_orphan(row)) {
1443             /* XXX perhaps we should check the 'old' values? */
1444             ovsdb_idl_delete_row(row);
1445         } else {
1446             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1447                          "from table %s",
1448                          UUID_ARGS(uuid), table->class->name);
1449             return false;
1450         }
1451     } else if (!old) {
1452         /* Insert row. */
1453         if (!row) {
1454             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1455         } else if (ovsdb_idl_row_is_orphan(row)) {
1456             ovsdb_idl_insert_row(row, new);
1457         } else {
1458             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1459                          "table %s", UUID_ARGS(uuid), table->class->name);
1460             return ovsdb_idl_modify_row(row, new);
1461         }
1462     } else {
1463         /* Modify row. */
1464         if (row) {
1465             /* XXX perhaps we should check the 'old' values? */
1466             if (!ovsdb_idl_row_is_orphan(row)) {
1467                 return ovsdb_idl_modify_row(row, new);
1468             } else {
1469                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1470                              "referenced row "UUID_FMT" in table %s",
1471                              UUID_ARGS(uuid), table->class->name);
1472                 ovsdb_idl_insert_row(row, new);
1473             }
1474         } else {
1475             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1476                          "in table %s", UUID_ARGS(uuid), table->class->name);
1477             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
1478         }
1479     }
1480
1481     return true;
1482 }
1483
1484 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1485  * otherwise. */
1486 static bool
1487 ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
1488                           const struct uuid *uuid,
1489                           const char *operation,
1490                           const struct json *json_row)
1491 {
1492     struct ovsdb_idl_row *row;
1493
1494     row = ovsdb_idl_get_row(table, uuid);
1495     if (!strcmp(operation, "delete")) {
1496         /* Delete row. */
1497         if (row && !ovsdb_idl_row_is_orphan(row)) {
1498             ovsdb_idl_delete_row(row);
1499         } else {
1500             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
1501                          "from table %s",
1502                          UUID_ARGS(uuid), table->class->name);
1503             return false;
1504         }
1505     } else if (!strcmp(operation, "insert") || !strcmp(operation, "initial")) {
1506         /* Insert row. */
1507         if (!row) {
1508             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), json_row);
1509         } else if (ovsdb_idl_row_is_orphan(row)) {
1510             ovsdb_idl_insert_row(row, json_row);
1511         } else {
1512             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
1513                          "table %s", UUID_ARGS(uuid), table->class->name);
1514             ovsdb_idl_delete_row(row);
1515             ovsdb_idl_insert_row(row, json_row);
1516         }
1517     } else if (!strcmp(operation, "modify")) {
1518         /* Modify row. */
1519         if (row) {
1520             if (!ovsdb_idl_row_is_orphan(row)) {
1521                 return ovsdb_idl_modify_row_by_diff(row, json_row);
1522             } else {
1523                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
1524                              "referenced row "UUID_FMT" in table %s",
1525                              UUID_ARGS(uuid), table->class->name);
1526                 return false;
1527             }
1528         } else {
1529             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
1530                          "in table %s", UUID_ARGS(uuid), table->class->name);
1531             return false;
1532         }
1533     } else {
1534             VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
1535                          "table %s", operation, table->class->name);
1536             return false;
1537     }
1538
1539     return true;
1540 }
1541
1542 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1543  * otherwise.
1544  *
1545  * Change 'row' either with the content of 'row_json' or by apply 'diff'.
1546  * Caller needs to provide either valid 'row_json' or 'diff', but not
1547  * both.  */
1548 static bool
1549 ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
1550                        const struct json *diff_json,
1551                        enum ovsdb_idl_change change)
1552 {
1553     struct ovsdb_idl_table *table = row->table;
1554     const struct ovsdb_idl_table_class *class = table->class;
1555     struct shash_node *node;
1556     bool changed = false;
1557     bool apply_diff = diff_json != NULL;
1558     const struct json *json = apply_diff ? diff_json : row_json;
1559
1560     SHASH_FOR_EACH (node, json_object(json)) {
1561         const char *column_name = node->name;
1562         const struct ovsdb_idl_column *column;
1563         struct ovsdb_datum datum;
1564         struct ovsdb_error *error;
1565         unsigned int column_idx;
1566         struct ovsdb_datum *old;
1567
1568         column = shash_find_data(&table->columns, column_name);
1569         if (!column) {
1570             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
1571                          column_name, UUID_ARGS(&row->uuid));
1572             continue;
1573         }
1574
1575         column_idx = column - table->class->columns;
1576         old = &row->old[column_idx];
1577
1578         error = NULL;
1579         if (apply_diff) {
1580             struct ovsdb_datum diff;
1581
1582             ovs_assert(!row_json);
1583             error = ovsdb_transient_datum_from_json(&diff, &column->type,
1584                                                     node->data);
1585             if (!error) {
1586                 error = ovsdb_datum_apply_diff(&datum, old, &diff,
1587                                                &column->type);
1588                 ovsdb_datum_destroy(&diff, &column->type);
1589             }
1590         } else {
1591             ovs_assert(!diff_json);
1592             error = ovsdb_datum_from_json(&datum, &column->type, node->data,
1593                                           NULL);
1594         }
1595
1596         if (!error) {
1597             if (!ovsdb_datum_equals(old, &datum, &column->type)) {
1598                 ovsdb_datum_swap(old, &datum);
1599                 if (table->modes[column_idx] & OVSDB_IDL_ALERT) {
1600                     changed = true;
1601                     row->change_seqno[change]
1602                         = row->table->change_seqno[change]
1603                         = row->table->idl->change_seqno + 1;
1604                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
1605                         if (!ovs_list_is_empty(&row->track_node)) {
1606                             ovs_list_remove(&row->track_node);
1607                         }
1608                         ovs_list_push_back(&row->table->track_list,
1609                                        &row->track_node);
1610                         if (!row->updated) {
1611                             row->updated = bitmap_allocate(class->n_columns);
1612                         }
1613                         bitmap_set1(row->updated, column_idx);
1614                     }
1615                 }
1616             } else {
1617                 /* Didn't really change but the OVSDB monitor protocol always
1618                  * includes every value in a row. */
1619             }
1620
1621             ovsdb_datum_destroy(&datum, &column->type);
1622         } else {
1623             char *s = ovsdb_error_to_string(error);
1624             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
1625                          " in table %s: %s", column_name,
1626                          UUID_ARGS(&row->uuid), table->class->name, s);
1627             free(s);
1628             ovsdb_error_destroy(error);
1629         }
1630     }
1631     return changed;
1632 }
1633
1634 static bool
1635 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json,
1636                      enum ovsdb_idl_change change)
1637 {
1638     return ovsdb_idl_row_change__(row, row_json, NULL, change);
1639 }
1640
1641 static bool
1642 ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
1643                          const struct json *diff_json,
1644                          enum ovsdb_idl_change change)
1645 {
1646     return ovsdb_idl_row_change__(row, NULL, diff_json, change);
1647 }
1648
1649 /* When a row A refers to row B through a column with a "refTable" constraint,
1650  * but row B does not exist, row B is called an "orphan row".  Orphan rows
1651  * should not persist, because the database enforces referential integrity, but
1652  * they can appear transiently as changes from the database are received (the
1653  * database doesn't try to topologically sort them and circular references mean
1654  * it isn't always possible anyhow).
1655  *
1656  * This function returns true if 'row' is an orphan row, otherwise false.
1657  */
1658 static bool
1659 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
1660 {
1661     return !row->old && !row->new;
1662 }
1663
1664 /* Returns true if 'row' is conceptually part of the database as modified by
1665  * the current transaction (if any), false otherwise.
1666  *
1667  * This function will return true if 'row' is not an orphan (see the comment on
1668  * ovsdb_idl_row_is_orphan()) and:
1669  *
1670  *   - 'row' exists in the database and has not been deleted within the
1671  *     current transaction (if any).
1672  *
1673  *   - 'row' was inserted within the current transaction and has not been
1674  *     deleted.  (In the latter case you should not have passed 'row' in at
1675  *     all, because ovsdb_idl_txn_delete() freed it.)
1676  *
1677  * This function will return false if 'row' is an orphan or if 'row' was
1678  * deleted within the current transaction.
1679  */
1680 static bool
1681 ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
1682 {
1683     return row->new != NULL;
1684 }
1685
1686 static void
1687 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
1688 {
1689     const struct ovsdb_idl_table_class *class = row->table->class;
1690     size_t i;
1691
1692     for (i = 0; i < class->n_columns; i++) {
1693         const struct ovsdb_idl_column *c = &class->columns[i];
1694         (c->parse)(row, &row->old[i]);
1695     }
1696 }
1697
1698 static void
1699 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
1700 {
1701     const struct ovsdb_idl_table_class *class = row->table->class;
1702     size_t i;
1703
1704     for (i = 0; i < class->n_columns; i++) {
1705         const struct ovsdb_idl_column *c = &class->columns[i];
1706         (c->unparse)(row);
1707     }
1708 }
1709
1710 static void
1711 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
1712 {
1713     ovs_assert(row->old == row->new);
1714     if (!ovsdb_idl_row_is_orphan(row)) {
1715         const struct ovsdb_idl_table_class *class = row->table->class;
1716         size_t i;
1717
1718         for (i = 0; i < class->n_columns; i++) {
1719             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
1720         }
1721         free(row->old);
1722         row->old = row->new = NULL;
1723     }
1724 }
1725
1726 static void
1727 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
1728 {
1729     if (row->old != row->new) {
1730         if (row->new) {
1731             const struct ovsdb_idl_table_class *class = row->table->class;
1732             size_t i;
1733
1734             if (row->written) {
1735                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
1736                     ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
1737                 }
1738             }
1739             free(row->new);
1740             free(row->written);
1741             row->written = NULL;
1742         }
1743         row->new = row->old;
1744     }
1745 }
1746
1747 static void
1748 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
1749 {
1750     struct ovsdb_idl_arc *arc, *next;
1751
1752     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
1753      * that this causes to be unreferenced, if tracking is not enabled.
1754      * If tracking is enabled, orphaned nodes are removed from hmap but not
1755      * freed.
1756      */
1757     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
1758         ovs_list_remove(&arc->dst_node);
1759         if (destroy_dsts
1760             && ovsdb_idl_row_is_orphan(arc->dst)
1761             && ovs_list_is_empty(&arc->dst->dst_arcs)) {
1762             ovsdb_idl_row_destroy(arc->dst);
1763         }
1764         free(arc);
1765     }
1766     ovs_list_init(&row->src_arcs);
1767 }
1768
1769 /* Force nodes that reference 'row' to reparse. */
1770 static void
1771 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
1772 {
1773     struct ovsdb_idl_arc *arc, *next;
1774
1775     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
1776      * 'arc', so we need to use the "safe" variant of list traversal.  However,
1777      * calling an ovsdb_idl_column's 'parse' function will add an arc
1778      * equivalent to 'arc' to row->arcs.  That could be a problem for
1779      * traversal, but it adds it at the beginning of the list to prevent us
1780      * from stumbling upon it again.
1781      *
1782      * (If duplicate arcs were possible then we would need to make sure that
1783      * 'next' didn't also point into 'arc''s destination, but we forbid
1784      * duplicate arcs.) */
1785     LIST_FOR_EACH_SAFE (arc, next, dst_node, &row->dst_arcs) {
1786         struct ovsdb_idl_row *ref = arc->src;
1787
1788         ovsdb_idl_row_unparse(ref);
1789         ovsdb_idl_row_clear_arcs(ref, false);
1790         ovsdb_idl_row_parse(ref);
1791     }
1792 }
1793
1794 static struct ovsdb_idl_row *
1795 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
1796 {
1797     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
1798     class->row_init(row);
1799     ovs_list_init(&row->src_arcs);
1800     ovs_list_init(&row->dst_arcs);
1801     hmap_node_nullify(&row->txn_node);
1802     ovs_list_init(&row->track_node);
1803     return row;
1804 }
1805
1806 static struct ovsdb_idl_row *
1807 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
1808 {
1809     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
1810     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
1811     row->uuid = *uuid;
1812     row->table = table;
1813     row->map_op_written = NULL;
1814     row->map_op_lists = NULL;
1815     return row;
1816 }
1817
1818 static void
1819 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
1820 {
1821     if (row) {
1822         ovsdb_idl_row_clear_old(row);
1823         hmap_remove(&row->table->rows, &row->hmap_node);
1824         ovsdb_idl_destroy_all_map_op_lists(row);
1825         if (ovsdb_idl_track_is_set(row->table)) {
1826             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1827                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
1828                 = row->table->idl->change_seqno + 1;
1829         }
1830         if (!ovs_list_is_empty(&row->track_node)) {
1831             ovs_list_remove(&row->track_node);
1832         }
1833         ovs_list_push_back(&row->table->track_list, &row->track_node);
1834     }
1835 }
1836
1837 static void
1838 ovsdb_idl_destroy_all_map_op_lists(struct ovsdb_idl_row *row)
1839 {
1840     if (row->map_op_written) {
1841         /* Clear Map Operation Lists */
1842         size_t idx, n_columns;
1843         const struct ovsdb_idl_column *columns;
1844         const struct ovsdb_type *type;
1845         n_columns = row->table->class->n_columns;
1846         columns = row->table->class->columns;
1847         BITMAP_FOR_EACH_1 (idx, n_columns, row->map_op_written) {
1848             type = &columns[idx].type;
1849             map_op_list_destroy(row->map_op_lists[idx], type);
1850         }
1851         free(row->map_op_lists);
1852         bitmap_free(row->map_op_written);
1853         row->map_op_lists = NULL;
1854         row->map_op_written = NULL;
1855     }
1856 }
1857
1858 static void
1859 ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
1860 {
1861     size_t i;
1862
1863     for (i = 0; i < idl->class->n_tables; i++) {
1864         struct ovsdb_idl_table *table = &idl->tables[i];
1865
1866         if (!ovs_list_is_empty(&table->track_list)) {
1867             struct ovsdb_idl_row *row, *next;
1868
1869             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
1870                 if (!ovsdb_idl_track_is_set(row->table)) {
1871                     ovs_list_remove(&row->track_node);
1872                     free(row);
1873                 }
1874             }
1875         }
1876     }
1877 }
1878
1879 static void
1880 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
1881 {
1882     const struct ovsdb_idl_table_class *class = row->table->class;
1883     size_t i;
1884
1885     ovs_assert(!row->old && !row->new);
1886     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
1887     for (i = 0; i < class->n_columns; i++) {
1888         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
1889     }
1890     ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_INSERT);
1891     ovsdb_idl_row_parse(row);
1892
1893     ovsdb_idl_row_reparse_backrefs(row);
1894 }
1895
1896 static void
1897 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
1898 {
1899     ovsdb_idl_row_unparse(row);
1900     ovsdb_idl_row_clear_arcs(row, true);
1901     ovsdb_idl_row_clear_old(row);
1902     if (ovs_list_is_empty(&row->dst_arcs)) {
1903         ovsdb_idl_row_destroy(row);
1904     } else {
1905         ovsdb_idl_row_reparse_backrefs(row);
1906     }
1907 }
1908
1909 /* Returns true if a column with mode OVSDB_IDL_MODE_RW changed, false
1910  * otherwise. */
1911 static bool
1912 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
1913 {
1914     bool changed;
1915
1916     ovsdb_idl_row_unparse(row);
1917     ovsdb_idl_row_clear_arcs(row, true);
1918     changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY);
1919     ovsdb_idl_row_parse(row);
1920
1921     return changed;
1922 }
1923
1924 static bool
1925 ovsdb_idl_modify_row_by_diff(struct ovsdb_idl_row *row,
1926                              const struct json *diff_json)
1927 {
1928     bool changed;
1929
1930     ovsdb_idl_row_unparse(row);
1931     ovsdb_idl_row_clear_arcs(row, true);
1932     changed = ovsdb_idl_row_apply_diff(row, diff_json,
1933                                        OVSDB_IDL_CHANGE_MODIFY);
1934     ovsdb_idl_row_parse(row);
1935
1936     return changed;
1937 }
1938
1939 static bool
1940 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
1941 {
1942     const struct ovsdb_idl_arc *arc;
1943
1944     /* No self-arcs. */
1945     if (src == dst) {
1946         return false;
1947     }
1948
1949     /* No duplicate arcs.
1950      *
1951      * We only need to test whether the first arc in dst->dst_arcs originates
1952      * at 'src', since we add all of the arcs from a given source in a clump
1953      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
1954      * added at the front of the dst_arcs list. */
1955     if (ovs_list_is_empty(&dst->dst_arcs)) {
1956         return true;
1957     }
1958     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
1959     return arc->src != src;
1960 }
1961
1962 static struct ovsdb_idl_table *
1963 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
1964                            const struct ovsdb_idl_table_class *table_class)
1965 {
1966     return &idl->tables[table_class - idl->class->tables];
1967 }
1968
1969 /* Called by ovsdb-idlc generated code. */
1970 struct ovsdb_idl_row *
1971 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
1972                       struct ovsdb_idl_table_class *dst_table_class,
1973                       const struct uuid *dst_uuid)
1974 {
1975     struct ovsdb_idl *idl = src->table->idl;
1976     struct ovsdb_idl_table *dst_table;
1977     struct ovsdb_idl_arc *arc;
1978     struct ovsdb_idl_row *dst;
1979
1980     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
1981     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
1982     if (idl->txn) {
1983         /* We're being called from ovsdb_idl_txn_write().  We must not update
1984          * any arcs, because the transaction will be backed out at commit or
1985          * abort time and we don't want our graph screwed up.
1986          *
1987          * Just return the destination row, if there is one and it has not been
1988          * deleted. */
1989         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
1990             return dst;
1991         }
1992         return NULL;
1993     } else {
1994         /* We're being called from some other context.  Update the graph. */
1995         if (!dst) {
1996             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
1997         }
1998
1999         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
2000         if (may_add_arc(src, dst)) {
2001             /* The arc *must* be added at the front of the dst_arcs list.  See
2002              * ovsdb_idl_row_reparse_backrefs() for details. */
2003             arc = xmalloc(sizeof *arc);
2004             ovs_list_push_front(&src->src_arcs, &arc->src_node);
2005             ovs_list_push_front(&dst->dst_arcs, &arc->dst_node);
2006             arc->src = src;
2007             arc->dst = dst;
2008         }
2009
2010         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
2011     }
2012 }
2013
2014 /* Searches 'tc''s table in 'idl' for a row with UUID 'uuid'.  Returns a
2015  * pointer to the row if there is one, otherwise a null pointer.  */
2016 const struct ovsdb_idl_row *
2017 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
2018                            const struct ovsdb_idl_table_class *tc,
2019                            const struct uuid *uuid)
2020 {
2021     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
2022 }
2023
2024 static struct ovsdb_idl_row *
2025 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
2026 {
2027     for (; node; node = hmap_next(&table->rows, node)) {
2028         struct ovsdb_idl_row *row;
2029
2030         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
2031         if (ovsdb_idl_row_exists(row)) {
2032             return row;
2033         }
2034     }
2035     return NULL;
2036 }
2037
2038 /* Returns a row in 'table_class''s table in 'idl', or a null pointer if that
2039  * table is empty.
2040  *
2041  * Database tables are internally maintained as hash tables, so adding or
2042  * removing rows while traversing the same table can cause some rows to be
2043  * visited twice or not at apply. */
2044 const struct ovsdb_idl_row *
2045 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
2046                     const struct ovsdb_idl_table_class *table_class)
2047 {
2048     struct ovsdb_idl_table *table
2049         = ovsdb_idl_table_from_class(idl, table_class);
2050     return next_real_row(table, hmap_first(&table->rows));
2051 }
2052
2053 /* Returns a row following 'row' within its table, or a null pointer if 'row'
2054  * is the last row in its table. */
2055 const struct ovsdb_idl_row *
2056 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
2057 {
2058     struct ovsdb_idl_table *table = row->table;
2059
2060     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
2061 }
2062
2063 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
2064  * transaction has changed 'column''s value, the modified value is returned.
2065  *
2066  * The caller must not modify or free the returned value.
2067  *
2068  * Various kinds of changes can invalidate the returned value: writing to the
2069  * same 'column' in 'row' (e.g. with ovsdb_idl_txn_write()), deleting 'row'
2070  * (e.g. with ovsdb_idl_txn_delete()), or completing an ongoing transaction
2071  * (e.g. with ovsdb_idl_txn_commit() or ovsdb_idl_txn_abort()).  If the
2072  * returned value is needed for a long time, it is best to make a copy of it
2073  * with ovsdb_datum_clone(). */
2074 const struct ovsdb_datum *
2075 ovsdb_idl_read(const struct ovsdb_idl_row *row,
2076                const struct ovsdb_idl_column *column)
2077 {
2078     const struct ovsdb_idl_table_class *class;
2079     size_t column_idx;
2080
2081     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
2082
2083     class = row->table->class;
2084     column_idx = column - class->columns;
2085
2086     ovs_assert(row->new != NULL);
2087     ovs_assert(column_idx < class->n_columns);
2088
2089     if (row->written && bitmap_is_set(row->written, column_idx)) {
2090         return &row->new[column_idx];
2091     } else if (row->old) {
2092         return &row->old[column_idx];
2093     } else {
2094         return ovsdb_datum_default(&column->type);
2095     }
2096 }
2097
2098 /* Same as ovsdb_idl_read(), except that it also asserts that 'column' has key
2099  * type 'key_type' and value type 'value_type'.  (Scalar and set types will
2100  * have a value type of OVSDB_TYPE_VOID.)
2101  *
2102  * This is useful in code that "knows" that a particular column has a given
2103  * type, so that it will abort if someone changes the column's type without
2104  * updating the code that uses it. */
2105 const struct ovsdb_datum *
2106 ovsdb_idl_get(const struct ovsdb_idl_row *row,
2107               const struct ovsdb_idl_column *column,
2108               enum ovsdb_atomic_type key_type OVS_UNUSED,
2109               enum ovsdb_atomic_type value_type OVS_UNUSED)
2110 {
2111     ovs_assert(column->type.key.type == key_type);
2112     ovs_assert(column->type.value.type == value_type);
2113
2114     return ovsdb_idl_read(row, column);
2115 }
2116
2117 /* Returns true if the field represented by 'column' in 'row' may be modified,
2118  * false if it is immutable.
2119  *
2120  * Normally, whether a field is mutable is controlled by its column's schema.
2121  * However, an immutable column can be set to any initial value at the time of
2122  * insertion, so if 'row' is a new row (one that is being added as part of the
2123  * current transaction, supposing that a transaction is in progress) then even
2124  * its "immutable" fields are actually mutable. */
2125 bool
2126 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
2127                      const struct ovsdb_idl_column *column)
2128 {
2129     return column->mutable || (row->new && !row->old);
2130 }
2131
2132 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
2133  * to all-zero-bits by some other entity.  If 'row' was set up some other way
2134  * then the return value is indeterminate. */
2135 bool
2136 ovsdb_idl_row_is_synthetic(const struct ovsdb_idl_row *row)
2137 {
2138     return row->table == NULL;
2139 }
2140 \f
2141 /* Transactions. */
2142
2143 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2144                                    enum ovsdb_idl_txn_status);
2145
2146 /* Returns a string representation of 'status'.  The caller must not modify or
2147  * free the returned string.
2148  *
2149  * The return value is probably useful only for debug log messages and unit
2150  * tests. */
2151 const char *
2152 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
2153 {
2154     switch (status) {
2155     case TXN_UNCOMMITTED:
2156         return "uncommitted";
2157     case TXN_UNCHANGED:
2158         return "unchanged";
2159     case TXN_INCOMPLETE:
2160         return "incomplete";
2161     case TXN_ABORTED:
2162         return "aborted";
2163     case TXN_SUCCESS:
2164         return "success";
2165     case TXN_TRY_AGAIN:
2166         return "try again";
2167     case TXN_NOT_LOCKED:
2168         return "not locked";
2169     case TXN_ERROR:
2170         return "error";
2171     }
2172     return "<unknown>";
2173 }
2174
2175 /* Starts a new transaction on 'idl'.  A given ovsdb_idl may only have a single
2176  * active transaction at a time.  See the large comment in ovsdb-idl.h for
2177  * general information on transactions. */
2178 struct ovsdb_idl_txn *
2179 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
2180 {
2181     struct ovsdb_idl_txn *txn;
2182
2183     ovs_assert(!idl->txn);
2184     idl->txn = txn = xmalloc(sizeof *txn);
2185     txn->request_id = NULL;
2186     txn->idl = idl;
2187     hmap_init(&txn->txn_rows);
2188     txn->status = TXN_UNCOMMITTED;
2189     txn->error = NULL;
2190     txn->dry_run = false;
2191     ds_init(&txn->comment);
2192
2193     txn->inc_table = NULL;
2194     txn->inc_column = NULL;
2195
2196     hmap_init(&txn->inserted_rows);
2197
2198     return txn;
2199 }
2200
2201 /* Appends 's', which is treated as a printf()-type format string, to the
2202  * comments that will be passed to the OVSDB server when 'txn' is committed.
2203  * (The comment will be committed to the OVSDB log, which "ovsdb-tool
2204  * show-log" can print in a relatively human-readable form.) */
2205 void
2206 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s, ...)
2207 {
2208     va_list args;
2209
2210     if (txn->comment.length) {
2211         ds_put_char(&txn->comment, '\n');
2212     }
2213
2214     va_start(args, s);
2215     ds_put_format_valist(&txn->comment, s, args);
2216     va_end(args);
2217 }
2218
2219 /* Marks 'txn' as a transaction that will not actually modify the database.  In
2220  * almost every way, the transaction is treated like other transactions.  It
2221  * must be committed or aborted like other transactions, it will be sent to the
2222  * database server like other transactions, and so on.  The only difference is
2223  * that the operations sent to the database server will include, as the last
2224  * step, an "abort" operation, so that any changes made by the transaction will
2225  * not actually take effect. */
2226 void
2227 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
2228 {
2229     txn->dry_run = true;
2230 }
2231
2232 /* Causes 'txn', when committed, to increment the value of 'column' within
2233  * 'row' by 1.  'column' must have an integer type.  After 'txn' commits
2234  * successfully, the client may retrieve the final (incremented) value of
2235  * 'column' with ovsdb_idl_txn_get_increment_new_value().
2236  *
2237  * The client could accomplish something similar with ovsdb_idl_read(),
2238  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
2239  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
2240  * will never (by itself) fail because of a verify error.
2241  *
2242  * The intended use is for incrementing the "next_cfg" column in the
2243  * Open_vSwitch table. */
2244 void
2245 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
2246                         const struct ovsdb_idl_row *row,
2247                         const struct ovsdb_idl_column *column)
2248 {
2249     ovs_assert(!txn->inc_table);
2250     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
2251     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
2252
2253     txn->inc_table = row->table->class->name;
2254     txn->inc_column = column->name;
2255     txn->inc_row = row->uuid;
2256 }
2257
2258 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
2259  * has been called for 'txn' but the commit is still incomplete (that is, the
2260  * last call returned TXN_INCOMPLETE) then the transaction may or may not still
2261  * end up committing at the database server, but the client will not be able to
2262  * get any further status information back. */
2263 void
2264 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
2265 {
2266     struct ovsdb_idl_txn_insert *insert, *next;
2267
2268     json_destroy(txn->request_id);
2269     if (txn->status == TXN_INCOMPLETE) {
2270         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2271     }
2272     ovsdb_idl_txn_abort(txn);
2273     ds_destroy(&txn->comment);
2274     free(txn->error);
2275     HMAP_FOR_EACH_SAFE (insert, next, hmap_node, &txn->inserted_rows) {
2276         free(insert);
2277     }
2278     hmap_destroy(&txn->inserted_rows);
2279     free(txn);
2280 }
2281
2282 /* Causes poll_block() to wake up if 'txn' has completed committing. */
2283 void
2284 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
2285 {
2286     if (txn->status != TXN_UNCOMMITTED && txn->status != TXN_INCOMPLETE) {
2287         poll_immediate_wake();
2288     }
2289 }
2290
2291 static struct json *
2292 where_uuid_equals(const struct uuid *uuid)
2293 {
2294     return
2295         json_array_create_1(
2296             json_array_create_3(
2297                 json_string_create("_uuid"),
2298                 json_string_create("=="),
2299                 json_array_create_2(
2300                     json_string_create("uuid"),
2301                     json_string_create_nocopy(
2302                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
2303 }
2304
2305 static char *
2306 uuid_name_from_uuid(const struct uuid *uuid)
2307 {
2308     char *name;
2309     char *p;
2310
2311     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
2312     for (p = name; *p != '\0'; p++) {
2313         if (*p == '-') {
2314             *p = '_';
2315         }
2316     }
2317
2318     return name;
2319 }
2320
2321 static const struct ovsdb_idl_row *
2322 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
2323 {
2324     const struct ovsdb_idl_row *row;
2325
2326     HMAP_FOR_EACH_WITH_HASH (row, txn_node, uuid_hash(uuid), &txn->txn_rows) {
2327         if (uuid_equals(&row->uuid, uuid)) {
2328             return row;
2329         }
2330     }
2331     return NULL;
2332 }
2333
2334 /* XXX there must be a cleaner way to do this */
2335 static struct json *
2336 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
2337 {
2338     if (json->type == JSON_ARRAY) {
2339         struct uuid uuid;
2340         size_t i;
2341
2342         if (json->u.array.n == 2
2343             && json->u.array.elems[0]->type == JSON_STRING
2344             && json->u.array.elems[1]->type == JSON_STRING
2345             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
2346             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
2347             const struct ovsdb_idl_row *row;
2348
2349             row = ovsdb_idl_txn_get_row(txn, &uuid);
2350             if (row && !row->old && row->new) {
2351                 json_destroy(json);
2352
2353                 return json_array_create_2(
2354                     json_string_create("named-uuid"),
2355                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
2356             }
2357         }
2358
2359         for (i = 0; i < json->u.array.n; i++) {
2360             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
2361                                                       txn);
2362         }
2363     } else if (json->type == JSON_OBJECT) {
2364         struct shash_node *node;
2365
2366         SHASH_FOR_EACH (node, json_object(json)) {
2367             node->data = substitute_uuids(node->data, txn);
2368         }
2369     }
2370     return json;
2371 }
2372
2373 static void
2374 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
2375 {
2376     struct ovsdb_idl_row *row, *next;
2377
2378     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
2379      * ovsdb_idl_column's 'parse' function, which will call
2380      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
2381      * transaction and fail to update the graph.  */
2382     txn->idl->txn = NULL;
2383
2384     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
2385         ovsdb_idl_destroy_all_map_op_lists(row);
2386         if (row->old) {
2387             if (row->written) {
2388                 ovsdb_idl_row_unparse(row);
2389                 ovsdb_idl_row_clear_arcs(row, false);
2390                 ovsdb_idl_row_parse(row);
2391             }
2392         } else {
2393             ovsdb_idl_row_unparse(row);
2394         }
2395         ovsdb_idl_row_clear_new(row);
2396
2397         free(row->prereqs);
2398         row->prereqs = NULL;
2399
2400         free(row->written);
2401         row->written = NULL;
2402
2403         hmap_remove(&txn->txn_rows, &row->txn_node);
2404         hmap_node_nullify(&row->txn_node);
2405         if (!row->old) {
2406             hmap_remove(&row->table->rows, &row->hmap_node);
2407             free(row);
2408         }
2409     }
2410     hmap_destroy(&txn->txn_rows);
2411     hmap_init(&txn->txn_rows);
2412 }
2413
2414 static bool
2415 ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
2416                                 struct json *mutations)
2417 {
2418     const struct ovsdb_idl_table_class *class = row->table->class;
2419     size_t idx;
2420     bool any_mutations = false;
2421
2422     BITMAP_FOR_EACH_1(idx, class->n_columns, row->map_op_written) {
2423         struct map_op_list *map_op_list;
2424         const struct ovsdb_idl_column *column;
2425         const struct ovsdb_datum *old_datum;
2426         enum ovsdb_atomic_type key_type, value_type;
2427         struct json *mutation, *map, *col_name, *mutator;
2428         struct json *del_set, *ins_map;
2429         bool any_del, any_ins;
2430
2431         map_op_list = row->map_op_lists[idx];
2432         column = &class->columns[idx];
2433         key_type = column->type.key.type;
2434         value_type = column->type.value.type;
2435
2436         /* Get the value to be changed */
2437         if (row->new && row->written && bitmap_is_set(row->written,idx)) {
2438             old_datum = &row->new[idx];
2439         } else if (row->old != NULL) {
2440             old_datum = &row->old[idx];
2441         } else {
2442             old_datum = ovsdb_datum_default(&column->type);
2443         }
2444
2445         del_set = json_array_create_empty();
2446         ins_map = json_array_create_empty();
2447         any_del = false;
2448         any_ins = false;
2449
2450         for (struct map_op *map_op = map_op_list_first(map_op_list); map_op;
2451              map_op = map_op_list_next(map_op_list, map_op)) {
2452
2453             if (map_op_type(map_op) == MAP_OP_UPDATE) {
2454                 /* Find out if value really changed. */
2455                 struct ovsdb_datum *new_datum;
2456                 unsigned int pos;
2457                 new_datum = map_op_datum(map_op);
2458                 pos = ovsdb_datum_find_key(old_datum,
2459                                            &new_datum->keys[0],
2460                                            key_type);
2461                 if (ovsdb_atom_equals(&new_datum->values[0],
2462                                       &old_datum->values[pos],
2463                                       value_type)) {
2464                     /* No change in value. Move on to next update. */
2465                     continue;
2466                 }
2467             } else if (map_op_type(map_op) == MAP_OP_DELETE){
2468                 /* Verify that there is a key to delete. */
2469                 unsigned int pos;
2470                 pos = ovsdb_datum_find_key(old_datum,
2471                                            &map_op_datum(map_op)->keys[0],
2472                                            key_type);
2473                 if (pos == UINT_MAX) {
2474                     /* No key to delete.  Move on to next update. */
2475                     VLOG_WARN("Trying to delete a key that doesn't "
2476                               "exist in the map.");
2477                     continue;
2478                 }
2479             }
2480
2481             if (map_op_type(map_op) == MAP_OP_INSERT) {
2482                 map = json_array_create_2(
2483                     ovsdb_atom_to_json(&map_op_datum(map_op)->keys[0],
2484                                        key_type),
2485                     ovsdb_atom_to_json(&map_op_datum(map_op)->values[0],
2486                                        value_type));
2487                 json_array_add(ins_map, map);
2488                 any_ins = true;
2489             } else { /* MAP_OP_UPDATE or MAP_OP_DELETE */
2490                 map = ovsdb_atom_to_json(&map_op_datum(map_op)->keys[0],
2491                                          key_type);
2492                 json_array_add(del_set, map);
2493                 any_del = true;
2494             }
2495
2496             /* Generate an additional insert mutate for updates. */
2497             if (map_op_type(map_op) == MAP_OP_UPDATE) {
2498                 map = json_array_create_2(
2499                     ovsdb_atom_to_json(&map_op_datum(map_op)->keys[0],
2500                                        key_type),
2501                     ovsdb_atom_to_json(&map_op_datum(map_op)->values[0],
2502                                        value_type));
2503                 json_array_add(ins_map, map);
2504                 any_ins = true;
2505             }
2506         }
2507
2508         if (any_del) {
2509             col_name = json_string_create(column->name);
2510             mutator = json_string_create("delete");
2511             map = json_array_create_2(json_string_create("set"), del_set);
2512             mutation = json_array_create_3(col_name, mutator, map);
2513             json_array_add(mutations, mutation);
2514             any_mutations = true;
2515         } else {
2516             json_destroy(del_set);
2517         }
2518         if (any_ins) {
2519             col_name = json_string_create(column->name);
2520             mutator = json_string_create("insert");
2521             map = json_array_create_2(json_string_create("map"), ins_map);
2522             mutation = json_array_create_3(col_name, mutator, map);
2523             json_array_add(mutations, mutation);
2524             any_mutations = true;
2525         } else {
2526             json_destroy(ins_map);
2527         }
2528     }
2529     return any_mutations;
2530 }
2531
2532 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
2533  * of the following TXN_* constants:
2534  *
2535  *   TXN_INCOMPLETE:
2536  *
2537  *       The transaction is in progress, but not yet complete.  The caller
2538  *       should call again later, after calling ovsdb_idl_run() to let the IDL
2539  *       do OVSDB protocol processing.
2540  *
2541  *   TXN_UNCHANGED:
2542  *
2543  *       The transaction is complete.  (It didn't actually change the database,
2544  *       so the IDL didn't send any request to the database server.)
2545  *
2546  *   TXN_ABORTED:
2547  *
2548  *       The caller previously called ovsdb_idl_txn_abort().
2549  *
2550  *   TXN_SUCCESS:
2551  *
2552  *       The transaction was successful.  The update made by the transaction
2553  *       (and possibly other changes made by other database clients) should
2554  *       already be visible in the IDL.
2555  *
2556  *   TXN_TRY_AGAIN:
2557  *
2558  *       The transaction failed for some transient reason, e.g. because a
2559  *       "verify" operation reported an inconsistency or due to a network
2560  *       problem.  The caller should wait for a change to the database, then
2561  *       compose a new transaction, and commit the new transaction.
2562  *
2563  *       Use the return value of ovsdb_idl_get_seqno() to wait for a change in
2564  *       the database.  It is important to use its return value *before* the
2565  *       initial call to ovsdb_idl_txn_commit() as the baseline for this
2566  *       purpose, because the change that one should wait for can happen after
2567  *       the initial call but before the call that returns TXN_TRY_AGAIN, and
2568  *       using some other baseline value in that situation could cause an
2569  *       indefinite wait if the database rarely changes.
2570  *
2571  *   TXN_NOT_LOCKED:
2572  *
2573  *       The transaction failed because the IDL has been configured to require
2574  *       a database lock (with ovsdb_idl_set_lock()) but didn't get it yet or
2575  *       has already lost it.
2576  *
2577  * Committing a transaction rolls back all of the changes that it made to the
2578  * IDL's copy of the database.  If the transaction commits successfully, then
2579  * the database server will send an update and, thus, the IDL will be updated
2580  * with the committed changes. */
2581 enum ovsdb_idl_txn_status
2582 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
2583 {
2584     struct ovsdb_idl_row *row;
2585     struct json *operations;
2586     bool any_updates;
2587
2588     if (txn != txn->idl->txn) {
2589         goto coverage_out;
2590     }
2591
2592     /* If we need a lock but don't have it, give up quickly. */
2593     if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
2594         txn->status = TXN_NOT_LOCKED;
2595         goto disassemble_out;
2596     }
2597
2598     operations = json_array_create_1(
2599         json_string_create(txn->idl->class->database));
2600
2601     /* Assert that we have the required lock (avoiding a race). */
2602     if (txn->idl->lock_name) {
2603         struct json *op = json_object_create();
2604         json_array_add(operations, op);
2605         json_object_put_string(op, "op", "assert");
2606         json_object_put_string(op, "lock", txn->idl->lock_name);
2607     }
2608
2609     /* Add prerequisites and declarations of new rows. */
2610     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2611         /* XXX check that deleted rows exist even if no prereqs? */
2612         if (row->prereqs) {
2613             const struct ovsdb_idl_table_class *class = row->table->class;
2614             size_t n_columns = class->n_columns;
2615             struct json *op, *columns, *row_json;
2616             size_t idx;
2617
2618             op = json_object_create();
2619             json_array_add(operations, op);
2620             json_object_put_string(op, "op", "wait");
2621             json_object_put_string(op, "table", class->name);
2622             json_object_put(op, "timeout", json_integer_create(0));
2623             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2624             json_object_put_string(op, "until", "==");
2625             columns = json_array_create_empty();
2626             json_object_put(op, "columns", columns);
2627             row_json = json_object_create();
2628             json_object_put(op, "rows", json_array_create_1(row_json));
2629
2630             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
2631                 const struct ovsdb_idl_column *column = &class->columns[idx];
2632                 json_array_add(columns, json_string_create(column->name));
2633                 json_object_put(row_json, column->name,
2634                                 ovsdb_datum_to_json(&row->old[idx],
2635                                                     &column->type));
2636             }
2637         }
2638     }
2639
2640     /* Add updates. */
2641     any_updates = false;
2642     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
2643         const struct ovsdb_idl_table_class *class = row->table->class;
2644
2645         if (!row->new) {
2646             if (class->is_root) {
2647                 struct json *op = json_object_create();
2648                 json_object_put_string(op, "op", "delete");
2649                 json_object_put_string(op, "table", class->name);
2650                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2651                 json_array_add(operations, op);
2652                 any_updates = true;
2653             } else {
2654                 /* Let ovsdb-server decide whether to really delete it. */
2655             }
2656         } else if (row->old != row->new) {
2657             struct json *row_json;
2658             struct json *op;
2659             size_t idx;
2660
2661             op = json_object_create();
2662             json_object_put_string(op, "op", row->old ? "update" : "insert");
2663             json_object_put_string(op, "table", class->name);
2664             if (row->old) {
2665                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
2666             } else {
2667                 struct ovsdb_idl_txn_insert *insert;
2668
2669                 any_updates = true;
2670
2671                 json_object_put(op, "uuid-name",
2672                                 json_string_create_nocopy(
2673                                     uuid_name_from_uuid(&row->uuid)));
2674
2675                 insert = xmalloc(sizeof *insert);
2676                 insert->dummy = row->uuid;
2677                 insert->op_index = operations->u.array.n - 1;
2678                 uuid_zero(&insert->real);
2679                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
2680                             uuid_hash(&insert->dummy));
2681             }
2682             row_json = json_object_create();
2683             json_object_put(op, "row", row_json);
2684
2685             if (row->written) {
2686                 BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
2687                     const struct ovsdb_idl_column *column =
2688                                                         &class->columns[idx];
2689
2690                     if (row->old
2691                         || !ovsdb_datum_is_default(&row->new[idx],
2692                                                   &column->type)) {
2693                         json_object_put(row_json, column->name,
2694                                         substitute_uuids(
2695                                             ovsdb_datum_to_json(&row->new[idx],
2696                                                                 &column->type),
2697                                             txn));
2698
2699                         /* If anything really changed, consider it an update.
2700                          * We can't suppress not-really-changed values earlier
2701                          * or transactions would become nonatomic (see the big
2702                          * comment inside ovsdb_idl_txn_write()). */
2703                         if (!any_updates && row->old &&
2704                             !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
2705                                                 &column->type)) {
2706                             any_updates = true;
2707                         }
2708                     }
2709                 }
2710             }
2711
2712             if (!row->old || !shash_is_empty(json_object(row_json))) {
2713                 json_array_add(operations, op);
2714             } else {
2715                 json_destroy(op);
2716             }
2717         }
2718
2719         /* Add mutate operation, for partial map updates. */
2720         if (row->map_op_written) {
2721             struct json *op, *mutations;
2722             bool any_mutations;
2723
2724             op = json_object_create();
2725             json_object_put_string(op, "op", "mutate");
2726             json_object_put_string(op, "table", class->name);
2727             json_object_put(op, "where", where_uuid_equals(&row->uuid));
2728             mutations = json_array_create_empty();
2729             any_mutations = ovsdb_idl_txn_extract_mutations(row, mutations);
2730             json_object_put(op, "mutations", mutations);
2731
2732             if (any_mutations) {
2733                 op = substitute_uuids(op, txn);
2734                 json_array_add(operations, op);
2735                 any_updates = true;
2736             } else {
2737                 json_destroy(op);
2738             }
2739         }
2740     }
2741
2742     /* Add increment. */
2743     if (txn->inc_table && any_updates) {
2744         struct json *op;
2745
2746         txn->inc_index = operations->u.array.n - 1;
2747
2748         op = json_object_create();
2749         json_object_put_string(op, "op", "mutate");
2750         json_object_put_string(op, "table", txn->inc_table);
2751         json_object_put(op, "where",
2752                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2753                                          txn));
2754         json_object_put(op, "mutations",
2755                         json_array_create_1(
2756                             json_array_create_3(
2757                                 json_string_create(txn->inc_column),
2758                                 json_string_create("+="),
2759                                 json_integer_create(1))));
2760         json_array_add(operations, op);
2761
2762         op = json_object_create();
2763         json_object_put_string(op, "op", "select");
2764         json_object_put_string(op, "table", txn->inc_table);
2765         json_object_put(op, "where",
2766                         substitute_uuids(where_uuid_equals(&txn->inc_row),
2767                                          txn));
2768         json_object_put(op, "columns",
2769                         json_array_create_1(json_string_create(
2770                                                 txn->inc_column)));
2771         json_array_add(operations, op);
2772     }
2773
2774     if (txn->comment.length) {
2775         struct json *op = json_object_create();
2776         json_object_put_string(op, "op", "comment");
2777         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
2778         json_array_add(operations, op);
2779     }
2780
2781     if (txn->dry_run) {
2782         struct json *op = json_object_create();
2783         json_object_put_string(op, "op", "abort");
2784         json_array_add(operations, op);
2785     }
2786
2787     if (!any_updates) {
2788         txn->status = TXN_UNCHANGED;
2789         json_destroy(operations);
2790     } else if (!jsonrpc_session_send(
2791                    txn->idl->session,
2792                    jsonrpc_create_request(
2793                        "transact", operations, &txn->request_id))) {
2794         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
2795                     json_hash(txn->request_id, 0));
2796         txn->status = TXN_INCOMPLETE;
2797     } else {
2798         txn->status = TXN_TRY_AGAIN;
2799     }
2800
2801 disassemble_out:
2802     ovsdb_idl_txn_disassemble(txn);
2803 coverage_out:
2804     switch (txn->status) {
2805     case TXN_UNCOMMITTED:   COVERAGE_INC(txn_uncommitted);    break;
2806     case TXN_UNCHANGED:     COVERAGE_INC(txn_unchanged);      break;
2807     case TXN_INCOMPLETE:    COVERAGE_INC(txn_incomplete);     break;
2808     case TXN_ABORTED:       COVERAGE_INC(txn_aborted);        break;
2809     case TXN_SUCCESS:       COVERAGE_INC(txn_success);        break;
2810     case TXN_TRY_AGAIN:     COVERAGE_INC(txn_try_again);      break;
2811     case TXN_NOT_LOCKED:    COVERAGE_INC(txn_not_locked);     break;
2812     case TXN_ERROR:         COVERAGE_INC(txn_error);          break;
2813     }
2814
2815     return txn->status;
2816 }
2817
2818 /* Attempts to commit 'txn', blocking until the commit either succeeds or
2819  * fails.  Returns the final commit status, which may be any TXN_* value other
2820  * than TXN_INCOMPLETE.
2821  *
2822  * This function calls ovsdb_idl_run() on 'txn''s IDL, so it may cause the
2823  * return value of ovsdb_idl_get_seqno() to change. */
2824 enum ovsdb_idl_txn_status
2825 ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
2826 {
2827     enum ovsdb_idl_txn_status status;
2828
2829     fatal_signal_run();
2830     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
2831         ovsdb_idl_run(txn->idl);
2832         ovsdb_idl_wait(txn->idl);
2833         ovsdb_idl_txn_wait(txn);
2834         poll_block();
2835     }
2836     return status;
2837 }
2838
2839 /* Returns the final (incremented) value of the column in 'txn' that was set to
2840  * be incremented by ovsdb_idl_txn_increment().  'txn' must have committed
2841  * successfully. */
2842 int64_t
2843 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
2844 {
2845     ovs_assert(txn->status == TXN_SUCCESS);
2846     return txn->inc_new_value;
2847 }
2848
2849 /* Aborts 'txn' without sending it to the database server.  This is effective
2850  * only if ovsdb_idl_txn_commit() has not yet been called for 'txn'.
2851  * Otherwise, it has no effect.
2852  *
2853  * Aborting a transaction doesn't free its memory.  Use
2854  * ovsdb_idl_txn_destroy() to do that. */
2855 void
2856 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
2857 {
2858     ovsdb_idl_txn_disassemble(txn);
2859     if (txn->status == TXN_UNCOMMITTED || txn->status == TXN_INCOMPLETE) {
2860         txn->status = TXN_ABORTED;
2861     }
2862 }
2863
2864 /* Returns a string that reports the error status for 'txn'.  The caller must
2865  * not modify or free the returned string.  A call to ovsdb_idl_txn_destroy()
2866  * for 'txn' may free the returned string.
2867  *
2868  * The return value is ordinarily one of the strings that
2869  * ovsdb_idl_txn_status_to_string() would return, but if the transaction failed
2870  * due to an error reported by the database server, the return value is that
2871  * error. */
2872 const char *
2873 ovsdb_idl_txn_get_error(const struct ovsdb_idl_txn *txn)
2874 {
2875     if (txn->status != TXN_ERROR) {
2876         return ovsdb_idl_txn_status_to_string(txn->status);
2877     } else if (txn->error) {
2878         return txn->error;
2879     } else {
2880         return "no error details available";
2881     }
2882 }
2883
2884 static void
2885 ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn,
2886                              const struct json *json)
2887 {
2888     if (txn->error == NULL) {
2889         txn->error = json_to_string(json, JSSF_SORT);
2890     }
2891 }
2892
2893 /* For transaction 'txn' that completed successfully, finds and returns the
2894  * permanent UUID that the database assigned to a newly inserted row, given the
2895  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
2896  *
2897  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
2898  * if it was assigned by that function and then deleted by
2899  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
2900  * and then deleted within a single transaction are never sent to the database
2901  * server, so it never assigns them a permanent UUID.) */
2902 const struct uuid *
2903 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
2904                               const struct uuid *uuid)
2905 {
2906     const struct ovsdb_idl_txn_insert *insert;
2907
2908     ovs_assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
2909     HMAP_FOR_EACH_IN_BUCKET (insert, hmap_node,
2910                              uuid_hash(uuid), &txn->inserted_rows) {
2911         if (uuid_equals(uuid, &insert->dummy)) {
2912             return &insert->real;
2913         }
2914     }
2915     return NULL;
2916 }
2917
2918 static void
2919 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
2920                        enum ovsdb_idl_txn_status status)
2921 {
2922     txn->status = status;
2923     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
2924 }
2925
2926 /* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
2927  * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
2928  * ovs-vswitchd).
2929  *
2930  * 'datum' must have the correct type for its column.  The IDL does not check
2931  * that it meets schema constraints, but ovsdb-server will do so at commit time
2932  * so it had better be correct.
2933  *
2934  * A transaction must be in progress.  Replication of 'column' must not have
2935  * been disabled (by calling ovsdb_idl_omit()).
2936  *
2937  * Usually this function is used indirectly through one of the "set" functions
2938  * generated by ovsdb-idlc.
2939  *
2940  * Takes ownership of what 'datum' points to (and in some cases destroys that
2941  * data before returning) but makes a copy of 'datum' itself.  (Commonly
2942  * 'datum' is on the caller's stack.) */
2943 static void
2944 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
2945                       const struct ovsdb_idl_column *column,
2946                       struct ovsdb_datum *datum, bool owns_datum)
2947 {
2948     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
2949     const struct ovsdb_idl_table_class *class;
2950     size_t column_idx;
2951     bool write_only;
2952
2953     if (ovsdb_idl_row_is_synthetic(row)) {
2954         goto discard_datum;
2955     }
2956
2957     class = row->table->class;
2958     column_idx = column - class->columns;
2959     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
2960
2961     ovs_assert(row->new != NULL);
2962     ovs_assert(column_idx < class->n_columns);
2963     ovs_assert(row->old == NULL ||
2964                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
2965
2966     if (row->table->idl->verify_write_only && !write_only) {
2967         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
2968                  " explicitly configured not to.", class->name, column->name);
2969         goto discard_datum;
2970     }
2971
2972     /* If this is a write-only column and the datum being written is the same
2973      * as the one already there, just skip the update entirely.  This is worth
2974      * optimizing because we have a lot of columns that get periodically
2975      * refreshed into the database but don't actually change that often.
2976      *
2977      * We don't do this for read/write columns because that would break
2978      * atomicity of transactions--some other client might have written a
2979      * different value in that column since we read it.  (But if a whole
2980      * transaction only does writes of existing values, without making any real
2981      * changes, we will drop the whole transaction later in
2982      * ovsdb_idl_txn_commit().) */
2983     if (write_only && ovsdb_datum_equals(ovsdb_idl_read(row, column),
2984                                          datum, &column->type)) {
2985         goto discard_datum;
2986     }
2987
2988     if (hmap_node_is_null(&row->txn_node)) {
2989         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
2990                     uuid_hash(&row->uuid));
2991     }
2992     if (row->old == row->new) {
2993         row->new = xmalloc(class->n_columns * sizeof *row->new);
2994     }
2995     if (!row->written) {
2996         row->written = bitmap_allocate(class->n_columns);
2997     }
2998     if (bitmap_is_set(row->written, column_idx)) {
2999         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
3000     } else {
3001         bitmap_set1(row->written, column_idx);
3002     }
3003     if (owns_datum) {
3004         row->new[column_idx] = *datum;
3005     } else {
3006         ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
3007     }
3008     (column->unparse)(row);
3009     (column->parse)(row, &row->new[column_idx]);
3010     return;
3011
3012 discard_datum:
3013     if (owns_datum) {
3014         ovsdb_datum_destroy(datum, &column->type);
3015     }
3016 }
3017
3018 void
3019 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
3020                     const struct ovsdb_idl_column *column,
3021                     struct ovsdb_datum *datum)
3022 {
3023     ovsdb_idl_txn_write__(row, column, datum, true);
3024 }
3025
3026 void
3027 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
3028                           const struct ovsdb_idl_column *column,
3029                           const struct ovsdb_datum *datum)
3030 {
3031     ovsdb_idl_txn_write__(row, column,
3032                           CONST_CAST(struct ovsdb_datum *, datum), false);
3033 }
3034
3035 /* Causes the original contents of 'column' in 'row_' to be verified as a
3036  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
3037  * changed (or if 'row_' was deleted) between the time that the IDL originally
3038  * read its contents and the time that the transaction commits, then the
3039  * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
3040  * TXN_AGAIN_NOW (depending on whether the database change has already been
3041  * received).
3042  *
3043  * The intention is that, to ensure that no transaction commits based on dirty
3044  * reads, an application should call ovsdb_idl_txn_verify() on each data item
3045  * read as part of a read-modify-write operation.
3046  *
3047  * In some cases ovsdb_idl_txn_verify() reduces to a no-op, because the current
3048  * value of 'column' is already known:
3049  *
3050  *   - If 'row_' is a row created by the current transaction (returned by
3051  *     ovsdb_idl_txn_insert()).
3052  *
3053  *   - If 'column' has already been modified (with ovsdb_idl_txn_write())
3054  *     within the current transaction.
3055  *
3056  * Because of the latter property, always call ovsdb_idl_txn_verify() *before*
3057  * ovsdb_idl_txn_write() for a given read-modify-write.
3058  *
3059  * A transaction must be in progress.
3060  *
3061  * Usually this function is used indirectly through one of the "verify"
3062  * functions generated by ovsdb-idlc. */
3063 void
3064 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
3065                      const struct ovsdb_idl_column *column)
3066 {
3067     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
3068     const struct ovsdb_idl_table_class *class;
3069     size_t column_idx;
3070
3071     if (ovsdb_idl_row_is_synthetic(row)) {
3072         return;
3073     }
3074
3075     class = row->table->class;
3076     column_idx = column - class->columns;
3077
3078     ovs_assert(row->new != NULL);
3079     ovs_assert(row->old == NULL ||
3080                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
3081     if (!row->old
3082         || (row->written && bitmap_is_set(row->written, column_idx))) {
3083         return;
3084     }
3085
3086     if (hmap_node_is_null(&row->txn_node)) {
3087         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
3088                     uuid_hash(&row->uuid));
3089     }
3090     if (!row->prereqs) {
3091         row->prereqs = bitmap_allocate(class->n_columns);
3092     }
3093     bitmap_set1(row->prereqs, column_idx);
3094 }
3095
3096 /* Deletes 'row_' from its table.  May free 'row_', so it must not be
3097  * accessed afterward.
3098  *
3099  * A transaction must be in progress.
3100  *
3101  * Usually this function is used indirectly through one of the "delete"
3102  * functions generated by ovsdb-idlc. */
3103 void
3104 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
3105 {
3106     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
3107
3108     if (ovsdb_idl_row_is_synthetic(row)) {
3109         return;
3110     }
3111
3112     ovs_assert(row->new != NULL);
3113     if (!row->old) {
3114         ovsdb_idl_row_unparse(row);
3115         ovsdb_idl_row_clear_new(row);
3116         ovs_assert(!row->prereqs);
3117         hmap_remove(&row->table->rows, &row->hmap_node);
3118         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
3119         free(row);
3120         return;
3121     }
3122     if (hmap_node_is_null(&row->txn_node)) {
3123         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
3124                     uuid_hash(&row->uuid));
3125     }
3126     ovsdb_idl_row_clear_new(row);
3127     row->new = NULL;
3128 }
3129
3130 /* Inserts and returns a new row in the table with the specified 'class' in the
3131  * database with open transaction 'txn'.
3132  *
3133  * The new row is assigned a provisional UUID.  If 'uuid' is null then one is
3134  * randomly generated; otherwise 'uuid' should specify a randomly generated
3135  * UUID not otherwise in use.  ovsdb-server will assign a different UUID when
3136  * 'txn' is committed, but the IDL will replace any uses of the provisional
3137  * UUID in the data to be to be committed by the UUID assigned by
3138  * ovsdb-server.
3139  *
3140  * Usually this function is used indirectly through one of the "insert"
3141  * functions generated by ovsdb-idlc. */
3142 const struct ovsdb_idl_row *
3143 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
3144                      const struct ovsdb_idl_table_class *class,
3145                      const struct uuid *uuid)
3146 {
3147     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
3148
3149     if (uuid) {
3150         ovs_assert(!ovsdb_idl_txn_get_row(txn, uuid));
3151         row->uuid = *uuid;
3152     } else {
3153         uuid_generate(&row->uuid);
3154     }
3155
3156     row->table = ovsdb_idl_table_from_class(txn->idl, class);
3157     row->new = xmalloc(class->n_columns * sizeof *row->new);
3158     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
3159     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
3160     return row;
3161 }
3162
3163 static void
3164 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
3165 {
3166     struct ovsdb_idl_txn *txn;
3167
3168     HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
3169         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
3170     }
3171 }
3172
3173 static struct ovsdb_idl_txn *
3174 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
3175 {
3176     struct ovsdb_idl_txn *txn;
3177
3178     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
3179                              json_hash(id, 0), &idl->outstanding_txns) {
3180         if (json_equal(id, txn->request_id)) {
3181             return txn;
3182         }
3183     }
3184     return NULL;
3185 }
3186
3187 static bool
3188 check_json_type(const struct json *json, enum json_type type, const char *name)
3189 {
3190     if (!json) {
3191         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
3192         return false;
3193     } else if (json->type != type) {
3194         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
3195                      name, json_type_to_string(json->type),
3196                      json_type_to_string(type));
3197         return false;
3198     } else {
3199         return true;
3200     }
3201 }
3202
3203 static bool
3204 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
3205                                 const struct json_array *results)
3206 {
3207     struct json *count, *rows, *row, *column;
3208     struct shash *mutate, *select;
3209
3210     if (txn->inc_index + 2 > results->n) {
3211         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
3212                      "for increment (has %"PRIuSIZE", needs %u)",
3213                      results->n, txn->inc_index + 2);
3214         return false;
3215     }
3216
3217     /* We know that this is a JSON object because the loop in
3218      * ovsdb_idl_txn_process_reply() checked. */
3219     mutate = json_object(results->elems[txn->inc_index]);
3220     count = shash_find_data(mutate, "count");
3221     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
3222         return false;
3223     }
3224     if (count->u.integer != 1) {
3225         VLOG_WARN_RL(&syntax_rl,
3226                      "\"mutate\" reply \"count\" is %lld instead of 1",
3227                      count->u.integer);
3228         return false;
3229     }
3230
3231     select = json_object(results->elems[txn->inc_index + 1]);
3232     rows = shash_find_data(select, "rows");
3233     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
3234         return false;
3235     }
3236     if (rows->u.array.n != 1) {
3237         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %"PRIuSIZE" elements "
3238                      "instead of 1",
3239                      rows->u.array.n);
3240         return false;
3241     }
3242     row = rows->u.array.elems[0];
3243     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
3244         return false;
3245     }
3246     column = shash_find_data(json_object(row), txn->inc_column);
3247     if (!check_json_type(column, JSON_INTEGER,
3248                          "\"select\" reply inc column")) {
3249         return false;
3250     }
3251     txn->inc_new_value = column->u.integer;
3252     return true;
3253 }
3254
3255 static bool
3256 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
3257                                    const struct json_array *results)
3258 {
3259     static const struct ovsdb_base_type uuid_type = OVSDB_BASE_UUID_INIT;
3260     struct ovsdb_error *error;
3261     struct json *json_uuid;
3262     union ovsdb_atom uuid;
3263     struct shash *reply;
3264
3265     if (insert->op_index >= results->n) {
3266         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
3267                      "for insert (has %"PRIuSIZE", needs %u)",
3268                      results->n, insert->op_index);
3269         return false;
3270     }
3271
3272     /* We know that this is a JSON object because the loop in
3273      * ovsdb_idl_txn_process_reply() checked. */
3274     reply = json_object(results->elems[insert->op_index]);
3275     json_uuid = shash_find_data(reply, "uuid");
3276     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
3277         return false;
3278     }
3279
3280     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
3281     if (error) {
3282         char *s = ovsdb_error_to_string(error);
3283         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
3284                      "UUID: %s", s);
3285         free(s);
3286         ovsdb_error_destroy(error);
3287         return false;
3288     }
3289
3290     insert->real = uuid.uuid;
3291
3292     return true;
3293 }
3294
3295 static bool
3296 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
3297                             const struct jsonrpc_msg *msg)
3298 {
3299     struct ovsdb_idl_txn *txn;
3300     enum ovsdb_idl_txn_status status;
3301
3302     txn = ovsdb_idl_txn_find(idl, msg->id);
3303     if (!txn) {
3304         return false;
3305     }
3306
3307     if (msg->type == JSONRPC_ERROR) {
3308         status = TXN_ERROR;
3309     } else if (msg->result->type != JSON_ARRAY) {
3310         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
3311         status = TXN_ERROR;
3312     } else {
3313         struct json_array *ops = &msg->result->u.array;
3314         int hard_errors = 0;
3315         int soft_errors = 0;
3316         int lock_errors = 0;
3317         size_t i;
3318
3319         for (i = 0; i < ops->n; i++) {
3320             struct json *op = ops->elems[i];
3321
3322             if (op->type == JSON_NULL) {
3323                 /* This isn't an error in itself but indicates that some prior
3324                  * operation failed, so make sure that we know about it. */
3325                 soft_errors++;
3326             } else if (op->type == JSON_OBJECT) {
3327                 struct json *error;
3328
3329                 error = shash_find_data(json_object(op), "error");
3330                 if (error) {
3331                     if (error->type == JSON_STRING) {
3332                         if (!strcmp(error->u.string, "timed out")) {
3333                             soft_errors++;
3334                         } else if (!strcmp(error->u.string, "not owner")) {
3335                             lock_errors++;
3336                         } else if (strcmp(error->u.string, "aborted")) {
3337                             hard_errors++;
3338                             ovsdb_idl_txn_set_error_json(txn, op);
3339                         }
3340                     } else {
3341                         hard_errors++;
3342                         ovsdb_idl_txn_set_error_json(txn, op);
3343                         VLOG_WARN_RL(&syntax_rl,
3344                                      "\"error\" in reply is not JSON string");
3345                     }
3346                 }
3347             } else {
3348                 hard_errors++;
3349                 ovsdb_idl_txn_set_error_json(txn, op);
3350                 VLOG_WARN_RL(&syntax_rl,
3351                              "operation reply is not JSON null or object");
3352             }
3353         }
3354
3355         if (!soft_errors && !hard_errors && !lock_errors) {
3356             struct ovsdb_idl_txn_insert *insert;
3357
3358             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
3359                 hard_errors++;
3360             }
3361
3362             HMAP_FOR_EACH (insert, hmap_node, &txn->inserted_rows) {
3363                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
3364                     hard_errors++;
3365                 }
3366             }
3367         }
3368
3369         status = (hard_errors ? TXN_ERROR
3370                   : lock_errors ? TXN_NOT_LOCKED
3371                   : soft_errors ? TXN_TRY_AGAIN
3372                   : TXN_SUCCESS);
3373     }
3374
3375     ovsdb_idl_txn_complete(txn, status);
3376     return true;
3377 }
3378
3379 /* Returns the transaction currently active for 'row''s IDL.  A transaction
3380  * must currently be active. */
3381 struct ovsdb_idl_txn *
3382 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
3383 {
3384     struct ovsdb_idl_txn *txn = row->table->idl->txn;
3385     ovs_assert(txn != NULL);
3386     return txn;
3387 }
3388
3389 /* Returns the IDL on which 'txn' acts. */
3390 struct ovsdb_idl *
3391 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
3392 {
3393     return txn->idl;
3394 }
3395
3396 /* Blocks until 'idl' successfully connects to the remote database and
3397  * retrieves its contents. */
3398 void
3399 ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
3400 {
3401     while (1) {
3402         ovsdb_idl_run(idl);
3403         if (ovsdb_idl_has_ever_connected(idl)) {
3404             return;
3405         }
3406         ovsdb_idl_wait(idl);
3407         poll_block();
3408     }
3409 }
3410 \f
3411 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
3412  * the database server and to avoid modifying the database when the lock cannot
3413  * be acquired (that is, when another client has the same lock).
3414  *
3415  * If 'lock_name' is NULL, drops the locking requirement and releases the
3416  * lock. */
3417 void
3418 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
3419 {
3420     ovs_assert(!idl->txn);
3421     ovs_assert(hmap_is_empty(&idl->outstanding_txns));
3422
3423     if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
3424         /* Release previous lock. */
3425         ovsdb_idl_send_unlock_request(idl);
3426         free(idl->lock_name);
3427         idl->lock_name = NULL;
3428         idl->is_lock_contended = false;
3429     }
3430
3431     if (lock_name && !idl->lock_name) {
3432         /* Acquire new lock. */
3433         idl->lock_name = xstrdup(lock_name);
3434         ovsdb_idl_send_lock_request(idl);
3435     }
3436 }
3437
3438 /* Returns true if 'idl' is configured to obtain a lock and owns that lock.
3439  *
3440  * Locking and unlocking happens asynchronously from the database client's
3441  * point of view, so the information is only useful for optimization (e.g. if
3442  * the client doesn't have the lock then there's no point in trying to write to
3443  * the database). */
3444 bool
3445 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
3446 {
3447     return idl->has_lock;
3448 }
3449
3450 /* Returns true if 'idl' is configured to obtain a lock but the database server
3451  * has indicated that some other client already owns the requested lock. */
3452 bool
3453 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
3454 {
3455     return idl->is_lock_contended;
3456 }
3457
3458 static void
3459 ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
3460 {
3461     if (new_has_lock && !idl->has_lock) {
3462         if (idl->state == IDL_S_MONITORING ||
3463             idl->state == IDL_S_MONITORING_COND) {
3464             idl->change_seqno++;
3465         } else {
3466             /* We're setting up a session, so don't signal that the database
3467              * changed.  Finalizing the session will increment change_seqno
3468              * anyhow. */
3469         }
3470         idl->is_lock_contended = false;
3471     }
3472     idl->has_lock = new_has_lock;
3473 }
3474
3475 static void
3476 ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
3477                               struct json **idp)
3478 {
3479     ovsdb_idl_update_has_lock(idl, false);
3480
3481     json_destroy(idl->lock_request_id);
3482     idl->lock_request_id = NULL;
3483
3484     if (jsonrpc_session_is_connected(idl->session)) {
3485         struct json *params;
3486
3487         params = json_array_create_1(json_string_create(idl->lock_name));
3488         jsonrpc_session_send(idl->session,
3489                              jsonrpc_create_request(method, params, idp));
3490     }
3491 }
3492
3493 static void
3494 ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
3495 {
3496     ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
3497 }
3498
3499 static void
3500 ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
3501 {
3502     ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
3503 }
3504
3505 static void
3506 ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
3507 {
3508     bool got_lock;
3509
3510     json_destroy(idl->lock_request_id);
3511     idl->lock_request_id = NULL;
3512
3513     if (result->type == JSON_OBJECT) {
3514         const struct json *locked;
3515
3516         locked = shash_find_data(json_object(result), "locked");
3517         got_lock = locked && locked->type == JSON_TRUE;
3518     } else {
3519         got_lock = false;
3520     }
3521
3522     ovsdb_idl_update_has_lock(idl, got_lock);
3523     if (!got_lock) {
3524         idl->is_lock_contended = true;
3525     }
3526 }
3527
3528 static void
3529 ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
3530                             const struct json *params,
3531                             bool new_has_lock)
3532 {
3533     if (idl->lock_name
3534         && params->type == JSON_ARRAY
3535         && json_array(params)->n > 0
3536         && json_array(params)->elems[0]->type == JSON_STRING) {
3537         const char *lock_name = json_string(json_array(params)->elems[0]);
3538
3539         if (!strcmp(idl->lock_name, lock_name)) {
3540             ovsdb_idl_update_has_lock(idl, new_has_lock);
3541             if (!new_has_lock) {
3542                 idl->is_lock_contended = true;
3543             }
3544         }
3545     }
3546 }
3547
3548 /* Inserts a new Map Operation into current transaction. */
3549 static void
3550 ovsdb_idl_txn_add_map_op(struct ovsdb_idl_row *row,
3551                          const struct ovsdb_idl_column *column,
3552                          struct ovsdb_datum *datum,
3553                          enum map_op_type op_type)
3554 {
3555     const struct ovsdb_idl_table_class *class;
3556     size_t column_idx;
3557     struct map_op *map_op;
3558
3559     class = row->table->class;
3560     column_idx = column - class->columns;
3561
3562     /* Check if a map operation list exists for this column. */
3563     if (!row->map_op_written) {
3564         row->map_op_written = bitmap_allocate(class->n_columns);
3565         row->map_op_lists = xzalloc(class->n_columns *
3566                                     sizeof *row->map_op_lists);
3567     }
3568     if (!row->map_op_lists[column_idx]) {
3569         row->map_op_lists[column_idx] = map_op_list_create();
3570     }
3571
3572     /* Add a map operation to the corresponding list. */
3573     map_op = map_op_create(datum, op_type);
3574     bitmap_set1(row->map_op_written, column_idx);
3575     map_op_list_add(row->map_op_lists[column_idx], map_op, &column->type);
3576
3577     /* Add this row to transaction's list of rows. */
3578     if (hmap_node_is_null(&row->txn_node)) {
3579         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
3580                     uuid_hash(&row->uuid));
3581     }
3582 }
3583
3584 static bool
3585 is_valid_partial_update(const struct ovsdb_idl_row *row,
3586                         const struct ovsdb_idl_column *column,
3587                         struct ovsdb_datum *datum)
3588 {
3589     /* Verify that this column is being monitored. */
3590     unsigned int column_idx = column - row->table->class->columns;
3591     if (!(row->table->modes[column_idx] & OVSDB_IDL_MONITOR)) {
3592         VLOG_WARN("cannot partially update non-monitored column");
3593         return false;
3594     }
3595
3596     /* Verify that the update affects a single element. */
3597     if (datum->n != 1) {
3598         VLOG_WARN("invalid datum for partial update");
3599         return false;
3600     }
3601
3602     return true;
3603 }
3604
3605 /* Inserts the key-value specified in 'datum' into the map in 'column' in
3606  * 'row_'. If the key already exist in 'column', then it's value is updated
3607  * with the value in 'datum'. The key-value in 'datum' must be of the same type
3608  * as the keys-values in 'column'. This function takes ownership of 'datum'.
3609  *
3610  * Usually this function is used indirectly through one of the "update"
3611  * functions generated by vswitch-idl. */
3612 void
3613 ovsdb_idl_txn_write_partial_map(const struct ovsdb_idl_row *row_,
3614                                 const struct ovsdb_idl_column *column,
3615                                 struct ovsdb_datum *datum)
3616 {
3617     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
3618     enum ovsdb_atomic_type key_type;
3619     enum map_op_type op_type;
3620     unsigned int pos;
3621     const struct ovsdb_datum *old_datum;
3622
3623     if (!is_valid_partial_update(row, column, datum)) {
3624         ovsdb_datum_destroy(datum, &column->type);
3625         free(datum);
3626         return;
3627     }
3628
3629     /* Find out if this is an insert or an update. */
3630     key_type = column->type.key.type;
3631     old_datum = ovsdb_idl_read(row, column);
3632     pos = ovsdb_datum_find_key(old_datum, &datum->keys[0], key_type);
3633     op_type = pos == UINT_MAX ? MAP_OP_INSERT : MAP_OP_UPDATE;
3634
3635     ovsdb_idl_txn_add_map_op(row, column, datum, op_type);
3636 }
3637
3638 /* Deletes the key specified in 'datum' from the map in 'column' in 'row_'.
3639  * The key in 'datum' must be of the same type as the keys in 'column'.
3640  * The value in 'datum' must be NULL. This function takes ownership of
3641  * 'datum'.
3642  *
3643  * Usually this function is used indirectly through one of the "update"
3644  * functions generated by vswitch-idl. */
3645 void
3646 ovsdb_idl_txn_delete_partial_map(const struct ovsdb_idl_row *row_,
3647                                  const struct ovsdb_idl_column *column,
3648                                  struct ovsdb_datum *datum)
3649 {
3650     struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
3651
3652     if (!is_valid_partial_update(row, column, datum)) {
3653         struct ovsdb_type type_ = column->type;
3654         type_.value.type = OVSDB_TYPE_VOID;
3655         ovsdb_datum_destroy(datum, &type_);
3656         free(datum);
3657         return;
3658     }
3659     ovsdb_idl_txn_add_map_op(row, column, datum, MAP_OP_DELETE);
3660 }
3661
3662 void
3663 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
3664 {
3665     if (loop) {
3666         ovsdb_idl_destroy(loop->idl);
3667     }
3668 }
3669
3670 struct ovsdb_idl_txn *
3671 ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
3672 {
3673     ovsdb_idl_run(loop->idl);
3674     loop->open_txn = (loop->committing_txn
3675                       || ovsdb_idl_get_seqno(loop->idl) == loop->skip_seqno
3676                       ? NULL
3677                       : ovsdb_idl_txn_create(loop->idl));
3678     return loop->open_txn;
3679 }
3680
3681 void
3682 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
3683 {
3684     if (loop->open_txn) {
3685         loop->committing_txn = loop->open_txn;
3686         loop->open_txn = NULL;
3687
3688         loop->precommit_seqno = ovsdb_idl_get_seqno(loop->idl);
3689     }
3690
3691     struct ovsdb_idl_txn *txn = loop->committing_txn;
3692     if (txn) {
3693         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
3694         if (status != TXN_INCOMPLETE) {
3695             switch (status) {
3696             case TXN_TRY_AGAIN:
3697                 /* We want to re-evaluate the database when it's changed from
3698                  * the contents that it had when we started the commit.  (That
3699                  * might have already happened.) */
3700                 loop->skip_seqno = loop->precommit_seqno;
3701                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
3702                     poll_immediate_wake();
3703                 }
3704                 break;
3705
3706             case TXN_SUCCESS:
3707                 /* If the database has already changed since we started the
3708                  * commit, re-evaluate it immediately to avoid missing a change
3709                  * for a while. */
3710                 loop->cur_cfg = loop->next_cfg;
3711                 if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
3712                     poll_immediate_wake();
3713                 }
3714                 break;
3715
3716             case TXN_UNCHANGED:
3717                 loop->cur_cfg = loop->next_cfg;
3718                 break;
3719
3720             case TXN_ABORTED:
3721             case TXN_NOT_LOCKED:
3722             case TXN_ERROR:
3723                 break;
3724
3725             case TXN_UNCOMMITTED:
3726             case TXN_INCOMPLETE:
3727                 OVS_NOT_REACHED();
3728             }
3729             ovsdb_idl_txn_destroy(txn);
3730             loop->committing_txn = NULL;
3731         }
3732     }
3733
3734     ovsdb_idl_wait(loop->idl);
3735 }