42bc4cc7a4e5fff8979410ad09caec03a8ac57c0
[cascardo/ovs.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import socket
18
19 import ovs.poller
20 import ovs.socket_util
21 import ovs.vlog
22
23 vlog = ovs.vlog.Vlog("stream")
24
25
26 def stream_or_pstream_needs_probes(name):
27     """ 1 if the stream or pstream specified by 'name' needs periodic probes to
28     verify connectivity.  For [p]streams which need probes, it can take a long
29     time to notice the connection was dropped.  Returns 0 if probes aren't
30     needed, and -1 if 'name' is invalid"""
31
32     if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
33         # Only unix and punix are supported currently.
34         return 0
35     else:
36         return -1
37
38
39 class Stream(object):
40     """Bidirectional byte stream.  Currently only Unix domain sockets
41     are implemented."""
42
43     # States.
44     __S_CONNECTING = 0
45     __S_CONNECTED = 1
46     __S_DISCONNECTED = 2
47
48     # Kinds of events that one might wait for.
49     W_CONNECT = 0               # Connect complete (success or failure).
50     W_RECV = 1                  # Data received.
51     W_SEND = 2                  # Send buffer room available.
52
53     _SOCKET_METHODS = {}
54
55     @staticmethod
56     def register_method(method, cls):
57         Stream._SOCKET_METHODS[method + ":"] = cls
58
59     @staticmethod
60     def _find_method(name):
61         for method, cls in Stream._SOCKET_METHODS.items():
62             if name.startswith(method):
63                 return cls
64         return None
65
66     @staticmethod
67     def is_valid_name(name):
68         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
69         TYPE is a supported stream type (currently only "unix:" and "tcp:"),
70         otherwise False."""
71         return bool(Stream._find_method(name))
72
73     def __init__(self, socket, name, status):
74         self.socket = socket
75         self.name = name
76         if status == errno.EAGAIN:
77             self.state = Stream.__S_CONNECTING
78         elif status == 0:
79             self.state = Stream.__S_CONNECTED
80         else:
81             self.state = Stream.__S_DISCONNECTED
82
83         self.error = 0
84
85     # Default value of dscp bits for connection between controller and manager.
86     # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
87     # in <netinet/ip.h> is used.
88     IPTOS_PREC_INTERNETCONTROL = 0xc0
89     DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
90
91     @staticmethod
92     def open(name, dscp=DSCP_DEFAULT):
93         """Attempts to connect a stream to a remote peer.  'name' is a
94         connection name in the form "TYPE:ARGS", where TYPE is an active stream
95         class's name and ARGS are stream class-specific.  Currently the only
96         supported TYPEs are "unix" and "tcp".
97
98         Returns (error, stream): on success 'error' is 0 and 'stream' is the
99         new Stream, on failure 'error' is a positive errno value and 'stream'
100         is None.
101
102         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
103         and a new Stream.  The connect() method can be used to check for
104         successful connection completion."""
105         cls = Stream._find_method(name)
106         if not cls:
107             return errno.EAFNOSUPPORT, None
108
109         suffix = name.split(":", 1)[1]
110         if name.startswith("unix:"):
111             suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
112         error, sock = cls._open(suffix, dscp)
113         if error:
114             return error, None
115         else:
116             status = ovs.socket_util.check_connection_completion(sock)
117             return 0, Stream(sock, name, status)
118
119     @staticmethod
120     def _open(suffix, dscp):
121         raise NotImplementedError("This method must be overrided by subclass")
122
123     @staticmethod
124     def open_block(error_stream):
125         """Blocks until a Stream completes its connection attempt, either
126         succeeding or failing.  (error, stream) should be the tuple returned by
127         Stream.open().  Returns a tuple of the same form.
128
129         Typical usage:
130         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
131
132         # Py3 doesn't support tuple parameter unpacking - PEP 3113
133         error, stream = error_stream
134         if not error:
135             while True:
136                 error = stream.connect()
137                 if error != errno.EAGAIN:
138                     break
139                 stream.run()
140                 poller = ovs.poller.Poller()
141                 stream.run_wait(poller)
142                 stream.connect_wait(poller)
143                 poller.block()
144             assert error != errno.EINPROGRESS
145
146         if error and stream:
147             stream.close()
148             stream = None
149         return error, stream
150
151     def close(self):
152         self.socket.close()
153
154     def __scs_connecting(self):
155         retval = ovs.socket_util.check_connection_completion(self.socket)
156         assert retval != errno.EINPROGRESS
157         if retval == 0:
158             self.state = Stream.__S_CONNECTED
159         elif retval != errno.EAGAIN:
160             self.state = Stream.__S_DISCONNECTED
161             self.error = retval
162
163     def connect(self):
164         """Tries to complete the connection on this stream.  If the connection
165         is complete, returns 0 if the connection was successful or a positive
166         errno value if it failed.  If the connection is still in progress,
167         returns errno.EAGAIN."""
168
169         if self.state == Stream.__S_CONNECTING:
170             self.__scs_connecting()
171
172         if self.state == Stream.__S_CONNECTING:
173             return errno.EAGAIN
174         elif self.state == Stream.__S_CONNECTED:
175             return 0
176         else:
177             assert self.state == Stream.__S_DISCONNECTED
178             return self.error
179
180     def recv(self, n):
181         """Tries to receive up to 'n' bytes from this stream.  Returns a
182         (error, string) tuple:
183
184             - If successful, 'error' is zero and 'string' contains between 1
185               and 'n' bytes of data.
186
187             - On error, 'error' is a positive errno value.
188
189             - If the connection has been closed in the normal fashion or if 'n'
190               is 0, the tuple is (0, "").
191
192         The recv function will not block waiting for data to arrive.  If no
193         data have been received, it returns (errno.EAGAIN, "") immediately."""
194
195         retval = self.connect()
196         if retval != 0:
197             return (retval, "")
198         elif n == 0:
199             return (0, "")
200
201         try:
202             return (0, self.socket.recv(n))
203         except socket.error as e:
204             return (ovs.socket_util.get_exception_errno(e), "")
205
206     def send(self, buf):
207         """Tries to send 'buf' on this stream.
208
209         If successful, returns the number of bytes sent, between 1 and
210         len(buf).  0 is only a valid return value if len(buf) is 0.
211
212         On error, returns a negative errno value.
213
214         Will not block.  If no bytes can be immediately accepted for
215         transmission, returns -errno.EAGAIN immediately."""
216
217         retval = self.connect()
218         if retval != 0:
219             return -retval
220         elif len(buf) == 0:
221             return 0
222
223         try:
224             return self.socket.send(buf)
225         except socket.error as e:
226             return -ovs.socket_util.get_exception_errno(e)
227
228     def run(self):
229         pass
230
231     def run_wait(self, poller):
232         pass
233
234     def wait(self, poller, wait):
235         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
236
237         if self.state == Stream.__S_DISCONNECTED:
238             poller.immediate_wake()
239             return
240
241         if self.state == Stream.__S_CONNECTING:
242             wait = Stream.W_CONNECT
243         if wait == Stream.W_RECV:
244             poller.fd_wait(self.socket, ovs.poller.POLLIN)
245         else:
246             poller.fd_wait(self.socket, ovs.poller.POLLOUT)
247
248     def connect_wait(self, poller):
249         self.wait(poller, Stream.W_CONNECT)
250
251     def recv_wait(self, poller):
252         self.wait(poller, Stream.W_RECV)
253
254     def send_wait(self, poller):
255         self.wait(poller, Stream.W_SEND)
256
257     def __del__(self):
258         # Don't delete the file: we might have forked.
259         self.socket.close()
260
261
262 class PassiveStream(object):
263     @staticmethod
264     def is_valid_name(name):
265         """Returns True if 'name' is a passive stream name in the form
266         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
267         "punix:"), otherwise False."""
268         return name.startswith("punix:")
269
270     def __init__(self, sock, name, bind_path):
271         self.name = name
272         self.socket = sock
273         self.bind_path = bind_path
274
275     @staticmethod
276     def open(name):
277         """Attempts to start listening for remote stream connections.  'name'
278         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
279         stream class's name and ARGS are stream class-specific.  Currently the
280         only supported TYPE is "punix".
281
282         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
283         new PassiveStream, on failure 'error' is a positive errno value and
284         'pstream' is None."""
285         if not PassiveStream.is_valid_name(name):
286             return errno.EAFNOSUPPORT, None
287
288         bind_path = name[6:]
289         if name.startswith("punix:"):
290             bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
291         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
292                                                        True, bind_path, None)
293         if error:
294             return error, None
295
296         try:
297             sock.listen(10)
298         except socket.error as e:
299             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
300             sock.close()
301             return e.error, None
302
303         return 0, PassiveStream(sock, name, bind_path)
304
305     def close(self):
306         """Closes this PassiveStream."""
307         self.socket.close()
308         if self.bind_path is not None:
309             ovs.fatal_signal.unlink_file_now(self.bind_path)
310             self.bind_path = None
311
312     def accept(self):
313         """Tries to accept a new connection on this passive stream.  Returns
314         (error, stream): if successful, 'error' is 0 and 'stream' is the new
315         Stream object, and on failure 'error' is a positive errno value and
316         'stream' is None.
317
318         Will not block waiting for a connection.  If no connection is ready to
319         be accepted, returns (errno.EAGAIN, None) immediately."""
320
321         while True:
322             try:
323                 sock, addr = self.socket.accept()
324                 ovs.socket_util.set_nonblocking(sock)
325                 return 0, Stream(sock, "unix:%s" % addr, 0)
326             except socket.error as e:
327                 error = ovs.socket_util.get_exception_errno(e)
328                 if error != errno.EAGAIN:
329                     # XXX rate-limit
330                     vlog.dbg("accept: %s" % os.strerror(error))
331                 return error, None
332
333     def wait(self, poller):
334         poller.fd_wait(self.socket, ovs.poller.POLLIN)
335
336     def __del__(self):
337         # Don't delete the file: we might have forked.
338         self.socket.close()
339
340
341 def usage(name):
342     return """
343 Active %s connection methods:
344   unix:FILE               Unix domain socket named FILE
345   tcp:IP:PORT             TCP socket to IP with port no of PORT
346
347 Passive %s connection methods:
348   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
349
350
351 class UnixStream(Stream):
352     @staticmethod
353     def _open(suffix, dscp):
354         connect_path = suffix
355         return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
356                                                 True, None, connect_path)
357 Stream.register_method("unix", UnixStream)
358
359
360 class TCPStream(Stream):
361     @staticmethod
362     def _open(suffix, dscp):
363         error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
364                                                        suffix, 0, dscp)
365         if not error:
366             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
367         return error, sock
368 Stream.register_method("tcp", TCPStream)