1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 import ovs.socket_util
23 vlog = ovs.vlog.Vlog("stream")
26 def stream_or_pstream_needs_probes(name):
27 """ 1 if the stream or pstream specified by 'name' needs periodic probes to
28 verify connectivity. For [p]streams which need probes, it can take a long
29 time to notice the connection was dropped. Returns 0 if probes aren't
30 needed, and -1 if 'name' is invalid"""
32 if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
33 # Only unix and punix are supported currently.
40 """Bidirectional byte stream. Currently only Unix domain sockets
48 # Kinds of events that one might wait for.
49 W_CONNECT = 0 # Connect complete (success or failure).
50 W_RECV = 1 # Data received.
51 W_SEND = 2 # Send buffer room available.
56 def register_method(method, cls):
57 Stream._SOCKET_METHODS[method + ":"] = cls
60 def _find_method(name):
61 for method, cls in Stream._SOCKET_METHODS.items():
62 if name.startswith(method):
67 def is_valid_name(name):
68 """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
69 TYPE is a supported stream type (currently only "unix:" and "tcp:"),
71 return bool(Stream._find_method(name))
73 def __init__(self, socket, name, status):
76 if status == errno.EAGAIN:
77 self.state = Stream.__S_CONNECTING
79 self.state = Stream.__S_CONNECTED
81 self.state = Stream.__S_DISCONNECTED
85 # Default value of dscp bits for connection between controller and manager.
86 # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
87 # in <netinet/ip.h> is used.
88 IPTOS_PREC_INTERNETCONTROL = 0xc0
89 DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
92 def open(name, dscp=DSCP_DEFAULT):
93 """Attempts to connect a stream to a remote peer. 'name' is a
94 connection name in the form "TYPE:ARGS", where TYPE is an active stream
95 class's name and ARGS are stream class-specific. Currently the only
96 supported TYPEs are "unix" and "tcp".
98 Returns (error, stream): on success 'error' is 0 and 'stream' is the
99 new Stream, on failure 'error' is a positive errno value and 'stream'
102 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
103 and a new Stream. The connect() method can be used to check for
104 successful connection completion."""
105 cls = Stream._find_method(name)
107 return errno.EAFNOSUPPORT, None
109 suffix = name.split(":", 1)[1]
110 if name.startswith("unix:"):
111 suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
112 error, sock = cls._open(suffix, dscp)
116 status = ovs.socket_util.check_connection_completion(sock)
117 return 0, Stream(sock, name, status)
120 def _open(suffix, dscp):
121 raise NotImplementedError("This method must be overrided by subclass")
124 def open_block(error_stream):
125 """Blocks until a Stream completes its connection attempt, either
126 succeeding or failing. (error, stream) should be the tuple returned by
127 Stream.open(). Returns a tuple of the same form.
130 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
132 # Py3 doesn't support tuple parameter unpacking - PEP 3113
133 error, stream = error_stream
136 error = stream.connect()
137 if error != errno.EAGAIN:
140 poller = ovs.poller.Poller()
141 stream.run_wait(poller)
142 stream.connect_wait(poller)
144 assert error != errno.EINPROGRESS
154 def __scs_connecting(self):
155 retval = ovs.socket_util.check_connection_completion(self.socket)
156 assert retval != errno.EINPROGRESS
158 self.state = Stream.__S_CONNECTED
159 elif retval != errno.EAGAIN:
160 self.state = Stream.__S_DISCONNECTED
164 """Tries to complete the connection on this stream. If the connection
165 is complete, returns 0 if the connection was successful or a positive
166 errno value if it failed. If the connection is still in progress,
167 returns errno.EAGAIN."""
169 if self.state == Stream.__S_CONNECTING:
170 self.__scs_connecting()
172 if self.state == Stream.__S_CONNECTING:
174 elif self.state == Stream.__S_CONNECTED:
177 assert self.state == Stream.__S_DISCONNECTED
181 """Tries to receive up to 'n' bytes from this stream. Returns a
182 (error, string) tuple:
184 - If successful, 'error' is zero and 'string' contains between 1
185 and 'n' bytes of data.
187 - On error, 'error' is a positive errno value.
189 - If the connection has been closed in the normal fashion or if 'n'
190 is 0, the tuple is (0, "").
192 The recv function will not block waiting for data to arrive. If no
193 data have been received, it returns (errno.EAGAIN, "") immediately."""
195 retval = self.connect()
202 return (0, self.socket.recv(n))
203 except socket.error as e:
204 return (ovs.socket_util.get_exception_errno(e), "")
207 """Tries to send 'buf' on this stream.
209 If successful, returns the number of bytes sent, between 1 and
210 len(buf). 0 is only a valid return value if len(buf) is 0.
212 On error, returns a negative errno value.
214 Will not block. If no bytes can be immediately accepted for
215 transmission, returns -errno.EAGAIN immediately."""
217 retval = self.connect()
224 return self.socket.send(buf)
225 except socket.error as e:
226 return -ovs.socket_util.get_exception_errno(e)
231 def run_wait(self, poller):
234 def wait(self, poller, wait):
235 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
237 if self.state == Stream.__S_DISCONNECTED:
238 poller.immediate_wake()
241 if self.state == Stream.__S_CONNECTING:
242 wait = Stream.W_CONNECT
243 if wait == Stream.W_RECV:
244 poller.fd_wait(self.socket, ovs.poller.POLLIN)
246 poller.fd_wait(self.socket, ovs.poller.POLLOUT)
248 def connect_wait(self, poller):
249 self.wait(poller, Stream.W_CONNECT)
251 def recv_wait(self, poller):
252 self.wait(poller, Stream.W_RECV)
254 def send_wait(self, poller):
255 self.wait(poller, Stream.W_SEND)
258 # Don't delete the file: we might have forked.
262 class PassiveStream(object):
264 def is_valid_name(name):
265 """Returns True if 'name' is a passive stream name in the form
266 "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
267 "punix:"), otherwise False."""
268 return name.startswith("punix:")
270 def __init__(self, sock, name, bind_path):
273 self.bind_path = bind_path
277 """Attempts to start listening for remote stream connections. 'name'
278 is a connection name in the form "TYPE:ARGS", where TYPE is an passive
279 stream class's name and ARGS are stream class-specific. Currently the
280 only supported TYPE is "punix".
282 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
283 new PassiveStream, on failure 'error' is a positive errno value and
284 'pstream' is None."""
285 if not PassiveStream.is_valid_name(name):
286 return errno.EAFNOSUPPORT, None
289 if name.startswith("punix:"):
290 bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
291 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
292 True, bind_path, None)
298 except socket.error as e:
299 vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
303 return 0, PassiveStream(sock, name, bind_path)
306 """Closes this PassiveStream."""
308 if self.bind_path is not None:
309 ovs.fatal_signal.unlink_file_now(self.bind_path)
310 self.bind_path = None
313 """Tries to accept a new connection on this passive stream. Returns
314 (error, stream): if successful, 'error' is 0 and 'stream' is the new
315 Stream object, and on failure 'error' is a positive errno value and
318 Will not block waiting for a connection. If no connection is ready to
319 be accepted, returns (errno.EAGAIN, None) immediately."""
323 sock, addr = self.socket.accept()
324 ovs.socket_util.set_nonblocking(sock)
325 return 0, Stream(sock, "unix:%s" % addr, 0)
326 except socket.error as e:
327 error = ovs.socket_util.get_exception_errno(e)
328 if error != errno.EAGAIN:
330 vlog.dbg("accept: %s" % os.strerror(error))
333 def wait(self, poller):
334 poller.fd_wait(self.socket, ovs.poller.POLLIN)
337 # Don't delete the file: we might have forked.
343 Active %s connection methods:
344 unix:FILE Unix domain socket named FILE
345 tcp:IP:PORT TCP socket to IP with port no of PORT
347 Passive %s connection methods:
348 punix:FILE Listen on Unix domain socket FILE""" % (name, name)
351 class UnixStream(Stream):
353 def _open(suffix, dscp):
354 connect_path = suffix
355 return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
356 True, None, connect_path)
357 Stream.register_method("unix", UnixStream)
360 class TCPStream(Stream):
362 def _open(suffix, dscp):
363 error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
366 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
368 Stream.register_method("tcp", TCPStream)