1 # Copyright (c) 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
# Short alias for the JSON-RPC message class referenced throughout this file.
28 Message = ovs.jsonrpc.Message
# Module-level Vlog instance created with the name "unixctl_server"; used for
# the warnings emitted below.
29 vlog = ovs.vlog.Vlog("unixctl_server")
# Tuple of string types used by the isinstance() checks in this file.
# NOTE(review): types.StringTypes exists only on Python 2.
30 strtypes = types.StringTypes
# Server-side wrapper around one ovs.jsonrpc.Connection.  The id of the
# request currently awaiting a reply is kept in self._request_id (None when
# no request is outstanding).
33 class UnixctlConnection(object):
34 def __init__(self, rpc):
# 'rpc' must be an ovs.jsonrpc.Connection.
35 assert isinstance(rpc, ovs.jsonrpc.Connection)
# No request is outstanding on a fresh connection.
37 self._request_id = None
# Read the connection status, then test it together with the backlog
# reported by the connection.
41 error = self._rpc.get_status()
42 if error or self._rpc.get_backlog():
# Checked before the receive below: an error, or a request that is still
# awaiting its reply.
46 if error or self._request_id:
49 error, msg = self._rpc.recv()
# A request message is handed to _process_command().
51 if msg.type == Message.T_REQUEST:
52 self._process_command(msg)
# Warning for a message of an unexpected type; the type is rendered with
# Message.type_to_string().
55 vlog.warn("%s: received unexpected %s message"
57 Message.type_to_string(msg.type)))
# Re-read the connection status.
61 error = self._rpc.get_status()
65 def reply(self, body):
66 self._reply_impl(True, body)
68 def reply_error(self, body):
69 self._reply_impl(False, body)
71 # Called only by unixctl classes.
# Forget any request that was still awaiting a reply.
74 self._request_id = None
76 def _wait(self, poller):
77 self._rpc.wait(poller)
78 if not self._rpc.get_backlog():
79 self._rpc.recv_wait(poller)
# Shared implementation behind reply() and reply_error(): builds a message
# with Message.create_reply() or Message.create_error() for the request id
# held in self._request_id, then clears that id.
81 def _reply_impl(self, success, body):
# 'success' must be a bool; 'body' must be None or a string.
82 assert isinstance(success, bool)
83 assert body is None or isinstance(body, strtypes)
# Replying is only valid while a request is outstanding.
85 assert self._request_id is not None
# Non-empty bodies that lack a trailing newline are singled out here.
90 if body and not body.endswith("\n"):
# Either message kind is created with the id of the outstanding request.
94 reply = Message.create_reply(body, self._request_id)
96 reply = Message.create_error(body, self._request_id)
# Clear the outstanding request id.
99 self._request_id = None
# Handles one incoming request: records its id, looks the method up in
# ovs.unixctl.commands, validates the arguments and either invokes the
# command's callback or sends an error string back with reply_error().
101 def _process_command(self, request):
# 'request' must be a JSON-RPC message of type T_REQUEST.
102 assert isinstance(request, ovs.jsonrpc.Message)
103 assert request.type == ovs.jsonrpc.Message.T_REQUEST
# Remember the request id so the eventual reply can reference it.
105 self._request_id = request.id
108 params = request.params
109 method = request.method
# Look the method name up among the registered unixctl commands.
110 command = ovs.unixctl.commands.get(method)
# Error text for a method name that is not a registered command.
112 error = '"%s" is not a valid command' % method
# Argument-count checks against the command's min_args and max_args.
113 elif len(params) < command.min_args:
114 error = '"%s" command requires at least %d arguments' \
115 % (method, command.min_args)
116 elif len(params) > command.max_args:
117 error = '"%s" command takes at most %d arguments' \
118 % (method, command.max_args)
# Arguments that are not strings are rejected.
121 if not isinstance(param, strtypes):
122 error = '"%s" command has non-string argument' % method
# Convert every argument with unicode() (a Python 2 builtin), then invoke
# the callback with this connection, the converted arguments and the
# command's 'aux' value.
126 unicode_params = [unicode(p) for p in params]
127 command.callback(self, unicode_params, command.aux)
# Send the error text back to the client.
130 self.reply_error(error)
# Callback registered for the "version" command by create() below.  'conn'
# is the UnixctlConnection that received the command; 'version' is the
# version string to report.
133 def _unixctl_version(conn, unused_argv, version):
134 assert isinstance(conn, UnixctlConnection)
# Format as "<program name> (Open vSwitch) <version>".
135 version = "%s (Open vSwitch) %s" % (ovs.util.PROGRAM_NAME, version)
# Owns a passive (listening) stream plus the list of UnixctlConnection
# objects created from the streams it accepts (self._conns).
139 class UnixctlServer(object):
140 def __init__(self, listener):
# 'listener' must be an ovs.stream.PassiveStream.
141 assert isinstance(listener, ovs.stream.PassiveStream)
142 self._listener = listener
# Try to accept a new stream from the listener.
147 error, stream = self._listener.accept()
# Wrap the accepted stream in a JSON-RPC connection and track it.
149 rpc = ovs.jsonrpc.Connection(stream)
150 self._conns.append(UnixctlConnection(rpc))
# EAGAIN is distinguished from the other accept errors.
151 elif error == errno.EAGAIN:
# Log the accept failure, identifying the listener by name.
155 vlog.warn("%s: accept failed: %s" % (self._listener.name,
# Iterate over a shallow copy because connections are removed from
# self._conns inside the loop.
158 for conn in copy.copy(self._conns):
# Errors other than EAGAIN are tested for before the connection is removed
# from the list.
160 if error and error != errno.EAGAIN:
162 self._conns.remove(conn)
164 def wait(self, poller):
# Pass 'poller' to the listener's own wait(), then visit every tracked
# connection.
165 self._listener.wait(poller)
166 for conn in self._conns:
# Visit every tracked connection.
170 for conn in self._conns:
# Close the listener and drop the reference to it.
174 self._listener.close()
175 self._listener = None
# Factory for UnixctlServer.  The visible success path returns the pair
# (0, UnixctlServer); the "version" command is registered before returning.
178 def create(path, version=None):
179 """Creates a new UnixctlServer which listens on a unixctl socket
180 created at 'path'. If 'path' is None, the default path is chosen.
181 'version' contains the version of the server as reported by the unixctl
182 version command. If None, ovs.version.VERSION is used."""
# 'path' must be None or a string.
184 assert path is None or isinstance(path, strtypes)
# Caller-supplied path: passed through ovs.util.abs_file_name() together
# with ovs.dirs.RUNDIR and prefixed with "punix:".
# NOTE(review): presumably this makes 'path' absolute relative to RUNDIR --
# confirm against ovs.util.
187 path = "punix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path)
# Default socket name: punix:<RUNDIR>/<program name>.<pid>.ctl
189 path = "punix:%s/%s.%d.ctl" % (ovs.dirs.RUNDIR,
190 ovs.util.PROGRAM_NAME, os.getpid())
# Default version string, as described in the docstring.
193 version = ovs.version.VERSION
# Open the passive stream on the "punix:" path built above.
195 error, listener = ovs.stream.PassiveStream.open(path)
# Report a failure to set up the control socket.
197 ovs.util.ovs_error(error, "could not initialize control socket %s"
# Register the "version" command with _unixctl_version as its callback.
201 ovs.unixctl.command_register("version", "", 0, 0, _unixctl_version,
# Success: error code 0 together with the new server.
204 return 0, UnixctlServer(listener)
# Client side of a unixctl connection, built on an ovs.jsonrpc.Connection.
207 class UnixctlClient(object):
208 def __init__(self, conn):
# 'conn' must be an ovs.jsonrpc.Connection.
209 assert isinstance(conn, ovs.jsonrpc.Connection)
# Sends 'command' with arguments 'argv' and waits for the answer.  The
# visible return statements all yield a 3-tuple (error, error_text, result):
#   (error, None, None)        communication failure
#   (0, str(reply.error), None)  the peer answered with an error
#   (0, None, str(reply.result)) the peer answered with a result
212 def transact(self, command, argv):
# 'command' must be a string and 'argv' a list.
213 assert isinstance(command, strtypes)
214 assert isinstance(argv, list)
# Arguments must be strings.
216 assert isinstance(arg, strtypes)
# Build the request and run it through the connection's transact_block().
218 request = Message.create_request(command, argv)
219 error, reply = self._conn.transact_block(request)
# Communication failure: log it with os.strerror() and return the error.
222 vlog.warn("error communicating with %s: %s"
223 % (self._conn.name, os.strerror(error)))
224 return error, None, None
# The peer answered with an error: return it as the second element.
226 if reply.error is not None:
227 return 0, str(reply.error), None
# Otherwise a result is required; return it as the third element.
229 assert reply.result is not None
230 return 0, None, str(reply.result)
# 'path' is checked against str only here, not against strtypes.
238 assert isinstance(path, str)
# Build the "unix:" stream name from 'path' and ovs.dirs.RUNDIR, then open
# the stream through Stream.open_block().
240 unix = "unix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path)
241 error, stream = ovs.stream.Stream.open_block(
242 ovs.stream.Stream.open(unix))
# Log the connection failure, naming the path as given by the caller.
245 vlog.warn("failed to connect to %s" % path)
# Success: error code 0 plus a client wrapping a new JSON-RPC connection.
248 return 0, UnixctlClient(ovs.jsonrpc.Connection(stream))