1 # Copyright (c) 2010, 2011, 2012, 2013 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
# Module-wide Vlog instance, named "jsonrpc", used for the warn/info/dbg
# messages emitted by the classes below.
vlog = ovs.vlog.Vlog("jsonrpc")
32 class Message(object):
33 T_REQUEST = 0 # Request.
34 T_NOTIFY = 1 # Notification.
35 T_REPLY = 2 # Successful reply.
36 T_ERROR = 3 # Error reply.
38 __types = {T_REQUEST: "request",
39 T_NOTIFY: "notification",
43 def __init__(self, type_, method, params, result, error, id):
55 this_id = Message._next_id
60 def create_request(method, params):
61 return Message(Message.T_REQUEST, method, params, None, None,
65 def create_notify(method, params):
66 return Message(Message.T_NOTIFY, method, params, None, None,
70 def create_reply(result, id):
71 return Message(Message.T_REPLY, None, None, result, None, id)
74 def create_error(error, id):
75 return Message(Message.T_ERROR, None, None, None, error, id)
78 def type_to_string(type_):
79 return Message.__types[type_]
81 def __validate_arg(self, value, name, must_have):
82 if (value is not None) == (must_have != 0):
85 type_name = Message.type_to_string(self.type)
90 return "%s %s have \"%s\"" % (type_name, verb, name)
93 if self.params is not None and not isinstance(self.params, list):
94 return "\"params\" must be JSON array"
96 pattern = {Message.T_REQUEST: 0x11001,
97 Message.T_NOTIFY: 0x11000,
98 Message.T_REPLY: 0x00101,
99 Message.T_ERROR: 0x00011}.get(self.type)
101 return "invalid JSON-RPC message type %s" % self.type
104 self.__validate_arg(self.method, "method", pattern & 0x10000) or
105 self.__validate_arg(self.params, "params", pattern & 0x1000) or
106 self.__validate_arg(self.result, "result", pattern & 0x100) or
107 self.__validate_arg(self.error, "error", pattern & 0x10) or
108 self.__validate_arg(self.id, "id", pattern & 0x1))
112 if not isinstance(json, dict):
113 return "message is not a JSON object"
115 # Make a copy to avoid modifying the caller's dict.
119 method = json.pop("method")
120 if not isinstance(method, six.string_types):
121 return "method is not a JSON string"
125 params = json.pop("params", None)
126 result = json.pop("result", None)
127 error = json.pop("error", None)
128 id_ = json.pop("id", None)
130 return "message has unexpected member \"%s\"" % json.popitem()[0]
132 if result is not None:
133 msg_type = Message.T_REPLY
134 elif error is not None:
135 msg_type = Message.T_ERROR
136 elif id_ is not None:
137 msg_type = Message.T_REQUEST
139 msg_type = Message.T_NOTIFY
141 msg = Message(msg_type, method, params, result, error, id_)
142 validation_error = msg.is_valid()
143 if validation_error is not None:
144 return validation_error
151 if self.method is not None:
152 json["method"] = self.method
154 if self.params is not None:
155 json["params"] = self.params
157 if self.result is not None or self.type == Message.T_ERROR:
158 json["result"] = self.result
160 if self.error is not None or self.type == Message.T_REPLY:
161 json["error"] = self.error
163 if self.id is not None or self.type == Message.T_NOTIFY:
169 s = [Message.type_to_string(self.type)]
170 if self.method is not None:
171 s.append("method=\"%s\"" % self.method)
172 if self.params is not None:
173 s.append("params=" + ovs.json.to_string(self.params))
174 if self.result is not None:
175 s.append("result=" + ovs.json.to_string(self.result))
176 if self.error is not None:
177 s.append("error=" + ovs.json.to_string(self.error))
178 if self.id is not None:
179 s.append("id=" + ovs.json.to_string(self.id))
183 class Connection(object):
184 def __init__(self, stream):
185 self.name = stream.name
191 self.received_bytes = 0
201 while len(self.output):
202 retval = self.stream.send(self.output)
204 self.output = self.output[retval:]
206 if retval != -errno.EAGAIN:
207 vlog.warn("%s: send error: %s" %
208 (self.name, os.strerror(-retval)))
212 def wait(self, poller):
214 self.stream.run_wait(poller)
216 self.stream.send_wait(poller)
218 def get_status(self):
221 def get_backlog(self):
225 return len(self.output)
227 def get_received_bytes(self):
228 return self.received_bytes
def __log_msg(self, title, msg):
    """Logs 'msg' at debug level, prefixed with this connection's name and
    'title' (e.g. "send" or "received").

    The message is only formatted when debug logging is enabled, so a
    disabled debug level costs nothing beyond the check itself."""
    if not vlog.dbg_is_enabled():
        return
    line = "%s: %s %s" % (self.name, title, msg)
    vlog.dbg(line)
238 self.__log_msg("send", msg)
240 was_empty = len(self.output) == 0
241 self.output += ovs.json.to_string(msg.to_json())
246 def send_block(self, msg):
247 error = self.send(msg)
253 if not self.get_backlog() or self.get_status():
256 poller = ovs.poller.Poller()
262 return self.status, None
266 error, data = self.stream.recv(4096)
267 # Python 3 has separate types for strings and bytes. We
268 # received bytes from a socket. We expect it to be string
269 # data, so we convert it here as soon as possible.
270 if (data and not error
271 and not isinstance(data, six.string_types)):
273 data = data.decode('utf-8')
277 if error == errno.EAGAIN:
281 vlog.warn("%s: receive error: %s"
282 % (self.name, os.strerror(error)))
284 return self.status, None
290 self.received_bytes += len(data)
292 if self.parser is None:
293 self.parser = ovs.json.Parser()
294 self.input = self.input[self.parser.feed(self.input):]
295 if self.parser.is_done():
296 msg = self.__process_msg()
300 return self.status, None
302 def recv_block(self):
304 error, msg = self.recv()
305 if error != errno.EAGAIN:
310 poller = ovs.poller.Poller()
312 self.recv_wait(poller)
315 def transact_block(self, request):
318 error = self.send(request)
321 error, reply = self.recv_block()
323 and (reply.type == Message.T_REPLY
324 or reply.type == Message.T_ERROR)
325 and reply.id == id_):
329 def __process_msg(self):
330 json = self.parser.finish()
332 if isinstance(json, six.string_types):
334 vlog.warn("%s: error parsing stream: %s" % (self.name, json))
335 self.error(errno.EPROTO)
338 msg = Message.from_json(json)
339 if not isinstance(msg, Message):
341 vlog.warn("%s: received bad JSON-RPC message: %s"
343 self.error(errno.EPROTO)
346 self.__log_msg("received", msg)
349 def recv_wait(self, poller):
350 if self.status or self.input:
351 poller.immediate_wake()
353 self.stream.recv_wait(poller)
355 def error(self, error):
362 class Session(object):
363 """A JSON-RPC session with reconnection."""
365 def __init__(self, reconnect, rpc):
366 self.reconnect = reconnect
374 """Creates and returns a Session that maintains a JSON-RPC session to
375 'name', which should be a string acceptable to ovs.stream.Stream or
376 ovs.stream.PassiveStream's initializer.
378 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
379 session connects and reconnects, with back-off, to 'name'.
381 If 'name' is a passive connection method, e.g. "ptcp:", the new session
382 listens for connections to 'name'. It maintains at most one connection
383 at any given time. Any new connection causes the previous one (if any)
385 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
386 reconnect.set_name(name)
387 reconnect.enable(ovs.timeval.msec())
389 if ovs.stream.PassiveStream.is_valid_name(name):
390 reconnect.set_passive(True, ovs.timeval.msec())
392 if not ovs.stream.stream_or_pstream_needs_probes(name):
393 reconnect.set_probe_interval(0)
395 return Session(reconnect, None)
def open_unreliably(jsonrpc):
    """Creates and returns a Session wrapped around the existing connection
    object 'jsonrpc'.

    The Reconnect state machine backing the session is made quiet, named
    after 'jsonrpc', limited to a maximum of zero connection tries, and
    marked as connected right away."""
    fsm = ovs.reconnect.Reconnect(ovs.timeval.msec())
    fsm.set_quiet(True)
    fsm.set_name(jsonrpc.name)
    fsm.set_max_tries(0)
    # Take a fresh timestamp here rather than reusing the one above.
    fsm.connected(ovs.timeval.msec())
    return Session(reconnect=fsm, rpc=jsonrpc)
407 if self.rpc is not None:
410 if self.stream is not None:
413 if self.pstream is not None:
417 def __disconnect(self):
418 if self.rpc is not None:
423 elif self.stream is not None:
431 name = self.reconnect.get_name()
432 if not self.reconnect.is_passive():
433 error, self.stream = ovs.stream.Stream.open(name)
435 self.reconnect.connecting(ovs.timeval.msec())
437 self.reconnect.connect_failed(ovs.timeval.msec(), error)
438 elif self.pstream is not None:
439 error, self.pstream = ovs.stream.PassiveStream.open(name)
441 self.reconnect.listening(ovs.timeval.msec())
443 self.reconnect.connect_failed(ovs.timeval.msec(), error)
448 if self.pstream is not None:
449 error, stream = self.pstream.accept()
451 if self.rpc or self.stream:
453 vlog.info("%s: new connection replacing active "
454 "connection" % self.reconnect.get_name())
456 self.reconnect.connected(ovs.timeval.msec())
457 self.rpc = Connection(stream)
458 elif error != errno.EAGAIN:
459 self.reconnect.listen_error(ovs.timeval.msec(), error)
464 backlog = self.rpc.get_backlog()
466 if self.rpc.get_backlog() < backlog:
467 # Data previously caught in a queue was successfully sent (or
468 # there's an error, which we'll catch below).
470 # We don't count data that is successfully sent immediately as
471 # activity, because there's a lot of queuing downstream from
472 # us, which means that we can push a lot of data into a
473 # connection that has stalled and won't ever recover.
474 self.reconnect.activity(ovs.timeval.msec())
476 error = self.rpc.get_status()
478 self.reconnect.disconnected(ovs.timeval.msec(), error)
480 elif self.stream is not None:
482 error = self.stream.connect()
484 self.reconnect.connected(ovs.timeval.msec())
485 self.rpc = Connection(self.stream)
487 elif error != errno.EAGAIN:
488 self.reconnect.connect_failed(ovs.timeval.msec(), error)
492 action = self.reconnect.run(ovs.timeval.msec())
493 if action == ovs.reconnect.CONNECT:
495 elif action == ovs.reconnect.DISCONNECT:
496 self.reconnect.disconnected(ovs.timeval.msec(), 0)
498 elif action == ovs.reconnect.PROBE:
500 request = Message.create_request("echo", [])
502 self.rpc.send(request)
504 assert action is None
def wait(self, poller):
    """Registers with 'poller' everything this session is waiting on.

    That is the JSON-RPC connection if there is one, otherwise the stream
    (if any) that has not yet been turned into a connection; then the
    passive listener (if any); and finally the reconnect timer."""
    connection = self.rpc
    if connection is None:
        pending_stream = self.stream
        if pending_stream is not None:
            pending_stream.run_wait(poller)
            pending_stream.connect_wait(poller)
    else:
        connection.wait(poller)

    listener = self.pstream
    if listener is not None:
        listener.wait(poller)

    self.reconnect.wait(poller, ovs.timeval.msec())
516 def get_backlog(self):
517 if self.rpc is not None:
518 return self.rpc.get_backlog()
523 return self.reconnect.get_name()
526 if self.rpc is not None:
527 return self.rpc.send(msg)
529 return errno.ENOTCONN
532 if self.rpc is not None:
533 received_bytes = self.rpc.get_received_bytes()
534 error, msg = self.rpc.recv()
535 if received_bytes != self.rpc.get_received_bytes():
536 # Data was successfully received.
538 # Previously we only counted receiving a full message as
539 # activity, but with large messages or a slow connection that
540 # policy could time out the session mid-message.
541 self.reconnect.activity(ovs.timeval.msec())
544 if msg.type == Message.T_REQUEST and msg.method == "echo":
545 # Echo request. Send reply.
546 self.send(Message.create_reply(msg.params, msg.id))
547 elif msg.type == Message.T_REPLY and msg.id == "echo":
548 # It's a reply to our echo request. Suppress it.
554 def recv_wait(self, poller):
555 if self.rpc is not None:
556 self.rpc.recv_wait(poller)
559 if self.rpc is not None or self.stream is not None:
562 max_tries = self.reconnect.get_max_tries()
563 return max_tries is None or max_tries > 0
565 def is_connected(self):
566 return self.rpc is not None
def force_reconnect(self):
    """Passes the current time to the reconnect state machine's
    force_reconnect(), requesting that the session be re-established."""
    now = ovs.timeval.msec()
    self.reconnect.force_reconnect(now)