db-ctl-base: Make common database command code into library.
[cascardo/ovs.git] / lib / db-ctl-base.c
1 /*
2  * Copyright (c) 2015 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include <ctype.h>
20 #include <getopt.h>
21 #include <unistd.h>
22
23 #include "db-ctl-base.h"
24
25 #include "command-line.h"
26 #include "compiler.h"
27 #include "dirs.h"
28 #include "dynamic-string.h"
29 #include "fatal-signal.h"
30 #include "hash.h"
31 #include "json.h"
32 #include "openvswitch/vlog.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-idl.h"
35 #include "ovsdb-idl-provider.h"
36 #include "shash.h"
37 #include "string.h"
38 #include "table.h"
39 #include "util.h"
40
41 VLOG_DEFINE_THIS_MODULE(db_ctl_base);
42
43 /* The IDL we're using and the current transaction, if any.
44  * This is for use by ctl_exit() only, to allow it to clean up.
45  * Other code should use its context arguments. */
46 struct ovsdb_idl *the_idl;
47 struct ovsdb_idl_txn *the_idl_txn;
48
49 static struct shash all_commands = SHASH_INITIALIZER(&all_commands);
50
51 \f
52 static void
53 die_if_error(char *error)
54 {
55     if (error) {
56         ctl_fatal("%s", error);
57     }
58 }
59
60 static int
61 to_lower_and_underscores(unsigned c)
62 {
63     return c == '-' ? '_' : tolower(c);
64 }
65
66 static unsigned int
67 score_partial_match(const char *name, const char *s)
68 {
69     int score;
70
71     if (!strcmp(name, s)) {
72         return UINT_MAX;
73     }
74     for (score = 0; ; score++, name++, s++) {
75         if (to_lower_and_underscores(*name) != to_lower_and_underscores(*s)) {
76             break;
77         } else if (*name == '\0') {
78             return UINT_MAX - 1;
79         }
80     }
81     return *s == '\0' ? score : 0;
82 }
83
84 static struct ovsdb_symbol *
85 create_symbol(struct ovsdb_symbol_table *symtab, const char *id, bool *newp)
86 {
87     struct ovsdb_symbol *symbol;
88
89     if (id[0] != '@') {
90         ctl_fatal("row id \"%s\" does not begin with \"@\"", id);
91     }
92
93     if (newp) {
94         *newp = ovsdb_symbol_table_get(symtab, id) == NULL;
95     }
96
97     symbol = ovsdb_symbol_table_insert(symtab, id);
98     if (symbol->created) {
99         ctl_fatal("row id \"%s\" may only be specified on one --id option",
100                   id);
101     }
102     symbol->created = true;
103     return symbol;
104 }
105
106 static const struct ovsdb_idl_row *
107 get_row_by_id(struct ctl_context *ctx, const struct ctl_table_class *table,
108               const struct ctl_row_id *id, const char *record_id)
109 {
110     const struct ovsdb_idl_row *referrer, *final;
111
112     if (!id->table) {
113         return NULL;
114     }
115
116     if (!id->name_column) {
117         if (strcmp(record_id, ".")) {
118             return NULL;
119         }
120         referrer = ovsdb_idl_first_row(ctx->idl, id->table);
121         if (!referrer || ovsdb_idl_next_row(referrer)) {
122             return NULL;
123         }
124     } else {
125         const struct ovsdb_idl_row *row;
126
127         referrer = NULL;
128         for (row = ovsdb_idl_first_row(ctx->idl, id->table);
129              row != NULL;
130              row = ovsdb_idl_next_row(row))
131             {
132                 const struct ovsdb_datum *name;
133
134                 name = ovsdb_idl_get(row, id->name_column,
135                                      OVSDB_TYPE_STRING, OVSDB_TYPE_VOID);
136                 if (name->n == 1 && !strcmp(name->keys[0].string, record_id)) {
137                     if (referrer) {
138                         ctl_fatal("multiple rows in %s match \"%s\"",
139                                   table->class->name, record_id);
140                     }
141                     referrer = row;
142                 }
143             }
144     }
145     if (!referrer) {
146         return NULL;
147     }
148
149     final = NULL;
150     if (id->uuid_column) {
151         const struct ovsdb_datum *uuid;
152
153         ovsdb_idl_txn_verify(referrer, id->uuid_column);
154         uuid = ovsdb_idl_get(referrer, id->uuid_column,
155                              OVSDB_TYPE_UUID, OVSDB_TYPE_VOID);
156         if (uuid->n == 1) {
157             final = ovsdb_idl_get_row_for_uuid(ctx->idl, table->class,
158                                                &uuid->keys[0].uuid);
159         }
160     } else {
161         final = referrer;
162     }
163
164     return final;
165 }
166
167 static const struct ovsdb_idl_row *
168 get_row(struct ctl_context *ctx,
169         const struct ctl_table_class *table, const char *record_id,
170         bool must_exist)
171 {
172     const struct ovsdb_idl_row *row;
173     struct uuid uuid;
174
175     row = NULL;
176     if (uuid_from_string(&uuid, record_id)) {
177         row = ovsdb_idl_get_row_for_uuid(ctx->idl, table->class, &uuid);
178     }
179     if (!row) {
180         int i;
181
182         for (i = 0; i < ARRAY_SIZE(table->row_ids); i++) {
183             row = get_row_by_id(ctx, table, &table->row_ids[i], record_id);
184             if (row) {
185                 break;
186             }
187         }
188     }
189     if (must_exist && !row) {
190         ctl_fatal("no row \"%s\" in table %s",
191                   record_id, table->class->name);
192     }
193     return row;
194 }
195
196 static char *
197 get_column(const struct ctl_table_class *table, const char *column_name,
198            const struct ovsdb_idl_column **columnp)
199 {
200     const struct ovsdb_idl_column *best_match = NULL;
201     unsigned int best_score = 0;
202     size_t i;
203
204     for (i = 0; i < table->class->n_columns; i++) {
205         const struct ovsdb_idl_column *column = &table->class->columns[i];
206         unsigned int score = score_partial_match(column->name, column_name);
207         if (score > best_score) {
208             best_match = column;
209             best_score = score;
210         } else if (score == best_score) {
211             best_match = NULL;
212         }
213     }
214
215     *columnp = best_match;
216     if (best_match) {
217         return NULL;
218     } else if (best_score) {
219         return xasprintf("%s contains more than one column whose name "
220                          "matches \"%s\"", table->class->name, column_name);
221     } else {
222         return xasprintf("%s does not contain a column whose name matches "
223                          "\"%s\"", table->class->name, column_name);
224     }
225 }
226
227 static void
228 pre_get_column(struct ctl_context *ctx,
229                const struct ctl_table_class *table, const char *column_name,
230                const struct ovsdb_idl_column **columnp)
231 {
232     die_if_error(get_column(table, column_name, columnp));
233     ovsdb_idl_add_column(ctx->idl, *columnp);
234 }
235
236 static const struct ctl_table_class *
237 pre_get_table(struct ctl_context *ctx, const char *table_name)
238 {
239     const struct ctl_table_class *table_class;
240     int i;
241
242     table_class = get_table(table_name);
243     ovsdb_idl_add_table(ctx->idl, table_class->class);
244
245     for (i = 0; i < ARRAY_SIZE(table_class->row_ids); i++) {
246         const struct ctl_row_id *id = &table_class->row_ids[i];
247         if (id->table) {
248             ovsdb_idl_add_table(ctx->idl, id->table);
249         }
250         if (id->name_column) {
251             ovsdb_idl_add_column(ctx->idl, id->name_column);
252         }
253         if (id->uuid_column) {
254             ovsdb_idl_add_column(ctx->idl, id->uuid_column);
255         }
256     }
257
258     return table_class;
259 }
260
261 static char *
262 missing_operator_error(const char *arg, const char **allowed_operators,
263                        size_t n_allowed)
264 {
265     struct ds s;
266
267     ds_init(&s);
268     ds_put_format(&s, "%s: argument does not end in ", arg);
269     ds_put_format(&s, "\"%s\"", allowed_operators[0]);
270     if (n_allowed == 2) {
271         ds_put_format(&s, " or \"%s\"", allowed_operators[1]);
272     } else if (n_allowed > 2) {
273         size_t i;
274
275         for (i = 1; i < n_allowed - 1; i++) {
276             ds_put_format(&s, ", \"%s\"", allowed_operators[i]);
277         }
278         ds_put_format(&s, ", or \"%s\"", allowed_operators[i]);
279     }
280     ds_put_format(&s, " followed by a value.");
281
282     return ds_steal_cstr(&s);
283 }
284
285 /* Breaks 'arg' apart into a number of fields in the following order:
286  *
287  *      - The name of a column in 'table', stored into '*columnp'.  The column
288  *        name may be abbreviated.
289  *
290  *      - Optionally ':' followed by a key string.  The key is stored as a
291  *        malloc()'d string into '*keyp', or NULL if no key is present in
292  *        'arg'.
293  *
294  *      - If 'valuep' is nonnull, an operator followed by a value string.  The
295  *        allowed operators are the 'n_allowed' string in 'allowed_operators',
296  *        or just "=" if 'n_allowed' is 0.  If 'operatorp' is nonnull, then the
297  *        index of the operator within 'allowed_operators' is stored into
298  *        '*operatorp'.  The value is stored as a malloc()'d string into
299  *        '*valuep', or NULL if no value is present in 'arg'.
300  *
301  * On success, returns NULL.  On failure, returned a malloc()'d string error
302  * message and stores NULL into all of the nonnull output arguments. */
303 static char * OVS_WARN_UNUSED_RESULT
304 parse_column_key_value(const char *arg,
305                        const struct ctl_table_class *table,
306                        const struct ovsdb_idl_column **columnp, char **keyp,
307                        int *operatorp,
308                        const char **allowed_operators, size_t n_allowed,
309                        char **valuep)
310 {
311     const char *p = arg;
312     char *column_name;
313     char *error;
314
315     ovs_assert(!(operatorp && !valuep));
316     *keyp = NULL;
317     if (valuep) {
318         *valuep = NULL;
319     }
320
321     /* Parse column name. */
322     error = ovsdb_token_parse(&p, &column_name);
323     if (error) {
324         goto error;
325     }
326     if (column_name[0] == '\0') {
327         free(column_name);
328         error = xasprintf("%s: missing column name", arg);
329         goto error;
330     }
331     error = get_column(table, column_name, columnp);
332     free(column_name);
333     if (error) {
334         goto error;
335     }
336
337     /* Parse key string. */
338     if (*p == ':') {
339         p++;
340         error = ovsdb_token_parse(&p, keyp);
341         if (error) {
342             goto error;
343         }
344     }
345
346     /* Parse value string. */
347     if (valuep) {
348         size_t best_len;
349         size_t i;
350         int best;
351
352         if (!allowed_operators) {
353             static const char *equals = "=";
354             allowed_operators = &equals;
355             n_allowed = 1;
356         }
357
358         best = -1;
359         best_len = 0;
360         for (i = 0; i < n_allowed; i++) {
361             const char *op = allowed_operators[i];
362             size_t op_len = strlen(op);
363
364             if (op_len > best_len && !strncmp(op, p, op_len) && p[op_len]) {
365                 best_len = op_len;
366                 best = i;
367             }
368         }
369         if (best < 0) {
370             error = missing_operator_error(arg, allowed_operators, n_allowed);
371             goto error;
372         }
373
374         if (operatorp) {
375             *operatorp = best;
376         }
377         *valuep = xstrdup(p + best_len);
378     } else {
379         if (*p != '\0') {
380             error = xasprintf("%s: trailing garbage \"%s\" in argument",
381                               arg, p);
382             goto error;
383         }
384     }
385     return NULL;
386
387  error:
388     *columnp = NULL;
389     free(*keyp);
390     *keyp = NULL;
391     if (valuep) {
392         free(*valuep);
393         *valuep = NULL;
394         if (operatorp) {
395             *operatorp = -1;
396         }
397     }
398     return error;
399 }
400
401 static const struct ovsdb_idl_column *
402 pre_parse_column_key_value(struct ctl_context *ctx,
403                            const char *arg,
404                            const struct ctl_table_class *table)
405 {
406     const struct ovsdb_idl_column *column;
407     const char *p;
408     char *column_name;
409
410     p = arg;
411     die_if_error(ovsdb_token_parse(&p, &column_name));
412     if (column_name[0] == '\0') {
413         ctl_fatal("%s: missing column name", arg);
414     }
415
416     pre_get_column(ctx, table, column_name, &column);
417     free(column_name);
418
419     return column;
420 }
421
422 static void
423 check_mutable(const struct ovsdb_idl_row *row,
424               const struct ovsdb_idl_column *column)
425 {
426     if (!ovsdb_idl_is_mutable(row, column)) {
427         ctl_fatal("cannot modify read-only column %s in table %s",
428                   column->name, row->table->class->name);
429     }
430 }
431
432 #define RELOPS                                  \
433     RELOP(RELOP_EQ,     "=")                    \
434     RELOP(RELOP_NE,     "!=")                   \
435     RELOP(RELOP_LT,     "<")                    \
436     RELOP(RELOP_GT,     ">")                    \
437     RELOP(RELOP_LE,     "<=")                   \
438     RELOP(RELOP_GE,     ">=")                   \
439     RELOP(RELOP_SET_EQ, "{=}")                  \
440     RELOP(RELOP_SET_NE, "{!=}")                 \
441     RELOP(RELOP_SET_LT, "{<}")                  \
442     RELOP(RELOP_SET_GT, "{>}")                  \
443     RELOP(RELOP_SET_LE, "{<=}")                 \
444     RELOP(RELOP_SET_GE, "{>=}")
445
446 enum relop {
447 #define RELOP(ENUM, STRING) ENUM,
448     RELOPS
449 #undef RELOP
450 };
451
452 static bool
453 is_set_operator(enum relop op)
454 {
455     return (op == RELOP_SET_EQ || op == RELOP_SET_NE ||
456             op == RELOP_SET_LT || op == RELOP_SET_GT ||
457             op == RELOP_SET_LE || op == RELOP_SET_GE);
458 }
459
460 static bool
461 evaluate_relop(const struct ovsdb_datum *a, const struct ovsdb_datum *b,
462                const struct ovsdb_type *type, enum relop op)
463 {
464     switch (op) {
465     case RELOP_EQ:
466     case RELOP_SET_EQ:
467         return ovsdb_datum_compare_3way(a, b, type) == 0;
468     case RELOP_NE:
469     case RELOP_SET_NE:
470         return ovsdb_datum_compare_3way(a, b, type) != 0;
471     case RELOP_LT:
472         return ovsdb_datum_compare_3way(a, b, type) < 0;
473     case RELOP_GT:
474         return ovsdb_datum_compare_3way(a, b, type) > 0;
475     case RELOP_LE:
476         return ovsdb_datum_compare_3way(a, b, type) <= 0;
477     case RELOP_GE:
478         return ovsdb_datum_compare_3way(a, b, type) >= 0;
479
480     case RELOP_SET_LT:
481         return b->n > a->n && ovsdb_datum_includes_all(a, b, type);
482     case RELOP_SET_GT:
483         return a->n > b->n && ovsdb_datum_includes_all(b, a, type);
484     case RELOP_SET_LE:
485         return ovsdb_datum_includes_all(a, b, type);
486     case RELOP_SET_GE:
487         return ovsdb_datum_includes_all(b, a, type);
488
489     default:
490         OVS_NOT_REACHED();
491     }
492 }
493
494 static bool
495 is_condition_satisfied(const struct ctl_table_class *table,
496                        const struct ovsdb_idl_row *row, const char *arg,
497                        struct ovsdb_symbol_table *symtab)
498 {
499     static const char *operators[] = {
500 #define RELOP(ENUM, STRING) STRING,
501         RELOPS
502 #undef RELOP
503     };
504
505     const struct ovsdb_idl_column *column;
506     const struct ovsdb_datum *have_datum;
507     char *key_string, *value_string;
508     struct ovsdb_type type;
509     int operator;
510     bool retval;
511     char *error;
512
513     error = parse_column_key_value(arg, table, &column, &key_string,
514                                    &operator, operators, ARRAY_SIZE(operators),
515                                    &value_string);
516     die_if_error(error);
517     if (!value_string) {
518         ctl_fatal("%s: missing value", arg);
519     }
520
521     type = column->type;
522     type.n_max = UINT_MAX;
523
524     have_datum = ovsdb_idl_read(row, column);
525     if (key_string) {
526         union ovsdb_atom want_key;
527         struct ovsdb_datum b;
528         unsigned int idx;
529
530         if (column->type.value.type == OVSDB_TYPE_VOID) {
531             ctl_fatal("cannot specify key to check for non-map column %s",
532                       column->name);
533         }
534
535         die_if_error(ovsdb_atom_from_string(&want_key, &column->type.key,
536                                             key_string, symtab));
537
538         type.key = type.value;
539         type.value.type = OVSDB_TYPE_VOID;
540         die_if_error(ovsdb_datum_from_string(&b, &type, value_string, symtab));
541
542         idx = ovsdb_datum_find_key(have_datum,
543                                    &want_key, column->type.key.type);
544         if (idx == UINT_MAX && !is_set_operator(operator)) {
545             retval = false;
546         } else {
547             struct ovsdb_datum a;
548
549             if (idx != UINT_MAX) {
550                 a.n = 1;
551                 a.keys = &have_datum->values[idx];
552                 a.values = NULL;
553             } else {
554                 a.n = 0;
555                 a.keys = NULL;
556                 a.values = NULL;
557             }
558
559             retval = evaluate_relop(&a, &b, &type, operator);
560         }
561
562         ovsdb_atom_destroy(&want_key, column->type.key.type);
563         ovsdb_datum_destroy(&b, &type);
564     } else {
565         struct ovsdb_datum want_datum;
566
567         die_if_error(ovsdb_datum_from_string(&want_datum, &column->type,
568                                              value_string, symtab));
569         retval = evaluate_relop(have_datum, &want_datum, &type, operator);
570         ovsdb_datum_destroy(&want_datum, &column->type);
571     }
572
573     free(key_string);
574     free(value_string);
575
576     return retval;
577 }
578
579 static void
580 invalidate_cache(struct ctl_context *ctx)
581 {
582     if (ctx->invalidate_cache) {
583         (ctx->invalidate_cache)(ctx);
584     }
585 }
586 \f
587 static void
588 pre_cmd_get(struct ctl_context *ctx)
589 {
590     const char *id = shash_find_data(&ctx->options, "--id");
591     const char *table_name = ctx->argv[1];
592     const struct ctl_table_class *table;
593     int i;
594
595     /* Using "get" without --id or a column name could possibly make sense.
596      * Maybe, for example, a *ctl command run wants to assert that a row
597      * exists.  But it is unlikely that an interactive user would want to do
598      * that, so issue a warning if we're running on a terminal. */
599     if (!id && ctx->argc <= 3 && isatty(STDOUT_FILENO)) {
600         VLOG_WARN("\"get\" command without row arguments or \"--id\" is "
601                   "possibly erroneous");
602     }
603
604     table = pre_get_table(ctx, table_name);
605     for (i = 3; i < ctx->argc; i++) {
606         if (!strcasecmp(ctx->argv[i], "_uuid")
607             || !strcasecmp(ctx->argv[i], "-uuid")) {
608             continue;
609         }
610
611         pre_parse_column_key_value(ctx, ctx->argv[i], table);
612     }
613 }
614
615 static void
616 cmd_get(struct ctl_context *ctx)
617 {
618     const char *id = shash_find_data(&ctx->options, "--id");
619     bool must_exist = !shash_find(&ctx->options, "--if-exists");
620     const char *table_name = ctx->argv[1];
621     const char *record_id = ctx->argv[2];
622     const struct ctl_table_class *table;
623     const struct ovsdb_idl_row *row;
624     struct ds *out = &ctx->output;
625     int i;
626
627     if (id && !must_exist) {
628         ctl_fatal("--if-exists and --id may not be specified together");
629     }
630
631     table = get_table(table_name);
632     row = get_row(ctx, table, record_id, must_exist);
633     if (!row) {
634         return;
635     }
636
637     if (id) {
638         struct ovsdb_symbol *symbol;
639         bool new;
640
641         symbol = create_symbol(ctx->symtab, id, &new);
642         if (!new) {
643             ctl_fatal("row id \"%s\" specified on \"get\" command was used "
644                       "before it was defined", id);
645         }
646         symbol->uuid = row->uuid;
647
648         /* This symbol refers to a row that already exists, so disable warnings
649          * about it being unreferenced. */
650         symbol->strong_ref = true;
651     }
652     for (i = 3; i < ctx->argc; i++) {
653         const struct ovsdb_idl_column *column;
654         const struct ovsdb_datum *datum;
655         char *key_string;
656
657         /* Special case for obtaining the UUID of a row.  We can't just do this
658          * through parse_column_key_value() below since it returns a "struct
659          * ovsdb_idl_column" and the UUID column doesn't have one. */
660         if (!strcasecmp(ctx->argv[i], "_uuid")
661             || !strcasecmp(ctx->argv[i], "-uuid")) {
662             ds_put_format(out, UUID_FMT"\n", UUID_ARGS(&row->uuid));
663             continue;
664         }
665
666         die_if_error(parse_column_key_value(ctx->argv[i], table,
667                                             &column, &key_string,
668                                             NULL, NULL, 0, NULL));
669
670         ovsdb_idl_txn_verify(row, column);
671         datum = ovsdb_idl_read(row, column);
672         if (key_string) {
673             union ovsdb_atom key;
674             unsigned int idx;
675
676             if (column->type.value.type == OVSDB_TYPE_VOID) {
677                 ctl_fatal("cannot specify key to get for non-map column %s",
678                           column->name);
679             }
680
681             die_if_error(ovsdb_atom_from_string(&key,
682                                                 &column->type.key,
683                                                 key_string, ctx->symtab));
684
685             idx = ovsdb_datum_find_key(datum, &key,
686                                        column->type.key.type);
687             if (idx == UINT_MAX) {
688                 if (must_exist) {
689                     ctl_fatal("no key \"%s\" in %s record \"%s\" column %s",
690                               key_string, table->class->name, record_id,
691                               column->name);
692                 }
693             } else {
694                 ovsdb_atom_to_string(&datum->values[idx],
695                                      column->type.value.type, out);
696             }
697             ovsdb_atom_destroy(&key, column->type.key.type);
698         } else {
699             ovsdb_datum_to_string(datum, &column->type, out);
700         }
701         ds_put_char(out, '\n');
702
703         free(key_string);
704     }
705 }
706
707 static void
708 parse_column_names(const char *column_names,
709                    const struct ctl_table_class *table,
710                    const struct ovsdb_idl_column ***columnsp,
711                    size_t *n_columnsp)
712 {
713     const struct ovsdb_idl_column **columns;
714     size_t n_columns;
715
716     if (!column_names) {
717         size_t i;
718
719         n_columns = table->class->n_columns + 1;
720         columns = xmalloc(n_columns * sizeof *columns);
721         columns[0] = NULL;
722         for (i = 0; i < table->class->n_columns; i++) {
723             columns[i + 1] = &table->class->columns[i];
724         }
725     } else {
726         char *s = xstrdup(column_names);
727         size_t allocated_columns;
728         char *save_ptr = NULL;
729         char *column_name;
730
731         columns = NULL;
732         allocated_columns = n_columns = 0;
733         for (column_name = strtok_r(s, ", ", &save_ptr); column_name;
734              column_name = strtok_r(NULL, ", ", &save_ptr)) {
735             const struct ovsdb_idl_column *column;
736
737             if (!strcasecmp(column_name, "_uuid")) {
738                 column = NULL;
739             } else {
740                 die_if_error(get_column(table, column_name, &column));
741             }
742             if (n_columns >= allocated_columns) {
743                 columns = x2nrealloc(columns, &allocated_columns,
744                                      sizeof *columns);
745             }
746             columns[n_columns++] = column;
747         }
748         free(s);
749
750         if (!n_columns) {
751             ctl_fatal("must specify at least one column name");
752         }
753     }
754     *columnsp = columns;
755     *n_columnsp = n_columns;
756 }
757
758 static void
759 pre_list_columns(struct ctl_context *ctx,
760                  const struct ctl_table_class *table,
761                  const char *column_names)
762 {
763     const struct ovsdb_idl_column **columns;
764     size_t n_columns;
765     size_t i;
766
767     parse_column_names(column_names, table, &columns, &n_columns);
768     for (i = 0; i < n_columns; i++) {
769         if (columns[i]) {
770             ovsdb_idl_add_column(ctx->idl, columns[i]);
771         }
772     }
773     free(columns);
774 }
775
776 static void
777 pre_cmd_list(struct ctl_context *ctx)
778 {
779     const char *column_names = shash_find_data(&ctx->options, "--columns");
780     const char *table_name = ctx->argv[1];
781     const struct ctl_table_class *table;
782
783     table = pre_get_table(ctx, table_name);
784     pre_list_columns(ctx, table, column_names);
785 }
786
787 static struct table *
788 list_make_table(const struct ovsdb_idl_column **columns, size_t n_columns)
789 {
790     struct table *out;
791     size_t i;
792
793     out = xmalloc(sizeof *out);
794     table_init(out);
795
796     for (i = 0; i < n_columns; i++) {
797         const struct ovsdb_idl_column *column = columns[i];
798         const char *column_name = column ? column->name : "_uuid";
799
800         table_add_column(out, "%s", column_name);
801     }
802
803     return out;
804 }
805
806 static void
807 list_record(const struct ovsdb_idl_row *row,
808             const struct ovsdb_idl_column **columns, size_t n_columns,
809             struct table *out)
810 {
811     size_t i;
812
813     if (!row) {
814         return;
815     }
816
817     table_add_row(out);
818     for (i = 0; i < n_columns; i++) {
819         const struct ovsdb_idl_column *column = columns[i];
820         struct cell *cell = table_add_cell(out);
821
822         if (!column) {
823             struct ovsdb_datum datum;
824             union ovsdb_atom atom;
825
826             atom.uuid = row->uuid;
827
828             datum.keys = &atom;
829             datum.values = NULL;
830             datum.n = 1;
831
832             cell->json = ovsdb_datum_to_json(&datum, &ovsdb_type_uuid);
833             cell->type = &ovsdb_type_uuid;
834         } else {
835             const struct ovsdb_datum *datum = ovsdb_idl_read(row, column);
836
837             cell->json = ovsdb_datum_to_json(datum, &column->type);
838             cell->type = &column->type;
839         }
840     }
841 }
842
843 static void
844 cmd_list(struct ctl_context *ctx)
845 {
846     const char *column_names = shash_find_data(&ctx->options, "--columns");
847     bool must_exist = !shash_find(&ctx->options, "--if-exists");
848     const struct ovsdb_idl_column **columns;
849     const char *table_name = ctx->argv[1];
850     const struct ctl_table_class *table;
851     struct table *out;
852     size_t n_columns;
853     int i;
854
855     table = get_table(table_name);
856     parse_column_names(column_names, table, &columns, &n_columns);
857     out = ctx->table = list_make_table(columns, n_columns);
858     if (ctx->argc > 2) {
859         for (i = 2; i < ctx->argc; i++) {
860             list_record(get_row(ctx, table, ctx->argv[i], must_exist),
861                         columns, n_columns, out);
862         }
863     } else {
864         const struct ovsdb_idl_row *row;
865
866         for (row = ovsdb_idl_first_row(ctx->idl, table->class); row != NULL;
867              row = ovsdb_idl_next_row(row)) {
868             list_record(row, columns, n_columns, out);
869         }
870     }
871     free(columns);
872 }
873
874 static void
875 pre_cmd_find(struct ctl_context *ctx)
876 {
877     const char *column_names = shash_find_data(&ctx->options, "--columns");
878     const char *table_name = ctx->argv[1];
879     const struct ctl_table_class *table;
880     int i;
881
882     table = pre_get_table(ctx, table_name);
883     pre_list_columns(ctx, table, column_names);
884     for (i = 2; i < ctx->argc; i++) {
885         pre_parse_column_key_value(ctx, ctx->argv[i], table);
886     }
887 }
888
889 static void
890 cmd_find(struct ctl_context *ctx)
891 {
892     const char *column_names = shash_find_data(&ctx->options, "--columns");
893     const struct ovsdb_idl_column **columns;
894     const char *table_name = ctx->argv[1];
895     const struct ctl_table_class *table;
896     const struct ovsdb_idl_row *row;
897     struct table *out;
898     size_t n_columns;
899
900     table = get_table(table_name);
901     parse_column_names(column_names, table, &columns, &n_columns);
902     out = ctx->table = list_make_table(columns, n_columns);
903     for (row = ovsdb_idl_first_row(ctx->idl, table->class); row;
904          row = ovsdb_idl_next_row(row)) {
905         int i;
906
907         for (i = 2; i < ctx->argc; i++) {
908             if (!is_condition_satisfied(table, row, ctx->argv[i],
909                                         ctx->symtab)) {
910                 goto next_row;
911             }
912         }
913         list_record(row, columns, n_columns, out);
914
915     next_row: ;
916     }
917     free(columns);
918 }
919
920 static void
921 pre_cmd_set(struct ctl_context *ctx)
922 {
923     const char *table_name = ctx->argv[1];
924     const struct ctl_table_class *table;
925     int i;
926
927     table = pre_get_table(ctx, table_name);
928     for (i = 3; i < ctx->argc; i++) {
929         pre_parse_column_key_value(ctx, ctx->argv[i], table);
930     }
931 }
932
933 static void
934 cmd_set(struct ctl_context *ctx)
935 {
936     bool must_exist = !shash_find(&ctx->options, "--if-exists");
937     const char *table_name = ctx->argv[1];
938     const char *record_id = ctx->argv[2];
939     const struct ctl_table_class*table;
940     const struct ovsdb_idl_row *row;
941     int i;
942
943     table = get_table(table_name);
944     row = get_row(ctx, table, record_id, must_exist);
945     if (!row) {
946         return;
947     }
948
949     for (i = 3; i < ctx->argc; i++) {
950         set_column(table, row, ctx->argv[i], ctx->symtab);
951     }
952
953     invalidate_cache(ctx);
954 }
955
956 static void
957 pre_cmd_add(struct ctl_context *ctx)
958 {
959     const char *table_name = ctx->argv[1];
960     const char *column_name = ctx->argv[3];
961     const struct ctl_table_class *table;
962     const struct ovsdb_idl_column *column;
963
964     table = pre_get_table(ctx, table_name);
965     pre_get_column(ctx, table, column_name, &column);
966 }
967
968 static void
969 cmd_add(struct ctl_context *ctx)
970 {
971     bool must_exist = !shash_find(&ctx->options, "--if-exists");
972     const char *table_name = ctx->argv[1];
973     const char *record_id = ctx->argv[2];
974     const char *column_name = ctx->argv[3];
975     const struct ctl_table_class *table;
976     const struct ovsdb_idl_column *column;
977     const struct ovsdb_idl_row *row;
978     const struct ovsdb_type *type;
979     struct ovsdb_datum old;
980     int i;
981
982     table = get_table(table_name);
983     die_if_error(get_column(table, column_name, &column));
984     row = get_row(ctx, table, record_id, must_exist);
985     if (!row) {
986         return;
987     }
988     check_mutable(row, column);
989
990     type = &column->type;
991     ovsdb_datum_clone(&old, ovsdb_idl_read(row, column), &column->type);
992     for (i = 4; i < ctx->argc; i++) {
993         struct ovsdb_type add_type;
994         struct ovsdb_datum add;
995
996         add_type = *type;
997         add_type.n_min = 1;
998         add_type.n_max = UINT_MAX;
999         die_if_error(ovsdb_datum_from_string(&add, &add_type, ctx->argv[i],
1000                                              ctx->symtab));
1001         ovsdb_datum_union(&old, &add, type, false);
1002         ovsdb_datum_destroy(&add, type);
1003     }
1004     if (old.n > type->n_max) {
1005         ctl_fatal("\"add\" operation would put %u %s in column %s of "
1006                   "table %s but the maximum number is %u",
1007                   old.n,
1008                   type->value.type == OVSDB_TYPE_VOID ? "values" : "pairs",
1009                   column->name, table->class->name, type->n_max);
1010     }
1011     ovsdb_idl_txn_verify(row, column);
1012     ovsdb_idl_txn_write(row, column, &old);
1013
1014     invalidate_cache(ctx);
1015 }
1016
1017 static void
1018 pre_cmd_remove(struct ctl_context *ctx)
1019 {
1020     const char *table_name = ctx->argv[1];
1021     const char *column_name = ctx->argv[3];
1022     const struct ctl_table_class *table;
1023     const struct ovsdb_idl_column *column;
1024
1025     table = pre_get_table(ctx, table_name);
1026     pre_get_column(ctx, table, column_name, &column);
1027 }
1028
1029 static void
1030 cmd_remove(struct ctl_context *ctx)
1031 {
1032     bool must_exist = !shash_find(&ctx->options, "--if-exists");
1033     const char *table_name = ctx->argv[1];
1034     const char *record_id = ctx->argv[2];
1035     const char *column_name = ctx->argv[3];
1036     const struct ctl_table_class *table;
1037     const struct ovsdb_idl_column *column;
1038     const struct ovsdb_idl_row *row;
1039     const struct ovsdb_type *type;
1040     struct ovsdb_datum old;
1041     int i;
1042
1043     table = get_table(table_name);
1044     die_if_error(get_column(table, column_name, &column));
1045     row = get_row(ctx, table, record_id, must_exist);
1046     if (!row) {
1047         return;
1048     }
1049     check_mutable(row, column);
1050
1051     type = &column->type;
1052     ovsdb_datum_clone(&old, ovsdb_idl_read(row, column), &column->type);
1053     for (i = 4; i < ctx->argc; i++) {
1054         struct ovsdb_type rm_type;
1055         struct ovsdb_datum rm;
1056         char *error;
1057
1058         rm_type = *type;
1059         rm_type.n_min = 1;
1060         rm_type.n_max = UINT_MAX;
1061         error = ovsdb_datum_from_string(&rm, &rm_type,
1062                                         ctx->argv[i], ctx->symtab);
1063
1064         if (error) {
1065             if (ovsdb_type_is_map(&rm_type)) {
1066                 rm_type.value.type = OVSDB_TYPE_VOID;
1067                 free(error);
1068                 die_if_error(ovsdb_datum_from_string(
1069                                                      &rm, &rm_type, ctx->argv[i], ctx->symtab));
1070             } else {
1071                 ctl_fatal("%s", error);
1072             }
1073         }
1074         ovsdb_datum_subtract(&old, type, &rm, &rm_type);
1075         ovsdb_datum_destroy(&rm, &rm_type);
1076     }
1077     if (old.n < type->n_min) {
1078         ctl_fatal("\"remove\" operation would put %u %s in column %s of "
1079                   "table %s but the minimum number is %u",
1080                   old.n,
1081                   type->value.type == OVSDB_TYPE_VOID ? "values" : "pairs",
1082                   column->name, table->class->name, type->n_min);
1083     }
1084     ovsdb_idl_txn_verify(row, column);
1085     ovsdb_idl_txn_write(row, column, &old);
1086
1087     invalidate_cache(ctx);
1088 }
1089
1090 static void
1091 pre_cmd_clear(struct ctl_context *ctx)
1092 {
1093     const char *table_name = ctx->argv[1];
1094     const struct ctl_table_class *table;
1095     int i;
1096
1097     table = pre_get_table(ctx, table_name);
1098     for (i = 3; i < ctx->argc; i++) {
1099         const struct ovsdb_idl_column *column;
1100
1101         pre_get_column(ctx, table, ctx->argv[i], &column);
1102     }
1103 }
1104
1105 static void
1106 cmd_clear(struct ctl_context *ctx)
1107 {
1108     bool must_exist = !shash_find(&ctx->options, "--if-exists");
1109     const char *table_name = ctx->argv[1];
1110     const char *record_id = ctx->argv[2];
1111     const struct ctl_table_class *table;
1112     const struct ovsdb_idl_row *row;
1113     int i;
1114
1115     table = get_table(table_name);
1116     row = get_row(ctx, table, record_id, must_exist);
1117     if (!row) {
1118         return;
1119     }
1120
1121     for (i = 3; i < ctx->argc; i++) {
1122         const struct ovsdb_idl_column *column;
1123         const struct ovsdb_type *type;
1124         struct ovsdb_datum datum;
1125
1126         die_if_error(get_column(table, ctx->argv[i], &column));
1127         check_mutable(row, column);
1128
1129         type = &column->type;
1130         if (type->n_min > 0) {
1131             ctl_fatal("\"clear\" operation cannot be applied to column %s "
1132                       "of table %s, which is not allowed to be empty",
1133                       column->name, table->class->name);
1134         }
1135
1136         ovsdb_datum_init_empty(&datum);
1137         ovsdb_idl_txn_write(row, column, &datum);
1138     }
1139
1140     invalidate_cache(ctx);
1141 }
1142
1143 static void
1144 pre_create(struct ctl_context *ctx)
1145 {
1146     const char *id = shash_find_data(&ctx->options, "--id");
1147     const char *table_name = ctx->argv[1];
1148     const struct ctl_table_class *table;
1149
1150     table = get_table(table_name);
1151     if (!id && !table->class->is_root) {
1152         VLOG_WARN("applying \"create\" command to table %s without --id "
1153                   "option will have no effect", table->class->name);
1154     }
1155 }
1156
1157 static void
1158 cmd_create(struct ctl_context *ctx)
1159 {
1160     const char *id = shash_find_data(&ctx->options, "--id");
1161     const char *table_name = ctx->argv[1];
1162     const struct ctl_table_class *table = get_table(table_name);
1163     const struct ovsdb_idl_row *row;
1164     const struct uuid *uuid;
1165     int i;
1166
1167     if (id) {
1168         struct ovsdb_symbol *symbol = create_symbol(ctx->symtab, id, NULL);
1169         if (table->class->is_root) {
1170             /* This table is in the root set, meaning that rows created in it
1171              * won't disappear even if they are unreferenced, so disable
1172              * warnings about that by pretending that there is a reference. */
1173             symbol->strong_ref = true;
1174         }
1175         uuid = &symbol->uuid;
1176     } else {
1177         uuid = NULL;
1178     }
1179
1180     row = ovsdb_idl_txn_insert(ctx->txn, table->class, uuid);
1181     for (i = 2; i < ctx->argc; i++) {
1182         set_column(table, row, ctx->argv[i], ctx->symtab);
1183     }
1184     ds_put_format(&ctx->output, UUID_FMT, UUID_ARGS(&row->uuid));
1185 }
1186
1187 /* This function may be used as the 'postprocess' function for commands that
1188  * insert new rows into the database.  It expects that the command's 'run'
1189  * function prints the UUID reported by ovsdb_idl_txn_insert() as the command's
1190  * sole output.  It replaces that output by the row's permanent UUID assigned
1191  * by the database server and appends a new-line.
1192  *
1193  * Currently we use this only for "create", because the higher-level commands
1194  * are supposed to be independent of the actual structure of the vswitch
1195  * configuration. */
1196 static void
1197 post_create(struct ctl_context *ctx)
1198 {
1199     const struct uuid *real;
1200     struct uuid dummy;
1201
1202     if (!uuid_from_string(&dummy, ds_cstr(&ctx->output))) {
1203         OVS_NOT_REACHED();
1204     }
1205     real = ovsdb_idl_txn_get_insert_uuid(ctx->txn, &dummy);
1206     if (real) {
1207         ds_clear(&ctx->output);
1208         ds_put_format(&ctx->output, UUID_FMT, UUID_ARGS(real));
1209     }
1210     ds_put_char(&ctx->output, '\n');
1211 }
1212
1213 static void
1214 pre_cmd_destroy(struct ctl_context *ctx)
1215 {
1216     const char *table_name = ctx->argv[1];
1217
1218     pre_get_table(ctx, table_name);
1219 }
1220
1221 static void
1222 cmd_destroy(struct ctl_context *ctx)
1223 {
1224     bool must_exist = !shash_find(&ctx->options, "--if-exists");
1225     bool delete_all = shash_find(&ctx->options, "--all");
1226     const char *table_name = ctx->argv[1];
1227     const struct ctl_table_class *table;
1228     int i;
1229
1230     table = get_table(table_name);
1231
1232     if (delete_all && ctx->argc > 2) {
1233         ctl_fatal("--all and records argument should not be specified together");
1234     }
1235
1236     if (delete_all && !must_exist) {
1237         ctl_fatal("--all and --if-exists should not be specified together");
1238     }
1239
1240     if (delete_all) {
1241         const struct ovsdb_idl_row *row;
1242         const struct ovsdb_idl_row *next_row;
1243
1244         for (row = ovsdb_idl_first_row(ctx->idl, table->class);
1245              row;) {
1246             next_row = ovsdb_idl_next_row(row);
1247             ovsdb_idl_txn_delete(row);
1248             row = next_row;
1249         }
1250     } else {
1251         for (i = 2; i < ctx->argc; i++) {
1252             const struct ovsdb_idl_row *row;
1253
1254             row = get_row(ctx, table, ctx->argv[i], must_exist);
1255             if (row) {
1256                 ovsdb_idl_txn_delete(row);
1257             }
1258         }
1259     }
1260     invalidate_cache(ctx);
1261 }
1262
1263 static void
1264 pre_cmd_wait_until(struct ctl_context *ctx)
1265 {
1266     const char *table_name = ctx->argv[1];
1267     const struct ctl_table_class *table;
1268     int i;
1269
1270     table = pre_get_table(ctx, table_name);
1271
1272     for (i = 3; i < ctx->argc; i++) {
1273         pre_parse_column_key_value(ctx, ctx->argv[i], table);
1274     }
1275 }
1276
1277 static void
1278 cmd_wait_until(struct ctl_context *ctx)
1279 {
1280     const char *table_name = ctx->argv[1];
1281     const char *record_id = ctx->argv[2];
1282     const struct ctl_table_class *table;
1283     const struct ovsdb_idl_row *row;
1284     int i;
1285
1286     table = get_table(table_name);
1287
1288     row = get_row(ctx, table, record_id, false);
1289     if (!row) {
1290         ctx->try_again = true;
1291         return;
1292     }
1293
1294     for (i = 3; i < ctx->argc; i++) {
1295         if (!is_condition_satisfied(table, row, ctx->argv[i], ctx->symtab)) {
1296             ctx->try_again = true;
1297             return;
1298         }
1299     }
1300 }
1301
1302 /* Parses one command. */
1303 static void
1304 parse_command(int argc, char *argv[], struct shash *local_options,
1305               struct ctl_command *command)
1306 {
1307     const struct ctl_command_syntax *p;
1308     struct shash_node *node;
1309     int n_arg;
1310     int i;
1311
1312     shash_init(&command->options);
1313     shash_swap(local_options, &command->options);
1314     for (i = 0; i < argc; i++) {
1315         const char *option = argv[i];
1316         const char *equals;
1317         char *key, *value;
1318
1319         if (option[0] != '-') {
1320             break;
1321         }
1322
1323         equals = strchr(option, '=');
1324         if (equals) {
1325             key = xmemdup0(option, equals - option);
1326             value = xstrdup(equals + 1);
1327         } else {
1328             key = xstrdup(option);
1329             value = NULL;
1330         }
1331
1332         if (shash_find(&command->options, key)) {
1333             ctl_fatal("'%s' option specified multiple times", argv[i]);
1334         }
1335         shash_add_nocopy(&command->options, key, value);
1336     }
1337     if (i == argc) {
1338         ctl_fatal("missing command name (use --help for help)");
1339     }
1340
1341     p = shash_find_data(&all_commands, argv[i]);
1342     if (!p) {
1343         ctl_fatal("unknown command '%s'; use --help for help", argv[i]);
1344     }
1345
1346     SHASH_FOR_EACH (node, &command->options) {
1347         const char *s = strstr(p->options, node->name);
1348         int end = s ? s[strlen(node->name)] : EOF;
1349
1350         if (end != '=' && end != ',' && end != ' ' && end != '\0') {
1351             ctl_fatal("'%s' command has no '%s' option",
1352                       argv[i], node->name);
1353         }
1354         if ((end == '=') != (node->data != NULL)) {
1355             if (end == '=') {
1356                 ctl_fatal("missing argument to '%s' option on '%s' "
1357                           "command", node->name, argv[i]);
1358             } else {
1359                 ctl_fatal("'%s' option on '%s' does not accept an "
1360                           "argument", node->name, argv[i]);
1361             }
1362         }
1363     }
1364
1365     n_arg = argc - i - 1;
1366     if (n_arg < p->min_args) {
1367         ctl_fatal("'%s' command requires at least %d arguments",
1368                   p->name, p->min_args);
1369     } else if (n_arg > p->max_args) {
1370         int j;
1371
1372         for (j = i + 1; j < argc; j++) {
1373             if (argv[j][0] == '-') {
1374                 ctl_fatal("'%s' command takes at most %d arguments "
1375                           "(note that options must precede command "
1376                           "names and follow a \"--\" argument)",
1377                           p->name, p->max_args);
1378             }
1379         }
1380
1381         ctl_fatal("'%s' command takes at most %d arguments",
1382                   p->name, p->max_args);
1383     }
1384
1385     command->syntax = p;
1386     command->argc = n_arg + 1;
1387     command->argv = &argv[i];
1388 }
1389
1390 \f
1391 /* Parses command-line input for commands. */
1392 struct ctl_command *
1393 ctl_parse_commands(int argc, char *argv[], struct shash *local_options,
1394                    size_t *n_commandsp)
1395 {
1396     struct ctl_command *commands;
1397     size_t n_commands, allocated_commands;
1398     int i, start;
1399
1400     commands = NULL;
1401     n_commands = allocated_commands = 0;
1402
1403     for (start = i = 0; i <= argc; i++) {
1404         if (i == argc || !strcmp(argv[i], "--")) {
1405             if (i > start) {
1406                 if (n_commands >= allocated_commands) {
1407                     struct ctl_command *c;
1408
1409                     commands = x2nrealloc(commands, &allocated_commands,
1410                                           sizeof *commands);
1411                     for (c = commands; c < &commands[n_commands]; c++) {
1412                         shash_moved(&c->options);
1413                     }
1414                 }
1415                 parse_command(i - start, &argv[start], local_options,
1416                               &commands[n_commands++]);
1417             } else if (!shash_is_empty(local_options)) {
1418                 ctl_fatal("missing command name (use --help for help)");
1419             }
1420             start = i + 1;
1421         }
1422     }
1423     if (!n_commands) {
1424         ctl_fatal("missing command name (use --help for help)");
1425     }
1426     *n_commandsp = n_commands;
1427     return commands;
1428 }
1429
1430 /* Returns true if it looks like this set of arguments might modify the
1431  * database, otherwise false.  (Not very smart, so it's prone to false
1432  * positives.) */
1433 bool
1434 ctl_might_write_to_db(char **argv)
1435 {
1436     for (; *argv; argv++) {
1437         const struct ctl_command_syntax *p = shash_find_data(&all_commands, *argv);
1438         if (p && p->mode == RW) {
1439             return true;
1440         }
1441     }
1442     return false;
1443 }
1444
1445 void
1446 ctl_fatal(const char *format, ...)
1447 {
1448     char *message;
1449     va_list args;
1450
1451     va_start(args, format);
1452     message = xvasprintf(format, args);
1453     va_end(args);
1454
1455     vlog_set_levels(&VLM_db_ctl_base, VLF_CONSOLE, VLL_OFF);
1456     VLOG_ERR("%s", message);
1457     ovs_error(0, "%s", message);
1458     ctl_exit(EXIT_FAILURE);
1459 }
1460
1461 /* Frees the current transaction and the underlying IDL and then calls
1462  * exit(status).
1463  *
1464  * Freeing the transaction and the IDL is not strictly necessary, but it makes
1465  * for a clean memory leak report from valgrind in the normal case.  That makes
1466  * it easier to notice real memory leaks. */
1467 void
1468 ctl_exit(int status)
1469 {
1470     if (the_idl_txn) {
1471         ovsdb_idl_txn_abort(the_idl_txn);
1472         ovsdb_idl_txn_destroy(the_idl_txn);
1473     }
1474     ovsdb_idl_destroy(the_idl);
1475     exit(status);
1476 }
1477
1478 /* Comman database commands to be registered. */
1479 static const struct ctl_command_syntax db_ctl_commands[] = {
1480     {"comment", 0, INT_MAX, "[ARG]...", NULL, NULL, NULL, "", RO},
1481     {"get", 2, INT_MAX, "TABLE RECORD [COLUMN[:KEY]]...",pre_cmd_get, cmd_get,
1482      NULL, "--if-exists,--id=", RO},
1483     {"list", 1, INT_MAX, "TABLE [RECORD]...", pre_cmd_list, cmd_list, NULL,
1484      "--if-exists,--columns=", RO},
1485     {"find", 1, INT_MAX, "TABLE [COLUMN[:KEY]=VALUE]...", pre_cmd_find,
1486      cmd_find, NULL, "--columns=", RO},
1487     {"set", 3, INT_MAX, "TABLE RECORD COLUMN[:KEY]=VALUE...", pre_cmd_set,
1488      cmd_set, NULL, "--if-exists", RW},
1489     {"add", 4, INT_MAX, "TABLE RECORD COLUMN [KEY=]VALUE...", pre_cmd_add,
1490      cmd_add, NULL, "--if-exists", RW},
1491     {"remove", 4, INT_MAX, "TABLE RECORD COLUMN KEY|VALUE|KEY=VALUE...",
1492      pre_cmd_remove, cmd_remove, NULL, "--if-exists", RW},
1493     {"clear", 3, INT_MAX, "TABLE RECORD COLUMN...", pre_cmd_clear, cmd_clear,
1494      NULL, "--if-exists", RW},
1495     {"create", 2, INT_MAX, "TABLE COLUMN[:KEY]=VALUE...", pre_create,
1496      cmd_create, post_create, "--id=", RW},
1497     {"destroy", 1, INT_MAX, "TABLE [RECORD]...", pre_cmd_destroy, cmd_destroy,
1498      NULL, "--if-exists,--all", RW},
1499     {"wait-until", 2, INT_MAX, "TABLE RECORD [COLUMN[:KEY]=VALUE]...",
1500      pre_cmd_wait_until, cmd_wait_until, NULL, "", RO},
1501     {NULL, 0, 0, NULL, NULL, NULL, NULL, NULL, RO},
1502 };
1503
1504 /* Registers commands represented by 'struct ctl_command_syntax's to
1505  * 'all_commands'.  The last element of 'commands' must be an all-NULL
1506  * element. */
1507 void
1508 ctl_register_commands(const struct ctl_command_syntax *commands)
1509 {
1510     const struct ctl_command_syntax *p;
1511
1512     for (p = commands; p->name; p++) {
1513         shash_add_assert(&all_commands, p->name, p);
1514     }
1515 }
1516
1517 /* Registers the 'db_ctl_commands' to 'all_commands'. */
1518 void
1519 ctl_init(void)
1520 {
1521     ctl_register_commands(db_ctl_commands);
1522 }
1523
1524 /* Returns 'all_commands'. */
1525 const struct shash *
1526 ctl_get_all_commands(void)
1527 {
1528     return &all_commands;
1529 }
1530
1531 /* Returns the text for the database commands usage.  */
1532 const char *
1533 ctl_get_db_cmd_usage(void)
1534 {
1535     return "Database commands:\n\
1536   list TBL [REC]              list RECord (or all records) in TBL\n\
1537   find TBL CONDITION...       list records satisfying CONDITION in TBL\n\
1538   get TBL REC COL[:KEY]       print values of COLumns in RECord in TBL\n\
1539   set TBL REC COL[:KEY]=VALUE set COLumn values in RECord in TBL\n\
1540   add TBL REC COL [KEY=]VALUE add (KEY=)VALUE to COLumn in RECord in TBL\n\
1541   remove TBL REC COL [KEY=]VALUE  remove (KEY=)VALUE from COLumn\n\
1542   clear TBL REC COL           clear values from COLumn in RECord in TBL\n\
1543   create TBL COL[:KEY]=VALUE  create and initialize new record\n\
1544   destroy TBL REC             delete RECord from TBL\n\
1545   wait-until TBL REC [COL[:KEY]=VALUE]  wait until condition is true\n\
1546 Potentially unsafe database commands require --force option.\n";
1547 }
1548
1549 /* Initializes 'ctx' from 'command'. */
1550 void
1551 ctl_context_init_command(struct ctl_context *ctx,
1552                          struct ctl_command *command)
1553 {
1554     ctx->argc = command->argc;
1555     ctx->argv = command->argv;
1556     ctx->options = command->options;
1557
1558     ds_swap(&ctx->output, &command->output);
1559     ctx->table = command->table;
1560     ctx->try_again = false;
1561 }
1562
1563 /* Initializes the entire 'ctx'. */
1564 void
1565 ctl_context_init(struct ctl_context *ctx, struct ctl_command *command,
1566                  struct ovsdb_idl *idl, struct ovsdb_idl_txn *txn,
1567                  struct ovsdb_symbol_table *symtab,
1568                  void (*invalidate_cache)(struct ctl_context *))
1569 {
1570     if (command) {
1571         ctl_context_init_command(ctx, command);
1572     }
1573     ctx->idl = idl;
1574     ctx->txn = txn;
1575     ctx->symtab = symtab;
1576     ctx->invalidate_cache = invalidate_cache;
1577 }
1578
1579 /* Completes processing of 'command' within 'ctx'. */
1580 void
1581 ctl_context_done_command(struct ctl_context *ctx,
1582                          struct ctl_command *command)
1583 {
1584     ds_swap(&ctx->output, &command->output);
1585     command->table = ctx->table;
1586 }
1587
1588 /* Finishes up with 'ctx'.
1589  *
1590  * If command is nonnull, first calls ctl_context_done_command() to complete
1591  * processing that command within 'ctx'. */
1592 void
1593 ctl_context_done(struct ctl_context *ctx,
1594                  struct ctl_command *command)
1595 {
1596     if (command) {
1597         ctl_context_done_command(ctx, command);
1598     }
1599     invalidate_cache(ctx);
1600 }
1601
1602 /* Finds and returns the "struct ctl_table_class *" with 'table_name' by
1603  * searching the 'tables'. */
1604 const struct ctl_table_class *
1605 get_table(const char *table_name)
1606 {
1607     const struct ctl_table_class *table;
1608     const struct ctl_table_class *best_match = NULL;
1609     unsigned int best_score = 0;
1610
1611     for (table = tables; table->class; table++) {
1612         unsigned int score = score_partial_match(table->class->name,
1613                                                  table_name);
1614         if (score > best_score) {
1615             best_match = table;
1616             best_score = score;
1617         } else if (score == best_score) {
1618             best_match = NULL;
1619         }
1620     }
1621     if (best_match) {
1622         return best_match;
1623     } else if (best_score) {
1624         ctl_fatal("multiple table names match \"%s\"", table_name);
1625     } else {
1626         ctl_fatal("unknown table \"%s\"", table_name);
1627     }
1628     return NULL;
1629 }
1630
1631 /* Sets the column of 'row' in 'table'. */
1632 void
1633 set_column(const struct ctl_table_class *table,
1634            const struct ovsdb_idl_row *row, const char *arg,
1635            struct ovsdb_symbol_table *symtab)
1636 {
1637     const struct ovsdb_idl_column *column;
1638     char *key_string, *value_string;
1639     char *error;
1640
1641     error = parse_column_key_value(arg, table, &column, &key_string,
1642                                    NULL, NULL, 0, &value_string);
1643     die_if_error(error);
1644     if (!value_string) {
1645         ctl_fatal("%s: missing value", arg);
1646     }
1647     check_mutable(row, column);
1648
1649     if (key_string) {
1650         union ovsdb_atom key, value;
1651         struct ovsdb_datum datum;
1652
1653         if (column->type.value.type == OVSDB_TYPE_VOID) {
1654             ctl_fatal("cannot specify key to set for non-map column %s",
1655                       column->name);
1656         }
1657
1658         die_if_error(ovsdb_atom_from_string(&key, &column->type.key,
1659                                             key_string, symtab));
1660         die_if_error(ovsdb_atom_from_string(&value, &column->type.value,
1661                                             value_string, symtab));
1662
1663         ovsdb_datum_init_empty(&datum);
1664         ovsdb_datum_add_unsafe(&datum, &key, &value, &column->type);
1665
1666         ovsdb_atom_destroy(&key, column->type.key.type);
1667         ovsdb_atom_destroy(&value, column->type.value.type);
1668
1669         ovsdb_datum_union(&datum, ovsdb_idl_read(row, column),
1670                           &column->type, false);
1671         ovsdb_idl_txn_verify(row, column);
1672         ovsdb_idl_txn_write(row, column, &datum);
1673     } else {
1674         struct ovsdb_datum datum;
1675
1676         die_if_error(ovsdb_datum_from_string(&datum, &column->type,
1677                                              value_string, symtab));
1678         ovsdb_idl_txn_write(row, column, &datum);
1679     }
1680
1681     free(key_string);
1682     free(value_string);
1683 }