ovsdb-idl: Fix deletion of modified row.
[cascardo/ovs.git] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009 Nicira Networks.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-data.h"
29 #include "ovsdb-error.h"
30 #include "ovsdb-idl-provider.h"
31 #include "poll-loop.h"
32 #include "shash.h"
33 #include "util.h"
34
35 #define THIS_MODULE VLM_ovsdb_idl
36 #include "vlog.h"
37
38 /* An arc from one idl_row to another.  When row A contains a UUID that
39  * references row B, this is represented by an arc from A (the source) to B
40  * (the destination).
41  *
42  * Arcs from a row to itself are omitted, that is, src and dst are always
43  * different.
44  *
45  * Arcs are never duplicated, that is, even if there are multiple references
46  * from A to B, there is only a single arc from A to B.
47  *
48  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
49  * A.  Both an arc and its converse may both be present, if each row refers
50  * to the other circularly.
51  *
52  * The source and destination row may be in the same table or in different
53  * tables.
54  */
55 struct ovsdb_idl_arc {
56     struct list src_node;       /* In src->src_arcs list. */
57     struct list dst_node;       /* In dst->dst_arcs list. */
58     struct ovsdb_idl_row *src;  /* Source row. */
59     struct ovsdb_idl_row *dst;  /* Destination row. */
60 };
61
62 struct ovsdb_idl {
63     const struct ovsdb_idl_class *class;
64     struct jsonrpc_session *session;
65     struct shash table_by_name;
66     struct ovsdb_idl_table *tables;
67     struct json *monitor_request_id;
68     unsigned int last_monitor_request_seqno;
69     unsigned int change_seqno;
70
71     /* Transaction support. */
72     struct ovsdb_idl_txn *txn;
73     struct hmap outstanding_txns;
74 };
75
76 struct ovsdb_idl_txn {
77     struct hmap_node hmap_node;
78     struct json *request_id;
79     struct ovsdb_idl *idl;
80     struct hmap txn_rows;
81     enum ovsdb_idl_txn_status status;
82     bool dry_run;
83 };
84
85 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
86 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
87
88 static void ovsdb_idl_clear(struct ovsdb_idl *);
89 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
90 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
91 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
92                                                     const struct json *);
93 static void ovsdb_idl_process_update(struct ovsdb_idl_table *,
94                                      const struct uuid *,
95                                      const struct json *old,
96                                      const struct json *new);
97 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
98 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
99 static void ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
100
101 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
102 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
103     const struct ovsdb_idl_table_class *);
104 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
105                                                   const struct uuid *);
106 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
107
108 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
109 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
110
111 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
112 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
113                                         const struct jsonrpc_msg *msg);
114
115 struct ovsdb_idl *
116 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
117 {
118     struct ovsdb_idl *idl;
119     size_t i;
120
121     idl = xzalloc(sizeof *idl);
122     idl->class = class;
123     idl->session = jsonrpc_session_open(remote);
124     shash_init(&idl->table_by_name);
125     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
126     for (i = 0; i < class->n_tables; i++) {
127         const struct ovsdb_idl_table_class *tc = &class->tables[i];
128         struct ovsdb_idl_table *table = &idl->tables[i];
129         size_t j;
130
131         assert(!shash_find(&idl->table_by_name, tc->name));
132         shash_add(&idl->table_by_name, tc->name, table);
133         table->class = tc;
134         shash_init(&table->columns);
135         for (j = 0; j < tc->n_columns; j++) {
136             const struct ovsdb_idl_column *column = &tc->columns[j];
137
138             assert(!shash_find(&table->columns, column->name));
139             shash_add(&table->columns, column->name, column);
140         }
141         hmap_init(&table->rows);
142         table->idl = idl;
143     }
144     idl->last_monitor_request_seqno = UINT_MAX;
145     hmap_init(&idl->outstanding_txns);
146
147     return idl;
148 }
149
150 void
151 ovsdb_idl_destroy(struct ovsdb_idl *idl)
152 {
153     if (idl) {
154         size_t i;
155
156         assert(!idl->txn);
157         ovsdb_idl_clear(idl);
158         jsonrpc_session_close(idl->session);
159
160         for (i = 0; i < idl->class->n_tables; i++) {
161             struct ovsdb_idl_table *table = &idl->tables[i];
162             shash_destroy(&table->columns);
163             hmap_destroy(&table->rows);
164         }
165         shash_destroy(&idl->table_by_name);
166         free(idl->tables);
167         json_destroy(idl->monitor_request_id);
168         free(idl);
169     }
170 }
171
172 static void
173 ovsdb_idl_clear(struct ovsdb_idl *idl)
174 {
175     bool changed = false;
176     size_t i;
177
178     for (i = 0; i < idl->class->n_tables; i++) {
179         struct ovsdb_idl_table *table = &idl->tables[i];
180         struct ovsdb_idl_row *row, *next_row;
181
182         if (hmap_is_empty(&table->rows)) {
183             continue;
184         }
185
186         changed = true;
187         HMAP_FOR_EACH_SAFE (row, next_row, struct ovsdb_idl_row, hmap_node,
188                             &table->rows) {
189             struct ovsdb_idl_arc *arc, *next_arc;
190
191             if (!ovsdb_idl_row_is_orphan(row)) {
192                 (row->table->class->unparse)(row);
193                 ovsdb_idl_row_clear_old(row);
194             }
195             hmap_remove(&table->rows, &row->hmap_node);
196             LIST_FOR_EACH_SAFE (arc, next_arc, struct ovsdb_idl_arc, src_node,
197                                 &row->src_arcs) {
198                 free(arc);
199             }
200             /* No need to do anything with dst_arcs: some node has those arcs
201              * as forward arcs and will destroy them itself. */
202
203             free(row);
204         }
205     }
206
207     if (changed) {
208         idl->change_seqno++;
209     }
210 }
211
212 void
213 ovsdb_idl_run(struct ovsdb_idl *idl)
214 {
215     int i;
216
217     assert(!idl->txn);
218     jsonrpc_session_run(idl->session);
219     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
220         struct jsonrpc_msg *msg, *reply;
221         unsigned int seqno;
222
223         seqno = jsonrpc_session_get_seqno(idl->session);
224         if (idl->last_monitor_request_seqno != seqno) {
225             idl->last_monitor_request_seqno = seqno;
226             ovsdb_idl_txn_abort_all(idl);
227             ovsdb_idl_send_monitor_request(idl);
228             break;
229         }
230
231         msg = jsonrpc_session_recv(idl->session);
232         if (!msg) {
233             break;
234         }
235
236         reply = NULL;
237         if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
238             reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
239         } else if (msg->type == JSONRPC_NOTIFY
240                    && !strcmp(msg->method, "update")
241                    && msg->params->type == JSON_ARRAY
242                    && msg->params->u.array.n == 2
243                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
244             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
245         } else if (msg->type == JSONRPC_REPLY
246                    && idl->monitor_request_id
247                    && json_equal(idl->monitor_request_id, msg->id)) {
248             json_destroy(idl->monitor_request_id);
249             idl->monitor_request_id = NULL;
250             ovsdb_idl_clear(idl);
251             ovsdb_idl_parse_update(idl, msg->result);
252         } else if (msg->type == JSONRPC_REPLY
253                    && msg->id && msg->id->type == JSON_STRING
254                    && !strcmp(msg->id->u.string, "echo")) {
255             /* It's a reply to our echo request.  Ignore it. */
256         } else if ((msg->type == JSONRPC_ERROR
257                     || msg->type == JSONRPC_REPLY)
258                    && ovsdb_idl_txn_process_reply(idl, msg)) {
259             /* ovsdb_idl_txn_process_reply() did everything needful. */
260         } else {
261             VLOG_WARN("%s: received unexpected %s message",
262                       jsonrpc_session_get_name(idl->session),
263                       jsonrpc_msg_type_to_string(msg->type));
264             jsonrpc_session_force_reconnect(idl->session);
265         }
266         if (reply) {
267             jsonrpc_session_send(idl->session, reply);
268         }
269         jsonrpc_msg_destroy(msg);
270     }
271 }
272
273 void
274 ovsdb_idl_wait(struct ovsdb_idl *idl)
275 {
276     jsonrpc_session_wait(idl->session);
277     jsonrpc_session_recv_wait(idl->session);
278 }
279
280 unsigned int
281 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
282 {
283     return idl->change_seqno;
284 }
285
286 void
287 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
288 {
289     jsonrpc_session_force_reconnect(idl->session);
290 }
291 \f
292 static void
293 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
294 {
295     struct json *monitor_requests;
296     struct jsonrpc_msg *msg;
297     size_t i;
298
299     monitor_requests = json_object_create();
300     for (i = 0; i < idl->class->n_tables; i++) {
301         const struct ovsdb_idl_table *table = &idl->tables[i];
302         const struct ovsdb_idl_table_class *tc = table->class;
303         struct json *monitor_request, *columns;
304         size_t i;
305
306         monitor_request = json_object_create();
307         columns = json_array_create_empty();
308         for (i = 0; i < tc->n_columns; i++) {
309             const struct ovsdb_idl_column *column = &tc->columns[i];
310             json_array_add(columns, json_string_create(column->name));
311         }
312         json_object_put(monitor_request, "columns", columns);
313         json_object_put(monitor_requests, tc->name, monitor_request);
314     }
315
316     json_destroy(idl->monitor_request_id);
317     msg = jsonrpc_create_request(
318         "monitor", json_array_create_2(json_null_create(), monitor_requests),
319         &idl->monitor_request_id);
320     jsonrpc_session_send(idl->session, msg);
321 }
322
323 static void
324 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
325 {
326     struct ovsdb_error *error;
327
328     idl->change_seqno++;
329
330     error = ovsdb_idl_parse_update__(idl, table_updates);
331     if (error) {
332         if (!VLOG_DROP_WARN(&syntax_rl)) {
333             char *s = ovsdb_error_to_string(error);
334             VLOG_WARN_RL(&syntax_rl, "%s", s);
335             free(s);
336         }
337         ovsdb_error_destroy(error);
338     }
339 }
340
341 static struct ovsdb_error *
342 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
343                          const struct json *table_updates)
344 {
345     const struct shash_node *tables_node;
346
347     if (table_updates->type != JSON_OBJECT) {
348         return ovsdb_syntax_error(table_updates, NULL,
349                                   "<table-updates> is not an object");
350     }
351     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
352         const struct json *table_update = tables_node->data;
353         const struct shash_node *table_node;
354         struct ovsdb_idl_table *table;
355
356         table = shash_find_data(&idl->table_by_name, tables_node->name);
357         if (!table) {
358             return ovsdb_syntax_error(
359                 table_updates, NULL,
360                 "<table-updates> includes unknown table \"%s\"",
361                 tables_node->name);
362         }
363
364         if (table_update->type != JSON_OBJECT) {
365             return ovsdb_syntax_error(table_update, NULL,
366                                       "<table-update> for table \"%s\" is "
367                                       "not an object", table->class->name);
368         }
369         SHASH_FOR_EACH (table_node, json_object(table_update)) {
370             const struct json *row_update = table_node->data;
371             const struct json *old_json, *new_json;
372             struct uuid uuid;
373
374             if (!uuid_from_string(&uuid, table_node->name)) {
375                 return ovsdb_syntax_error(table_update, NULL,
376                                           "<table-update> for table \"%s\" "
377                                           "contains bad UUID "
378                                           "\"%s\" as member name",
379                                           table->class->name,
380                                           table_node->name);
381             }
382             if (row_update->type != JSON_OBJECT) {
383                 return ovsdb_syntax_error(row_update, NULL,
384                                           "<table-update> for table \"%s\" "
385                                           "contains <row-update> for %s that "
386                                           "is not an object",
387                                           table->class->name,
388                                           table_node->name);
389             }
390
391             old_json = shash_find_data(json_object(row_update), "old");
392             new_json = shash_find_data(json_object(row_update), "new");
393             if (old_json && old_json->type != JSON_OBJECT) {
394                 return ovsdb_syntax_error(old_json, NULL,
395                                           "\"old\" <row> is not object");
396             } else if (new_json && new_json->type != JSON_OBJECT) {
397                 return ovsdb_syntax_error(new_json, NULL,
398                                           "\"new\" <row> is not object");
399             } else if ((old_json != NULL) + (new_json != NULL)
400                        != shash_count(json_object(row_update))) {
401                 return ovsdb_syntax_error(row_update, NULL,
402                                           "<row-update> contains unexpected "
403                                           "member");
404             } else if (!old_json && !new_json) {
405                 return ovsdb_syntax_error(row_update, NULL,
406                                           "<row-update> missing \"old\" "
407                                           "and \"new\" members");
408             }
409
410             ovsdb_idl_process_update(table, &uuid, old_json, new_json);
411         }
412     }
413
414     return NULL;
415 }
416
417 static struct ovsdb_idl_row *
418 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
419 {
420     struct ovsdb_idl_row *row;
421
422     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, hmap_node,
423                              uuid_hash(uuid), &table->rows) {
424         if (uuid_equals(&row->uuid, uuid)) {
425             return row;
426         }
427     }
428     return NULL;
429 }
430
431 static void
432 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
433                          const struct uuid *uuid, const struct json *old,
434                          const struct json *new)
435 {
436     struct ovsdb_idl_row *row;
437
438     row = ovsdb_idl_get_row(table, uuid);
439     if (!new) {
440         /* Delete row. */
441         if (row && !ovsdb_idl_row_is_orphan(row)) {
442             /* XXX perhaps we should check the 'old' values? */
443             ovsdb_idl_delete_row(row);
444         } else {
445             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
446                          "from table %s",
447                          UUID_ARGS(uuid), table->class->name);
448         }
449     } else if (!old) {
450         /* Insert row. */
451         if (!row) {
452             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
453         } else if (ovsdb_idl_row_is_orphan(row)) {
454             ovsdb_idl_insert_row(row, new);
455         } else {
456             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
457                          "table %s", UUID_ARGS(uuid), table->class->name);
458             ovsdb_idl_modify_row(row, new);
459         }
460     } else {
461         /* Modify row. */
462         if (row) {
463             /* XXX perhaps we should check the 'old' values? */
464             if (!ovsdb_idl_row_is_orphan(row)) {
465                 ovsdb_idl_modify_row(row, new);
466             } else {
467                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
468                              "referenced row "UUID_FMT" in table %s",
469                              UUID_ARGS(uuid), table->class->name);
470                 ovsdb_idl_insert_row(row, new);
471             }
472         } else {
473             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
474                          "in table %s", UUID_ARGS(uuid), table->class->name);
475             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
476         }
477     }
478 }
479
480 static void
481 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
482 {
483     struct ovsdb_idl_table *table = row->table;
484     struct shash_node *node;
485
486     SHASH_FOR_EACH (node, json_object(row_json)) {
487         const char *column_name = node->name;
488         const struct ovsdb_idl_column *column;
489         struct ovsdb_datum datum;
490         struct ovsdb_error *error;
491
492         column = shash_find_data(&table->columns, column_name);
493         if (!column) {
494             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
495                          column_name, UUID_ARGS(&row->uuid));
496             continue;
497         }
498
499         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
500         if (!error) {
501             ovsdb_datum_swap(&row->old[column - table->class->columns],
502                              &datum);
503             ovsdb_datum_destroy(&datum, &column->type);
504         } else {
505             char *s = ovsdb_error_to_string(error);
506             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
507                          " in table %s: %s", column_name,
508                          UUID_ARGS(&row->uuid), table->class->name, s);
509             free(s);
510             ovsdb_error_destroy(error);
511         }
512     }
513 }
514
515 static bool
516 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
517 {
518     return !row->old;
519 }
520
521 static void
522 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
523 {
524     assert(row->old == row->new);
525     if (!ovsdb_idl_row_is_orphan(row)) {
526         const struct ovsdb_idl_table_class *class = row->table->class;
527         size_t i;
528
529         for (i = 0; i < class->n_columns; i++) {
530             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
531         }
532         free(row->old);
533         row->old = row->new = NULL;
534     }
535 }
536
537 static void
538 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
539 {
540     if (row->old != row->new) {
541         if (row->new) {
542             const struct ovsdb_idl_table_class *class = row->table->class;
543             size_t i;
544
545             BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
546                 ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
547             }
548             free(row->new);
549             free(row->written);
550             row->written = NULL;
551         }
552         row->new = row->old;
553     }
554 }
555
556 static void
557 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
558 {
559     struct ovsdb_idl_arc *arc, *next;
560
561     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
562      * that this causes to be unreferenced. */
563     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, src_node,
564                         &row->src_arcs) {
565         list_remove(&arc->dst_node);
566         if (destroy_dsts
567             && ovsdb_idl_row_is_orphan(arc->dst)
568             && list_is_empty(&arc->dst->dst_arcs)) {
569             ovsdb_idl_row_destroy(arc->dst);
570         }
571         free(arc);
572     }
573     list_init(&row->src_arcs);
574 }
575
576 /* Force nodes that reference 'row' to reparse. */
577 static void
578 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row, bool destroy_dsts)
579 {
580     struct ovsdb_idl_arc *arc, *next;
581
582     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
583      * 'arc', so we need to use the "safe" variant of list traversal.  However,
584      * calling ref->table->class->parse will add an arc equivalent to 'arc' to
585      * row->arcs.  That could be a problem for traversal, but it adds it at the
586      * beginning of the list to prevent us from stumbling upon it again.
587      *
588      * (If duplicate arcs were possible then we would need to make sure that
589      * 'next' didn't also point into 'arc''s destination, but we forbid
590      * duplicate arcs.) */
591     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, dst_node,
592                         &row->dst_arcs) {
593         struct ovsdb_idl_row *ref = arc->src;
594
595         (ref->table->class->unparse)(ref);
596         ovsdb_idl_row_clear_arcs(ref, destroy_dsts);
597         (ref->table->class->parse)(ref);
598     }
599 }
600
601 static struct ovsdb_idl_row *
602 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
603 {
604     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
605     memset(row, 0, sizeof *row);
606     list_init(&row->src_arcs);
607     list_init(&row->dst_arcs);
608     hmap_node_nullify(&row->txn_node);
609     return row;
610 }
611
612 static struct ovsdb_idl_row *
613 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
614 {
615     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
616     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
617     row->uuid = *uuid;
618     row->table = table;
619     return row;
620 }
621
622 static void
623 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
624 {
625     if (row) {
626         ovsdb_idl_row_clear_old(row);
627         hmap_remove(&row->table->rows, &row->hmap_node);
628         free(row);
629     }
630 }
631
632 static void
633 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
634 {
635     const struct ovsdb_idl_table_class *class = row->table->class;
636     size_t i;
637
638     assert(!row->old && !row->new);
639     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
640     for (i = 0; i < class->n_columns; i++) {
641         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
642     }
643     ovsdb_idl_row_update(row, row_json);
644     (class->parse)(row);
645
646     ovsdb_idl_row_reparse_backrefs(row, false);
647 }
648
649 static void
650 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
651 {
652     (row->table->class->unparse)(row);
653     ovsdb_idl_row_clear_arcs(row, true);
654     ovsdb_idl_row_clear_old(row);
655     if (list_is_empty(&row->dst_arcs)) {
656         ovsdb_idl_row_destroy(row);
657     } else {
658         ovsdb_idl_row_reparse_backrefs(row, true);
659     }
660 }
661
662 static void
663 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
664 {
665     (row->table->class->unparse)(row);
666     ovsdb_idl_row_clear_arcs(row, true);
667     ovsdb_idl_row_update(row, row_json);
668     (row->table->class->parse)(row);
669 }
670
671 static bool
672 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
673 {
674     const struct ovsdb_idl_arc *arc;
675
676     /* No self-arcs. */
677     if (src == dst) {
678         return false;
679     }
680
681     /* No duplicate arcs.
682      *
683      * We only need to test whether the first arc in dst->dst_arcs originates
684      * at 'src', since we add all of the arcs from a given source in a clump
685      * (in a single call to a row's ->parse function) and new arcs are always
686      * added at the front of the dst_arcs list. */
687     if (list_is_empty(&dst->dst_arcs)) {
688         return true;
689     }
690     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
691     return arc->src != src;
692 }
693
694 static struct ovsdb_idl_table *
695 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
696                            const struct ovsdb_idl_table_class *table_class)
697 {
698     return &idl->tables[table_class - idl->class->tables];
699 }
700
701 struct ovsdb_idl_row *
702 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
703                       struct ovsdb_idl_table_class *dst_table_class,
704                       const struct uuid *dst_uuid)
705 {
706     struct ovsdb_idl *idl = src->table->idl;
707     struct ovsdb_idl_table *dst_table;
708     struct ovsdb_idl_arc *arc;
709     struct ovsdb_idl_row *dst;
710
711     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
712     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
713     if (!dst) {
714         dst = ovsdb_idl_row_create(dst_table, dst_uuid);
715     }
716
717     /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
718     if (may_add_arc(src, dst)) {
719         /* The arc *must* be added at the front of the dst_arcs list.  See
720          * ovsdb_idl_row_reparse_backrefs() for details. */
721         arc = xmalloc(sizeof *arc);
722         list_push_front(&src->src_arcs, &arc->src_node);
723         list_push_front(&dst->dst_arcs, &arc->dst_node);
724         arc->src = src;
725         arc->dst = dst;
726     }
727
728     return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
729 }
730
731 static struct ovsdb_idl_row *
732 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
733 {
734     for (; node; node = hmap_next(&table->rows, node)) {
735         struct ovsdb_idl_row *row;
736
737         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
738         if (!ovsdb_idl_row_is_orphan(row)) {
739             return row;
740         }
741     }
742     return NULL;
743 }
744
745 struct ovsdb_idl_row *
746 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
747                     const struct ovsdb_idl_table_class *table_class)
748 {
749     struct ovsdb_idl_table *table
750         = ovsdb_idl_table_from_class(idl, table_class);
751     return next_real_row(table, hmap_first(&table->rows));
752 }
753
754 struct ovsdb_idl_row *
755 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
756 {
757     struct ovsdb_idl_table *table = row->table;
758
759     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
760 }
761 \f
762 /* Transactions. */
763
764 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
765                                    enum ovsdb_idl_txn_status);
766
767 const char *
768 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
769 {
770     switch (status) {
771     case TXN_INCOMPLETE:
772         return "incomplete";
773     case TXN_ABORTED:
774         return "aborted";
775     case TXN_SUCCESS:
776         return "success";
777     case TXN_TRY_AGAIN:
778         return "try again";
779     case TXN_ERROR:
780         return "error";
781     }
782     return "<unknown>";
783 }
784
785 struct ovsdb_idl_txn *
786 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
787 {
788     struct ovsdb_idl_txn *txn;
789
790     assert(!idl->txn);
791     idl->txn = txn = xmalloc(sizeof *txn);
792     txn->idl = idl;
793     txn->status = TXN_INCOMPLETE;
794     hmap_init(&txn->txn_rows);
795     txn->dry_run = false;
796     return txn;
797 }
798
799 void
800 ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
801 {
802     txn->dry_run = true;
803 }
804
805 void
806 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
807 {
808     ovsdb_idl_txn_abort(txn);
809     free(txn);
810 }
811
812 void
813 ovsdb_idl_txn_wait(const struct ovsdb_idl_txn *txn)
814 {
815     if (txn->status != TXN_INCOMPLETE) {
816         poll_immediate_wake();
817     }
818 }
819
820 static struct json *
821 where_uuid_equals(const struct uuid *uuid)
822 {
823     return
824         json_array_create_1(
825             json_array_create_3(
826                 json_string_create("_uuid"),
827                 json_string_create("=="),
828                 json_array_create_2(
829                     json_string_create("uuid"),
830                     json_string_create_nocopy(
831                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
832 }
833
834 static char *
835 uuid_name_from_uuid(const struct uuid *uuid)
836 {
837     char *name;
838     char *p;
839
840     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
841     for (p = name; *p != '\0'; p++) {
842         if (*p == '-') {
843             *p = '_';
844         }
845     }
846
847     return name;
848 }
849
850 static const struct ovsdb_idl_row *
851 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
852 {
853     const struct ovsdb_idl_row *row;
854
855     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, txn_node,
856                              uuid_hash(uuid), &txn->txn_rows) {
857         if (uuid_equals(&row->uuid, uuid)) {
858             return row;
859         }
860     }
861     return NULL;
862 }
863
864 /* XXX there must be a cleaner way to do this */
865 static struct json *
866 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
867 {
868     if (json->type == JSON_ARRAY) {
869         struct uuid uuid;
870         size_t i;
871
872         if (json->u.array.n == 2
873             && json->u.array.elems[0]->type == JSON_STRING
874             && json->u.array.elems[1]->type == JSON_STRING
875             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
876             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
877             const struct ovsdb_idl_row *row;
878
879             row = ovsdb_idl_txn_get_row(txn, &uuid);
880             if (row && !row->old && row->new) {
881                 json_destroy(json);
882
883                 return json_array_create_2(
884                     json_string_create("named-uuid"),
885                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
886             }
887         }
888
889         for (i = 0; i < json->u.array.n; i++) {
890             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
891                                                       txn);
892         }
893     } else if (json->type == JSON_OBJECT) {
894         struct shash_node *node;
895
896         SHASH_FOR_EACH (node, json_object(json)) {
897             node->data = substitute_uuids(node->data, txn);
898         }
899     }
900     return json;
901 }
902
903 static void
904 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
905 {
906     struct ovsdb_idl_row *row, *next;
907
908     HMAP_FOR_EACH_SAFE (row, next, struct ovsdb_idl_row, txn_node,
909                         &txn->txn_rows) {
910         if (row->old && row->written) {
911             (row->table->class->unparse)(row);
912             ovsdb_idl_row_clear_arcs(row, false);
913             (row->table->class->parse)(row);
914         }
915         ovsdb_idl_row_clear_new(row);
916
917         free(row->prereqs);
918         row->prereqs = NULL;
919
920         free(row->written);
921         row->written = NULL;
922
923         hmap_remove(&txn->txn_rows, &row->txn_node);
924         hmap_node_nullify(&row->txn_node);
925     }
926     hmap_destroy(&txn->txn_rows);
927     hmap_init(&txn->txn_rows);
928 }
929
930 enum ovsdb_idl_txn_status
931 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
932 {
933     struct ovsdb_idl_row *row;
934     struct json *operations;
935     bool any_updates;
936
937     if (txn != txn->idl->txn) {
938         return txn->status;
939     }
940
941     operations = json_array_create_empty();
942
943     /* Add prerequisites and declarations of new rows. */
944     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
945         /* XXX check that deleted rows exist even if no prereqs? */
946         if (row->prereqs) {
947             const struct ovsdb_idl_table_class *class = row->table->class;
948             size_t n_columns = class->n_columns;
949             struct json *op, *columns, *row_json;
950             size_t idx;
951
952             op = json_object_create();
953             json_array_add(operations, op);
954             json_object_put_string(op, "op", "wait");
955             json_object_put_string(op, "table", class->name);
956             json_object_put(op, "timeout", json_integer_create(0));
957             json_object_put(op, "where", where_uuid_equals(&row->uuid));
958             json_object_put_string(op, "until", "==");
959             columns = json_array_create_empty();
960             json_object_put(op, "columns", columns);
961             row_json = json_object_create();
962             json_object_put(op, "rows", json_array_create_1(row_json));
963
964             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
965                 const struct ovsdb_idl_column *column = &class->columns[idx];
966                 json_array_add(columns, json_string_create(column->name));
967                 json_object_put(row_json, column->name,
968                                 ovsdb_datum_to_json(&row->old[idx],
969                                                     &column->type));
970             }
971         }
972         if (row->new && !row->old) {
973             struct json *op;
974
975             op = json_object_create();
976             json_array_add(operations, op);
977             json_object_put_string(op, "op", "declare");
978             json_object_put(op, "uuid-name",
979                             json_string_create_nocopy(
980                                 uuid_name_from_uuid(&row->uuid)));
981         }
982     }
983
984     /* Add updates. */
985     any_updates = false;
986     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
987         const struct ovsdb_idl_table_class *class = row->table->class;
988
989         if (row->old == row->new) {
990             continue;
991         } else if (!row->new) {
992             struct json *op = json_object_create();
993             json_object_put_string(op, "op", "delete");
994             json_object_put_string(op, "table", class->name);
995             json_object_put(op, "where", where_uuid_equals(&row->uuid));
996             json_array_add(operations, op);
997             any_updates = true;
998         } else {
999             struct json *row_json;
1000             struct json *op;
1001             size_t idx;
1002
1003             op = json_object_create();
1004             json_object_put_string(op, "op", row->old ? "update" : "insert");
1005             json_object_put_string(op, "table", class->name);
1006             if (row->old) {
1007                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
1008             } else {
1009                 json_object_put(op, "uuid-name",
1010                                 json_string_create_nocopy(
1011                                     uuid_name_from_uuid(&row->uuid)));
1012             }
1013             row_json = json_object_create();
1014             json_object_put(op, "row", row_json);
1015
1016             BITMAP_FOR_EACH_1 (idx, class->n_columns, row->written) {
1017                 const struct ovsdb_idl_column *column = &class->columns[idx];
1018
1019                 if (!row->old || !ovsdb_datum_equals(&row->old[idx],
1020                                                      &row->new[idx],
1021                                                      &column->type)) {
1022                     json_object_put(row_json, column->name,
1023                                     substitute_uuids(
1024                                         ovsdb_datum_to_json(&row->new[idx],
1025                                                             &column->type),
1026                                         txn));
1027                 }
1028             }
1029
1030             if (!row->old || !shash_is_empty(json_object(row_json))) {
1031                 json_array_add(operations, op);
1032                 any_updates = true;
1033             } else {
1034                 json_destroy(op);
1035             }
1036         }
1037     }
1038
1039     if (txn->dry_run) {
1040         struct json *op = json_object_create();
1041         json_object_put_string(op, "op", "abort");
1042         json_array_add(operations, op);
1043     }
1044
1045     if (!any_updates) {
1046         txn->status = TXN_SUCCESS;
1047     } else if (!jsonrpc_session_send(
1048                    txn->idl->session,
1049                    jsonrpc_create_request(
1050                        "transact", operations, &txn->request_id))) {
1051         hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
1052                     json_hash(txn->request_id, 0));
1053     } else {
1054         txn->status = TXN_INCOMPLETE;
1055     }
1056
1057     txn->idl->txn = NULL;
1058     ovsdb_idl_txn_disassemble(txn);
1059     return txn->status;
1060 }
1061
1062 void
1063 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
1064 {
1065     ovsdb_idl_txn_disassemble(txn);
1066     if (txn->status == TXN_INCOMPLETE) {
1067         txn->status = TXN_ABORTED;
1068     }
1069 }
1070
1071 static void
1072 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1073                        enum ovsdb_idl_txn_status status)
1074 {
1075     txn->status = status;
1076     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1077 }
1078
1079 void
1080 ovsdb_idl_txn_write(struct ovsdb_idl_row *row,
1081                     const struct ovsdb_idl_column *column,
1082                     struct ovsdb_datum *datum)
1083 {
1084     const struct ovsdb_idl_table_class *class = row->table->class;
1085     size_t column_idx = column - class->columns;
1086
1087     assert(row->new);
1088     if (hmap_node_is_null(&row->txn_node)) {
1089         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1090                     uuid_hash(&row->uuid));
1091     }
1092     if (row->old == row->new) {
1093         row->new = xmalloc(class->n_columns * sizeof *row->new);
1094     }
1095     if (!row->written) {
1096         row->written = bitmap_allocate(class->n_columns);
1097     }
1098     if (bitmap_is_set(row->written, column_idx)) {
1099         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
1100     } else {
1101         bitmap_set1(row->written, column_idx);
1102     }
1103     row->new[column_idx] = *datum;
1104 }
1105
1106 void
1107 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
1108                      const struct ovsdb_idl_column *column)
1109 {
1110     struct ovsdb_idl_row *row = (struct ovsdb_idl_row *) row_;
1111     const struct ovsdb_idl_table_class *class = row->table->class;
1112     size_t column_idx = column - class->columns;
1113
1114     assert(row->new);
1115     if (!row->old
1116         || (row->written && bitmap_is_set(row->written, column_idx))) {
1117         return;
1118     }
1119
1120     if (hmap_node_is_null(&row->txn_node)) {
1121         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1122                     uuid_hash(&row->uuid));
1123     }
1124     if (!row->prereqs) {
1125         row->prereqs = bitmap_allocate(class->n_columns);
1126     }
1127     bitmap_set1(row->prereqs, column_idx);
1128 }
1129
1130 void
1131 ovsdb_idl_txn_delete(struct ovsdb_idl_row *row)
1132 {
1133     assert(row->new);
1134     if (!row->old) {
1135         ovsdb_idl_row_clear_new(row);
1136         assert(!row->prereqs);
1137         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
1138         free(row);
1139     }
1140     if (hmap_node_is_null(&row->txn_node)) {
1141         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1142                     uuid_hash(&row->uuid));
1143     }
1144     ovsdb_idl_row_clear_new(row);
1145     row->new = NULL;
1146 }
1147
1148 struct ovsdb_idl_row *
1149 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
1150                      const struct ovsdb_idl_table_class *class)
1151 {
1152     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
1153     uuid_generate(&row->uuid);
1154     row->table = ovsdb_idl_table_from_class(txn->idl, class);
1155     row->new = xmalloc(class->n_columns * sizeof *row->new);
1156     row->written = bitmap_allocate(class->n_columns);
1157     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
1158     return row;
1159 }
1160
1161 static void
1162 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
1163 {
1164     struct ovsdb_idl_txn *txn;
1165
1166     HMAP_FOR_EACH (txn, struct ovsdb_idl_txn, hmap_node,
1167                    &idl->outstanding_txns) {
1168         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
1169     }
1170 }
1171
1172 static struct ovsdb_idl_txn *
1173 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
1174 {
1175     struct ovsdb_idl_txn *txn;
1176
1177     HMAP_FOR_EACH_WITH_HASH (txn, struct ovsdb_idl_txn, hmap_node,
1178                              json_hash(id, 0), &idl->outstanding_txns) {
1179         if (json_equal(id, txn->request_id)) {
1180             return txn;
1181         }
1182     }
1183     return NULL;
1184 }
1185
1186 static bool
1187 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
1188                             const struct jsonrpc_msg *msg)
1189 {
1190     struct ovsdb_idl_txn *txn;
1191     enum ovsdb_idl_txn_status status;
1192
1193     txn = ovsdb_idl_txn_find(idl, msg->id);
1194     if (!txn) {
1195         return false;
1196     }
1197
1198     if (msg->type == JSONRPC_ERROR) {
1199         status = TXN_ERROR;
1200     } else if (msg->result->type != JSON_ARRAY) {
1201         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
1202         status = TXN_ERROR;
1203     } else {
1204         int hard_errors = 0;
1205         int soft_errors = 0;
1206         size_t i;
1207
1208         for (i = 0; i < msg->result->u.array.n; i++) {
1209             struct json *json = msg->result->u.array.elems[i];
1210
1211             if (json->type == JSON_NULL) {
1212                 /* This isn't an error in itself but indicates that some prior
1213                  * operation failed, so make sure that we know about it. */
1214                 soft_errors++;
1215             } else if (json->type == JSON_OBJECT) {
1216                 struct json *error;
1217
1218                 error = shash_find_data(json_object(json), "error");
1219                 if (error) {
1220                     if (error->type == JSON_STRING) {
1221                         if (!strcmp(error->u.string, "timed out")) {
1222                             soft_errors++;
1223                         } else if (strcmp(error->u.string, "aborted")) {
1224                             hard_errors++;
1225                         }
1226                     } else {
1227                         hard_errors++;
1228                         VLOG_WARN_RL(&syntax_rl,
1229                                      "\"error\" in reply is not JSON string");
1230                     }
1231                 }
1232             } else {
1233                 hard_errors++;
1234                 VLOG_WARN_RL(&syntax_rl,
1235                              "operation reply is not JSON null or object");
1236             }
1237         }
1238
1239         status = (hard_errors ? TXN_ERROR
1240                   : soft_errors ? TXN_TRY_AGAIN
1241                   : TXN_SUCCESS);
1242     }
1243
1244     ovsdb_idl_txn_complete(txn, status);
1245     return true;
1246 }
1247
1248 struct ovsdb_idl_txn *
1249 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
1250 {
1251     struct ovsdb_idl_txn *txn = row->table->idl->txn;
1252     assert(txn != NULL);
1253     return txn;
1254 }