Use the IANA-assigned ports for OpenFlow and OVSDB.
[cascardo/ovs.git] / lib / jsonrpc.c
1 /*
2  * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "jsonrpc.h"
20
21 #include <errno.h>
22
23 #include "byteq.h"
24 #include "dynamic-string.h"
25 #include "fatal-signal.h"
26 #include "json.h"
27 #include "list.h"
28 #include "ofpbuf.h"
29 #include "ovs-thread.h"
30 #include "poll-loop.h"
31 #include "reconnect.h"
32 #include "stream.h"
33 #include "timeval.h"
34 #include "openvswitch/vlog.h"
35
36 VLOG_DEFINE_THIS_MODULE(jsonrpc);
37 \f
38 struct jsonrpc {
39     struct stream *stream;
40     char *name;
41     int status;
42
43     /* Input. */
44     struct byteq input;
45     uint8_t input_buffer[512];
46     struct json_parser *parser;
47
48     /* Output. */
49     struct ovs_list output;     /* Contains "struct ofpbuf"s. */
50     size_t output_count;        /* Number of elements in "output". */
51     size_t backlog;
52 };
53
54 /* Rate limit for error messages. */
55 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
56
57 static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
58 static void jsonrpc_cleanup(struct jsonrpc *);
59 static void jsonrpc_error(struct jsonrpc *, int error);
60
61 /* This is just the same as stream_open() except that it uses the default
62  * JSONRPC port if none is specified. */
63 int
64 jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp)
65 {
66     return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp);
67 }
68
69 /* This is just the same as pstream_open() except that it uses the default
70  * JSONRPC port if none is specified. */
71 int
72 jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
73 {
74     return pstream_open_with_default_port(name, OVSDB_PORT, pstreamp, dscp);
75 }
76
77 /* Returns a new JSON-RPC stream that uses 'stream' for input and output.  The
78  * new jsonrpc object takes ownership of 'stream'. */
79 struct jsonrpc *
80 jsonrpc_open(struct stream *stream)
81 {
82     struct jsonrpc *rpc;
83
84     ovs_assert(stream != NULL);
85
86     rpc = xzalloc(sizeof *rpc);
87     rpc->name = xstrdup(stream_get_name(stream));
88     rpc->stream = stream;
89     byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
90     list_init(&rpc->output);
91
92     return rpc;
93 }
94
95 /* Destroys 'rpc', closing the stream on which it is based, and frees its
96  * memory. */
97 void
98 jsonrpc_close(struct jsonrpc *rpc)
99 {
100     if (rpc) {
101         jsonrpc_cleanup(rpc);
102         free(rpc->name);
103         free(rpc);
104     }
105 }
106
107 /* Performs periodic maintenance on 'rpc', such as flushing output buffers. */
108 void
109 jsonrpc_run(struct jsonrpc *rpc)
110 {
111     if (rpc->status) {
112         return;
113     }
114
115     stream_run(rpc->stream);
116     while (!list_is_empty(&rpc->output)) {
117         struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
118         int retval;
119
120         retval = stream_send(rpc->stream, buf->data, buf->size);
121         if (retval >= 0) {
122             rpc->backlog -= retval;
123             ofpbuf_pull(buf, retval);
124             if (!buf->size) {
125                 list_remove(&buf->list_node);
126                 rpc->output_count--;
127                 ofpbuf_delete(buf);
128             }
129         } else {
130             if (retval != -EAGAIN) {
131                 VLOG_WARN_RL(&rl, "%s: send error: %s",
132                              rpc->name, ovs_strerror(-retval));
133                 jsonrpc_error(rpc, -retval);
134             }
135             break;
136         }
137     }
138 }
139
140 /* Arranges for the poll loop to wake up when 'rpc' needs to perform
141  * maintenance activities. */
142 void
143 jsonrpc_wait(struct jsonrpc *rpc)
144 {
145     if (!rpc->status) {
146         stream_run_wait(rpc->stream);
147         if (!list_is_empty(&rpc->output)) {
148             stream_send_wait(rpc->stream);
149         }
150     }
151 }
152
153 /*
154  * Returns the current status of 'rpc'.  The possible return values are:
155  * - 0: no error yet
156  * - >0: errno value
157  * - EOF: end of file (remote end closed connection; not necessarily an error).
158  *
159  * When this functions nonzero, 'rpc' is effectively out of commission.  'rpc'
160  * will not receive any more messages and any further messages that one
161  * attempts to send with 'rpc' will be discarded.  The caller can keep 'rpc'
162  * around as long as it wants, but it's not going to provide any more useful
163  * services.
164  */
165 int
166 jsonrpc_get_status(const struct jsonrpc *rpc)
167 {
168     return rpc->status;
169 }
170
171 /* Returns the number of bytes buffered by 'rpc' to be written to the
172  * underlying stream.  Always returns 0 if 'rpc' has encountered an error or if
173  * the remote end closed the connection. */
174 size_t
175 jsonrpc_get_backlog(const struct jsonrpc *rpc)
176 {
177     return rpc->status ? 0 : rpc->backlog;
178 }
179
180 /* Returns the number of bytes that have been received on 'rpc''s underlying
181  * stream.  (The value wraps around if it exceeds UINT_MAX.) */
182 unsigned int
183 jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
184 {
185     return rpc->input.head;
186 }
187
188 /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
189  * the stream underlying 'rpc' when 'rpc' was created. */
190 const char *
191 jsonrpc_get_name(const struct jsonrpc *rpc)
192 {
193     return rpc->name;
194 }
195
196 static void
197 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
198                 const struct jsonrpc_msg *msg)
199 {
200     if (VLOG_IS_DBG_ENABLED()) {
201         struct ds s = DS_EMPTY_INITIALIZER;
202         if (msg->method) {
203             ds_put_format(&s, ", method=\"%s\"", msg->method);
204         }
205         if (msg->params) {
206             ds_put_cstr(&s, ", params=");
207             json_to_ds(msg->params, 0, &s);
208         }
209         if (msg->result) {
210             ds_put_cstr(&s, ", result=");
211             json_to_ds(msg->result, 0, &s);
212         }
213         if (msg->error) {
214             ds_put_cstr(&s, ", error=");
215             json_to_ds(msg->error, 0, &s);
216         }
217         if (msg->id) {
218             ds_put_cstr(&s, ", id=");
219             json_to_ds(msg->id, 0, &s);
220         }
221         VLOG_DBG("%s: %s %s%s", rpc->name, title,
222                  jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
223         ds_destroy(&s);
224     }
225 }
226
227 /* Schedules 'msg' to be sent on 'rpc' and returns 'rpc''s status (as with
228  * jsonrpc_get_status()).
229  *
230  * If 'msg' cannot be sent immediately, it is appended to a buffer.  The caller
231  * is responsible for ensuring that the amount of buffered data is somehow
232  * limited.  (jsonrpc_get_backlog() returns the amount of data currently
233  * buffered in 'rpc'.)
234  *
235  * Always takes ownership of 'msg', regardless of success. */
236 int
237 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
238 {
239     struct ofpbuf *buf;
240     struct json *json;
241     size_t length;
242     char *s;
243
244     if (rpc->status) {
245         jsonrpc_msg_destroy(msg);
246         return rpc->status;
247     }
248
249     jsonrpc_log_msg(rpc, "send", msg);
250
251     json = jsonrpc_msg_to_json(msg);
252     s = json_to_string(json, 0);
253     length = strlen(s);
254     json_destroy(json);
255
256     buf = xmalloc(sizeof *buf);
257     ofpbuf_use(buf, s, length);
258     buf->size = length;
259     list_push_back(&rpc->output, &buf->list_node);
260     rpc->output_count++;
261     rpc->backlog += length;
262
263     if (rpc->output_count >= 50) {
264         VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
265                      " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
266                      rpc->output_count, rpc->backlog);
267     }
268
269     if (rpc->backlog == length) {
270         jsonrpc_run(rpc);
271     }
272     return rpc->status;
273 }
274
275 /* Attempts to receive a message from 'rpc'.
276  *
277  * If successful, stores the received message in '*msgp' and returns 0.  The
278  * caller takes ownership of '*msgp' and must eventually destroy it with
279  * jsonrpc_msg_destroy().
280  *
281  * Otherwise, stores NULL in '*msgp' and returns one of the following:
282  *
283  *   - EAGAIN: No message has been received.
284  *
285  *   - EOF: The remote end closed the connection gracefully.
286  *
287  *   - Otherwise an errno value that represents a JSON-RPC protocol violation
288  *     or another error fatal to the connection.  'rpc' will not send or
289  *     receive any more messages.
290  */
291 int
292 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
293 {
294     int i;
295
296     *msgp = NULL;
297     if (rpc->status) {
298         return rpc->status;
299     }
300
301     for (i = 0; i < 50; i++) {
302         size_t n, used;
303
304         /* Fill our input buffer if it's empty. */
305         if (byteq_is_empty(&rpc->input)) {
306             size_t chunk;
307             int retval;
308
309             chunk = byteq_headroom(&rpc->input);
310             retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
311             if (retval < 0) {
312                 if (retval == -EAGAIN) {
313                     return EAGAIN;
314                 } else {
315                     VLOG_WARN_RL(&rl, "%s: receive error: %s",
316                                  rpc->name, ovs_strerror(-retval));
317                     jsonrpc_error(rpc, -retval);
318                     return rpc->status;
319                 }
320             } else if (retval == 0) {
321                 jsonrpc_error(rpc, EOF);
322                 return EOF;
323             }
324             byteq_advance_head(&rpc->input, retval);
325         }
326
327         /* We have some input.  Feed it into the JSON parser. */
328         if (!rpc->parser) {
329             rpc->parser = json_parser_create(0);
330         }
331         n = byteq_tailroom(&rpc->input);
332         used = json_parser_feed(rpc->parser,
333                                 (char *) byteq_tail(&rpc->input), n);
334         byteq_advance_tail(&rpc->input, used);
335
336         /* If we have complete JSON, attempt to parse it as JSON-RPC. */
337         if (json_parser_is_done(rpc->parser)) {
338             *msgp = jsonrpc_parse_received_message(rpc);
339             if (*msgp) {
340                 return 0;
341             }
342
343             if (rpc->status) {
344                 const struct byteq *q = &rpc->input;
345                 if (q->head <= q->size) {
346                     stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
347                                           THIS_MODULE, rpc->name);
348                 }
349                 return rpc->status;
350             }
351         }
352     }
353
354     return EAGAIN;
355 }
356
357 /* Causes the poll loop to wake up when jsonrpc_recv() may return a value other
358  * than EAGAIN. */
359 void
360 jsonrpc_recv_wait(struct jsonrpc *rpc)
361 {
362     if (rpc->status || !byteq_is_empty(&rpc->input)) {
363         poll_immediate_wake_at(rpc->name);
364     } else {
365         stream_recv_wait(rpc->stream);
366     }
367 }
368
369 /* Sends 'msg' on 'rpc' and waits for it to be successfully queued to the
370  * underlying stream.  Returns 0 if 'msg' was sent successfully, otherwise a
371  * status value (see jsonrpc_get_status()).
372  *
373  * Always takes ownership of 'msg', regardless of success. */
374 int
375 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
376 {
377     int error;
378
379     fatal_signal_run();
380
381     error = jsonrpc_send(rpc, msg);
382     if (error) {
383         return error;
384     }
385
386     for (;;) {
387         jsonrpc_run(rpc);
388         if (list_is_empty(&rpc->output) || rpc->status) {
389             return rpc->status;
390         }
391         jsonrpc_wait(rpc);
392         poll_block();
393     }
394 }
395
396 /* Waits for a message to be received on 'rpc'.  Same semantics as
397  * jsonrpc_recv() except that EAGAIN will never be returned. */
398 int
399 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
400 {
401     for (;;) {
402         int error = jsonrpc_recv(rpc, msgp);
403         if (error != EAGAIN) {
404             fatal_signal_run();
405             return error;
406         }
407
408         jsonrpc_run(rpc);
409         jsonrpc_wait(rpc);
410         jsonrpc_recv_wait(rpc);
411         poll_block();
412     }
413 }
414
415 /* Sends 'request' to 'rpc' then waits for a reply.  The return value is 0 if
416  * successful, in which case '*replyp' is set to the reply, which the caller
417  * must eventually free with jsonrpc_msg_destroy().  Otherwise returns a status
418  * value (see jsonrpc_get_status()).
419  *
420  * Discards any message received on 'rpc' that is not a reply to 'request'
421  * (based on message id).
422  *
423  * Always takes ownership of 'request', regardless of success. */
424 int
425 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
426                        struct jsonrpc_msg **replyp)
427 {
428     struct jsonrpc_msg *reply = NULL;
429     struct json *id;
430     int error;
431
432     id = json_clone(request->id);
433     error = jsonrpc_send_block(rpc, request);
434     if (!error) {
435         for (;;) {
436             error = jsonrpc_recv_block(rpc, &reply);
437             if (error) {
438                 break;
439             }
440             if ((reply->type == JSONRPC_REPLY || reply->type == JSONRPC_ERROR)
441                 && json_equal(id, reply->id)) {
442                 break;
443             }
444             jsonrpc_msg_destroy(reply);
445         }
446     }
447     *replyp = error ? NULL : reply;
448     json_destroy(id);
449     return error;
450 }
451
452 /* Attempts to parse the content of 'rpc->parser' (which is complete JSON) as a
453  * JSON-RPC message.  If successful, returns the JSON-RPC message.  On failure,
454  * signals an error on 'rpc' with jsonrpc_error() and returns NULL. */
455 static struct jsonrpc_msg *
456 jsonrpc_parse_received_message(struct jsonrpc *rpc)
457 {
458     struct jsonrpc_msg *msg;
459     struct json *json;
460     char *error;
461
462     json = json_parser_finish(rpc->parser);
463     rpc->parser = NULL;
464     if (json->type == JSON_STRING) {
465         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
466                      rpc->name, json_string(json));
467         jsonrpc_error(rpc, EPROTO);
468         json_destroy(json);
469         return NULL;
470     }
471
472     error = jsonrpc_msg_from_json(json, &msg);
473     if (error) {
474         VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
475                      rpc->name, error);
476         free(error);
477         jsonrpc_error(rpc, EPROTO);
478         return NULL;
479     }
480
481     jsonrpc_log_msg(rpc, "received", msg);
482     return msg;
483 }
484
485 static void
486 jsonrpc_error(struct jsonrpc *rpc, int error)
487 {
488     ovs_assert(error);
489     if (!rpc->status) {
490         rpc->status = error;
491         jsonrpc_cleanup(rpc);
492     }
493 }
494
495 static void
496 jsonrpc_cleanup(struct jsonrpc *rpc)
497 {
498     stream_close(rpc->stream);
499     rpc->stream = NULL;
500
501     json_parser_abort(rpc->parser);
502     rpc->parser = NULL;
503
504     ofpbuf_list_delete(&rpc->output);
505     rpc->backlog = 0;
506     rpc->output_count = 0;
507 }
508 \f
509 static struct jsonrpc_msg *
510 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
511                 struct json *params, struct json *result, struct json *error,
512                 struct json *id)
513 {
514     struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
515     msg->type = type;
516     msg->method = method ? xstrdup(method) : NULL;
517     msg->params = params;
518     msg->result = result;
519     msg->error = error;
520     msg->id = id;
521     return msg;
522 }
523
524 static struct json *
525 jsonrpc_create_id(void)
526 {
527     static atomic_count next_id = ATOMIC_COUNT_INIT(0);
528     unsigned int id;
529
530     id = atomic_count_inc(&next_id);
531     return json_integer_create(id);
532 }
533
534 struct jsonrpc_msg *
535 jsonrpc_create_request(const char *method, struct json *params,
536                        struct json **idp)
537 {
538     struct json *id = jsonrpc_create_id();
539     if (idp) {
540         *idp = json_clone(id);
541     }
542     return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
543 }
544
545 struct jsonrpc_msg *
546 jsonrpc_create_notify(const char *method, struct json *params)
547 {
548     return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
549 }
550
551 struct jsonrpc_msg *
552 jsonrpc_create_reply(struct json *result, const struct json *id)
553 {
554     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
555                            json_clone(id));
556 }
557
558 struct jsonrpc_msg *
559 jsonrpc_create_error(struct json *error, const struct json *id)
560 {
561     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
562                            json_clone(id));
563 }
564
565 const char *
566 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
567 {
568     switch (type) {
569     case JSONRPC_REQUEST:
570         return "request";
571
572     case JSONRPC_NOTIFY:
573         return "notification";
574
575     case JSONRPC_REPLY:
576         return "reply";
577
578     case JSONRPC_ERROR:
579         return "error";
580     }
581     return "(null)";
582 }
583
584 char *
585 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
586 {
587     const char *type_name;
588     unsigned int pattern;
589
590     if (m->params && m->params->type != JSON_ARRAY) {
591         return xstrdup("\"params\" must be JSON array");
592     }
593
594     switch (m->type) {
595     case JSONRPC_REQUEST:
596         pattern = 0x11001;
597         break;
598
599     case JSONRPC_NOTIFY:
600         pattern = 0x11000;
601         break;
602
603     case JSONRPC_REPLY:
604         pattern = 0x00101;
605         break;
606
607     case JSONRPC_ERROR:
608         pattern = 0x00011;
609         break;
610
611     default:
612         return xasprintf("invalid JSON-RPC message type %d", m->type);
613     }
614
615     type_name = jsonrpc_msg_type_to_string(m->type);
616     if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
617         return xasprintf("%s must%s have \"method\"",
618                          type_name, (pattern & 0x10000) ? "" : " not");
619
620     }
621     if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
622         return xasprintf("%s must%s have \"params\"",
623                          type_name, (pattern & 0x1000) ? "" : " not");
624
625     }
626     if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
627         return xasprintf("%s must%s have \"result\"",
628                          type_name, (pattern & 0x100) ? "" : " not");
629
630     }
631     if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
632         return xasprintf("%s must%s have \"error\"",
633                          type_name, (pattern & 0x10) ? "" : " not");
634
635     }
636     if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
637         return xasprintf("%s must%s have \"id\"",
638                          type_name, (pattern & 0x1) ? "" : " not");
639
640     }
641     return NULL;
642 }
643
644 void
645 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
646 {
647     if (m) {
648         free(m->method);
649         json_destroy(m->params);
650         json_destroy(m->result);
651         json_destroy(m->error);
652         json_destroy(m->id);
653         free(m);
654     }
655 }
656
657 static struct json *
658 null_from_json_null(struct json *json)
659 {
660     if (json && json->type == JSON_NULL) {
661         json_destroy(json);
662         return NULL;
663     }
664     return json;
665 }
666
667 char *
668 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
669 {
670     struct json *method = NULL;
671     struct jsonrpc_msg *msg = NULL;
672     struct shash *object;
673     char *error;
674
675     if (json->type != JSON_OBJECT) {
676         error = xstrdup("message is not a JSON object");
677         goto exit;
678     }
679     object = json_object(json);
680
681     method = shash_find_and_delete(object, "method");
682     if (method && method->type != JSON_STRING) {
683         error = xstrdup("method is not a JSON string");
684         goto exit;
685     }
686
687     msg = xzalloc(sizeof *msg);
688     msg->method = method ? xstrdup(method->u.string) : NULL;
689     msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
690     msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
691     msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
692     msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
693     msg->type = (msg->result ? JSONRPC_REPLY
694                  : msg->error ? JSONRPC_ERROR
695                  : msg->id ? JSONRPC_REQUEST
696                  : JSONRPC_NOTIFY);
697     if (!shash_is_empty(object)) {
698         error = xasprintf("message has unexpected member \"%s\"",
699                           shash_first(object)->name);
700         goto exit;
701     }
702     error = jsonrpc_msg_is_valid(msg);
703     if (error) {
704         goto exit;
705     }
706
707 exit:
708     json_destroy(method);
709     json_destroy(json);
710     if (error) {
711         jsonrpc_msg_destroy(msg);
712         msg = NULL;
713     }
714     *msgp = msg;
715     return error;
716 }
717
718 struct json *
719 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
720 {
721     struct json *json = json_object_create();
722
723     if (m->method) {
724         json_object_put(json, "method", json_string_create_nocopy(m->method));
725     }
726
727     if (m->params) {
728         json_object_put(json, "params", m->params);
729     }
730
731     if (m->result) {
732         json_object_put(json, "result", m->result);
733     } else if (m->type == JSONRPC_ERROR) {
734         json_object_put(json, "result", json_null_create());
735     }
736
737     if (m->error) {
738         json_object_put(json, "error", m->error);
739     } else if (m->type == JSONRPC_REPLY) {
740         json_object_put(json, "error", json_null_create());
741     }
742
743     if (m->id) {
744         json_object_put(json, "id", m->id);
745     } else if (m->type == JSONRPC_NOTIFY) {
746         json_object_put(json, "id", json_null_create());
747     }
748
749     free(m);
750
751     return json;
752 }
753 \f
754 /* A JSON-RPC session with reconnection. */
755
756 struct jsonrpc_session {
757     struct reconnect *reconnect;
758     struct jsonrpc *rpc;
759     struct stream *stream;
760     struct pstream *pstream;
761     int last_error;
762     unsigned int seqno;
763     uint8_t dscp;
764 };
765
766 /* Creates and returns a jsonrpc_session to 'name', which should be a string
767  * acceptable to stream_open() or pstream_open().
768  *
769  * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
770  * jsonrpc_session connects to 'name'.  If 'retry' is true, then the new
771  * session connects and reconnects to 'name', with backoff.  If 'retry' is
772  * false, the new session will only try to connect once and after a connection
773  * failure or a disconnection jsonrpc_session_is_alive() will return false for
774  * the new session.
775  *
776  * If 'name' is a passive connection method, e.g. "ptcp:", the new
777  * jsonrpc_session listens for connections to 'name'.  It maintains at most one
778  * connection at any given time.  Any new connection causes the previous one
779  * (if any) to be dropped. */
780 struct jsonrpc_session *
781 jsonrpc_session_open(const char *name, bool retry)
782 {
783     struct jsonrpc_session *s;
784
785     s = xmalloc(sizeof *s);
786     s->reconnect = reconnect_create(time_msec());
787     reconnect_set_name(s->reconnect, name);
788     reconnect_enable(s->reconnect, time_msec());
789     s->rpc = NULL;
790     s->stream = NULL;
791     s->pstream = NULL;
792     s->seqno = 0;
793     s->dscp = 0;
794     s->last_error = 0;
795
796     if (!pstream_verify_name(name)) {
797         reconnect_set_passive(s->reconnect, true, time_msec());
798     } else if (!retry) {
799         reconnect_set_max_tries(s->reconnect, 1);
800         reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
801     }
802
803     if (!stream_or_pstream_needs_probes(name)) {
804         reconnect_set_probe_interval(s->reconnect, 0);
805     }
806
807     return s;
808 }
809
810 /* Creates and returns a jsonrpc_session that is initially connected to
811  * 'jsonrpc'.  If the connection is dropped, it will not be reconnected.
812  *
813  * On the assumption that such connections are likely to be short-lived
814  * (e.g. from ovs-vsctl), informational logging for them is suppressed. */
815 struct jsonrpc_session *
816 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
817 {
818     struct jsonrpc_session *s;
819
820     s = xmalloc(sizeof *s);
821     s->reconnect = reconnect_create(time_msec());
822     reconnect_set_quiet(s->reconnect, true);
823     reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
824     reconnect_set_max_tries(s->reconnect, 0);
825     reconnect_connected(s->reconnect, time_msec());
826     s->dscp = dscp;
827     s->rpc = jsonrpc;
828     s->stream = NULL;
829     s->pstream = NULL;
830     s->seqno = 0;
831
832     return s;
833 }
834
835 void
836 jsonrpc_session_close(struct jsonrpc_session *s)
837 {
838     if (s) {
839         jsonrpc_close(s->rpc);
840         reconnect_destroy(s->reconnect);
841         stream_close(s->stream);
842         pstream_close(s->pstream);
843         free(s);
844     }
845 }
846
847 static void
848 jsonrpc_session_disconnect(struct jsonrpc_session *s)
849 {
850     if (s->rpc) {
851         jsonrpc_error(s->rpc, EOF);
852         jsonrpc_close(s->rpc);
853         s->rpc = NULL;
854         s->seqno++;
855     } else if (s->stream) {
856         stream_close(s->stream);
857         s->stream = NULL;
858         s->seqno++;
859     }
860 }
861
862 static void
863 jsonrpc_session_connect(struct jsonrpc_session *s)
864 {
865     const char *name = reconnect_get_name(s->reconnect);
866     int error;
867
868     jsonrpc_session_disconnect(s);
869     if (!reconnect_is_passive(s->reconnect)) {
870         error = jsonrpc_stream_open(name, &s->stream, s->dscp);
871         if (!error) {
872             reconnect_connecting(s->reconnect, time_msec());
873         } else {
874             s->last_error = error;
875         }
876     } else {
877         error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream,
878                                                       s->dscp);
879         if (!error) {
880             reconnect_listening(s->reconnect, time_msec());
881         }
882     }
883
884     if (error) {
885         reconnect_connect_failed(s->reconnect, time_msec(), error);
886     }
887     s->seqno++;
888 }
889
890 void
891 jsonrpc_session_run(struct jsonrpc_session *s)
892 {
893     if (s->pstream) {
894         struct stream *stream;
895         int error;
896
897         error = pstream_accept(s->pstream, &stream);
898         if (!error) {
899             if (s->rpc || s->stream) {
900                 VLOG_INFO_RL(&rl,
901                              "%s: new connection replacing active connection",
902                              reconnect_get_name(s->reconnect));
903                 jsonrpc_session_disconnect(s);
904             }
905             reconnect_connected(s->reconnect, time_msec());
906             s->rpc = jsonrpc_open(stream);
907         } else if (error != EAGAIN) {
908             reconnect_listen_error(s->reconnect, time_msec(), error);
909             pstream_close(s->pstream);
910             s->pstream = NULL;
911         }
912     }
913
914     if (s->rpc) {
915         size_t backlog;
916         int error;
917
918         backlog = jsonrpc_get_backlog(s->rpc);
919         jsonrpc_run(s->rpc);
920         if (jsonrpc_get_backlog(s->rpc) < backlog) {
921             /* Data previously caught in a queue was successfully sent (or
922              * there's an error, which we'll catch below.)
923              *
924              * We don't count data that is successfully sent immediately as
925              * activity, because there's a lot of queuing downstream from us,
926              * which means that we can push a lot of data into a connection
927              * that has stalled and won't ever recover.
928              */
929             reconnect_activity(s->reconnect, time_msec());
930         }
931
932         error = jsonrpc_get_status(s->rpc);
933         if (error) {
934             reconnect_disconnected(s->reconnect, time_msec(), error);
935             jsonrpc_session_disconnect(s);
936             s->last_error = error;
937         }
938     } else if (s->stream) {
939         int error;
940
941         stream_run(s->stream);
942         error = stream_connect(s->stream);
943         if (!error) {
944             reconnect_connected(s->reconnect, time_msec());
945             s->rpc = jsonrpc_open(s->stream);
946             s->stream = NULL;
947         } else if (error != EAGAIN) {
948             reconnect_connect_failed(s->reconnect, time_msec(), error);
949             stream_close(s->stream);
950             s->stream = NULL;
951         }
952     }
953
954     switch (reconnect_run(s->reconnect, time_msec())) {
955     case RECONNECT_CONNECT:
956         jsonrpc_session_connect(s);
957         break;
958
959     case RECONNECT_DISCONNECT:
960         reconnect_disconnected(s->reconnect, time_msec(), 0);
961         jsonrpc_session_disconnect(s);
962         break;
963
964     case RECONNECT_PROBE:
965         if (s->rpc) {
966             struct json *params;
967             struct jsonrpc_msg *request;
968
969             params = json_array_create_empty();
970             request = jsonrpc_create_request("echo", params, NULL);
971             json_destroy(request->id);
972             request->id = json_string_create("echo");
973             jsonrpc_send(s->rpc, request);
974         }
975         break;
976     }
977 }
978
979 void
980 jsonrpc_session_wait(struct jsonrpc_session *s)
981 {
982     if (s->rpc) {
983         jsonrpc_wait(s->rpc);
984     } else if (s->stream) {
985         stream_run_wait(s->stream);
986         stream_connect_wait(s->stream);
987     }
988     if (s->pstream) {
989         pstream_wait(s->pstream);
990     }
991     reconnect_wait(s->reconnect, time_msec());
992 }
993
994 size_t
995 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
996 {
997     return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
998 }
999
1000 /* Always returns a pointer to a valid C string, assuming 's' was initialized
1001  * correctly. */
1002 const char *
1003 jsonrpc_session_get_name(const struct jsonrpc_session *s)
1004 {
1005     return reconnect_get_name(s->reconnect);
1006 }
1007
1008 /* Always takes ownership of 'msg', regardless of success. */
1009 int
1010 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
1011 {
1012     if (s->rpc) {
1013         return jsonrpc_send(s->rpc, msg);
1014     } else {
1015         jsonrpc_msg_destroy(msg);
1016         return ENOTCONN;
1017     }
1018 }
1019
1020 struct jsonrpc_msg *
1021 jsonrpc_session_recv(struct jsonrpc_session *s)
1022 {
1023     if (s->rpc) {
1024         unsigned int received_bytes;
1025         struct jsonrpc_msg *msg;
1026
1027         received_bytes = jsonrpc_get_received_bytes(s->rpc);
1028         jsonrpc_recv(s->rpc, &msg);
1029         if (received_bytes != jsonrpc_get_received_bytes(s->rpc)) {
1030             /* Data was successfully received.
1031              *
1032              * Previously we only counted receiving a full message as activity,
1033              * but with large messages or a slow connection that policy could
1034              * time out the session mid-message. */
1035             reconnect_activity(s->reconnect, time_msec());
1036         }
1037
1038         if (msg) {
1039             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
1040                 /* Echo request.  Send reply. */
1041                 struct jsonrpc_msg *reply;
1042
1043                 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
1044                 jsonrpc_session_send(s, reply);
1045             } else if (msg->type == JSONRPC_REPLY
1046                        && msg->id && msg->id->type == JSON_STRING
1047                        && !strcmp(msg->id->u.string, "echo")) {
1048                 /* It's a reply to our echo request.  Suppress it. */
1049             } else {
1050                 return msg;
1051             }
1052             jsonrpc_msg_destroy(msg);
1053         }
1054     }
1055     return NULL;
1056 }
1057
1058 void
1059 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
1060 {
1061     if (s->rpc) {
1062         jsonrpc_recv_wait(s->rpc);
1063     }
1064 }
1065
1066 bool
1067 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
1068 {
1069     return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
1070 }
1071
1072 bool
1073 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
1074 {
1075     return s->rpc != NULL;
1076 }
1077
1078 unsigned int
1079 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
1080 {
1081     return s->seqno;
1082 }
1083
1084 int
1085 jsonrpc_session_get_status(const struct jsonrpc_session *s)
1086 {
1087     return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0;
1088 }
1089
1090 int
1091 jsonrpc_session_get_last_error(const struct jsonrpc_session *s)
1092 {
1093     return s->last_error;
1094 }
1095
1096 void
1097 jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s,
1098                                     struct reconnect_stats *stats)
1099 {
1100     reconnect_get_stats(s->reconnect, time_msec(), stats);
1101 }
1102
1103 void
1104 jsonrpc_session_enable_reconnect(struct jsonrpc_session *s)
1105 {
1106     reconnect_set_max_tries(s->reconnect, UINT_MAX);
1107     reconnect_set_backoff(s->reconnect, RECONNECT_DEFAULT_MIN_BACKOFF,
1108                           RECONNECT_DEFAULT_MAX_BACKOFF);
1109 }
1110
1111 void
1112 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
1113 {
1114     reconnect_force_reconnect(s->reconnect, time_msec());
1115 }
1116
1117 void
1118 jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff)
1119 {
1120     reconnect_set_backoff(s->reconnect, 0, max_backoff);
1121 }
1122
1123 void
1124 jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
1125                                    int probe_interval)
1126 {
1127     reconnect_set_probe_interval(s->reconnect, probe_interval);
1128 }
1129
1130 void
1131 jsonrpc_session_set_dscp(struct jsonrpc_session *s,
1132                          uint8_t dscp)
1133 {
1134     if (s->dscp != dscp) {
1135         pstream_close(s->pstream);
1136         s->pstream = NULL;
1137
1138         s->dscp = dscp;
1139         jsonrpc_session_force_reconnect(s);
1140     }
1141 }