X-Git-Url: http://git.cascardo.eti.br/?a=blobdiff_plain;f=lib%2Fjsonrpc.c;h=35428a67ecffe624c0a8fcbebde9555f34a41c18;hb=HEAD;hp=50073b6d7e9d32c768b2b4476773f1b512963200;hpb=cb22974d773942d66da42b700b8bca0db27a0920;p=cascardo%2Fovs.git diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index 50073b6d7..35428a67e 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc. + * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,11 +26,12 @@ #include "json.h" #include "list.h" #include "ofpbuf.h" +#include "ovs-thread.h" #include "poll-loop.h" #include "reconnect.h" #include "stream.h" #include "timeval.h" -#include "vlog.h" +#include "openvswitch/vlog.h" VLOG_DEFINE_THIS_MODULE(jsonrpc); @@ -41,38 +42,36 @@ struct jsonrpc { /* Input. */ struct byteq input; + uint8_t input_buffer[512]; struct json_parser *parser; - struct jsonrpc_msg *received; /* Output. */ - struct list output; /* Contains "struct ofpbuf"s. */ + struct ovs_list output; /* Contains "struct ofpbuf"s. */ + size_t output_count; /* Number of elements in "output". */ size_t backlog; }; /* Rate limit for error messages. */ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); -static void jsonrpc_received(struct jsonrpc *); +static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *); static void jsonrpc_cleanup(struct jsonrpc *); static void jsonrpc_error(struct jsonrpc *, int error); /* This is just the same as stream_open() except that it uses the default - * JSONRPC ports if none is specified. */ + * JSONRPC port if none is specified. */ int jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp) { - return stream_open_with_default_ports(name, JSONRPC_TCP_PORT, - JSONRPC_SSL_PORT, streamp, - dscp); + return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp); } /* This is just the same as pstream_open() except that it uses the default - * JSONRPC ports if none is specified. */ + * JSONRPC port if none is specified. */ int jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp) { - return pstream_open_with_default_ports(name, JSONRPC_TCP_PORT, - JSONRPC_SSL_PORT, pstreamp, dscp); + return pstream_open_with_default_port(name, OVSDB_PORT, pstreamp, dscp); } /* Returns a new JSON-RPC stream that uses 'stream' for input and output. The @@ -87,7 +86,7 @@ jsonrpc_open(struct stream *stream) rpc = xzalloc(sizeof *rpc); rpc->name = xstrdup(stream_get_name(stream)); rpc->stream = stream; - byteq_init(&rpc->input); + byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer); list_init(&rpc->output); return rpc; @@ -124,12 +123,13 @@ jsonrpc_run(struct jsonrpc *rpc) ofpbuf_pull(buf, retval); if (!buf->size) { list_remove(&buf->list_node); + rpc->output_count--; ofpbuf_delete(buf); } } else { if (retval != -EAGAIN) { VLOG_WARN_RL(&rl, "%s: send error: %s", - rpc->name, strerror(-retval)); + rpc->name, ovs_strerror(-retval)); jsonrpc_error(rpc, -retval); } break; @@ -238,8 +238,8 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) { struct ofpbuf *buf; struct json *json; + struct ds ds = DS_EMPTY_INITIALIZER; size_t length; - char *s; if (rpc->status) { jsonrpc_msg_destroy(msg); @@ -249,16 +249,22 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) jsonrpc_log_msg(rpc, "send", msg); json = jsonrpc_msg_to_json(msg); - s = json_to_string(json, 0); - length = strlen(s); + json_to_ds(json, 0, &ds); + length = ds.length; json_destroy(json); buf = xmalloc(sizeof *buf); - ofpbuf_use(buf, s, length); - buf->size = length; + ofpbuf_use_ds(buf, &ds); list_push_back(&rpc->output, &buf->list_node); + rpc->output_count++; rpc->backlog += length; + if (rpc->output_count >= 50) { + VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of" + " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name, + rpc->output_count, rpc->backlog); + } + if (rpc->backlog == length) { jsonrpc_run(rpc); } @@ -292,11 +298,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) } for (i = 0; i < 50; i++) { - if (rpc->received) { - *msgp = rpc->received; - rpc->received = NULL; - return 0; - } else if (byteq_is_empty(&rpc->input)) { + size_t n, used; + + /* Fill our input buffer if it's empty. */ + if (byteq_is_empty(&rpc->input)) { size_t chunk; int retval; @@ -307,7 +312,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) return EAGAIN; } else { VLOG_WARN_RL(&rl, "%s: receive error: %s", - rpc->name, strerror(-retval)); + rpc->name, ovs_strerror(-retval)); jsonrpc_error(rpc, -retval); return rpc->status; } @@ -316,27 +321,31 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) return EOF; } byteq_advance_head(&rpc->input, retval); - } else { - size_t n, used; + } - if (!rpc->parser) { - rpc->parser = json_parser_create(0); + /* We have some input. Feed it into the JSON parser. */ + if (!rpc->parser) { + rpc->parser = json_parser_create(0); + } + n = byteq_tailroom(&rpc->input); + used = json_parser_feed(rpc->parser, + (char *) byteq_tail(&rpc->input), n); + byteq_advance_tail(&rpc->input, used); + + /* If we have complete JSON, attempt to parse it as JSON-RPC. */ + if (json_parser_is_done(rpc->parser)) { + *msgp = jsonrpc_parse_received_message(rpc); + if (*msgp) { + return 0; } - n = byteq_tailroom(&rpc->input); - used = json_parser_feed(rpc->parser, - (char *) byteq_tail(&rpc->input), n); - byteq_advance_tail(&rpc->input, used); - if (json_parser_is_done(rpc->parser)) { - jsonrpc_received(rpc); - if (rpc->status) { - const struct byteq *q = &rpc->input; - if (q->head <= BYTEQ_SIZE) { - stream_report_content(q->buffer, q->head, - STREAM_JSONRPC, - THIS_MODULE, rpc->name); - } - return rpc->status; + + if (rpc->status) { + const struct byteq *q = &rpc->input; + if (q->head <= q->size) { + stream_report_content(q->buffer, q->head, STREAM_JSONRPC, + &this_module, rpc->name); } + return rpc->status; } } } @@ -349,8 +358,8 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) void jsonrpc_recv_wait(struct jsonrpc *rpc) { - if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) { - (poll_immediate_wake)(rpc->name); + if (rpc->status || !byteq_is_empty(&rpc->input)) { + poll_immediate_wake_at(rpc->name); } else { stream_recv_wait(rpc->stream); } @@ -439,8 +448,11 @@ jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request, return error; } -static void -jsonrpc_received(struct jsonrpc *rpc) +/* Attempts to parse the content of 'rpc->parser' (which is complete JSON) as a + * JSON-RPC message. If successful, returns the JSON-RPC message. On failure, + * signals an error on 'rpc' with jsonrpc_error() and returns NULL. */ +static struct jsonrpc_msg * +jsonrpc_parse_received_message(struct jsonrpc *rpc) { struct jsonrpc_msg *msg; struct json *json; @@ -453,7 +465,7 @@ jsonrpc_received(struct jsonrpc *rpc) rpc->name, json_string(json)); jsonrpc_error(rpc, EPROTO); json_destroy(json); - return; + return NULL; } error = jsonrpc_msg_from_json(json, &msg); @@ -462,11 +474,11 @@ jsonrpc_received(struct jsonrpc *rpc) rpc->name, error); free(error); jsonrpc_error(rpc, EPROTO); - return; + return NULL; } jsonrpc_log_msg(rpc, "received", msg); - rpc->received = msg; + return msg; } static void @@ -488,11 +500,9 @@ jsonrpc_cleanup(struct jsonrpc *rpc) json_parser_abort(rpc->parser); rpc->parser = NULL; - jsonrpc_msg_destroy(rpc->received); - rpc->received = NULL; - ofpbuf_list_delete(&rpc->output); rpc->backlog = 0; + rpc->output_count = 0; } static struct jsonrpc_msg * @@ -513,8 +523,11 @@ jsonrpc_create(enum jsonrpc_msg_type type, const char *method, static struct json * jsonrpc_create_id(void) { - static unsigned int id; - return json_integer_create(id++); + static atomic_count next_id = ATOMIC_COUNT_INIT(0); + unsigned int id; + + id = atomic_count_inc(&next_id); + return json_integer_create(id); } struct jsonrpc_msg * @@ -744,6 +757,7 @@ struct jsonrpc_session { struct jsonrpc *rpc; struct stream *stream; struct pstream *pstream; + int last_error; unsigned int seqno; uint8_t dscp; }; @@ -752,14 +766,18 @@ struct jsonrpc_session { * acceptable to stream_open() or pstream_open(). * * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new - * jsonrpc_session connects and reconnects, with back-off, to 'name'. + * jsonrpc_session connects to 'name'. If 'retry' is true, then the new + * session connects and reconnects to 'name', with backoff. If 'retry' is + * false, the new session will only try to connect once and after a connection + * failure or a disconnection jsonrpc_session_is_alive() will return false for + * the new session. * * If 'name' is a passive connection method, e.g. "ptcp:", the new * jsonrpc_session listens for connections to 'name'. It maintains at most one * connection at any given time. Any new connection causes the previous one * (if any) to be dropped. */ struct jsonrpc_session * -jsonrpc_session_open(const char *name) +jsonrpc_session_open(const char *name, bool retry) { struct jsonrpc_session *s; @@ -772,9 +790,13 @@ jsonrpc_session_open(const char *name) s->pstream = NULL; s->seqno = 0; s->dscp = 0; + s->last_error = 0; if (!pstream_verify_name(name)) { reconnect_set_passive(s->reconnect, true, time_msec()); + } else if (!retry) { + reconnect_set_max_tries(s->reconnect, 1); + reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX); } if (!stream_or_pstream_needs_probes(name)) { @@ -847,6 +869,8 @@ jsonrpc_session_connect(struct jsonrpc_session *s) error = jsonrpc_stream_open(name, &s->stream, s->dscp); if (!error) { reconnect_connecting(s->reconnect, time_msec()); + } else { + s->last_error = error; } } else { error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream, @@ -908,6 +932,7 @@ jsonrpc_session_run(struct jsonrpc_session *s) if (error) { reconnect_disconnected(s->reconnect, time_msec(), error); jsonrpc_session_disconnect(s); + s->last_error = error; } } else if (s->stream) { int error; @@ -922,6 +947,7 @@ jsonrpc_session_run(struct jsonrpc_session *s) reconnect_connect_failed(s->reconnect, time_msec(), error); stream_close(s->stream); s->stream = NULL; + s->last_error = error; } } @@ -1037,30 +1063,49 @@ jsonrpc_session_recv_wait(struct jsonrpc_session *s) } } +/* Returns true if 's' is currently connected or trying to connect. */ bool jsonrpc_session_is_alive(const struct jsonrpc_session *s) { return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect); } +/* Returns true if 's' is currently connected. */ bool jsonrpc_session_is_connected(const struct jsonrpc_session *s) { return s->rpc != NULL; } +/* Returns a sequence number for 's'. The sequence number increments every + * time 's' connects or disconnects. Thus, a caller can use the change (or + * lack of change) in the sequence number to figure out whether the underlying + * connection is the same as before. */ unsigned int jsonrpc_session_get_seqno(const struct jsonrpc_session *s) { return s->seqno; } +/* Returns the current status of 's'. If 's' is NULL or is disconnected, this + * is 0, otherwise it is the status of the connection, as reported by + * jsonrpc_get_status(). */ int jsonrpc_session_get_status(const struct jsonrpc_session *s) { return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0; } +/* Returns the last error reported on a connection by 's'. The return value is + * 0 only if no connection made by 's' has ever encountered an error. See + * jsonrpc_get_status() for return value interpretation. */ +int +jsonrpc_session_get_last_error(const struct jsonrpc_session *s) +{ + return s->last_error; +} + +/* Populates 'stats' with statistics from 's'. */ void jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s, struct reconnect_stats *stats) @@ -1068,18 +1113,36 @@ jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s, reconnect_get_stats(s->reconnect, time_msec(), stats); } +/* Enables 's' to reconnect to the peer if the connection drops. */ +void +jsonrpc_session_enable_reconnect(struct jsonrpc_session *s) +{ + reconnect_set_max_tries(s->reconnect, UINT_MAX); + reconnect_set_backoff(s->reconnect, RECONNECT_DEFAULT_MIN_BACKOFF, + RECONNECT_DEFAULT_MAX_BACKOFF); +} + +/* Forces 's' to drop its connection (if any) and reconnect. */ void jsonrpc_session_force_reconnect(struct jsonrpc_session *s) { reconnect_force_reconnect(s->reconnect, time_msec()); } +/* Sets 'max_backoff' as the maximum time, in milliseconds, to wait after a + * connection attempt fails before attempting to connect again. */ void jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff) { reconnect_set_backoff(s->reconnect, 0, max_backoff); } +/* Sets the "probe interval" for 's' to 'probe_interval', in milliseconds. If + * this is zero, it disables the connection keepalive feature. Otherwise, if + * 's' is idle for 'probe_interval' milliseconds then 's' will send an echo + * request and, if no reply is received within an additional 'probe_interval' + * milliseconds, close the connection (then reconnect, if that feature is + * enabled). */ void jsonrpc_session_set_probe_interval(struct jsonrpc_session *s, int probe_interval) @@ -1087,25 +1150,16 @@ jsonrpc_session_set_probe_interval(struct jsonrpc_session *s, reconnect_set_probe_interval(s->reconnect, probe_interval); } +/* Sets the DSCP value used for 's''s connection to 'dscp'. If this is + * different from the DSCP value currently in use then the connection is closed + * and reconnected. */ void -jsonrpc_session_set_dscp(struct jsonrpc_session *s, - uint8_t dscp) +jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp) { if (s->dscp != dscp) { - if (s->pstream) { - int error; + pstream_close(s->pstream); + s->pstream = NULL; - error = pstream_set_dscp(s->pstream, dscp); - if (error) { - VLOG_ERR("%s: failed set_dscp %s", - reconnect_get_name(s->reconnect), strerror(error)); - } - /* - * XXX race window between setting dscp to listening socket - * and accepting socket. accepted socket may have old dscp value. - * Ignore this race window for now. - */ - } s->dscp = dscp; jsonrpc_session_force_reconnect(s); }