ovsdb.at: Run Python tests for Python 2 and 3.
[cascardo/ovs.git] / python / ovs / jsonrpc.py
1 # Copyright (c) 2010, 2011, 2012, 2013 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17
18 import six
19
20 import ovs.json
21 import ovs.poller
22 import ovs.reconnect
23 import ovs.stream
24 import ovs.timeval
25 import ovs.util
26 import ovs.vlog
27
28 EOF = ovs.util.EOF
29 vlog = ovs.vlog.Vlog("jsonrpc")
30
31
32 class Message(object):
33     T_REQUEST = 0               # Request.
34     T_NOTIFY = 1                # Notification.
35     T_REPLY = 2                 # Successful reply.
36     T_ERROR = 3                 # Error reply.
37
38     __types = {T_REQUEST: "request",
39                T_NOTIFY: "notification",
40                T_REPLY: "reply",
41                T_ERROR: "error"}
42
43     def __init__(self, type_, method, params, result, error, id):
44         self.type = type_
45         self.method = method
46         self.params = params
47         self.result = result
48         self.error = error
49         self.id = id
50
51     _next_id = 0
52
53     @staticmethod
54     def _create_id():
55         this_id = Message._next_id
56         Message._next_id += 1
57         return this_id
58
59     @staticmethod
60     def create_request(method, params):
61         return Message(Message.T_REQUEST, method, params, None, None,
62                        Message._create_id())
63
64     @staticmethod
65     def create_notify(method, params):
66         return Message(Message.T_NOTIFY, method, params, None, None,
67                        None)
68
69     @staticmethod
70     def create_reply(result, id):
71         return Message(Message.T_REPLY, None, None, result, None, id)
72
73     @staticmethod
74     def create_error(error, id):
75         return Message(Message.T_ERROR, None, None, None, error, id)
76
77     @staticmethod
78     def type_to_string(type_):
79         return Message.__types[type_]
80
81     def __validate_arg(self, value, name, must_have):
82         if (value is not None) == (must_have != 0):
83             return None
84         else:
85             type_name = Message.type_to_string(self.type)
86             if must_have:
87                 verb = "must"
88             else:
89                 verb = "must not"
90             return "%s %s have \"%s\"" % (type_name, verb, name)
91
92     def is_valid(self):
93         if self.params is not None and not isinstance(self.params, list):
94             return "\"params\" must be JSON array"
95
96         pattern = {Message.T_REQUEST: 0x11001,
97                    Message.T_NOTIFY: 0x11000,
98                    Message.T_REPLY: 0x00101,
99                    Message.T_ERROR: 0x00011}.get(self.type)
100         if pattern is None:
101             return "invalid JSON-RPC message type %s" % self.type
102
103         return (
104             self.__validate_arg(self.method, "method", pattern & 0x10000) or
105             self.__validate_arg(self.params, "params", pattern & 0x1000) or
106             self.__validate_arg(self.result, "result", pattern & 0x100) or
107             self.__validate_arg(self.error, "error", pattern & 0x10) or
108             self.__validate_arg(self.id, "id", pattern & 0x1))
109
110     @staticmethod
111     def from_json(json):
112         if not isinstance(json, dict):
113             return "message is not a JSON object"
114
115         # Make a copy to avoid modifying the caller's dict.
116         json = dict(json)
117
118         if "method" in json:
119             method = json.pop("method")
120             if not isinstance(method, six.string_types):
121                 return "method is not a JSON string"
122         else:
123             method = None
124
125         params = json.pop("params", None)
126         result = json.pop("result", None)
127         error = json.pop("error", None)
128         id_ = json.pop("id", None)
129         if len(json):
130             return "message has unexpected member \"%s\"" % json.popitem()[0]
131
132         if result is not None:
133             msg_type = Message.T_REPLY
134         elif error is not None:
135             msg_type = Message.T_ERROR
136         elif id_ is not None:
137             msg_type = Message.T_REQUEST
138         else:
139             msg_type = Message.T_NOTIFY
140
141         msg = Message(msg_type, method, params, result, error, id_)
142         validation_error = msg.is_valid()
143         if validation_error is not None:
144             return validation_error
145         else:
146             return msg
147
148     def to_json(self):
149         json = {}
150
151         if self.method is not None:
152             json["method"] = self.method
153
154         if self.params is not None:
155             json["params"] = self.params
156
157         if self.result is not None or self.type == Message.T_ERROR:
158             json["result"] = self.result
159
160         if self.error is not None or self.type == Message.T_REPLY:
161             json["error"] = self.error
162
163         if self.id is not None or self.type == Message.T_NOTIFY:
164             json["id"] = self.id
165
166         return json
167
168     def __str__(self):
169         s = [Message.type_to_string(self.type)]
170         if self.method is not None:
171             s.append("method=\"%s\"" % self.method)
172         if self.params is not None:
173             s.append("params=" + ovs.json.to_string(self.params))
174         if self.result is not None:
175             s.append("result=" + ovs.json.to_string(self.result))
176         if self.error is not None:
177             s.append("error=" + ovs.json.to_string(self.error))
178         if self.id is not None:
179             s.append("id=" + ovs.json.to_string(self.id))
180         return ", ".join(s)
181
182
183 class Connection(object):
184     def __init__(self, stream):
185         self.name = stream.name
186         self.stream = stream
187         self.status = 0
188         self.input = ""
189         self.output = ""
190         self.parser = None
191         self.received_bytes = 0
192
193     def close(self):
194         self.stream.close()
195         self.stream = None
196
197     def run(self):
198         if self.status:
199             return
200
201         while len(self.output):
202             retval = self.stream.send(self.output)
203             if retval >= 0:
204                 self.output = self.output[retval:]
205             else:
206                 if retval != -errno.EAGAIN:
207                     vlog.warn("%s: send error: %s" %
208                               (self.name, os.strerror(-retval)))
209                     self.error(-retval)
210                 break
211
212     def wait(self, poller):
213         if not self.status:
214             self.stream.run_wait(poller)
215             if len(self.output):
216                 self.stream.send_wait(poller)
217
218     def get_status(self):
219         return self.status
220
221     def get_backlog(self):
222         if self.status != 0:
223             return 0
224         else:
225             return len(self.output)
226
227     def get_received_bytes(self):
228         return self.received_bytes
229
230     def __log_msg(self, title, msg):
231         if vlog.dbg_is_enabled():
232             vlog.dbg("%s: %s %s" % (self.name, title, msg))
233
234     def send(self, msg):
235         if self.status:
236             return self.status
237
238         self.__log_msg("send", msg)
239
240         was_empty = len(self.output) == 0
241         self.output += ovs.json.to_string(msg.to_json())
242         if was_empty:
243             self.run()
244         return self.status
245
246     def send_block(self, msg):
247         error = self.send(msg)
248         if error:
249             return error
250
251         while True:
252             self.run()
253             if not self.get_backlog() or self.get_status():
254                 return self.status
255
256             poller = ovs.poller.Poller()
257             self.wait(poller)
258             poller.block()
259
260     def recv(self):
261         if self.status:
262             return self.status, None
263
264         while True:
265             if not self.input:
266                 error, data = self.stream.recv(4096)
267                 # Python 3 has separate types for strings and bytes.  We
268                 # received bytes from a socket.  We expect it to be string
269                 # data, so we convert it here as soon as possible.
270                 if (data and not error
271                         and not isinstance(data, six.string_types)):
272                     try:
273                         data = data.decode('utf-8')
274                     except UnicodeError:
275                         error = errno.EILSEQ
276                 if error:
277                     if error == errno.EAGAIN:
278                         return error, None
279                     else:
280                         # XXX rate-limit
281                         vlog.warn("%s: receive error: %s"
282                                   % (self.name, os.strerror(error)))
283                         self.error(error)
284                         return self.status, None
285                 elif not data:
286                     self.error(EOF)
287                     return EOF, None
288                 else:
289                     self.input += data
290                     self.received_bytes += len(data)
291             else:
292                 if self.parser is None:
293                     self.parser = ovs.json.Parser()
294                 self.input = self.input[self.parser.feed(self.input):]
295                 if self.parser.is_done():
296                     msg = self.__process_msg()
297                     if msg:
298                         return 0, msg
299                     else:
300                         return self.status, None
301
302     def recv_block(self):
303         while True:
304             error, msg = self.recv()
305             if error != errno.EAGAIN:
306                 return error, msg
307
308             self.run()
309
310             poller = ovs.poller.Poller()
311             self.wait(poller)
312             self.recv_wait(poller)
313             poller.block()
314
315     def transact_block(self, request):
316         id_ = request.id
317
318         error = self.send(request)
319         reply = None
320         while not error:
321             error, reply = self.recv_block()
322             if (reply
323                 and (reply.type == Message.T_REPLY
324                      or reply.type == Message.T_ERROR)
325                 and reply.id == id_):
326                 break
327         return error, reply
328
329     def __process_msg(self):
330         json = self.parser.finish()
331         self.parser = None
332         if isinstance(json, six.string_types):
333             # XXX rate-limit
334             vlog.warn("%s: error parsing stream: %s" % (self.name, json))
335             self.error(errno.EPROTO)
336             return
337
338         msg = Message.from_json(json)
339         if not isinstance(msg, Message):
340             # XXX rate-limit
341             vlog.warn("%s: received bad JSON-RPC message: %s"
342                       % (self.name, msg))
343             self.error(errno.EPROTO)
344             return
345
346         self.__log_msg("received", msg)
347         return msg
348
349     def recv_wait(self, poller):
350         if self.status or self.input:
351             poller.immediate_wake()
352         else:
353             self.stream.recv_wait(poller)
354
355     def error(self, error):
356         if self.status == 0:
357             self.status = error
358             self.stream.close()
359             self.output = ""
360
361
362 class Session(object):
363     """A JSON-RPC session with reconnection."""
364
365     def __init__(self, reconnect, rpc):
366         self.reconnect = reconnect
367         self.rpc = rpc
368         self.stream = None
369         self.pstream = None
370         self.seqno = 0
371
372     @staticmethod
373     def open(name):
374         """Creates and returns a Session that maintains a JSON-RPC session to
375         'name', which should be a string acceptable to ovs.stream.Stream or
376         ovs.stream.PassiveStream's initializer.
377
378         If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
379         session connects and reconnects, with back-off, to 'name'.
380
381         If 'name' is a passive connection method, e.g. "ptcp:", the new session
382         listens for connections to 'name'.  It maintains at most one connection
383         at any given time.  Any new connection causes the previous one (if any)
384         to be dropped."""
385         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
386         reconnect.set_name(name)
387         reconnect.enable(ovs.timeval.msec())
388
389         if ovs.stream.PassiveStream.is_valid_name(name):
390             reconnect.set_passive(True, ovs.timeval.msec())
391
392         if not ovs.stream.stream_or_pstream_needs_probes(name):
393             reconnect.set_probe_interval(0)
394
395         return Session(reconnect, None)
396
397     @staticmethod
398     def open_unreliably(jsonrpc):
399         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
400         reconnect.set_quiet(True)
401         reconnect.set_name(jsonrpc.name)
402         reconnect.set_max_tries(0)
403         reconnect.connected(ovs.timeval.msec())
404         return Session(reconnect, jsonrpc)
405
406     def close(self):
407         if self.rpc is not None:
408             self.rpc.close()
409             self.rpc = None
410         if self.stream is not None:
411             self.stream.close()
412             self.stream = None
413         if self.pstream is not None:
414             self.pstream.close()
415             self.pstream = None
416
417     def __disconnect(self):
418         if self.rpc is not None:
419             self.rpc.error(EOF)
420             self.rpc.close()
421             self.rpc = None
422             self.seqno += 1
423         elif self.stream is not None:
424             self.stream.close()
425             self.stream = None
426             self.seqno += 1
427
428     def __connect(self):
429         self.__disconnect()
430
431         name = self.reconnect.get_name()
432         if not self.reconnect.is_passive():
433             error, self.stream = ovs.stream.Stream.open(name)
434             if not error:
435                 self.reconnect.connecting(ovs.timeval.msec())
436             else:
437                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
438         elif self.pstream is not None:
439             error, self.pstream = ovs.stream.PassiveStream.open(name)
440             if not error:
441                 self.reconnect.listening(ovs.timeval.msec())
442             else:
443                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
444
445         self.seqno += 1
446
447     def run(self):
448         if self.pstream is not None:
449             error, stream = self.pstream.accept()
450             if error == 0:
451                 if self.rpc or self.stream:
452                     # XXX rate-limit
453                     vlog.info("%s: new connection replacing active "
454                               "connection" % self.reconnect.get_name())
455                     self.__disconnect()
456                 self.reconnect.connected(ovs.timeval.msec())
457                 self.rpc = Connection(stream)
458             elif error != errno.EAGAIN:
459                 self.reconnect.listen_error(ovs.timeval.msec(), error)
460                 self.pstream.close()
461                 self.pstream = None
462
463         if self.rpc:
464             backlog = self.rpc.get_backlog()
465             self.rpc.run()
466             if self.rpc.get_backlog() < backlog:
467                 # Data previously caught in a queue was successfully sent (or
468                 # there's an error, which we'll catch below).
469                 #
470                 # We don't count data that is successfully sent immediately as
471                 # activity, because there's a lot of queuing downstream from
472                 # us, which means that we can push a lot of data into a
473                 # connection that has stalled and won't ever recover.
474                 self.reconnect.activity(ovs.timeval.msec())
475
476             error = self.rpc.get_status()
477             if error != 0:
478                 self.reconnect.disconnected(ovs.timeval.msec(), error)
479                 self.__disconnect()
480         elif self.stream is not None:
481             self.stream.run()
482             error = self.stream.connect()
483             if error == 0:
484                 self.reconnect.connected(ovs.timeval.msec())
485                 self.rpc = Connection(self.stream)
486                 self.stream = None
487             elif error != errno.EAGAIN:
488                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
489                 self.stream.close()
490                 self.stream = None
491
492         action = self.reconnect.run(ovs.timeval.msec())
493         if action == ovs.reconnect.CONNECT:
494             self.__connect()
495         elif action == ovs.reconnect.DISCONNECT:
496             self.reconnect.disconnected(ovs.timeval.msec(), 0)
497             self.__disconnect()
498         elif action == ovs.reconnect.PROBE:
499             if self.rpc:
500                 request = Message.create_request("echo", [])
501                 request.id = "echo"
502                 self.rpc.send(request)
503         else:
504             assert action is None
505
506     def wait(self, poller):
507         if self.rpc is not None:
508             self.rpc.wait(poller)
509         elif self.stream is not None:
510             self.stream.run_wait(poller)
511             self.stream.connect_wait(poller)
512         if self.pstream is not None:
513             self.pstream.wait(poller)
514         self.reconnect.wait(poller, ovs.timeval.msec())
515
516     def get_backlog(self):
517         if self.rpc is not None:
518             return self.rpc.get_backlog()
519         else:
520             return 0
521
522     def get_name(self):
523         return self.reconnect.get_name()
524
525     def send(self, msg):
526         if self.rpc is not None:
527             return self.rpc.send(msg)
528         else:
529             return errno.ENOTCONN
530
531     def recv(self):
532         if self.rpc is not None:
533             received_bytes = self.rpc.get_received_bytes()
534             error, msg = self.rpc.recv()
535             if received_bytes != self.rpc.get_received_bytes():
536                 # Data was successfully received.
537                 #
538                 # Previously we only counted receiving a full message as
539                 # activity, but with large messages or a slow connection that
540                 # policy could time out the session mid-message.
541                 self.reconnect.activity(ovs.timeval.msec())
542
543             if not error:
544                 if msg.type == Message.T_REQUEST and msg.method == "echo":
545                     # Echo request.  Send reply.
546                     self.send(Message.create_reply(msg.params, msg.id))
547                 elif msg.type == Message.T_REPLY and msg.id == "echo":
548                     # It's a reply to our echo request.  Suppress it.
549                     pass
550                 else:
551                     return msg
552         return None
553
554     def recv_wait(self, poller):
555         if self.rpc is not None:
556             self.rpc.recv_wait(poller)
557
558     def is_alive(self):
559         if self.rpc is not None or self.stream is not None:
560             return True
561         else:
562             max_tries = self.reconnect.get_max_tries()
563             return max_tries is None or max_tries > 0
564
565     def is_connected(self):
566         return self.rpc is not None
567
568     def get_seqno(self):
569         return self.seqno
570
571     def force_reconnect(self):
572         self.reconnect.force_reconnect(ovs.timeval.msec())