unixctl: Log commands received and their replies (at debug level).
[cascardo/ovs.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import socket
18 import sys
19
20 import six
21
22 import ovs.poller
23 import ovs.socket_util
24 import ovs.vlog
25
26 vlog = ovs.vlog.Vlog("stream")
27
28
29 def stream_or_pstream_needs_probes(name):
30     """ 1 if the stream or pstream specified by 'name' needs periodic probes to
31     verify connectivity.  For [p]streams which need probes, it can take a long
32     time to notice the connection was dropped.  Returns 0 if probes aren't
33     needed, and -1 if 'name' is invalid"""
34
35     if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
36         # Only unix and punix are supported currently.
37         return 0
38     else:
39         return -1
40
41
42 class Stream(object):
43     """Bidirectional byte stream.  Currently only Unix domain sockets
44     are implemented."""
45
46     # States.
47     __S_CONNECTING = 0
48     __S_CONNECTED = 1
49     __S_DISCONNECTED = 2
50
51     # Kinds of events that one might wait for.
52     W_CONNECT = 0               # Connect complete (success or failure).
53     W_RECV = 1                  # Data received.
54     W_SEND = 2                  # Send buffer room available.
55
56     _SOCKET_METHODS = {}
57
58     @staticmethod
59     def register_method(method, cls):
60         Stream._SOCKET_METHODS[method + ":"] = cls
61
62     @staticmethod
63     def _find_method(name):
64         for method, cls in six.iteritems(Stream._SOCKET_METHODS):
65             if name.startswith(method):
66                 return cls
67         return None
68
69     @staticmethod
70     def is_valid_name(name):
71         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
72         TYPE is a supported stream type (currently only "unix:" and "tcp:"),
73         otherwise False."""
74         return bool(Stream._find_method(name))
75
76     def __init__(self, socket, name, status):
77         self.socket = socket
78         self.name = name
79         if status == errno.EAGAIN:
80             self.state = Stream.__S_CONNECTING
81         elif status == 0:
82             self.state = Stream.__S_CONNECTED
83         else:
84             self.state = Stream.__S_DISCONNECTED
85
86         self.error = 0
87
88     # Default value of dscp bits for connection between controller and manager.
89     # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
90     # in <netinet/ip.h> is used.
91     IPTOS_PREC_INTERNETCONTROL = 0xc0
92     DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
93
94     @staticmethod
95     def open(name, dscp=DSCP_DEFAULT):
96         """Attempts to connect a stream to a remote peer.  'name' is a
97         connection name in the form "TYPE:ARGS", where TYPE is an active stream
98         class's name and ARGS are stream class-specific.  Currently the only
99         supported TYPEs are "unix" and "tcp".
100
101         Returns (error, stream): on success 'error' is 0 and 'stream' is the
102         new Stream, on failure 'error' is a positive errno value and 'stream'
103         is None.
104
105         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
106         and a new Stream.  The connect() method can be used to check for
107         successful connection completion."""
108         cls = Stream._find_method(name)
109         if not cls:
110             return errno.EAFNOSUPPORT, None
111
112         suffix = name.split(":", 1)[1]
113         if name.startswith("unix:"):
114             suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
115         error, sock = cls._open(suffix, dscp)
116         if error:
117             return error, None
118         else:
119             status = ovs.socket_util.check_connection_completion(sock)
120             return 0, Stream(sock, name, status)
121
122     @staticmethod
123     def _open(suffix, dscp):
124         raise NotImplementedError("This method must be overrided by subclass")
125
126     @staticmethod
127     def open_block(error_stream):
128         """Blocks until a Stream completes its connection attempt, either
129         succeeding or failing.  (error, stream) should be the tuple returned by
130         Stream.open().  Returns a tuple of the same form.
131
132         Typical usage:
133         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
134
135         # Py3 doesn't support tuple parameter unpacking - PEP 3113
136         error, stream = error_stream
137         if not error:
138             while True:
139                 error = stream.connect()
140                 if error != errno.EAGAIN:
141                     break
142                 stream.run()
143                 poller = ovs.poller.Poller()
144                 stream.run_wait(poller)
145                 stream.connect_wait(poller)
146                 poller.block()
147             assert error != errno.EINPROGRESS
148
149         if error and stream:
150             stream.close()
151             stream = None
152         return error, stream
153
154     def close(self):
155         self.socket.close()
156
157     def __scs_connecting(self):
158         retval = ovs.socket_util.check_connection_completion(self.socket)
159         assert retval != errno.EINPROGRESS
160         if retval == 0:
161             self.state = Stream.__S_CONNECTED
162         elif retval != errno.EAGAIN:
163             self.state = Stream.__S_DISCONNECTED
164             self.error = retval
165
166     def connect(self):
167         """Tries to complete the connection on this stream.  If the connection
168         is complete, returns 0 if the connection was successful or a positive
169         errno value if it failed.  If the connection is still in progress,
170         returns errno.EAGAIN."""
171
172         if self.state == Stream.__S_CONNECTING:
173             self.__scs_connecting()
174
175         if self.state == Stream.__S_CONNECTING:
176             return errno.EAGAIN
177         elif self.state == Stream.__S_CONNECTED:
178             return 0
179         else:
180             assert self.state == Stream.__S_DISCONNECTED
181             return self.error
182
183     def recv(self, n):
184         """Tries to receive up to 'n' bytes from this stream.  Returns a
185         (error, string) tuple:
186
187             - If successful, 'error' is zero and 'string' contains between 1
188               and 'n' bytes of data.
189
190             - On error, 'error' is a positive errno value.
191
192             - If the connection has been closed in the normal fashion or if 'n'
193               is 0, the tuple is (0, "").
194
195         The recv function will not block waiting for data to arrive.  If no
196         data have been received, it returns (errno.EAGAIN, "") immediately."""
197
198         retval = self.connect()
199         if retval != 0:
200             return (retval, "")
201         elif n == 0:
202             return (0, "")
203
204         try:
205             return (0, self.socket.recv(n))
206         except socket.error as e:
207             return (ovs.socket_util.get_exception_errno(e), "")
208
209     def send(self, buf):
210         """Tries to send 'buf' on this stream.
211
212         If successful, returns the number of bytes sent, between 1 and
213         len(buf).  0 is only a valid return value if len(buf) is 0.
214
215         On error, returns a negative errno value.
216
217         Will not block.  If no bytes can be immediately accepted for
218         transmission, returns -errno.EAGAIN immediately."""
219
220         retval = self.connect()
221         if retval != 0:
222             return -retval
223         elif len(buf) == 0:
224             return 0
225
226         try:
227             # Python 3 has separate types for strings and bytes.  We must have
228             # bytes here.
229             if (sys.version_info[0] >= 3
230                     and not isinstance(buf, six.binary_type)):
231                 buf = six.binary_type(buf, 'utf-8')
232             return self.socket.send(buf)
233         except socket.error as e:
234             return -ovs.socket_util.get_exception_errno(e)
235
236     def run(self):
237         pass
238
239     def run_wait(self, poller):
240         pass
241
242     def wait(self, poller, wait):
243         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
244
245         if self.state == Stream.__S_DISCONNECTED:
246             poller.immediate_wake()
247             return
248
249         if self.state == Stream.__S_CONNECTING:
250             wait = Stream.W_CONNECT
251         if wait == Stream.W_RECV:
252             poller.fd_wait(self.socket, ovs.poller.POLLIN)
253         else:
254             poller.fd_wait(self.socket, ovs.poller.POLLOUT)
255
256     def connect_wait(self, poller):
257         self.wait(poller, Stream.W_CONNECT)
258
259     def recv_wait(self, poller):
260         self.wait(poller, Stream.W_RECV)
261
262     def send_wait(self, poller):
263         self.wait(poller, Stream.W_SEND)
264
265     def __del__(self):
266         # Don't delete the file: we might have forked.
267         self.socket.close()
268
269
270 class PassiveStream(object):
271     @staticmethod
272     def is_valid_name(name):
273         """Returns True if 'name' is a passive stream name in the form
274         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
275         "punix:"), otherwise False."""
276         return name.startswith("punix:")
277
278     def __init__(self, sock, name, bind_path):
279         self.name = name
280         self.socket = sock
281         self.bind_path = bind_path
282
283     @staticmethod
284     def open(name):
285         """Attempts to start listening for remote stream connections.  'name'
286         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
287         stream class's name and ARGS are stream class-specific.  Currently the
288         only supported TYPE is "punix".
289
290         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
291         new PassiveStream, on failure 'error' is a positive errno value and
292         'pstream' is None."""
293         if not PassiveStream.is_valid_name(name):
294             return errno.EAFNOSUPPORT, None
295
296         bind_path = name[6:]
297         if name.startswith("punix:"):
298             bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
299         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
300                                                        True, bind_path, None)
301         if error:
302             return error, None
303
304         try:
305             sock.listen(10)
306         except socket.error as e:
307             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
308             sock.close()
309             return e.error, None
310
311         return 0, PassiveStream(sock, name, bind_path)
312
313     def close(self):
314         """Closes this PassiveStream."""
315         self.socket.close()
316         if self.bind_path is not None:
317             ovs.fatal_signal.unlink_file_now(self.bind_path)
318             self.bind_path = None
319
320     def accept(self):
321         """Tries to accept a new connection on this passive stream.  Returns
322         (error, stream): if successful, 'error' is 0 and 'stream' is the new
323         Stream object, and on failure 'error' is a positive errno value and
324         'stream' is None.
325
326         Will not block waiting for a connection.  If no connection is ready to
327         be accepted, returns (errno.EAGAIN, None) immediately."""
328
329         while True:
330             try:
331                 sock, addr = self.socket.accept()
332                 ovs.socket_util.set_nonblocking(sock)
333                 return 0, Stream(sock, "unix:%s" % addr, 0)
334             except socket.error as e:
335                 error = ovs.socket_util.get_exception_errno(e)
336                 if error != errno.EAGAIN:
337                     # XXX rate-limit
338                     vlog.dbg("accept: %s" % os.strerror(error))
339                 return error, None
340
341     def wait(self, poller):
342         poller.fd_wait(self.socket, ovs.poller.POLLIN)
343
344     def __del__(self):
345         # Don't delete the file: we might have forked.
346         self.socket.close()
347
348
349 def usage(name):
350     return """
351 Active %s connection methods:
352   unix:FILE               Unix domain socket named FILE
353   tcp:IP:PORT             TCP socket to IP with port no of PORT
354
355 Passive %s connection methods:
356   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
357
358
359 class UnixStream(Stream):
360     @staticmethod
361     def _open(suffix, dscp):
362         connect_path = suffix
363         return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
364                                                 True, None, connect_path)
365 Stream.register_method("unix", UnixStream)
366
367
368 class TCPStream(Stream):
369     @staticmethod
370     def _open(suffix, dscp):
371         error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
372                                                        suffix, 0, dscp)
373         if not error:
374             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
375         return error, sock
376 Stream.register_method("tcp", TCPStream)