ovsdb-client: support monitor2
[cascardo/ovs.git] / ovsdb / ovsdb-client.c
1 /*
2  * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include <ctype.h>
20 #include <errno.h>
21 #include <getopt.h>
22 #include <limits.h>
23 #include <signal.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <unistd.h>
27
28 #include "command-line.h"
29 #include "column.h"
30 #include "compiler.h"
31 #include "daemon.h"
32 #include "dirs.h"
33 #include "dynamic-string.h"
34 #include "fatal-signal.h"
35 #include "json.h"
36 #include "jsonrpc.h"
37 #include "lib/table.h"
38 #include "ovsdb.h"
39 #include "ovsdb-data.h"
40 #include "ovsdb-error.h"
41 #include "poll-loop.h"
42 #include "sort.h"
43 #include "svec.h"
44 #include "stream.h"
45 #include "stream-ssl.h"
46 #include "table.h"
47 #include "monitor.h"
48 #include "timeval.h"
49 #include "unixctl.h"
50 #include "util.h"
51 #include "openvswitch/vlog.h"
52
53 VLOG_DEFINE_THIS_MODULE(ovsdb_client);
54
55 enum args_needed {
56     NEED_NONE,            /* No JSON-RPC connection or database name needed. */
57     NEED_RPC,             /* JSON-RPC connection needed. */
58     NEED_DATABASE         /* JSON-RPC connection and database name needed. */
59 };
60
61 struct ovsdb_client_command {
62     const char *name;
63     enum args_needed need;
64     int min_args;
65     int max_args;
66     void (*handler)(struct jsonrpc *rpc, const char *database,
67                     int argc, char *argv[]);
68 };
69
70 /* --timestamp: Print a timestamp before each update on "monitor" command? */
71 static bool timestamp;
72
73 /* Format for table output. */
74 static struct table_style table_style = TABLE_STYLE_DEFAULT;
75
76 static const struct ovsdb_client_command *get_all_commands(void);
77
78 OVS_NO_RETURN static void usage(void);
79 static void parse_options(int argc, char *argv[]);
80 static struct jsonrpc *open_jsonrpc(const char *server);
81 static void fetch_dbs(struct jsonrpc *, struct svec *dbs);
82
83 int
84 main(int argc, char *argv[])
85 {
86     const struct ovsdb_client_command *command;
87     const char *database;
88     struct jsonrpc *rpc;
89
90     ovs_cmdl_proctitle_init(argc, argv);
91     set_program_name(argv[0]);
92     parse_options(argc, argv);
93     fatal_ignore_sigpipe();
94
95     daemon_become_new_user(false);
96     if (optind >= argc) {
97         ovs_fatal(0, "missing command name; use --help for help");
98     }
99
100     for (command = get_all_commands(); ; command++) {
101         if (!command->name) {
102             VLOG_FATAL("unknown command '%s'; use --help for help",
103                        argv[optind]);
104         } else if (!strcmp(command->name, argv[optind])) {
105             break;
106         }
107     }
108     optind++;
109
110     if (command->need != NEED_NONE) {
111         if (argc - optind > command->min_args
112             && (isalpha((unsigned char) argv[optind][0])
113                 && strchr(argv[optind], ':'))) {
114             rpc = open_jsonrpc(argv[optind++]);
115         } else {
116             char *sock = xasprintf("unix:%s/db.sock", ovs_rundir());
117             rpc = open_jsonrpc(sock);
118             free(sock);
119         }
120     } else {
121         rpc = NULL;
122     }
123
124     if (command->need == NEED_DATABASE) {
125         struct svec dbs;
126
127         svec_init(&dbs);
128         fetch_dbs(rpc, &dbs);
129         if (argc - optind > command->min_args
130             && svec_contains(&dbs, argv[optind])) {
131             database = argv[optind++];
132         } else if (dbs.n == 1) {
133             database = xstrdup(dbs.names[0]);
134         } else if (svec_contains(&dbs, "Open_vSwitch")) {
135             database = "Open_vSwitch";
136         } else {
137             ovs_fatal(0, "no default database for `%s' command, please "
138                       "specify a database name", command->name);
139         }
140         svec_destroy(&dbs);
141     } else {
142         database = NULL;
143     }
144
145     if (argc - optind < command->min_args ||
146         argc - optind > command->max_args) {
147         VLOG_FATAL("invalid syntax for '%s' (use --help for help)",
148                     command->name);
149     }
150
151     command->handler(rpc, database, argc - optind, argv + optind);
152
153     jsonrpc_close(rpc);
154
155     if (ferror(stdout)) {
156         VLOG_FATAL("write to stdout failed");
157     }
158     if (ferror(stderr)) {
159         VLOG_FATAL("write to stderr failed");
160     }
161
162     return 0;
163 }
164
165 static void
166 parse_options(int argc, char *argv[])
167 {
168     enum {
169         OPT_BOOTSTRAP_CA_CERT = UCHAR_MAX + 1,
170         OPT_TIMESTAMP,
171         VLOG_OPTION_ENUMS,
172         DAEMON_OPTION_ENUMS,
173         TABLE_OPTION_ENUMS
174     };
175     static const struct option long_options[] = {
176         {"help", no_argument, NULL, 'h'},
177         {"version", no_argument, NULL, 'V'},
178         {"timestamp", no_argument, NULL, OPT_TIMESTAMP},
179         VLOG_LONG_OPTIONS,
180         DAEMON_LONG_OPTIONS,
181 #ifdef HAVE_OPENSSL
182         {"bootstrap-ca-cert", required_argument, NULL, OPT_BOOTSTRAP_CA_CERT},
183         STREAM_SSL_LONG_OPTIONS,
184 #endif
185         TABLE_LONG_OPTIONS,
186         {NULL, 0, NULL, 0},
187     };
188     char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
189
190     for (;;) {
191         int c;
192
193         c = getopt_long(argc, argv, short_options, long_options, NULL);
194         if (c == -1) {
195             break;
196         }
197
198         switch (c) {
199         case 'h':
200             usage();
201
202         case 'V':
203             ovs_print_version(0, 0);
204             exit(EXIT_SUCCESS);
205
206         VLOG_OPTION_HANDLERS
207         DAEMON_OPTION_HANDLERS
208         TABLE_OPTION_HANDLERS(&table_style)
209         STREAM_SSL_OPTION_HANDLERS
210
211         case OPT_BOOTSTRAP_CA_CERT:
212             stream_ssl_set_ca_cert_file(optarg, true);
213             break;
214
215         case OPT_TIMESTAMP:
216             timestamp = true;
217             break;
218
219         case '?':
220             exit(EXIT_FAILURE);
221
222         case 0:
223             /* getopt_long() already set the value for us. */
224             break;
225
226         default:
227             abort();
228         }
229     }
230     free(short_options);
231 }
232
233 static void
234 usage(void)
235 {
236     printf("%s: Open vSwitch database JSON-RPC client\n"
237            "usage: %s [OPTIONS] COMMAND [ARG...]\n"
238            "\nValid commands are:\n"
239            "\n  list-dbs [SERVER]\n"
240            "    list databases available on SERVER\n"
241            "\n  get-schema [SERVER] [DATABASE]\n"
242            "    retrieve schema for DATABASE from SERVER\n"
243            "\n  get-schema-version [SERVER] [DATABASE]\n"
244            "    retrieve schema for DATABASE from SERVER and report only its\n"
245            "    version number on stdout\n"
246            "\n  list-tables [SERVER] [DATABASE]\n"
247            "    list tables for DATABASE on SERVER\n"
248            "\n  list-columns [SERVER] [DATABASE] [TABLE]\n"
249            "    list columns in TABLE (or all tables) in DATABASE on SERVER\n"
250            "\n  transact [SERVER] TRANSACTION\n"
251            "    run TRANSACTION (a JSON array of operations) on SERVER\n"
252            "    and print the results as JSON on stdout\n"
253            "\n  monitor [SERVER] [DATABASE] TABLE [COLUMN,...]...\n"
254            "    monitor contents of COLUMNs in TABLE in DATABASE on SERVER.\n"
255            "    COLUMNs may include !initial, !insert, !delete, !modify\n"
256            "    to avoid seeing the specified kinds of changes.\n"
257            "\n  monitor [SERVER] [DATABASE] ALL\n"
258            "    monitor all changes to all columns in all tables\n"
259            "    in DATBASE on SERVER.\n"
260            "\n  monitor2 [SERVER] [DATABASE] ALL\n"
261            "    same usage as monitor, but uses \"monitor2\" method over"
262            "    the wire."
263            "\n  dump [SERVER] [DATABASE]\n"
264            "    dump contents of DATABASE on SERVER to stdout\n"
265            "\nThe default SERVER is unix:%s/db.sock.\n"
266            "The default DATABASE is Open_vSwitch.\n",
267            program_name, program_name, ovs_rundir());
268     stream_usage("SERVER", true, true, true);
269     printf("\nOutput formatting options:\n"
270            "  -f, --format=FORMAT         set output formatting to FORMAT\n"
271            "                              (\"table\", \"html\", \"csv\", "
272            "or \"json\")\n"
273            "  --no-headings               omit table heading row\n"
274            "  --pretty                    pretty-print JSON in output\n"
275            "  --timestamp                 timestamp \"monitor\" output");
276     daemon_usage();
277     vlog_usage();
278     printf("\nOther options:\n"
279            "  -h, --help                  display this help message\n"
280            "  -V, --version               display version information\n");
281     exit(EXIT_SUCCESS);
282 }
283 \f
284 static void
285 check_txn(int error, struct jsonrpc_msg **reply_)
286 {
287     struct jsonrpc_msg *reply = *reply_;
288
289     if (error) {
290         ovs_fatal(error, "transaction failed");
291     }
292
293     if (reply->error) {
294         ovs_fatal(error, "transaction returned error: %s",
295                   json_to_string(reply->error, table_style.json_flags));
296     }
297 }
298
299 static struct json *
300 parse_json(const char *s)
301 {
302     struct json *json = json_from_string(s);
303     if (json->type == JSON_STRING) {
304         ovs_fatal(0, "\"%s\": %s", s, json->u.string);
305     }
306     return json;
307 }
308
309 static struct jsonrpc *
310 open_jsonrpc(const char *server)
311 {
312     struct stream *stream;
313     int error;
314
315     error = stream_open_block(jsonrpc_stream_open(server, &stream,
316                               DSCP_DEFAULT), &stream);
317     if (error == EAFNOSUPPORT) {
318         struct pstream *pstream;
319
320         error = jsonrpc_pstream_open(server, &pstream, DSCP_DEFAULT);
321         if (error) {
322             ovs_fatal(error, "failed to connect or listen to \"%s\"", server);
323         }
324
325         VLOG_INFO("%s: waiting for connection...", server);
326         error = pstream_accept_block(pstream, &stream);
327         if (error) {
328             ovs_fatal(error, "failed to accept connection on \"%s\"", server);
329         }
330
331         pstream_close(pstream);
332     } else if (error) {
333         ovs_fatal(error, "failed to connect to \"%s\"", server);
334     }
335
336     return jsonrpc_open(stream);
337 }
338
339 static void
340 print_json(struct json *json)
341 {
342     char *string = json_to_string(json, table_style.json_flags);
343     fputs(string, stdout);
344     free(string);
345 }
346
347 static void
348 print_and_free_json(struct json *json)
349 {
350     print_json(json);
351     json_destroy(json);
352 }
353
354 static void
355 check_ovsdb_error(struct ovsdb_error *error)
356 {
357     if (error) {
358         ovs_fatal(0, "%s", ovsdb_error_to_string(error));
359     }
360 }
361
362 static struct ovsdb_schema *
363 fetch_schema(struct jsonrpc *rpc, const char *database)
364 {
365     struct jsonrpc_msg *request, *reply;
366     struct ovsdb_schema *schema;
367
368     request = jsonrpc_create_request("get_schema",
369                                      json_array_create_1(
370                                          json_string_create(database)),
371                                      NULL);
372     check_txn(jsonrpc_transact_block(rpc, request, &reply), &reply);
373     check_ovsdb_error(ovsdb_schema_from_json(reply->result, &schema));
374     jsonrpc_msg_destroy(reply);
375
376     return schema;
377 }
378
379 static void
380 fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
381 {
382     struct jsonrpc_msg *request, *reply;
383     size_t i;
384
385     request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
386                                      NULL);
387
388     check_txn(jsonrpc_transact_block(rpc, request, &reply), &reply);
389     if (reply->result->type != JSON_ARRAY) {
390         ovs_fatal(0, "list_dbs response is not array");
391     }
392
393     for (i = 0; i < reply->result->u.array.n; i++) {
394         const struct json *name = reply->result->u.array.elems[i];
395
396         if (name->type != JSON_STRING) {
397             ovs_fatal(0, "list_dbs response %"PRIuSIZE" is not string", i);
398         }
399         svec_add(dbs, name->u.string);
400     }
401     jsonrpc_msg_destroy(reply);
402     svec_sort(dbs);
403 }
404 \f
405 static void
406 do_list_dbs(struct jsonrpc *rpc, const char *database OVS_UNUSED,
407             int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
408 {
409     const char *db_name;
410     struct svec dbs;
411     size_t i;
412
413     svec_init(&dbs);
414     fetch_dbs(rpc, &dbs);
415     SVEC_FOR_EACH (i, db_name, &dbs) {
416         puts(db_name);
417     }
418     svec_destroy(&dbs);
419 }
420
421 static void
422 do_get_schema(struct jsonrpc *rpc, const char *database,
423               int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
424 {
425     struct ovsdb_schema *schema = fetch_schema(rpc, database);
426     print_and_free_json(ovsdb_schema_to_json(schema));
427     ovsdb_schema_destroy(schema);
428 }
429
430 static void
431 do_get_schema_version(struct jsonrpc *rpc, const char *database,
432                       int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
433 {
434     struct ovsdb_schema *schema = fetch_schema(rpc, database);
435     puts(schema->version);
436     ovsdb_schema_destroy(schema);
437 }
438
439 static void
440 do_list_tables(struct jsonrpc *rpc, const char *database,
441                int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
442 {
443     struct ovsdb_schema *schema;
444     struct shash_node *node;
445     struct table t;
446
447     schema = fetch_schema(rpc, database);
448     table_init(&t);
449     table_add_column(&t, "Table");
450     SHASH_FOR_EACH (node, &schema->tables) {
451         struct ovsdb_table_schema *ts = node->data;
452
453         table_add_row(&t);
454         table_add_cell(&t)->text = xstrdup(ts->name);
455     }
456     ovsdb_schema_destroy(schema);
457     table_print(&t, &table_style);
458 }
459
460 static void
461 do_list_columns(struct jsonrpc *rpc, const char *database,
462                 int argc OVS_UNUSED, char *argv[])
463 {
464     const char *table_name = argv[0];
465     struct ovsdb_schema *schema;
466     struct shash_node *table_node;
467     struct table t;
468
469     schema = fetch_schema(rpc, database);
470     table_init(&t);
471     if (!table_name) {
472         table_add_column(&t, "Table");
473     }
474     table_add_column(&t, "Column");
475     table_add_column(&t, "Type");
476     SHASH_FOR_EACH (table_node, &schema->tables) {
477         struct ovsdb_table_schema *ts = table_node->data;
478
479         if (!table_name || !strcmp(table_name, ts->name)) {
480             struct shash_node *column_node;
481
482             SHASH_FOR_EACH (column_node, &ts->columns) {
483                 const struct ovsdb_column *column = column_node->data;
484
485                 table_add_row(&t);
486                 if (!table_name) {
487                     table_add_cell(&t)->text = xstrdup(ts->name);
488                 }
489                 table_add_cell(&t)->text = xstrdup(column->name);
490                 table_add_cell(&t)->json = ovsdb_type_to_json(&column->type);
491             }
492         }
493     }
494     ovsdb_schema_destroy(schema);
495     table_print(&t, &table_style);
496 }
497
498 static void
499 do_transact(struct jsonrpc *rpc, const char *database OVS_UNUSED,
500             int argc OVS_UNUSED, char *argv[])
501 {
502     struct jsonrpc_msg *request, *reply;
503     struct json *transaction;
504
505     transaction = parse_json(argv[0]);
506
507     request = jsonrpc_create_request("transact", transaction, NULL);
508     check_txn(jsonrpc_transact_block(rpc, request, &reply), &reply);
509     print_json(reply->result);
510     putchar('\n');
511     jsonrpc_msg_destroy(reply);
512 }
513 \f
514 /* "monitor" command. */
515
516 struct monitored_table {
517     struct ovsdb_table_schema *table;
518     struct ovsdb_column_set columns;
519 };
520
521 static void
522 monitor_print_row(struct json *row, const char *type, const char *uuid,
523                   const struct ovsdb_column_set *columns, struct table *t)
524 {
525     size_t i;
526
527     if (!row) {
528         ovs_error(0, "missing %s row", type);
529         return;
530     } else if (row->type != JSON_OBJECT) {
531         ovs_error(0, "<row> is not object");
532         return;
533     }
534
535     table_add_row(t);
536     table_add_cell(t)->text = xstrdup(uuid);
537     table_add_cell(t)->text = xstrdup(type);
538     for (i = 0; i < columns->n_columns; i++) {
539         const struct ovsdb_column *column = columns->columns[i];
540         struct json *value = shash_find_data(json_object(row), column->name);
541         struct cell *cell = table_add_cell(t);
542         if (value) {
543             cell->json = json_clone(value);
544             cell->type = &column->type;
545         }
546     }
547 }
548
549 static void
550 monitor_print_table(struct json *table_update,
551                     const struct monitored_table *mt, char *caption,
552                     bool initial)
553 {
554     const struct ovsdb_table_schema *table = mt->table;
555     const struct ovsdb_column_set *columns = &mt->columns;
556     struct shash_node *node;
557     struct table t;
558     size_t i;
559
560     if (table_update->type != JSON_OBJECT) {
561         ovs_error(0, "<table-update> for table %s is not object", table->name);
562         return;
563     }
564
565     table_init(&t);
566     table_set_timestamp(&t, timestamp);
567     table_set_caption(&t, caption);
568
569     table_add_column(&t, "row");
570     table_add_column(&t, "action");
571     for (i = 0; i < columns->n_columns; i++) {
572         table_add_column(&t, "%s", columns->columns[i]->name);
573     }
574     SHASH_FOR_EACH (node, json_object(table_update)) {
575         struct json *row_update = node->data;
576         struct json *old, *new;
577
578         if (row_update->type != JSON_OBJECT) {
579             ovs_error(0, "<row-update> is not object");
580             continue;
581         }
582         old = shash_find_data(json_object(row_update), "old");
583         new = shash_find_data(json_object(row_update), "new");
584         if (initial) {
585             monitor_print_row(new, "initial", node->name, columns, &t);
586         } else if (!old) {
587             monitor_print_row(new, "insert", node->name, columns, &t);
588         } else if (!new) {
589             monitor_print_row(old, "delete", node->name, columns, &t);
590         } else {
591             monitor_print_row(old, "old", node->name, columns, &t);
592             monitor_print_row(new, "new", "", columns, &t);
593         }
594     }
595     table_print(&t, &table_style);
596     table_destroy(&t);
597 }
598
599 static void
600 monitor_print(struct json *table_updates,
601               const struct monitored_table *mts, size_t n_mts,
602               bool initial)
603 {
604     size_t i;
605
606     if (table_updates->type != JSON_OBJECT) {
607         ovs_error(0, "<table-updates> is not object");
608         return;
609     }
610
611     for (i = 0; i < n_mts; i++) {
612         const struct monitored_table *mt = &mts[i];
613         struct json *table_update = shash_find_data(json_object(table_updates),
614                                                     mt->table->name);
615         if (table_update) {
616             monitor_print_table(table_update, mt,
617                                 n_mts > 1 ? xstrdup(mt->table->name) : NULL,
618                                 initial);
619         }
620     }
621 }
622
623 static void
624 monitor2_print_row(struct json *row, const char *type, const char *uuid,
625                    const struct ovsdb_column_set *columns, struct table *t)
626 {
627     if (!strcmp(type, "delete")) {
628         if (row->type != JSON_NULL) {
629             ovs_error(0, "delete method does not expect <row>");
630             return;
631         }
632
633         table_add_row(t);
634         table_add_cell(t)->text = xstrdup(uuid);
635         table_add_cell(t)->text = xstrdup(type);
636     } else {
637         if (!row || row->type != JSON_OBJECT) {
638             ovs_error(0, "<row> is not object");
639             return;
640         }
641         monitor_print_row(row, type, uuid, columns, t);
642     }
643 }
644
645 static void
646 monitor2_print_table(struct json *table_update2,
647                     const struct monitored_table *mt, char *caption)
648 {
649     const struct ovsdb_table_schema *table = mt->table;
650     const struct ovsdb_column_set *columns = &mt->columns;
651     struct shash_node *node;
652     struct table t;
653     size_t i;
654
655     if (table_update2->type != JSON_OBJECT) {
656         ovs_error(0, "<table-update> for table %s is not object", table->name);
657         return;
658     }
659
660     table_init(&t);
661     table_set_timestamp(&t, timestamp);
662     table_set_caption(&t, caption);
663
664     table_add_column(&t, "row");
665     table_add_column(&t, "action");
666     for (i = 0; i < columns->n_columns; i++) {
667         table_add_column(&t, "%s", columns->columns[i]->name);
668     }
669     SHASH_FOR_EACH (node, json_object(table_update2)) {
670         struct json *row_update2 = node->data;
671         const char *operation;
672         struct json *row;
673         const char *ops[] = {"delete", "initial", "modify", "insert"};
674
675         if (row_update2->type != JSON_OBJECT) {
676             ovs_error(0, "<row-update2> is not object");
677             continue;
678         }
679
680         /* row_update2 contains one of objects indexed by ops[] */
681         for (int i = 0; i < ARRAY_SIZE(ops); i++) {
682             operation = ops[i];
683             row = shash_find_data(json_object(row_update2), operation);
684
685             if (row) {
686                 monitor2_print_row(row, operation, node->name, columns, &t);
687                 break;
688             }
689         }
690     }
691     table_print(&t, &table_style);
692     table_destroy(&t);
693 }
694
695 static void
696 monitor2_print(struct json *table_updates2,
697                const struct monitored_table *mts, size_t n_mts)
698 {
699     size_t i;
700
701     if (table_updates2->type != JSON_OBJECT) {
702         ovs_error(0, "<table-updates2> is not object");
703         return;
704     }
705
706     for (i = 0; i < n_mts; i++) {
707         const struct monitored_table *mt = &mts[i];
708         struct json *table_update = shash_find_data(
709                                         json_object(table_updates2),
710                                         mt->table->name);
711         if (table_update) {
712             monitor2_print_table(table_update, mt,
713                                 n_mts > 1 ? xstrdup(mt->table->name) : NULL);
714         }
715     }
716 }
717
718 static void
719 add_column(const char *server, const struct ovsdb_column *column,
720            struct ovsdb_column_set *columns, struct json *columns_json)
721 {
722     if (ovsdb_column_set_contains(columns, column->index)) {
723         ovs_fatal(0, "%s: column \"%s\" mentioned multiple times",
724                   server, column->name);
725     }
726     ovsdb_column_set_add(columns, column);
727     json_array_add(columns_json, json_string_create(column->name));
728 }
729
730 static struct json *
731 parse_monitor_columns(char *arg, const char *server, const char *database,
732                       const struct ovsdb_table_schema *table,
733                       struct ovsdb_column_set *columns)
734 {
735     bool initial, insert, delete, modify;
736     struct json *mr, *columns_json;
737     char *save_ptr = NULL;
738     char *token;
739
740     mr = json_object_create();
741     columns_json = json_array_create_empty();
742     json_object_put(mr, "columns", columns_json);
743
744     initial = insert = delete = modify = true;
745     for (token = strtok_r(arg, ",", &save_ptr); token != NULL;
746          token = strtok_r(NULL, ",", &save_ptr)) {
747         if (!strcmp(token, "!initial")) {
748             initial = false;
749         } else if (!strcmp(token, "!insert")) {
750             insert = false;
751         } else if (!strcmp(token, "!delete")) {
752             delete = false;
753         } else if (!strcmp(token, "!modify")) {
754             modify = false;
755         } else {
756             const struct ovsdb_column *column;
757
758             column = ovsdb_table_schema_get_column(table, token);
759             if (!column) {
760                 ovs_fatal(0, "%s: table \"%s\" in %s does not have a "
761                           "column named \"%s\"",
762                           server, table->name, database, token);
763             }
764             add_column(server, column, columns, columns_json);
765         }
766     }
767
768     if (columns_json->u.array.n == 0) {
769         const struct shash_node **nodes;
770         size_t i, n;
771
772         n = shash_count(&table->columns);
773         nodes = shash_sort(&table->columns);
774         for (i = 0; i < n; i++) {
775             const struct ovsdb_column *column = nodes[i]->data;
776             if (column->index != OVSDB_COL_UUID
777                 && column->index != OVSDB_COL_VERSION) {
778                 add_column(server, column, columns, columns_json);
779             }
780         }
781         free(nodes);
782
783         add_column(server, ovsdb_table_schema_get_column(table, "_version"),
784                    columns, columns_json);
785     }
786
787     if (!initial || !insert || !delete || !modify) {
788         struct json *select = json_object_create();
789         json_object_put(select, "initial", json_boolean_create(initial));
790         json_object_put(select, "insert", json_boolean_create(insert));
791         json_object_put(select, "delete", json_boolean_create(delete));
792         json_object_put(select, "modify", json_boolean_create(modify));
793         json_object_put(mr, "select", select);
794     }
795
796     return mr;
797 }
798
799 static void
800 ovsdb_client_exit(struct unixctl_conn *conn, int argc OVS_UNUSED,
801                   const char *argv[] OVS_UNUSED, void *exiting_)
802 {
803     bool *exiting = exiting_;
804     *exiting = true;
805     unixctl_command_reply(conn, NULL);
806 }
807
808 static void
809 ovsdb_client_block(struct unixctl_conn *conn, int argc OVS_UNUSED,
810                    const char *argv[] OVS_UNUSED, void *blocked_)
811 {
812     bool *blocked = blocked_;
813
814     if (!*blocked) {
815         *blocked = true;
816         unixctl_command_reply(conn, NULL);
817     } else {
818         unixctl_command_reply(conn, "already blocking");
819     }
820 }
821
822 static void
823 ovsdb_client_unblock(struct unixctl_conn *conn, int argc OVS_UNUSED,
824                      const char *argv[] OVS_UNUSED, void *blocked_)
825 {
826     bool *blocked = blocked_;
827
828     if (*blocked) {
829         *blocked = false;
830         unixctl_command_reply(conn, NULL);
831     } else {
832         unixctl_command_reply(conn, "already unblocked");
833     }
834 }
835
836 static void
837 add_monitored_table(int argc, char *argv[],
838                     const char *server, const char *database,
839                     struct ovsdb_table_schema *table,
840                     struct json *monitor_requests,
841                     struct monitored_table **mts,
842                     size_t *n_mts, size_t *allocated_mts)
843 {
844     struct json *monitor_request_array;
845     struct monitored_table *mt;
846
847     if (*n_mts >= *allocated_mts) {
848         *mts = x2nrealloc(*mts, allocated_mts, sizeof **mts);
849     }
850     mt = &(*mts)[(*n_mts)++];
851     mt->table = table;
852     ovsdb_column_set_init(&mt->columns);
853
854     monitor_request_array = json_array_create_empty();
855     if (argc > 1) {
856         int i;
857
858         for (i = 1; i < argc; i++) {
859             json_array_add(
860                 monitor_request_array,
861                 parse_monitor_columns(argv[i], server, database, table,
862                                       &mt->columns));
863         }
864     } else {
865         /* Allocate a writable empty string since parse_monitor_columns()
866          * is going to strtok() it and that's risky with literal "". */
867         char empty[] = "";
868         json_array_add(
869             monitor_request_array,
870             parse_monitor_columns(empty, server, database,
871                                   table, &mt->columns));
872     }
873
874     json_object_put(monitor_requests, table->name, monitor_request_array);
875 }
876
877 static void
878 do_monitor__(struct jsonrpc *rpc, const char *database,
879              enum ovsdb_monitor_version version,
880              int argc, char *argv[])
881 {
882     const char *server = jsonrpc_get_name(rpc);
883     const char *table_name = argv[0];
884     struct unixctl_server *unixctl;
885     struct ovsdb_schema *schema;
886     struct jsonrpc_msg *request;
887     struct json *monitor, *monitor_requests, *request_id;
888     bool exiting = false;
889     bool blocked = false;
890
891     struct monitored_table *mts;
892     size_t n_mts, allocated_mts;
893
894     ovs_assert(version < OVSDB_MONITOR_VERSION_MAX);
895
896     daemon_save_fd(STDOUT_FILENO);
897     daemonize_start(false);
898     if (get_detach()) {
899         int error;
900
901         error = unixctl_server_create(NULL, &unixctl);
902         if (error) {
903             ovs_fatal(error, "failed to create unixctl server");
904         }
905
906         unixctl_command_register("exit", "", 0, 0,
907                                  ovsdb_client_exit, &exiting);
908         unixctl_command_register("ovsdb-client/block", "", 0, 0,
909                                  ovsdb_client_block, &blocked);
910         unixctl_command_register("ovsdb-client/unblock", "", 0, 0,
911                                  ovsdb_client_unblock, &blocked);
912     } else {
913         unixctl = NULL;
914     }
915
916     schema = fetch_schema(rpc, database);
917
918     monitor_requests = json_object_create();
919
920     mts = NULL;
921     n_mts = allocated_mts = 0;
922     if (strcmp(table_name, "ALL")) {
923         struct ovsdb_table_schema *table;
924
925         table = shash_find_data(&schema->tables, table_name);
926         if (!table) {
927             ovs_fatal(0, "%s: %s does not have a table named \"%s\"",
928                       server, database, table_name);
929         }
930
931         add_monitored_table(argc, argv, server, database, table,
932                             monitor_requests, &mts, &n_mts, &allocated_mts);
933     } else {
934         size_t n = shash_count(&schema->tables);
935         const struct shash_node **nodes = shash_sort(&schema->tables);
936         size_t i;
937
938         for (i = 0; i < n; i++) {
939             struct ovsdb_table_schema *table = nodes[i]->data;
940
941             add_monitored_table(argc, argv, server, database, table,
942                                 monitor_requests,
943                                 &mts, &n_mts, &allocated_mts);
944         }
945         free(nodes);
946     }
947
948     monitor = json_array_create_3(json_string_create(database),
949                                   json_null_create(), monitor_requests);
950     const char *method = version == OVSDB_MONITOR_V2 ? "monitor2"
951                                                      : "monitor";
952
953     request = jsonrpc_create_request(method, monitor, NULL);
954     request_id = json_clone(request->id);
955     jsonrpc_send(rpc, request);
956
957     for (;;) {
958         unixctl_server_run(unixctl);
959         while (!blocked) {
960             struct jsonrpc_msg *msg;
961             int error;
962
963             error = jsonrpc_recv(rpc, &msg);
964             if (error == EAGAIN) {
965                 break;
966             } else if (error) {
967                 ovs_fatal(error, "%s: receive failed", server);
968             }
969
970             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
971                 jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
972                                                        msg->id));
973             } else if (msg->type == JSONRPC_REPLY
974                        && json_equal(msg->id, request_id)) {
975                 switch(version) {
976                 case OVSDB_MONITOR_V1:
977                     monitor_print(msg->result, mts, n_mts, true);
978                     break;
979                 case OVSDB_MONITOR_V2:
980                     monitor2_print(msg->result, mts, n_mts);
981                     break;
982                 case OVSDB_MONITOR_VERSION_MAX:
983                 default:
984                     OVS_NOT_REACHED();
985                 }
986                 fflush(stdout);
987                 daemonize_complete();
988             } else if (msg->type == JSONRPC_NOTIFY
989                        && !strcmp(msg->method, "update")) {
990                 struct json *params = msg->params;
991                 if (params->type == JSON_ARRAY
992                     && params->u.array.n == 2
993                     && params->u.array.elems[0]->type == JSON_NULL) {
994                     monitor_print(params->u.array.elems[1], mts, n_mts, false);
995                     fflush(stdout);
996                 }
997             } else if (msg->type == JSONRPC_NOTIFY
998                        && version == OVSDB_MONITOR_V2
999                        && !strcmp(msg->method, "update2")) {
1000                 struct json *params = msg->params;
1001                 if (params->type == JSON_ARRAY
1002                     && params->u.array.n == 2
1003                     && params->u.array.elems[0]->type == JSON_NULL) {
1004                     monitor2_print(params->u.array.elems[1], mts, n_mts);
1005                     fflush(stdout);
1006                 }
1007             }
1008             jsonrpc_msg_destroy(msg);
1009         }
1010
1011         if (exiting) {
1012             break;
1013         }
1014
1015         jsonrpc_run(rpc);
1016         jsonrpc_wait(rpc);
1017         if (!blocked) {
1018             jsonrpc_recv_wait(rpc);
1019         }
1020         unixctl_server_wait(unixctl);
1021         poll_block();
1022     }
1023 }
1024
1025 static void
1026 do_monitor(struct jsonrpc *rpc, const char *database,
1027            int argc, char *argv[])
1028 {
1029     do_monitor__(rpc, database, OVSDB_MONITOR_V1, argc, argv);
1030 }
1031
1032 static void
1033 do_monitor2(struct jsonrpc *rpc, const char *database,
1034            int argc, char *argv[])
1035 {
1036     do_monitor__(rpc, database, OVSDB_MONITOR_V2, argc, argv);
1037 }
1038
1039 struct dump_table_aux {
1040     struct ovsdb_datum **data;
1041     const struct ovsdb_column **columns;
1042     size_t n_columns;
1043 };
1044
1045 static int
1046 compare_data(size_t a_y, size_t b_y, size_t x,
1047              const struct dump_table_aux *aux)
1048 {
1049     return ovsdb_datum_compare_3way(&aux->data[a_y][x],
1050                                     &aux->data[b_y][x],
1051                                     &aux->columns[x]->type);
1052 }
1053
1054 static int
1055 compare_rows(size_t a_y, size_t b_y, void *aux_)
1056 {
1057     struct dump_table_aux *aux = aux_;
1058     size_t x;
1059
1060     /* Skip UUID columns on the first pass, since their values tend to be
1061      * random and make our results less reproducible. */
1062     for (x = 0; x < aux->n_columns; x++) {
1063         if (aux->columns[x]->type.key.type != OVSDB_TYPE_UUID) {
1064             int cmp = compare_data(a_y, b_y, x, aux);
1065             if (cmp) {
1066                 return cmp;
1067             }
1068         }
1069     }
1070
1071     /* Use UUID columns as tie-breakers. */
1072     for (x = 0; x < aux->n_columns; x++) {
1073         if (aux->columns[x]->type.key.type == OVSDB_TYPE_UUID) {
1074             int cmp = compare_data(a_y, b_y, x, aux);
1075             if (cmp) {
1076                 return cmp;
1077             }
1078         }
1079     }
1080
1081     return 0;
1082 }
1083
1084 static void
1085 swap_rows(size_t a_y, size_t b_y, void *aux_)
1086 {
1087     struct dump_table_aux *aux = aux_;
1088     struct ovsdb_datum *tmp = aux->data[a_y];
1089     aux->data[a_y] = aux->data[b_y];
1090     aux->data[b_y] = tmp;
1091 }
1092
1093 static int
1094 compare_columns(const void *a_, const void *b_)
1095 {
1096     const struct ovsdb_column *const *ap = a_;
1097     const struct ovsdb_column *const *bp = b_;
1098     const struct ovsdb_column *a = *ap;
1099     const struct ovsdb_column *b = *bp;
1100
1101     return strcmp(a->name, b->name);
1102 }
1103
1104 static void
1105 dump_table(const char *table_name, const struct shash *cols,
1106            struct json_array *rows)
1107 {
1108     const struct ovsdb_column **columns;
1109     size_t n_columns;
1110
1111     struct ovsdb_datum **data;
1112
1113     struct dump_table_aux aux;
1114     struct shash_node *node;
1115     struct table t;
1116     size_t x, y;
1117
1118     /* Sort columns by name, for reproducibility. */
1119     columns = xmalloc(shash_count(cols) * sizeof *columns);
1120     n_columns = 0;
1121     SHASH_FOR_EACH (node, cols) {
1122         struct ovsdb_column *column = node->data;
1123         if (strcmp(column->name, "_version")) {
1124             columns[n_columns++] = column;
1125         }
1126     }
1127     qsort(columns, n_columns, sizeof *columns, compare_columns);
1128
1129     /* Extract data from table. */
1130     data = xmalloc(rows->n * sizeof *data);
1131     for (y = 0; y < rows->n; y++) {
1132         struct shash *row;
1133
1134         if (rows->elems[y]->type != JSON_OBJECT) {
1135             ovs_fatal(0,  "row %"PRIuSIZE" in table %s response is not a JSON object: "
1136                       "%s", y, table_name, json_to_string(rows->elems[y], 0));
1137         }
1138         row = json_object(rows->elems[y]);
1139
1140         data[y] = xmalloc(n_columns * sizeof **data);
1141         for (x = 0; x < n_columns; x++) {
1142             const struct json *json = shash_find_data(row, columns[x]->name);
1143             if (!json) {
1144                 ovs_fatal(0, "row %"PRIuSIZE" in table %s response lacks %s column",
1145                           y, table_name, columns[x]->name);
1146             }
1147
1148             check_ovsdb_error(ovsdb_datum_from_json(&data[y][x],
1149                                                     &columns[x]->type,
1150                                                     json, NULL));
1151         }
1152     }
1153
1154     /* Sort rows by column values, for reproducibility. */
1155     aux.data = data;
1156     aux.columns = columns;
1157     aux.n_columns = n_columns;
1158     sort(rows->n, compare_rows, swap_rows, &aux);
1159
1160     /* Add column headings. */
1161     table_init(&t);
1162     table_set_caption(&t, xasprintf("%s table", table_name));
1163     for (x = 0; x < n_columns; x++) {
1164         table_add_column(&t, "%s", columns[x]->name);
1165     }
1166
1167     /* Print rows. */
1168     for (y = 0; y < rows->n; y++) {
1169         table_add_row(&t);
1170         for (x = 0; x < n_columns; x++) {
1171             struct cell *cell = table_add_cell(&t);
1172             cell->json = ovsdb_datum_to_json(&data[y][x], &columns[x]->type);
1173             cell->type = &columns[x]->type;
1174             ovsdb_datum_destroy(&data[y][x], &columns[x]->type);
1175         }
1176         free(data[y]);
1177     }
1178     table_print(&t, &table_style);
1179     table_destroy(&t);
1180
1181     free(data);
1182     free(columns);
1183 }
1184
1185 static void
1186 do_dump(struct jsonrpc *rpc, const char *database,
1187         int argc, char *argv[])
1188 {
1189     struct jsonrpc_msg *request, *reply;
1190     struct ovsdb_schema *schema;
1191     struct json *transaction;
1192
1193     const struct shash_node *node, **tables;
1194     size_t n_tables;
1195     struct ovsdb_table_schema *tschema;
1196     const struct shash *columns;
1197     struct shash custom_columns;
1198
1199     size_t i;
1200
1201     shash_init(&custom_columns);
1202     schema = fetch_schema(rpc, database);
1203     if (argc) {
1204         node = shash_find(&schema->tables, argv[0]);
1205         if (!node) {
1206             ovs_fatal(0, "No table \"%s\" found.", argv[0]);
1207         }
1208         tables = xmemdup(&node, sizeof(&node));
1209         n_tables = 1;
1210         tschema = tables[0]->data;
1211         for (i = 1; i < argc; i++) {
1212             node = shash_find(&tschema->columns, argv[i]);
1213             if (!node) {
1214                 ovs_fatal(0, "Table \"%s\" has no column %s.", argv[0], argv[1]);
1215             }
1216             shash_add(&custom_columns, argv[1], node->data);
1217         }
1218     } else {
1219         tables = shash_sort(&schema->tables);
1220         n_tables = shash_count(&schema->tables);
1221     }
1222
1223     /* Construct transaction to retrieve entire database. */
1224     transaction = json_array_create_1(json_string_create(database));
1225     for (i = 0; i < n_tables; i++) {
1226         const struct ovsdb_table_schema *ts = tables[i]->data;
1227         struct json *op, *jcolumns;
1228
1229         if (argc > 1) {
1230             columns = &custom_columns;
1231         } else {
1232             columns = &ts->columns;
1233         }
1234         jcolumns = json_array_create_empty();
1235         SHASH_FOR_EACH (node, columns) {
1236             const struct ovsdb_column *column = node->data;
1237
1238             if (strcmp(column->name, "_version")) {
1239                 json_array_add(jcolumns, json_string_create(column->name));
1240             }
1241         }
1242
1243         op = json_object_create();
1244         json_object_put_string(op, "op", "select");
1245         json_object_put_string(op, "table", tables[i]->name);
1246         json_object_put(op, "where", json_array_create_empty());
1247         json_object_put(op, "columns", jcolumns);
1248         json_array_add(transaction, op);
1249     }
1250
1251     /* Send request, get reply. */
1252     request = jsonrpc_create_request("transact", transaction, NULL);
1253     check_txn(jsonrpc_transact_block(rpc, request, &reply), &reply);
1254
1255     /* Print database contents. */
1256     if (reply->result->type != JSON_ARRAY
1257         || reply->result->u.array.n != n_tables) {
1258         ovs_fatal(0, "reply is not array of %"PRIuSIZE" elements: %s",
1259                   n_tables, json_to_string(reply->result, 0));
1260     }
1261     for (i = 0; i < n_tables; i++) {
1262         const struct ovsdb_table_schema *ts = tables[i]->data;
1263         const struct json *op_result = reply->result->u.array.elems[i];
1264         struct json *rows;
1265
1266         if (op_result->type != JSON_OBJECT
1267             || !(rows = shash_find_data(json_object(op_result), "rows"))
1268             || rows->type != JSON_ARRAY) {
1269             ovs_fatal(0, "%s table reply is not an object with a \"rows\" "
1270                       "member array: %s",
1271                       ts->name, json_to_string(op_result, 0));
1272         }
1273
1274         if (argc > 1) {
1275             dump_table(tables[i]->name, &custom_columns, &rows->u.array);
1276         } else {
1277             dump_table(tables[i]->name, &ts->columns, &rows->u.array);
1278         }
1279     }
1280
1281     jsonrpc_msg_destroy(reply);
1282     shash_destroy(&custom_columns);
1283     free(tables);
1284     ovsdb_schema_destroy(schema);
1285 }
1286
1287 static void
1288 do_help(struct jsonrpc *rpc OVS_UNUSED, const char *database OVS_UNUSED,
1289         int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
1290 {
1291     usage();
1292 }
1293
1294 /* All command handlers (except for "help") are expected to take an optional
1295  * server socket name (e.g. "unix:...") as their first argument.  The socket
1296  * name argument must be included in max_args (but left out of min_args).  The
1297  * command name and socket name are not included in the arguments passed to the
1298  * handler: the argv[0] passed to the handler is the first argument after the
1299  * optional server socket name.  The connection to the server is available as
1300  * global variable 'rpc'. */
1301 static const struct ovsdb_client_command all_commands[] = {
1302     { "list-dbs",           NEED_RPC,      0, 0,       do_list_dbs },
1303     { "get-schema",         NEED_DATABASE, 0, 0,       do_get_schema },
1304     { "get-schema-version", NEED_DATABASE, 0, 0,       do_get_schema_version },
1305     { "list-tables",        NEED_DATABASE, 0, 0,       do_list_tables },
1306     { "list-columns",       NEED_DATABASE, 0, 1,       do_list_columns },
1307     { "transact",           NEED_RPC,      1, 1,       do_transact },
1308     { "monitor",            NEED_DATABASE, 1, INT_MAX, do_monitor },
1309     { "monitor2",           NEED_DATABASE, 1, INT_MAX, do_monitor2 },
1310     { "dump",               NEED_DATABASE, 0, INT_MAX, do_dump },
1311     { "help",               NEED_NONE,     0, INT_MAX, do_help },
1312
1313     { NULL,                 0,             0, 0,       NULL },
1314 };
1315
1316 static const struct ovsdb_client_command *get_all_commands(void)
1317 {
1318     return all_commands;
1319 }