# Copyright (c) 2012 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import errno
import os
import types

import six
from six.moves import range

import ovs.dirs
import ovs.jsonrpc
import ovs.stream
import ovs.unixctl
import ovs.util
import ovs.version
import ovs.vlog
# Short alias for the JSON-RPC message class used throughout this module.
Message = ovs.jsonrpc.Message

# Module-level logger, registered under the name "unixctl_server".
vlog = ovs.vlog.Vlog("unixctl_server")

# Tuple of string types used in the isinstance() checks below.
# six.string_types is (basestring,) on Python 2 and (str,) on Python 3;
# types.StringTypes only exists on Python 2.
strtypes = six.string_types
class UnixctlConnection(object):
    """Server-side handler for a single unixctl client connection.

    Wraps an ovs.jsonrpc.Connection, dispatches incoming requests to the
    commands registered in ovs.unixctl.commands, and remembers the id of the
    request that is still waiting for reply() or reply_error().
    """

    # NOTE(review): the listing this class was recovered from carried stray
    # line-number prefixes, no indentation, and was missing several short
    # statements (the 'def' lines of run() and _close(), plus 'return',
    # 'break', 'else' and similar one-liners).  Those were restored from the
    # surrounding control flow; confirm them against the upstream file.

    def __init__(self, rpc):
        """Wraps 'rpc', which must be an ovs.jsonrpc.Connection."""
        assert isinstance(rpc, ovs.jsonrpc.Connection)
        self._rpc = rpc
        # Id of the request currently awaiting a reply, or None.
        self._request_id = None

    def run(self):
        """Performs periodic work on the connection.

        Runs the underlying JSON-RPC connection and dispatches received
        requests.  Returns the connection's error status (a false value when
        there is no error)."""
        self._rpc.run()
        error = self._rpc.get_status()
        if error or self._rpc.get_backlog():
            # Either the connection failed or earlier output is still queued;
            # do not accept new requests yet.
            return error

        # NOTE(review): the loop header was missing from the listing; a
        # bounded loop is assumed (the file imports 'range') -- confirm the
        # iteration limit.
        for _ in range(10):
            # Stop on error, or while a request is still awaiting its reply.
            if error or self._request_id:
                break

            error, msg = self._rpc.recv()
            if msg:
                if msg.type == Message.T_REQUEST:
                    self._process_command(msg)
                else:
                    # XXX: rate-limit
                    vlog.warn("%s: received unexpected %s message"
                              % (self._rpc.name,
                                 Message.type_to_string(msg.type)))
                    error = errno.EINVAL

            if not error:
                error = self._rpc.get_status()

        return error

    def reply(self, body):
        """Sends 'body' (a string or None) as the successful reply to the
        pending request."""
        self._reply_impl(True, body)

    def reply_error(self, body):
        """Sends 'body' (a string or None) as the error reply to the pending
        request."""
        self._reply_impl(False, body)

    # Called only by unixctl classes.
    def _close(self):
        """Closes the JSON-RPC connection and forgets any pending request."""
        self._rpc.close()
        self._request_id = None

    def _wait(self, poller):
        """Registers with 'poller' the events this connection waits for."""
        self._rpc.wait(poller)
        # Only wait for new input once all queued output has been sent.
        if not self._rpc.get_backlog():
            self._rpc.recv_wait(poller)

    def _reply_impl(self, success, body):
        """Sends the reply for the pending request and clears it.

        'success' selects a normal reply (True) or an error reply (False).
        'body' is a string or None; None is sent as "" and a non-empty body
        is newline-terminated."""
        assert isinstance(success, bool)
        assert body is None or isinstance(body, strtypes)

        assert self._request_id is not None

        if body is None:
            body = ""

        if body and not body.endswith("\n"):
            body += "\n"

        if success:
            reply = Message.create_reply(body, self._request_id)
        else:
            reply = Message.create_error(body, self._request_id)

        self._rpc.send(reply)
        self._request_id = None

    def _process_command(self, request):
        """Validates 'request' and invokes the matching registered command.

        The request id is recorded so that the command callback can answer
        through reply() or reply_error().  This method itself replies only
        when validation fails."""
        assert isinstance(request, ovs.jsonrpc.Message)
        assert request.type == ovs.jsonrpc.Message.T_REQUEST

        self._request_id = request.id

        error = None
        params = request.params
        method = request.method
        command = ovs.unixctl.commands.get(method)
        if command is None:
            error = '"%s" is not a valid command' % method
        elif len(params) < command.min_args:
            error = ('"%s" command requires at least %d arguments'
                     % (method, command.min_args))
        elif len(params) > command.max_args:
            error = ('"%s" command takes at most %d arguments'
                     % (method, command.max_args))
        elif not all(isinstance(param, strtypes) for param in params):
            error = '"%s" command has non-string argument' % method
        else:
            # six.text_type is 'unicode' on Python 2 and 'str' on Python 3.
            unicode_params = [six.text_type(p) for p in params]
            command.callback(self, unicode_params, command.aux)

        if error:
            self.reply_error(error)
def _unixctl_version(conn, unused_argv, version):
    """Callback for the unixctl "version" command.

    Replies on 'conn' with "<program name> (Open vSwitch) <version>".
    'unused_argv' is ignored."""
    assert isinstance(conn, UnixctlConnection)
    version = "%s (Open vSwitch) %s" % (ovs.util.PROGRAM_NAME, version)
    # NOTE(review): the reply call was missing from the recovered listing;
    # without it the formatted string is never sent -- confirm upstream.
    conn.reply(version)
class UnixctlServer(object):
    """Accepts unixctl client connections on a passive stream and services
    each of them through a UnixctlConnection."""

    # NOTE(review): the listing this class was recovered from carried stray
    # line-number prefixes, no indentation, and was missing several short
    # statements (the 'def' lines of run() and close(), the initialisation
    # of self._conns, and 'if'/'else'/'break'/'return' one-liners).  Those
    # were restored from the surrounding control flow; confirm them against
    # the upstream file.

    def __init__(self, listener):
        """Wraps 'listener', which must be an ovs.stream.PassiveStream."""
        assert isinstance(listener, ovs.stream.PassiveStream)
        self._listener = listener
        # Active UnixctlConnection objects.
        self._conns = []

    def run(self):
        """Accepts pending connections and runs every active connection.

        Connections whose run() reports an error other than EAGAIN are
        closed and dropped."""
        # NOTE(review): the loop header was missing from the listing; a
        # bounded loop is assumed -- confirm the iteration limit.
        for _ in range(10):
            error, stream = self._listener.accept()
            if not error:
                rpc = ovs.jsonrpc.Connection(stream)
                self._conns.append(UnixctlConnection(rpc))
            elif error == errno.EAGAIN:
                break
            else:
                # XXX: rate-limit
                vlog.warn("%s: accept failed: %s" % (self._listener.name,
                                                     os.strerror(error)))

        # Iterate over a copy because failed connections are removed from
        # the list inside the loop.
        for conn in copy.copy(self._conns):
            error = conn.run()
            if error and error != errno.EAGAIN:
                conn._close()
                self._conns.remove(conn)

    def wait(self, poller):
        """Registers with 'poller' the events the listener and every active
        connection wait for."""
        self._listener.wait(poller)
        for conn in self._conns:
            conn._wait(poller)

    def close(self):
        """Closes every active connection and the listener."""
        for conn in self._conns:
            conn._close()
        self._conns = None

        self._listener.close()
        self._listener = None

    @staticmethod
    def create(path, version=None):
        """Creates a new UnixctlServer which listens on a unixctl socket
        created at 'path'.  If 'path' is None, the default path is chosen.
        'version' contains the version of the server as reported by the
        unixctl version command.  If None, ovs.version.VERSION is used.

        Returns a tuple (error, server): (0, UnixctlServer) on success, or
        (error, None) if the control socket could not be opened."""

        assert path is None or isinstance(path, strtypes)

        if path is not None:
            path = "punix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path)
        else:
            path = "punix:%s/%s.%d.ctl" % (ovs.dirs.RUNDIR,
                                           ovs.util.PROGRAM_NAME, os.getpid())

        if version is None:
            version = ovs.version.VERSION

        error, listener = ovs.stream.PassiveStream.open(path)
        if error:
            ovs.util.ovs_error(error, "could not initialize control socket %s"
                               % path)
            return error, None

        ovs.unixctl.command_register("version", "", 0, 0, _unixctl_version,
                                     version)

        return 0, UnixctlServer(listener)
class UnixctlClient(object):
    """Client side of a unixctl connection: sends a command over JSON-RPC and
    returns the server's reply."""

    # NOTE(review): the listing this class was recovered from carried stray
    # line-number prefixes, no indentation, and was missing several short
    # statements (the self._conn assignment, 'for'/'if'/'else' one-liners,
    # the header of create(), and whatever stood between transact() and
    # create()).  They were restored from the surrounding code; close() in
    # particular is an assumption made by analogy with the server class --
    # confirm against the upstream file.

    def __init__(self, conn):
        """Wraps 'conn', which must be an ovs.jsonrpc.Connection."""
        assert isinstance(conn, ovs.jsonrpc.Connection)
        self._conn = conn

    def transact(self, command, argv):
        """Sends 'command' with the string arguments in the list 'argv' and
        waits for the reply.

        Returns a tuple (error, error_text, result_text):
          - (error, None, None) if communication failed,
          - (0, text, None) if the server replied with an error,
          - (0, None, text) if the server replied with a result."""
        assert isinstance(command, strtypes)
        assert isinstance(argv, list)
        for arg in argv:
            assert isinstance(arg, strtypes)

        request = Message.create_request(command, argv)
        error, reply = self._conn.transact_block(request)

        if error:
            vlog.warn("error communicating with %s: %s"
                      % (self._conn.name, os.strerror(error)))
            return error, None, None

        if reply.error is not None:
            return 0, str(reply.error), None

        assert reply.result is not None
        return 0, None, str(reply.result)

    def close(self):
        """Closes the underlying JSON-RPC connection."""
        self._conn.close()
        self._conn = None

    @staticmethod
    def create(path):
        """Connects to the unixctl socket at 'path', which is resolved
        relative to ovs.dirs.RUNDIR by ovs.util.abs_file_name().

        Returns a tuple (error, client): (0, UnixctlClient) on success, or
        (error, None) if the connection could not be opened."""
        assert isinstance(path, str)

        unix = "unix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path)
        error, stream = ovs.stream.Stream.open_block(
            ovs.stream.Stream.open(unix))

        if error:
            vlog.warn("failed to connect to %s" % path)
            return error, None

        return 0, UnixctlClient(ovs.jsonrpc.Connection(stream))