ovsdb: Provide a way for for_each_txn_row() callback to delete any row.
[cascardo/ovs.git] / ovsdb / transaction.c
1 /* Copyright (c) 2009, 2010, 2011 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "transaction.h"
19
20 #include <assert.h>
21
22 #include "bitmap.h"
23 #include "dynamic-string.h"
24 #include "hash.h"
25 #include "hmap.h"
26 #include "json.h"
27 #include "list.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb.h"
30 #include "row.h"
31 #include "table.h"
32 #include "uuid.h"
33
34 struct ovsdb_txn {
35     struct ovsdb *db;
36     struct list txn_tables;     /* Contains "struct ovsdb_txn_table"s. */
37     struct ds comment;
38 };
39
40 /* A table modified by a transaction. */
41 struct ovsdb_txn_table {
42     struct list node;           /* Element in ovsdb_txn's txn_tables list. */
43     struct ovsdb_table *table;
44     struct hmap txn_rows;       /* Contains "struct ovsdb_txn_row"s. */
45
46     /* Used by for_each_txn_row(). */
47     unsigned int serial;        /* Serial number of in-progress iteration. */
48     unsigned int n_processed;   /* Number of rows processed. */
49 };
50
51 /* A row modified by the transaction:
52  *
53  *      - A row added by a transaction will have null 'old' and non-null 'new'.
54  *
55  *      - A row deleted by a transaction will have non-null 'old' and null
56  *        'new'.
57  *
58  *      - A row modified by a transaction will have non-null 'old' and 'new'.
59  *
60  *      - 'old' and 'new' both null indicates that a row was added then deleted
61  *        within a single transaction.  Most of the time we instead delete the
62  *        ovsdb_txn_row entirely, but inside a for_each_txn_row() callback
63  *        there are restrictions that sometimes mean we have to leave the
64  *        ovsdb_txn_row in place.
65  */
66 struct ovsdb_txn_row {
67     struct hmap_node hmap_node; /* In ovsdb_txn_table's txn_rows hmap. */
68     struct ovsdb_row *old;      /* The old row. */
69     struct ovsdb_row *new;      /* The new row. */
70     size_t n_refs;              /* Number of remaining references. */
71
72     /* These members are the same as the corresponding members of 'old' or
73      * 'new'.  They are present here for convenience and because occasionally
74      * there can be an ovsdb_txn_row where both 'old' and 'new' are NULL. */
75     struct uuid uuid;
76     struct ovsdb_table *table;
77
78     /* Used by for_each_txn_row(). */
79     unsigned int serial;        /* Serial number of in-progress commit. */
80
81     unsigned long changed[];    /* Bits set to 1 for columns that changed. */
82 };
83
84 static void ovsdb_txn_row_prefree(struct ovsdb_txn_row *);
85 static struct ovsdb_error * WARN_UNUSED_RESULT
86 for_each_txn_row(struct ovsdb_txn *txn,
87                       struct ovsdb_error *(*)(struct ovsdb_txn *,
88                                               struct ovsdb_txn_row *));
89
90 /* Used by for_each_txn_row() to track tables and rows that have been
91  * processed.  */
92 static unsigned int serial;
93
94 struct ovsdb_txn *
95 ovsdb_txn_create(struct ovsdb *db)
96 {
97     struct ovsdb_txn *txn = xmalloc(sizeof *txn);
98     txn->db = db;
99     list_init(&txn->txn_tables);
100     ds_init(&txn->comment);
101     return txn;
102 }
103
104 static void
105 ovsdb_txn_free(struct ovsdb_txn *txn)
106 {
107     assert(list_is_empty(&txn->txn_tables));
108     ds_destroy(&txn->comment);
109     free(txn);
110 }
111
112 static struct ovsdb_error *
113 ovsdb_txn_row_abort(struct ovsdb_txn *txn OVS_UNUSED,
114                     struct ovsdb_txn_row *txn_row)
115 {
116     struct ovsdb_row *old = txn_row->old;
117     struct ovsdb_row *new = txn_row->new;
118
119     ovsdb_txn_row_prefree(txn_row);
120     if (!old) {
121         if (new) {
122             hmap_remove(&new->table->rows, &new->hmap_node);
123         }
124     } else if (!new) {
125         hmap_insert(&old->table->rows, &old->hmap_node, ovsdb_row_hash(old));
126     } else {
127         hmap_replace(&new->table->rows, &new->hmap_node, &old->hmap_node);
128     }
129     ovsdb_row_destroy(new);
130     free(txn_row);
131
132     return NULL;
133 }
134
135 void
136 ovsdb_txn_abort(struct ovsdb_txn *txn)
137 {
138     ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_abort));
139     ovsdb_txn_free(txn);
140 }
141
142 static struct ovsdb_txn_row *
143 find_txn_row(const struct ovsdb_table *table, const struct uuid *uuid)
144 {
145     struct ovsdb_txn_row *txn_row;
146
147     if (!table->txn_table) {
148         return NULL;
149     }
150
151     HMAP_FOR_EACH_WITH_HASH (txn_row, hmap_node,
152                              uuid_hash(uuid), &table->txn_table->txn_rows) {
153         if (uuid_equals(uuid, &txn_row->uuid)) {
154             return txn_row;
155         }
156     }
157
158     return NULL;
159 }
160
161 static struct ovsdb_error * WARN_UNUSED_RESULT
162 ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
163                            const struct ovsdb_column *c,
164                            const struct ovsdb_base_type *base,
165                            const union ovsdb_atom *atoms, unsigned int n,
166                            int delta)
167 {
168     const struct ovsdb_table *table;
169     unsigned int i;
170
171     if (!ovsdb_base_type_is_strong_ref(base)) {
172         return NULL;
173     }
174
175     table = base->u.uuid.refTable;
176     for (i = 0; i < n; i++) {
177         const struct uuid *uuid = &atoms[i].uuid;
178         struct ovsdb_txn_row *txn_row = find_txn_row(table, uuid);
179         if (!txn_row) {
180             const struct ovsdb_row *row = ovsdb_table_get_row(table, uuid);
181             if (row) {
182                 txn_row = ovsdb_txn_row_modify(txn, row)->txn_row;
183             } else {
184                 return ovsdb_error("referential integrity violation",
185                                    "Table %s column %s row "UUID_FMT" "
186                                    "references nonexistent row "UUID_FMT" in "
187                                    "table %s.",
188                                    r->table->schema->name, c->name,
189                                    UUID_ARGS(ovsdb_row_get_uuid(r)),
190                                    UUID_ARGS(uuid), table->schema->name);
191             }
192         }
193         txn_row->n_refs += delta;
194     }
195
196     return NULL;
197 }
198
199 static struct ovsdb_error * WARN_UNUSED_RESULT
200 ovsdb_txn_adjust_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
201                           const struct ovsdb_column *column, int delta)
202 {
203     const struct ovsdb_datum *field = &r->fields[column->index];
204     struct ovsdb_error *error;
205
206     error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.key,
207                                        field->keys, field->n, delta);
208     if (!error) {
209         error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.value,
210                                            field->values, field->n, delta);
211     }
212     return error;
213 }
214
215 static struct ovsdb_error * WARN_UNUSED_RESULT
216 update_row_ref_count(struct ovsdb_txn *txn, struct ovsdb_txn_row *r)
217 {
218     struct ovsdb_table *table = r->table;
219     struct shash_node *node;
220
221     SHASH_FOR_EACH (node, &table->schema->columns) {
222         const struct ovsdb_column *column = node->data;
223         struct ovsdb_error *error;
224
225         if (r->old) {
226             error = ovsdb_txn_adjust_row_refs(txn, r->old, column, -1);
227             if (error) {
228                 return OVSDB_WRAP_BUG("error decreasing refcount", error);
229             }
230         }
231         if (r->new) {
232             error = ovsdb_txn_adjust_row_refs(txn, r->new, column, 1);
233             if (error) {
234                 return error;
235             }
236         }
237     }
238
239     return NULL;
240 }
241
242 static struct ovsdb_error * WARN_UNUSED_RESULT
243 check_ref_count(struct ovsdb_txn *txn OVS_UNUSED, struct ovsdb_txn_row *r)
244 {
245     if (r->new || !r->n_refs) {
246         return NULL;
247     } else {
248         return ovsdb_error("referential integrity violation",
249                            "cannot delete %s row "UUID_FMT" because "
250                            "of %zu remaining reference(s)",
251                            r->table->schema->name, UUID_ARGS(&r->uuid),
252                            r->n_refs);
253     }
254 }
255
256 static struct ovsdb_error * WARN_UNUSED_RESULT
257 update_ref_counts(struct ovsdb_txn *txn)
258 {
259     struct ovsdb_error *error;
260
261     error = for_each_txn_row(txn, update_row_ref_count);
262     if (error) {
263         return error;
264     }
265
266     return for_each_txn_row(txn, check_ref_count);
267 }
268
269 static struct ovsdb_error *
270 ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
271                      struct ovsdb_txn_row *txn_row)
272 {
273     ovsdb_txn_row_prefree(txn_row);
274     if (txn_row->new) {
275         txn_row->new->n_refs = txn_row->n_refs;
276     }
277     ovsdb_row_destroy(txn_row->old);
278     free(txn_row);
279
280     return NULL;
281 }
282
283 static void
284 add_weak_ref(struct ovsdb_txn *txn,
285              const struct ovsdb_row *src_, const struct ovsdb_row *dst_)
286 {
287     struct ovsdb_row *src = (struct ovsdb_row *) src_;
288     struct ovsdb_row *dst = (struct ovsdb_row *) dst_;
289     struct ovsdb_weak_ref *weak;
290
291     if (src == dst) {
292         return;
293     }
294
295     dst = ovsdb_txn_row_modify(txn, dst);
296
297     if (!list_is_empty(&dst->dst_refs)) {
298         /* Omit duplicates. */
299         weak = CONTAINER_OF(list_back(&dst->dst_refs),
300                             struct ovsdb_weak_ref, dst_node);
301         if (weak->src == src) {
302             return;
303         }
304     }
305
306     weak = xmalloc(sizeof *weak);
307     weak->src = src;
308     list_push_back(&dst->dst_refs, &weak->dst_node);
309     list_push_back(&src->src_refs, &weak->src_node);
310 }
311
312 static struct ovsdb_error * WARN_UNUSED_RESULT
313 assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
314 {
315     struct ovsdb_table *table;
316     struct shash_node *node;
317
318     if (txn_row->old) {
319         /* Mark rows that have weak references to 'txn_row' as modified, so
320          * that their weak references will get reassessed. */
321         struct ovsdb_weak_ref *weak, *next;
322
323         LIST_FOR_EACH_SAFE (weak, next, dst_node, &txn_row->old->dst_refs) {
324             if (!weak->src->txn_row) {
325                 ovsdb_txn_row_modify(txn, weak->src);
326             }
327         }
328     }
329
330     if (!txn_row->new) {
331         /* We don't have to do anything about references that originate at
332          * 'txn_row', because ovsdb_row_destroy() will remove those weak
333          * references. */
334         return NULL;
335     }
336
337     table = txn_row->table;
338     SHASH_FOR_EACH (node, &table->schema->columns) {
339         const struct ovsdb_column *column = node->data;
340         struct ovsdb_datum *datum = &txn_row->new->fields[column->index];
341         unsigned int orig_n, i;
342         bool zero = false;
343
344         orig_n = datum->n;
345
346         if (ovsdb_base_type_is_weak_ref(&column->type.key)) {
347             for (i = 0; i < datum->n; ) {
348                 const struct ovsdb_row *row;
349
350                 row = ovsdb_table_get_row(column->type.key.u.uuid.refTable,
351                                           &datum->keys[i].uuid);
352                 if (row) {
353                     add_weak_ref(txn, txn_row->new, row);
354                     i++;
355                 } else {
356                     if (uuid_is_zero(&datum->keys[i].uuid)) {
357                         zero = true;
358                     }
359                     ovsdb_datum_remove_unsafe(datum, i, &column->type);
360                 }
361             }
362         }
363
364         if (ovsdb_base_type_is_weak_ref(&column->type.value)) {
365             for (i = 0; i < datum->n; ) {
366                 const struct ovsdb_row *row;
367
368                 row = ovsdb_table_get_row(column->type.value.u.uuid.refTable,
369                                           &datum->values[i].uuid);
370                 if (row) {
371                     add_weak_ref(txn, txn_row->new, row);
372                     i++;
373                 } else {
374                     if (uuid_is_zero(&datum->values[i].uuid)) {
375                         zero = true;
376                     }
377                     ovsdb_datum_remove_unsafe(datum, i, &column->type);
378                 }
379             }
380         }
381
382         if (datum->n != orig_n) {
383             bitmap_set1(txn_row->changed, column->index);
384             ovsdb_datum_sort_assert(datum, column->type.key.type);
385             if (datum->n < column->type.n_min) {
386                 const struct uuid *row_uuid = ovsdb_row_get_uuid(txn_row->new);
387                 if (zero && !txn_row->old) {
388                     return ovsdb_error(
389                         "constraint violation",
390                         "Weak reference column \"%s\" in \"%s\" row "UUID_FMT
391                         " (inserted within this transaction) contained "
392                         "all-zeros UUID (probably as the default value for "
393                         "this column) but deleting this value caused a "
394                         "constraint volation because this column is not "
395                         "allowed to be empty.", column->name,
396                         table->schema->name, UUID_ARGS(row_uuid));
397                 } else {
398                     return ovsdb_error(
399                         "constraint violation",
400                         "Deletion of %u weak reference(s) to deleted (or "
401                         "never-existing) rows from column \"%s\" in \"%s\" "
402                         "row "UUID_FMT" %scaused this column to become empty, "
403                         "but constraints on this column disallow an "
404                         "empty column.",
405                         orig_n - datum->n, column->name, table->schema->name,
406                         UUID_ARGS(row_uuid),
407                         (txn_row->old
408                          ? ""
409                          : "(inserted within this transaction) "));
410                 }
411             }
412         }
413     }
414
415     return NULL;
416 }
417
418 static struct ovsdb_error * WARN_UNUSED_RESULT
419 determine_changes(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
420 {
421     struct ovsdb_table *table = txn_row->table;
422
423     if (txn_row->old && txn_row->new) {
424         struct shash_node *node;
425         bool changed = false;
426
427         SHASH_FOR_EACH (node, &table->schema->columns) {
428             const struct ovsdb_column *column = node->data;
429             const struct ovsdb_type *type = &column->type;
430             unsigned int idx = column->index;
431
432             if (!ovsdb_datum_equals(&txn_row->old->fields[idx],
433                                     &txn_row->new->fields[idx],
434                                     type)) {
435                 bitmap_set1(txn_row->changed, idx);
436                 changed = true;
437             }
438         }
439
440         if (!changed) {
441             /* Nothing actually changed in this row, so drop it. */
442             ovsdb_txn_row_abort(txn, txn_row);
443         }
444     } else {
445         bitmap_set_multiple(txn_row->changed, 0,
446                             shash_count(&table->schema->columns), 1);
447     }
448
449     return NULL;
450 }
451
452 static struct ovsdb_error * WARN_UNUSED_RESULT
453 check_max_rows(struct ovsdb_txn *txn)
454 {
455     struct ovsdb_txn_table *t;
456
457     LIST_FOR_EACH (t, node, &txn->txn_tables) {
458         size_t n_rows = hmap_count(&t->table->rows);
459         unsigned int max_rows = t->table->schema->max_rows;
460
461         if (n_rows > max_rows) {
462             return ovsdb_error("constraint violation",
463                                "transaction causes \"%s\" table to contain "
464                                "%zu rows, greater than the schema-defined "
465                                "limit of %u row(s)",
466                                t->table->schema->name, n_rows, max_rows);
467         }
468     }
469
470     return NULL;
471 }
472
473 struct ovsdb_error *
474 ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
475 {
476     struct ovsdb_replica *replica;
477     struct ovsdb_error *error;
478
479     /* Figure out what actually changed, and abort early if the transaction
480      * was really a no-op. */
481     error = for_each_txn_row(txn, determine_changes);
482     if (error) {
483         return OVSDB_WRAP_BUG("can't happen", error);
484     }
485     if (list_is_empty(&txn->txn_tables)) {
486         ovsdb_txn_abort(txn);
487         return NULL;
488     }
489
490     /* Check maximum rows table constraints. */
491     error = check_max_rows(txn);
492     if (error) {
493         ovsdb_txn_abort(txn);
494         return error;
495     }
496
497     /* Update reference counts and check referential integrity. */
498     error = update_ref_counts(txn);
499     if (error) {
500         ovsdb_txn_abort(txn);
501         return error;
502     }
503
504     /* Check reference counts and remove bad reference for "weak" referential
505      * integrity. */
506     error = for_each_txn_row(txn, assess_weak_refs);
507     if (error) {
508         ovsdb_txn_abort(txn);
509         return error;
510     }
511
512     /* Send the commit to each replica. */
513     LIST_FOR_EACH (replica, node, &txn->db->replicas) {
514         error = (replica->class->commit)(replica, txn, durable);
515         if (error) {
516             /* We don't support two-phase commit so only the first replica is
517              * allowed to report an error. */
518             assert(&replica->node == txn->db->replicas.next);
519
520             ovsdb_txn_abort(txn);
521             return error;
522         }
523     }
524
525     /* Finalize commit. */
526     txn->db->run_triggers = true;
527     ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_commit));
528     ovsdb_txn_free(txn);
529
530     return NULL;
531 }
532
533 void
534 ovsdb_txn_for_each_change(const struct ovsdb_txn *txn,
535                           ovsdb_txn_row_cb_func *cb, void *aux)
536 {
537     struct ovsdb_txn_table *t;
538     struct ovsdb_txn_row *r;
539
540     LIST_FOR_EACH (t, node, &txn->txn_tables) {
541         HMAP_FOR_EACH (r, hmap_node, &t->txn_rows) {
542             if ((r->old || r->new) && !cb(r->old, r->new, r->changed, aux)) {
543                 break;
544             }
545         }
546    }
547 }
548
549 static struct ovsdb_txn_table *
550 ovsdb_txn_create_txn_table(struct ovsdb_txn *txn, struct ovsdb_table *table)
551 {
552     if (!table->txn_table) {
553         struct ovsdb_txn_table *txn_table;
554
555         table->txn_table = txn_table = xmalloc(sizeof *table->txn_table);
556         txn_table->table = table;
557         hmap_init(&txn_table->txn_rows);
558         txn_table->serial = serial - 1;
559         list_push_back(&txn->txn_tables, &txn_table->node);
560     }
561     return table->txn_table;
562 }
563
564 static struct ovsdb_txn_row *
565 ovsdb_txn_row_create(struct ovsdb_txn *txn, struct ovsdb_table *table,
566                      const struct ovsdb_row *old_, struct ovsdb_row *new)
567 {
568     const struct ovsdb_row *row = old_ ? old_ : new;
569     struct ovsdb_row *old = (struct ovsdb_row *) old_;
570     size_t n_columns = shash_count(&table->schema->columns);
571     struct ovsdb_txn_table *txn_table;
572     struct ovsdb_txn_row *txn_row;
573
574     txn_row = xzalloc(offsetof(struct ovsdb_txn_row, changed)
575                       + bitmap_n_bytes(n_columns));
576     txn_row->uuid = *ovsdb_row_get_uuid(row);
577     txn_row->table = row->table;
578     txn_row->old = old;
579     txn_row->new = new;
580     txn_row->n_refs = old ? old->n_refs : 0;
581     txn_row->serial = serial - 1;
582
583     if (old) {
584         old->txn_row = txn_row;
585     }
586     if (new) {
587         new->txn_row = txn_row;
588     }
589
590     txn_table = ovsdb_txn_create_txn_table(txn, table);
591     hmap_insert(&txn_table->txn_rows, &txn_row->hmap_node,
592                 ovsdb_row_hash(old ? old : new));
593
594     return txn_row;
595 }
596
597 struct ovsdb_row *
598 ovsdb_txn_row_modify(struct ovsdb_txn *txn, const struct ovsdb_row *ro_row_)
599 {
600     struct ovsdb_row *ro_row = (struct ovsdb_row *) ro_row_;
601
602     if (ro_row->txn_row) {
603         assert(ro_row == ro_row->txn_row->new);
604         return ro_row;
605     } else {
606         struct ovsdb_table *table = ro_row->table;
607         struct ovsdb_row *rw_row;
608
609         rw_row = ovsdb_row_clone(ro_row);
610         rw_row->n_refs = ro_row->n_refs;
611         uuid_generate(ovsdb_row_get_version_rw(rw_row));
612         ovsdb_txn_row_create(txn, table, ro_row, rw_row);
613         hmap_replace(&table->rows, &ro_row->hmap_node, &rw_row->hmap_node);
614
615         return rw_row;
616     }
617 }
618
619 void
620 ovsdb_txn_row_insert(struct ovsdb_txn *txn, struct ovsdb_row *row)
621 {
622     uint32_t hash = ovsdb_row_hash(row);
623     struct ovsdb_table *table = row->table;
624
625     uuid_generate(ovsdb_row_get_version_rw(row));
626
627     ovsdb_txn_row_create(txn, table, NULL, row);
628     hmap_insert(&table->rows, &row->hmap_node, hash);
629 }
630
631 /* 'row' must be assumed destroyed upon return; the caller must not reference
632  * it again. */
633 void
634 ovsdb_txn_row_delete(struct ovsdb_txn *txn, const struct ovsdb_row *row_)
635 {
636     struct ovsdb_row *row = (struct ovsdb_row *) row_;
637     struct ovsdb_table *table = row->table;
638     struct ovsdb_txn_row *txn_row = row->txn_row;
639
640     hmap_remove(&table->rows, &row->hmap_node);
641
642     if (!txn_row) {
643         ovsdb_txn_row_create(txn, table, row, NULL);
644     } else {
645         assert(txn_row->new == row);
646         if (txn_row->old) {
647             txn_row->new = NULL;
648         } else {
649             hmap_remove(&table->txn_table->txn_rows, &txn_row->hmap_node);
650             free(txn_row);
651         }
652         ovsdb_row_destroy(row);
653     }
654 }
655
656 void
657 ovsdb_txn_add_comment(struct ovsdb_txn *txn, const char *s)
658 {
659     if (txn->comment.length) {
660         ds_put_char(&txn->comment, '\n');
661     }
662     ds_put_cstr(&txn->comment, s);
663 }
664
665 const char *
666 ovsdb_txn_get_comment(const struct ovsdb_txn *txn)
667 {
668     return txn->comment.length ? ds_cstr_ro(&txn->comment) : NULL;
669 }
670 \f
671 static void
672 ovsdb_txn_row_prefree(struct ovsdb_txn_row *txn_row)
673 {
674     struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
675
676     txn_table->n_processed--;
677     hmap_remove(&txn_table->txn_rows, &txn_row->hmap_node);
678
679     if (txn_row->old) {
680         txn_row->old->txn_row = NULL;
681     }
682     if (txn_row->new) {
683         txn_row->new->txn_row = NULL;
684     }
685 }
686
687 static void
688 ovsdb_txn_table_destroy(struct ovsdb_txn_table *txn_table)
689 {
690     assert(hmap_is_empty(&txn_table->txn_rows));
691     txn_table->table->txn_table = NULL;
692     hmap_destroy(&txn_table->txn_rows);
693     list_remove(&txn_table->node);
694     free(txn_table);
695 }
696
697 /* Calls 'cb' for every txn_row within 'txn'.  If 'cb' returns nonnull, this
698  * aborts the iteration and for_each_txn_row() passes the error up.  Otherwise,
699  * returns a null pointer after iteration is complete.
700  *
701  * 'cb' may insert new txn_rows and new txn_tables into 'txn'.  It may delete
702  * the txn_row that it is passed in, or txn_rows in txn_tables other than the
703  * one passed to 'cb'.  It may *not* delete txn_rows other than the one passed
704  * in within the same txn_table.  It may *not* delete any txn_tables.  As long
705  * as these rules are followed, 'cb' will be called exactly once for each
706  * txn_row in 'txn', even those added by 'cb'.
707  *
708  * (Even though 'cb' is not allowed to delete some txn_rows, it can still
709  * delete any actual row by clearing a txn_row's 'new' member.)
710  */
711 static struct ovsdb_error * WARN_UNUSED_RESULT
712 for_each_txn_row(struct ovsdb_txn *txn,
713                  struct ovsdb_error *(*cb)(struct ovsdb_txn *,
714                                            struct ovsdb_txn_row *))
715 {
716     bool any_work;
717
718     serial++;
719
720     do {
721         struct ovsdb_txn_table *t, *next_txn_table;
722
723         any_work = false;
724         LIST_FOR_EACH_SAFE (t, next_txn_table, node, &txn->txn_tables) {
725             if (t->serial != serial) {
726                 t->serial = serial;
727                 t->n_processed = 0;
728             }
729
730             while (t->n_processed < hmap_count(&t->txn_rows)) {
731                 struct ovsdb_txn_row *r, *next_txn_row;
732
733                 HMAP_FOR_EACH_SAFE (r, next_txn_row, hmap_node, &t->txn_rows) {
734                     if (r->serial != serial) {
735                         struct ovsdb_error *error;
736
737                         r->serial = serial;
738                         t->n_processed++;
739                         any_work = true;
740
741                         error = cb(txn, r);
742                         if (error) {
743                             return error;
744                         }
745                     }
746                 }
747             }
748             if (hmap_is_empty(&t->txn_rows)) {
749                 /* Table is empty.  Drop it. */
750                 ovsdb_txn_table_destroy(t);
751             }
752         }
753     } while (any_work);
754
755     return NULL;
756 }