unixctl: Log commands received and their replies (at debug level).
[cascardo/ovs.git] / lib / unixctl.c
index 7d6fdd6..c1a5048 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #include <config.h>
 #include "unixctl.h"
-#include <assert.h>
-#include <ctype.h>
 #include <errno.h>
-#include <poll.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/stat.h>
 #include <unistd.h>
 #include "coverage.h"
 #include "dirs.h"
 #include "dynamic-string.h"
-#include "fatal-signal.h"
+#include "json.h"
+#include "jsonrpc.h"
 #include "list.h"
-#include "ofpbuf.h"
 #include "poll-loop.h"
 #include "shash.h"
-#include "socket-util.h"
-#include "util.h"
+#include "stream.h"
+#include "stream-provider.h"
+#include "svec.h"
+#include "openvswitch/vlog.h"
 
-#ifndef SCM_CREDENTIALS
-#include <time.h>
-#endif
+VLOG_DEFINE_THIS_MODULE(unixctl);
 
-#define THIS_MODULE VLM_unixctl
-#include "vlog.h"
+COVERAGE_DEFINE(unixctl_received);
+COVERAGE_DEFINE(unixctl_replied);
 \f
 struct unixctl_command {
-    void (*cb)(struct unixctl_conn *, const char *args);
+    const char *usage;
+    int min_args, max_args;
+    unixctl_cb_func *cb;
+    void *aux;
 };
 
 struct unixctl_conn {
-    struct list node;
-    int fd;
+    struct ovs_list node;
+    struct jsonrpc *rpc;
 
-    enum { S_RECV, S_PROCESS, S_SEND } state;
-    struct ofpbuf in;
-    struct ds out;
-    size_t out_pos;
+    /* Only one request can be in progress at a time.  While the request is
+     * being processed, 'request_id' is populated, otherwise it is null. */
+    struct json *request_id;   /* ID of the currently active request. */
 };
 
 /* Server for control connection. */
 struct unixctl_server {
-    char *path;
-    int fd;
-    struct list conns;
-};
-
-/* Client for control connection. */
-struct unixctl_client {
-    char *connect_path;
-    char *bind_path;
-    FILE *stream;
+    struct pstream *listener;
+    struct ovs_list conns;
 };
 
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
@@ -76,86 +63,129 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
 static struct shash commands = SHASH_INITIALIZER(&commands);
 
 static void
-unixctl_help(struct unixctl_conn *conn, const char *args UNUSED)
+unixctl_list_commands(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                      const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
 {
     struct ds ds = DS_EMPTY_INITIALIZER;
-    struct shash_node *node;
+    const struct shash_node **nodes = shash_sort(&commands);
+    size_t i;
 
     ds_put_cstr(&ds, "The available commands are:\n");
-    SHASH_FOR_EACH (node, &commands) {
-        ds_put_format(&ds, "\t%s\n", node->name);
+
+    for (i = 0; i < shash_count(&commands); i++) {
+        const struct shash_node *node = nodes[i];
+        const struct unixctl_command *command = node->data;
+
+        ds_put_format(&ds, "  %-23s %s\n", node->name, command->usage);
     }
-    unixctl_command_reply(conn, 214, ds_cstr(&ds));
+    free(nodes);
+
+    unixctl_command_reply(conn, ds_cstr(&ds));
     ds_destroy(&ds);
 }
 
+static void
+unixctl_version(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
+{
+    unixctl_command_reply(conn, ovs_get_program_version());
+}
+
+/* Registers a unixctl command with the given 'name'.  'usage' describes the
+ * arguments to the command; it is used only for presentation to the user in
+ * "list-commands" output.
+ *
+ * 'cb' is called when the command is received.  It is passed an array
+ * containing the command name and arguments, plus a copy of 'aux'.  Normally
+ * 'cb' should reply by calling unixctl_command_reply() or
+ * unixctl_command_reply_error() before it returns, but if the command cannot
+ * be handled immediately then it can defer the reply until later.  A given
+ * connection can only process a single request at a time, so a reply must be
+ * made eventually to avoid blocking that connection. */
 void
-unixctl_command_register(const char *name,
-                         void (*cb)(struct unixctl_conn *, const char *args))
+unixctl_command_register(const char *name, const char *usage,
+                         int min_args, int max_args,
+                         unixctl_cb_func *cb, void *aux)
 {
     struct unixctl_command *command;
+    struct unixctl_command *lookup = shash_find_data(&commands, name);
+
+    ovs_assert(!lookup || lookup->cb == cb);
+
+    if (lookup) {
+        return;
+    }
 
-    assert(!shash_find_data(&commands, name)
-           || shash_find_data(&commands, name) == cb);
     command = xmalloc(sizeof *command);
+    command->usage = usage;
+    command->min_args = min_args;
+    command->max_args = max_args;
     command->cb = cb;
+    command->aux = aux;
     shash_add(&commands, name, command);
 }
 
-static const char *
-translate_reply_code(int code)
+static void
+unixctl_command_reply__(struct unixctl_conn *conn,
+                        bool success, const char *body)
 {
-    switch (code) {
-    case 200: return "OK";
-    case 201: return "Created";
-    case 202: return "Accepted";
-    case 204: return "No Content";
-    case 211: return "System Status";
-    case 214: return "Help";
-    case 400: return "Bad Request";
-    case 401: return "Unauthorized";
-    case 403: return "Forbidden";
-    case 404: return "Not Found";
-    case 500: return "Internal Server Error";
-    case 501: return "Invalid Argument";
-    case 503: return "Service Unavailable";
-    default: return "Unknown";
+    struct json *body_json;
+    struct jsonrpc_msg *reply;
+
+    COVERAGE_INC(unixctl_replied);
+    ovs_assert(conn->request_id);
+
+    if (!body) {
+        body = "";
+    }
+
+    if (body[0] && body[strlen(body) - 1] != '\n') {
+        body_json = json_string_create_nocopy(xasprintf("%s\n", body));
+    } else {
+        body_json = json_string_create(body);
+    }
+
+    if (success) {
+        reply = jsonrpc_create_reply(body_json, conn->request_id);
+    } else {
+        reply = jsonrpc_create_error(body_json, conn->request_id);
+    }
+
+    if (VLOG_IS_DBG_ENABLED()) {
+        char *id = json_to_string(conn->request_id, 0);
+        VLOG_DBG("replying with %s, id=%s: \"%s\"",
+                 success ? "success" : "error", id, body);
+        free(id);
     }
+
+    /* If jsonrpc_send() returns an error, the run loop will take care of the
+     * problem eventually. */
+    jsonrpc_send(conn->rpc, reply);
+    json_destroy(conn->request_id);
+    conn->request_id = NULL;
 }
 
+/* Replies to the active unixctl connection 'conn'.  'result' is sent to the
+ * client indicating the command was processed successfully.  Only one call to
+ * unixctl_command_reply() or unixctl_command_reply_error() may be made per
+ * request. */
 void
-unixctl_command_reply(struct unixctl_conn *conn,
-                      int code, const char *body)
+unixctl_command_reply(struct unixctl_conn *conn, const char *result)
 {
-    struct ds *out = &conn->out;
+    unixctl_command_reply__(conn, true, result);
+}
 
-    COVERAGE_INC(unixctl_replied);
-    assert(conn->state == S_PROCESS);
-    conn->state = S_SEND;
-    conn->out_pos = 0;
-
-    ds_clear(out);
-    ds_put_format(out, "%03d %s\n", code, translate_reply_code(code));
-    if (body) {
-        const char *p;
-        for (p = body; *p != '\0'; ) {
-            size_t n = strcspn(p, "\n");
-
-            if (*p == '.') {
-                ds_put_char(out, '.');
-            }
-            ds_put_buffer(out, p, n);
-            ds_put_char(out, '\n');
-            p += n;
-            if (*p == '\n') {
-                p++;
-            }
-        }
-    }
-    ds_put_cstr(out, ".\n");
+/* Replies to the active unixctl connection 'conn'. 'error' is sent to the
+ * client indicating an error occurred processing the command.  Only one call to
+ * unixctl_command_reply() or unixctl_command_reply_error() may be made per
+ * request. */
+void
+unixctl_command_reply_error(struct unixctl_conn *conn, const char *error)
+{
+    unixctl_command_reply__(conn, false, error);
 }
 
-/* Creates a unixctl server listening on 'path', which may be:
+/* Creates a unixctl server listening on 'path', which for POSIX may be:
  *
  *      - NULL, in which case <rundir>/<program>.<pid>.ctl is used.
  *
@@ -165,222 +195,176 @@ unixctl_command_reply(struct unixctl_conn *conn,
  *      - An absolute path (starting with '/') that gives the exact name of
  *        the Unix domain socket to listen on.
  *
+ * For Windows, a kernel assigned TCP port is used and written in 'path'
+ * which may be:
+ *
+ *      - NULL, in which case <rundir>/<program>.ctl is used.
+ *
+ *      - An absolute path that gives the name of the file.
+ *
+ * For both POSIX and Windows, if the path is "none", the function will
+ * return successfully but no socket will actually be created.
+ *
  * A program that (optionally) daemonizes itself should call this function
  * *after* daemonization, so that the socket name contains the pid of the
  * daemon instead of the pid of the program that exited.  (Otherwise,
- * "ovs-appctl --target <program>.pid" will fail.)
+ * "ovs-appctl --target=<program>" will fail.)
  *
  * Returns 0 if successful, otherwise a positive errno value.  If successful,
- * sets '*serverp' to the new unixctl_server, otherwise to NULL. */
+ * sets '*serverp' to the new unixctl_server (or to NULL if 'path' was "none"),
+ * otherwise to NULL. */
 int
 unixctl_server_create(const char *path, struct unixctl_server **serverp)
 {
     struct unixctl_server *server;
+    struct pstream *listener;
+    char *punix_path;
     int error;
 
-    unixctl_command_register("help", unixctl_help);
-
-    server = xmalloc(sizeof *server);
-    list_init(&server->conns);
+    *serverp = NULL;
+    if (path && !strcmp(path, "none")) {
+        return 0;
+    }
 
     if (path) {
-        if (path[0] == '/') {
-            server->path = xstrdup(path);
-        } else {
-            server->path = xasprintf("%s/%s", ovs_rundir, path);
-        }
+        char *abs_path;
+#ifndef _WIN32
+        abs_path = abs_file_name(ovs_rundir(), path);
+#else
+        abs_path = xstrdup(path);
+#endif
+        punix_path = xasprintf("punix:%s", abs_path);
+        free(abs_path);
     } else {
-        server->path = xasprintf("%s/%s.%ld.ctl", ovs_rundir,
-                                 program_name, (long int) getpid());
-    }
-
-    server->fd = make_unix_socket(SOCK_STREAM, true, false, server->path,
-                                  NULL);
-    if (server->fd < 0) {
-        error = -server->fd;
-        fprintf(stderr, "Could not initialize control socket %s (%s)\n",
-                server->path, strerror(error));
-        goto error;
+#ifndef _WIN32
+        punix_path = xasprintf("punix:%s/%s.%ld.ctl", ovs_rundir(),
+                               program_name, (long int) getpid());
+#else
+        punix_path = xasprintf("punix:%s/%s.ctl", ovs_rundir(), program_name);
+#endif
     }
 
-    if (chmod(server->path, S_IRUSR | S_IWUSR) < 0) {
-        error = errno;
-        fprintf(stderr, "Failed to chmod control socket %s (%s)\n",
-                server->path, strerror(error));
-        goto error;
+    error = pstream_open(punix_path, &listener, 0);
+    if (error) {
+        ovs_error(error, "could not initialize control socket %s", punix_path);
+        goto exit;
     }
 
-    if (listen(server->fd, 10) < 0) {
-        error = errno;
-        fprintf(stderr, "Failed to listen on control socket %s (%s)\n",
-                server->path, strerror(error));
-        goto error;
-    }
+    unixctl_command_register("list-commands", "", 0, 0, unixctl_list_commands,
+                             NULL);
+    unixctl_command_register("version", "", 0, 0, unixctl_version, NULL);
 
+    server = xmalloc(sizeof *server);
+    server->listener = listener;
+    list_init(&server->conns);
     *serverp = server;
-    return 0;
 
-error:
-    if (server->fd >= 0) {
-        close(server->fd);
-    }
-    free(server->path);
-    free(server);
-    *serverp = NULL;
+exit:
+    free(punix_path);
     return error;
 }
 
 static void
-new_connection(struct unixctl_server *server, int fd)
+process_command(struct unixctl_conn *conn, struct jsonrpc_msg *request)
 {
-    struct unixctl_conn *conn;
-
-    set_nonblocking(fd);
-
-    conn = xmalloc(sizeof *conn);
-    list_push_back(&server->conns, &conn->node);
-    conn->fd = fd;
-    conn->state = S_RECV;
-    ofpbuf_init(&conn->in, 128);
-    ds_init(&conn->out);
-    conn->out_pos = 0;
-}
-
-static int
-run_connection_output(struct unixctl_conn *conn)
-{
-    while (conn->out_pos < conn->out.length) {
-        size_t bytes_written;
-        int error;
-
-        error = write_fully(conn->fd, conn->out.string + conn->out_pos,
-                            conn->out.length - conn->out_pos, &bytes_written);
-        conn->out_pos += bytes_written;
-        if (error) {
-            return error;
-        }
-    }
-    conn->state = S_RECV;
-    return 0;
-}
+    char *error = NULL;
 
-static void
-process_command(struct unixctl_conn *conn, char *s)
-{
     struct unixctl_command *command;
-    size_t name_len;
-    char *name, *args;
+    struct json_array *params;
 
     COVERAGE_INC(unixctl_received);
-    conn->state = S_PROCESS;
-
-    name = s;
-    name_len = strcspn(name, " ");
-    args = name + name_len;
-    args += strspn(args, " ");
-    name[name_len] = '\0';
-
-    command = shash_find_data(&commands, name);
-    if (command) {
-        command->cb(conn, args);
-    } else {
-        char *msg = xasprintf("\"%s\" is not a valid command", name);
-        unixctl_command_reply(conn, 400, msg);
-        free(msg);
+    conn->request_id = json_clone(request->id);
+
+    if (VLOG_IS_DBG_ENABLED()) {
+        char *params_s = json_to_string(request->params, 0);
+        char *id_s = json_to_string(request->id, 0);
+        VLOG_DBG("received request %s%s, id=%s",
+                 request->method, params_s, id_s);
+        free(params_s);
+        free(id_s);
     }
-}
 
-static int
-run_connection_input(struct unixctl_conn *conn)
-{
-    for (;;) {
-        size_t bytes_read;
-        char *newline;
-        int error;
-
-        newline = memchr(conn->in.data, '\n', conn->in.size);
-        if (newline) {
-            char *command = conn->in.data;
-            size_t n = newline - command + 1;
-
-            if (n > 0 && newline[-1] == '\r') {
-                newline--;
-            }
-            *newline = '\0';
-
-            process_command(conn, command);
-
-            ofpbuf_pull(&conn->in, n);
-            if (!conn->in.size) {
-                ofpbuf_clear(&conn->in);
+    params = json_array(request->params);
+    command = shash_find_data(&commands, request->method);
+    if (!command) {
+        error = xasprintf("\"%s\" is not a valid command", request->method);
+    } else if (params->n < command->min_args) {
+        error = xasprintf("\"%s\" command requires at least %d arguments",
+                          request->method, command->min_args);
+    } else if (params->n > command->max_args) {
+        error = xasprintf("\"%s\" command takes at most %d arguments",
+                          request->method, command->max_args);
+    } else {
+        struct svec argv = SVEC_EMPTY_INITIALIZER;
+        int  i;
+
+        svec_add(&argv, request->method);
+        for (i = 0; i < params->n; i++) {
+            if (params->elems[i]->type != JSON_STRING) {
+                error = xasprintf("\"%s\" command has non-string argument",
+                                  request->method);
+                break;
             }
-            return 0;
+            svec_add(&argv, json_string(params->elems[i]));
         }
+        svec_terminate(&argv);
 
-        ofpbuf_prealloc_tailroom(&conn->in, 128);
-        error = read_fully(conn->fd, ofpbuf_tail(&conn->in),
-                           ofpbuf_tailroom(&conn->in), &bytes_read);
-        conn->in.size += bytes_read;
-        if (conn->in.size > 65536) {
-            VLOG_WARN_RL(&rl, "excess command length, killing connection");
-            return EPROTO;
-        }
-        if (error) {
-            if (error == EAGAIN || error == EWOULDBLOCK) {
-                if (!bytes_read) {
-                    return EAGAIN;
-                }
-            } else {
-                if (error != EOF || conn->in.size != 0) {
-                    VLOG_WARN_RL(&rl, "read failed: %s",
-                                 (error == EOF
-                                  ? "connection dropped mid-command"
-                                  : strerror(error)));
-                }
-                return error;
-            }
+        if (!error) {
+            command->cb(conn, argv.n, (const char **) argv.names,
+                        command->aux);
         }
+
+        svec_destroy(&argv);
+    }
+
+    if (error) {
+        unixctl_command_reply_error(conn, error);
+        free(error);
     }
 }
 
 static int
 run_connection(struct unixctl_conn *conn)
 {
-    int old_state;
-    do {
-        int error;
+    int error, i;
 
-        old_state = conn->state;
-        switch (conn->state) {
-        case S_RECV:
-            error = run_connection_input(conn);
-            break;
+    jsonrpc_run(conn->rpc);
+    error = jsonrpc_get_status(conn->rpc);
+    if (error || jsonrpc_get_backlog(conn->rpc)) {
+        return error;
+    }
 
-        case S_PROCESS:
-            error = 0;
-            break;
+    for (i = 0; i < 10; i++) {
+        struct jsonrpc_msg *msg;
 
-        case S_SEND:
-            error = run_connection_output(conn);
+        if (error || conn->request_id) {
             break;
-
-        default:
-            NOT_REACHED();
         }
-        if (error) {
-            return error;
+
+        jsonrpc_recv(conn->rpc, &msg);
+        if (msg) {
+            if (msg->type == JSONRPC_REQUEST) {
+                process_command(conn, msg);
+            } else {
+                VLOG_WARN_RL(&rl, "%s: received unexpected %s message",
+                             jsonrpc_get_name(conn->rpc),
+                             jsonrpc_msg_type_to_string(msg->type));
+                error = EINVAL;
+            }
+            jsonrpc_msg_destroy(msg);
         }
-    } while (conn->state != old_state);
-    return 0;
+        error = error ? error : jsonrpc_get_status(conn->rpc);
+    }
+
+    return error;
 }
 
 static void
 kill_connection(struct unixctl_conn *conn)
 {
     list_remove(&conn->node);
-    ofpbuf_uninit(&conn->in);
-    ds_destroy(&conn->out);
-    close(conn->fd);
+    jsonrpc_close(conn->rpc);
+    json_destroy(conn->request_id);
     free(conn);
 }
 
@@ -390,19 +374,29 @@ unixctl_server_run(struct unixctl_server *server)
     struct unixctl_conn *conn, *next;
     int i;
 
+    if (!server) {
+        return;
+    }
+
     for (i = 0; i < 10; i++) {
-        int fd = accept(server->fd, NULL, NULL);
-        if (fd < 0) {
-            if (errno != EAGAIN && errno != EWOULDBLOCK) {
-                VLOG_WARN_RL(&rl, "accept failed: %s", strerror(errno));
-            }
+        struct stream *stream;
+        int error;
+
+        error = pstream_accept(server->listener, &stream);
+        if (!error) {
+            struct unixctl_conn *conn = xzalloc(sizeof *conn);
+            list_push_back(&server->conns, &conn->node);
+            conn->rpc = jsonrpc_open(stream);
+        } else if (error == EAGAIN) {
             break;
+        } else {
+            VLOG_WARN_RL(&rl, "%s: accept failed: %s",
+                         pstream_get_name(server->listener),
+                         ovs_strerror(error));
         }
-        new_connection(server, fd);
     }
 
-    LIST_FOR_EACH_SAFE (conn, next,
-                        struct unixctl_conn, node, &server->conns) {
+    LIST_FOR_EACH_SAFE (conn, next, node, &server->conns) {
         int error = run_connection(conn);
         if (error && error != EAGAIN) {
             kill_connection(conn);
@@ -415,12 +409,15 @@ unixctl_server_wait(struct unixctl_server *server)
 {
     struct unixctl_conn *conn;
 
-    poll_fd_wait(server->fd, POLLIN);
-    LIST_FOR_EACH (conn, struct unixctl_conn, node, &server->conns) {
-        if (conn->state == S_RECV) {
-            poll_fd_wait(conn->fd, POLLIN);
-        } else if (conn->state == S_SEND) {
-            poll_fd_wait(conn->fd, POLLOUT);
+    if (!server) {
+        return;
+    }
+
+    pstream_wait(server->listener);
+    LIST_FOR_EACH (conn, node, &server->conns) {
+        jsonrpc_wait(conn->rpc);
+        if (!jsonrpc_get_backlog(conn->rpc)) {
+            jsonrpc_recv_wait(conn->rpc);
         }
     }
 }
@@ -432,163 +429,105 @@ unixctl_server_destroy(struct unixctl_server *server)
     if (server) {
         struct unixctl_conn *conn, *next;
 
-        LIST_FOR_EACH_SAFE (conn, next,
-                            struct unixctl_conn, node, &server->conns) {
+        LIST_FOR_EACH_SAFE (conn, next, node, &server->conns) {
             kill_connection(conn);
         }
 
-        close(server->fd);
-        unlink(server->path);
-        fatal_signal_remove_file_to_unlink(server->path);
-        free(server->path);
+        pstream_close(server->listener);
         free(server);
     }
 }
 \f
-/* Connects to a Vlog server socket.  'path' should be the name of a Vlog
- * server socket.  If it does not start with '/', it will be prefixed with
- * ovs_rundir (e.g. /var/run).
+/* On POSIX based systems, connects to a unixctl server socket.  'path' should
+ * be the name of a unixctl server socket.  If it does not start with '/', it
+ * will be prefixed with the rundir (e.g. /usr/local/var/run/openvswitch).
+ *
+ * On Windows, connects to a localhost TCP port as written inside 'path'.
+ * 'path' should be an absolute path of the file.
  *
  * Returns 0 if successful, otherwise a positive errno value.  If successful,
- * sets '*clientp' to the new unixctl_client, otherwise to NULL. */
+ * sets '*client' to the new jsonrpc, otherwise to NULL. */
 int
-unixctl_client_create(const char *path, struct unixctl_client **clientp)
+unixctl_client_create(const char *path, struct jsonrpc **client)
 {
-    static int counter;
-    struct unixctl_client *client;
+    char *abs_path, *unix_path;
+    struct stream *stream;
     int error;
-    int fd = -1;
 
-    /* Determine location. */
-    client = xmalloc(sizeof *client);
-    if (path[0] == '/') {
-        client->connect_path = xstrdup(path);
-    } else {
-        client->connect_path = xasprintf("%s/%s", ovs_rundir, path);
-    }
-    client->bind_path = xasprintf("/tmp/vlog.%ld.%d",
-                                  (long int) getpid(), counter++);
-
-    /* Open socket. */
-    fd = make_unix_socket(SOCK_STREAM, false, false,
-                          client->bind_path, client->connect_path);
-    if (fd < 0) {
-        error = -fd;
-        goto error;
-    }
+#ifdef _WIN32
+    abs_path = xstrdup(path);
+#else
+    abs_path = abs_file_name(ovs_rundir(), path);
+#endif
+    unix_path = xasprintf("unix:%s", abs_path);
 
-    /* Bind socket to stream. */
-    client->stream = fdopen(fd, "r+");
-    if (!client->stream) {
-        error = errno;
-        VLOG_WARN("%s: fdopen failed (%s)",
-                  client->connect_path, strerror(error));
-        goto error;
-    }
-    *clientp = client;
-    return 0;
+    *client = NULL;
 
-error:
-    if (fd >= 0) {
-        close(fd);
-    }
-    free(client->connect_path);
-    free(client->bind_path);
-    free(client);
-    *clientp = NULL;
-    return error;
-}
+    error = stream_open_block(stream_open(unix_path, &stream, DSCP_DEFAULT),
+                              &stream);
+    free(unix_path);
+    free(abs_path);
 
-/* Destroys 'client'. */
-void
-unixctl_client_destroy(struct unixctl_client *client)
-{
-    if (client) {
-        unlink(client->bind_path);
-        fatal_signal_remove_file_to_unlink(client->bind_path);
-        free(client->bind_path);
-        free(client->connect_path);
-        fclose(client->stream);
-        free(client);
+    if (error) {
+        VLOG_WARN("failed to connect to %s", path);
+        return error;
     }
+
+    *client = jsonrpc_open(stream);
+    return 0;
 }
 
-/* Sends 'request' to the server socket and waits for a reply.  Returns 0 if
- * successful, otherwise to a positive errno value.  If successful, sets
- * '*reply' to the reply, which the caller must free, otherwise to NULL. */
+/* Executes 'command' on the server with an argument vector 'argv' containing
+ * 'argc' elements.  If successfully communicated with the server, returns 0
+ * and sets '*result', or '*err' (not both) to the result or error the server
+ * returned.  Otherwise, sets '*result' and '*err' to NULL and returns a
+ * positive errno value.  The caller is responsible for freeing '*result' or
+ * '*err' if not NULL. */
 int
-unixctl_client_transact(struct unixctl_client *client,
-                        const char *request,
-                        int *reply_code, char **reply_body)
+unixctl_client_transact(struct jsonrpc *client, const char *command, int argc,
+                        char *argv[], char **result, char **err)
 {
-    struct ds line = DS_EMPTY_INITIALIZER;
-    struct ds reply = DS_EMPTY_INITIALIZER;
-    int error;
+    struct jsonrpc_msg *request, *reply;
+    struct json **json_args, *params;
+    int error, i;
 
-    /* Send 'request' to server.  Add a new-line if 'request' didn't end in
-     * one. */
-    fputs(request, client->stream);
-    if (request[0] == '\0' || request[strlen(request) - 1] != '\n') {
-        putc('\n', client->stream);
+    *result = NULL;
+    *err = NULL;
+
+    json_args = xmalloc(argc * sizeof *json_args);
+    for (i = 0; i < argc; i++) {
+        json_args[i] = json_string_create(argv[i]);
     }
-    if (ferror(client->stream)) {
-        VLOG_WARN("error sending request to %s: %s",
-                  client->connect_path, strerror(errno));
-        return errno;
+    params = json_array_create(json_args, argc);
+    request = jsonrpc_create_request(command, params, NULL);
+
+    error = jsonrpc_transact_block(client, request, &reply);
+    if (error) {
+        VLOG_WARN("error communicating with %s: %s", jsonrpc_get_name(client),
+                  ovs_retval_to_string(error));
+        return error;
     }
 
-    /* Wait for response. */
-    *reply_code = -1;
-    for (;;) {
-        const char *s;
-
-        error = ds_get_line(&line, client->stream);
-        if (error) {
-            VLOG_WARN("error reading reply from %s: %s",
-                      client->connect_path,
-                      (error == EOF ? "unexpected end of file"
-                       : strerror(error)));
-            goto error;
+    if (reply->error) {
+        if (reply->error->type == JSON_STRING) {
+            *err = xstrdup(json_string(reply->error));
+        } else {
+            VLOG_WARN("%s: unexpected error type in JSON RPC reply: %s",
+                      jsonrpc_get_name(client),
+                      json_type_to_string(reply->error->type));
+            error = EINVAL;
         }
-
-        s = ds_cstr(&line);
-        if (*reply_code == -1) {
-            if (!isdigit((unsigned char)s[0])
-                    || !isdigit((unsigned char)s[1])
-                    || !isdigit((unsigned char)s[2])) {
-                VLOG_WARN("reply from %s does not start with 3-digit code",
-                          client->connect_path);
-                error = EPROTO;
-                goto error;
-            }
-            sscanf(s, "%3d", reply_code);
+    } else if (reply->result) {
+        if (reply->result->type == JSON_STRING) {
+            *result = xstrdup(json_string(reply->result));
         } else {
-            if (s[0] == '.') {
-                if (s[1] == '\0') {
-                    break;
-                }
-                s++;
-            }
-            ds_put_cstr(&reply, s);
-            ds_put_char(&reply, '\n');
+            VLOG_WARN("%s: unexpected result type in JSON rpc reply: %s",
+                      jsonrpc_get_name(client),
+                      json_type_to_string(reply->result->type));
+            error = EINVAL;
         }
     }
-    *reply_body = ds_cstr(&reply);
-    ds_destroy(&line);
-    return 0;
 
-error:
-    ds_destroy(&line);
-    ds_destroy(&reply);
-    *reply_code = 0;
-    *reply_body = NULL;
-    return error == EOF ? EPROTO : error;
-}
-
-/* Returns the path of the server socket to which 'client' is connected.  The
- * caller must not modify or free the returned string. */
-const char *
-unixctl_client_target(const struct unixctl_client *client)
-{
-    return client->connect_path;
+    jsonrpc_msg_destroy(reply);
+    return error;
 }