/*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include "reconnect.h"
#include "stream.h"
#include "timeval.h"
-#include "vlog.h"
+#include "openvswitch/vlog.h"
VLOG_DEFINE_THIS_MODULE(jsonrpc);
\f
struct json_parser *parser;
/* Output. */
- struct list output; /* Contains "struct ofpbuf"s. */
+ struct ovs_list output; /* Contains "struct ofpbuf"s. */
+ size_t output_count; /* Number of elements in "output". */
size_t backlog;
};
int
jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp)
{
- return stream_open_with_default_port(name, OVSDB_OLD_PORT,
- streamp, dscp);
+ return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp);
}
/* This is just the same as pstream_open() except that it uses the default
int
jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
{
- return pstream_open_with_default_port(name, OVSDB_OLD_PORT,
- pstreamp, dscp);
+ return pstream_open_with_default_port(name, OVSDB_PORT, pstreamp, dscp);
}
/* Returns a new JSON-RPC stream that uses 'stream' for input and output. The
struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
int retval;
- retval = stream_send(rpc->stream, ofpbuf_data(buf), ofpbuf_size(buf));
+ retval = stream_send(rpc->stream, buf->data, buf->size);
if (retval >= 0) {
rpc->backlog -= retval;
ofpbuf_pull(buf, retval);
- if (!ofpbuf_size(buf)) {
+ if (!buf->size) {
list_remove(&buf->list_node);
+ rpc->output_count--;
ofpbuf_delete(buf);
}
} else {
{
struct ofpbuf *buf;
struct json *json;
+ struct ds ds = DS_EMPTY_INITIALIZER;
size_t length;
- char *s;
if (rpc->status) {
jsonrpc_msg_destroy(msg);
jsonrpc_log_msg(rpc, "send", msg);
json = jsonrpc_msg_to_json(msg);
- s = json_to_string(json, 0);
- length = strlen(s);
+ json_to_ds(json, 0, &ds);
+ length = ds.length;
json_destroy(json);
buf = xmalloc(sizeof *buf);
- ofpbuf_use(buf, s, length);
- ofpbuf_set_size(buf, length);
+ ofpbuf_use_ds(buf, &ds);
list_push_back(&rpc->output, &buf->list_node);
+ rpc->output_count++;
rpc->backlog += length;
+ if (rpc->output_count >= 50) {
+ VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
+ " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
+ rpc->output_count, rpc->backlog);
+ }
+
if (rpc->backlog == length) {
jsonrpc_run(rpc);
}
ofpbuf_list_delete(&rpc->output);
rpc->backlog = 0;
+ rpc->output_count = 0;
}
\f
static struct jsonrpc_msg *
static struct json *
jsonrpc_create_id(void)
{
- static atomic_uint next_id = ATOMIC_VAR_INIT(0);
+ static atomic_count next_id = ATOMIC_COUNT_INIT(0);
unsigned int id;
- atomic_add(&next_id, 1, &id);
+ id = atomic_count_inc(&next_id);
return json_integer_create(id);
}
reconnect_connect_failed(s->reconnect, time_msec(), error);
stream_close(s->stream);
s->stream = NULL;
+ s->last_error = error;
}
}
}
}
+/* Returns true if 's' is currently connected or trying to connect. */
bool
jsonrpc_session_is_alive(const struct jsonrpc_session *s)
{
return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
}
+/* Returns true if 's' is currently connected. */
bool
jsonrpc_session_is_connected(const struct jsonrpc_session *s)
{
return s->rpc != NULL;
}
+/* Returns a sequence number for 's'. The sequence number increments every
+ * time 's' connects or disconnects. Thus, a caller can use the change (or
+ * lack of change) in the sequence number to figure out whether the underlying
+ * connection is the same as before. */
unsigned int
jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
{
return s->seqno;
}
+/* Returns the current status of 's'. If 's' is NULL or is disconnected, this
+ * is 0, otherwise it is the status of the connection, as reported by
+ * jsonrpc_get_status(). */
int
jsonrpc_session_get_status(const struct jsonrpc_session *s)
{
return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0;
}
+/* Returns the last error reported on a connection by 's'. The return value is
+ * 0 only if no connection made by 's' has ever encountered an error. See
+ * jsonrpc_get_status() for return value interpretation. */
int
jsonrpc_session_get_last_error(const struct jsonrpc_session *s)
{
return s->last_error;
}
+/* Populates 'stats' with statistics from 's'. */
void
jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s,
struct reconnect_stats *stats)
reconnect_get_stats(s->reconnect, time_msec(), stats);
}
+/* Enables 's' to reconnect to the peer if the connection drops. */
void
jsonrpc_session_enable_reconnect(struct jsonrpc_session *s)
{
RECONNECT_DEFAULT_MAX_BACKOFF);
}
+/* Forces 's' to drop its connection (if any) and reconnect. */
void
jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
{
reconnect_force_reconnect(s->reconnect, time_msec());
}
+/* Sets 'max_backoff' as the maximum time, in milliseconds, to wait after a
+ * connection attempt fails before attempting to connect again. */
void
jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff)
{
reconnect_set_backoff(s->reconnect, 0, max_backoff);
}
+/* Sets the "probe interval" for 's' to 'probe_interval', in milliseconds. If
+ * this is zero, it disables the connection keepalive feature. Otherwise, if
+ * 's' is idle for 'probe_interval' milliseconds then 's' will send an echo
+ * request and, if no reply is received within an additional 'probe_interval'
+ * milliseconds, close the connection (then reconnect, if that feature is
+ * enabled). */
void
jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
int probe_interval)
reconnect_set_probe_interval(s->reconnect, probe_interval);
}
+/* Sets the DSCP value used for 's''s connection to 'dscp'. If this is
+ * different from the DSCP value currently in use then the connection is closed
+ * and reconnected. */
void
-jsonrpc_session_set_dscp(struct jsonrpc_session *s,
- uint8_t dscp)
+jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
{
if (s->dscp != dscp) {
- if (s->pstream) {
- int error;
+ pstream_close(s->pstream);
+ s->pstream = NULL;
- error = pstream_set_dscp(s->pstream, dscp);
- if (error) {
- VLOG_ERR("%s: failed set_dscp %s",
- reconnect_get_name(s->reconnect),
- ovs_strerror(error));
- }
- /*
- * XXX race window between setting dscp to listening socket
- * and accepting socket. accepted socket may have old dscp value.
- * Ignore this race window for now.
- */
- }
s->dscp = dscp;
jsonrpc_session_force_reconnect(s);
}