lib: add monitor_cond_change API to C IDL lib
[cascardo/ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import functools
16 import uuid
17
18 import six
19
20 import ovs.jsonrpc
21 import ovs.db.parser
22 import ovs.db.schema
23 from ovs.db import error
24 import ovs.ovsuuid
25 import ovs.poller
26 import ovs.vlog
27
28 vlog = ovs.vlog.Vlog("idl")
29
30 __pychecker__ = 'no-classattr no-objattrs'
31
32 ROW_CREATE = "create"
33 ROW_UPDATE = "update"
34 ROW_DELETE = "delete"
35
36 OVSDB_UPDATE = 0
37 OVSDB_UPDATE2 = 1
38
39
40 class Idl(object):
41     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
42
43     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
44     requests to an OVSDB database server and parses the responses, converting
45     raw JSON into data structures that are easier for clients to digest.
46
47     The IDL also assists with issuing database transactions.  The client
48     creates a transaction, manipulates the IDL data structures, and commits or
49     aborts the transaction.  The IDL then composes and issues the necessary
50     JSON-RPC requests and reports to the client whether the transaction
51     completed successfully.
52
53     The client is allowed to access the following attributes directly, in a
54     read-only fashion:
55
56     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
57       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
58       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
59       to a Row object.
60
61       The client may directly read and write the Row objects referenced by the
62       'rows' map values.  Refer to Row for more details.
63
64     - 'change_seqno': A number that represents the IDL's state.  When the IDL
65       is updated (by Idl.run()), its value changes.  The sequence number can
66       occasionally change even if the database does not.  This happens if the
67       connection to the database drops and reconnects, which causes the
68       database contents to be reloaded even if they didn't change.  (It could
69       also happen if the database server sends out a "change" that reflects
70       what the IDL already thought was in the database.  The database server is
71       not supposed to do that, but bugs could in theory cause it to do so.)
72
73     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
74       if no lock is configured.
75
76     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
77       lock, and False otherwise.
78
79       Locking and unlocking happens asynchronously from the database client's
80       point of view, so the information is only useful for optimization
81       (e.g. if the client doesn't have the lock then there's no point in trying
82       to write to the database).
83
84     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
85       the database server has indicated that some other client already owns the
86       requested lock, and False otherwise.
87
88     - 'txn': The ovs.db.idl.Transaction object for the database transaction
89       currently being constructed, if there is one, or None otherwise.
90 """
91
92     IDL_S_INITIAL = 0
93     IDL_S_MONITOR_REQUESTED = 1
94     IDL_S_MONITOR_COND_REQUESTED = 2
95
96     def __init__(self, remote, schema):
97         """Creates and returns a connection to the database named 'db_name' on
98         'remote', which should be in a form acceptable to
99         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
100         replica of the remote database.
101
102         'schema' should be the schema for the remote database.  The caller may
103         have cut it down by removing tables or columns that are not of
104         interest.  The IDL will only replicate the tables and columns that
105         remain.  The caller may also add a attribute named 'alert' to selected
106         remaining columns, setting its value to False; if so, then changes to
107         those columns will not be considered changes to the database for the
108         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
109         useful for columns that the IDL's client will write but not read.
110
111         As a convenience to users, 'schema' may also be an instance of the
112         SchemaHelper class.
113
114         The IDL uses and modifies 'schema' directly."""
115
116         assert isinstance(schema, SchemaHelper)
117         schema = schema.get_idl_schema()
118
119         self.tables = schema.tables
120         self.readonly = schema.readonly
121         self._db = schema
122         self._session = ovs.jsonrpc.Session.open(remote)
123         self._monitor_request_id = None
124         self._last_seqno = None
125         self.change_seqno = 0
126         self.uuid = uuid.uuid1()
127         self.state = self.IDL_S_INITIAL
128
129         # Database locking.
130         self.lock_name = None          # Name of lock we need, None if none.
131         self.has_lock = False          # Has db server said we have the lock?
132         self.is_lock_contended = False  # Has db server said we can't get lock?
133         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
134
135         # Transaction support.
136         self.txn = None
137         self._outstanding_txns = {}
138
139         for table in six.itervalues(schema.tables):
140             for column in six.itervalues(table.columns):
141                 if not hasattr(column, 'alert'):
142                     column.alert = True
143             table.need_table = False
144             table.rows = {}
145             table.idl = self
146             table.condition = []
147             table.cond_changed = False
148
149     def close(self):
150         """Closes the connection to the database.  The IDL will no longer
151         update."""
152         self._session.close()
153
154     def run(self):
155         """Processes a batch of messages from the database server.  Returns
156         True if the database as seen through the IDL changed, False if it did
157         not change.  The initial fetch of the entire contents of the remote
158         database is considered to be one kind of change.  If the IDL has been
159         configured to acquire a database lock (with Idl.set_lock()), then
160         successfully acquiring the lock is also considered to be a change.
161
162         This function can return occasional false positives, that is, report
163         that the database changed even though it didn't.  This happens if the
164         connection to the database drops and reconnects, which causes the
165         database contents to be reloaded even if they didn't change.  (It could
166         also happen if the database server sends out a "change" that reflects
167         what we already thought was in the database, but the database server is
168         not supposed to do that.)
169
170         As an alternative to checking the return value, the client may check
171         for changes in self.change_seqno."""
172         assert not self.txn
173         initial_change_seqno = self.change_seqno
174
175         self.send_cond_change()
176         self._session.run()
177         i = 0
178         while i < 50:
179             i += 1
180             if not self._session.is_connected():
181                 break
182
183             seqno = self._session.get_seqno()
184             if seqno != self._last_seqno:
185                 self._last_seqno = seqno
186                 self.__txn_abort_all()
187                 self.__send_monitor_request()
188                 if self.lock_name:
189                     self.__send_lock_request()
190                 break
191
192             msg = self._session.recv()
193             if msg is None:
194                 break
195             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
196                     and msg.method == "update2"
197                     and len(msg.params) == 2):
198                 # Database contents changed.
199                 self.__parse_update(msg.params[1], OVSDB_UPDATE2)
200             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
201                     and msg.method == "update"
202                     and len(msg.params) == 2):
203                 # Database contents changed.
204                 self.__parse_update(msg.params[1], OVSDB_UPDATE)
205             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
206                   and self._monitor_request_id is not None
207                   and self._monitor_request_id == msg.id):
208                 # Reply to our "monitor" request.
209                 try:
210                     self.change_seqno += 1
211                     self._monitor_request_id = None
212                     self.__clear()
213                     if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
214                         self.__parse_update(msg.result, OVSDB_UPDATE2)
215                     else:
216                         assert self.state == self.IDL_S_MONITOR_REQUESTED
217                         self.__parse_update(msg.result, OVSDB_UPDATE)
218
219                 except error.Error as e:
220                     vlog.err("%s: parse error in received schema: %s"
221                              % (self._session.get_name(), e))
222                     self.__error()
223             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
224                   and self._lock_request_id is not None
225                   and self._lock_request_id == msg.id):
226                 # Reply to our "lock" request.
227                 self.__parse_lock_reply(msg.result)
228             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
229                   and msg.method == "locked"):
230                 # We got our lock.
231                 self.__parse_lock_notify(msg.params, True)
232             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
233                   and msg.method == "stolen"):
234                 # Someone else stole our lock.
235                 self.__parse_lock_notify(msg.params, False)
236             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
237                 # Reply to our echo request.  Ignore it.
238                 pass
239             elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
240                   self.state == self.IDL_S_MONITOR_COND_REQUESTED and
241                   self._monitor_request_id == msg.id):
242                 if msg.error == "unknown method":
243                     self.__send_monitor_request()
244             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
245                                ovs.jsonrpc.Message.T_REPLY)
246                   and self.__txn_process_reply(msg)):
247                 # __txn_process_reply() did everything needed.
248                 pass
249             else:
250                 # This can happen if a transaction is destroyed before we
251                 # receive the reply, so keep the log level low.
252                 vlog.dbg("%s: received unexpected %s message"
253                          % (self._session.get_name(),
254                              ovs.jsonrpc.Message.type_to_string(msg.type)))
255
256         return initial_change_seqno != self.change_seqno
257
258     def send_cond_change(self):
259         if not self._session.is_connected():
260             return
261
262         for table in six.itervalues(self.tables):
263             if table.cond_changed:
264                 self.__send_cond_change(table, table.condition)
265                 table.cond_changed = False
266
267     def cond_change(self, table_name, add_cmd, cond):
268         """Change conditions for this IDL session. If session is not already
269         connected, add condtion to table and submit it on send_monitor_request.
270         Otherwise  send monitor_cond_change method with the requested
271         changes."""
272
273         table = self.tables.get(table_name)
274         if not table:
275             raise error.Error('Unknown table "%s"' % table_name)
276
277         if add_cmd:
278             table.condition += cond
279         else:
280             for c in cond:
281                 table.condition.remove(c)
282
283         table.cond_changed = True
284
285     def wait(self, poller):
286         """Arranges for poller.block() to wake up when self.run() has something
287         to do or when activity occurs on a transaction on 'self'."""
288         self._session.wait(poller)
289         self._session.recv_wait(poller)
290
291     def has_ever_connected(self):
292         """Returns True, if the IDL successfully connected to the remote
293         database and retrieved its contents (even if the connection
294         subsequently dropped and is in the process of reconnecting).  If so,
295         then the IDL contains an atomic snapshot of the database's contents
296         (but it might be arbitrarily old if the connection dropped).
297
298         Returns False if the IDL has never connected or retrieved the
299         database's contents.  If so, the IDL is empty."""
300         return self.change_seqno != 0
301
302     def force_reconnect(self):
303         """Forces the IDL to drop its connection to the database and reconnect.
304         In the meantime, the contents of the IDL will not change."""
305         self._session.force_reconnect()
306
307     def set_lock(self, lock_name):
308         """If 'lock_name' is not None, configures the IDL to obtain the named
309         lock from the database server and to avoid modifying the database when
310         the lock cannot be acquired (that is, when another client has the same
311         lock).
312
313         If 'lock_name' is None, drops the locking requirement and releases the
314         lock."""
315         assert not self.txn
316         assert not self._outstanding_txns
317
318         if self.lock_name and (not lock_name or lock_name != self.lock_name):
319             # Release previous lock.
320             self.__send_unlock_request()
321             self.lock_name = None
322             self.is_lock_contended = False
323
324         if lock_name and not self.lock_name:
325             # Acquire new lock.
326             self.lock_name = lock_name
327             self.__send_lock_request()
328
329     def notify(self, event, row, updates=None):
330         """Hook for implementing create/update/delete notifications
331
332         :param event:   The event that was triggered
333         :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
334         :param row:     The row as it is after the operation has occured
335         :type row:      Row
336         :param updates: For updates, row with only updated columns
337         :type updates:  Row
338         """
339
340     def __send_cond_change(self, table, cond):
341         monitor_cond_change = {table.name: [{"where": cond}]}
342         old_uuid = str(self.uuid)
343         self.uuid = uuid.uuid1()
344         params = [old_uuid, str(self.uuid), monitor_cond_change]
345         msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
346         self._session.send(msg)
347
348     def __clear(self):
349         changed = False
350
351         for table in six.itervalues(self.tables):
352             if table.rows:
353                 changed = True
354                 table.rows = {}
355
356         if changed:
357             self.change_seqno += 1
358
359     def __update_has_lock(self, new_has_lock):
360         if new_has_lock and not self.has_lock:
361             if self._monitor_request_id is None:
362                 self.change_seqno += 1
363             else:
364                 # We're waiting for a monitor reply, so don't signal that the
365                 # database changed.  The monitor reply will increment
366                 # change_seqno anyhow.
367                 pass
368             self.is_lock_contended = False
369         self.has_lock = new_has_lock
370
371     def __do_send_lock_request(self, method):
372         self.__update_has_lock(False)
373         self._lock_request_id = None
374         if self._session.is_connected():
375             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
376             msg_id = msg.id
377             self._session.send(msg)
378         else:
379             msg_id = None
380         return msg_id
381
382     def __send_lock_request(self):
383         self._lock_request_id = self.__do_send_lock_request("lock")
384
385     def __send_unlock_request(self):
386         self.__do_send_lock_request("unlock")
387
388     def __parse_lock_reply(self, result):
389         self._lock_request_id = None
390         got_lock = isinstance(result, dict) and result.get("locked") is True
391         self.__update_has_lock(got_lock)
392         if not got_lock:
393             self.is_lock_contended = True
394
395     def __parse_lock_notify(self, params, new_has_lock):
396         if (self.lock_name is not None
397             and isinstance(params, (list, tuple))
398             and params
399             and params[0] == self.lock_name):
400             self.__update_has_lock(new_has_lock)
401             if not new_has_lock:
402                 self.is_lock_contended = True
403
404     def __send_monitor_request(self):
405         if self.state == self.IDL_S_INITIAL:
406             self.state = self.IDL_S_MONITOR_COND_REQUESTED
407             method = "monitor_cond"
408         else:
409             self.state = self.IDL_S_MONITOR_REQUESTED
410             method = "monitor"
411
412         monitor_requests = {}
413         for table in six.itervalues(self.tables):
414             columns = []
415             for column in six.iterkeys(table.columns):
416                 if ((table.name not in self.readonly) or
417                         (table.name in self.readonly) and
418                         (column not in self.readonly[table.name])):
419                     columns.append(column)
420             monitor_requests[table.name] = {"columns": columns}
421             if method == "monitor_cond" and table.cond_changed and \
422                    table.condition:
423                 monitor_requests[table.name]["where"] = table.condition
424                 table.cond_change = False
425
426         msg = ovs.jsonrpc.Message.create_request(
427             method, [self._db.name, str(self.uuid), monitor_requests])
428         self._monitor_request_id = msg.id
429         self._session.send(msg)
430
431     def __parse_update(self, update, version):
432         try:
433             self.__do_parse_update(update, version)
434         except error.Error as e:
435             vlog.err("%s: error parsing update: %s"
436                      % (self._session.get_name(), e))
437
438     def __do_parse_update(self, table_updates, version):
439         if not isinstance(table_updates, dict):
440             raise error.Error("<table-updates> is not an object",
441                               table_updates)
442
443         for table_name, table_update in six.iteritems(table_updates):
444             table = self.tables.get(table_name)
445             if not table:
446                 raise error.Error('<table-updates> includes unknown '
447                                   'table "%s"' % table_name)
448
449             if not isinstance(table_update, dict):
450                 raise error.Error('<table-update> for table "%s" is not '
451                                   'an object' % table_name, table_update)
452
453             for uuid_string, row_update in six.iteritems(table_update):
454                 if not ovs.ovsuuid.is_valid_string(uuid_string):
455                     raise error.Error('<table-update> for table "%s" '
456                                       'contains bad UUID "%s" as member '
457                                       'name' % (table_name, uuid_string),
458                                       table_update)
459                 uuid = ovs.ovsuuid.from_string(uuid_string)
460
461                 if not isinstance(row_update, dict):
462                     raise error.Error('<table-update> for table "%s" '
463                                       'contains <row-update> for %s that '
464                                       'is not an object'
465                                       % (table_name, uuid_string))
466
467                 if version == OVSDB_UPDATE2:
468                     if self.__process_update2(table, uuid, row_update):
469                         self.change_seqno += 1
470                     continue
471
472                 parser = ovs.db.parser.Parser(row_update, "row-update")
473                 old = parser.get_optional("old", [dict])
474                 new = parser.get_optional("new", [dict])
475                 parser.finish()
476
477                 if not old and not new:
478                     raise error.Error('<row-update> missing "old" and '
479                                       '"new" members', row_update)
480
481                 if self.__process_update(table, uuid, old, new):
482                     self.change_seqno += 1
483
484     def __process_update2(self, table, uuid, row_update):
485         row = table.rows.get(uuid)
486         changed = False
487         if "delete" in row_update:
488             if row:
489                 del table.rows[uuid]
490                 self.notify(ROW_DELETE, row)
491                 changed = True
492             else:
493                 # XXX rate-limit
494                 vlog.warn("cannot delete missing row %s from table"
495                           "%s" % (uuid, table.name))
496         elif "insert" in row_update or "initial" in row_update:
497             if row:
498                 vlog.warn("cannot add existing row %s from table"
499                           " %s" % (uuid, table.name))
500                 del table.rows[uuid]
501             row = self.__create_row(table, uuid)
502             if "insert" in row_update:
503                 row_update = row_update['insert']
504             else:
505                 row_update = row_update['initial']
506             self.__add_default(table, row_update)
507             if self.__row_update(table, row, row_update):
508                 changed = True
509                 self.notify(ROW_CREATE, row)
510         elif "modify" in row_update:
511             if not row:
512                 raise error.Error('Modify non-existing row')
513
514             self.__apply_diff(table, row, row_update['modify'])
515             self.notify(ROW_UPDATE, row,
516                         Row.from_json(self, table, uuid, row_update['modify']))
517             changed = True
518         else:
519             raise error.Error('<row-update> unknown operation',
520                               row_update)
521         return changed
522
523     def __process_update(self, table, uuid, old, new):
524         """Returns True if a column changed, False otherwise."""
525         row = table.rows.get(uuid)
526         changed = False
527         if not new:
528             # Delete row.
529             if row:
530                 del table.rows[uuid]
531                 changed = True
532                 self.notify(ROW_DELETE, row)
533             else:
534                 # XXX rate-limit
535                 vlog.warn("cannot delete missing row %s from table %s"
536                           % (uuid, table.name))
537         elif not old:
538             # Insert row.
539             if not row:
540                 row = self.__create_row(table, uuid)
541                 changed = True
542             else:
543                 # XXX rate-limit
544                 vlog.warn("cannot add existing row %s to table %s"
545                           % (uuid, table.name))
546             if self.__row_update(table, row, new):
547                 changed = True
548                 self.notify(ROW_CREATE, row)
549         else:
550             op = ROW_UPDATE
551             if not row:
552                 row = self.__create_row(table, uuid)
553                 changed = True
554                 op = ROW_CREATE
555                 # XXX rate-limit
556                 vlog.warn("cannot modify missing row %s in table %s"
557                           % (uuid, table.name))
558             if self.__row_update(table, row, new):
559                 changed = True
560                 self.notify(op, row, Row.from_json(self, table, uuid, old))
561         return changed
562
563     def __column_name(self, column):
564         if column.type.key.type == ovs.db.types.UuidType:
565             return ovs.ovsuuid.to_json(column.type.key.type.default)
566         else:
567             return column.type.key.type.default
568
569     def __add_default(self, table, row_update):
570         for column in six.itervalues(table.columns):
571             if column.name not in row_update:
572                 if ((table.name not in self.readonly) or
573                         (table.name in self.readonly) and
574                         (column.name not in self.readonly[table.name])):
575                     if column.type.n_min != 0 and not column.type.is_map():
576                         row_update[column.name] = self.__column_name(column)
577
578     def __apply_diff(self, table, row, row_diff):
579         for column_name, datum_json in six.iteritems(row_diff):
580             column = table.columns.get(column_name)
581             if not column:
582                 # XXX rate-limit
583                 vlog.warn("unknown column %s updating table %s"
584                           % (column_name, table.name))
585                 continue
586
587             try:
588                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
589             except error.Error as e:
590                 # XXX rate-limit
591                 vlog.warn("error parsing column %s in table %s: %s"
592                           % (column_name, table.name, e))
593                 continue
594
595             datum = row._data[column_name].diff(datum)
596             if datum != row._data[column_name]:
597                 row._data[column_name] = datum
598
599     def __row_update(self, table, row, row_json):
600         changed = False
601         for column_name, datum_json in six.iteritems(row_json):
602             column = table.columns.get(column_name)
603             if not column:
604                 # XXX rate-limit
605                 vlog.warn("unknown column %s updating table %s"
606                           % (column_name, table.name))
607                 continue
608
609             try:
610                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
611             except error.Error as e:
612                 # XXX rate-limit
613                 vlog.warn("error parsing column %s in table %s: %s"
614                           % (column_name, table.name, e))
615                 continue
616
617             if datum != row._data[column_name]:
618                 row._data[column_name] = datum
619                 if column.alert:
620                     changed = True
621             else:
622                 # Didn't really change but the OVSDB monitor protocol always
623                 # includes every value in a row.
624                 pass
625         return changed
626
627     def __create_row(self, table, uuid):
628         data = {}
629         for column in six.itervalues(table.columns):
630             data[column.name] = ovs.db.data.Datum.default(column.type)
631         row = table.rows[uuid] = Row(self, table, uuid, data)
632         return row
633
634     def __error(self):
635         self._session.force_reconnect()
636
637     def __txn_abort_all(self):
638         while self._outstanding_txns:
639             txn = self._outstanding_txns.popitem()[1]
640             txn._status = Transaction.TRY_AGAIN
641
642     def __txn_process_reply(self, msg):
643         txn = self._outstanding_txns.pop(msg.id, None)
644         if txn:
645             txn._process_reply(msg)
646
647
648 def _uuid_to_row(atom, base):
649     if base.ref_table:
650         return base.ref_table.rows.get(atom)
651     else:
652         return atom
653
654
655 def _row_to_uuid(value):
656     if isinstance(value, Row):
657         return value.uuid
658     else:
659         return value
660
661
662 @functools.total_ordering
663 class Row(object):
664     """A row within an IDL.
665
666     The client may access the following attributes directly:
667
668     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
669
670     - An attribute for each column in the Row's table, named for the column,
671       whose values are as returned by Datum.to_python() for the column's type.
672
673       If some error occurs (e.g. the database server's idea of the column is
674       different from the IDL's idea), then the attribute values is the
675       "default" value return by Datum.default() for the column's type.  (It is
676       important to know this because the default value may violate constraints
677       for the column's type, e.g. the default integer value is 0 even if column
678       contraints require the column's value to be positive.)
679
680       When a transaction is active, column attributes may also be assigned new
681       values.  Committing the transaction will then cause the new value to be
682       stored into the database.
683
684       *NOTE*: In the current implementation, the value of a column is a *copy*
685       of the value in the database.  This means that modifying its value
686       directly will have no useful effect.  For example, the following:
687         row.mycolumn["a"] = "b"              # don't do this
688       will not change anything in the database, even after commit.  To modify
689       the column, instead assign the modified column value back to the column:
690         d = row.mycolumn
691         d["a"] = "b"
692         row.mycolumn = d
693 """
694     def __init__(self, idl, table, uuid, data):
695         # All of the explicit references to self.__dict__ below are required
696         # to set real attributes with invoking self.__getattr__().
697         self.__dict__["uuid"] = uuid
698
699         self.__dict__["_idl"] = idl
700         self.__dict__["_table"] = table
701
702         # _data is the committed data.  It takes the following values:
703         #
704         #   - A dictionary that maps every column name to a Datum, if the row
705         #     exists in the committed form of the database.
706         #
707         #   - None, if this row is newly inserted within the active transaction
708         #     and thus has no committed form.
709         self.__dict__["_data"] = data
710
711         # _changes describes changes to this row within the active transaction.
712         # It takes the following values:
713         #
714         #   - {}, the empty dictionary, if no transaction is active or if the
715         #     row has yet not been changed within this transaction.
716         #
717         #   - A dictionary that maps a column name to its new Datum, if an
718         #     active transaction changes those columns' values.
719         #
720         #   - A dictionary that maps every column name to a Datum, if the row
721         #     is newly inserted within the active transaction.
722         #
723         #   - None, if this transaction deletes this row.
724         self.__dict__["_changes"] = {}
725
726         # A dictionary whose keys are the names of columns that must be
727         # verified as prerequisites when the transaction commits.  The values
728         # in the dictionary are all None.
729         self.__dict__["_prereqs"] = {}
730
731     def __lt__(self, other):
732         if not isinstance(other, Row):
733             return NotImplemented
734         return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
735
736     def __eq__(self, other):
737         if not isinstance(other, Row):
738             return NotImplemented
739         return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
740
741     def __hash__(self):
742         return int(self.__dict__['uuid'])
743
744     def __getattr__(self, column_name):
745         assert self._changes is not None
746
747         datum = self._changes.get(column_name)
748         if datum is None:
749             if self._data is None:
750                 raise AttributeError("%s instance has no attribute '%s'" %
751                                      (self.__class__.__name__, column_name))
752             if column_name in self._data:
753                 datum = self._data[column_name]
754             else:
755                 raise AttributeError("%s instance has no attribute '%s'" %
756                                      (self.__class__.__name__, column_name))
757
758         return datum.to_python(_uuid_to_row)
759
760     def __setattr__(self, column_name, value):
761         assert self._changes is not None
762         assert self._idl.txn
763
764         if ((self._table.name in self._idl.readonly) and
765                 (column_name in self._idl.readonly[self._table.name])):
766             vlog.warn("attempting to write to readonly column %s"
767                       % column_name)
768             return
769
770         column = self._table.columns[column_name]
771         try:
772             datum = ovs.db.data.Datum.from_python(column.type, value,
773                                                   _row_to_uuid)
774         except error.Error as e:
775             # XXX rate-limit
776             vlog.err("attempting to write bad value to column %s (%s)"
777                      % (column_name, e))
778             return
779         self._idl.txn._write(self, column, datum)
780
781     @classmethod
782     def from_json(cls, idl, table, uuid, row_json):
783         data = {}
784         for column_name, datum_json in six.iteritems(row_json):
785             column = table.columns.get(column_name)
786             if not column:
787                 # XXX rate-limit
788                 vlog.warn("unknown column %s in table %s"
789                           % (column_name, table.name))
790                 continue
791             try:
792                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
793             except error.Error as e:
794                 # XXX rate-limit
795                 vlog.warn("error parsing column %s in table %s: %s"
796                           % (column_name, table.name, e))
797                 continue
798             data[column_name] = datum
799         return cls(idl, table, uuid, data)
800
801     def verify(self, column_name):
802         """Causes the original contents of column 'column_name' in this row to
803         be verified as a prerequisite to completing the transaction.  That is,
804         if 'column_name' changed in this row (or if this row was deleted)
805         between the time that the IDL originally read its contents and the time
806         that the transaction commits, then the transaction aborts and
807         Transaction.commit() returns Transaction.TRY_AGAIN.
808
809         The intention is that, to ensure that no transaction commits based on
810         dirty reads, an application should call Row.verify() on each data item
811         read as part of a read-modify-write operation.
812
813         In some cases Row.verify() reduces to a no-op, because the current
814         value of the column is already known:
815
816           - If this row is a row created by the current transaction (returned
817             by Transaction.insert()).
818
819           - If the column has already been modified within the current
820             transaction.
821
822         Because of the latter property, always call Row.verify() *before*
823         modifying the column, for a given read-modify-write.
824
825         A transaction must be in progress."""
826         assert self._idl.txn
827         assert self._changes is not None
828         if not self._data or column_name in self._changes:
829             return
830
831         self._prereqs[column_name] = None
832
833     def delete(self):
834         """Deletes this row from its table.
835
836         A transaction must be in progress."""
837         assert self._idl.txn
838         assert self._changes is not None
839         if self._data is None:
840             del self._idl.txn._txn_rows[self.uuid]
841         else:
842             self._idl.txn._txn_rows[self.uuid] = self
843         self.__dict__["_changes"] = None
844         del self._table.rows[self.uuid]
845
846     def fetch(self, column_name):
847         self._idl.txn._fetch(self, column_name)
848
849     def increment(self, column_name):
850         """Causes the transaction, when committed, to increment the value of
851         'column_name' within this row by 1.  'column_name' must have an integer
852         type.  After the transaction commits successfully, the client may
853         retrieve the final (incremented) value of 'column_name' with
854         Transaction.get_increment_new_value().
855
856         The client could accomplish something similar by reading and writing
857         and verify()ing columns.  However, increment() will never (by itself)
858         cause a transaction to fail because of a verify error.
859
860         The intended use is for incrementing the "next_cfg" column in
861         the Open_vSwitch table."""
862         self._idl.txn._increment(self, column_name)
863
864
865 def _uuid_name_from_uuid(uuid):
866     return "row%s" % str(uuid).replace("-", "_")
867
868
869 def _where_uuid_equals(uuid):
870     return [["_uuid", "==", ["uuid", str(uuid)]]]
871
872
873 class _InsertedRow(object):
874     def __init__(self, op_index):
875         self.op_index = op_index
876         self.real = None
877
878
879 class Transaction(object):
880     """A transaction may modify the contents of a database by modifying the
881     values of columns, deleting rows, inserting rows, or adding checks that
882     columns in the database have not changed ("verify" operations), through
883     Row methods.
884
885     Reading and writing columns and inserting and deleting rows are all
886     straightforward.  The reasons to verify columns are less obvious.
887     Verification is the key to maintaining transactional integrity.  Because
888     OVSDB handles multiple clients, it can happen that between the time that
889     OVSDB client A reads a column and writes a new value, OVSDB client B has
890     written that column.  Client A's write should not ordinarily overwrite
891     client B's, especially if the column in question is a "map" column that
892     contains several more or less independent data items.  If client A adds a
893     "verify" operation before it writes the column, then the transaction fails
894     in case client B modifies it first.  Client A will then see the new value
895     of the column and compose a new transaction based on the new contents
896     written by client B.
897
898     When a transaction is complete, which must be before the next call to
899     Idl.run(), call Transaction.commit() or Transaction.abort().
900
901     The life-cycle of a transaction looks like this:
902
903     1. Create the transaction and record the initial sequence number:
904
905         seqno = idl.change_seqno(idl)
906         txn = Transaction(idl)
907
908     2. Modify the database with Row and Transaction methods.
909
910     3. Commit the transaction by calling Transaction.commit().  The first call
911        to this function probably returns Transaction.INCOMPLETE.  The client
912        must keep calling again along as this remains true, calling Idl.run() in
913        between to let the IDL do protocol processing.  (If the client doesn't
914        have anything else to do in the meantime, it can use
915        Transaction.commit_block() to avoid having to loop itself.)
916
917     4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
918        to change from the saved 'seqno' (it's possible that it's already
919        changed, in which case the client should not wait at all), then start
920        over from step 1.  Only a call to Idl.run() will change the return value
921        of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
922
923     # Status values that Transaction.commit() can return.
924
925     # Not yet committed or aborted.
926     UNCOMMITTED = "uncommitted"
927     # Transaction didn't include any changes.
928     UNCHANGED = "unchanged"
929     # Commit in progress, please wait.
930     INCOMPLETE = "incomplete"
931     # ovsdb_idl_txn_abort() called.
932     ABORTED = "aborted"
933     # Commit successful.
934     SUCCESS = "success"
935     # Commit failed because a "verify" operation
936     # reported an inconsistency, due to a network
937     # problem, or other transient failure.  Wait
938     # for a change, then try again.
939     TRY_AGAIN = "try again"
940     # Server hasn't given us the lock yet.
941     NOT_LOCKED = "not locked"
942     # Commit failed due to a hard error.
943     ERROR = "error"
944
945     @staticmethod
946     def status_to_string(status):
947         """Converts one of the status values that Transaction.commit() can
948         return into a human-readable string.
949
950         (The status values are in fact such strings already, so
951         there's nothing to do.)"""
952         return status
953
954     def __init__(self, idl):
955         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
956         A given Idl may only have a single active transaction at a time.
957
958         A Transaction may modify the contents of a database by assigning new
959         values to columns (attributes of Row), deleting rows (with
960         Row.delete()), or inserting rows (with Transaction.insert()).  It may
961         also check that columns in the database have not changed with
962         Row.verify().
963
964         When a transaction is complete (which must be before the next call to
965         Idl.run()), call Transaction.commit() or Transaction.abort()."""
966         assert idl.txn is None
967
968         idl.txn = self
969         self._request_id = None
970         self.idl = idl
971         self.dry_run = False
972         self._txn_rows = {}
973         self._status = Transaction.UNCOMMITTED
974         self._error = None
975         self._comments = []
976
977         self._inc_row = None
978         self._inc_column = None
979
980         self._fetch_requests = []
981
982         self._inserted_rows = {}  # Map from UUID to _InsertedRow
983
984     def add_comment(self, comment):
985         """Appends 'comment' to the comments that will be passed to the OVSDB
986         server when this transaction is committed.  (The comment will be
987         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
988         relatively human-readable form.)"""
989         self._comments.append(comment)
990
991     def wait(self, poller):
992         """Causes poll_block() to wake up if this transaction has completed
993         committing."""
994         if self._status not in (Transaction.UNCOMMITTED,
995                                 Transaction.INCOMPLETE):
996             poller.immediate_wake()
997
998     def _substitute_uuids(self, json):
999         if isinstance(json, (list, tuple)):
1000             if (len(json) == 2
1001                     and json[0] == 'uuid'
1002                     and ovs.ovsuuid.is_valid_string(json[1])):
1003                 uuid = ovs.ovsuuid.from_string(json[1])
1004                 row = self._txn_rows.get(uuid, None)
1005                 if row and row._data is None:
1006                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
1007             else:
1008                 return [self._substitute_uuids(elem) for elem in json]
1009         return json
1010
1011     def __disassemble(self):
1012         self.idl.txn = None
1013
1014         for row in six.itervalues(self._txn_rows):
1015             if row._changes is None:
1016                 row._table.rows[row.uuid] = row
1017             elif row._data is None:
1018                 del row._table.rows[row.uuid]
1019             row.__dict__["_changes"] = {}
1020             row.__dict__["_prereqs"] = {}
1021         self._txn_rows = {}
1022
1023     def commit(self):
1024         """Attempts to commit 'txn'.  Returns the status of the commit
1025         operation, one of the following constants:
1026
1027           Transaction.INCOMPLETE:
1028
1029               The transaction is in progress, but not yet complete.  The caller
1030               should call again later, after calling Idl.run() to let the
1031               IDL do OVSDB protocol processing.
1032
1033           Transaction.UNCHANGED:
1034
1035               The transaction is complete.  (It didn't actually change the
1036               database, so the IDL didn't send any request to the database
1037               server.)
1038
1039           Transaction.ABORTED:
1040
1041               The caller previously called Transaction.abort().
1042
1043           Transaction.SUCCESS:
1044
1045               The transaction was successful.  The update made by the
1046               transaction (and possibly other changes made by other database
1047               clients) should already be visible in the IDL.
1048
1049           Transaction.TRY_AGAIN:
1050
1051               The transaction failed for some transient reason, e.g. because a
1052               "verify" operation reported an inconsistency or due to a network
1053               problem.  The caller should wait for a change to the database,
1054               then compose a new transaction, and commit the new transaction.
1055
1056               Use Idl.change_seqno to wait for a change in the database.  It is
1057               important to use its value *before* the initial call to
1058               Transaction.commit() as the baseline for this purpose, because
1059               the change that one should wait for can happen after the initial
1060               call but before the call that returns Transaction.TRY_AGAIN, and
1061               using some other baseline value in that situation could cause an
1062               indefinite wait if the database rarely changes.
1063
1064           Transaction.NOT_LOCKED:
1065
1066               The transaction failed because the IDL has been configured to
1067               require a database lock (with Idl.set_lock()) but didn't
1068               get it yet or has already lost it.
1069
1070         Committing a transaction rolls back all of the changes that it made to
1071         the IDL's copy of the database.  If the transaction commits
1072         successfully, then the database server will send an update and, thus,
1073         the IDL will be updated with the committed changes."""
1074         # The status can only change if we're the active transaction.
1075         # (Otherwise, our status will change only in Idl.run().)
1076         if self != self.idl.txn:
1077             return self._status
1078
1079         # If we need a lock but don't have it, give up quickly.
1080         if self.idl.lock_name and not self.idl.has_lock:
1081             self._status = Transaction.NOT_LOCKED
1082             self.__disassemble()
1083             return self._status
1084
1085         operations = [self.idl._db.name]
1086
1087         # Assert that we have the required lock (avoiding a race).
1088         if self.idl.lock_name:
1089             operations.append({"op": "assert",
1090                                "lock": self.idl.lock_name})
1091
1092         # Add prerequisites and declarations of new rows.
1093         for row in six.itervalues(self._txn_rows):
1094             if row._prereqs:
1095                 rows = {}
1096                 columns = []
1097                 for column_name in row._prereqs:
1098                     columns.append(column_name)
1099                     rows[column_name] = row._data[column_name].to_json()
1100                 operations.append({"op": "wait",
1101                                    "table": row._table.name,
1102                                    "timeout": 0,
1103                                    "where": _where_uuid_equals(row.uuid),
1104                                    "until": "==",
1105                                    "columns": columns,
1106                                    "rows": [rows]})
1107
1108         # Add updates.
1109         any_updates = False
1110         for row in six.itervalues(self._txn_rows):
1111             if row._changes is None:
1112                 if row._table.is_root:
1113                     operations.append({"op": "delete",
1114                                        "table": row._table.name,
1115                                        "where": _where_uuid_equals(row.uuid)})
1116                     any_updates = True
1117                 else:
1118                     # Let ovsdb-server decide whether to really delete it.
1119                     pass
1120             elif row._changes:
1121                 op = {"table": row._table.name}
1122                 if row._data is None:
1123                     op["op"] = "insert"
1124                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
1125                     any_updates = True
1126
1127                     op_index = len(operations) - 1
1128                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
1129                 else:
1130                     op["op"] = "update"
1131                     op["where"] = _where_uuid_equals(row.uuid)
1132
1133                 row_json = {}
1134                 op["row"] = row_json
1135
1136                 for column_name, datum in six.iteritems(row._changes):
1137                     if row._data is not None or not datum.is_default():
1138                         row_json[column_name] = (
1139                             self._substitute_uuids(datum.to_json()))
1140
1141                         # If anything really changed, consider it an update.
1142                         # We can't suppress not-really-changed values earlier
1143                         # or transactions would become nonatomic (see the big
1144                         # comment inside Transaction._write()).
1145                         if (not any_updates and row._data is not None and
1146                                 row._data[column_name] != datum):
1147                             any_updates = True
1148
1149                 if row._data is None or row_json:
1150                     operations.append(op)
1151
1152         if self._fetch_requests:
1153             for fetch in self._fetch_requests:
1154                 fetch["index"] = len(operations) - 1
1155                 operations.append({"op": "select",
1156                                    "table": fetch["row"]._table.name,
1157                                    "where": self._substitute_uuids(
1158                                        _where_uuid_equals(fetch["row"].uuid)),
1159                                    "columns": [fetch["column_name"]]})
1160             any_updates = True
1161
1162         # Add increment.
1163         if self._inc_row and any_updates:
1164             self._inc_index = len(operations) - 1
1165
1166             operations.append({"op": "mutate",
1167                                "table": self._inc_row._table.name,
1168                                "where": self._substitute_uuids(
1169                                    _where_uuid_equals(self._inc_row.uuid)),
1170                                "mutations": [[self._inc_column, "+=", 1]]})
1171             operations.append({"op": "select",
1172                                "table": self._inc_row._table.name,
1173                                "where": self._substitute_uuids(
1174                                    _where_uuid_equals(self._inc_row.uuid)),
1175                                "columns": [self._inc_column]})
1176
1177         # Add comment.
1178         if self._comments:
1179             operations.append({"op": "comment",
1180                                "comment": "\n".join(self._comments)})
1181
1182         # Dry run?
1183         if self.dry_run:
1184             operations.append({"op": "abort"})
1185
1186         if not any_updates:
1187             self._status = Transaction.UNCHANGED
1188         else:
1189             msg = ovs.jsonrpc.Message.create_request("transact", operations)
1190             self._request_id = msg.id
1191             if not self.idl._session.send(msg):
1192                 self.idl._outstanding_txns[self._request_id] = self
1193                 self._status = Transaction.INCOMPLETE
1194             else:
1195                 self._status = Transaction.TRY_AGAIN
1196
1197         self.__disassemble()
1198         return self._status
1199
1200     def commit_block(self):
1201         """Attempts to commit this transaction, blocking until the commit
1202         either succeeds or fails.  Returns the final commit status, which may
1203         be any Transaction.* value other than Transaction.INCOMPLETE.
1204
1205         This function calls Idl.run() on this transaction'ss IDL, so it may
1206         cause Idl.change_seqno to change."""
1207         while True:
1208             status = self.commit()
1209             if status != Transaction.INCOMPLETE:
1210                 return status
1211
1212             self.idl.run()
1213
1214             poller = ovs.poller.Poller()
1215             self.idl.wait(poller)
1216             self.wait(poller)
1217             poller.block()
1218
1219     def get_increment_new_value(self):
1220         """Returns the final (incremented) value of the column in this
1221         transaction that was set to be incremented by Row.increment.  This
1222         transaction must have committed successfully."""
1223         assert self._status == Transaction.SUCCESS
1224         return self._inc_new_value
1225
1226     def abort(self):
1227         """Aborts this transaction.  If Transaction.commit() has already been
1228         called then the transaction might get committed anyhow."""
1229         self.__disassemble()
1230         if self._status in (Transaction.UNCOMMITTED,
1231                             Transaction.INCOMPLETE):
1232             self._status = Transaction.ABORTED
1233
1234     def get_error(self):
1235         """Returns a string representing this transaction's current status,
1236         suitable for use in log messages."""
1237         if self._status != Transaction.ERROR:
1238             return Transaction.status_to_string(self._status)
1239         elif self._error:
1240             return self._error
1241         else:
1242             return "no error details available"
1243
1244     def __set_error_json(self, json):
1245         if self._error is None:
1246             self._error = ovs.json.to_string(json)
1247
1248     def get_insert_uuid(self, uuid):
1249         """Finds and returns the permanent UUID that the database assigned to a
1250         newly inserted row, given the UUID that Transaction.insert() assigned
1251         locally to that row.
1252
1253         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1254         or if it was assigned by that function and then deleted by Row.delete()
1255         within the same transaction.  (Rows that are inserted and then deleted
1256         within a single transaction are never sent to the database server, so
1257         it never assigns them a permanent UUID.)
1258
1259         This transaction must have completed successfully."""
1260         assert self._status in (Transaction.SUCCESS,
1261                                 Transaction.UNCHANGED)
1262         inserted_row = self._inserted_rows.get(uuid)
1263         if inserted_row:
1264             return inserted_row.real
1265         return None
1266
1267     def _increment(self, row, column):
1268         assert not self._inc_row
1269         self._inc_row = row
1270         self._inc_column = column
1271
1272     def _fetch(self, row, column_name):
1273         self._fetch_requests.append({"row": row, "column_name": column_name})
1274
1275     def _write(self, row, column, datum):
1276         assert row._changes is not None
1277
1278         txn = row._idl.txn
1279
1280         # If this is a write-only column and the datum being written is the
1281         # same as the one already there, just skip the update entirely.  This
1282         # is worth optimizing because we have a lot of columns that get
1283         # periodically refreshed into the database but don't actually change
1284         # that often.
1285         #
1286         # We don't do this for read/write columns because that would break
1287         # atomicity of transactions--some other client might have written a
1288         # different value in that column since we read it.  (But if a whole
1289         # transaction only does writes of existing values, without making any
1290         # real changes, we will drop the whole transaction later in
1291         # ovsdb_idl_txn_commit().)
1292         if (not column.alert and row._data and
1293                 row._data.get(column.name) == datum):
1294             new_value = row._changes.get(column.name)
1295             if new_value is None or new_value == datum:
1296                 return
1297
1298         txn._txn_rows[row.uuid] = row
1299         row._changes[column.name] = datum.copy()
1300
1301     def insert(self, table, new_uuid=None):
1302         """Inserts and returns a new row in 'table', which must be one of the
1303         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1304
1305         The new row is assigned a provisional UUID.  If 'uuid' is None then one
1306         is randomly generated; otherwise 'uuid' should specify a randomly
1307         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
1308         different UUID when 'txn' is committed, but the IDL will replace any
1309         uses of the provisional UUID in the data to be to be committed by the
1310         UUID assigned by ovsdb-server."""
1311         assert self._status == Transaction.UNCOMMITTED
1312         if new_uuid is None:
1313             new_uuid = uuid.uuid4()
1314         row = Row(self.idl, table, new_uuid, None)
1315         table.rows[row.uuid] = row
1316         self._txn_rows[row.uuid] = row
1317         return row
1318
1319     def _process_reply(self, msg):
1320         if msg.type == ovs.jsonrpc.Message.T_ERROR:
1321             self._status = Transaction.ERROR
1322         elif not isinstance(msg.result, (list, tuple)):
1323             # XXX rate-limit
1324             vlog.warn('reply to "transact" is not JSON array')
1325         else:
1326             hard_errors = False
1327             soft_errors = False
1328             lock_errors = False
1329
1330             ops = msg.result
1331             for op in ops:
1332                 if op is None:
1333                     # This isn't an error in itself but indicates that some
1334                     # prior operation failed, so make sure that we know about
1335                     # it.
1336                     soft_errors = True
1337                 elif isinstance(op, dict):
1338                     error = op.get("error")
1339                     if error is not None:
1340                         if error == "timed out":
1341                             soft_errors = True
1342                         elif error == "not owner":
1343                             lock_errors = True
1344                         elif error == "aborted":
1345                             pass
1346                         else:
1347                             hard_errors = True
1348                             self.__set_error_json(op)
1349                 else:
1350                     hard_errors = True
1351                     self.__set_error_json(op)
1352                     # XXX rate-limit
1353                     vlog.warn("operation reply is not JSON null or object")
1354
1355             if not soft_errors and not hard_errors and not lock_errors:
1356                 if self._inc_row and not self.__process_inc_reply(ops):
1357                     hard_errors = True
1358                 if self._fetch_requests:
1359                     if self.__process_fetch_reply(ops):
1360                         self.idl.change_seqno += 1
1361                     else:
1362                         hard_errors = True
1363
1364                 for insert in six.itervalues(self._inserted_rows):
1365                     if not self.__process_insert_reply(insert, ops):
1366                         hard_errors = True
1367
1368             if hard_errors:
1369                 self._status = Transaction.ERROR
1370             elif lock_errors:
1371                 self._status = Transaction.NOT_LOCKED
1372             elif soft_errors:
1373                 self._status = Transaction.TRY_AGAIN
1374             else:
1375                 self._status = Transaction.SUCCESS
1376
1377     @staticmethod
1378     def __check_json_type(json, types, name):
1379         if not json:
1380             # XXX rate-limit
1381             vlog.warn("%s is missing" % name)
1382             return False
1383         elif not isinstance(json, tuple(types)):
1384             # XXX rate-limit
1385             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1386             return False
1387         else:
1388             return True
1389
1390     def __process_fetch_reply(self, ops):
1391         update = False
1392         for fetch_request in self._fetch_requests:
1393             row = fetch_request["row"]
1394             column_name = fetch_request["column_name"]
1395             index = fetch_request["index"]
1396             table = row._table
1397
1398             select = ops[index]
1399             fetched_rows = select.get("rows")
1400             if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1401                                                  '"select" reply "rows"'):
1402                 return False
1403             if len(fetched_rows) != 1:
1404                 # XXX rate-limit
1405                 vlog.warn('"select" reply "rows" has %d elements '
1406                           'instead of 1' % len(fetched_rows))
1407                 continue
1408             fetched_row = fetched_rows[0]
1409             if not Transaction.__check_json_type(fetched_row, (dict,),
1410                                                  '"select" reply row'):
1411                 continue
1412
1413             column = table.columns.get(column_name)
1414             datum_json = fetched_row.get(column_name)
1415             datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1416
1417             row._data[column_name] = datum
1418             update = True
1419
1420         return update
1421
1422     def __process_inc_reply(self, ops):
1423         if self._inc_index + 2 > len(ops):
1424             # XXX rate-limit
1425             vlog.warn("reply does not contain enough operations for "
1426                       "increment (has %d, needs %d)" %
1427                       (len(ops), self._inc_index + 2))
1428
1429         # We know that this is a JSON object because the loop in
1430         # __process_reply() already checked.
1431         mutate = ops[self._inc_index]
1432         count = mutate.get("count")
1433         if not Transaction.__check_json_type(count, six.integer_types,
1434                                              '"mutate" reply "count"'):
1435             return False
1436         if count != 1:
1437             # XXX rate-limit
1438             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1439             return False
1440
1441         select = ops[self._inc_index + 1]
1442         rows = select.get("rows")
1443         if not Transaction.__check_json_type(rows, (list, tuple),
1444                                              '"select" reply "rows"'):
1445             return False
1446         if len(rows) != 1:
1447             # XXX rate-limit
1448             vlog.warn('"select" reply "rows" has %d elements '
1449                       'instead of 1' % len(rows))
1450             return False
1451         row = rows[0]
1452         if not Transaction.__check_json_type(row, (dict,),
1453                                              '"select" reply row'):
1454             return False
1455         column = row.get(self._inc_column)
1456         if not Transaction.__check_json_type(column, six.integer_types,
1457                                              '"select" reply inc column'):
1458             return False
1459         self._inc_new_value = column
1460         return True
1461
1462     def __process_insert_reply(self, insert, ops):
1463         if insert.op_index >= len(ops):
1464             # XXX rate-limit
1465             vlog.warn("reply does not contain enough operations "
1466                       "for insert (has %d, needs %d)"
1467                       % (len(ops), insert.op_index))
1468             return False
1469
1470         # We know that this is a JSON object because the loop in
1471         # __process_reply() already checked.
1472         reply = ops[insert.op_index]
1473         json_uuid = reply.get("uuid")
1474         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1475                                              '"insert" reply "uuid"'):
1476             return False
1477
1478         try:
1479             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1480         except error.Error:
1481             # XXX rate-limit
1482             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1483             return False
1484
1485         insert.real = uuid_
1486         return True
1487
1488
1489 class SchemaHelper(object):
1490     """IDL Schema helper.
1491
1492     This class encapsulates the logic required to generate schemas suitable
1493     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1494     they are interested in using register_columns().  When finished, the
1495     get_idl_schema() function may be called.
1496
1497     The location on disk of the schema used may be found in the
1498     'schema_location' variable."""
1499
1500     def __init__(self, location=None, schema_json=None):
1501         """Creates a new Schema object.
1502
1503         'location' file path to ovs schema. None means default location
1504         'schema_json' schema in json preresentation in memory
1505         """
1506
1507         if location and schema_json:
1508             raise ValueError("both location and schema_json can't be "
1509                              "specified. it's ambiguous.")
1510         if schema_json is None:
1511             if location is None:
1512                 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1513             schema_json = ovs.json.from_file(location)
1514
1515         self.schema_json = schema_json
1516         self._tables = {}
1517         self._readonly = {}
1518         self._all = False
1519
1520     def register_columns(self, table, columns, readonly=[]):
1521         """Registers interest in the given 'columns' of 'table'.  Future calls
1522         to get_idl_schema() will include 'table':column for each column in
1523         'columns'. This function automatically avoids adding duplicate entries
1524         to the schema.
1525         A subset of 'columns' can be specified as 'readonly'. The readonly
1526         columns are not replicated but can be fetched on-demand by the user
1527         with Row.fetch().
1528
1529         'table' must be a string.
1530         'columns' must be a list of strings.
1531         'readonly' must be a list of strings.
1532         """
1533
1534         assert isinstance(table, six.string_types)
1535         assert isinstance(columns, list)
1536
1537         columns = set(columns) | self._tables.get(table, set())
1538         self._tables[table] = columns
1539         self._readonly[table] = readonly
1540
1541     def register_table(self, table):
1542         """Registers interest in the given all columns of 'table'. Future calls
1543         to get_idl_schema() will include all columns of 'table'.
1544
1545         'table' must be a string
1546         """
1547         assert isinstance(table, six.string_types)
1548         self._tables[table] = set()  # empty set means all columns in the table
1549
1550     def register_all(self):
1551         """Registers interest in every column of every table."""
1552         self._all = True
1553
1554     def get_idl_schema(self):
1555         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1556         object based on columns registered using the register_columns()
1557         function."""
1558
1559         schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1560         self.schema_json = None
1561
1562         if not self._all:
1563             schema_tables = {}
1564             for table, columns in six.iteritems(self._tables):
1565                 schema_tables[table] = (
1566                     self._keep_table_columns(schema, table, columns))
1567
1568             schema.tables = schema_tables
1569         schema.readonly = self._readonly
1570         return schema
1571
1572     def _keep_table_columns(self, schema, table_name, columns):
1573         assert table_name in schema.tables
1574         table = schema.tables[table_name]
1575
1576         if not columns:
1577             # empty set means all columns in the table
1578             return table
1579
1580         new_columns = {}
1581         for column_name in columns:
1582             assert isinstance(column_name, six.string_types)
1583             assert column_name in table.columns
1584
1585             new_columns[column_name] = table.columns[column_name]
1586
1587         table.columns = new_columns
1588         return table