Merge branch 'master' into next
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009, 2010 Nicira Networks.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <inttypes.h>
23 #include <limits.h>
24 #include <stdlib.h>
25
26 #include "bitmap.h"
27 #include "dynamic-string.h"
28 #include "json.h"
29 #include "jsonrpc.h"
30 #include "ovsdb-data.h"
31 #include "ovsdb-error.h"
32 #include "ovsdb-idl-provider.h"
33 #include "poll-loop.h"
34 #include "shash.h"
35 #include "util.h"
36
37 #define THIS_MODULE VLM_ovsdb_idl
38 #include "vlog.h"
39
40 /* An arc from one idl_row to another.  When row A contains a UUID that
41  * references row B, this is represented by an arc from A (the source) to B
42  * (the destination).
43  *
44  * Arcs from a row to itself are omitted, that is, src and dst are always
45  * different.
46  *
47  * Arcs are never duplicated, that is, even if there are multiple references
48  * from A to B, there is only a single arc from A to B.
49  *
50  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
51  * A.  Both an arc and its converse may both be present, if each row refers
52  * to the other circularly.
53  *
54  * The source and destination row may be in the same table or in different
55  * tables.
56  */
57 struct ovsdb_idl_arc {
58     struct list src_node;       /* In src->src_arcs list. */
59     struct list dst_node;       /* In dst->dst_arcs list. */
60     struct ovsdb_idl_row *src;  /* Source row. */
61     struct ovsdb_idl_row *dst;  /* Destination row. */
62 };
63
64 struct ovsdb_idl {
65     const struct ovsdb_idl_class *class;
66     struct jsonrpc_session *session;
67     struct shash table_by_name;
68     struct ovsdb_idl_table *tables;
69     struct json *monitor_request_id;
70     unsigned int last_monitor_request_seqno;
71     unsigned int change_seqno;
72
73     /* Transaction support. */
74     struct ovsdb_idl_txn *txn;
75     struct hmap outstanding_txns;
76 };
77
78 struct ovsdb_idl_txn {
79     struct hmap_node hmap_node;
80     struct json *request_id;
81     struct ovsdb_idl *idl;
82     struct hmap txn_rows;
83     enum ovsdb_idl_txn_status status;
84     bool dry_run;
85     struct ds comment;
86
87     /* Increments. */
88     char *inc_table;
89     char *inc_column;
90     struct json *inc_where;
91     unsigned int inc_index;
92     int64_t inc_new_value;
93
94     /* Inserted rows. */
95     struct hmap inserted_rows;
96 };
97
98 struct ovsdb_idl_txn_insert {
99     struct hmap_node hmap_node; /* In struct ovsdb_idl_txn's inserted_rows. */
100     struct uuid dummy;          /* Dummy UUID used locally. */
101     int op_index;               /* Index into transaction's operation array. */
102     struct uuid real;           /* Real UUID used by database server. */
103 };
104
105 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
106 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
107
108 static void ovsdb_idl_clear(struct ovsdb_idl *);
109 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
110 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
111 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
112                                                     const struct json *);
113 static void ovsdb_idl_process_update(struct ovsdb_idl_table *,
114                                      const struct uuid *,
115                                      const struct json *old,
116                                      const struct json *new);
117 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
118 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
119 static void ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
120
121 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
122 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
123     const struct ovsdb_idl_table_class *);
124 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
125                                                   const struct uuid *);
126 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
127
128 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
129 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
130 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
131 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
132
133 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
134 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
135                                         const struct jsonrpc_msg *msg);
136
137 struct ovsdb_idl *
138 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
139 {
140     struct ovsdb_idl *idl;
141     size_t i;
142
143     idl = xzalloc(sizeof *idl);
144     idl->class = class;
145     idl->session = jsonrpc_session_open(remote);
146     shash_init(&idl->table_by_name);
147     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
148     for (i = 0; i < class->n_tables; i++) {
149         const struct ovsdb_idl_table_class *tc = &class->tables[i];
150         struct ovsdb_idl_table *table = &idl->tables[i];
151         size_t j;
152
153         assert(!shash_find(&idl->table_by_name, tc->name));
154         shash_add(&idl->table_by_name, tc->name, table);
155         table->class = tc;
156         shash_init(&table->columns);
157         for (j = 0; j < tc->n_columns; j++) {
158             const struct ovsdb_idl_column *column = &tc->columns[j];
159
160             assert(!shash_find(&table->columns, column->name));
161             shash_add(&table->columns, column->name, column);
162         }
163         hmap_init(&table->rows);
164         table->idl = idl;
165     }
166     idl->last_monitor_request_seqno = UINT_MAX;
167     hmap_init(&idl->outstanding_txns);
168
169     return idl;
170 }
171
172 void
173 ovsdb_idl_destroy(struct ovsdb_idl *idl)
174 {
175     if (idl) {
176         size_t i;
177
178         assert(!idl->txn);
179         ovsdb_idl_clear(idl);
180         jsonrpc_session_close(idl->session);
181
182         for (i = 0; i < idl->class->n_tables; i++) {
183             struct ovsdb_idl_table *table = &idl->tables[i];
184             shash_destroy(&table->columns);
185             hmap_destroy(&table->rows);
186         }
187         shash_destroy(&idl->table_by_name);
188         free(idl->tables);
189         json_destroy(idl->monitor_request_id);
190         free(idl);
191     }
192 }
193
194 static void
195 ovsdb_idl_clear(struct ovsdb_idl *idl)
196 {
197     bool changed = false;
198     size_t i;
199
200     for (i = 0; i < idl->class->n_tables; i++) {
201         struct ovsdb_idl_table *table = &idl->tables[i];
202         struct ovsdb_idl_row *row, *next_row;
203
204         if (hmap_is_empty(&table->rows)) {
205             continue;
206         }
207
208         changed = true;
209         HMAP_FOR_EACH_SAFE (row, next_row, struct ovsdb_idl_row, hmap_node,
210                             &table->rows) {
211             struct ovsdb_idl_arc *arc, *next_arc;
212
213             if (!ovsdb_idl_row_is_orphan(row)) {
214                 ovsdb_idl_row_unparse(row);
215             }
216             LIST_FOR_EACH_SAFE (arc, next_arc, struct ovsdb_idl_arc, src_node,
217                                 &row->src_arcs) {
218                 free(arc);
219             }
220             /* No need to do anything with dst_arcs: some node has those arcs
221              * as forward arcs and will destroy them itself. */
222
223             ovsdb_idl_row_destroy(row);
224         }
225     }
226
227     if (changed) {
228         idl->change_seqno++;
229     }
230 }
231
232 void
233 ovsdb_idl_run(struct ovsdb_idl *idl)
234 {
235     int i;
236
237     assert(!idl->txn);
238     jsonrpc_session_run(idl->session);
239     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
240         struct jsonrpc_msg *msg, *reply;
241         unsigned int seqno;
242
243         seqno = jsonrpc_session_get_seqno(idl->session);
244         if (idl->last_monitor_request_seqno != seqno) {
245             idl->last_monitor_request_seqno = seqno;
246             ovsdb_idl_txn_abort_all(idl);
247             ovsdb_idl_send_monitor_request(idl);
248             break;
249         }
250
251         msg = jsonrpc_session_recv(idl->session);
252         if (!msg) {
253             break;
254         }
255
256         reply = NULL;
257         if (msg->type == JSONRPC_NOTIFY
258                    && !strcmp(msg->method, "update")
259                    && msg->params->type == JSON_ARRAY
260                    && msg->params->u.array.n == 2
261                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
262             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
263         } else if (msg->type == JSONRPC_REPLY
264                    && idl->monitor_request_id
265                    && json_equal(idl->monitor_request_id, msg->id)) {
266             json_destroy(idl->monitor_request_id);
267             idl->monitor_request_id = NULL;
268             ovsdb_idl_clear(idl);
269             ovsdb_idl_parse_update(idl, msg->result);
270         } else if (msg->type == JSONRPC_REPLY
271                    && msg->id && msg->id->type == JSON_STRING
272                    && !strcmp(msg->id->u.string, "echo")) {
273             /* It's a reply to our echo request.  Ignore it. */
274         } else if ((msg->type == JSONRPC_ERROR
275                     || msg->type == JSONRPC_REPLY)
276                    && ovsdb_idl_txn_process_reply(idl, msg)) {
277             /* ovsdb_idl_txn_process_reply() did everything needful. */
278         } else {
279             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
280              * a transaction before we receive the reply, so keep the log level
281              * low. */
282             VLOG_DBG("%s: received unexpected %s message",
283                      jsonrpc_session_get_name(idl->session),
284                      jsonrpc_msg_type_to_string(msg->type));
285         }
286         if (reply) {
287             jsonrpc_session_send(idl->session, reply);
288         }
289         jsonrpc_msg_destroy(msg);
290     }
291 }
292
293 void
294 ovsdb_idl_wait(struct ovsdb_idl *idl)
295 {
296     jsonrpc_session_wait(idl->session);
297     jsonrpc_session_recv_wait(idl->session);
298 }
299
300 unsigned int
301 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
302 {
303     return idl->change_seqno;
304 }
305
306 bool
307 ovsdb_idl_has_ever_connected(const struct ovsdb_idl *idl)
308 {
309     return ovsdb_idl_get_seqno(idl) != 0;
310 }
311
312 void
313 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
314 {
315     jsonrpc_session_force_reconnect(idl->session);
316 }
317 \f
318 static void
319 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
320 {
321     struct json *monitor_requests;
322     struct jsonrpc_msg *msg;
323     size_t i;
324
325     monitor_requests = json_object_create();
326     for (i = 0; i < idl->class->n_tables; i++) {
327         const struct ovsdb_idl_table *table = &idl->tables[i];
328         const struct ovsdb_idl_table_class *tc = table->class;
329         struct json *monitor_request, *columns;
330         size_t i;
331
332         monitor_request = json_object_create();
333         columns = json_array_create_empty();
334         for (i = 0; i < tc->n_columns; i++) {
335             const struct ovsdb_idl_column *column = &tc->columns[i];
336             json_array_add(columns, json_string_create(column->name));
337         }
338         json_object_put(monitor_request, "columns", columns);
339         json_object_put(monitor_requests, tc->name, monitor_request);
340     }
341
342     json_destroy(idl->monitor_request_id);
343     msg = jsonrpc_create_request(
344         "monitor", json_array_create_2(json_null_create(), monitor_requests),
345         &idl->monitor_request_id);
346     jsonrpc_session_send(idl->session, msg);
347 }
348
349 static void
350 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
351 {
352     struct ovsdb_error *error;
353
354     idl->change_seqno++;
355
356     error = ovsdb_idl_parse_update__(idl, table_updates);
357     if (error) {
358         if (!VLOG_DROP_WARN(&syntax_rl)) {
359             char *s = ovsdb_error_to_string(error);
360             VLOG_WARN_RL(&syntax_rl, "%s", s);
361             free(s);
362         }
363         ovsdb_error_destroy(error);
364     }
365 }
366
367 static struct ovsdb_error *
368 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
369                          const struct json *table_updates)
370 {
371     const struct shash_node *tables_node;
372
373     if (table_updates->type != JSON_OBJECT) {
374         return ovsdb_syntax_error(table_updates, NULL,
375                                   "<table-updates> is not an object");
376     }
377     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
378         const struct json *table_update = tables_node->data;
379         const struct shash_node *table_node;
380         struct ovsdb_idl_table *table;
381
382         table = shash_find_data(&idl->table_by_name, tables_node->name);
383         if (!table) {
384             return ovsdb_syntax_error(
385                 table_updates, NULL,
386                 "<table-updates> includes unknown table \"%s\"",
387                 tables_node->name);
388         }
389
390         if (table_update->type != JSON_OBJECT) {
391             return ovsdb_syntax_error(table_update, NULL,
392                                       "<table-update> for table \"%s\" is "
393                                       "not an object", table->class->name);
394         }
395         SHASH_FOR_EACH (table_node, json_object(table_update)) {
396             const struct json *row_update = table_node->data;
397             const struct json *old_json, *new_json;
398             struct uuid uuid;
399
400             if (!uuid_from_string(&uuid, table_node->name)) {
401                 return ovsdb_syntax_error(table_update, NULL,
402                                           "<table-update> for table \"%s\" "
403                                           "contains bad UUID "
404                                           "\"%s\" as member name",
405                                           table->class->name,
406                                           table_node->name);
407             }
408             if (row_update->type != JSON_OBJECT) {
409                 return ovsdb_syntax_error(row_update, NULL,
410                                           "<table-update> for table \"%s\" "
411                                           "contains <row-update> for %s that "
412                                           "is not an object",
413                                           table->class->name,
414                                           table_node->name);
415             }
416
417             old_json = shash_find_data(json_object(row_update), "old");
418             new_json = shash_find_data(json_object(row_update), "new");
419             if (old_json && old_json->type != JSON_OBJECT) {
420                 return ovsdb_syntax_error(old_json, NULL,
421                                           "\"old\" <row> is not object");
422             } else if (new_json && new_json->type != JSON_OBJECT) {
423                 return ovsdb_syntax_error(new_json, NULL,
424                                           "\"new\" <row> is not object");
425             } else if ((old_json != NULL) + (new_json != NULL)
426                        != shash_count(json_object(row_update))) {
427                 return ovsdb_syntax_error(row_update, NULL,
428                                           "<row-update> contains unexpected "
429                                           "member");
430             } else if (!old_json && !new_json) {
431                 return ovsdb_syntax_error(row_update, NULL,
432                                           "<row-update> missing \"old\" "
433                                           "and \"new\" members");
434             }
435
436             ovsdb_idl_process_update(table, &uuid, old_json, new_json);
437         }
438     }
439
440     return NULL;
441 }
442
443 static struct ovsdb_idl_row *
444 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
445 {
446     struct ovsdb_idl_row *row;
447
448     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, hmap_node,
449                              uuid_hash(uuid), &table->rows) {
450         if (uuid_equals(&row->uuid, uuid)) {
451             return row;
452         }
453     }
454     return NULL;
455 }
456
457 static void
458 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
459                          const struct uuid *uuid, const struct json *old,
460                          const struct json *new)
461 {
462     struct ovsdb_idl_row *row;
463
464     row = ovsdb_idl_get_row(table, uuid);
465     if (!new) {
466         /* Delete row. */
467         if (row && !ovsdb_idl_row_is_orphan(row)) {
468             /* XXX perhaps we should check the 'old' values? */
469             ovsdb_idl_delete_row(row);
470         } else {
471             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
472                          "from table %s",
473                          UUID_ARGS(uuid), table->class->name);
474         }
475     } else if (!old) {
476         /* Insert row. */
477         if (!row) {
478             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
479         } else if (ovsdb_idl_row_is_orphan(row)) {
480             ovsdb_idl_insert_row(row, new);
481         } else {
482             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
483                          "table %s", UUID_ARGS(uuid), table->class->name);
484             ovsdb_idl_modify_row(row, new);
485         }
486     } else {
487         /* Modify row. */
488         if (row) {
489             /* XXX perhaps we should check the 'old' values? */
490             if (!ovsdb_idl_row_is_orphan(row)) {
491                 ovsdb_idl_modify_row(row, new);
492             } else {
493                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
494                              "referenced row "UUID_FMT" in table %s",
495                              UUID_ARGS(uuid), table->class->name);
496                 ovsdb_idl_insert_row(row, new);
497             }
498         } else {
499             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
500                          "in table %s", UUID_ARGS(uuid), table->class->name);
501             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
502         }
503     }
504 }
505
506 static void
507 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
508 {
509     struct ovsdb_idl_table *table = row->table;
510     struct shash_node *node;
511
512     SHASH_FOR_EACH (node, json_object(row_json)) {
513         const char *column_name = node->name;
514         const struct ovsdb_idl_column *column;
515         struct ovsdb_datum datum;
516         struct ovsdb_error *error;
517
518         column = shash_find_data(&table->columns, column_name);
519         if (!column) {
520             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
521                          column_name, UUID_ARGS(&row->uuid));
522             continue;
523         }
524
525         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
526         if (!error) {
527             ovsdb_datum_swap(&row->old[column - table->class->columns],
528                              &datum);
529             ovsdb_datum_destroy(&datum, &column->type);
530         } else {
531             char *s = ovsdb_error_to_string(error);
532             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
533                          " in table %s: %s", column_name,
534                          UUID_ARGS(&row->uuid), table->class->name, s);
535             free(s);
536             ovsdb_error_destroy(error);
537         }
538     }
539 }
540
541 static bool
542 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
543 {
544     return !row->old;
545 }
546
547 static void
548 ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
549 {
550     const struct ovsdb_idl_table_class *class = row->table->class;
551     size_t i;
552
553     for (i = 0; i < class->n_columns; i++) {
554         const struct ovsdb_idl_column *c = &class->columns[i];
555         (c->parse)(row, &row->old[i]);
556     }
557 }
558
559 static void
560 ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
561 {
562     const struct ovsdb_idl_table_class *class = row->table->class;
563     size_t i;
564
565     for (i = 0; i < class->n_columns; i++) {
566         const struct ovsdb_idl_column *c = &class->columns[i];
567         (c->unparse)(row);
568     }
569 }
570
571 static void
572 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
573 {
574     assert(row->old == row->new);
575     if (!ovsdb_idl_row_is_orphan(row)) {
576         const struct ovsdb_idl_table_class *class = row->table->class;
577         size_t i;
578
579         for (i = 0; i < class->n_columns; i++) {
580             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
581         }
582         free(row->old);
583         row->old = row->new = NULL;
584     }
585 }
586
587 static void
588 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
589 {
590     if (row->old != row->new) {
591         if (row->new) {
592             const struct ovsdb_idl_table_class *class = row->table->class;
593             size_t i;
594
595             BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
596                 ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
597             }
598             free(row->new);
599             free(row->written);
600             row->written = NULL;
601         }
602         row->new = row->old;
603     }
604 }
605
606 static void
607 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
608 {
609     struct ovsdb_idl_arc *arc, *next;
610
611     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
612      * that this causes to be unreferenced. */
613     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, src_node,
614                         &row->src_arcs) {
615         list_remove(&arc->dst_node);
616         if (destroy_dsts
617             && ovsdb_idl_row_is_orphan(arc->dst)
618             && list_is_empty(&arc->dst->dst_arcs)) {
619             ovsdb_idl_row_destroy(arc->dst);
620         }
621         free(arc);
622     }
623     list_init(&row->src_arcs);
624 }
625
626 /* Force nodes that reference 'row' to reparse. */
627 static void
628 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row)
629 {
630     struct ovsdb_idl_arc *arc, *next;
631
632     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
633      * 'arc', so we need to use the "safe" variant of list traversal.  However,
634      * calling an ovsdb_idl_column's 'parse' function will add an arc
635      * equivalent to 'arc' to row->arcs.  That could be a problem for
636      * traversal, but it adds it at the beginning of the list to prevent us
637      * from stumbling upon it again.
638      *
639      * (If duplicate arcs were possible then we would need to make sure that
640      * 'next' didn't also point into 'arc''s destination, but we forbid
641      * duplicate arcs.) */
642     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, dst_node,
643                         &row->dst_arcs) {
644         struct ovsdb_idl_row *ref = arc->src;
645
646         ovsdb_idl_row_unparse(ref);
647         ovsdb_idl_row_clear_arcs(ref, false);
648         ovsdb_idl_row_parse(ref);
649     }
650 }
651
652 static struct ovsdb_idl_row *
653 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
654 {
655     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
656     list_init(&row->src_arcs);
657     list_init(&row->dst_arcs);
658     hmap_node_nullify(&row->txn_node);
659     return row;
660 }
661
662 static struct ovsdb_idl_row *
663 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
664 {
665     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
666     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
667     row->uuid = *uuid;
668     row->table = table;
669     return row;
670 }
671
672 static void
673 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
674 {
675     if (row) {
676         ovsdb_idl_row_clear_old(row);
677         hmap_remove(&row->table->rows, &row->hmap_node);
678         free(row);
679     }
680 }
681
682 static void
683 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
684 {
685     const struct ovsdb_idl_table_class *class = row->table->class;
686     size_t i;
687
688     assert(!row->old && !row->new);
689     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
690     for (i = 0; i < class->n_columns; i++) {
691         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
692     }
693     ovsdb_idl_row_update(row, row_json);
694     ovsdb_idl_row_parse(row);
695
696     ovsdb_idl_row_reparse_backrefs(row);
697 }
698
699 static void
700 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
701 {
702     ovsdb_idl_row_unparse(row);
703     ovsdb_idl_row_clear_arcs(row, true);
704     ovsdb_idl_row_clear_old(row);
705     if (list_is_empty(&row->dst_arcs)) {
706         ovsdb_idl_row_destroy(row);
707     } else {
708         ovsdb_idl_row_reparse_backrefs(row);
709     }
710 }
711
712 static void
713 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
714 {
715     ovsdb_idl_row_unparse(row);
716     ovsdb_idl_row_clear_arcs(row, true);
717     ovsdb_idl_row_update(row, row_json);
718     ovsdb_idl_row_parse(row);
719 }
720
721 static bool
722 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
723 {
724     const struct ovsdb_idl_arc *arc;
725
726     /* No self-arcs. */
727     if (src == dst) {
728         return false;
729     }
730
731     /* No duplicate arcs.
732      *
733      * We only need to test whether the first arc in dst->dst_arcs originates
734      * at 'src', since we add all of the arcs from a given source in a clump
735      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
736      * added at the front of the dst_arcs list. */
737     if (list_is_empty(&dst->dst_arcs)) {
738         return true;
739     }
740     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
741     return arc->src != src;
742 }
743
744 static struct ovsdb_idl_table *
745 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
746                            const struct ovsdb_idl_table_class *table_class)
747 {
748     return &idl->tables[table_class - idl->class->tables];
749 }
750
751 struct ovsdb_idl_row *
752 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
753                       struct ovsdb_idl_table_class *dst_table_class,
754                       const struct uuid *dst_uuid)
755 {
756     struct ovsdb_idl *idl = src->table->idl;
757     struct ovsdb_idl_table *dst_table;
758     struct ovsdb_idl_arc *arc;
759     struct ovsdb_idl_row *dst;
760
761     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
762     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
763     if (idl->txn) {
764         /* We're being called from ovsdb_idl_txn_write().  We must not update
765          * any arcs, because the transaction will be backed out at commit or
766          * abort time and we don't want our graph screwed up.
767          *
768          * Just return the destination row, if there is one and it has not been
769          * deleted. */
770         if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
771             return dst;
772         }
773         return NULL;
774     } else {
775         /* We're being called from some other context.  Update the graph. */
776         if (!dst) {
777             dst = ovsdb_idl_row_create(dst_table, dst_uuid);
778         }
779
780         /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
781         if (may_add_arc(src, dst)) {
782             /* The arc *must* be added at the front of the dst_arcs list.  See
783              * ovsdb_idl_row_reparse_backrefs() for details. */
784             arc = xmalloc(sizeof *arc);
785             list_push_front(&src->src_arcs, &arc->src_node);
786             list_push_front(&dst->dst_arcs, &arc->dst_node);
787             arc->src = src;
788             arc->dst = dst;
789         }
790
791         return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
792     }
793 }
794
795 const struct ovsdb_idl_row *
796 ovsdb_idl_get_row_for_uuid(const struct ovsdb_idl *idl,
797                            const struct ovsdb_idl_table_class *tc,
798                            const struct uuid *uuid)
799 {
800     return ovsdb_idl_get_row(ovsdb_idl_table_from_class(idl, tc), uuid);
801 }
802
803 static struct ovsdb_idl_row *
804 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
805 {
806     for (; node; node = hmap_next(&table->rows, node)) {
807         struct ovsdb_idl_row *row;
808
809         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
810         if (row->new || !ovsdb_idl_row_is_orphan(row)) {
811             return row;
812         }
813     }
814     return NULL;
815 }
816
817 const struct ovsdb_idl_row *
818 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
819                     const struct ovsdb_idl_table_class *table_class)
820 {
821     struct ovsdb_idl_table *table
822         = ovsdb_idl_table_from_class(idl, table_class);
823     return next_real_row(table, hmap_first(&table->rows));
824 }
825
826 const struct ovsdb_idl_row *
827 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
828 {
829     struct ovsdb_idl_table *table = row->table;
830
831     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
832 }
833 \f
834 /* Transactions. */
835
836 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
837                                    enum ovsdb_idl_txn_status);
838
839 const char *
840 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
841 {
842     switch (status) {
843     case TXN_UNCHANGED:
844         return "unchanged";
845     case TXN_INCOMPLETE:
846         return "incomplete";
847     case TXN_ABORTED:
848         return "aborted";
849     case TXN_SUCCESS:
850         return "success";
851     case TXN_TRY_AGAIN:
852         return "try again";
853     case TXN_ERROR:
854         return "error";
855     }
856     return "<unknown>";
857 }
858
859 struct ovsdb_idl_txn *
860 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
861 {
862     struct ovsdb_idl_txn *txn;
863
864     assert(!idl->txn);
865     idl->txn = txn = xmalloc(sizeof *txn);
866     txn->request_id = NULL;
867     txn->idl = idl;
868     hmap_init(&txn->txn_rows);
869     txn->status = TXN_INCOMPLETE;
870     txn->dry_run = false;
871     ds_init(&txn->comment);
872
873     txn->inc_table = NULL;
874     txn->inc_column = NULL;
875     txn->inc_where = NULL;
876
877     hmap_init(&txn->inserted_rows);
878
879     return txn;
880 }
881
882 void
883 ovsdb_idl_txn_add_comment(struct ovsdb_idl_txn *txn, const char *s)
884 {
885     if (txn->comment.length) {
886         ds_put_char(&txn->comment, '\n');
887     }
888     ds_put_cstr(&txn->comment, s);
889 }
890
891 void
892 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
893 {
894     txn->dry_run = true;
895 }
896
897 void
898 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn, const char *table,
899                         const char *column, const struct json *where)
900 {
901     assert(!txn->inc_table);
902     txn->inc_table = xstrdup(table);
903     txn->inc_column = xstrdup(column);
904     txn->inc_where = where ? json_clone(where) : json_array_create_empty();
905 }
906
907 void
908 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
909 {
910     struct ovsdb_idl_txn_insert *insert, *next;
911
912     json_destroy(txn->request_id);
913     if (txn->status == TXN_INCOMPLETE) {
914         hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
915     }
916     ovsdb_idl_txn_abort(txn);
917     ds_destroy(&txn->comment);
918     free(txn->inc_table);
919     free(txn->inc_column);
920     json_destroy(txn->inc_where);
921     HMAP_FOR_EACH_SAFE (insert, next, struct ovsdb_idl_txn_insert, hmap_node,
922                         &txn->inserted_rows) {
923         free(insert);
924     }
925     hmap_destroy(&txn->inserted_rows);
926     free(txn);
927 }
928
929 void
930 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
931 {
932     if (txn->status != TXN_INCOMPLETE) {
933         poll_immediate_wake();
934     }
935 }
936
937 static struct json *
938 where_uuid_equals(const struct uuid *uuid)
939 {
940     return
941         json_array_create_1(
942             json_array_create_3(
943                 json_string_create("_uuid"),
944                 json_string_create("=="),
945                 json_array_create_2(
946                     json_string_create("uuid"),
947                     json_string_create_nocopy(
948                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
949 }
950
951 static char *
952 uuid_name_from_uuid(const struct uuid *uuid)
953 {
954     char *name;
955     char *p;
956
957     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
958     for (p = name; *p != '\0'; p++) {
959         if (*p == '-') {
960             *p = '_';
961         }
962     }
963
964     return name;
965 }
966
967 static const struct ovsdb_idl_row *
968 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
969 {
970     const struct ovsdb_idl_row *row;
971
972     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, txn_node,
973                              uuid_hash(uuid), &txn->txn_rows) {
974         if (uuid_equals(&row->uuid, uuid)) {
975             return row;
976         }
977     }
978     return NULL;
979 }
980
981 /* XXX there must be a cleaner way to do this */
982 static struct json *
983 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
984 {
985     if (json->type == JSON_ARRAY) {
986         struct uuid uuid;
987         size_t i;
988
989         if (json->u.array.n == 2
990             && json->u.array.elems[0]->type == JSON_STRING
991             && json->u.array.elems[1]->type == JSON_STRING
992             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
993             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
994             const struct ovsdb_idl_row *row;
995
996             row = ovsdb_idl_txn_get_row(txn, &uuid);
997             if (row && !row->old && row->new) {
998                 json_destroy(json);
999
1000                 return json_array_create_2(
1001                     json_string_create("named-uuid"),
1002                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
1003             }
1004         }
1005
1006         for (i = 0; i < json->u.array.n; i++) {
1007             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
1008                                                       txn);
1009         }
1010     } else if (json->type == JSON_OBJECT) {
1011         struct shash_node *node;
1012
1013         SHASH_FOR_EACH (node, json_object(json)) {
1014             node->data = substitute_uuids(node->data, txn);
1015         }
1016     }
1017     return json;
1018 }
1019
1020 static void
1021 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
1022 {
1023     struct ovsdb_idl_row *row, *next;
1024
1025     /* This must happen early.  Otherwise, ovsdb_idl_row_parse() will call an
1026      * ovsdb_idl_column's 'parse' function, which will call
1027      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
1028      * transaction and fail to update the graph.  */
1029     txn->idl->txn = NULL;
1030
1031     HMAP_FOR_EACH_SAFE (row, next, struct ovsdb_idl_row, txn_node,
1032                         &txn->txn_rows) {
1033         if (row->old) {
1034             if (row->written) {
1035                 ovsdb_idl_row_unparse(row);
1036                 ovsdb_idl_row_clear_arcs(row, false);
1037                 ovsdb_idl_row_parse(row);
1038             }
1039         } else {
1040             ovsdb_idl_row_unparse(row);
1041         }
1042         ovsdb_idl_row_clear_new(row);
1043
1044         free(row->prereqs);
1045         row->prereqs = NULL;
1046
1047         free(row->written);
1048         row->written = NULL;
1049
1050         hmap_remove(&txn->txn_rows, &row->txn_node);
1051         hmap_node_nullify(&row->txn_node);
1052         if (!row->old) {
1053             hmap_remove(&row->table->rows, &row->hmap_node);
1054             free(row);
1055         }
1056     }
1057     hmap_destroy(&txn->txn_rows);
1058     hmap_init(&txn->txn_rows);
1059 }
1060
1061 enum ovsdb_idl_txn_status
1062 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
1063 {
1064     struct ovsdb_idl_row *row;
1065     struct json *operations;
1066     bool any_updates;
1067
1068     if (txn != txn->idl->txn) {
1069         return txn->status;
1070     }
1071
1072     operations = json_array_create_empty();
1073
1074     /* Add prerequisites and declarations of new rows. */
1075     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
1076         /* XXX check that deleted rows exist even if no prereqs? */
1077         if (row->prereqs) {
1078             const struct ovsdb_idl_table_class *class = row->table->class;
1079             size_t n_columns = class->n_columns;
1080             struct json *op, *columns, *row_json;
1081             size_t idx;
1082
1083             op = json_object_create();
1084             json_array_add(operations, op);
1085             json_object_put_string(op, "op", "wait");
1086             json_object_put_string(op, "table", class->name);
1087             json_object_put(op, "timeout", json_integer_create(0));
1088             json_object_put(op, "where", where_uuid_equals(&row->uuid));
1089             json_object_put_string(op, "until", "==");
1090             columns = json_array_create_empty();
1091             json_object_put(op, "columns", columns);
1092             row_json = json_object_create();
1093             json_object_put(op, "rows", json_array_create_1(row_json));
1094
1095             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
1096                 const struct ovsdb_idl_column *column = &class->columns[idx];
1097                 json_array_add(columns, json_string_create(column->name));
1098                 json_object_put(row_json, column->name,
1099                                 ovsdb_datum_to_json(&row->old[idx],
1100                                                     &column->type));
1101             }
1102         }
1103         if (row->new && !row->old) {
1104             struct json *op;
1105
1106             op = json_object_create();
1107             json_array_add(operations, op);
1108             json_object_put_string(op, "op", "declare");
1109             json_object_put(op, "uuid-name",
1110                             json_string_create_nocopy(
1111                                 uuid_name_from_uuid(&row->uuid)));
1112         }
1113     }
1114
1115     /* Add updates. */
1116     any_updates = false;
1117     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
1118         const struct ovsdb_idl_table_class *class = row->table->class;
1119
1120         if (row->old == row->new) {
1121             continue;
1122         } else if (!row->new) {
1123             struct json *op = json_object_create();
1124             json_object_put_string(op, "op", "delete");
1125             json_object_put_string(op, "table", class->name);
1126             json_object_put(op, "where", where_uuid_equals(&row->uuid));
1127             json_array_add(operations, op);
1128             any_updates = true;
1129         } else {
1130             struct json *row_json;
1131             struct json *op;
1132             size_t idx;
1133
1134             op = json_object_create();
1135             json_object_put_string(op, "op", row->old ? "update" : "insert");
1136             json_object_put_string(op, "table", class->name);
1137             if (row->old) {
1138                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
1139             } else {
1140                 struct ovsdb_idl_txn_insert *insert;
1141
1142                 json_object_put(op, "uuid-name",
1143                                 json_string_create_nocopy(
1144                                     uuid_name_from_uuid(&row->uuid)));
1145
1146                 insert = xmalloc(sizeof *insert);
1147                 insert->dummy = row->uuid;
1148                 insert->op_index = operations->u.array.n;
1149                 uuid_zero(&insert->real);
1150                 hmap_insert(&txn->inserted_rows, &insert->hmap_node,
1151                             uuid_hash(&insert->dummy));
1152             }
1153             row_json = json_object_create();
1154             json_object_put(op, "row", row_json);
1155
1156             BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
1157                 const struct ovsdb_idl_column *column = &class->columns[idx];
1158
1159                 if (row->old
1160                     ? !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
1161                                           &column->type)
1162                     : !ovsdb_datum_is_default(&row->new[idx], &column->type)) {
1163                     json_object_put(row_json, column->name,
1164                                     substitute_uuids(
1165                                         ovsdb_datum_to_json(&row->new[idx],
1166                                                             &column->type),
1167                                         txn));
1168                 }
1169             }
1170
1171             if (!row->old || !shash_is_empty(json_object(row_json))) {
1172                 json_array_add(operations, op);
1173                 any_updates = true;
1174             } else {
1175                 json_destroy(op);
1176             }
1177         }
1178     }
1179
1180     /* Add increment. */
1181     if (txn->inc_table && any_updates) {
1182         struct json *op;
1183
1184         txn->inc_index = operations->u.array.n;
1185
1186         op = json_object_create();
1187         json_object_put_string(op, "op", "mutate");
1188         json_object_put_string(op, "table", txn->inc_table);
1189         json_object_put(op, "where",
1190                         substitute_uuids(json_clone(txn->inc_where), txn));
1191         json_object_put(op, "mutations",
1192                         json_array_create_1(
1193                             json_array_create_3(
1194                                 json_string_create(txn->inc_column),
1195                                 json_string_create("+="),
1196                                 json_integer_create(1))));
1197         json_array_add(operations, op);
1198
1199         op = json_object_create();
1200         json_object_put_string(op, "op", "select");
1201         json_object_put_string(op, "table", txn->inc_table);
1202         json_object_put(op, "where",
1203                         substitute_uuids(json_clone(txn->inc_where), txn));
1204         json_object_put(op, "columns",
1205                         json_array_create_1(json_string_create(
1206                                                 txn->inc_column)));
1207         json_array_add(operations, op);
1208     }
1209
1210     if (txn->comment.length) {
1211         struct json *op = json_object_create();
1212         json_object_put_string(op, "op", "comment");
1213         json_object_put_string(op, "comment", ds_cstr(&txn->comment));
1214         json_array_add(operations, op);
1215     }
1216
1217     if (txn->dry_run) {
1218         struct json *op = json_object_create();
1219         json_object_put_string(op, "op", "abort");
1220         json_array_add(operations, op);
1221     }
1222
1223     if (!any_updates) {
1224         txn->status = TXN_UNCHANGED;
1225         json_destroy(operations);
1226     } else if (!jsonrpc_session_send(
1227                    txn->idl->session,
1228                    jsonrpc_create_request(
1229                        "transact", operations, &txn->request_id))) {
1230         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
1231                     json_hash(txn->request_id, 0));
1232     } else {
1233         txn->status = TXN_INCOMPLETE;
1234     }
1235
1236     ovsdb_idl_txn_disassemble(txn);
1237     return txn->status;
1238 }
1239
1240 int64_t
1241 ovsdb_idl_txn_get_increment_new_value(const struct ovsdb_idl_txn *txn)
1242 {
1243     assert(txn->status == TXN_SUCCESS);
1244     return txn->inc_new_value;
1245 }
1246
1247 void
1248 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
1249 {
1250     ovsdb_idl_txn_disassemble(txn);
1251     if (txn->status == TXN_INCOMPLETE) {
1252         txn->status = TXN_ABORTED;
1253     }
1254 }
1255
1256 /* For transaction 'txn' that completed successfully, finds and returns the
1257  * permanent UUID that the database assigned to a newly inserted row, given the
1258  * 'uuid' that ovsdb_idl_txn_insert() assigned locally to that row.
1259  *
1260  * Returns NULL if 'uuid' is not a UUID assigned by ovsdb_idl_txn_insert() or
1261  * if it was assigned by that function and then deleted by
1262  * ovsdb_idl_txn_delete() within the same transaction.  (Rows that are inserted
1263  * and then deleted within a single transaction are never sent to the database
1264  * server, so it never assigns them a permanent UUID.) */
1265 const struct uuid *
1266 ovsdb_idl_txn_get_insert_uuid(const struct ovsdb_idl_txn *txn,
1267                               const struct uuid *uuid)
1268 {
1269     const struct ovsdb_idl_txn_insert *insert;
1270
1271     assert(txn->status == TXN_SUCCESS || txn->status == TXN_UNCHANGED);
1272     HMAP_FOR_EACH_IN_BUCKET (insert, struct ovsdb_idl_txn_insert, hmap_node,
1273                              uuid_hash(uuid), &txn->inserted_rows) {
1274         if (uuid_equals(uuid, &insert->dummy)) {
1275             return &insert->real;
1276         }
1277     }
1278     return NULL;
1279 }
1280
1281 static void
1282 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1283                        enum ovsdb_idl_txn_status status)
1284 {
1285     txn->status = status;
1286     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1287 }
1288
1289 void
1290 ovsdb_idl_txn_read(const struct ovsdb_idl_row *row,
1291                    const struct ovsdb_idl_column *column,
1292                    struct ovsdb_datum *datum)
1293 {
1294     const struct ovsdb_idl_table_class *class = row->table->class;
1295     size_t column_idx = column - class->columns;
1296
1297     assert(row->new != NULL);
1298     if (row->written && bitmap_is_set(row->written, column_idx)) {
1299         ovsdb_datum_clone(datum, &row->new[column_idx], &column->type);
1300     } else if (row->old) {
1301         ovsdb_datum_clone(datum, &row->old[column_idx], &column->type);
1302     } else {
1303         ovsdb_datum_init_default(datum, &column->type);
1304     }
1305 }
1306
1307 void
1308 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row_,
1309                     const struct ovsdb_idl_column *column,
1310                     struct ovsdb_datum *datum)
1311 {
1312     struct ovsdb_idl_row *row = (struct ovsdb_idl_row *) row_;
1313     const struct ovsdb_idl_table_class *class = row->table->class;
1314     size_t column_idx = column - class->columns;
1315
1316     assert(row->new != NULL);
1317     if (hmap_node_is_null(&row->txn_node)) {
1318         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1319                     uuid_hash(&row->uuid));
1320     }
1321     if (row->old == row->new) {
1322         row->new = xmalloc(class->n_columns * sizeof *row->new);
1323     }
1324     if (!row->written) {
1325         row->written = bitmap_allocate(class->n_columns);
1326     }
1327     if (bitmap_is_set(row->written, column_idx)) {
1328         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
1329     } else {
1330         bitmap_set1(row->written, column_idx);
1331     }
1332     row->new[column_idx] = *datum;
1333     (column->unparse)(row);
1334     (column->parse)(row, &row->new[column_idx]);
1335 }
1336
1337 void
1338 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
1339                      const struct ovsdb_idl_column *column)
1340 {
1341     struct ovsdb_idl_row *row = (struct ovsdb_idl_row *) row_;
1342     const struct ovsdb_idl_table_class *class = row->table->class;
1343     size_t column_idx = column - class->columns;
1344
1345     assert(row->new != NULL);
1346     if (!row->old
1347         || (row->written && bitmap_is_set(row->written, column_idx))) {
1348         return;
1349     }
1350
1351     if (hmap_node_is_null(&row->txn_node)) {
1352         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1353                     uuid_hash(&row->uuid));
1354     }
1355     if (!row->prereqs) {
1356         row->prereqs = bitmap_allocate(class->n_columns);
1357     }
1358     bitmap_set1(row->prereqs, column_idx);
1359 }
1360
1361 void
1362 ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
1363 {
1364     struct ovsdb_idl_row *row = (struct ovsdb_idl_row *) row_;
1365
1366     assert(row->new != NULL);
1367     if (!row->old) {
1368         ovsdb_idl_row_unparse(row);
1369         ovsdb_idl_row_clear_new(row);
1370         assert(!row->prereqs);
1371         hmap_remove(&row->table->rows, &row->hmap_node);
1372         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
1373         free(row);
1374         return;
1375     }
1376     if (hmap_node_is_null(&row->txn_node)) {
1377         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1378                     uuid_hash(&row->uuid));
1379     }
1380     ovsdb_idl_row_clear_new(row);
1381     row->new = NULL;
1382 }
1383
1384 const struct ovsdb_idl_row *
1385 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
1386                      const struct ovsdb_idl_table_class *class)
1387 {
1388     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
1389     uuid_generate(&row->uuid);
1390     row->table = ovsdb_idl_table_from_class(txn->idl, class);
1391     row->new = xmalloc(class->n_columns * sizeof *row->new);
1392     row->written = bitmap_allocate(class->n_columns);
1393     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
1394     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
1395     return row;
1396 }
1397
1398 static void
1399 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
1400 {
1401     struct ovsdb_idl_txn *txn;
1402
1403     HMAP_FOR_EACH (txn, struct ovsdb_idl_txn, hmap_node,
1404                    &idl->outstanding_txns) {
1405         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
1406     }
1407 }
1408
1409 static struct ovsdb_idl_txn *
1410 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
1411 {
1412     struct ovsdb_idl_txn *txn;
1413
1414     HMAP_FOR_EACH_WITH_HASH (txn, struct ovsdb_idl_txn, hmap_node,
1415                              json_hash(id, 0), &idl->outstanding_txns) {
1416         if (json_equal(id, txn->request_id)) {
1417             return txn;
1418         }
1419     }
1420     return NULL;
1421 }
1422
1423 static bool
1424 check_json_type(const struct json *json, enum json_type type, const char *name)
1425 {
1426     if (!json) {
1427         VLOG_WARN_RL(&syntax_rl, "%s is missing", name);
1428         return false;
1429     } else if (json->type != type) {
1430         VLOG_WARN_RL(&syntax_rl, "%s is %s instead of %s",
1431                      name, json_type_to_string(json->type),
1432                      json_type_to_string(type));
1433         return false;
1434     } else {
1435         return true;
1436     }
1437 }
1438
1439 static bool
1440 ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
1441                                 const struct json_array *results)
1442 {
1443     struct json *count, *rows, *row, *column;
1444     struct shash *mutate, *select;
1445
1446     if (txn->inc_index + 2 > results->n) {
1447         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
1448                      "for increment (has %u, needs %u)",
1449                      results->n, txn->inc_index + 2);
1450         return false;
1451     }
1452
1453     /* We know that this is a JSON object because the loop in
1454      * ovsdb_idl_txn_process_reply() checked. */
1455     mutate = json_object(results->elems[txn->inc_index]);
1456     count = shash_find_data(mutate, "count");
1457     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
1458         return false;
1459     }
1460     if (count->u.integer != 1) {
1461         VLOG_WARN_RL(&syntax_rl,
1462                      "\"mutate\" reply \"count\" is %"PRId64" instead of 1",
1463                      count->u.integer);
1464         return false;
1465     }
1466
1467     select = json_object(results->elems[txn->inc_index + 1]);
1468     rows = shash_find_data(select, "rows");
1469     if (!check_json_type(rows, JSON_ARRAY, "\"select\" reply \"rows\"")) {
1470         return false;
1471     }
1472     if (rows->u.array.n != 1) {
1473         VLOG_WARN_RL(&syntax_rl, "\"select\" reply \"rows\" has %u elements "
1474                      "instead of 1",
1475                      rows->u.array.n);
1476         return false;
1477     }
1478     row = rows->u.array.elems[0];
1479     if (!check_json_type(row, JSON_OBJECT, "\"select\" reply row")) {
1480         return false;
1481     }
1482     column = shash_find_data(json_object(row), txn->inc_column);
1483     if (!check_json_type(column, JSON_INTEGER,
1484                          "\"select\" reply inc column")) {
1485         return false;
1486     }
1487     txn->inc_new_value = column->u.integer;
1488     return true;
1489 }
1490
1491 static bool
1492 ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
1493                                    const struct json_array *results)
1494 {
1495     struct ovsdb_error *error;
1496     struct json *json_uuid;
1497     union ovsdb_atom uuid;
1498     struct shash *reply;
1499
1500     if (insert->op_index >= results->n) {
1501         VLOG_WARN_RL(&syntax_rl, "reply does not contain enough operations "
1502                      "for insert (has %u, needs %u)",
1503                      results->n, insert->op_index);
1504         return false;
1505     }
1506
1507     /* We know that this is a JSON object because the loop in
1508      * ovsdb_idl_txn_process_reply() checked. */
1509     reply = json_object(results->elems[insert->op_index]);
1510     json_uuid = shash_find_data(reply, "uuid");
1511     if (!check_json_type(json_uuid, JSON_ARRAY, "\"insert\" reply \"uuid\"")) {
1512         return false;
1513     }
1514
1515     error = ovsdb_atom_from_json(&uuid, OVSDB_TYPE_UUID, json_uuid, NULL);
1516     if (error) {
1517         char *s = ovsdb_error_to_string(error);
1518         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
1519                      "UUID: %s", s);
1520         free(s);
1521         return false;
1522     }
1523
1524     insert->real = uuid.uuid;
1525
1526     return true;
1527 }
1528
1529 static bool
1530 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
1531                             const struct jsonrpc_msg *msg)
1532 {
1533     struct ovsdb_idl_txn *txn;
1534     enum ovsdb_idl_txn_status status;
1535
1536     txn = ovsdb_idl_txn_find(idl, msg->id);
1537     if (!txn) {
1538         return false;
1539     }
1540
1541     if (msg->type == JSONRPC_ERROR) {
1542         status = TXN_ERROR;
1543     } else if (msg->result->type != JSON_ARRAY) {
1544         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
1545         status = TXN_ERROR;
1546     } else {
1547         struct json_array *ops = &msg->result->u.array;
1548         int hard_errors = 0;
1549         int soft_errors = 0;
1550         size_t i;
1551
1552         for (i = 0; i < ops->n; i++) {
1553             struct json *op = ops->elems[i];
1554
1555             if (op->type == JSON_NULL) {
1556                 /* This isn't an error in itself but indicates that some prior
1557                  * operation failed, so make sure that we know about it. */
1558                 soft_errors++;
1559             } else if (op->type == JSON_OBJECT) {
1560                 struct json *error;
1561
1562                 error = shash_find_data(json_object(op), "error");
1563                 if (error) {
1564                     if (error->type == JSON_STRING) {
1565                         if (!strcmp(error->u.string, "timed out")) {
1566                             soft_errors++;
1567                         } else if (strcmp(error->u.string, "aborted")) {
1568                             hard_errors++;
1569                         }
1570                     } else {
1571                         hard_errors++;
1572                         VLOG_WARN_RL(&syntax_rl,
1573                                      "\"error\" in reply is not JSON string");
1574                     }
1575                 }
1576             } else {
1577                 hard_errors++;
1578                 VLOG_WARN_RL(&syntax_rl,
1579                              "operation reply is not JSON null or object");
1580             }
1581         }
1582
1583         if (!soft_errors && !hard_errors) {
1584             struct ovsdb_idl_txn_insert *insert;
1585
1586             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
1587                 hard_errors++;
1588             }
1589
1590             HMAP_FOR_EACH (insert, struct ovsdb_idl_txn_insert, hmap_node,
1591                            &txn->inserted_rows) {
1592                 if (!ovsdb_idl_txn_process_insert_reply(insert, ops)) {
1593                     hard_errors++;
1594                 }
1595             }
1596         }
1597
1598         status = (hard_errors ? TXN_ERROR
1599                   : soft_errors ? TXN_TRY_AGAIN
1600                   : TXN_SUCCESS);
1601     }
1602
1603     ovsdb_idl_txn_complete(txn, status);
1604     return true;
1605 }
1606
1607 struct ovsdb_idl_txn *
1608 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
1609 {
1610     struct ovsdb_idl_txn *txn = row->table->idl->txn;
1611     assert(txn != NULL);
1612     return txn;
1613 }