json: Move from lib to include/openvswitch.
[cascardo/ovs.git] / lib / db-ctl-base.c
1 /*
2  * Copyright (c) 2015, 2016 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include <ctype.h>
20 #include <getopt.h>
21 #include <unistd.h>
22
23 #include "db-ctl-base.h"
24
25 #include "command-line.h"
26 #include "compiler.h"
27 #include "dirs.h"
28 #include "openvswitch/dynamic-string.h"
29 #include "fatal-signal.h"
30 #include "hash.h"
31 #include "openvswitch/json.h"
32 #include "openvswitch/vlog.h"
33 #include "ovsdb-data.h"
34 #include "ovsdb-idl.h"
35 #include "ovsdb-idl-provider.h"
36 #include "openvswitch/shash.h"
37 #include "sset.h"
38 #include "string.h"
39 #include "table.h"
40 #include "util.h"
41
42 VLOG_DEFINE_THIS_MODULE(db_ctl_base);
43
44 /* This array defines the 'show' command output format.  User can check the
45  * definition in utilities/ovs-vsctl.c as reference.
46  *
47  * Particularly, if an element in 'columns[]' represents a reference to
48  * another table, the referred table must also be defined as an entry in
49  * in 'cmd_show_tables[]'.
50  *
51  * The definition must end with an all-NULL entry.  It is initalized once
52  * when ctl_init() is called.
53  *
54  * */
55 static const struct cmd_show_table *cmd_show_tables;
56
57 /* ctl_exit() is called by ctl_fatal(). User can optionally supply an exit
58  * function ctl_exit_func() via ctl_init. If supplied, this function will
59  * be called by ctl_exit()
60  */
61 static void (*ctl_exit_func)(int status) = NULL;
62 OVS_NO_RETURN static void ctl_exit(int status);
63
64 /* Represents all tables in the schema.  User must define 'tables'
65  * in implementation and supply via ctl_init().  The definition must end
66  * with an all-NULL entry. */
67 static const struct ctl_table_class *tables;
68
69 static struct shash all_commands = SHASH_INITIALIZER(&all_commands);
70 static const struct ctl_table_class *get_table(const char *table_name);
71 static void set_column(const struct ctl_table_class *,
72                        const struct ovsdb_idl_row *, const char *,
73                        struct ovsdb_symbol_table *);
74
75 \f
76 static struct option *
77 find_option(const char *name, struct option *options, size_t n_options)
78 {
79     size_t i;
80
81     for (i = 0; i < n_options; i++) {
82         if (!strcmp(options[i].name, name)) {
83             return &options[i];
84         }
85     }
86     return NULL;
87 }
88
89 static struct option *
90 add_option(struct option **optionsp, size_t *n_optionsp,
91            size_t *allocated_optionsp)
92 {
93     if (*n_optionsp >= *allocated_optionsp) {
94         *optionsp = x2nrealloc(*optionsp, allocated_optionsp,
95                                sizeof **optionsp);
96     }
97     return &(*optionsp)[(*n_optionsp)++];
98 }
99
100 /* Converts the command arguments into format that can be parsed by
101  * bash completion script.
102  *
103  * Therein, arguments will be attached with following prefixes:
104  *
105  *    !argument :: The argument is required
106  *    ?argument :: The argument is optional
107  *    *argument :: The argument may appear any number (0 or more) times
108  *    +argument :: The argument may appear one or more times
109  *
110  */
111 static void
112 print_command_arguments(const struct ctl_command_syntax *command)
113 {
114     /*
115      * The argument string is parsed in reverse.  We use a stack 'oew_stack' to
116      * keep track of nested optionals.  Whenever a ']' is encountered, we push
117      * a bit to 'oew_stack'.  The bit is set to 1 if the ']' is not nested.
118      * Subsequently, we pop an entry everytime '[' is met.
119      *
120      * We use 'whole_word_is_optional' value to decide whether or not a ! or +
121      * should be added on encountering a space: if the optional surrounds the
122      * whole word then it shouldn't be, but if it is only a part of the word
123      * (i.e. [key=]value), it should be.
124      */
125     uint32_t oew_stack = 0;
126
127     const char *arguments = command->arguments;
128     int length = strlen(arguments);
129     if (!length) {
130         return;
131     }
132
133     /* Output buffer, written backward from end. */
134     char *output = xmalloc(2 * length);
135     char *outp = output + 2 * length;
136     *--outp = '\0';
137
138     bool in_repeated = false;
139     bool whole_word_is_optional = false;
140
141     for (const char *inp = arguments + length; inp > arguments; ) {
142         switch (*--inp) {
143         case ']':
144             oew_stack <<= 1;
145             if (inp[1] == '\0' || inp[1] == ' ' || inp[1] == '.') {
146                 oew_stack |= 1;
147             }
148             break;
149         case '[':
150             /* Checks if the whole word is optional, and sets the
151              * 'whole_word_is_optional' accordingly. */
152             if ((inp == arguments || inp[-1] == ' ') && oew_stack & 1) {
153                 *--outp = in_repeated ? '*' : '?';
154                 whole_word_is_optional = true;
155             } else {
156                 *--outp = '?';
157                 whole_word_is_optional = false;
158             }
159             oew_stack >>= 1;
160             break;
161         case ' ':
162             if (!whole_word_is_optional) {
163                 *--outp = in_repeated ? '+' : '!';
164             }
165             *--outp = ' ';
166             in_repeated = false;
167             whole_word_is_optional = false;
168             break;
169         case '.':
170             in_repeated = true;
171             break;
172         default:
173             *--outp = *inp;
174             break;
175         }
176     }
177     if (arguments[0] != '[' && outp != output + 2 * length - 1) {
178         *--outp = in_repeated ? '+' : '!';
179     }
180     printf("%s", outp);
181     free(output);
182 }
183
184 static void
185 die_if_error(char *error)
186 {
187     if (error) {
188         ctl_fatal("%s", error);
189     }
190 }
191
192 static int
193 to_lower_and_underscores(unsigned c)
194 {
195     return c == '-' ? '_' : tolower(c);
196 }
197
198 static unsigned int
199 score_partial_match(const char *name, const char *s)
200 {
201     int score;
202
203     if (!strcmp(name, s)) {
204         return UINT_MAX;
205     }
206     for (score = 0; ; score++, name++, s++) {
207         if (to_lower_and_underscores(*name) != to_lower_and_underscores(*s)) {
208             break;
209         } else if (*name == '\0') {
210             return UINT_MAX - 1;
211         }
212     }
213     return *s == '\0' ? score : 0;
214 }
215
216 static struct ovsdb_symbol *
217 create_symbol(struct ovsdb_symbol_table *symtab, const char *id, bool *newp)
218 {
219     struct ovsdb_symbol *symbol;
220
221     if (id[0] != '@') {
222         ctl_fatal("row id \"%s\" does not begin with \"@\"", id);
223     }
224
225     if (newp) {
226         *newp = ovsdb_symbol_table_get(symtab, id) == NULL;
227     }
228
229     symbol = ovsdb_symbol_table_insert(symtab, id);
230     if (symbol->created) {
231         ctl_fatal("row id \"%s\" may only be specified on one --id option",
232                   id);
233     }
234     symbol->created = true;
235     return symbol;
236 }
237
238 static const struct ovsdb_idl_row *
239 get_row_by_id(struct ctl_context *ctx, const struct ctl_table_class *table,
240               const struct ctl_row_id *id, const char *record_id)
241 {
242     const struct ovsdb_idl_row *referrer, *final;
243
244     if (!id->table) {
245         return NULL;
246     }
247
248     if (!id->name_column) {
249         if (strcmp(record_id, ".")) {
250             return NULL;
251         }
252         referrer = ovsdb_idl_first_row(ctx->idl, id->table);
253         if (!referrer || ovsdb_idl_next_row(referrer)) {
254             return NULL;
255         }
256     } else {
257         referrer = NULL;
258
259         ovs_assert(id->name_column->type.value.type == OVSDB_TYPE_VOID);
260
261         enum ovsdb_atomic_type key = id->name_column->type.key.type;
262         if (key == OVSDB_TYPE_INTEGER) {
263             if (!record_id[0] || record_id[strspn(record_id, "0123456789")]) {
264                 return NULL;
265             }
266         } else {
267             ovs_assert(key == OVSDB_TYPE_STRING);
268         }
269
270         for (const struct ovsdb_idl_row *row = ovsdb_idl_first_row(ctx->idl,
271                                                                    id->table);
272              row != NULL;
273              row = ovsdb_idl_next_row(row)) {
274             const struct ovsdb_datum *name = ovsdb_idl_get(
275                 row, id->name_column, key, OVSDB_TYPE_VOID);
276             if (name->n == 1) {
277                 const union ovsdb_atom *atom = &name->keys[0];
278                 if (key == OVSDB_TYPE_STRING
279                     ? !strcmp(atom->string, record_id)
280                     : atom->integer == strtoll(record_id, NULL, 10)) {
281                     if (referrer) {
282                         ctl_fatal("multiple rows in %s match \"%s\"",
283                                   table->class->name, record_id);
284                     }
285                     referrer = row;
286                 }
287             }
288         }
289     }
290     if (!referrer) {
291         return NULL;
292     }
293
294     final = NULL;
295     if (id->uuid_column) {
296         const struct ovsdb_datum *uuid;
297
298         ovsdb_idl_txn_verify(referrer, id->uuid_column);
299         uuid = ovsdb_idl_get(referrer, id->uuid_column,
300                              OVSDB_TYPE_UUID, OVSDB_TYPE_VOID);
301         if (uuid->n == 1) {
302             final = ovsdb_idl_get_row_for_uuid(ctx->idl, table->class,
303                                                &uuid->keys[0].uuid);
304         }
305     } else {
306         final = referrer;
307     }
308
309     return final;
310 }
311
312 static const struct ovsdb_idl_row *
313 get_row(struct ctl_context *ctx,
314         const struct ctl_table_class *table, const char *record_id,
315         bool must_exist)
316 {
317     const struct ovsdb_idl_row *row;
318     struct uuid uuid;
319
320     row = NULL;
321     if (uuid_from_string(&uuid, record_id)) {
322         row = ovsdb_idl_get_row_for_uuid(ctx->idl, table->class, &uuid);
323     }
324     if (!row) {
325         int i;
326
327         for (i = 0; i < ARRAY_SIZE(table->row_ids); i++) {
328             row = get_row_by_id(ctx, table, &table->row_ids[i], record_id);
329             if (row) {
330                 break;
331             }
332         }
333     }
334     if (must_exist && !row) {
335         ctl_fatal("no row \"%s\" in table %s",
336                   record_id, table->class->name);
337     }
338     return row;
339 }
340
341 static char *
342 get_column(const struct ctl_table_class *table, const char *column_name,
343            const struct ovsdb_idl_column **columnp)
344 {
345     const struct ovsdb_idl_column *best_match = NULL;
346     unsigned int best_score = 0;
347     size_t i;
348
349     for (i = 0; i < table->class->n_columns; i++) {
350         const struct ovsdb_idl_column *column = &table->class->columns[i];
351         unsigned int score = score_partial_match(column->name, column_name);
352         if (score > best_score) {
353             best_match = column;
354             best_score = score;
355         } else if (score == best_score) {
356             best_match = NULL;
357         }
358     }
359
360     *columnp = best_match;
361     if (best_match) {
362         return NULL;
363     } else if (best_score) {
364         return xasprintf("%s contains more than one column whose name "
365                          "matches \"%s\"", table->class->name, column_name);
366     } else {
367         return xasprintf("%s does not contain a column whose name matches "
368                          "\"%s\"", table->class->name, column_name);
369     }
370 }
371
372 static void
373 pre_get_column(struct ctl_context *ctx,
374                const struct ctl_table_class *table, const char *column_name,
375                const struct ovsdb_idl_column **columnp)
376 {
377     die_if_error(get_column(table, column_name, columnp));
378     ovsdb_idl_add_column(ctx->idl, *columnp);
379 }
380
381 static const struct ctl_table_class *
382 pre_get_table(struct ctl_context *ctx, const char *table_name)
383 {
384     const struct ctl_table_class *table_class;
385     int i;
386
387     table_class = get_table(table_name);
388     ovsdb_idl_add_table(ctx->idl, table_class->class);
389
390     for (i = 0; i < ARRAY_SIZE(table_class->row_ids); i++) {
391         const struct ctl_row_id *id = &table_class->row_ids[i];
392         if (id->table) {
393             ovsdb_idl_add_table(ctx->idl, id->table);
394         }
395         if (id->name_column) {
396             ovsdb_idl_add_column(ctx->idl, id->name_column);
397         }
398         if (id->uuid_column) {
399             ovsdb_idl_add_column(ctx->idl, id->uuid_column);
400         }
401     }
402
403     return table_class;
404 }
405
406 static char *
407 missing_operator_error(const char *arg, const char **allowed_operators,
408                        size_t n_allowed)
409 {
410     struct ds s;
411
412     ds_init(&s);
413     ds_put_format(&s, "%s: argument does not end in ", arg);
414     ds_put_format(&s, "\"%s\"", allowed_operators[0]);
415     if (n_allowed == 2) {
416         ds_put_format(&s, " or \"%s\"", allowed_operators[1]);
417     } else if (n_allowed > 2) {
418         size_t i;
419
420         for (i = 1; i < n_allowed - 1; i++) {
421             ds_put_format(&s, ", \"%s\"", allowed_operators[i]);
422         }
423         ds_put_format(&s, ", or \"%s\"", allowed_operators[i]);
424     }
425     ds_put_format(&s, " followed by a value.");
426
427     return ds_steal_cstr(&s);
428 }
429
430 /* Breaks 'arg' apart into a number of fields in the following order:
431  *
432  *      - The name of a column in 'table', stored into '*columnp'.  The column
433  *        name may be abbreviated.
434  *
435  *      - Optionally ':' followed by a key string.  The key is stored as a
436  *        malloc()'d string into '*keyp', or NULL if no key is present in
437  *        'arg'.
438  *
439  *      - If 'valuep' is nonnull, an operator followed by a value string.  The
440  *        allowed operators are the 'n_allowed' string in 'allowed_operators',
441  *        or just "=" if 'n_allowed' is 0.  If 'operatorp' is nonnull, then the
442  *        index of the operator within 'allowed_operators' is stored into
443  *        '*operatorp'.  The value is stored as a malloc()'d string into
444  *        '*valuep', or NULL if no value is present in 'arg'.
445  *
446  * On success, returns NULL.  On failure, returned a malloc()'d string error
447  * message and stores NULL into all of the nonnull output arguments. */
448 static char * OVS_WARN_UNUSED_RESULT
449 parse_column_key_value(const char *arg,
450                        const struct ctl_table_class *table,
451                        const struct ovsdb_idl_column **columnp, char **keyp,
452                        int *operatorp,
453                        const char **allowed_operators, size_t n_allowed,
454                        char **valuep)
455 {
456     const char *p = arg;
457     char *column_name;
458     char *error;
459
460     ovs_assert(!(operatorp && !valuep));
461     *keyp = NULL;
462     if (valuep) {
463         *valuep = NULL;
464     }
465
466     /* Parse column name. */
467     error = ovsdb_token_parse(&p, &column_name);
468     if (error) {
469         goto error;
470     }
471     if (column_name[0] == '\0') {
472         free(column_name);
473         error = xasprintf("%s: missing column name", arg);
474         goto error;
475     }
476     error = get_column(table, column_name, columnp);
477     free(column_name);
478     if (error) {
479         goto error;
480     }
481
482     /* Parse key string. */
483     if (*p == ':') {
484         p++;
485         error = ovsdb_token_parse(&p, keyp);
486         if (error) {
487             goto error;
488         }
489     }
490
491     /* Parse value string. */
492     if (valuep) {
493         size_t best_len;
494         size_t i;
495         int best;
496
497         if (!allowed_operators) {
498             static const char *equals = "=";
499             allowed_operators = &equals;
500             n_allowed = 1;
501         }
502
503         best = -1;
504         best_len = 0;
505         for (i = 0; i < n_allowed; i++) {
506             const char *op = allowed_operators[i];
507             size_t op_len = strlen(op);
508
509             if (op_len > best_len && !strncmp(op, p, op_len) && p[op_len]) {
510                 best_len = op_len;
511                 best = i;
512             }
513         }
514         if (best < 0) {
515             error = missing_operator_error(arg, allowed_operators, n_allowed);
516             goto error;
517         }
518
519         if (operatorp) {
520             *operatorp = best;
521         }
522         *valuep = xstrdup(p + best_len);
523     } else {
524         if (*p != '\0') {
525             error = xasprintf("%s: trailing garbage \"%s\" in argument",
526                               arg, p);
527             goto error;
528         }
529     }
530     return NULL;
531
532  error:
533     *columnp = NULL;
534     free(*keyp);
535     *keyp = NULL;
536     if (valuep) {
537         free(*valuep);
538         *valuep = NULL;
539         if (operatorp) {
540             *operatorp = -1;
541         }
542     }
543     return error;
544 }
545
546 static const struct ovsdb_idl_column *
547 pre_parse_column_key_value(struct ctl_context *ctx,
548                            const char *arg,
549                            const struct ctl_table_class *table)
550 {
551     const struct ovsdb_idl_column *column;
552     const char *p;
553     char *column_name;
554
555     p = arg;
556     die_if_error(ovsdb_token_parse(&p, &column_name));
557     if (column_name[0] == '\0') {
558         ctl_fatal("%s: missing column name", arg);
559     }
560
561     pre_get_column(ctx, table, column_name, &column);
562     free(column_name);
563
564     return column;
565 }
566
567 static void
568 check_mutable(const struct ovsdb_idl_row *row,
569               const struct ovsdb_idl_column *column)
570 {
571     if (!ovsdb_idl_is_mutable(row, column)) {
572         ctl_fatal("cannot modify read-only column %s in table %s",
573                   column->name, row->table->class->name);
574     }
575 }
576
577 #define RELOPS                                  \
578     RELOP(RELOP_EQ,     "=")                    \
579     RELOP(RELOP_NE,     "!=")                   \
580     RELOP(RELOP_LT,     "<")                    \
581     RELOP(RELOP_GT,     ">")                    \
582     RELOP(RELOP_LE,     "<=")                   \
583     RELOP(RELOP_GE,     ">=")                   \
584     RELOP(RELOP_SET_EQ, "{=}")                  \
585     RELOP(RELOP_SET_NE, "{!=}")                 \
586     RELOP(RELOP_SET_LT, "{<}")                  \
587     RELOP(RELOP_SET_GT, "{>}")                  \
588     RELOP(RELOP_SET_LE, "{<=}")                 \
589     RELOP(RELOP_SET_GE, "{>=}")
590
591 enum relop {
592 #define RELOP(ENUM, STRING) ENUM,
593     RELOPS
594 #undef RELOP
595 };
596
597 static bool
598 is_set_operator(enum relop op)
599 {
600     return (op == RELOP_SET_EQ || op == RELOP_SET_NE ||
601             op == RELOP_SET_LT || op == RELOP_SET_GT ||
602             op == RELOP_SET_LE || op == RELOP_SET_GE);
603 }
604
605 static bool
606 evaluate_relop(const struct ovsdb_datum *a, const struct ovsdb_datum *b,
607                const struct ovsdb_type *type, enum relop op)
608 {
609     switch (op) {
610     case RELOP_EQ:
611     case RELOP_SET_EQ:
612         return ovsdb_datum_compare_3way(a, b, type) == 0;
613     case RELOP_NE:
614     case RELOP_SET_NE:
615         return ovsdb_datum_compare_3way(a, b, type) != 0;
616     case RELOP_LT:
617         return ovsdb_datum_compare_3way(a, b, type) < 0;
618     case RELOP_GT:
619         return ovsdb_datum_compare_3way(a, b, type) > 0;
620     case RELOP_LE:
621         return ovsdb_datum_compare_3way(a, b, type) <= 0;
622     case RELOP_GE:
623         return ovsdb_datum_compare_3way(a, b, type) >= 0;
624
625     case RELOP_SET_LT:
626         return b->n > a->n && ovsdb_datum_includes_all(a, b, type);
627     case RELOP_SET_GT:
628         return a->n > b->n && ovsdb_datum_includes_all(b, a, type);
629     case RELOP_SET_LE:
630         return ovsdb_datum_includes_all(a, b, type);
631     case RELOP_SET_GE:
632         return ovsdb_datum_includes_all(b, a, type);
633
634     default:
635         OVS_NOT_REACHED();
636     }
637 }
638
639 static bool
640 is_condition_satisfied(const struct ctl_table_class *table,
641                        const struct ovsdb_idl_row *row, const char *arg,
642                        struct ovsdb_symbol_table *symtab)
643 {
644     static const char *operators[] = {
645 #define RELOP(ENUM, STRING) STRING,
646         RELOPS
647 #undef RELOP
648     };
649
650     const struct ovsdb_idl_column *column;
651     const struct ovsdb_datum *have_datum;
652     char *key_string, *value_string;
653     struct ovsdb_type type;
654     int operator;
655     bool retval;
656     char *error;
657
658     error = parse_column_key_value(arg, table, &column, &key_string,
659                                    &operator, operators, ARRAY_SIZE(operators),
660                                    &value_string);
661     die_if_error(error);
662     if (!value_string) {
663         ctl_fatal("%s: missing value", arg);
664     }
665
666     type = column->type;
667     type.n_max = UINT_MAX;
668
669     have_datum = ovsdb_idl_read(row, column);
670     if (key_string) {
671         union ovsdb_atom want_key;
672         struct ovsdb_datum b;
673         unsigned int idx;
674
675         if (column->type.value.type == OVSDB_TYPE_VOID) {
676             ctl_fatal("cannot specify key to check for non-map column %s",
677                       column->name);
678         }
679
680         die_if_error(ovsdb_atom_from_string(&want_key, &column->type.key,
681                                             key_string, symtab));
682
683         type.key = type.value;
684         type.value.type = OVSDB_TYPE_VOID;
685         die_if_error(ovsdb_datum_from_string(&b, &type, value_string, symtab));
686
687         idx = ovsdb_datum_find_key(have_datum,
688                                    &want_key, column->type.key.type);
689         if (idx == UINT_MAX && !is_set_operator(operator)) {
690             retval = false;
691         } else {
692             struct ovsdb_datum a;
693
694             if (idx != UINT_MAX) {
695                 a.n = 1;
696                 a.keys = &have_datum->values[idx];
697                 a.values = NULL;
698             } else {
699                 a.n = 0;
700                 a.keys = NULL;
701                 a.values = NULL;
702             }
703
704             retval = evaluate_relop(&a, &b, &type, operator);
705         }
706
707         ovsdb_atom_destroy(&want_key, column->type.key.type);
708         ovsdb_datum_destroy(&b, &type);
709     } else {
710         struct ovsdb_datum want_datum;
711
712         die_if_error(ovsdb_datum_from_string(&want_datum, &column->type,
713                                              value_string, symtab));
714         retval = evaluate_relop(have_datum, &want_datum, &type, operator);
715         ovsdb_datum_destroy(&want_datum, &column->type);
716     }
717
718     free(key_string);
719     free(value_string);
720
721     return retval;
722 }
723
724 static void
725 invalidate_cache(struct ctl_context *ctx)
726 {
727     if (ctx->invalidate_cache) {
728         (ctx->invalidate_cache)(ctx);
729     }
730 }
731 \f
732 static void
733 pre_cmd_get(struct ctl_context *ctx)
734 {
735     const char *id = shash_find_data(&ctx->options, "--id");
736     const char *table_name = ctx->argv[1];
737     const struct ctl_table_class *table;
738     int i;
739
740     /* Using "get" without --id or a column name could possibly make sense.
741      * Maybe, for example, a *ctl command run wants to assert that a row
742      * exists.  But it is unlikely that an interactive user would want to do
743      * that, so issue a warning if we're running on a terminal. */
744     if (!id && ctx->argc <= 3 && isatty(STDOUT_FILENO)) {
745         VLOG_WARN("\"get\" command without row arguments or \"--id\" is "
746                   "possibly erroneous");
747     }
748
749     table = pre_get_table(ctx, table_name);
750     for (i = 3; i < ctx->argc; i++) {
751         if (!strcasecmp(ctx->argv[i], "_uuid")
752             || !strcasecmp(ctx->argv[i], "-uuid")) {
753             continue;
754         }
755
756         pre_parse_column_key_value(ctx, ctx->argv[i], table);
757     }
758 }
759
760 static void
761 cmd_get(struct ctl_context *ctx)
762 {
763     const char *id = shash_find_data(&ctx->options, "--id");
764     bool must_exist = !shash_find(&ctx->options, "--if-exists");
765     const char *table_name = ctx->argv[1];
766     const char *record_id = ctx->argv[2];
767     const struct ctl_table_class *table;
768     const struct ovsdb_idl_row *row;
769     struct ds *out = &ctx->output;
770     int i;
771
772     if (id && !must_exist) {
773         ctl_fatal("--if-exists and --id may not be specified together");
774     }
775
776     table = get_table(table_name);
777     row = get_row(ctx, table, record_id, must_exist);
778     if (!row) {
779         return;
780     }
781
782     if (id) {
783         struct ovsdb_symbol *symbol;
784         bool new;
785
786         symbol = create_symbol(ctx->symtab, id, &new);
787         if (!new) {
788             ctl_fatal("row id \"%s\" specified on \"get\" command was used "
789                       "before it was defined", id);
790         }
791         symbol->uuid = row->uuid;
792
793         /* This symbol refers to a row that already exists, so disable warnings
794          * about it being unreferenced. */
795         symbol->strong_ref = true;
796     }
797     for (i = 3; i < ctx->argc; i++) {
798         const struct ovsdb_idl_column *column;
799         const struct ovsdb_datum *datum;
800         char *key_string;
801
802         /* Special case for obtaining the UUID of a row.  We can't just do this
803          * through parse_column_key_value() below since it returns a "struct
804          * ovsdb_idl_column" and the UUID column doesn't have one. */
805         if (!strcasecmp(ctx->argv[i], "_uuid")
806             || !strcasecmp(ctx->argv[i], "-uuid")) {
807             ds_put_format(out, UUID_FMT"\n", UUID_ARGS(&row->uuid));
808             continue;
809         }
810
811         die_if_error(parse_column_key_value(ctx->argv[i], table,
812                                             &column, &key_string,
813                                             NULL, NULL, 0, NULL));
814
815         ovsdb_idl_txn_verify(row, column);
816         datum = ovsdb_idl_read(row, column);
817         if (key_string) {
818             union ovsdb_atom key;
819             unsigned int idx;
820
821             if (column->type.value.type == OVSDB_TYPE_VOID) {
822                 ctl_fatal("cannot specify key to get for non-map column %s",
823                           column->name);
824             }
825
826             die_if_error(ovsdb_atom_from_string(&key,
827                                                 &column->type.key,
828                                                 key_string, ctx->symtab));
829
830             idx = ovsdb_datum_find_key(datum, &key,
831                                        column->type.key.type);
832             if (idx == UINT_MAX) {
833                 if (must_exist) {
834                     ctl_fatal("no key \"%s\" in %s record \"%s\" column %s",
835                               key_string, table->class->name, record_id,
836                               column->name);
837                 }
838             } else {
839                 ovsdb_atom_to_string(&datum->values[idx],
840                                      column->type.value.type, out);
841             }
842             ovsdb_atom_destroy(&key, column->type.key.type);
843         } else {
844             ovsdb_datum_to_string(datum, &column->type, out);
845         }
846         ds_put_char(out, '\n');
847
848         free(key_string);
849     }
850 }
851
852 static void
853 parse_column_names(const char *column_names,
854                    const struct ctl_table_class *table,
855                    const struct ovsdb_idl_column ***columnsp,
856                    size_t *n_columnsp)
857 {
858     const struct ovsdb_idl_column **columns;
859     size_t n_columns;
860
861     if (!column_names) {
862         size_t i;
863
864         n_columns = table->class->n_columns + 1;
865         columns = xmalloc(n_columns * sizeof *columns);
866         columns[0] = NULL;
867         for (i = 0; i < table->class->n_columns; i++) {
868             columns[i + 1] = &table->class->columns[i];
869         }
870     } else {
871         char *s = xstrdup(column_names);
872         size_t allocated_columns;
873         char *save_ptr = NULL;
874         char *column_name;
875
876         columns = NULL;
877         allocated_columns = n_columns = 0;
878         for (column_name = strtok_r(s, ", ", &save_ptr); column_name;
879              column_name = strtok_r(NULL, ", ", &save_ptr)) {
880             const struct ovsdb_idl_column *column;
881
882             if (!strcasecmp(column_name, "_uuid")) {
883                 column = NULL;
884             } else {
885                 die_if_error(get_column(table, column_name, &column));
886             }
887             if (n_columns >= allocated_columns) {
888                 columns = x2nrealloc(columns, &allocated_columns,
889                                      sizeof *columns);
890             }
891             columns[n_columns++] = column;
892         }
893         free(s);
894
895         if (!n_columns) {
896             ctl_fatal("must specify at least one column name");
897         }
898     }
899     *columnsp = columns;
900     *n_columnsp = n_columns;
901 }
902
903 static void
904 pre_list_columns(struct ctl_context *ctx,
905                  const struct ctl_table_class *table,
906                  const char *column_names)
907 {
908     const struct ovsdb_idl_column **columns;
909     size_t n_columns;
910     size_t i;
911
912     parse_column_names(column_names, table, &columns, &n_columns);
913     for (i = 0; i < n_columns; i++) {
914         if (columns[i]) {
915             ovsdb_idl_add_column(ctx->idl, columns[i]);
916         }
917     }
918     free(columns);
919 }
920
921 static void
922 pre_cmd_list(struct ctl_context *ctx)
923 {
924     const char *column_names = shash_find_data(&ctx->options, "--columns");
925     const char *table_name = ctx->argv[1];
926     const struct ctl_table_class *table;
927
928     table = pre_get_table(ctx, table_name);
929     pre_list_columns(ctx, table, column_names);
930 }
931
932 static struct table *
933 list_make_table(const struct ovsdb_idl_column **columns, size_t n_columns)
934 {
935     struct table *out;
936     size_t i;
937
938     out = xmalloc(sizeof *out);
939     table_init(out);
940
941     for (i = 0; i < n_columns; i++) {
942         const struct ovsdb_idl_column *column = columns[i];
943         const char *column_name = column ? column->name : "_uuid";
944
945         table_add_column(out, "%s", column_name);
946     }
947
948     return out;
949 }
950
951 static void
952 list_record(const struct ovsdb_idl_row *row,
953             const struct ovsdb_idl_column **columns, size_t n_columns,
954             struct table *out)
955 {
956     size_t i;
957
958     if (!row) {
959         return;
960     }
961
962     table_add_row(out);
963     for (i = 0; i < n_columns; i++) {
964         const struct ovsdb_idl_column *column = columns[i];
965         struct cell *cell = table_add_cell(out);
966
967         if (!column) {
968             struct ovsdb_datum datum;
969             union ovsdb_atom atom;
970
971             atom.uuid = row->uuid;
972
973             datum.keys = &atom;
974             datum.values = NULL;
975             datum.n = 1;
976
977             cell->json = ovsdb_datum_to_json(&datum, &ovsdb_type_uuid);
978             cell->type = &ovsdb_type_uuid;
979         } else {
980             const struct ovsdb_datum *datum = ovsdb_idl_read(row, column);
981
982             cell->json = ovsdb_datum_to_json(datum, &column->type);
983             cell->type = &column->type;
984         }
985     }
986 }
987
988 static void
989 cmd_list(struct ctl_context *ctx)
990 {
991     const char *column_names = shash_find_data(&ctx->options, "--columns");
992     bool must_exist = !shash_find(&ctx->options, "--if-exists");
993     const struct ovsdb_idl_column **columns;
994     const char *table_name = ctx->argv[1];
995     const struct ctl_table_class *table;
996     struct table *out;
997     size_t n_columns;
998     int i;
999
1000     table = get_table(table_name);
1001     parse_column_names(column_names, table, &columns, &n_columns);
1002     out = ctx->table = list_make_table(columns, n_columns);
1003     if (ctx->argc > 2) {
1004         for (i = 2; i < ctx->argc; i++) {
1005             list_record(get_row(ctx, table, ctx->argv[i], must_exist),
1006                         columns, n_columns, out);
1007         }
1008     } else {
1009         const struct ovsdb_idl_row *row;
1010
1011         for (row = ovsdb_idl_first_row(ctx->idl, table->class); row != NULL;
1012              row = ovsdb_idl_next_row(row)) {
1013             list_record(row, columns, n_columns, out);
1014         }
1015     }
1016     free(columns);
1017 }
1018
1019 /* Finds and returns the "struct ctl_table_class *" with 'table_name' by
1020  * searching the 'tables'. */
1021 static const struct ctl_table_class *
1022 get_table(const char *table_name)
1023 {
1024     const struct ctl_table_class *table;
1025     const struct ctl_table_class *best_match = NULL;
1026     unsigned int best_score = 0;
1027
1028     for (table = tables; table->class; table++) {
1029         unsigned int score = score_partial_match(table->class->name,
1030                                                  table_name);
1031         if (score > best_score) {
1032             best_match = table;
1033             best_score = score;
1034         } else if (score == best_score) {
1035             best_match = NULL;
1036         }
1037     }
1038     if (best_match) {
1039         return best_match;
1040     } else if (best_score) {
1041         ctl_fatal("multiple table names match \"%s\"", table_name);
1042     } else {
1043         ctl_fatal("unknown table \"%s\"", table_name);
1044     }
1045     return NULL;
1046 }
1047
1048 static void
1049 pre_cmd_find(struct ctl_context *ctx)
1050 {
1051     const char *column_names = shash_find_data(&ctx->options, "--columns");
1052     const char *table_name = ctx->argv[1];
1053     const struct ctl_table_class *table;
1054     int i;
1055
1056     table = pre_get_table(ctx, table_name);
1057     pre_list_columns(ctx, table, column_names);
1058     for (i = 2; i < ctx->argc; i++) {
1059         pre_parse_column_key_value(ctx, ctx->argv[i], table);
1060     }
1061 }
1062
1063 static void
1064 cmd_find(struct ctl_context *ctx)
1065 {
1066     const char *column_names = shash_find_data(&ctx->options, "--columns");
1067     const struct ovsdb_idl_column **columns;
1068     const char *table_name = ctx->argv[1];
1069     const struct ctl_table_class *table;
1070     const struct ovsdb_idl_row *row;
1071     struct table *out;
1072     size_t n_columns;
1073
1074     table = get_table(table_name);
1075     parse_column_names(column_names, table, &columns, &n_columns);
1076     out = ctx->table = list_make_table(columns, n_columns);
1077     for (row = ovsdb_idl_first_row(ctx->idl, table->class); row;
1078          row = ovsdb_idl_next_row(row)) {
1079         int i;
1080
1081         for (i = 2; i < ctx->argc; i++) {
1082             if (!is_condition_satisfied(table, row, ctx->argv[i],
1083                                         ctx->symtab)) {
1084                 goto next_row;
1085             }
1086         }
1087         list_record(row, columns, n_columns, out);
1088
1089     next_row: ;
1090     }
1091     free(columns);
1092 }
1093
1094 /* Sets the column of 'row' in 'table'. */
1095 static void
1096 set_column(const struct ctl_table_class *table,
1097            const struct ovsdb_idl_row *row, const char *arg,
1098            struct ovsdb_symbol_table *symtab)
1099 {
1100     const struct ovsdb_idl_column *column;
1101     char *key_string, *value_string;
1102     char *error;
1103
1104     error = parse_column_key_value(arg, table, &column, &key_string,
1105                                    NULL, NULL, 0, &value_string);
1106     die_if_error(error);
1107     if (!value_string) {
1108         ctl_fatal("%s: missing value", arg);
1109     }
1110     check_mutable(row, column);
1111
1112     if (key_string) {
1113         union ovsdb_atom key, value;
1114         struct ovsdb_datum datum;
1115
1116         if (column->type.value.type == OVSDB_TYPE_VOID) {
1117             ctl_fatal("cannot specify key to set for non-map column %s",
1118                       column->name);
1119         }
1120
1121         die_if_error(ovsdb_atom_from_string(&key, &column->type.key,
1122                                             key_string, symtab));
1123         die_if_error(ovsdb_atom_from_string(&value, &column->type.value,
1124                                             value_string, symtab));
1125
1126         ovsdb_datum_init_empty(&datum);
1127         ovsdb_datum_add_unsafe(&datum, &key, &value, &column->type);
1128
1129         ovsdb_atom_destroy(&key, column->type.key.type);
1130         ovsdb_atom_destroy(&value, column->type.value.type);
1131
1132         ovsdb_datum_union(&datum, ovsdb_idl_read(row, column),
1133                           &column->type, false);
1134         ovsdb_idl_txn_verify(row, column);
1135         ovsdb_idl_txn_write(row, column, &datum);
1136     } else {
1137         struct ovsdb_datum datum;
1138
1139         die_if_error(ovsdb_datum_from_string(&datum, &column->type,
1140                                              value_string, symtab));
1141         ovsdb_idl_txn_write(row, column, &datum);
1142     }
1143
1144     free(key_string);
1145     free(value_string);
1146 }
1147
1148 static void
1149 pre_cmd_set(struct ctl_context *ctx)
1150 {
1151     const char *table_name = ctx->argv[1];
1152     const struct ctl_table_class *table;
1153     int i;
1154
1155     table = pre_get_table(ctx, table_name);
1156     for (i = 3; i < ctx->argc; i++) {
1157         pre_parse_column_key_value(ctx, ctx->argv[i], table);
1158     }
1159 }
1160
1161 static void
1162 cmd_set(struct ctl_context *ctx)
1163 {
1164     bool must_exist = !shash_find(&ctx->options, "--if-exists");
1165     const char *table_name = ctx->argv[1];
1166     const char *record_id = ctx->argv[2];
1167     const struct ctl_table_class *table;
1168     const struct ovsdb_idl_row *row;
1169     int i;
1170
1171     table = get_table(table_name);
1172     row = get_row(ctx, table, record_id, must_exist);
1173     if (!row) {
1174         return;
1175     }
1176
1177     for (i = 3; i < ctx->argc; i++) {
1178         set_column(table, row, ctx->argv[i], ctx->symtab);
1179     }
1180
1181     invalidate_cache(ctx);
1182 }
1183
1184 static void
1185 pre_cmd_add(struct ctl_context *ctx)
1186 {
1187     const char *table_name = ctx->argv[1];
1188     const char *column_name = ctx->argv[3];
1189     const struct ctl_table_class *table;
1190     const struct ovsdb_idl_column *column;
1191
1192     table = pre_get_table(ctx, table_name);
1193     pre_get_column(ctx, table, column_name, &column);
1194 }
1195
1196 static void
1197 cmd_add(struct ctl_context *ctx)
1198 {
1199     bool must_exist = !shash_find(&ctx->options, "--if-exists");
1200     const char *table_name = ctx->argv[1];
1201     const char *record_id = ctx->argv[2];
1202     const char *column_name = ctx->argv[3];
1203     const struct ctl_table_class *table;
1204     const struct ovsdb_idl_column *column;
1205     const struct ovsdb_idl_row *row;
1206     const struct ovsdb_type *type;
1207     struct ovsdb_datum old;
1208     int i;
1209
1210     table = get_table(table_name);
1211     die_if_error(get_column(table, column_name, &column));
1212     row = get_row(ctx, table, record_id, must_exist);
1213     if (!row) {
1214         return;
1215     }
1216     check_mutable(row, column);
1217
1218     type = &column->type;
1219     ovsdb_datum_clone(&old, ovsdb_idl_read(row, column), &column->type);
1220     for (i = 4; i < ctx->argc; i++) {
1221         struct ovsdb_type add_type;
1222         struct ovsdb_datum add;
1223
1224         add_type = *type;
1225         add_type.n_min = 1;
1226         add_type.n_max = UINT_MAX;
1227         die_if_error(ovsdb_datum_from_string(&add, &add_type, ctx->argv[i],
1228                                              ctx->symtab));
1229         ovsdb_datum_union(&old, &add, type, false);
1230         ovsdb_datum_destroy(&add, type);
1231     }
1232     if (old.n > type->n_max) {
1233         ctl_fatal("\"add\" operation would put %u %s in column %s of "
1234                   "table %s but the maximum number is %u",
1235                   old.n,
1236                   type->value.type == OVSDB_TYPE_VOID ? "values" : "pairs",
1237                   column->name, table->class->name, type->n_max);
1238     }
1239     ovsdb_idl_txn_verify(row, column);
1240     ovsdb_idl_txn_write(row, column, &old);
1241
1242     invalidate_cache(ctx);
1243 }
1244
1245 static void
1246 pre_cmd_remove(struct ctl_context *ctx)
1247 {
1248     const char *table_name = ctx->argv[1];
1249     const char *column_name = ctx->argv[3];
1250     const struct ctl_table_class *table;
1251     const struct ovsdb_idl_column *column;
1252
1253     table = pre_get_table(ctx, table_name);
1254     pre_get_column(ctx, table, column_name, &column);
1255 }
1256
1257 static void
1258 cmd_remove(struct ctl_context *ctx)
1259 {
1260     bool must_exist = !shash_find(&ctx->options, "--if-exists");
1261     const char *table_name = ctx->argv[1];
1262     const char *record_id = ctx->argv[2];
1263     const char *column_name = ctx->argv[3];
1264     const struct ctl_table_class *table;
1265     const struct ovsdb_idl_column *column;
1266     const struct ovsdb_idl_row *row;
1267     const struct ovsdb_type *type;
1268     struct ovsdb_datum old;
1269     int i;
1270
1271     table = get_table(table_name);
1272     die_if_error(get_column(table, column_name, &column));
1273     row = get_row(ctx, table, record_id, must_exist);
1274     if (!row) {
1275         return;
1276     }
1277     check_mutable(row, column);
1278
1279     type = &column->type;
1280     ovsdb_datum_clone(&old, ovsdb_idl_read(row, column), &column->type);
1281     for (i = 4; i < ctx->argc; i++) {
1282         struct ovsdb_type rm_type;
1283         struct ovsdb_datum rm;
1284         char *error;
1285
1286         rm_type = *type;
1287         rm_type.n_min = 1;
1288         rm_type.n_max = UINT_MAX;
1289         error = ovsdb_datum_from_string(&rm, &rm_type,
1290                                         ctx->argv[i], ctx->symtab);
1291
1292         if (error) {
1293             if (ovsdb_type_is_map(&rm_type)) {
1294                 rm_type.value.type = OVSDB_TYPE_VOID;
1295                 free(error);
1296                 die_if_error(ovsdb_datum_from_string(
1297                                                      &rm, &rm_type, ctx->argv[i], ctx->symtab));
1298             } else {
1299                 ctl_fatal("%s", error);
1300             }
1301         }
1302         ovsdb_datum_subtract(&old, type, &rm, &rm_type);
1303         ovsdb_datum_destroy(&rm, &rm_type);
1304     }
1305     if (old.n < type->n_min) {
1306         ctl_fatal("\"remove\" operation would put %u %s in column %s of "
1307                   "table %s but the minimum number is %u",
1308                   old.n,
1309                   type->value.type == OVSDB_TYPE_VOID ? "values" : "pairs",
1310                   column->name, table->class->name, type->n_min);
1311     }
1312     ovsdb_idl_txn_verify(row, column);
1313     ovsdb_idl_txn_write(row, column, &old);
1314
1315     invalidate_cache(ctx);
1316 }
1317
1318 static void
1319 pre_cmd_clear(struct ctl_context *ctx)
1320 {
1321     const char *table_name = ctx->argv[1];
1322     const struct ctl_table_class *table;
1323     int i;
1324
1325     table = pre_get_table(ctx, table_name);
1326     for (i = 3; i < ctx->argc; i++) {
1327         const struct ovsdb_idl_column *column;
1328
1329         pre_get_column(ctx, table, ctx->argv[i], &column);
1330     }
1331 }
1332
1333 static void
1334 cmd_clear(struct ctl_context *ctx)
1335 {
1336     bool must_exist = !shash_find(&ctx->options, "--if-exists");
1337     const char *table_name = ctx->argv[1];
1338     const char *record_id = ctx->argv[2];
1339     const struct ctl_table_class *table;
1340     const struct ovsdb_idl_row *row;
1341     int i;
1342
1343     table = get_table(table_name);
1344     row = get_row(ctx, table, record_id, must_exist);
1345     if (!row) {
1346         return;
1347     }
1348
1349     for (i = 3; i < ctx->argc; i++) {
1350         const struct ovsdb_idl_column *column;
1351         const struct ovsdb_type *type;
1352         struct ovsdb_datum datum;
1353
1354         die_if_error(get_column(table, ctx->argv[i], &column));
1355         check_mutable(row, column);
1356
1357         type = &column->type;
1358         if (type->n_min > 0) {
1359             ctl_fatal("\"clear\" operation cannot be applied to column %s "
1360                       "of table %s, which is not allowed to be empty",
1361                       column->name, table->class->name);
1362         }
1363
1364         ovsdb_datum_init_empty(&datum);
1365         ovsdb_idl_txn_write(row, column, &datum);
1366     }
1367
1368     invalidate_cache(ctx);
1369 }
1370
1371 static void
1372 pre_create(struct ctl_context *ctx)
1373 {
1374     const char *id = shash_find_data(&ctx->options, "--id");
1375     const char *table_name = ctx->argv[1];
1376     const struct ctl_table_class *table;
1377
1378     table = get_table(table_name);
1379     if (!id && !table->class->is_root) {
1380         VLOG_WARN("applying \"create\" command to table %s without --id "
1381                   "option will have no effect", table->class->name);
1382     }
1383 }
1384
1385 static void
1386 cmd_create(struct ctl_context *ctx)
1387 {
1388     const char *id = shash_find_data(&ctx->options, "--id");
1389     const char *table_name = ctx->argv[1];
1390     const struct ctl_table_class *table = get_table(table_name);
1391     const struct ovsdb_idl_row *row;
1392     const struct uuid *uuid;
1393     int i;
1394
1395     if (id) {
1396         struct ovsdb_symbol *symbol = create_symbol(ctx->symtab, id, NULL);
1397         if (table->class->is_root) {
1398             /* This table is in the root set, meaning that rows created in it
1399              * won't disappear even if they are unreferenced, so disable
1400              * warnings about that by pretending that there is a reference. */
1401             symbol->strong_ref = true;
1402         }
1403         uuid = &symbol->uuid;
1404     } else {
1405         uuid = NULL;
1406     }
1407
1408     row = ovsdb_idl_txn_insert(ctx->txn, table->class, uuid);
1409     for (i = 2; i < ctx->argc; i++) {
1410         set_column(table, row, ctx->argv[i], ctx->symtab);
1411     }
1412     ds_put_format(&ctx->output, UUID_FMT, UUID_ARGS(&row->uuid));
1413 }
1414
1415 /* This function may be used as the 'postprocess' function for commands that
1416  * insert new rows into the database.  It expects that the command's 'run'
1417  * function prints the UUID reported by ovsdb_idl_txn_insert() as the command's
1418  * sole output.  It replaces that output by the row's permanent UUID assigned
1419  * by the database server and appends a new-line.
1420  *
1421  * Currently we use this only for "create", because the higher-level commands
1422  * are supposed to be independent of the actual structure of the vswitch
1423  * configuration. */
1424 static void
1425 post_create(struct ctl_context *ctx)
1426 {
1427     const struct uuid *real;
1428     struct uuid dummy;
1429
1430     if (!uuid_from_string(&dummy, ds_cstr(&ctx->output))) {
1431         OVS_NOT_REACHED();
1432     }
1433     real = ovsdb_idl_txn_get_insert_uuid(ctx->txn, &dummy);
1434     if (real) {
1435         ds_clear(&ctx->output);
1436         ds_put_format(&ctx->output, UUID_FMT, UUID_ARGS(real));
1437     }
1438     ds_put_char(&ctx->output, '\n');
1439 }
1440
1441 static void
1442 pre_cmd_destroy(struct ctl_context *ctx)
1443 {
1444     const char *table_name = ctx->argv[1];
1445
1446     pre_get_table(ctx, table_name);
1447 }
1448
1449 static void
1450 cmd_destroy(struct ctl_context *ctx)
1451 {
1452     bool must_exist = !shash_find(&ctx->options, "--if-exists");
1453     bool delete_all = shash_find(&ctx->options, "--all");
1454     const char *table_name = ctx->argv[1];
1455     const struct ctl_table_class *table;
1456     int i;
1457
1458     table = get_table(table_name);
1459
1460     if (delete_all && ctx->argc > 2) {
1461         ctl_fatal("--all and records argument should not be specified together");
1462     }
1463
1464     if (delete_all && !must_exist) {
1465         ctl_fatal("--all and --if-exists should not be specified together");
1466     }
1467
1468     if (delete_all) {
1469         const struct ovsdb_idl_row *row;
1470         const struct ovsdb_idl_row *next_row;
1471
1472         for (row = ovsdb_idl_first_row(ctx->idl, table->class);
1473              row;) {
1474             next_row = ovsdb_idl_next_row(row);
1475             ovsdb_idl_txn_delete(row);
1476             row = next_row;
1477         }
1478     } else {
1479         for (i = 2; i < ctx->argc; i++) {
1480             const struct ovsdb_idl_row *row;
1481
1482             row = get_row(ctx, table, ctx->argv[i], must_exist);
1483             if (row) {
1484                 ovsdb_idl_txn_delete(row);
1485             }
1486         }
1487     }
1488     invalidate_cache(ctx);
1489 }
1490
1491 static void
1492 pre_cmd_wait_until(struct ctl_context *ctx)
1493 {
1494     const char *table_name = ctx->argv[1];
1495     const struct ctl_table_class *table;
1496     int i;
1497
1498     table = pre_get_table(ctx, table_name);
1499
1500     for (i = 3; i < ctx->argc; i++) {
1501         pre_parse_column_key_value(ctx, ctx->argv[i], table);
1502     }
1503 }
1504
1505 static void
1506 cmd_wait_until(struct ctl_context *ctx)
1507 {
1508     const char *table_name = ctx->argv[1];
1509     const char *record_id = ctx->argv[2];
1510     const struct ctl_table_class *table;
1511     const struct ovsdb_idl_row *row;
1512     int i;
1513
1514     table = get_table(table_name);
1515
1516     row = get_row(ctx, table, record_id, false);
1517     if (!row) {
1518         ctx->try_again = true;
1519         return;
1520     }
1521
1522     for (i = 3; i < ctx->argc; i++) {
1523         if (!is_condition_satisfied(table, row, ctx->argv[i], ctx->symtab)) {
1524             ctx->try_again = true;
1525             return;
1526         }
1527     }
1528 }
1529
1530 /* Parses one command. */
1531 static void
1532 parse_command(int argc, char *argv[], struct shash *local_options,
1533               struct ctl_command *command)
1534 {
1535     const struct ctl_command_syntax *p;
1536     struct shash_node *node;
1537     int n_arg;
1538     int i;
1539
1540     shash_init(&command->options);
1541     shash_swap(local_options, &command->options);
1542     for (i = 0; i < argc; i++) {
1543         const char *option = argv[i];
1544         const char *equals;
1545         char *key, *value;
1546
1547         if (option[0] != '-') {
1548             break;
1549         }
1550
1551         equals = strchr(option, '=');
1552         if (equals) {
1553             key = xmemdup0(option, equals - option);
1554             value = xstrdup(equals + 1);
1555         } else {
1556             key = xstrdup(option);
1557             value = NULL;
1558         }
1559
1560         if (shash_find(&command->options, key)) {
1561             ctl_fatal("'%s' option specified multiple times", argv[i]);
1562         }
1563         shash_add_nocopy(&command->options, key, value);
1564     }
1565     if (i == argc) {
1566         ctl_fatal("missing command name (use --help for help)");
1567     }
1568
1569     p = shash_find_data(&all_commands, argv[i]);
1570     if (!p) {
1571         ctl_fatal("unknown command '%s'; use --help for help", argv[i]);
1572     }
1573
1574     SHASH_FOR_EACH (node, &command->options) {
1575         const char *s = strstr(p->options, node->name);
1576         int end = s ? s[strlen(node->name)] : EOF;
1577
1578         if (end != '=' && end != ',' && end != ' ' && end != '\0') {
1579             ctl_fatal("'%s' command has no '%s' option",
1580                       argv[i], node->name);
1581         }
1582         if ((end == '=') != (node->data != NULL)) {
1583             if (end == '=') {
1584                 ctl_fatal("missing argument to '%s' option on '%s' "
1585                           "command", node->name, argv[i]);
1586             } else {
1587                 ctl_fatal("'%s' option on '%s' does not accept an "
1588                           "argument", node->name, argv[i]);
1589             }
1590         }
1591     }
1592
1593     n_arg = argc - i - 1;
1594     if (n_arg < p->min_args) {
1595         ctl_fatal("'%s' command requires at least %d arguments",
1596                   p->name, p->min_args);
1597     } else if (n_arg > p->max_args) {
1598         int j;
1599
1600         for (j = i + 1; j < argc; j++) {
1601             if (argv[j][0] == '-') {
1602                 ctl_fatal("'%s' command takes at most %d arguments "
1603                           "(note that options must precede command "
1604                           "names and follow a \"--\" argument)",
1605                           p->name, p->max_args);
1606             }
1607         }
1608
1609         ctl_fatal("'%s' command takes at most %d arguments",
1610                   p->name, p->max_args);
1611     }
1612
1613     command->syntax = p;
1614     command->argc = n_arg + 1;
1615     command->argv = &argv[i];
1616 }
1617
1618 static void
1619 pre_cmd_show(struct ctl_context *ctx)
1620 {
1621     const struct cmd_show_table *show;
1622
1623     for (show = cmd_show_tables; show->table; show++) {
1624         size_t i;
1625
1626         ovsdb_idl_add_table(ctx->idl, show->table);
1627         if (show->name_column) {
1628             ovsdb_idl_add_column(ctx->idl, show->name_column);
1629         }
1630         for (i = 0; i < ARRAY_SIZE(show->columns); i++) {
1631             const struct ovsdb_idl_column *column = show->columns[i];
1632             if (column) {
1633                 ovsdb_idl_add_column(ctx->idl, column);
1634             }
1635         }
1636         if (show->wref_table.table) {
1637             ovsdb_idl_add_table(ctx->idl, show->wref_table.table);
1638         }
1639         if (show->wref_table.name_column) {
1640             ovsdb_idl_add_column(ctx->idl, show->wref_table.name_column);
1641         }
1642         if (show->wref_table.wref_column) {
1643             ovsdb_idl_add_column(ctx->idl, show->wref_table.wref_column);
1644         }
1645     }
1646 }
1647
1648 static const struct cmd_show_table *
1649 cmd_show_find_table_by_row(const struct ovsdb_idl_row *row)
1650 {
1651     const struct cmd_show_table *show;
1652
1653     for (show = cmd_show_tables; show->table; show++) {
1654         if (show->table == row->table->class) {
1655             return show;
1656         }
1657     }
1658     return NULL;
1659 }
1660
1661 static const struct cmd_show_table *
1662 cmd_show_find_table_by_name(const char *name)
1663 {
1664     const struct cmd_show_table *show;
1665
1666     for (show = cmd_show_tables; show->table; show++) {
1667         if (!strcmp(show->table->name, name)) {
1668             return show;
1669         }
1670     }
1671     return NULL;
1672 }
1673
1674 /*  Prints table entries that weak reference the 'cur_row'. */
1675 static void
1676 cmd_show_weak_ref(struct ctl_context *ctx, const struct cmd_show_table *show,
1677                   const struct ovsdb_idl_row *cur_row, int level)
1678 {
1679     const struct ovsdb_idl_row *row_wref;
1680     const struct ovsdb_idl_table_class *table = show->wref_table.table;
1681     const struct ovsdb_idl_column *name_column
1682         = show->wref_table.name_column;
1683     const struct ovsdb_idl_column *wref_column
1684         = show->wref_table.wref_column;
1685
1686     if (!table || !name_column || !wref_column) {
1687         return;
1688     }
1689
1690     for (row_wref = ovsdb_idl_first_row(ctx->idl, table); row_wref;
1691          row_wref = ovsdb_idl_next_row(row_wref)) {
1692         const struct ovsdb_datum *wref_datum
1693             = ovsdb_idl_read(row_wref, wref_column);
1694         /* If weak reference refers to the 'cur_row', prints it. */
1695         if (wref_datum->n
1696             && uuid_equals(&cur_row->uuid, &wref_datum->keys[0].uuid)) {
1697             const struct ovsdb_datum *name_datum
1698                 = ovsdb_idl_read(row_wref, name_column);
1699             ds_put_char_multiple(&ctx->output, ' ', (level + 1) * 4);
1700             ds_put_format(&ctx->output, "%s ", table->name);
1701             ovsdb_datum_to_string(name_datum, &name_column->type, &ctx->output);
1702             ds_put_char(&ctx->output, '\n');
1703         }
1704     }
1705 }
1706
1707 /* 'shown' records the tables that has been displayed by the current
1708  * command to avoid duplicated prints.
1709  */
1710 static void
1711 cmd_show_row(struct ctl_context *ctx, const struct ovsdb_idl_row *row,
1712              int level, struct sset *shown)
1713 {
1714     const struct cmd_show_table *show = cmd_show_find_table_by_row(row);
1715     size_t i;
1716
1717     ds_put_char_multiple(&ctx->output, ' ', level * 4);
1718     if (show && show->name_column) {
1719         const struct ovsdb_datum *datum;
1720
1721         ds_put_format(&ctx->output, "%s ", show->table->name);
1722         datum = ovsdb_idl_read(row, show->name_column);
1723         ovsdb_datum_to_string(datum, &show->name_column->type, &ctx->output);
1724     } else {
1725         ds_put_format(&ctx->output, UUID_FMT, UUID_ARGS(&row->uuid));
1726     }
1727     ds_put_char(&ctx->output, '\n');
1728
1729     if (!show || sset_find(shown, show->table->name)) {
1730         return;
1731     }
1732
1733     sset_add(shown, show->table->name);
1734     for (i = 0; i < ARRAY_SIZE(show->columns); i++) {
1735         const struct ovsdb_idl_column *column = show->columns[i];
1736         const struct ovsdb_datum *datum;
1737
1738         if (!column) {
1739             break;
1740         }
1741
1742         datum = ovsdb_idl_read(row, column);
1743         if (column->type.key.type == OVSDB_TYPE_UUID &&
1744             column->type.key.u.uuid.refTableName) {
1745             const struct cmd_show_table *ref_show;
1746             size_t j;
1747
1748             ref_show = cmd_show_find_table_by_name(
1749                 column->type.key.u.uuid.refTableName);
1750             if (ref_show) {
1751                 for (j = 0; j < datum->n; j++) {
1752                     const struct ovsdb_idl_row *ref_row;
1753
1754                     ref_row = ovsdb_idl_get_row_for_uuid(ctx->idl,
1755                                                          ref_show->table,
1756                                                          &datum->keys[j].uuid);
1757                     if (ref_row) {
1758                         cmd_show_row(ctx, ref_row, level + 1, shown);
1759                     }
1760                 }
1761                 continue;
1762             }
1763         } else if (ovsdb_type_is_map(&column->type) &&
1764                    column->type.value.type == OVSDB_TYPE_UUID &&
1765                    column->type.value.u.uuid.refTableName) {
1766             const struct cmd_show_table *ref_show;
1767             size_t j;
1768
1769             /* Prints the key to ref'ed table name map if the ref'ed table
1770              * is also defined in 'cmd_show_tables'.  */
1771             ref_show = cmd_show_find_table_by_name(
1772                 column->type.value.u.uuid.refTableName);
1773             if (ref_show && ref_show->name_column) {
1774                 ds_put_char_multiple(&ctx->output, ' ', (level + 1) * 4);
1775                 ds_put_format(&ctx->output, "%s:\n", column->name);
1776                 for (j = 0; j < datum->n; j++) {
1777                     const struct ovsdb_idl_row *ref_row;
1778
1779                     ref_row = ovsdb_idl_get_row_for_uuid(ctx->idl,
1780                                                          ref_show->table,
1781                                                          &datum->values[j].uuid);
1782
1783                     ds_put_char_multiple(&ctx->output, ' ', (level + 2) * 4);
1784                     ovsdb_atom_to_string(&datum->keys[j], column->type.key.type,
1785                                          &ctx->output);
1786                     ds_put_char(&ctx->output, '=');
1787                     if (ref_row) {
1788                         const struct ovsdb_datum *ref_datum;
1789
1790                         ref_datum = ovsdb_idl_read(ref_row,
1791                                                    ref_show->name_column);
1792                         ovsdb_datum_to_string(ref_datum,
1793                                               &ref_show->name_column->type,
1794                                               &ctx->output);
1795                     } else {
1796                         ds_put_cstr(&ctx->output, "\"<null>\"");
1797                     }
1798                     ds_put_char(&ctx->output, '\n');
1799                 }
1800                 continue;
1801             }
1802         }
1803
1804         if (!ovsdb_datum_is_default(datum, &column->type)) {
1805             ds_put_char_multiple(&ctx->output, ' ', (level + 1) * 4);
1806             ds_put_format(&ctx->output, "%s: ", column->name);
1807             ovsdb_datum_to_string(datum, &column->type, &ctx->output);
1808             ds_put_char(&ctx->output, '\n');
1809         }
1810     }
1811     cmd_show_weak_ref(ctx, show, row, level);
1812     sset_find_and_delete_assert(shown, show->table->name);
1813 }
1814
1815 static void
1816 cmd_show(struct ctl_context *ctx)
1817 {
1818     const struct ovsdb_idl_row *row;
1819     struct sset shown = SSET_INITIALIZER(&shown);
1820
1821     for (row = ovsdb_idl_first_row(ctx->idl, cmd_show_tables[0].table);
1822          row; row = ovsdb_idl_next_row(row)) {
1823         cmd_show_row(ctx, row, 0, &shown);
1824     }
1825
1826     ovs_assert(sset_is_empty(&shown));
1827     sset_destroy(&shown);
1828 }
1829
1830 \f
1831 /* Given pointer to dynamic array 'options_p',  array's current size
1832  * 'allocated_options_p' and number of added options 'n_options_p',
1833  * adds all command options to the array.  Enlarges the array if
1834  * necessary. */
1835 void
1836 ctl_add_cmd_options(struct option **options_p, size_t *n_options_p,
1837                     size_t *allocated_options_p, int opt_val)
1838 {
1839     struct option *o;
1840     const struct shash_node *node;
1841     size_t n_existing_options = *n_options_p;
1842
1843     SHASH_FOR_EACH (node, &all_commands) {
1844         const struct ctl_command_syntax *p = node->data;
1845
1846         if (p->options[0]) {
1847             char *save_ptr = NULL;
1848             char *name;
1849             char *s;
1850
1851             s = xstrdup(p->options);
1852             for (name = strtok_r(s, ",", &save_ptr); name != NULL;
1853                  name = strtok_r(NULL, ",", &save_ptr)) {
1854                 char *equals;
1855                 int has_arg;
1856
1857                 ovs_assert(name[0] == '-' && name[1] == '-' && name[2]);
1858                 name += 2;
1859
1860                 equals = strchr(name, '=');
1861                 if (equals) {
1862                     has_arg = required_argument;
1863                     *equals = '\0';
1864                 } else {
1865                     has_arg = no_argument;
1866                 }
1867
1868                 o = find_option(name, *options_p, *n_options_p);
1869                 if (o) {
1870                     ovs_assert(o - *options_p >= n_existing_options);
1871                     ovs_assert(o->has_arg == has_arg);
1872                 } else {
1873                     o = add_option(options_p, n_options_p, allocated_options_p);
1874                     o->name = xstrdup(name);
1875                     o->has_arg = has_arg;
1876                     o->flag = NULL;
1877                     o->val = opt_val;
1878                 }
1879             }
1880
1881             free(s);
1882         }
1883     }
1884     o = add_option(options_p, n_options_p, allocated_options_p);
1885     memset(o, 0, sizeof *o);
1886 }
1887
1888 /* Parses command-line input for commands. */
1889 struct ctl_command *
1890 ctl_parse_commands(int argc, char *argv[], struct shash *local_options,
1891                    size_t *n_commandsp)
1892 {
1893     struct ctl_command *commands;
1894     size_t n_commands, allocated_commands;
1895     int i, start;
1896
1897     commands = NULL;
1898     n_commands = allocated_commands = 0;
1899
1900     for (start = i = 0; i <= argc; i++) {
1901         if (i == argc || !strcmp(argv[i], "--")) {
1902             if (i > start) {
1903                 if (n_commands >= allocated_commands) {
1904                     struct ctl_command *c;
1905
1906                     commands = x2nrealloc(commands, &allocated_commands,
1907                                           sizeof *commands);
1908                     for (c = commands; c < &commands[n_commands]; c++) {
1909                         shash_moved(&c->options);
1910                     }
1911                 }
1912                 parse_command(i - start, &argv[start], local_options,
1913                               &commands[n_commands++]);
1914             } else if (!shash_is_empty(local_options)) {
1915                 ctl_fatal("missing command name (use --help for help)");
1916             }
1917             start = i + 1;
1918         }
1919     }
1920     if (!n_commands) {
1921         ctl_fatal("missing command name (use --help for help)");
1922     }
1923     *n_commandsp = n_commands;
1924     return commands;
1925 }
1926
1927 /* Prints all registered commands. */
1928 void
1929 ctl_print_commands(void)
1930 {
1931     const struct shash_node *node;
1932
1933     SHASH_FOR_EACH (node, &all_commands) {
1934         const struct ctl_command_syntax *p = node->data;
1935         char *options = xstrdup(p->options);
1936         char *options_begin = options;
1937         char *item;
1938
1939         for (item = strsep(&options, ","); item != NULL;
1940              item = strsep(&options, ",")) {
1941             if (item[0] != '\0') {
1942                 printf("[%s] ", item);
1943             }
1944         }
1945         printf(",%s,", p->name);
1946         print_command_arguments(p);
1947         printf("\n");
1948
1949         free(options_begin);
1950     }
1951
1952     exit(EXIT_SUCCESS);
1953 }
1954
1955 /* Given array of options 'options', prints them. */
1956 void
1957 ctl_print_options(const struct option *options)
1958 {
1959     for (; options->name; options++) {
1960         const struct option *o = options;
1961
1962         printf("--%s%s\n", o->name, o->has_arg ? "=ARG" : "");
1963         if (o->flag == NULL && o->val > 0 && o->val <= UCHAR_MAX) {
1964             printf("-%c%s\n", o->val, o->has_arg ? " ARG" : "");
1965         }
1966     }
1967
1968     exit(EXIT_SUCCESS);
1969 }
1970
1971 /* Returns the default local database path. */
1972 char *
1973 ctl_default_db(void)
1974 {
1975     static char *def;
1976     if (!def) {
1977         def = xasprintf("unix:%s/db.sock", ovs_rundir());
1978     }
1979     return def;
1980 }
1981
1982 /* Returns true if it looks like this set of arguments might modify the
1983  * database, otherwise false.  (Not very smart, so it's prone to false
1984  * positives.) */
1985 bool
1986 ctl_might_write_to_db(char **argv)
1987 {
1988     for (; *argv; argv++) {
1989         const struct ctl_command_syntax *p = shash_find_data(&all_commands,
1990                                                              *argv);
1991         if (p && p->mode == RW) {
1992             return true;
1993         }
1994     }
1995     return false;
1996 }
1997
1998 void
1999 ctl_fatal(const char *format, ...)
2000 {
2001     char *message;
2002     va_list args;
2003
2004     va_start(args, format);
2005     message = xvasprintf(format, args);
2006     va_end(args);
2007
2008     vlog_set_levels(&this_module, VLF_CONSOLE, VLL_OFF);
2009     VLOG_ERR("%s", message);
2010     ovs_error(0, "%s", message);
2011     ctl_exit(EXIT_FAILURE);
2012 }
2013
2014 /* Frees the current transaction and the underlying IDL and then calls
2015  * exit(status).
2016  *
2017  * Freeing the transaction and the IDL is not strictly necessary, but it makes
2018  * for a clean memory leak report from valgrind in the normal case.  That makes
2019  * it easier to notice real memory leaks. */
2020 static void
2021 ctl_exit(int status)
2022 {
2023     if (ctl_exit_func) {
2024         ctl_exit_func(status);
2025     }
2026     exit(status);
2027 }
2028
2029 /* Comman database commands to be registered. */
2030 static const struct ctl_command_syntax db_ctl_commands[] = {
2031     {"comment", 0, INT_MAX, "[ARG]...", NULL, NULL, NULL, "", RO},
2032     {"get", 2, INT_MAX, "TABLE RECORD [COLUMN[:KEY]]...",pre_cmd_get, cmd_get,
2033      NULL, "--if-exists,--id=", RO},
2034     {"list", 1, INT_MAX, "TABLE [RECORD]...", pre_cmd_list, cmd_list, NULL,
2035      "--if-exists,--columns=", RO},
2036     {"find", 1, INT_MAX, "TABLE [COLUMN[:KEY]=VALUE]...", pre_cmd_find,
2037      cmd_find, NULL, "--columns=", RO},
2038     {"set", 3, INT_MAX, "TABLE RECORD COLUMN[:KEY]=VALUE...", pre_cmd_set,
2039      cmd_set, NULL, "--if-exists", RW},
2040     {"add", 4, INT_MAX, "TABLE RECORD COLUMN [KEY=]VALUE...", pre_cmd_add,
2041      cmd_add, NULL, "--if-exists", RW},
2042     {"remove", 4, INT_MAX, "TABLE RECORD COLUMN KEY|VALUE|KEY=VALUE...",
2043      pre_cmd_remove, cmd_remove, NULL, "--if-exists", RW},
2044     {"clear", 3, INT_MAX, "TABLE RECORD COLUMN...", pre_cmd_clear, cmd_clear,
2045      NULL, "--if-exists", RW},
2046     {"create", 2, INT_MAX, "TABLE COLUMN[:KEY]=VALUE...", pre_create,
2047      cmd_create, post_create, "--id=", RW},
2048     {"destroy", 1, INT_MAX, "TABLE [RECORD]...", pre_cmd_destroy, cmd_destroy,
2049      NULL, "--if-exists,--all", RW},
2050     {"wait-until", 2, INT_MAX, "TABLE RECORD [COLUMN[:KEY]=VALUE]...",
2051      pre_cmd_wait_until, cmd_wait_until, NULL, "", RO},
2052     {NULL, 0, 0, NULL, NULL, NULL, NULL, NULL, RO},
2053 };
2054
2055 static void
2056 ctl_register_command(const struct ctl_command_syntax *command)
2057 {
2058     shash_add_assert(&all_commands, command->name, command);
2059 }
2060
2061 /* Registers commands represented by 'struct ctl_command_syntax's to
2062  * 'all_commands'.  The last element of 'commands' must be an all-NULL
2063  * element. */
2064 void
2065 ctl_register_commands(const struct ctl_command_syntax *commands)
2066 {
2067     const struct ctl_command_syntax *p;
2068
2069     for (p = commands; p->name; p++) {
2070         ctl_register_command(p);
2071     }
2072 }
2073
2074 /* Registers the 'db_ctl_commands' to 'all_commands'. */
2075 void
2076 ctl_init(const struct ctl_table_class tables_[],
2077          const struct cmd_show_table cmd_show_tables_[],
2078          void (*ctl_exit_func_)(int status))
2079 {
2080     tables = tables_;
2081     ctl_exit_func = ctl_exit_func_;
2082     ctl_register_commands(db_ctl_commands);
2083
2084     cmd_show_tables = cmd_show_tables_;
2085     if (cmd_show_tables) {
2086         static const struct ctl_command_syntax show =
2087             {"show", 0, 0, "", pre_cmd_show, cmd_show, NULL, "", RO};
2088         ctl_register_command(&show);
2089     }
2090 }
2091
2092 /* Returns the text for the database commands usage.  */
2093 const char *
2094 ctl_get_db_cmd_usage(void)
2095 {
2096     return "Database commands:\n\
2097   list TBL [REC]              list RECord (or all records) in TBL\n\
2098   find TBL CONDITION...       list records satisfying CONDITION in TBL\n\
2099   get TBL REC COL[:KEY]       print values of COLumns in RECord in TBL\n\
2100   set TBL REC COL[:KEY]=VALUE set COLumn values in RECord in TBL\n\
2101   add TBL REC COL [KEY=]VALUE add (KEY=)VALUE to COLumn in RECord in TBL\n\
2102   remove TBL REC COL [KEY=]VALUE  remove (KEY=)VALUE from COLumn\n\
2103   clear TBL REC COL           clear values from COLumn in RECord in TBL\n\
2104   create TBL COL[:KEY]=VALUE  create and initialize new record\n\
2105   destroy TBL REC             delete RECord from TBL\n\
2106   wait-until TBL REC [COL[:KEY]=VALUE]  wait until condition is true\n\
2107 Potentially unsafe database commands require --force option.\n";
2108 }
2109
2110 /* Initializes 'ctx' from 'command'. */
2111 void
2112 ctl_context_init_command(struct ctl_context *ctx,
2113                          struct ctl_command *command)
2114 {
2115     ctx->argc = command->argc;
2116     ctx->argv = command->argv;
2117     ctx->options = command->options;
2118
2119     ds_swap(&ctx->output, &command->output);
2120     ctx->table = command->table;
2121     ctx->try_again = false;
2122 }
2123
2124 /* Initializes the entire 'ctx'. */
2125 void
2126 ctl_context_init(struct ctl_context *ctx, struct ctl_command *command,
2127                  struct ovsdb_idl *idl, struct ovsdb_idl_txn *txn,
2128                  struct ovsdb_symbol_table *symtab,
2129                  void (*invalidate_cache)(struct ctl_context *))
2130 {
2131     if (command) {
2132         ctl_context_init_command(ctx, command);
2133     }
2134     ctx->idl = idl;
2135     ctx->txn = txn;
2136     ctx->symtab = symtab;
2137     ctx->invalidate_cache = invalidate_cache;
2138 }
2139
2140 /* Completes processing of 'command' within 'ctx'. */
2141 void
2142 ctl_context_done_command(struct ctl_context *ctx,
2143                          struct ctl_command *command)
2144 {
2145     ds_swap(&ctx->output, &command->output);
2146     command->table = ctx->table;
2147 }
2148
2149 /* Finishes up with 'ctx'.
2150  *
2151  * If command is nonnull, first calls ctl_context_done_command() to complete
2152  * processing that command within 'ctx'. */
2153 void
2154 ctl_context_done(struct ctl_context *ctx,
2155                  struct ctl_command *command)
2156 {
2157     if (command) {
2158         ctl_context_done_command(ctx, command);
2159     }
2160     invalidate_cache(ctx);
2161 }
2162
2163 void ctl_set_column(const char *table_name,
2164                     const struct ovsdb_idl_row *row, const char *arg,
2165                     struct ovsdb_symbol_table *symtab)
2166 {
2167     set_column(get_table(table_name), row, arg, symtab);
2168 }