python: Fix several pep8 whitespace errors.
[cascardo/ovs.git] / python / ovs / jsonrpc.py
index 906e93c..99aa27c 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2010, 2011 Nicira Networks
+# Copyright (c) 2010, 2011, 2012, 2013 Nicira, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import errno
-import logging
 import os
 
 import ovs.json
@@ -21,8 +20,12 @@ import ovs.poller
 import ovs.reconnect
 import ovs.stream
 import ovs.timeval
+import ovs.util
+import ovs.vlog
+
+EOF = ovs.util.EOF
+vlog = ovs.vlog.Vlog("jsonrpc")
 
-EOF = -1
 
 class Message(object):
     T_REQUEST = 0               # Request.
@@ -34,7 +37,6 @@ class Message(object):
                T_NOTIFY: "notification",
                T_REPLY: "reply",
                T_ERROR: "error"}
-    __next_id = 0
 
     def __init__(self, type_, method, params, result, error, id):
         self.type = type_
@@ -45,6 +47,7 @@ class Message(object):
         self.id = id
 
     _next_id = 0
+
     @staticmethod
     def _create_id():
         this_id = Message._next_id
@@ -73,8 +76,7 @@ class Message(object):
     def type_to_string(type_):
         return Message.__types[type_]
 
-    @staticmethod
-    def __validate_arg(value, name, must_have):
+    def __validate_arg(self, value, name, must_have):
         if (value is not None) == (must_have != 0):
             return None
         else:
@@ -90,18 +92,18 @@ class Message(object):
             return "\"params\" must be JSON array"
 
         pattern = {Message.T_REQUEST: 0x11001,
-                   Message.T_NOTIFY:  0x11000,
-                   Message.T_REPLY:   0x00101,
-                   Message.T_ERROR:   0x00011}.get(self.type)
+                   Message.T_NOTIFY: 0x11000,
+                   Message.T_REPLY: 0x00101,
+                   Message.T_ERROR: 0x00011}.get(self.type)
         if pattern is None:
             return "invalid JSON-RPC message type %s" % self.type
 
         return (
-            Message.__validate_arg(self.method, "method", pattern & 0x10000) or
-            Message.__validate_arg(self.params, "params", pattern & 0x1000) or
-            Message.__validate_arg(self.result, "result", pattern & 0x100) or
-            Message.__validate_arg(self.error, "error", pattern & 0x10) or
-            Message.__validate_arg(self.id, "id", pattern & 0x1))
+            self.__validate_arg(self.method, "method", pattern & 0x10000) or
+            self.__validate_arg(self.params, "params", pattern & 0x1000) or
+            self.__validate_arg(self.result, "result", pattern & 0x100) or
+            self.__validate_arg(self.error, "error", pattern & 0x10) or
+            self.__validate_arg(self.id, "id", pattern & 0x1))
 
     @staticmethod
     def from_json(json):
@@ -121,7 +123,7 @@ class Message(object):
         params = json.pop("params", None)
         result = json.pop("result", None)
         error = json.pop("error", None)
-        id = json.pop("id", None)
+        id_ = json.pop("id", None)
         if len(json):
             return "message has unexpected member \"%s\"" % json.popitem()[0]
 
@@ -129,12 +131,12 @@ class Message(object):
             msg_type = Message.T_REPLY
         elif error is not None:
             msg_type = Message.T_ERROR
-        elif id is not None:
+        elif id_ is not None:
             msg_type = Message.T_REQUEST
         else:
             msg_type = Message.T_NOTIFY
-        
-        msg = Message(msg_type, method, params, result, error, id)
+
+        msg = Message(msg_type, method, params, result, error, id_)
         validation_error = msg.is_valid()
         if validation_error is not None:
             return validation_error
@@ -167,20 +169,24 @@ class Message(object):
             s.append("method=\"%s\"" % self.method)
         if self.params is not None:
             s.append("params=" + ovs.json.to_string(self.params))
+        if self.result is not None:
+            s.append("result=" + ovs.json.to_string(self.result))
         if self.error is not None:
             s.append("error=" + ovs.json.to_string(self.error))
         if self.id is not None:
             s.append("id=" + ovs.json.to_string(self.id))
         return ", ".join(s)
 
+
 class Connection(object):
     def __init__(self, stream):
-        self.name = stream.get_name()
+        self.name = stream.name
         self.stream = stream
         self.status = 0
         self.input = ""
         self.output = ""
         self.parser = None
+        self.received_bytes = 0
 
     def close(self):
         self.stream.close()
@@ -196,8 +202,8 @@ class Connection(object):
                 self.output = self.output[retval:]
             else:
                 if retval != -errno.EAGAIN:
-                    logging.warn("%s: send error: %s" % (self.name,
-                                                         os.strerror(-retval)))
+                    vlog.warn("%s: send error: %s" %
+                              (self.name, os.strerror(-retval)))
                     self.error(-retval)
                 break
 
@@ -205,7 +211,7 @@ class Connection(object):
         if not self.status:
             self.stream.run_wait(poller)
             if len(self.output):
-                self.stream.send_wait()
+                self.stream.send_wait(poller)
 
     def get_status(self):
         return self.status
@@ -216,11 +222,12 @@ class Connection(object):
         else:
             return len(self.output)
 
-    def get_name(self):
-        return self.name
+    def get_received_bytes(self):
+        return self.received_bytes
 
     def __log_msg(self, title, msg):
-        logging.debug("%s: %s %s" % (self.name, title, msg))
+        if vlog.dbg_is_enabled():
+            vlog.dbg("%s: %s %s" % (self.name, title, msg))
 
     def send(self, msg):
         if self.status:
@@ -253,22 +260,23 @@ class Connection(object):
             return self.status, None
 
         while True:
-            if len(self.input) == 0:
+            if not self.input:
                 error, data = self.stream.recv(4096)
                 if error:
                     if error == errno.EAGAIN:
                         return error, None
                     else:
                         # XXX rate-limit
-                        logging.warning("%s: receive error: %s"
-                                        % (self.name, os.strerror(error)))
+                        vlog.warn("%s: receive error: %s"
+                                  % (self.name, os.strerror(error)))
                         self.error(error)
                         return self.status, None
-                elif len(data) == 0:
+                elif not data:
                     self.error(EOF)
                     return EOF, None
                 else:
                     self.input += data
+                    self.received_bytes += len(data)
             else:
                 if self.parser is None:
                     self.parser = ovs.json.Parser()
@@ -292,15 +300,18 @@ class Connection(object):
             self.wait(poller)
             self.recv_wait(poller)
             poller.block()
-    
+
     def transact_block(self, request):
-        id = request.id
+        id_ = request.id
 
         error = self.send(request)
         reply = None
         while not error:
             error, reply = self.recv_block()
-            if reply and reply.type == Message.T_REPLY and reply.id == id:
+            if (reply
+                and (reply.type == Message.T_REPLY
+                     or reply.type == Message.T_ERROR)
+                and reply.id == id_):
                 break
         return error, reply
 
@@ -309,23 +320,23 @@ class Connection(object):
         self.parser = None
         if type(json) in [str, unicode]:
             # XXX rate-limit
-            logging.warning("%s: error parsing stream: %s" % (self.name, json))
+            vlog.warn("%s: error parsing stream: %s" % (self.name, json))
             self.error(errno.EPROTO)
             return
 
         msg = Message.from_json(json)
         if not isinstance(msg, Message):
             # XXX rate-limit
-            logging.warning("%s: received bad JSON-RPC message: %s"
-                            % (self.name, msg))
+            vlog.warn("%s: received bad JSON-RPC message: %s"
+                      % (self.name, msg))
             self.error(errno.EPROTO)
             return
 
         self.__log_msg("received", msg)
         return msg
-        
+
     def recv_wait(self, poller):
-        if self.status or len(self.input) > 0:
+        if self.status or self.input:
             poller.immediate_wake()
         else:
             self.stream.recv_wait(poller)
@@ -335,7 +346,8 @@ class Connection(object):
             self.status = error
             self.stream.close()
             self.output = ""
-            
+
+
 class Session(object):
     """A JSON-RPC session with reconnection."""
 
@@ -351,10 +363,10 @@ class Session(object):
         """Creates and returns a Session that maintains a JSON-RPC session to
         'name', which should be a string acceptable to ovs.stream.Stream or
         ovs.stream.PassiveStream's initializer.
-        
+
         If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
         session connects and reconnects, with back-off, to 'name'.
-        
+
         If 'name' is a passive connection method, e.g. "ptcp:", the new session
         listens for connections to 'name'.  It maintains at most one connection
         at any given time.  Any new connection causes the previous one (if any)
@@ -364,7 +376,10 @@ class Session(object):
         reconnect.enable(ovs.timeval.msec())
 
         if ovs.stream.PassiveStream.is_valid_name(name):
-            self.reconnect.set_passive(True, ovs.timeval.msec())
+            reconnect.set_passive(True, ovs.timeval.msec())
+
+        if not ovs.stream.stream_or_pstream_needs_probes(name):
+            reconnect.set_probe_interval(0)
 
         return Session(reconnect, None)
 
@@ -372,7 +387,7 @@ class Session(object):
     def open_unreliably(jsonrpc):
         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
         reconnect.set_quiet(True)
-        reconnect.set_name(jsonrpc.get_name())
+        reconnect.set_name(jsonrpc.name)
         reconnect.set_max_tries(0)
         reconnect.connected(ovs.timeval.msec())
         return Session(reconnect, jsonrpc)
@@ -398,7 +413,7 @@ class Session(object):
             self.stream.close()
             self.stream = None
             self.seqno += 1
-    
+
     def __connect(self):
         self.__disconnect()
 
@@ -424,8 +439,8 @@ class Session(object):
             if error == 0:
                 if self.rpc or self.stream:
                     # XXX rate-limit
-                    logging.info("%s: new connection replacing active "
-                                 "connection" % self.reconnect.get_name())
+                    vlog.info("%s: new connection replacing active "
+                              "connection" % self.reconnect.get_name())
                     self.__disconnect()
                 self.reconnect.connected(ovs.timeval.msec())
                 self.rpc = Connection(stream)
@@ -435,7 +450,18 @@ class Session(object):
                 self.pstream = None
 
         if self.rpc:
+            backlog = self.rpc.get_backlog()
             self.rpc.run()
+            if self.rpc.get_backlog() < backlog:
+                # Data previously caught in a queue was successfully sent (or
+                # there's an error, which we'll catch below).
+                #
+                # We don't count data that is successfully sent immediately as
+                # activity, because there's a lot of queuing downstream from
+                # us, which means that we can push a lot of data into a
+                # connection that has stalled and won't ever recover.
+                self.reconnect.activity(ovs.timeval.msec())
+
             error = self.rpc.get_status()
             if error != 0:
                 self.reconnect.disconnected(ovs.timeval.msec(), error)
@@ -464,7 +490,7 @@ class Session(object):
                 request.id = "echo"
                 self.rpc.send(request)
         else:
-            assert action == None
+            assert action is None
 
     def wait(self, poller):
         if self.rpc is not None:
@@ -493,9 +519,17 @@ class Session(object):
 
     def recv(self):
         if self.rpc is not None:
+            received_bytes = self.rpc.get_received_bytes()
             error, msg = self.rpc.recv()
+            if received_bytes != self.rpc.get_received_bytes():
+                # Data was successfully received.
+                #
+                # Previously we only counted receiving a full message as
+                # activity, but with large messages or a slow connection that
+                # policy could time out the session mid-message.
+                self.reconnect.activity(ovs.timeval.msec())
+
             if not error:
-                self.reconnect.received(ovs.timeval.msec())
                 if msg.type == Message.T_REQUEST and msg.method == "echo":
                     # Echo request.  Send reply.
                     self.send(Message.create_reply(msg.params, msg.id))
@@ -516,7 +550,7 @@ class Session(object):
         else:
             max_tries = self.reconnect.get_max_tries()
             return max_tries is None or max_tries > 0
-    
+
     def is_connected(self):
         return self.rpc is not None