9744cf24149963094ba784356cd9419b47af3116
[cascardo/ovs.git] / python / ovs / unixctl / server.py
1 # Copyright (c) 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import copy
16 import errno
17 import os
18 import types
19
20 from six.moves import range
21
22 import ovs.dirs
23 import ovs.jsonrpc
24 import ovs.stream
25 import ovs.unixctl
26 import ovs.util
27 import ovs.version
28 import ovs.vlog
29
30 Message = ovs.jsonrpc.Message
31 vlog = ovs.vlog.Vlog("unixctl_server")
32 strtypes = types.StringTypes
33
34
35 class UnixctlConnection(object):
36     def __init__(self, rpc):
37         assert isinstance(rpc, ovs.jsonrpc.Connection)
38         self._rpc = rpc
39         self._request_id = None
40
41     def run(self):
42         self._rpc.run()
43         error = self._rpc.get_status()
44         if error or self._rpc.get_backlog():
45             return error
46
47         for _ in range(10):
48             if error or self._request_id:
49                 break
50
51             error, msg = self._rpc.recv()
52             if msg:
53                 if msg.type == Message.T_REQUEST:
54                     self._process_command(msg)
55                 else:
56                     # XXX: rate-limit
57                     vlog.warn("%s: received unexpected %s message"
58                               % (self._rpc.name,
59                                  Message.type_to_string(msg.type)))
60                     error = errno.EINVAL
61
62             if not error:
63                 error = self._rpc.get_status()
64
65         return error
66
67     def reply(self, body):
68         self._reply_impl(True, body)
69
70     def reply_error(self, body):
71         self._reply_impl(False, body)
72
73     # Called only by unixctl classes.
74     def _close(self):
75         self._rpc.close()
76         self._request_id = None
77
78     def _wait(self, poller):
79         self._rpc.wait(poller)
80         if not self._rpc.get_backlog():
81             self._rpc.recv_wait(poller)
82
83     def _reply_impl(self, success, body):
84         assert isinstance(success, bool)
85         assert body is None or isinstance(body, strtypes)
86
87         assert self._request_id is not None
88
89         if body is None:
90             body = ""
91
92         if body and not body.endswith("\n"):
93             body += "\n"
94
95         if success:
96             reply = Message.create_reply(body, self._request_id)
97         else:
98             reply = Message.create_error(body, self._request_id)
99
100         self._rpc.send(reply)
101         self._request_id = None
102
103     def _process_command(self, request):
104         assert isinstance(request, ovs.jsonrpc.Message)
105         assert request.type == ovs.jsonrpc.Message.T_REQUEST
106
107         self._request_id = request.id
108
109         error = None
110         params = request.params
111         method = request.method
112         command = ovs.unixctl.commands.get(method)
113         if command is None:
114             error = '"%s" is not a valid command' % method
115         elif len(params) < command.min_args:
116             error = '"%s" command requires at least %d arguments' \
117                     % (method, command.min_args)
118         elif len(params) > command.max_args:
119             error = '"%s" command takes at most %d arguments' \
120                     % (method, command.max_args)
121         else:
122             for param in params:
123                 if not isinstance(param, strtypes):
124                     error = '"%s" command has non-string argument' % method
125                     break
126
127             if error is None:
128                 unicode_params = [unicode(p) for p in params]
129                 command.callback(self, unicode_params, command.aux)
130
131         if error:
132             self.reply_error(error)
133
134
135 def _unixctl_version(conn, unused_argv, version):
136     assert isinstance(conn, UnixctlConnection)
137     version = "%s (Open vSwitch) %s" % (ovs.util.PROGRAM_NAME, version)
138     conn.reply(version)
139
140
141 class UnixctlServer(object):
142     def __init__(self, listener):
143         assert isinstance(listener, ovs.stream.PassiveStream)
144         self._listener = listener
145         self._conns = []
146
147     def run(self):
148         for _ in range(10):
149             error, stream = self._listener.accept()
150             if not error:
151                 rpc = ovs.jsonrpc.Connection(stream)
152                 self._conns.append(UnixctlConnection(rpc))
153             elif error == errno.EAGAIN:
154                 break
155             else:
156                 # XXX: rate-limit
157                 vlog.warn("%s: accept failed: %s" % (self._listener.name,
158                                                      os.strerror(error)))
159
160         for conn in copy.copy(self._conns):
161             error = conn.run()
162             if error and error != errno.EAGAIN:
163                 conn._close()
164                 self._conns.remove(conn)
165
166     def wait(self, poller):
167         self._listener.wait(poller)
168         for conn in self._conns:
169             conn._wait(poller)
170
171     def close(self):
172         for conn in self._conns:
173             conn._close()
174         self._conns = None
175
176         self._listener.close()
177         self._listener = None
178
179     @staticmethod
180     def create(path, version=None):
181         """Creates a new UnixctlServer which listens on a unixctl socket
182         created at 'path'.  If 'path' is None, the default path is chosen.
183         'version' contains the version of the server as reported by the unixctl
184         version command.  If None, ovs.version.VERSION is used."""
185
186         assert path is None or isinstance(path, strtypes)
187
188         if path is not None:
189             path = "punix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path)
190         else:
191             path = "punix:%s/%s.%d.ctl" % (ovs.dirs.RUNDIR,
192                                            ovs.util.PROGRAM_NAME, os.getpid())
193
194         if version is None:
195             version = ovs.version.VERSION
196
197         error, listener = ovs.stream.PassiveStream.open(path)
198         if error:
199             ovs.util.ovs_error(error, "could not initialize control socket %s"
200                                % path)
201             return error, None
202
203         ovs.unixctl.command_register("version", "", 0, 0, _unixctl_version,
204                                      version)
205
206         return 0, UnixctlServer(listener)
207
208
209 class UnixctlClient(object):
210     def __init__(self, conn):
211         assert isinstance(conn, ovs.jsonrpc.Connection)
212         self._conn = conn
213
214     def transact(self, command, argv):
215         assert isinstance(command, strtypes)
216         assert isinstance(argv, list)
217         for arg in argv:
218             assert isinstance(arg, strtypes)
219
220         request = Message.create_request(command, argv)
221         error, reply = self._conn.transact_block(request)
222
223         if error:
224             vlog.warn("error communicating with %s: %s"
225                       % (self._conn.name, os.strerror(error)))
226             return error, None, None
227
228         if reply.error is not None:
229             return 0, str(reply.error), None
230         else:
231             assert reply.result is not None
232             return 0, None, str(reply.result)
233
234     def close(self):
235         self._conn.close()
236         self.conn = None
237
238     @staticmethod
239     def create(path):
240         assert isinstance(path, str)
241
242         unix = "unix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path)
243         error, stream = ovs.stream.Stream.open_block(
244             ovs.stream.Stream.open(unix))
245
246         if error:
247             vlog.warn("failed to connect to %s" % path)
248             return error, None
249
250         return 0, UnixctlClient(ovs.jsonrpc.Connection(stream))