-# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
+# Copyright (c) 2010, 2011, 2012, 2013 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
return "\"params\" must be JSON array"
pattern = {Message.T_REQUEST: 0x11001,
- Message.T_NOTIFY: 0x11000,
- Message.T_REPLY: 0x00101,
- Message.T_ERROR: 0x00011}.get(self.type)
+ Message.T_NOTIFY: 0x11000,
+ Message.T_REPLY: 0x00101,
+ Message.T_ERROR: 0x00011}.get(self.type)
if pattern is None:
return "invalid JSON-RPC message type %s" % self.type
self.input = ""
self.output = ""
self.parser = None
+ self.received_bytes = 0
def close(self):
self.stream.close()
else:
return len(self.output)
+ def get_received_bytes(self):
+ return self.received_bytes
+
def __log_msg(self, title, msg):
- vlog.dbg("%s: %s %s" % (self.name, title, msg))
+ if vlog.dbg_is_enabled():
+ vlog.dbg("%s: %s %s" % (self.name, title, msg))
def send(self, msg):
if self.status:
return EOF, None
else:
self.input += data
+ self.received_bytes += len(data)
else:
if self.parser is None:
self.parser = ovs.json.Parser()
if ovs.stream.PassiveStream.is_valid_name(name):
reconnect.set_passive(True, ovs.timeval.msec())
- if ovs.stream.stream_or_pstream_needs_probes(name):
+ if not ovs.stream.stream_or_pstream_needs_probes(name):
reconnect.set_probe_interval(0)
return Session(reconnect, None)
self.pstream = None
if self.rpc:
+ backlog = self.rpc.get_backlog()
self.rpc.run()
+ if self.rpc.get_backlog() < backlog:
+ # Data previously caught in a queue was successfully sent (or
+ # there's an error, which we'll catch below).
+ #
+ # We don't count data that is successfully sent immediately as
+ # activity, because there's a lot of queuing downstream from
+ # us, which means that we can push a lot of data into a
+ # connection that has stalled and won't ever recover.
+ self.reconnect.activity(ovs.timeval.msec())
+
error = self.rpc.get_status()
if error != 0:
self.reconnect.disconnected(ovs.timeval.msec(), error)
request.id = "echo"
self.rpc.send(request)
else:
- assert action == None
+ assert action is None
def wait(self, poller):
if self.rpc is not None:
def recv(self):
if self.rpc is not None:
+ received_bytes = self.rpc.get_received_bytes()
error, msg = self.rpc.recv()
- if not error:
+ if received_bytes != self.rpc.get_received_bytes():
+ # Data was successfully received.
+ #
+ # Previously we only counted receiving a full message as
+ # activity, but with large messages or a slow connection that
+ # policy could time out the session mid-message.
self.reconnect.activity(ovs.timeval.msec())
+
+ if not error:
if msg.type == Message.T_REQUEST and msg.method == "echo":
# Echo request. Send reply.
self.send(Message.create_reply(msg.params, msg.id))