python: Convert dict iterators.
[cascardo/ovs.git] / python / ovs / stream.py
index 9c10612..a555a76 100644 (file)
 
 import errno
 import os
-import select
 import socket
 
+import six
+
 import ovs.poller
 import ovs.socket_util
 import ovs.vlog
@@ -51,12 +52,25 @@ class Stream(object):
     W_RECV = 1                  # Data received.
     W_SEND = 2                  # Send buffer room available.
 
+    _SOCKET_METHODS = {}
+
+    @staticmethod
+    def register_method(method, cls):
+        Stream._SOCKET_METHODS[method + ":"] = cls
+
+    @staticmethod
+    def _find_method(name):
+        for method, cls in six.iteritems(Stream._SOCKET_METHODS):
+            if name.startswith(method):
+                return cls
+        return None
+
     @staticmethod
     def is_valid_name(name):
         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
-        TYPE is a supported stream type (currently only "unix:"), otherwise
-        False."""
-        return name.startswith("unix:")
+        TYPE is a supported stream type (currently only "unix:" and "tcp:"),
+        otherwise False."""
+        return bool(Stream._find_method(name))
 
     def __init__(self, socket, name, status):
         self.socket = socket
@@ -70,12 +84,18 @@ class Stream(object):
 
         self.error = 0
 
+    # Default value of dscp bits for connection between controller and manager.
+    # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
+    # in <netinet/ip.h> is used.
+    IPTOS_PREC_INTERNETCONTROL = 0xc0
+    DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
+
     @staticmethod
-    def open(name):
+    def open(name, dscp=DSCP_DEFAULT):
         """Attempts to connect a stream to a remote peer.  'name' is a
         connection name in the form "TYPE:ARGS", where TYPE is an active stream
         class's name and ARGS are stream class-specific.  Currently the only
-        supported TYPE is "unix".
+        supported TYPEs are "unix" and "tcp".
 
         Returns (error, stream): on success 'error' is 0 and 'stream' is the
         new Stream, on failure 'error' is a positive errno value and 'stream'
@@ -84,13 +104,14 @@ class Stream(object):
         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
         and a new Stream.  The connect() method can be used to check for
         successful connection completion."""
-        if not Stream.is_valid_name(name):
+        cls = Stream._find_method(name)
+        if not cls:
             return errno.EAFNOSUPPORT, None
 
-        connect_path = name[5:]
-        error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
-                                                       True, None,
-                                                       connect_path)
+        suffix = name.split(":", 1)[1]
+        if name.startswith("unix:"):
+            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
+        error, sock = cls._open(suffix, dscp)
         if error:
             return error, None
         else:
@@ -98,7 +119,11 @@ class Stream(object):
             return 0, Stream(sock, name, status)
 
     @staticmethod
-    def open_block((error, stream)):
+    def _open(suffix, dscp):
+        raise NotImplementedError("This method must be overrided by subclass")
+
+    @staticmethod
+    def open_block(error_stream):
         """Blocks until a Stream completes its connection attempt, either
         succeeding or failing.  (error, stream) should be the tuple returned by
         Stream.open().  Returns a tuple of the same form.
@@ -106,6 +131,8 @@ class Stream(object):
         Typical usage:
         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
 
+        # Py3 doesn't support tuple parameter unpacking - PEP 3113
+        error, stream = error_stream
         if not error:
             while True:
                 error = stream.connect()
@@ -140,15 +167,17 @@ class Stream(object):
         is complete, returns 0 if the connection was successful or a positive
         errno value if it failed.  If the connection is still in progress,
         returns errno.EAGAIN."""
-        last_state = -1         # Always differs from initial self.state
-        while self.state != last_state:
-            last_state = self.state
-            if self.state == Stream.__S_CONNECTING:
-                self.__scs_connecting()
-            elif self.state == Stream.__S_CONNECTED:
-                return 0
-            elif self.state == Stream.__S_DISCONNECTED:
-                return self.error
+
+        if self.state == Stream.__S_CONNECTING:
+            self.__scs_connecting()
+
+        if self.state == Stream.__S_CONNECTING:
+            return errno.EAGAIN
+        elif self.state == Stream.__S_CONNECTED:
+            return 0
+        else:
+            assert self.state == Stream.__S_DISCONNECTED
+            return self.error
 
     def recv(self, n):
         """Tries to receive up to 'n' bytes from this stream.  Returns a
@@ -173,7 +202,7 @@ class Stream(object):
 
         try:
             return (0, self.socket.recv(n))
-        except socket.error, e:
+        except socket.error as e:
             return (ovs.socket_util.get_exception_errno(e), "")
 
     def send(self, buf):
@@ -195,7 +224,7 @@ class Stream(object):
 
         try:
             return self.socket.send(buf)
-        except socket.error, e:
+        except socket.error as e:
             return -ovs.socket_util.get_exception_errno(e)
 
     def run(self):
@@ -214,9 +243,9 @@ class Stream(object):
         if self.state == Stream.__S_CONNECTING:
             wait = Stream.W_CONNECT
         if wait == Stream.W_RECV:
-            poller.fd_wait(self.socket, select.POLLIN)
+            poller.fd_wait(self.socket, ovs.poller.POLLIN)
         else:
-            poller.fd_wait(self.socket, select.POLLOUT)
+            poller.fd_wait(self.socket, ovs.poller.POLLOUT)
 
     def connect_wait(self, poller):
         self.wait(poller, Stream.W_CONNECT)
@@ -259,6 +288,8 @@ class PassiveStream(object):
             return errno.EAFNOSUPPORT, None
 
         bind_path = name[6:]
+        if name.startswith("punix:"):
+            bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
                                                        True, bind_path, None)
         if error:
@@ -266,7 +297,7 @@ class PassiveStream(object):
 
         try:
             sock.listen(10)
-        except socket.error, e:
+        except socket.error as e:
             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
             sock.close()
             return e.error, None
@@ -294,7 +325,7 @@ class PassiveStream(object):
                 sock, addr = self.socket.accept()
                 ovs.socket_util.set_nonblocking(sock)
                 return 0, Stream(sock, "unix:%s" % addr, 0)
-            except socket.error, e:
+            except socket.error as e:
                 error = ovs.socket_util.get_exception_errno(e)
                 if error != errno.EAGAIN:
                     # XXX rate-limit
@@ -302,7 +333,7 @@ class PassiveStream(object):
                 return error, None
 
     def wait(self, poller):
-        poller.fd_wait(self.socket, select.POLLIN)
+        poller.fd_wait(self.socket, ovs.poller.POLLIN)
 
     def __del__(self):
         # Don't delete the file: we might have forked.
@@ -313,6 +344,27 @@ def usage(name):
     return """
 Active %s connection methods:
   unix:FILE               Unix domain socket named FILE
+  tcp:IP:PORT             TCP socket to IP with port no of PORT
 
 Passive %s connection methods:
   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
+
+
+class UnixStream(Stream):
+    @staticmethod
+    def _open(suffix, dscp):
+        connect_path = suffix
+        return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
+                                                True, None, connect_path)
+Stream.register_method("unix", UnixStream)
+
+
+class TCPStream(Stream):
+    @staticmethod
+    def _open(suffix, dscp):
+        error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
+                                                       suffix, 0, dscp)
+        if not error:
+            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        return error, sock
+Stream.register_method("tcp", TCPStream)