c6d57daeab713ee4d3b24846eaf355450777ac7e
[cascardo/ovs.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import select
18 import socket
19
20 import ovs.poller
21 import ovs.socket_util
22 import ovs.vlog
23
24 vlog = ovs.vlog.Vlog("stream")
25
26
27 def stream_or_pstream_needs_probes(name):
28     """ 1 if the stream or pstream specified by 'name' needs periodic probes to
29     verify connectivity.  For [p]streams which need probes, it can take a long
30     time to notice the connection was dropped.  Returns 0 if probes aren't
31     needed, and -1 if 'name' is invalid"""
32
33     if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
34         # Only unix and punix are supported currently.
35         return 0
36     else:
37         return -1
38
39
40 class Stream(object):
41     """Bidirectional byte stream.  Currently only Unix domain sockets
42     are implemented."""
43
44     # States.
45     __S_CONNECTING = 0
46     __S_CONNECTED = 1
47     __S_DISCONNECTED = 2
48
49     # Kinds of events that one might wait for.
50     W_CONNECT = 0               # Connect complete (success or failure).
51     W_RECV = 1                  # Data received.
52     W_SEND = 2                  # Send buffer room available.
53
54     _SOCKET_METHODS = {}
55
56     @staticmethod
57     def register_method(method, cls):
58         Stream._SOCKET_METHODS[method + ":"] = cls
59
60     @staticmethod
61     def _find_method(name):
62         for method, cls in Stream._SOCKET_METHODS.items():
63             if name.startswith(method):
64                 return cls
65         return None
66
67     @staticmethod
68     def is_valid_name(name):
69         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
70         TYPE is a supported stream type (currently only "unix:" and "tcp:"),
71         otherwise False."""
72         return bool(Stream._find_method(name))
73
74     def __init__(self, socket, name, status):
75         self.socket = socket
76         self.name = name
77         if status == errno.EAGAIN:
78             self.state = Stream.__S_CONNECTING
79         elif status == 0:
80             self.state = Stream.__S_CONNECTED
81         else:
82             self.state = Stream.__S_DISCONNECTED
83
84         self.error = 0
85
86     # Default value of dscp bits for connection between controller and manager.
87     # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
88     # in <netinet/ip.h> is used.
89     IPTOS_PREC_INTERNETCONTROL = 0xc0
90     DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
91
92     @staticmethod
93     def open(name, dscp=DSCP_DEFAULT):
94         """Attempts to connect a stream to a remote peer.  'name' is a
95         connection name in the form "TYPE:ARGS", where TYPE is an active stream
96         class's name and ARGS are stream class-specific.  Currently the only
97         supported TYPEs are "unix" and "tcp".
98
99         Returns (error, stream): on success 'error' is 0 and 'stream' is the
100         new Stream, on failure 'error' is a positive errno value and 'stream'
101         is None.
102
103         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
104         and a new Stream.  The connect() method can be used to check for
105         successful connection completion."""
106         cls = Stream._find_method(name)
107         if not cls:
108             return errno.EAFNOSUPPORT, None
109
110         suffix = name.split(":", 1)[1]
111         error, sock = cls._open(suffix, dscp)
112         if error:
113             return error, None
114         else:
115             status = ovs.socket_util.check_connection_completion(sock)
116             return 0, Stream(sock, name, status)
117
118     @staticmethod
119     def _open(suffix, dscp):
120         raise NotImplementedError("This method must be overrided by subclass")
121
122     @staticmethod
123     def open_block((error, stream)):
124         """Blocks until a Stream completes its connection attempt, either
125         succeeding or failing.  (error, stream) should be the tuple returned by
126         Stream.open().  Returns a tuple of the same form.
127
128         Typical usage:
129         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
130
131         if not error:
132             while True:
133                 error = stream.connect()
134                 if error != errno.EAGAIN:
135                     break
136                 stream.run()
137                 poller = ovs.poller.Poller()
138                 stream.run_wait(poller)
139                 stream.connect_wait(poller)
140                 poller.block()
141             assert error != errno.EINPROGRESS
142
143         if error and stream:
144             stream.close()
145             stream = None
146         return error, stream
147
148     def close(self):
149         self.socket.close()
150
151     def __scs_connecting(self):
152         retval = ovs.socket_util.check_connection_completion(self.socket)
153         assert retval != errno.EINPROGRESS
154         if retval == 0:
155             self.state = Stream.__S_CONNECTED
156         elif retval != errno.EAGAIN:
157             self.state = Stream.__S_DISCONNECTED
158             self.error = retval
159
160     def connect(self):
161         """Tries to complete the connection on this stream.  If the connection
162         is complete, returns 0 if the connection was successful or a positive
163         errno value if it failed.  If the connection is still in progress,
164         returns errno.EAGAIN."""
165
166         if self.state == Stream.__S_CONNECTING:
167             self.__scs_connecting()
168
169         if self.state == Stream.__S_CONNECTING:
170             return errno.EAGAIN
171         elif self.state == Stream.__S_CONNECTED:
172             return 0
173         else:
174             assert self.state == Stream.__S_DISCONNECTED
175             return self.error
176
177     def recv(self, n):
178         """Tries to receive up to 'n' bytes from this stream.  Returns a
179         (error, string) tuple:
180
181             - If successful, 'error' is zero and 'string' contains between 1
182               and 'n' bytes of data.
183
184             - On error, 'error' is a positive errno value.
185
186             - If the connection has been closed in the normal fashion or if 'n'
187               is 0, the tuple is (0, "").
188
189         The recv function will not block waiting for data to arrive.  If no
190         data have been received, it returns (errno.EAGAIN, "") immediately."""
191
192         retval = self.connect()
193         if retval != 0:
194             return (retval, "")
195         elif n == 0:
196             return (0, "")
197
198         try:
199             return (0, self.socket.recv(n))
200         except socket.error, e:
201             return (ovs.socket_util.get_exception_errno(e), "")
202
203     def send(self, buf):
204         """Tries to send 'buf' on this stream.
205
206         If successful, returns the number of bytes sent, between 1 and
207         len(buf).  0 is only a valid return value if len(buf) is 0.
208
209         On error, returns a negative errno value.
210
211         Will not block.  If no bytes can be immediately accepted for
212         transmission, returns -errno.EAGAIN immediately."""
213
214         retval = self.connect()
215         if retval != 0:
216             return -retval
217         elif len(buf) == 0:
218             return 0
219
220         try:
221             return self.socket.send(buf)
222         except socket.error, e:
223             return -ovs.socket_util.get_exception_errno(e)
224
225     def run(self):
226         pass
227
228     def run_wait(self, poller):
229         pass
230
231     def wait(self, poller, wait):
232         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
233
234         if self.state == Stream.__S_DISCONNECTED:
235             poller.immediate_wake()
236             return
237
238         if self.state == Stream.__S_CONNECTING:
239             wait = Stream.W_CONNECT
240         if wait == Stream.W_RECV:
241             poller.fd_wait(self.socket, select.POLLIN)
242         else:
243             poller.fd_wait(self.socket, select.POLLOUT)
244
245     def connect_wait(self, poller):
246         self.wait(poller, Stream.W_CONNECT)
247
248     def recv_wait(self, poller):
249         self.wait(poller, Stream.W_RECV)
250
251     def send_wait(self, poller):
252         self.wait(poller, Stream.W_SEND)
253
254     def __del__(self):
255         # Don't delete the file: we might have forked.
256         self.socket.close()
257
258
259 class PassiveStream(object):
260     @staticmethod
261     def is_valid_name(name):
262         """Returns True if 'name' is a passive stream name in the form
263         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
264         "punix:"), otherwise False."""
265         return name.startswith("punix:")
266
267     def __init__(self, sock, name, bind_path):
268         self.name = name
269         self.socket = sock
270         self.bind_path = bind_path
271
272     @staticmethod
273     def open(name):
274         """Attempts to start listening for remote stream connections.  'name'
275         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
276         stream class's name and ARGS are stream class-specific.  Currently the
277         only supported TYPE is "punix".
278
279         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
280         new PassiveStream, on failure 'error' is a positive errno value and
281         'pstream' is None."""
282         if not PassiveStream.is_valid_name(name):
283             return errno.EAFNOSUPPORT, None
284
285         bind_path = name[6:]
286         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
287                                                        True, bind_path, None)
288         if error:
289             return error, None
290
291         try:
292             sock.listen(10)
293         except socket.error, e:
294             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
295             sock.close()
296             return e.error, None
297
298         return 0, PassiveStream(sock, name, bind_path)
299
300     def close(self):
301         """Closes this PassiveStream."""
302         self.socket.close()
303         if self.bind_path is not None:
304             ovs.fatal_signal.unlink_file_now(self.bind_path)
305             self.bind_path = None
306
307     def accept(self):
308         """Tries to accept a new connection on this passive stream.  Returns
309         (error, stream): if successful, 'error' is 0 and 'stream' is the new
310         Stream object, and on failure 'error' is a positive errno value and
311         'stream' is None.
312
313         Will not block waiting for a connection.  If no connection is ready to
314         be accepted, returns (errno.EAGAIN, None) immediately."""
315
316         while True:
317             try:
318                 sock, addr = self.socket.accept()
319                 ovs.socket_util.set_nonblocking(sock)
320                 return 0, Stream(sock, "unix:%s" % addr, 0)
321             except socket.error, e:
322                 error = ovs.socket_util.get_exception_errno(e)
323                 if error != errno.EAGAIN:
324                     # XXX rate-limit
325                     vlog.dbg("accept: %s" % os.strerror(error))
326                 return error, None
327
328     def wait(self, poller):
329         poller.fd_wait(self.socket, select.POLLIN)
330
331     def __del__(self):
332         # Don't delete the file: we might have forked.
333         self.socket.close()
334
335
336 def usage(name):
337     return """
338 Active %s connection methods:
339   unix:FILE               Unix domain socket named FILE
340   tcp:IP:PORT             TCP socket to IP with port no of PORT
341
342 Passive %s connection methods:
343   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
344
345
346 class UnixStream(Stream):
347     @staticmethod
348     def _open(suffix, dscp):
349         connect_path = suffix
350         return  ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
351                                                  True, None, connect_path)
352 Stream.register_method("unix", UnixStream)
353
354
355 class TCPStream(Stream):
356     @staticmethod
357     def _open(suffix, dscp):
358         error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
359                                                        suffix, 0, dscp)
360         if not error:
361             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
362         return error, sock
363 Stream.register_method("tcp", TCPStream)