ovsdb-server: Reconnect to clients specified on --connect.
authorBen Pfaff <blp@nicira.com>
Thu, 12 Nov 2009 20:58:53 +0000 (12:58 -0800)
committerBen Pfaff <blp@nicira.com>
Thu, 12 Nov 2009 20:58:53 +0000 (12:58 -0800)
ovsdb/SPECS
ovsdb/jsonrpc-server.c
ovsdb/ovsdb-server.1.in

index 12d9768..f0d7748 100644 (file)
@@ -277,6 +277,28 @@ form:
 
 The "cancel" notification itself has no reply.
 
+echo
+....
+
+Request object members:
+
+    "method": "echo"                                required
+    "params": any JSON value                        required
+    "id": any JSON value                            required
+
+Response object members:
+
+    "result": same as "params"
+    "error": null
+    "id": the request "id" member
+
+Both the JSON-RPC client and the server must implement this request.
+
+This JSON-RPC request and response can be used to implement connection
+keepalives, by allowing the server to check that the client is still
+there or vice versa.
+
+
 Notation for the Wire Protocol
 ------------------------------
 
index e97a2c3..7a33d77 100644 (file)
@@ -22,6 +22,7 @@
 #include "json.h"
 #include "jsonrpc.h"
 #include "ovsdb.h"
+#include "reconnect.h"
 #include "stream.h"
 #include "svec.h"
 #include "timeval.h"
@@ -44,14 +45,23 @@ static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
 struct ovsdb_jsonrpc_session {
     struct ovsdb_jsonrpc_server *server;
     struct list node;           /* Element in server's sessions list. */
-    struct jsonrpc *rpc;
     struct hmap triggers;
     struct list completions;    /* Completed triggers. */
+
+    struct reconnect *reconnect; /* For back-off. */
+    bool active;                /* Active or passive connection? */
+    struct jsonrpc *rpc;
+    struct stream *stream;      /* Only if active == false and rpc == NULL. */
 };
 
-static void ovsdb_jsonrpc_session_open(struct ovsdb_jsonrpc_server *,
-                                       struct stream *);
+static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
+                                                const char *name);
+static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
+                                                 struct stream *);
 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
+static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
                                              struct jsonrpc_msg *);
 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
@@ -88,16 +98,7 @@ ovsdb_jsonrpc_server_create(struct ovsdb *db, const struct svec *active,
     list_init(&server->sessions);
 
     SVEC_FOR_EACH (i, name, active) {
-        struct stream *stream;
-        int error;
-
-        error = stream_open(name, &stream);
-        if (!error) {
-            ovsdb_jsonrpc_session_open(server, stream);
-        } else {
-            ovs_error(error, "%s: connection failed", name);
-            retval = error;
-        }
+        ovsdb_jsonrpc_session_create_active(server, name);
     }
 
     SVEC_FOR_EACH (i, name, passive) {
@@ -131,7 +132,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
 
         error = pstream_accept(listener, &stream);
         if (!error) {
-            ovsdb_jsonrpc_session_open(svr, stream);
+            ovsdb_jsonrpc_session_create_passive(svr, stream);
         } else if (error == EAGAIN) {
             i++;
         } else if (error) {
@@ -145,33 +146,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
     /* Handle each session. */
     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
                         &svr->sessions) {
-        struct jsonrpc_msg *msg;
-        int error;
-
-        jsonrpc_run(s->rpc);
-
-        while (!list_is_empty(&s->completions)) {
-            struct ovsdb_jsonrpc_trigger *t
-                = CONTAINER_OF(s->completions.next,
-                               struct ovsdb_jsonrpc_trigger, trigger.node);
-            ovsdb_jsonrpc_trigger_complete(t);
-        }
-
-        if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
-            if (msg->type == JSONRPC_REQUEST) {
-                ovsdb_jsonrpc_session_got_request(s, msg);
-            } else if (msg->type == JSONRPC_NOTIFY) {
-                ovsdb_jsonrpc_session_got_notify(s, msg);
-            } else {
-                VLOG_WARN("%s: received unexpected %s message",
-                          jsonrpc_get_name(s->rpc),
-                          jsonrpc_msg_type_to_string(msg->type));
-                jsonrpc_error(s->rpc, EPROTO);
-                jsonrpc_msg_destroy(msg);
-            }
-        }
-
-        error = jsonrpc_get_status(s->rpc);
+        int error = ovsdb_jsonrpc_session_run(s);
         if (error) {
             ovsdb_jsonrpc_session_close(s);
         }
@@ -192,10 +167,7 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
     }
 
     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
-        jsonrpc_wait(s->rpc);
-        if (!jsonrpc_get_backlog(s->rpc)) {
-            jsonrpc_recv_wait(s->rpc);
-        }
+        ovsdb_jsonrpc_session_wait(s);
     }
 }
 
@@ -231,7 +203,7 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
 {
     struct ovsdb_jsonrpc_session *s = t->session;
 
-    if (!jsonrpc_get_status(s->rpc)) {
+    if (s->rpc && !jsonrpc_get_status(s->rpc)) {
         struct jsonrpc_msg *reply;
         struct json *result;
 
@@ -251,38 +223,174 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
     free(t);
 }
 
-static void
-ovsdb_jsonrpc_session_open(struct ovsdb_jsonrpc_server *svr,
-                           struct stream *stream)
+static struct ovsdb_jsonrpc_session *
+ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
+                             const char *name, bool active)
 {
     struct ovsdb_jsonrpc_session *s;
 
     s = xzalloc(sizeof *s);
     s->server = svr;
     list_push_back(&svr->sessions, &s->node);
-    s->rpc = jsonrpc_open(stream);
     hmap_init(&s->triggers);
     list_init(&s->completions);
+    s->reconnect = reconnect_create(time_msec());
+    reconnect_set_name(s->reconnect, name);
+    reconnect_enable(s->reconnect, time_msec());
+    s->active = active;
+
     svr->n_sessions++;
+
+    return s;
 }
 
 static void
-ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
+ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
+                                    const char *name)
 {
-    struct ovsdb_jsonrpc_trigger *t, *next;
+    ovsdb_jsonrpc_session_create(svr, name, true);
+}
 
-    jsonrpc_error(s->rpc, EOF);
-    HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
-                        &s->triggers) {
-        ovsdb_jsonrpc_trigger_complete(t);
-    }
+static void
+ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
+                                     struct stream *stream)
+{
+    struct ovsdb_jsonrpc_session *s;
 
-    jsonrpc_close(s->rpc);
+    s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
+    reconnect_connected(s->reconnect, time_msec());
+    s->rpc = jsonrpc_open(stream);
+}
 
+static void
+ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
+{
+    ovsdb_jsonrpc_session_disconnect(s);
     list_remove(&s->node);
     s->server->n_sessions--;
 }
 
+static void
+ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
+{
+    reconnect_disconnected(s->reconnect, time_msec(), 0);
+    if (s->rpc) {
+        struct ovsdb_jsonrpc_trigger *t, *next;
+
+        jsonrpc_error(s->rpc, EOF);
+        HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
+                            &s->triggers) {
+            ovsdb_jsonrpc_trigger_complete(t);
+        }
+
+        jsonrpc_close(s->rpc);
+        s->rpc = NULL;
+    } else if (s->stream) {
+        stream_close(s->stream);
+        s->stream = NULL;
+    }
+}
+
+static void
+ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
+{
+    ovsdb_jsonrpc_session_disconnect(s);
+    if (s->active) {
+        int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
+        if (error) {
+            reconnect_connect_failed(s->reconnect, time_msec(), error);
+        } else {
+            reconnect_connecting(s->reconnect, time_msec());
+        }
+    }
+}
+
+static int
+ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
+{
+    if (s->rpc) {
+        struct jsonrpc_msg *msg;
+        int error;
+
+        jsonrpc_run(s->rpc);
+
+        while (!list_is_empty(&s->completions)) {
+            struct ovsdb_jsonrpc_trigger *t
+                = CONTAINER_OF(s->completions.next,
+                               struct ovsdb_jsonrpc_trigger, trigger.node);
+            ovsdb_jsonrpc_trigger_complete(t);
+        }
+
+        if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
+            reconnect_received(s->reconnect, time_msec());
+            if (msg->type == JSONRPC_REQUEST) {
+                ovsdb_jsonrpc_session_got_request(s, msg);
+            } else if (msg->type == JSONRPC_NOTIFY) {
+                ovsdb_jsonrpc_session_got_notify(s, msg);
+            } else {
+                VLOG_WARN("%s: received unexpected %s message",
+                          jsonrpc_get_name(s->rpc),
+                          jsonrpc_msg_type_to_string(msg->type));
+                jsonrpc_error(s->rpc, EPROTO);
+                jsonrpc_msg_destroy(msg);
+            }
+        }
+
+        error = jsonrpc_get_status(s->rpc);
+        if (error) {
+            if (s->active) {
+                ovsdb_jsonrpc_session_disconnect(s);
+            } else {
+                return error;
+            }
+        }
+    } else if (s->stream) {
+        int error = stream_connect(s->stream);
+        if (!error) {
+            reconnect_connected(s->reconnect, time_msec());
+            s->rpc = jsonrpc_open(s->stream);
+            s->stream = NULL;
+        } else if (error != EAGAIN) {
+            reconnect_connect_failed(s->reconnect, time_msec(), error);
+            stream_close(s->stream);
+            s->stream = NULL;
+        }
+    }
+
+    switch (reconnect_run(s->reconnect, time_msec())) {
+    case RECONNECT_CONNECT:
+        ovsdb_jsonrpc_session_connect(s);
+        break;
+
+    case RECONNECT_DISCONNECT:
+        ovsdb_jsonrpc_session_disconnect(s);
+        break;
+
+    case RECONNECT_PROBE:
+        if (s->rpc) {
+            struct json *params = json_integer_create(0);
+            jsonrpc_send(s->rpc, jsonrpc_create_request("echo", params));
+        }
+        break;
+    }
+    return s->active || s->rpc ? 0 : ETIMEDOUT;
+
+}
+
+static void
+ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
+{
+    if (s->rpc) {
+        jsonrpc_wait(s->rpc);
+        if (!jsonrpc_get_backlog(s->rpc)) {
+            jsonrpc_recv_wait(s->rpc);
+        }
+    } else if (s->stream) {
+        stream_connect_wait(s->stream);
+    }
+    reconnect_wait(s->reconnect, time_msec());
+}
+
 static struct jsonrpc_msg *
 execute_transaction(struct ovsdb_jsonrpc_session *s,
                     struct jsonrpc_msg *request)
@@ -329,6 +437,8 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
     } else if (!strcmp(request->method, "get_schema")) {
         reply = jsonrpc_create_reply(
             ovsdb_schema_to_json(s->server->db->schema), request->id);
+    } else if (!strcmp(request->method, "echo")) {
+        reply = jsonrpc_create_reply(json_clone(request->params), request->id);
     } else {
         reply = jsonrpc_create_error(json_string_create("unknown method"),
                                      request->id);
index d24e443..9a888fc 100644 (file)
@@ -45,9 +45,7 @@ named \fIfile\fR.
 .IP "\fB\-\-connect=\fIremote\fR"
 Makes \fBovsdb\-server\fR initiate a JSON-RPC connection to
 \fIremote\fR, which must take one of the forms listed below.  The
-current implementation only attempts to connect once, and does not
-reconnect after a failure or after the connection closes.  This will
-be fixed later.
+server will reconnect to \fIremote\fR as necessary.
 .
 .RS
 .IP "\fBtcp:\fIip\fB:\fIport\fR"