netdev-dpdk: fix mbuf leaks
[cascardo/ovs.git] / ovsdb / file.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "file.h"
19
20 #include <errno.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23
24 #include "bitmap.h"
25 #include "column.h"
26 #include "log.h"
27 #include "json.h"
28 #include "lockfile.h"
29 #include "ovsdb.h"
30 #include "ovsdb-error.h"
31 #include "row.h"
32 #include "socket-util.h"
33 #include "table.h"
34 #include "timeval.h"
35 #include "transaction.h"
36 #include "uuid.h"
37 #include "util.h"
38 #include "openvswitch/vlog.h"
39
40 VLOG_DEFINE_THIS_MODULE(ovsdb_file);
41
42 /* Minimum number of milliseconds between database compactions. */
43 #define COMPACT_MIN_MSEC        (10 * 60 * 1000) /* 10 minutes. */
44
45 /* Minimum number of milliseconds between trying to compact the database if
46  * compacting fails. */
47 #define COMPACT_RETRY_MSEC      (60 * 1000)      /* 1 minute. */
48
49 /* A transaction being converted to JSON for writing to a file. */
50 struct ovsdb_file_txn {
51     struct json *json;          /* JSON for the whole transaction. */
52     struct json *table_json;    /* JSON for 'table''s transaction. */
53     struct ovsdb_table *table;  /* Table described in 'table_json'.  */
54 };
55
56 static void ovsdb_file_txn_init(struct ovsdb_file_txn *);
57 static void ovsdb_file_txn_add_row(struct ovsdb_file_txn *,
58                                    const struct ovsdb_row *old,
59                                    const struct ovsdb_row *new,
60                                    const unsigned long int *changed);
61 static struct ovsdb_error *ovsdb_file_txn_commit(struct json *,
62                                                  const char *comment,
63                                                  bool durable,
64                                                  struct ovsdb_log *);
65
66 static struct ovsdb_error *ovsdb_file_open__(const char *file_name,
67                                              const struct ovsdb_schema *,
68                                              bool read_only, struct ovsdb **,
69                                              struct ovsdb_file **);
70 static struct ovsdb_error *ovsdb_file_txn_from_json(
71     struct ovsdb *, const struct json *, bool converting, struct ovsdb_txn **);
72 static struct ovsdb_error *ovsdb_file_create(struct ovsdb *,
73                                              struct ovsdb_log *,
74                                              const char *file_name,
75                                              unsigned int n_transactions,
76                                              struct ovsdb_file **filep);
77
78 /* Opens database 'file_name' and stores a pointer to the new database in
79  * '*dbp'.  If 'read_only' is false, then the database will be locked and
80  * changes to the database will be written to disk.  If 'read_only' is true,
81  * the database will not be locked and changes to the database will persist
82  * only as long as the "struct ovsdb".
83  *
84  * If 'filep' is nonnull and 'read_only' is false, then on success sets
85  * '*filep' to an ovsdb_file that represents the open file.  This ovsdb_file
86  * persists until '*dbp' is destroyed.
87  *
88  * On success, returns NULL.  On failure, returns an ovsdb_error (which the
89  * caller must destroy) and sets '*dbp' and '*filep' to NULL. */
90 struct ovsdb_error *
91 ovsdb_file_open(const char *file_name, bool read_only,
92                 struct ovsdb **dbp, struct ovsdb_file **filep)
93 {
94     return ovsdb_file_open__(file_name, NULL, read_only, dbp, filep);
95 }
96
97 /* Opens database 'file_name' with an alternate schema.  The specified 'schema'
98  * is used to interpret the data in 'file_name', ignoring the schema actually
99  * stored in the file.  Data in the file for tables or columns that do not
100  * exist in 'schema' are ignored, but the ovsdb file format must otherwise be
101  * observed, including column constraints.
102  *
103  * This function can be useful for upgrading or downgrading databases to
104  * "almost-compatible" formats.
105  *
106  * The database will not be locked.  Changes to the database will persist only
107  * as long as the "struct ovsdb".
108  *
109  * On success, stores a pointer to the new database in '*dbp' and returns a
110  * null pointer.  On failure, returns an ovsdb_error (which the caller must
111  * destroy) and sets '*dbp' to NULL. */
112 struct ovsdb_error *
113 ovsdb_file_open_as_schema(const char *file_name,
114                           const struct ovsdb_schema *schema,
115                           struct ovsdb **dbp)
116 {
117     return ovsdb_file_open__(file_name, schema, true, dbp, NULL);
118 }
119
120 static struct ovsdb_error *
121 ovsdb_file_open_log(const char *file_name, enum ovsdb_log_open_mode open_mode,
122                     struct ovsdb_log **logp, struct ovsdb_schema **schemap)
123 {
124     struct ovsdb_schema *schema = NULL;
125     struct ovsdb_log *log = NULL;
126     struct ovsdb_error *error;
127     struct json *json = NULL;
128
129     ovs_assert(logp || schemap);
130
131     error = ovsdb_log_open(file_name, open_mode, -1, &log);
132     if (error) {
133         goto error;
134     }
135
136     error = ovsdb_log_read(log, &json);
137     if (error) {
138         goto error;
139     } else if (!json) {
140         error = ovsdb_io_error(EOF, "%s: database file contains no schema",
141                                file_name);
142         goto error;
143     }
144
145     if (schemap) {
146         error = ovsdb_schema_from_json(json, &schema);
147         if (error) {
148             error = ovsdb_wrap_error(error,
149                                      "failed to parse \"%s\" as ovsdb schema",
150                                      file_name);
151             goto error;
152         }
153     }
154     json_destroy(json);
155
156     if (logp) {
157         *logp = log;
158     } else {
159         ovsdb_log_close(log);
160     }
161     if (schemap) {
162         *schemap = schema;
163     }
164     return NULL;
165
166 error:
167     ovsdb_log_close(log);
168     json_destroy(json);
169     if (logp) {
170         *logp = NULL;
171     }
172     if (schemap) {
173         *schemap = NULL;
174     }
175     return error;
176 }
177
178 static struct ovsdb_error *
179 ovsdb_file_open__(const char *file_name,
180                   const struct ovsdb_schema *alternate_schema,
181                   bool read_only, struct ovsdb **dbp,
182                   struct ovsdb_file **filep)
183 {
184     enum ovsdb_log_open_mode open_mode;
185     unsigned int n_transactions;
186     struct ovsdb_schema *schema = NULL;
187     struct ovsdb_error *error;
188     struct ovsdb_log *log;
189     struct json *json;
190     struct ovsdb *db = NULL;
191
192     /* In read-only mode there is no ovsdb_file so 'filep' must be null. */
193     ovs_assert(!(read_only && filep));
194
195     open_mode = read_only ? OVSDB_LOG_READ_ONLY : OVSDB_LOG_READ_WRITE;
196     error = ovsdb_file_open_log(file_name, open_mode, &log,
197                                 alternate_schema ? NULL : &schema);
198     if (error) {
199         goto error;
200     }
201
202     db = ovsdb_create(schema ? schema : ovsdb_schema_clone(alternate_schema));
203
204     n_transactions = 0;
205     while ((error = ovsdb_log_read(log, &json)) == NULL && json) {
206         struct ovsdb_txn *txn;
207
208         error = ovsdb_file_txn_from_json(db, json, alternate_schema != NULL,
209                                          &txn);
210         json_destroy(json);
211         if (error) {
212             ovsdb_log_unread(log);
213             break;
214         }
215
216         n_transactions++;
217         error = ovsdb_txn_commit(txn, false);
218         if (error) {
219             ovsdb_log_unread(log);
220             break;
221         }
222     }
223     if (error) {
224         /* Log error but otherwise ignore it.  Probably the database just got
225          * truncated due to power failure etc. and we should use its current
226          * contents. */
227         char *msg = ovsdb_error_to_string(error);
228         VLOG_ERR("%s", msg);
229         free(msg);
230
231         ovsdb_error_destroy(error);
232     }
233
234     if (!read_only) {
235         struct ovsdb_file *file;
236
237         error = ovsdb_file_create(db, log, file_name, n_transactions, &file);
238         if (error) {
239             goto error;
240         }
241         if (filep) {
242             *filep = file;
243         }
244     } else {
245         ovsdb_log_close(log);
246     }
247
248     *dbp = db;
249     return NULL;
250
251 error:
252     *dbp = NULL;
253     if (filep) {
254         *filep = NULL;
255     }
256     ovsdb_destroy(db);
257     ovsdb_log_close(log);
258     return error;
259 }
260
261 static struct ovsdb_error *
262 ovsdb_file_update_row_from_json(struct ovsdb_row *row, bool converting,
263                                 const struct json *json)
264 {
265     struct ovsdb_table_schema *schema = row->table->schema;
266     struct ovsdb_error *error;
267     struct shash_node *node;
268
269     if (json->type != JSON_OBJECT) {
270         return ovsdb_syntax_error(json, NULL, "row must be JSON object");
271     }
272
273     SHASH_FOR_EACH (node, json_object(json)) {
274         const char *column_name = node->name;
275         const struct ovsdb_column *column;
276         struct ovsdb_datum datum;
277
278         column = ovsdb_table_schema_get_column(schema, column_name);
279         if (!column) {
280             if (converting) {
281                 continue;
282             }
283             return ovsdb_syntax_error(json, "unknown column",
284                                       "No column %s in table %s.",
285                                       column_name, schema->name);
286         }
287
288         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
289         if (error) {
290             return error;
291         }
292         ovsdb_datum_swap(&row->fields[column->index], &datum);
293         ovsdb_datum_destroy(&datum, &column->type);
294     }
295
296     return NULL;
297 }
298
299 static struct ovsdb_error *
300 ovsdb_file_txn_row_from_json(struct ovsdb_txn *txn, struct ovsdb_table *table,
301                              bool converting,
302                              const struct uuid *row_uuid, struct json *json)
303 {
304     const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
305     if (json->type == JSON_NULL) {
306         if (!row) {
307             return ovsdb_syntax_error(NULL, NULL, "transaction deletes "
308                                       "row "UUID_FMT" that does not exist",
309                                       UUID_ARGS(row_uuid));
310         }
311         ovsdb_txn_row_delete(txn, row);
312         return NULL;
313     } else if (row) {
314         return ovsdb_file_update_row_from_json(ovsdb_txn_row_modify(txn, row),
315                                                converting, json);
316     } else {
317         struct ovsdb_error *error;
318         struct ovsdb_row *new;
319
320         new = ovsdb_row_create(table);
321         *ovsdb_row_get_uuid_rw(new) = *row_uuid;
322         error = ovsdb_file_update_row_from_json(new, converting, json);
323         if (error) {
324             ovsdb_row_destroy(new);
325         } else {
326             ovsdb_txn_row_insert(txn, new);
327         }
328         return error;
329     }
330 }
331
332 static struct ovsdb_error *
333 ovsdb_file_txn_table_from_json(struct ovsdb_txn *txn,
334                                struct ovsdb_table *table,
335                                bool converting, struct json *json)
336 {
337     struct shash_node *node;
338
339     if (json->type != JSON_OBJECT) {
340         return ovsdb_syntax_error(json, NULL, "object expected");
341     }
342
343     SHASH_FOR_EACH (node, json->u.object) {
344         const char *uuid_string = node->name;
345         struct json *txn_row_json = node->data;
346         struct ovsdb_error *error;
347         struct uuid row_uuid;
348
349         if (!uuid_from_string(&row_uuid, uuid_string)) {
350             return ovsdb_syntax_error(json, NULL, "\"%s\" is not a valid UUID",
351                                       uuid_string);
352         }
353
354         error = ovsdb_file_txn_row_from_json(txn, table, converting,
355                                              &row_uuid, txn_row_json);
356         if (error) {
357             return error;
358         }
359     }
360
361     return NULL;
362 }
363
364 /* Converts 'json' to an ovsdb_txn for 'db', storing the new transaction in
365  * '*txnp'.  Returns NULL if successful, otherwise an error.
366  *
367  * If 'converting' is true, then unknown table and column names are ignored
368  * (which can ease upgrading and downgrading schemas); otherwise, they are
369  * treated as errors. */
370 static struct ovsdb_error *
371 ovsdb_file_txn_from_json(struct ovsdb *db, const struct json *json,
372                          bool converting, struct ovsdb_txn **txnp)
373 {
374     struct ovsdb_error *error;
375     struct shash_node *node;
376     struct ovsdb_txn *txn;
377
378     *txnp = NULL;
379
380     if (json->type != JSON_OBJECT) {
381         return ovsdb_syntax_error(json, NULL, "object expected");
382     }
383
384     txn = ovsdb_txn_create(db);
385     SHASH_FOR_EACH (node, json->u.object) {
386         const char *table_name = node->name;
387         struct json *node_json = node->data;
388         struct ovsdb_table *table;
389
390         table = shash_find_data(&db->tables, table_name);
391         if (!table) {
392             if (!strcmp(table_name, "_date")
393                 && node_json->type == JSON_INTEGER) {
394                 continue;
395             } else if (!strcmp(table_name, "_comment") || converting) {
396                 continue;
397             }
398
399             error = ovsdb_syntax_error(json, "unknown table",
400                                        "No table named %s.", table_name);
401             goto error;
402         }
403
404         error = ovsdb_file_txn_table_from_json(txn, table, converting,
405                                                node_json);
406         if (error) {
407             goto error;
408         }
409     }
410     *txnp = txn;
411     return NULL;
412
413 error:
414     ovsdb_txn_abort(txn);
415     return error;
416 }
417
418 static struct ovsdb_error *
419 ovsdb_file_save_copy__(const char *file_name, int locking,
420                        const char *comment, const struct ovsdb *db,
421                        struct ovsdb_log **logp)
422 {
423     const struct shash_node *node;
424     struct ovsdb_file_txn ftxn;
425     struct ovsdb_error *error;
426     struct ovsdb_log *log;
427     struct json *json;
428
429     error = ovsdb_log_open(file_name, OVSDB_LOG_CREATE, locking, &log);
430     if (error) {
431         return error;
432     }
433
434     /* Write schema. */
435     json = ovsdb_schema_to_json(db->schema);
436     error = ovsdb_log_write(log, json);
437     json_destroy(json);
438     if (error) {
439         goto exit;
440     }
441
442     /* Write data. */
443     ovsdb_file_txn_init(&ftxn);
444     SHASH_FOR_EACH (node, &db->tables) {
445         const struct ovsdb_table *table = node->data;
446         const struct ovsdb_row *row;
447
448         HMAP_FOR_EACH (row, hmap_node, &table->rows) {
449             ovsdb_file_txn_add_row(&ftxn, NULL, row, NULL);
450         }
451     }
452     error = ovsdb_file_txn_commit(ftxn.json, comment, true, log);
453
454 exit:
455     if (logp) {
456         if (!error) {
457             *logp = log;
458             log = NULL;
459         } else {
460             *logp = NULL;
461         }
462     }
463     ovsdb_log_close(log);
464     if (error) {
465         remove(file_name);
466     }
467     return error;
468 }
469
470 /* Saves a snapshot of 'db''s current contents as 'file_name'.  If 'comment' is
471  * nonnull, then it is added along with the data contents and can be viewed
472  * with "ovsdb-tool show-log".
473  *
474  * 'locking' is passed along to ovsdb_log_open() untouched. */
475 struct ovsdb_error *
476 ovsdb_file_save_copy(const char *file_name, int locking,
477                      const char *comment, const struct ovsdb *db)
478 {
479     return ovsdb_file_save_copy__(file_name, locking, comment, db, NULL);
480 }
481
482 /* Opens database 'file_name', reads its schema, and closes it.  On success,
483  * stores the schema into '*schemap' and returns NULL; the caller then owns the
484  * schema.  On failure, returns an ovsdb_error (which the caller must destroy)
485  * and sets '*dbp' to NULL. */
486 struct ovsdb_error *
487 ovsdb_file_read_schema(const char *file_name, struct ovsdb_schema **schemap)
488 {
489     ovs_assert(schemap != NULL);
490     return ovsdb_file_open_log(file_name, OVSDB_LOG_READ_ONLY, NULL, schemap);
491 }
492 \f
493 /* Replica implementation. */
494
495 struct ovsdb_file {
496     struct ovsdb_replica replica;
497     struct ovsdb *db;
498     struct ovsdb_log *log;
499     char *file_name;
500     long long int last_compact;
501     long long int next_compact;
502     unsigned int n_transactions;
503 };
504
505 static const struct ovsdb_replica_class ovsdb_file_class;
506
507 static struct ovsdb_error *
508 ovsdb_file_create(struct ovsdb *db, struct ovsdb_log *log,
509                   const char *file_name,
510                   unsigned int n_transactions,
511                   struct ovsdb_file **filep)
512 {
513     struct ovsdb_file *file;
514     char *deref_name;
515     char *abs_name;
516
517     /* Use the absolute name of the file because ovsdb-server opens its
518      * database before daemonize() chdirs to "/". */
519     deref_name = follow_symlinks(file_name);
520     abs_name = abs_file_name(NULL, deref_name);
521     free(deref_name);
522     if (!abs_name) {
523         *filep = NULL;
524         return ovsdb_io_error(0, "could not determine current "
525                               "working directory");
526     }
527
528     file = xmalloc(sizeof *file);
529     ovsdb_replica_init(&file->replica, &ovsdb_file_class);
530     file->db = db;
531     file->log = log;
532     file->file_name = abs_name;
533     file->last_compact = time_msec();
534     file->next_compact = file->last_compact + COMPACT_MIN_MSEC;
535     file->n_transactions = n_transactions;
536     ovsdb_add_replica(db, &file->replica);
537
538     *filep = file;
539     return NULL;
540 }
541
542 static struct ovsdb_file *
543 ovsdb_file_cast(struct ovsdb_replica *replica)
544 {
545     ovs_assert(replica->class == &ovsdb_file_class);
546     return CONTAINER_OF(replica, struct ovsdb_file, replica);
547 }
548
549 static bool
550 ovsdb_file_change_cb(const struct ovsdb_row *old,
551                      const struct ovsdb_row *new,
552                      const unsigned long int *changed,
553                      void *ftxn_)
554 {
555     struct ovsdb_file_txn *ftxn = ftxn_;
556     ovsdb_file_txn_add_row(ftxn, old, new, changed);
557     return true;
558 }
559
560 static struct ovsdb_error *
561 ovsdb_file_commit(struct ovsdb_replica *replica,
562                   const struct ovsdb_txn *txn, bool durable)
563 {
564     struct ovsdb_file *file = ovsdb_file_cast(replica);
565     struct ovsdb_file_txn ftxn;
566     struct ovsdb_error *error;
567
568     ovsdb_file_txn_init(&ftxn);
569     ovsdb_txn_for_each_change(txn, ovsdb_file_change_cb, &ftxn);
570     if (!ftxn.json) {
571         /* Nothing to commit. */
572         return NULL;
573     }
574
575     error = ovsdb_file_txn_commit(ftxn.json, ovsdb_txn_get_comment(txn),
576                                   durable, file->log);
577     if (error) {
578         return error;
579     }
580     file->n_transactions++;
581
582     /* If it has been at least COMPACT_MIN_MSEC ms since the last time we
583      * compacted (or at least COMPACT_RETRY_MSEC ms since the last time we
584      * tried), and if there are at least 100 transactions in the database, and
585      * if the database is at least 10 MB, then compact the database. */
586     if (time_msec() >= file->next_compact
587         && file->n_transactions >= 100
588         && ovsdb_log_get_offset(file->log) >= 10 * 1024 * 1024)
589     {
590         error = ovsdb_file_compact(file);
591         if (error) {
592             char *s = ovsdb_error_to_string(error);
593             ovsdb_error_destroy(error);
594             VLOG_WARN("%s: compacting database failed (%s), retrying in "
595                       "%d seconds",
596                       file->file_name, s, COMPACT_RETRY_MSEC / 1000);
597             free(s);
598
599             file->next_compact = time_msec() + COMPACT_RETRY_MSEC;
600         }
601     }
602
603     return NULL;
604 }
605
606 struct ovsdb_error *
607 ovsdb_file_compact(struct ovsdb_file *file)
608 {
609     struct ovsdb_log *new_log = NULL;
610     struct lockfile *tmp_lock = NULL;
611     struct ovsdb_error *error;
612     char *tmp_name = NULL;
613     char *comment = NULL;
614     int retval;
615
616     comment = xasprintf("compacting database online "
617                         "(%.3f seconds old, %u transactions, %llu bytes)",
618                         (time_wall_msec() - file->last_compact) / 1000.0,
619                         file->n_transactions,
620                         (unsigned long long) ovsdb_log_get_offset(file->log));
621     VLOG_INFO("%s: %s", file->file_name, comment);
622
623     /* Commit the old version, so that we can be assured that we'll eventually
624      * have either the old or the new version. */
625     error = ovsdb_log_commit(file->log);
626     if (error) {
627         goto exit;
628     }
629
630     /* Lock temporary file. */
631     tmp_name = xasprintf("%s.tmp", file->file_name);
632     retval = lockfile_lock(tmp_name, &tmp_lock);
633     if (retval) {
634         error = ovsdb_io_error(retval, "could not get lock on %s", tmp_name);
635         goto exit;
636     }
637
638     /* Remove temporary file.  (It might not exist.) */
639     if (unlink(tmp_name) < 0 && errno != ENOENT) {
640         error = ovsdb_io_error(errno, "failed to remove %s", tmp_name);
641         goto exit;
642     }
643
644     /* Save a copy. */
645     error = ovsdb_file_save_copy__(tmp_name, false, comment, file->db,
646                                    &new_log);
647     if (error) {
648         goto exit;
649     }
650
651     /* Replace original by temporary. */
652     if (rename(tmp_name, file->file_name)) {
653         error = ovsdb_io_error(errno, "failed to rename \"%s\" to \"%s\"",
654                                tmp_name, file->file_name);
655         goto exit;
656     }
657     fsync_parent_dir(file->file_name);
658
659 exit:
660     if (!error) {
661         ovsdb_log_close(file->log);
662         file->log = new_log;
663         file->last_compact = time_msec();
664         file->next_compact = file->last_compact + COMPACT_MIN_MSEC;
665         file->n_transactions = 1;
666     } else {
667         ovsdb_log_close(new_log);
668         if (tmp_lock) {
669             unlink(tmp_name);
670         }
671     }
672
673     lockfile_unlock(tmp_lock);
674     free(tmp_name);
675     free(comment);
676
677     return error;
678 }
679
680 static void
681 ovsdb_file_destroy(struct ovsdb_replica *replica)
682 {
683     struct ovsdb_file *file = ovsdb_file_cast(replica);
684
685     ovsdb_log_close(file->log);
686     free(file->file_name);
687     free(file);
688 }
689
690 static const struct ovsdb_replica_class ovsdb_file_class = {
691     ovsdb_file_commit,
692     ovsdb_file_destroy
693 };
694 \f
695 static void
696 ovsdb_file_txn_init(struct ovsdb_file_txn *ftxn)
697 {
698     ftxn->json = NULL;
699     ftxn->table_json = NULL;
700     ftxn->table = NULL;
701 }
702
703 static void
704 ovsdb_file_txn_add_row(struct ovsdb_file_txn *ftxn,
705                        const struct ovsdb_row *old,
706                        const struct ovsdb_row *new,
707                        const unsigned long int *changed)
708 {
709     struct json *row;
710
711     if (!new) {
712         row = json_null_create();
713     } else {
714         struct shash_node *node;
715
716         row = old ? NULL : json_object_create();
717         SHASH_FOR_EACH (node, &new->table->schema->columns) {
718             const struct ovsdb_column *column = node->data;
719             const struct ovsdb_type *type = &column->type;
720             unsigned int idx = column->index;
721
722             if (idx != OVSDB_COL_UUID && column->persistent
723                 && (old
724                     ? bitmap_is_set(changed, idx)
725                     : !ovsdb_datum_is_default(&new->fields[idx], type)))
726             {
727                 if (!row) {
728                     row = json_object_create();
729                 }
730                 json_object_put(row, column->name,
731                                 ovsdb_datum_to_json(&new->fields[idx], type));
732             }
733         }
734     }
735
736     if (row) {
737         struct ovsdb_table *table = new ? new->table : old->table;
738         char uuid[UUID_LEN + 1];
739
740         if (table != ftxn->table) {
741             /* Create JSON object for transaction overall. */
742             if (!ftxn->json) {
743                 ftxn->json = json_object_create();
744             }
745
746             /* Create JSON object for transaction on this table. */
747             ftxn->table_json = json_object_create();
748             ftxn->table = table;
749             json_object_put(ftxn->json, table->schema->name, ftxn->table_json);
750         }
751
752         /* Add row to transaction for this table. */
753         snprintf(uuid, sizeof uuid,
754                  UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
755         json_object_put(ftxn->table_json, uuid, row);
756     }
757 }
758
759 static struct ovsdb_error *
760 ovsdb_file_txn_commit(struct json *json, const char *comment,
761                       bool durable, struct ovsdb_log *log)
762 {
763     struct ovsdb_error *error;
764
765     if (!json) {
766         json = json_object_create();
767     }
768     if (comment) {
769         json_object_put_string(json, "_comment", comment);
770     }
771     json_object_put(json, "_date", json_integer_create(time_wall_msec()));
772
773     error = ovsdb_log_write(log, json);
774     json_destroy(json);
775     if (error) {
776         return ovsdb_wrap_error(error, "writing transaction failed");
777     }
778
779     if (durable) {
780         error = ovsdb_log_commit(log);
781         if (error) {
782             return ovsdb_wrap_error(error, "committing transaction failed");
783         }
784     }
785
786     return NULL;
787 }