1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 import ovs.socket_util
26 vlog = ovs.vlog.Vlog("stream")
29 def stream_or_pstream_needs_probes(name):
30 """ 1 if the stream or pstream specified by 'name' needs periodic probes to
31 verify connectivity. For [p]streams which need probes, it can take a long
32 time to notice the connection was dropped. Returns 0 if probes aren't
33 needed, and -1 if 'name' is invalid"""
35 if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
36 # Only unix and punix are supported currently.
43 """Bidirectional byte stream. Currently only Unix domain sockets
51 # Kinds of events that one might wait for.
52 W_CONNECT = 0 # Connect complete (success or failure).
53 W_RECV = 1 # Data received.
54 W_SEND = 2 # Send buffer room available.
59 def register_method(method, cls):
60 Stream._SOCKET_METHODS[method + ":"] = cls
63 def _find_method(name):
64 for method, cls in six.iteritems(Stream._SOCKET_METHODS):
65 if name.startswith(method):
70 def is_valid_name(name):
71 """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
72 TYPE is a supported stream type (currently only "unix:" and "tcp:"),
74 return bool(Stream._find_method(name))
76 def __init__(self, socket, name, status):
79 if status == errno.EAGAIN:
80 self.state = Stream.__S_CONNECTING
82 self.state = Stream.__S_CONNECTED
84 self.state = Stream.__S_DISCONNECTED
88 # Default value of dscp bits for connection between controller and manager.
89 # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
90 # in <netinet/ip.h> is used.
91 IPTOS_PREC_INTERNETCONTROL = 0xc0
92 DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
95 def open(name, dscp=DSCP_DEFAULT):
96 """Attempts to connect a stream to a remote peer. 'name' is a
97 connection name in the form "TYPE:ARGS", where TYPE is an active stream
98 class's name and ARGS are stream class-specific. Currently the only
99 supported TYPEs are "unix" and "tcp".
101 Returns (error, stream): on success 'error' is 0 and 'stream' is the
102 new Stream, on failure 'error' is a positive errno value and 'stream'
105 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
106 and a new Stream. The connect() method can be used to check for
107 successful connection completion."""
108 cls = Stream._find_method(name)
110 return errno.EAFNOSUPPORT, None
112 suffix = name.split(":", 1)[1]
113 if name.startswith("unix:"):
114 suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
115 error, sock = cls._open(suffix, dscp)
119 status = ovs.socket_util.check_connection_completion(sock)
120 return 0, Stream(sock, name, status)
123 def _open(suffix, dscp):
124 raise NotImplementedError("This method must be overrided by subclass")
127 def open_block(error_stream):
128 """Blocks until a Stream completes its connection attempt, either
129 succeeding or failing. (error, stream) should be the tuple returned by
130 Stream.open(). Returns a tuple of the same form.
133 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
135 # Py3 doesn't support tuple parameter unpacking - PEP 3113
136 error, stream = error_stream
139 error = stream.connect()
140 if error != errno.EAGAIN:
143 poller = ovs.poller.Poller()
144 stream.run_wait(poller)
145 stream.connect_wait(poller)
147 assert error != errno.EINPROGRESS
157 def __scs_connecting(self):
158 retval = ovs.socket_util.check_connection_completion(self.socket)
159 assert retval != errno.EINPROGRESS
161 self.state = Stream.__S_CONNECTED
162 elif retval != errno.EAGAIN:
163 self.state = Stream.__S_DISCONNECTED
167 """Tries to complete the connection on this stream. If the connection
168 is complete, returns 0 if the connection was successful or a positive
169 errno value if it failed. If the connection is still in progress,
170 returns errno.EAGAIN."""
172 if self.state == Stream.__S_CONNECTING:
173 self.__scs_connecting()
175 if self.state == Stream.__S_CONNECTING:
177 elif self.state == Stream.__S_CONNECTED:
180 assert self.state == Stream.__S_DISCONNECTED
184 """Tries to receive up to 'n' bytes from this stream. Returns a
185 (error, string) tuple:
187 - If successful, 'error' is zero and 'string' contains between 1
188 and 'n' bytes of data.
190 - On error, 'error' is a positive errno value.
192 - If the connection has been closed in the normal fashion or if 'n'
193 is 0, the tuple is (0, "").
195 The recv function will not block waiting for data to arrive. If no
196 data have been received, it returns (errno.EAGAIN, "") immediately."""
198 retval = self.connect()
205 return (0, self.socket.recv(n))
206 except socket.error as e:
207 return (ovs.socket_util.get_exception_errno(e), "")
210 """Tries to send 'buf' on this stream.
212 If successful, returns the number of bytes sent, between 1 and
213 len(buf). 0 is only a valid return value if len(buf) is 0.
215 On error, returns a negative errno value.
217 Will not block. If no bytes can be immediately accepted for
218 transmission, returns -errno.EAGAIN immediately."""
220 retval = self.connect()
227 # Python 3 has separate types for strings and bytes. We must have
229 if (sys.version_info[0] >= 3
230 and not isinstance(buf, six.binary_type)):
231 buf = six.binary_type(buf, 'utf-8')
232 return self.socket.send(buf)
233 except socket.error as e:
234 return -ovs.socket_util.get_exception_errno(e)
239 def run_wait(self, poller):
242 def wait(self, poller, wait):
243 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
245 if self.state == Stream.__S_DISCONNECTED:
246 poller.immediate_wake()
249 if self.state == Stream.__S_CONNECTING:
250 wait = Stream.W_CONNECT
251 if wait == Stream.W_RECV:
252 poller.fd_wait(self.socket, ovs.poller.POLLIN)
254 poller.fd_wait(self.socket, ovs.poller.POLLOUT)
256 def connect_wait(self, poller):
257 self.wait(poller, Stream.W_CONNECT)
259 def recv_wait(self, poller):
260 self.wait(poller, Stream.W_RECV)
262 def send_wait(self, poller):
263 self.wait(poller, Stream.W_SEND)
266 # Don't delete the file: we might have forked.
270 class PassiveStream(object):
272 def is_valid_name(name):
273 """Returns True if 'name' is a passive stream name in the form
274 "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
275 "punix:"), otherwise False."""
276 return name.startswith("punix:")
278 def __init__(self, sock, name, bind_path):
281 self.bind_path = bind_path
285 """Attempts to start listening for remote stream connections. 'name'
286 is a connection name in the form "TYPE:ARGS", where TYPE is an passive
287 stream class's name and ARGS are stream class-specific. Currently the
288 only supported TYPE is "punix".
290 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
291 new PassiveStream, on failure 'error' is a positive errno value and
292 'pstream' is None."""
293 if not PassiveStream.is_valid_name(name):
294 return errno.EAFNOSUPPORT, None
297 if name.startswith("punix:"):
298 bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
299 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
300 True, bind_path, None)
306 except socket.error as e:
307 vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
311 return 0, PassiveStream(sock, name, bind_path)
314 """Closes this PassiveStream."""
316 if self.bind_path is not None:
317 ovs.fatal_signal.unlink_file_now(self.bind_path)
318 self.bind_path = None
321 """Tries to accept a new connection on this passive stream. Returns
322 (error, stream): if successful, 'error' is 0 and 'stream' is the new
323 Stream object, and on failure 'error' is a positive errno value and
326 Will not block waiting for a connection. If no connection is ready to
327 be accepted, returns (errno.EAGAIN, None) immediately."""
331 sock, addr = self.socket.accept()
332 ovs.socket_util.set_nonblocking(sock)
333 return 0, Stream(sock, "unix:%s" % addr, 0)
334 except socket.error as e:
335 error = ovs.socket_util.get_exception_errno(e)
336 if error != errno.EAGAIN:
338 vlog.dbg("accept: %s" % os.strerror(error))
341 def wait(self, poller):
342 poller.fd_wait(self.socket, ovs.poller.POLLIN)
345 # Don't delete the file: we might have forked.
351 Active %s connection methods:
352 unix:FILE Unix domain socket named FILE
353 tcp:IP:PORT TCP socket to IP with port no of PORT
355 Passive %s connection methods:
356 punix:FILE Listen on Unix domain socket FILE""" % (name, name)
359 class UnixStream(Stream):
361 def _open(suffix, dscp):
362 connect_path = suffix
363 return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
364 True, None, connect_path)
365 Stream.register_method("unix", UnixStream)
368 class TCPStream(Stream):
370 def _open(suffix, dscp):
371 error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
374 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
376 Stream.register_method("tcp", TCPStream)