1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 import ovs.socket_util
24 vlog = ovs.vlog.Vlog("stream")
27 def stream_or_pstream_needs_probes(name):
28 """Returns 1 if the stream or pstream specified by 'name' needs periodic
29 probes to verify connectivity. For [p]streams which need probes, it can
30 take a long time to notice the connection was dropped. Returns 0 if probes
31 aren't needed, and -1 if 'name' is invalid."""
33 if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
34 # Only unix and punix are supported currently.
41 """Bidirectional byte stream. Currently only Unix domain sockets
49 # Kinds of events that one might wait for.
50 W_CONNECT = 0 # Connect complete (success or failure).
51 W_RECV = 1 # Data received.
52 W_SEND = 2 # Send buffer room available.
def register_method(method, cls):
    """Associates the stream type 'method' with the class 'cls'.

    The class is stored in Stream._SOCKET_METHODS under 'method' followed
    by a colon (so "unix" is stored as "unix:"), which is the prefix that
    stream names of that type start with."""
    prefix = method + ":"
    Stream._SOCKET_METHODS[prefix] = cls
61 def _find_method(name):
62 for method, cls in Stream._SOCKET_METHODS.items():
63 if name.startswith(method):
68 def is_valid_name(name):
69 """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
70 TYPE is a supported stream type (currently only "unix:" and "tcp:"),
72 return bool(Stream._find_method(name))
74 def __init__(self, socket, name, status):
77 if status == errno.EAGAIN:
78 self.state = Stream.__S_CONNECTING
80 self.state = Stream.__S_CONNECTED
82 self.state = Stream.__S_DISCONNECTED
# Default DSCP value for connections between a controller and a manager.
# IPTOS_PREC_INTERNETCONTROL is 0xc0, the value that <netinet/ip.h> defines
# under the same name; DSCP_DEFAULT is that value with its two low-order
# bits shifted out.
IPTOS_PREC_INTERNETCONTROL = 0xc0
DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
93 def open(name, dscp=DSCP_DEFAULT):
94 """Attempts to connect a stream to a remote peer. 'name' is a
95 connection name in the form "TYPE:ARGS", where TYPE is an active stream
96 class's name and ARGS are stream class-specific. Currently the only
97 supported TYPEs are "unix" and "tcp".
99 Returns (error, stream): on success 'error' is 0 and 'stream' is the
100 new Stream, on failure 'error' is a positive errno value and 'stream'
103 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
104 and a new Stream. The connect() method can be used to check for
105 successful connection completion."""
106 cls = Stream._find_method(name)
108 return errno.EAFNOSUPPORT, None
110 suffix = name.split(":", 1)[1]
111 error, sock = cls._open(suffix, dscp)
115 status = ovs.socket_util.check_connection_completion(sock)
116 return 0, Stream(sock, name, status)
119 def _open(suffix, dscp):
120 raise NotImplementedError("This method must be overrided by subclass")
123 def open_block((error, stream)):
124 """Blocks until a Stream completes its connection attempt, either
125 succeeding or failing. (error, stream) should be the tuple returned by
126 Stream.open(). Returns a tuple of the same form.
129 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
133 error = stream.connect()
134 if error != errno.EAGAIN:
137 poller = ovs.poller.Poller()
138 stream.run_wait(poller)
139 stream.connect_wait(poller)
141 assert error != errno.EINPROGRESS
151 def __scs_connecting(self):
152 retval = ovs.socket_util.check_connection_completion(self.socket)
153 assert retval != errno.EINPROGRESS
155 self.state = Stream.__S_CONNECTED
156 elif retval != errno.EAGAIN:
157 self.state = Stream.__S_DISCONNECTED
161 """Tries to complete the connection on this stream. If the connection
162 is complete, returns 0 if the connection was successful or a positive
163 errno value if it failed. If the connection is still in progress,
164 returns errno.EAGAIN."""
166 if self.state == Stream.__S_CONNECTING:
167 self.__scs_connecting()
169 if self.state == Stream.__S_CONNECTING:
171 elif self.state == Stream.__S_CONNECTED:
174 assert self.state == Stream.__S_DISCONNECTED
178 """Tries to receive up to 'n' bytes from this stream. Returns a
179 (error, string) tuple:
181 - If successful, 'error' is zero and 'string' contains between 1
182 and 'n' bytes of data.
184 - On error, 'error' is a positive errno value.
186 - If the connection has been closed in the normal fashion or if 'n'
187 is 0, the tuple is (0, "").
189 The recv function will not block waiting for data to arrive. If no
190 data have been received, it returns (errno.EAGAIN, "") immediately."""
192 retval = self.connect()
199 return (0, self.socket.recv(n))
200 except socket.error, e:
201 return (ovs.socket_util.get_exception_errno(e), "")
204 """Tries to send 'buf' on this stream.
206 If successful, returns the number of bytes sent, between 1 and
207 len(buf). 0 is only a valid return value if len(buf) is 0.
209 On error, returns a negative errno value.
211 Will not block. If no bytes can be immediately accepted for
212 transmission, returns -errno.EAGAIN immediately."""
214 retval = self.connect()
221 return self.socket.send(buf)
222 except socket.error, e:
223 return -ovs.socket_util.get_exception_errno(e)
228 def run_wait(self, poller):
231 def wait(self, poller, wait):
232 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
234 if self.state == Stream.__S_DISCONNECTED:
235 poller.immediate_wake()
238 if self.state == Stream.__S_CONNECTING:
239 wait = Stream.W_CONNECT
240 if wait == Stream.W_RECV:
241 poller.fd_wait(self.socket, select.POLLIN)
243 poller.fd_wait(self.socket, select.POLLOUT)
def connect_wait(self, poller):
    """Arranges for 'poller' to wake up once this stream's connection
    attempt has finished, by waiting for the Stream.W_CONNECT event."""
    event = Stream.W_CONNECT
    self.wait(poller, event)
def recv_wait(self, poller):
    """Arranges for 'poller' to wake up when data has been received on this
    stream, by waiting for the Stream.W_RECV event.

    While the stream is still connecting, wait() waits for the connection
    to complete instead."""
    event = Stream.W_RECV
    self.wait(poller, event)
def send_wait(self, poller):
    """Arranges for 'poller' to wake up when this stream has send buffer
    room available, by waiting for the Stream.W_SEND event.

    While the stream is still connecting, wait() waits for the connection
    to complete instead."""
    event = Stream.W_SEND
    self.wait(poller, event)
255 # Don't delete the file: we might have forked.
259 class PassiveStream(object):
def is_valid_name(name):
    """Reports whether 'name' names a supported passive stream.

    A passive stream name has the form "TYPE:ARGS".  The only TYPE accepted
    at present is "punix", so the result is True exactly when 'name' begins
    with "punix:" and False otherwise."""
    supported_prefix = "punix:"
    return name.startswith(supported_prefix)
267 def __init__(self, sock, name, bind_path):
270 self.bind_path = bind_path
274 """Attempts to start listening for remote stream connections. 'name'
275 is a connection name in the form "TYPE:ARGS", where TYPE is a passive
276 stream class's name and ARGS are stream class-specific. Currently the
277 only supported TYPE is "punix".
279 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
280 new PassiveStream, on failure 'error' is a positive errno value and
281 'pstream' is None."""
282 if not PassiveStream.is_valid_name(name):
283 return errno.EAFNOSUPPORT, None
286 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
287 True, bind_path, None)
293 except socket.error, e:
294 vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
298 return 0, PassiveStream(sock, name, bind_path)
301 """Closes this PassiveStream."""
303 if self.bind_path is not None:
304 ovs.fatal_signal.unlink_file_now(self.bind_path)
305 self.bind_path = None
308 """Tries to accept a new connection on this passive stream. Returns
309 (error, stream): if successful, 'error' is 0 and 'stream' is the new
310 Stream object, and on failure 'error' is a positive errno value and
313 Will not block waiting for a connection. If no connection is ready to
314 be accepted, returns (errno.EAGAIN, None) immediately."""
318 sock, addr = self.socket.accept()
319 ovs.socket_util.set_nonblocking(sock)
320 return 0, Stream(sock, "unix:%s" % addr, 0)
321 except socket.error, e:
322 error = ovs.socket_util.get_exception_errno(e)
323 if error != errno.EAGAIN:
325 vlog.dbg("accept: %s" % os.strerror(error))
def wait(self, poller):
    """Asks 'poller' to watch this passive stream's socket for
    select.POLLIN events, so that the poller wakes up when the socket
    becomes readable."""
    listener = self.socket
    poller.fd_wait(listener, select.POLLIN)
332 # Don't delete the file: we might have forked.
338 Active %s connection methods:
339 unix:FILE Unix domain socket named FILE
340 tcp:IP:PORT TCP socket to IP with port no of PORT
342 Passive %s connection methods:
343 punix:FILE Listen on Unix domain socket FILE""" % (name, name)
346 class UnixStream(Stream):
def _open(suffix, dscp):
    """Creates a Unix domain stream socket connected to the path 'suffix'.

    'suffix' is the part of the stream name that follows "unix:".  'dscp'
    is accepted so that the signature matches the other stream classes; it
    is not used here.

    Returns the result of ovs.socket_util.make_unix_socket(), which
    Stream.open() unpacks as an (error, socket) pair."""
    bind_path = None  # an active stream is given no local path to bind
    return ovs.socket_util.make_unix_socket(
        socket.SOCK_STREAM, True, bind_path, suffix)
352 Stream.register_method("unix", UnixStream)
355 class TCPStream(Stream):
357 def _open(suffix, dscp):
358 error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
361 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
363 Stream.register_method("tcp", TCPStream)