ovs.db.idl: Improve error reporting for bad <row-update>s.
[cascardo/ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22
23 class Idl:
24     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
25
26     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
27     requests to an OVSDB database server and parses the responses, converting
28     raw JSON into data structures that are easier for clients to digest.
29
30     The IDL also assists with issuing database transactions.  The client
31     creates a transaction, manipulates the IDL data structures, and commits or
32     aborts the transaction.  The IDL then composes and issues the necessary
33     JSON-RPC requests and reports to the client whether the transaction
34     completed successfully.
35
36     If 'schema_cb' is provided, it should be a callback function that accepts
37     an ovs.db.schema.DbSchema as its argument.  It should determine whether the
38     schema is acceptable and raise an ovs.db.error.Error if it is not.  It may
39     also delete any tables or columns from the schema that the client has no
40     interest in monitoring, to save time and bandwidth during monitoring.  Its
41     return value is ignored."""
42
43     def __init__(self, remote, db_name, schema_cb=None):
44         """Creates and returns a connection to the database named 'db_name' on
45         'remote', which should be in a form acceptable to
46         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
47         replica of the remote database."""
48         self.remote = remote
49         self.session = ovs.jsonrpc.Session.open(remote)
50         self.db_name = db_name
51         self.last_seqno = None
52         self.schema = None
53         self.state = None
54         self.change_seqno = 0
55         self.data = {}
56         self.schema_cb = schema_cb
57
58     def close(self):
59         self.session.close()
60
61     def run(self):
62         """Processes a batch of messages from the database server.  Returns
63         True if the database as seen through the IDL changed, False if it did
64         not change.  The initial fetch of the entire contents of the remote
65         database is considered to be one kind of change.
66
67         This function can return occasional false positives, that is, report
68         that the database changed even though it didn't.  This happens if the
69         connection to the database drops and reconnects, which causes the
70         database contents to be reloaded even if they didn't change.  (It could
71         also happen if the database server sends out a "change" that reflects
72         what we already thought was in the database, but the database server is
73         not supposed to do that.)
74
75         As an alternative to checking the return value, the client may check
76         for changes in the value returned by self.get_seqno()."""
77         initial_change_seqno = self.change_seqno
78         self.session.run()
79         if self.session.is_connected():
80             seqno = self.session.get_seqno()
81             if seqno != self.last_seqno:
82                 self.last_seqno = seqno
83                 self.state = (self.__send_schema_request, None)
84             if self.state:
85                 self.state[0]()
86         return initial_change_seqno != self.change_seqno
87
88     def wait(self, poller):
89         """Arranges for poller.block() to wake up when self.run() has something
90         to do or when activity occurs on a transaction on 'self'."""
91         self.session.wait(poller)
92         if self.state and self.state[1]:
93             self.state[1](poller)
94
95     def get_seqno(self):
96         """Returns a number that represents the IDL's state.  When the IDL
97         updated (by self.run()), the return value changes."""
98         return self.change_seqno
99         
100     def __send_schema_request(self):
101         msg = ovs.jsonrpc.Message.create_request("get_schema", [self.db_name])
102         self.session.send(msg)
103         self.state = (lambda: self.__recv_schema(msg.id), self.__recv_wait)
104
105     def __recv_schema(self, id):
106         msg = self.session.recv()
107         if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
108             try:
109                 self.schema = ovs.db.schema.DbSchema.from_json(msg.result)
110             except error.Error, e:
111                 logging.error("%s: parse error in received schema: %s"
112                               % (self.remote, e))
113                 self.__error()
114                 return
115
116             if self.schema_cb:
117                 try:
118                     self.schema_cb(self.schema)
119                 except error.Error, e:
120                     logging.error("%s: error validating schema: %s"
121                                   % (self.remote, e))
122                     self.__error()
123                     return
124
125             self.__send_monitor_request()
126         elif msg:
127             logging.error("%s: unexpected message expecting schema: %s"
128                           % (self.remote, msg))
129             self.__error()
130             
131     def __recv_wait(self, poller):
132         self.session.recv_wait(poller)
133
134     def __send_monitor_request(self):
135         monitor_requests = {}
136         for table in self.schema.tables.itervalues():
137             monitor_requests[table.name] = {"columns": table.columns.keys()}
138         msg = ovs.jsonrpc.Message.create_request(
139             "monitor", [self.db_name, None, monitor_requests])
140         self.session.send(msg)
141         self.state = (lambda: self.__recv_monitor_reply(msg.id),
142                       self.__recv_wait)
143
144     def __recv_monitor_reply(self, id):
145         msg = self.session.recv()
146         if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
147             try:
148                 self.change_seqno += 1
149                 self.state = (self.__recv_update, self.__recv_wait)
150                 self.__clear()
151                 self.__parse_update(msg.result)
152             except error.Error, e:
153                 logging.error("%s: parse error in received schema: %s"
154                               % (self.remote, e))
155                 self.__error()
156         elif msg:
157             logging.error("%s: unexpected message expecting schema: %s"
158                           % (self.remote, msg))
159             self.__error()
160
161     def __recv_update(self):
162         msg = self.session.recv()
163         if (msg and msg.type == ovs.jsonrpc.Message.T_NOTIFY and
164             type(msg.params) == list and len(msg.params) == 2 and
165             msg.params[0] is None):
166             self.__parse_update(msg.params[1])
167         elif msg:
168             logging.error("%s: unexpected message expecting update: %s"
169                           % (self.remote, msg))
170             self.__error()
171
172     def __error(self):
173         self.session.force_reconnect()
174
175     def __parse_update(self, update):
176         try:
177             self.__do_parse_update(update)
178         except error.Error, e:
179             logging.error("%s: error parsing update: %s" % (self.remote, e))
180
181     def __do_parse_update(self, table_updates):
182         if type(table_updates) != dict:
183             raise error.Error("<table-updates> is not an object",
184                               table_updates)
185
186         for table_name, table_update in table_updates.iteritems():
187             table = self.schema.tables.get(table_name)
188             if not table:
189                 raise error.Error('<table-updates> includes unknown '
190                                   'table "%s"' % table_name)
191
192             if type(table_update) != dict:
193                 raise error.Error('<table-update> for table "%s" is not '
194                                   'an object' % table_name, table_update)
195
196             for uuid_string, row_update in table_update.iteritems():
197                 if not ovs.ovsuuid.UUID.is_valid_string(uuid_string):
198                     raise error.Error('<table-update> for table "%s" '
199                                       'contains bad UUID "%s" as member '
200                                       'name' % (table_name, uuid_string),
201                                       table_update)
202                 uuid = ovs.ovsuuid.UUID.from_string(uuid_string)
203
204                 if type(row_update) != dict:
205                     raise error.Error('<table-update> for table "%s" '
206                                       'contains <row-update> for %s that '
207                                       'is not an object'
208                                       % (table_name, uuid_string))
209
210                 parser = ovs.db.parser.Parser(json, "row-update")
211                 old = parser.get_optional("old", [dict])
212                 new = parser.get_optional("new", [dict])
213                 parser.finish()
214
215                 if not old and not new:
216                     raise error.Error('<row-update> missing "old" and '
217                                       '"new" members', row_update)
218
219                 if self.__parse_row_update(table, uuid, old, new):
220                     self.change_seqno += 1
221
222     def __parse_row_update(self, table, uuid, old, new):
223         """Returns True if a column changed, False otherwise."""
224         row = self.data[table.name].get(uuid)
225         if not new:
226             # Delete row.
227             if row:
228                 del self.data[table.name][uuid]
229             else:
230                 # XXX rate-limit
231                 logging.warning("cannot delete missing row %s from table %s"
232                                 % (uuid, table.name))
233                 return False
234         elif not old:
235             # Insert row.
236             if not row:
237                 row = self.__create_row(table, uuid)
238             else:
239                 # XXX rate-limit
240                 logging.warning("cannot add existing row %s to table %s"
241                                 % (uuid, table.name))
242             self.__modify_row(table, row, new)
243         else:
244             if not row:
245                 row = self.__create_row(table, uuid)
246                 # XXX rate-limit
247                 logging.warning("cannot modify missing row %s in table %s"
248                                 % (uuid, table_name))
249             self.__modify_row(table, row, new)
250         return True
251
252     def __modify_row(self, table, row, row_json):
253         changed = False
254         for column_name, datum_json in row_json.iteritems():
255             column = table.columns.get(column_name)
256             if not column:
257                 # XXX rate-limit
258                 logging.warning("unknown column %s updating table %s"
259                                 % (column_name, table.name))
260                 continue
261
262             try:
263                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
264             except error.Error, e:
265                 # XXX rate-limit
266                 logging.warning("error parsing column %s in table %s: %s"
267                                 % (column_name, table_name, e))
268                 continue
269
270             if datum != getattr(row, column_name):
271                 setattr(row, column_name, datum)
272                 changed = True
273             else:
274                 # Didn't really change but the OVSDB monitor protocol always
275                 # includes every value in a row.
276                 pass
277         return changed
278
279     def __clear(self):
280         if self.data != {}:
281             for table_name in self.schema.tables:
282                 if self.data[table_name] != {}:
283                     self.change_seqno += 1
284                     break
285
286         self.data = {}
287         for table_name in self.schema.tables:
288             self.data[table_name] = {}
289
290     def __create_row(self, table, uuid):
291         class Row(object):
292             pass
293         row = self.data[table.name][uuid] = Row()
294         for column in table.columns.itervalues():
295             setattr(row, column.name, ovs.db.data.Datum.default(column.type))
296         return row
297
298     def force_reconnect(self):
299         """Forces the IDL to drop its connection to the database and reconnect.
300         In the meantime, the contents of the IDL will not change."""
301         self.session.force_reconnect()