python: move Python idl to work with monitor_cond
[cascardo/ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import functools
16 import uuid
17
18 import six
19
20 import ovs.jsonrpc
21 import ovs.db.parser
22 import ovs.db.schema
23 from ovs.db import error
24 import ovs.ovsuuid
25 import ovs.poller
26 import ovs.vlog
27
28 vlog = ovs.vlog.Vlog("idl")
29
30 __pychecker__ = 'no-classattr no-objattrs'
31
32 ROW_CREATE = "create"
33 ROW_UPDATE = "update"
34 ROW_DELETE = "delete"
35
36 OVSDB_UPDATE = 0
37 OVSDB_UPDATE2 = 1
38
39
40 class Idl(object):
41     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
42
43     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
44     requests to an OVSDB database server and parses the responses, converting
45     raw JSON into data structures that are easier for clients to digest.
46
47     The IDL also assists with issuing database transactions.  The client
48     creates a transaction, manipulates the IDL data structures, and commits or
49     aborts the transaction.  The IDL then composes and issues the necessary
50     JSON-RPC requests and reports to the client whether the transaction
51     completed successfully.
52
53     The client is allowed to access the following attributes directly, in a
54     read-only fashion:
55
56     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
57       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
58       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
59       to a Row object.
60
61       The client may directly read and write the Row objects referenced by the
62       'rows' map values.  Refer to Row for more details.
63
64     - 'change_seqno': A number that represents the IDL's state.  When the IDL
65       is updated (by Idl.run()), its value changes.  The sequence number can
66       occasionally change even if the database does not.  This happens if the
67       connection to the database drops and reconnects, which causes the
68       database contents to be reloaded even if they didn't change.  (It could
69       also happen if the database server sends out a "change" that reflects
70       what the IDL already thought was in the database.  The database server is
71       not supposed to do that, but bugs could in theory cause it to do so.)
72
73     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
74       if no lock is configured.
75
76     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
77       lock, and False otherwise.
78
79       Locking and unlocking happens asynchronously from the database client's
80       point of view, so the information is only useful for optimization
81       (e.g. if the client doesn't have the lock then there's no point in trying
82       to write to the database).
83
84     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
85       the database server has indicated that some other client already owns the
86       requested lock, and False otherwise.
87
88     - 'txn': The ovs.db.idl.Transaction object for the database transaction
89       currently being constructed, if there is one, or None otherwise.
90 """
91
92     IDL_S_INITIAL = 0
93     IDL_S_MONITOR_REQUESTED = 1
94     IDL_S_MONITOR_COND_REQUESTED = 2
95
96     def __init__(self, remote, schema):
97         """Creates and returns a connection to the database named 'db_name' on
98         'remote', which should be in a form acceptable to
99         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
100         replica of the remote database.
101
102         'schema' should be the schema for the remote database.  The caller may
103         have cut it down by removing tables or columns that are not of
104         interest.  The IDL will only replicate the tables and columns that
105         remain.  The caller may also add a attribute named 'alert' to selected
106         remaining columns, setting its value to False; if so, then changes to
107         those columns will not be considered changes to the database for the
108         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
109         useful for columns that the IDL's client will write but not read.
110
111         As a convenience to users, 'schema' may also be an instance of the
112         SchemaHelper class.
113
114         The IDL uses and modifies 'schema' directly."""
115
116         assert isinstance(schema, SchemaHelper)
117         schema = schema.get_idl_schema()
118
119         self.tables = schema.tables
120         self.readonly = schema.readonly
121         self._db = schema
122         self._session = ovs.jsonrpc.Session.open(remote)
123         self._monitor_request_id = None
124         self._last_seqno = None
125         self.change_seqno = 0
126         self.uuid = uuid.uuid1()
127         self.state = self.IDL_S_INITIAL
128
129         # Database locking.
130         self.lock_name = None          # Name of lock we need, None if none.
131         self.has_lock = False          # Has db server said we have the lock?
132         self.is_lock_contended = False  # Has db server said we can't get lock?
133         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
134
135         # Transaction support.
136         self.txn = None
137         self._outstanding_txns = {}
138
139         for table in six.itervalues(schema.tables):
140             for column in six.itervalues(table.columns):
141                 if not hasattr(column, 'alert'):
142                     column.alert = True
143             table.need_table = False
144             table.rows = {}
145             table.idl = self
146             table.condition = []
147
148     def close(self):
149         """Closes the connection to the database.  The IDL will no longer
150         update."""
151         self._session.close()
152
153     def run(self):
154         """Processes a batch of messages from the database server.  Returns
155         True if the database as seen through the IDL changed, False if it did
156         not change.  The initial fetch of the entire contents of the remote
157         database is considered to be one kind of change.  If the IDL has been
158         configured to acquire a database lock (with Idl.set_lock()), then
159         successfully acquiring the lock is also considered to be a change.
160
161         This function can return occasional false positives, that is, report
162         that the database changed even though it didn't.  This happens if the
163         connection to the database drops and reconnects, which causes the
164         database contents to be reloaded even if they didn't change.  (It could
165         also happen if the database server sends out a "change" that reflects
166         what we already thought was in the database, but the database server is
167         not supposed to do that.)
168
169         As an alternative to checking the return value, the client may check
170         for changes in self.change_seqno."""
171         assert not self.txn
172         initial_change_seqno = self.change_seqno
173         self._session.run()
174         i = 0
175         while i < 50:
176             i += 1
177             if not self._session.is_connected():
178                 break
179
180             seqno = self._session.get_seqno()
181             if seqno != self._last_seqno:
182                 self._last_seqno = seqno
183                 self.__txn_abort_all()
184                 self.__send_monitor_request()
185                 if self.lock_name:
186                     self.__send_lock_request()
187                 break
188
189             msg = self._session.recv()
190             if msg is None:
191                 break
192             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
193                     and msg.method == "update2"
194                     and len(msg.params) == 2):
195                 # Database contents changed.
196                 self.__parse_update(msg.params[1], OVSDB_UPDATE2)
197             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
198                     and msg.method == "update"
199                     and len(msg.params) == 2):
200                 # Database contents changed.
201                 self.__parse_update(msg.params[1], OVSDB_UPDATE)
202             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
203                   and self._monitor_request_id is not None
204                   and self._monitor_request_id == msg.id):
205                 # Reply to our "monitor" request.
206                 try:
207                     self.change_seqno += 1
208                     self._monitor_request_id = None
209                     self.__clear()
210                     if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
211                         self.__parse_update(msg.result, OVSDB_UPDATE2)
212                     else:
213                         assert self.state == self.IDL_S_MONITOR_REQUESTED
214                         self.__parse_update(msg.result, OVSDB_UPDATE)
215
216                 except error.Error as e:
217                     vlog.err("%s: parse error in received schema: %s"
218                              % (self._session.get_name(), e))
219                     self.__error()
220             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
221                   and self._lock_request_id is not None
222                   and self._lock_request_id == msg.id):
223                 # Reply to our "lock" request.
224                 self.__parse_lock_reply(msg.result)
225             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
226                   and msg.method == "locked"):
227                 # We got our lock.
228                 self.__parse_lock_notify(msg.params, True)
229             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
230                   and msg.method == "stolen"):
231                 # Someone else stole our lock.
232                 self.__parse_lock_notify(msg.params, False)
233             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
234                 # Reply to our echo request.  Ignore it.
235                 pass
236             elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
237                   self.state == self.IDL_S_MONITOR_COND_REQUESTED and
238                   self._monitor_request_id == msg.id):
239                 if msg.error == "unknown method":
240                     self.__send_monitor_request()
241             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
242                                ovs.jsonrpc.Message.T_REPLY)
243                   and self.__txn_process_reply(msg)):
244                 # __txn_process_reply() did everything needed.
245                 pass
246             else:
247                 # This can happen if a transaction is destroyed before we
248                 # receive the reply, so keep the log level low.
249                 vlog.dbg("%s: received unexpected %s message"
250                          % (self._session.get_name(),
251                              ovs.jsonrpc.Message.type_to_string(msg.type)))
252
253         return initial_change_seqno != self.change_seqno
254
255     def cond_change(self, table_name, cond):
256         """Change conditions for this IDL session. If session is not already
257         connected, add condtion to table and submit it on send_monitor_request.
258         Otherwise  send monitor_cond_change method with the requested
259         changes."""
260         table = self.tables.get(table_name)
261         if not table:
262             raise error.Error('Unknown table "%s"' % table_name)
263         if self._session.is_connected():
264             self.__send_cond_change(table, cond)
265         else:
266             table.condition = cond
267
268     def wait(self, poller):
269         """Arranges for poller.block() to wake up when self.run() has something
270         to do or when activity occurs on a transaction on 'self'."""
271         self._session.wait(poller)
272         self._session.recv_wait(poller)
273
274     def has_ever_connected(self):
275         """Returns True, if the IDL successfully connected to the remote
276         database and retrieved its contents (even if the connection
277         subsequently dropped and is in the process of reconnecting).  If so,
278         then the IDL contains an atomic snapshot of the database's contents
279         (but it might be arbitrarily old if the connection dropped).
280
281         Returns False if the IDL has never connected or retrieved the
282         database's contents.  If so, the IDL is empty."""
283         return self.change_seqno != 0
284
285     def force_reconnect(self):
286         """Forces the IDL to drop its connection to the database and reconnect.
287         In the meantime, the contents of the IDL will not change."""
288         self._session.force_reconnect()
289
290     def set_lock(self, lock_name):
291         """If 'lock_name' is not None, configures the IDL to obtain the named
292         lock from the database server and to avoid modifying the database when
293         the lock cannot be acquired (that is, when another client has the same
294         lock).
295
296         If 'lock_name' is None, drops the locking requirement and releases the
297         lock."""
298         assert not self.txn
299         assert not self._outstanding_txns
300
301         if self.lock_name and (not lock_name or lock_name != self.lock_name):
302             # Release previous lock.
303             self.__send_unlock_request()
304             self.lock_name = None
305             self.is_lock_contended = False
306
307         if lock_name and not self.lock_name:
308             # Acquire new lock.
309             self.lock_name = lock_name
310             self.__send_lock_request()
311
312     def notify(self, event, row, updates=None):
313         """Hook for implementing create/update/delete notifications
314
315         :param event:   The event that was triggered
316         :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
317         :param row:     The row as it is after the operation has occured
318         :type row:      Row
319         :param updates: For updates, row with only updated columns
320         :type updates:  Row
321         """
322
323     def __send_cond_change(self, table, cond):
324         monitor_cond_change = {table.name: [{"where": cond}]}
325         old_uuid = str(self.uuid)
326         self.uuid = uuid.uuid1()
327         params = [old_uuid, str(self.uuid), monitor_cond_change]
328         msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
329         self._session.send(msg)
330
331     def __clear(self):
332         changed = False
333
334         for table in six.itervalues(self.tables):
335             if table.rows:
336                 changed = True
337                 table.rows = {}
338
339         if changed:
340             self.change_seqno += 1
341
342     def __update_has_lock(self, new_has_lock):
343         if new_has_lock and not self.has_lock:
344             if self._monitor_request_id is None:
345                 self.change_seqno += 1
346             else:
347                 # We're waiting for a monitor reply, so don't signal that the
348                 # database changed.  The monitor reply will increment
349                 # change_seqno anyhow.
350                 pass
351             self.is_lock_contended = False
352         self.has_lock = new_has_lock
353
354     def __do_send_lock_request(self, method):
355         self.__update_has_lock(False)
356         self._lock_request_id = None
357         if self._session.is_connected():
358             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
359             msg_id = msg.id
360             self._session.send(msg)
361         else:
362             msg_id = None
363         return msg_id
364
365     def __send_lock_request(self):
366         self._lock_request_id = self.__do_send_lock_request("lock")
367
368     def __send_unlock_request(self):
369         self.__do_send_lock_request("unlock")
370
371     def __parse_lock_reply(self, result):
372         self._lock_request_id = None
373         got_lock = isinstance(result, dict) and result.get("locked") is True
374         self.__update_has_lock(got_lock)
375         if not got_lock:
376             self.is_lock_contended = True
377
378     def __parse_lock_notify(self, params, new_has_lock):
379         if (self.lock_name is not None
380             and isinstance(params, (list, tuple))
381             and params
382             and params[0] == self.lock_name):
383             self.__update_has_lock(new_has_lock)
384             if not new_has_lock:
385                 self.is_lock_contended = True
386
387     def __send_monitor_request(self):
388         if self.state == self.IDL_S_INITIAL:
389             self.state = self.IDL_S_MONITOR_COND_REQUESTED
390             method = "monitor_cond"
391         else:
392             self.state = self.IDL_S_MONITOR_REQUESTED
393             method = "monitor"
394
395         monitor_requests = {}
396         for table in six.itervalues(self.tables):
397             columns = []
398             for column in six.iterkeys(table.columns):
399                 if ((table.name not in self.readonly) or
400                         (table.name in self.readonly) and
401                         (column not in self.readonly[table.name])):
402                     columns.append(column)
403             monitor_requests[table.name] = {"columns": columns}
404             if method == "monitor_cond" and table.condition:
405                 monitor_requests[table.name]["where"] = table.condition
406                 table.condition = None
407
408         msg = ovs.jsonrpc.Message.create_request(
409             method, [self._db.name, str(self.uuid), monitor_requests])
410         self._monitor_request_id = msg.id
411         self._session.send(msg)
412
413     def __parse_update(self, update, version):
414         try:
415             self.__do_parse_update(update, version)
416         except error.Error as e:
417             vlog.err("%s: error parsing update: %s"
418                      % (self._session.get_name(), e))
419
420     def __do_parse_update(self, table_updates, version):
421         if not isinstance(table_updates, dict):
422             raise error.Error("<table-updates> is not an object",
423                               table_updates)
424
425         for table_name, table_update in six.iteritems(table_updates):
426             table = self.tables.get(table_name)
427             if not table:
428                 raise error.Error('<table-updates> includes unknown '
429                                   'table "%s"' % table_name)
430
431             if not isinstance(table_update, dict):
432                 raise error.Error('<table-update> for table "%s" is not '
433                                   'an object' % table_name, table_update)
434
435             for uuid_string, row_update in six.iteritems(table_update):
436                 if not ovs.ovsuuid.is_valid_string(uuid_string):
437                     raise error.Error('<table-update> for table "%s" '
438                                       'contains bad UUID "%s" as member '
439                                       'name' % (table_name, uuid_string),
440                                       table_update)
441                 uuid = ovs.ovsuuid.from_string(uuid_string)
442
443                 if not isinstance(row_update, dict):
444                     raise error.Error('<table-update> for table "%s" '
445                                       'contains <row-update> for %s that '
446                                       'is not an object'
447                                       % (table_name, uuid_string))
448
449                 if version == OVSDB_UPDATE2:
450                     if self.__process_update2(table, uuid, row_update):
451                         self.change_seqno += 1
452                     continue
453
454                 parser = ovs.db.parser.Parser(row_update, "row-update")
455                 old = parser.get_optional("old", [dict])
456                 new = parser.get_optional("new", [dict])
457                 parser.finish()
458
459                 if not old and not new:
460                     raise error.Error('<row-update> missing "old" and '
461                                       '"new" members', row_update)
462
463                 if self.__process_update(table, uuid, old, new):
464                     self.change_seqno += 1
465
466     def __process_update2(self, table, uuid, row_update):
467         row = table.rows.get(uuid)
468         changed = False
469         if "delete" in row_update:
470             if row:
471                 del table.rows[uuid]
472                 self.notify(ROW_DELETE, row)
473                 changed = True
474             else:
475                 # XXX rate-limit
476                 vlog.warn("cannot delete missing row %s from table"
477                           "%s" % (uuid, table.name))
478         elif "insert" in row_update or "initial" in row_update:
479             if row:
480                 vlog.warn("cannot add existing row %s from table"
481                           " %s" % (uuid, table.name))
482                 del table.rows[uuid]
483             row = self.__create_row(table, uuid)
484             if "insert" in row_update:
485                 row_update = row_update['insert']
486             else:
487                 row_update = row_update['initial']
488             self.__add_default(table, row_update)
489             if self.__row_update(table, row, row_update):
490                 changed = True
491                 self.notify(ROW_CREATE, row)
492         elif "modify" in row_update:
493             if not row:
494                 raise error.Error('Modify non-existing row')
495
496             self.__apply_diff(table, row, row_update['modify'])
497             self.notify(ROW_UPDATE, row,
498                         Row.from_json(self, table, uuid, row_update['modify']))
499             changed = True
500         else:
501             raise error.Error('<row-update> unknown operation',
502                               row_update)
503         return changed
504
505     def __process_update(self, table, uuid, old, new):
506         """Returns True if a column changed, False otherwise."""
507         row = table.rows.get(uuid)
508         changed = False
509         if not new:
510             # Delete row.
511             if row:
512                 del table.rows[uuid]
513                 changed = True
514                 self.notify(ROW_DELETE, row)
515             else:
516                 # XXX rate-limit
517                 vlog.warn("cannot delete missing row %s from table %s"
518                           % (uuid, table.name))
519         elif not old:
520             # Insert row.
521             if not row:
522                 row = self.__create_row(table, uuid)
523                 changed = True
524             else:
525                 # XXX rate-limit
526                 vlog.warn("cannot add existing row %s to table %s"
527                           % (uuid, table.name))
528             if self.__row_update(table, row, new):
529                 changed = True
530                 self.notify(ROW_CREATE, row)
531         else:
532             op = ROW_UPDATE
533             if not row:
534                 row = self.__create_row(table, uuid)
535                 changed = True
536                 op = ROW_CREATE
537                 # XXX rate-limit
538                 vlog.warn("cannot modify missing row %s in table %s"
539                           % (uuid, table.name))
540             if self.__row_update(table, row, new):
541                 changed = True
542                 self.notify(op, row, Row.from_json(self, table, uuid, old))
543         return changed
544
545     def __column_name(self, column):
546         if column.type.key.type == ovs.db.types.UuidType:
547             return ovs.ovsuuid.to_json(column.type.key.type.default)
548         else:
549             return column.type.key.type.default
550
551     def __add_default(self, table, row_update):
552         for column in six.itervalues(table.columns):
553             if column.name not in row_update:
554                 if ((table.name not in self.readonly) or
555                         (table.name in self.readonly) and
556                         (column.name not in self.readonly[table.name])):
557                     if column.type.n_min != 0 and not column.type.is_map():
558                         row_update[column.name] = self.__column_name(column)
559
560     def __apply_diff(self, table, row, row_diff):
561         for column_name, datum_json in six.iteritems(row_diff):
562             column = table.columns.get(column_name)
563             if not column:
564                 # XXX rate-limit
565                 vlog.warn("unknown column %s updating table %s"
566                           % (column_name, table.name))
567                 continue
568
569             try:
570                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
571             except error.Error as e:
572                 # XXX rate-limit
573                 vlog.warn("error parsing column %s in table %s: %s"
574                           % (column_name, table.name, e))
575                 continue
576
577             datum = row._data[column_name].diff(datum)
578             if datum != row._data[column_name]:
579                 row._data[column_name] = datum
580
581     def __row_update(self, table, row, row_json):
582         changed = False
583         for column_name, datum_json in six.iteritems(row_json):
584             column = table.columns.get(column_name)
585             if not column:
586                 # XXX rate-limit
587                 vlog.warn("unknown column %s updating table %s"
588                           % (column_name, table.name))
589                 continue
590
591             try:
592                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
593             except error.Error as e:
594                 # XXX rate-limit
595                 vlog.warn("error parsing column %s in table %s: %s"
596                           % (column_name, table.name, e))
597                 continue
598
599             if datum != row._data[column_name]:
600                 row._data[column_name] = datum
601                 if column.alert:
602                     changed = True
603             else:
604                 # Didn't really change but the OVSDB monitor protocol always
605                 # includes every value in a row.
606                 pass
607         return changed
608
609     def __create_row(self, table, uuid):
610         data = {}
611         for column in six.itervalues(table.columns):
612             data[column.name] = ovs.db.data.Datum.default(column.type)
613         row = table.rows[uuid] = Row(self, table, uuid, data)
614         return row
615
616     def __error(self):
617         self._session.force_reconnect()
618
619     def __txn_abort_all(self):
620         while self._outstanding_txns:
621             txn = self._outstanding_txns.popitem()[1]
622             txn._status = Transaction.TRY_AGAIN
623
624     def __txn_process_reply(self, msg):
625         txn = self._outstanding_txns.pop(msg.id, None)
626         if txn:
627             txn._process_reply(msg)
628
629
630 def _uuid_to_row(atom, base):
631     if base.ref_table:
632         return base.ref_table.rows.get(atom)
633     else:
634         return atom
635
636
637 def _row_to_uuid(value):
638     if isinstance(value, Row):
639         return value.uuid
640     else:
641         return value
642
643
644 @functools.total_ordering
645 class Row(object):
646     """A row within an IDL.
647
648     The client may access the following attributes directly:
649
650     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
651
652     - An attribute for each column in the Row's table, named for the column,
653       whose values are as returned by Datum.to_python() for the column's type.
654
655       If some error occurs (e.g. the database server's idea of the column is
656       different from the IDL's idea), then the attribute values is the
657       "default" value return by Datum.default() for the column's type.  (It is
658       important to know this because the default value may violate constraints
659       for the column's type, e.g. the default integer value is 0 even if column
660       contraints require the column's value to be positive.)
661
662       When a transaction is active, column attributes may also be assigned new
663       values.  Committing the transaction will then cause the new value to be
664       stored into the database.
665
666       *NOTE*: In the current implementation, the value of a column is a *copy*
667       of the value in the database.  This means that modifying its value
668       directly will have no useful effect.  For example, the following:
669         row.mycolumn["a"] = "b"              # don't do this
670       will not change anything in the database, even after commit.  To modify
671       the column, instead assign the modified column value back to the column:
672         d = row.mycolumn
673         d["a"] = "b"
674         row.mycolumn = d
675 """
676     def __init__(self, idl, table, uuid, data):
677         # All of the explicit references to self.__dict__ below are required
678         # to set real attributes with invoking self.__getattr__().
679         self.__dict__["uuid"] = uuid
680
681         self.__dict__["_idl"] = idl
682         self.__dict__["_table"] = table
683
684         # _data is the committed data.  It takes the following values:
685         #
686         #   - A dictionary that maps every column name to a Datum, if the row
687         #     exists in the committed form of the database.
688         #
689         #   - None, if this row is newly inserted within the active transaction
690         #     and thus has no committed form.
691         self.__dict__["_data"] = data
692
693         # _changes describes changes to this row within the active transaction.
694         # It takes the following values:
695         #
696         #   - {}, the empty dictionary, if no transaction is active or if the
697         #     row has yet not been changed within this transaction.
698         #
699         #   - A dictionary that maps a column name to its new Datum, if an
700         #     active transaction changes those columns' values.
701         #
702         #   - A dictionary that maps every column name to a Datum, if the row
703         #     is newly inserted within the active transaction.
704         #
705         #   - None, if this transaction deletes this row.
706         self.__dict__["_changes"] = {}
707
708         # A dictionary whose keys are the names of columns that must be
709         # verified as prerequisites when the transaction commits.  The values
710         # in the dictionary are all None.
711         self.__dict__["_prereqs"] = {}
712
713     def __lt__(self, other):
714         if not isinstance(other, Row):
715             return NotImplemented
716         return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
717
718     def __eq__(self, other):
719         if not isinstance(other, Row):
720             return NotImplemented
721         return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
722
723     def __hash__(self):
724         return int(self.__dict__['uuid'])
725
726     def __getattr__(self, column_name):
727         assert self._changes is not None
728
729         datum = self._changes.get(column_name)
730         if datum is None:
731             if self._data is None:
732                 raise AttributeError("%s instance has no attribute '%s'" %
733                                      (self.__class__.__name__, column_name))
734             if column_name in self._data:
735                 datum = self._data[column_name]
736             else:
737                 raise AttributeError("%s instance has no attribute '%s'" %
738                                      (self.__class__.__name__, column_name))
739
740         return datum.to_python(_uuid_to_row)
741
742     def __setattr__(self, column_name, value):
743         assert self._changes is not None
744         assert self._idl.txn
745
746         if ((self._table.name in self._idl.readonly) and
747                 (column_name in self._idl.readonly[self._table.name])):
748             vlog.warn("attempting to write to readonly column %s"
749                       % column_name)
750             return
751
752         column = self._table.columns[column_name]
753         try:
754             datum = ovs.db.data.Datum.from_python(column.type, value,
755                                                   _row_to_uuid)
756         except error.Error as e:
757             # XXX rate-limit
758             vlog.err("attempting to write bad value to column %s (%s)"
759                      % (column_name, e))
760             return
761         self._idl.txn._write(self, column, datum)
762
763     @classmethod
764     def from_json(cls, idl, table, uuid, row_json):
765         data = {}
766         for column_name, datum_json in six.iteritems(row_json):
767             column = table.columns.get(column_name)
768             if not column:
769                 # XXX rate-limit
770                 vlog.warn("unknown column %s in table %s"
771                           % (column_name, table.name))
772                 continue
773             try:
774                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
775             except error.Error as e:
776                 # XXX rate-limit
777                 vlog.warn("error parsing column %s in table %s: %s"
778                           % (column_name, table.name, e))
779                 continue
780             data[column_name] = datum
781         return cls(idl, table, uuid, data)
782
783     def verify(self, column_name):
784         """Causes the original contents of column 'column_name' in this row to
785         be verified as a prerequisite to completing the transaction.  That is,
786         if 'column_name' changed in this row (or if this row was deleted)
787         between the time that the IDL originally read its contents and the time
788         that the transaction commits, then the transaction aborts and
789         Transaction.commit() returns Transaction.TRY_AGAIN.
790
791         The intention is that, to ensure that no transaction commits based on
792         dirty reads, an application should call Row.verify() on each data item
793         read as part of a read-modify-write operation.
794
795         In some cases Row.verify() reduces to a no-op, because the current
796         value of the column is already known:
797
798           - If this row is a row created by the current transaction (returned
799             by Transaction.insert()).
800
801           - If the column has already been modified within the current
802             transaction.
803
804         Because of the latter property, always call Row.verify() *before*
805         modifying the column, for a given read-modify-write.
806
807         A transaction must be in progress."""
808         assert self._idl.txn
809         assert self._changes is not None
810         if not self._data or column_name in self._changes:
811             return
812
813         self._prereqs[column_name] = None
814
815     def delete(self):
816         """Deletes this row from its table.
817
818         A transaction must be in progress."""
819         assert self._idl.txn
820         assert self._changes is not None
821         if self._data is None:
822             del self._idl.txn._txn_rows[self.uuid]
823         else:
824             self._idl.txn._txn_rows[self.uuid] = self
825         self.__dict__["_changes"] = None
826         del self._table.rows[self.uuid]
827
828     def fetch(self, column_name):
829         self._idl.txn._fetch(self, column_name)
830
831     def increment(self, column_name):
832         """Causes the transaction, when committed, to increment the value of
833         'column_name' within this row by 1.  'column_name' must have an integer
834         type.  After the transaction commits successfully, the client may
835         retrieve the final (incremented) value of 'column_name' with
836         Transaction.get_increment_new_value().
837
838         The client could accomplish something similar by reading and writing
839         and verify()ing columns.  However, increment() will never (by itself)
840         cause a transaction to fail because of a verify error.
841
842         The intended use is for incrementing the "next_cfg" column in
843         the Open_vSwitch table."""
844         self._idl.txn._increment(self, column_name)
845
846
847 def _uuid_name_from_uuid(uuid):
848     return "row%s" % str(uuid).replace("-", "_")
849
850
851 def _where_uuid_equals(uuid):
852     return [["_uuid", "==", ["uuid", str(uuid)]]]
853
854
855 class _InsertedRow(object):
856     def __init__(self, op_index):
857         self.op_index = op_index
858         self.real = None
859
860
861 class Transaction(object):
862     """A transaction may modify the contents of a database by modifying the
863     values of columns, deleting rows, inserting rows, or adding checks that
864     columns in the database have not changed ("verify" operations), through
865     Row methods.
866
867     Reading and writing columns and inserting and deleting rows are all
868     straightforward.  The reasons to verify columns are less obvious.
869     Verification is the key to maintaining transactional integrity.  Because
870     OVSDB handles multiple clients, it can happen that between the time that
871     OVSDB client A reads a column and writes a new value, OVSDB client B has
872     written that column.  Client A's write should not ordinarily overwrite
873     client B's, especially if the column in question is a "map" column that
874     contains several more or less independent data items.  If client A adds a
875     "verify" operation before it writes the column, then the transaction fails
876     in case client B modifies it first.  Client A will then see the new value
877     of the column and compose a new transaction based on the new contents
878     written by client B.
879
880     When a transaction is complete, which must be before the next call to
881     Idl.run(), call Transaction.commit() or Transaction.abort().
882
883     The life-cycle of a transaction looks like this:
884
885     1. Create the transaction and record the initial sequence number:
886
887         seqno = idl.change_seqno(idl)
888         txn = Transaction(idl)
889
890     2. Modify the database with Row and Transaction methods.
891
892     3. Commit the transaction by calling Transaction.commit().  The first call
893        to this function probably returns Transaction.INCOMPLETE.  The client
894        must keep calling again along as this remains true, calling Idl.run() in
895        between to let the IDL do protocol processing.  (If the client doesn't
896        have anything else to do in the meantime, it can use
897        Transaction.commit_block() to avoid having to loop itself.)
898
899     4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
900        to change from the saved 'seqno' (it's possible that it's already
901        changed, in which case the client should not wait at all), then start
902        over from step 1.  Only a call to Idl.run() will change the return value
903        of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
904
905     # Status values that Transaction.commit() can return.
906
907     # Not yet committed or aborted.
908     UNCOMMITTED = "uncommitted"
909     # Transaction didn't include any changes.
910     UNCHANGED = "unchanged"
911     # Commit in progress, please wait.
912     INCOMPLETE = "incomplete"
913     # ovsdb_idl_txn_abort() called.
914     ABORTED = "aborted"
915     # Commit successful.
916     SUCCESS = "success"
917     # Commit failed because a "verify" operation
918     # reported an inconsistency, due to a network
919     # problem, or other transient failure.  Wait
920     # for a change, then try again.
921     TRY_AGAIN = "try again"
922     # Server hasn't given us the lock yet.
923     NOT_LOCKED = "not locked"
924     # Commit failed due to a hard error.
925     ERROR = "error"
926
927     @staticmethod
928     def status_to_string(status):
929         """Converts one of the status values that Transaction.commit() can
930         return into a human-readable string.
931
932         (The status values are in fact such strings already, so
933         there's nothing to do.)"""
934         return status
935
936     def __init__(self, idl):
937         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
938         A given Idl may only have a single active transaction at a time.
939
940         A Transaction may modify the contents of a database by assigning new
941         values to columns (attributes of Row), deleting rows (with
942         Row.delete()), or inserting rows (with Transaction.insert()).  It may
943         also check that columns in the database have not changed with
944         Row.verify().
945
946         When a transaction is complete (which must be before the next call to
947         Idl.run()), call Transaction.commit() or Transaction.abort()."""
948         assert idl.txn is None
949
950         idl.txn = self
951         self._request_id = None
952         self.idl = idl
953         self.dry_run = False
954         self._txn_rows = {}
955         self._status = Transaction.UNCOMMITTED
956         self._error = None
957         self._comments = []
958
959         self._inc_row = None
960         self._inc_column = None
961
962         self._fetch_requests = []
963
964         self._inserted_rows = {}  # Map from UUID to _InsertedRow
965
966     def add_comment(self, comment):
967         """Appends 'comment' to the comments that will be passed to the OVSDB
968         server when this transaction is committed.  (The comment will be
969         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
970         relatively human-readable form.)"""
971         self._comments.append(comment)
972
973     def wait(self, poller):
974         """Causes poll_block() to wake up if this transaction has completed
975         committing."""
976         if self._status not in (Transaction.UNCOMMITTED,
977                                 Transaction.INCOMPLETE):
978             poller.immediate_wake()
979
980     def _substitute_uuids(self, json):
981         if isinstance(json, (list, tuple)):
982             if (len(json) == 2
983                     and json[0] == 'uuid'
984                     and ovs.ovsuuid.is_valid_string(json[1])):
985                 uuid = ovs.ovsuuid.from_string(json[1])
986                 row = self._txn_rows.get(uuid, None)
987                 if row and row._data is None:
988                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
989             else:
990                 return [self._substitute_uuids(elem) for elem in json]
991         return json
992
993     def __disassemble(self):
994         self.idl.txn = None
995
996         for row in six.itervalues(self._txn_rows):
997             if row._changes is None:
998                 row._table.rows[row.uuid] = row
999             elif row._data is None:
1000                 del row._table.rows[row.uuid]
1001             row.__dict__["_changes"] = {}
1002             row.__dict__["_prereqs"] = {}
1003         self._txn_rows = {}
1004
1005     def commit(self):
1006         """Attempts to commit 'txn'.  Returns the status of the commit
1007         operation, one of the following constants:
1008
1009           Transaction.INCOMPLETE:
1010
1011               The transaction is in progress, but not yet complete.  The caller
1012               should call again later, after calling Idl.run() to let the
1013               IDL do OVSDB protocol processing.
1014
1015           Transaction.UNCHANGED:
1016
1017               The transaction is complete.  (It didn't actually change the
1018               database, so the IDL didn't send any request to the database
1019               server.)
1020
1021           Transaction.ABORTED:
1022
1023               The caller previously called Transaction.abort().
1024
1025           Transaction.SUCCESS:
1026
1027               The transaction was successful.  The update made by the
1028               transaction (and possibly other changes made by other database
1029               clients) should already be visible in the IDL.
1030
1031           Transaction.TRY_AGAIN:
1032
1033               The transaction failed for some transient reason, e.g. because a
1034               "verify" operation reported an inconsistency or due to a network
1035               problem.  The caller should wait for a change to the database,
1036               then compose a new transaction, and commit the new transaction.
1037
1038               Use Idl.change_seqno to wait for a change in the database.  It is
1039               important to use its value *before* the initial call to
1040               Transaction.commit() as the baseline for this purpose, because
1041               the change that one should wait for can happen after the initial
1042               call but before the call that returns Transaction.TRY_AGAIN, and
1043               using some other baseline value in that situation could cause an
1044               indefinite wait if the database rarely changes.
1045
1046           Transaction.NOT_LOCKED:
1047
1048               The transaction failed because the IDL has been configured to
1049               require a database lock (with Idl.set_lock()) but didn't
1050               get it yet or has already lost it.
1051
1052         Committing a transaction rolls back all of the changes that it made to
1053         the IDL's copy of the database.  If the transaction commits
1054         successfully, then the database server will send an update and, thus,
1055         the IDL will be updated with the committed changes."""
1056         # The status can only change if we're the active transaction.
1057         # (Otherwise, our status will change only in Idl.run().)
1058         if self != self.idl.txn:
1059             return self._status
1060
1061         # If we need a lock but don't have it, give up quickly.
1062         if self.idl.lock_name and not self.idl.has_lock:
1063             self._status = Transaction.NOT_LOCKED
1064             self.__disassemble()
1065             return self._status
1066
1067         operations = [self.idl._db.name]
1068
1069         # Assert that we have the required lock (avoiding a race).
1070         if self.idl.lock_name:
1071             operations.append({"op": "assert",
1072                                "lock": self.idl.lock_name})
1073
1074         # Add prerequisites and declarations of new rows.
1075         for row in six.itervalues(self._txn_rows):
1076             if row._prereqs:
1077                 rows = {}
1078                 columns = []
1079                 for column_name in row._prereqs:
1080                     columns.append(column_name)
1081                     rows[column_name] = row._data[column_name].to_json()
1082                 operations.append({"op": "wait",
1083                                    "table": row._table.name,
1084                                    "timeout": 0,
1085                                    "where": _where_uuid_equals(row.uuid),
1086                                    "until": "==",
1087                                    "columns": columns,
1088                                    "rows": [rows]})
1089
1090         # Add updates.
1091         any_updates = False
1092         for row in six.itervalues(self._txn_rows):
1093             if row._changes is None:
1094                 if row._table.is_root:
1095                     operations.append({"op": "delete",
1096                                        "table": row._table.name,
1097                                        "where": _where_uuid_equals(row.uuid)})
1098                     any_updates = True
1099                 else:
1100                     # Let ovsdb-server decide whether to really delete it.
1101                     pass
1102             elif row._changes:
1103                 op = {"table": row._table.name}
1104                 if row._data is None:
1105                     op["op"] = "insert"
1106                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
1107                     any_updates = True
1108
1109                     op_index = len(operations) - 1
1110                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
1111                 else:
1112                     op["op"] = "update"
1113                     op["where"] = _where_uuid_equals(row.uuid)
1114
1115                 row_json = {}
1116                 op["row"] = row_json
1117
1118                 for column_name, datum in six.iteritems(row._changes):
1119                     if row._data is not None or not datum.is_default():
1120                         row_json[column_name] = (
1121                             self._substitute_uuids(datum.to_json()))
1122
1123                         # If anything really changed, consider it an update.
1124                         # We can't suppress not-really-changed values earlier
1125                         # or transactions would become nonatomic (see the big
1126                         # comment inside Transaction._write()).
1127                         if (not any_updates and row._data is not None and
1128                                 row._data[column_name] != datum):
1129                             any_updates = True
1130
1131                 if row._data is None or row_json:
1132                     operations.append(op)
1133
1134         if self._fetch_requests:
1135             for fetch in self._fetch_requests:
1136                 fetch["index"] = len(operations) - 1
1137                 operations.append({"op": "select",
1138                                    "table": fetch["row"]._table.name,
1139                                    "where": self._substitute_uuids(
1140                                        _where_uuid_equals(fetch["row"].uuid)),
1141                                    "columns": [fetch["column_name"]]})
1142             any_updates = True
1143
1144         # Add increment.
1145         if self._inc_row and any_updates:
1146             self._inc_index = len(operations) - 1
1147
1148             operations.append({"op": "mutate",
1149                                "table": self._inc_row._table.name,
1150                                "where": self._substitute_uuids(
1151                                    _where_uuid_equals(self._inc_row.uuid)),
1152                                "mutations": [[self._inc_column, "+=", 1]]})
1153             operations.append({"op": "select",
1154                                "table": self._inc_row._table.name,
1155                                "where": self._substitute_uuids(
1156                                    _where_uuid_equals(self._inc_row.uuid)),
1157                                "columns": [self._inc_column]})
1158
1159         # Add comment.
1160         if self._comments:
1161             operations.append({"op": "comment",
1162                                "comment": "\n".join(self._comments)})
1163
1164         # Dry run?
1165         if self.dry_run:
1166             operations.append({"op": "abort"})
1167
1168         if not any_updates:
1169             self._status = Transaction.UNCHANGED
1170         else:
1171             msg = ovs.jsonrpc.Message.create_request("transact", operations)
1172             self._request_id = msg.id
1173             if not self.idl._session.send(msg):
1174                 self.idl._outstanding_txns[self._request_id] = self
1175                 self._status = Transaction.INCOMPLETE
1176             else:
1177                 self._status = Transaction.TRY_AGAIN
1178
1179         self.__disassemble()
1180         return self._status
1181
1182     def commit_block(self):
1183         """Attempts to commit this transaction, blocking until the commit
1184         either succeeds or fails.  Returns the final commit status, which may
1185         be any Transaction.* value other than Transaction.INCOMPLETE.
1186
1187         This function calls Idl.run() on this transaction'ss IDL, so it may
1188         cause Idl.change_seqno to change."""
1189         while True:
1190             status = self.commit()
1191             if status != Transaction.INCOMPLETE:
1192                 return status
1193
1194             self.idl.run()
1195
1196             poller = ovs.poller.Poller()
1197             self.idl.wait(poller)
1198             self.wait(poller)
1199             poller.block()
1200
1201     def get_increment_new_value(self):
1202         """Returns the final (incremented) value of the column in this
1203         transaction that was set to be incremented by Row.increment.  This
1204         transaction must have committed successfully."""
1205         assert self._status == Transaction.SUCCESS
1206         return self._inc_new_value
1207
1208     def abort(self):
1209         """Aborts this transaction.  If Transaction.commit() has already been
1210         called then the transaction might get committed anyhow."""
1211         self.__disassemble()
1212         if self._status in (Transaction.UNCOMMITTED,
1213                             Transaction.INCOMPLETE):
1214             self._status = Transaction.ABORTED
1215
1216     def get_error(self):
1217         """Returns a string representing this transaction's current status,
1218         suitable for use in log messages."""
1219         if self._status != Transaction.ERROR:
1220             return Transaction.status_to_string(self._status)
1221         elif self._error:
1222             return self._error
1223         else:
1224             return "no error details available"
1225
1226     def __set_error_json(self, json):
1227         if self._error is None:
1228             self._error = ovs.json.to_string(json)
1229
1230     def get_insert_uuid(self, uuid):
1231         """Finds and returns the permanent UUID that the database assigned to a
1232         newly inserted row, given the UUID that Transaction.insert() assigned
1233         locally to that row.
1234
1235         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1236         or if it was assigned by that function and then deleted by Row.delete()
1237         within the same transaction.  (Rows that are inserted and then deleted
1238         within a single transaction are never sent to the database server, so
1239         it never assigns them a permanent UUID.)
1240
1241         This transaction must have completed successfully."""
1242         assert self._status in (Transaction.SUCCESS,
1243                                 Transaction.UNCHANGED)
1244         inserted_row = self._inserted_rows.get(uuid)
1245         if inserted_row:
1246             return inserted_row.real
1247         return None
1248
1249     def _increment(self, row, column):
1250         assert not self._inc_row
1251         self._inc_row = row
1252         self._inc_column = column
1253
1254     def _fetch(self, row, column_name):
1255         self._fetch_requests.append({"row": row, "column_name": column_name})
1256
1257     def _write(self, row, column, datum):
1258         assert row._changes is not None
1259
1260         txn = row._idl.txn
1261
1262         # If this is a write-only column and the datum being written is the
1263         # same as the one already there, just skip the update entirely.  This
1264         # is worth optimizing because we have a lot of columns that get
1265         # periodically refreshed into the database but don't actually change
1266         # that often.
1267         #
1268         # We don't do this for read/write columns because that would break
1269         # atomicity of transactions--some other client might have written a
1270         # different value in that column since we read it.  (But if a whole
1271         # transaction only does writes of existing values, without making any
1272         # real changes, we will drop the whole transaction later in
1273         # ovsdb_idl_txn_commit().)
1274         if (not column.alert and row._data and
1275                 row._data.get(column.name) == datum):
1276             new_value = row._changes.get(column.name)
1277             if new_value is None or new_value == datum:
1278                 return
1279
1280         txn._txn_rows[row.uuid] = row
1281         row._changes[column.name] = datum.copy()
1282
1283     def insert(self, table, new_uuid=None):
1284         """Inserts and returns a new row in 'table', which must be one of the
1285         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1286
1287         The new row is assigned a provisional UUID.  If 'uuid' is None then one
1288         is randomly generated; otherwise 'uuid' should specify a randomly
1289         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
1290         different UUID when 'txn' is committed, but the IDL will replace any
1291         uses of the provisional UUID in the data to be to be committed by the
1292         UUID assigned by ovsdb-server."""
1293         assert self._status == Transaction.UNCOMMITTED
1294         if new_uuid is None:
1295             new_uuid = uuid.uuid4()
1296         row = Row(self.idl, table, new_uuid, None)
1297         table.rows[row.uuid] = row
1298         self._txn_rows[row.uuid] = row
1299         return row
1300
1301     def _process_reply(self, msg):
1302         if msg.type == ovs.jsonrpc.Message.T_ERROR:
1303             self._status = Transaction.ERROR
1304         elif not isinstance(msg.result, (list, tuple)):
1305             # XXX rate-limit
1306             vlog.warn('reply to "transact" is not JSON array')
1307         else:
1308             hard_errors = False
1309             soft_errors = False
1310             lock_errors = False
1311
1312             ops = msg.result
1313             for op in ops:
1314                 if op is None:
1315                     # This isn't an error in itself but indicates that some
1316                     # prior operation failed, so make sure that we know about
1317                     # it.
1318                     soft_errors = True
1319                 elif isinstance(op, dict):
1320                     error = op.get("error")
1321                     if error is not None:
1322                         if error == "timed out":
1323                             soft_errors = True
1324                         elif error == "not owner":
1325                             lock_errors = True
1326                         elif error == "aborted":
1327                             pass
1328                         else:
1329                             hard_errors = True
1330                             self.__set_error_json(op)
1331                 else:
1332                     hard_errors = True
1333                     self.__set_error_json(op)
1334                     # XXX rate-limit
1335                     vlog.warn("operation reply is not JSON null or object")
1336
1337             if not soft_errors and not hard_errors and not lock_errors:
1338                 if self._inc_row and not self.__process_inc_reply(ops):
1339                     hard_errors = True
1340                 if self._fetch_requests:
1341                     if self.__process_fetch_reply(ops):
1342                         self.idl.change_seqno += 1
1343                     else:
1344                         hard_errors = True
1345
1346                 for insert in six.itervalues(self._inserted_rows):
1347                     if not self.__process_insert_reply(insert, ops):
1348                         hard_errors = True
1349
1350             if hard_errors:
1351                 self._status = Transaction.ERROR
1352             elif lock_errors:
1353                 self._status = Transaction.NOT_LOCKED
1354             elif soft_errors:
1355                 self._status = Transaction.TRY_AGAIN
1356             else:
1357                 self._status = Transaction.SUCCESS
1358
1359     @staticmethod
1360     def __check_json_type(json, types, name):
1361         if not json:
1362             # XXX rate-limit
1363             vlog.warn("%s is missing" % name)
1364             return False
1365         elif not isinstance(json, tuple(types)):
1366             # XXX rate-limit
1367             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1368             return False
1369         else:
1370             return True
1371
1372     def __process_fetch_reply(self, ops):
1373         update = False
1374         for fetch_request in self._fetch_requests:
1375             row = fetch_request["row"]
1376             column_name = fetch_request["column_name"]
1377             index = fetch_request["index"]
1378             table = row._table
1379
1380             select = ops[index]
1381             fetched_rows = select.get("rows")
1382             if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1383                                                  '"select" reply "rows"'):
1384                 return False
1385             if len(fetched_rows) != 1:
1386                 # XXX rate-limit
1387                 vlog.warn('"select" reply "rows" has %d elements '
1388                           'instead of 1' % len(fetched_rows))
1389                 continue
1390             fetched_row = fetched_rows[0]
1391             if not Transaction.__check_json_type(fetched_row, (dict,),
1392                                                  '"select" reply row'):
1393                 continue
1394
1395             column = table.columns.get(column_name)
1396             datum_json = fetched_row.get(column_name)
1397             datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1398
1399             row._data[column_name] = datum
1400             update = True
1401
1402         return update
1403
1404     def __process_inc_reply(self, ops):
1405         if self._inc_index + 2 > len(ops):
1406             # XXX rate-limit
1407             vlog.warn("reply does not contain enough operations for "
1408                       "increment (has %d, needs %d)" %
1409                       (len(ops), self._inc_index + 2))
1410
1411         # We know that this is a JSON object because the loop in
1412         # __process_reply() already checked.
1413         mutate = ops[self._inc_index]
1414         count = mutate.get("count")
1415         if not Transaction.__check_json_type(count, six.integer_types,
1416                                              '"mutate" reply "count"'):
1417             return False
1418         if count != 1:
1419             # XXX rate-limit
1420             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1421             return False
1422
1423         select = ops[self._inc_index + 1]
1424         rows = select.get("rows")
1425         if not Transaction.__check_json_type(rows, (list, tuple),
1426                                              '"select" reply "rows"'):
1427             return False
1428         if len(rows) != 1:
1429             # XXX rate-limit
1430             vlog.warn('"select" reply "rows" has %d elements '
1431                       'instead of 1' % len(rows))
1432             return False
1433         row = rows[0]
1434         if not Transaction.__check_json_type(row, (dict,),
1435                                              '"select" reply row'):
1436             return False
1437         column = row.get(self._inc_column)
1438         if not Transaction.__check_json_type(column, six.integer_types,
1439                                              '"select" reply inc column'):
1440             return False
1441         self._inc_new_value = column
1442         return True
1443
1444     def __process_insert_reply(self, insert, ops):
1445         if insert.op_index >= len(ops):
1446             # XXX rate-limit
1447             vlog.warn("reply does not contain enough operations "
1448                       "for insert (has %d, needs %d)"
1449                       % (len(ops), insert.op_index))
1450             return False
1451
1452         # We know that this is a JSON object because the loop in
1453         # __process_reply() already checked.
1454         reply = ops[insert.op_index]
1455         json_uuid = reply.get("uuid")
1456         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1457                                              '"insert" reply "uuid"'):
1458             return False
1459
1460         try:
1461             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1462         except error.Error:
1463             # XXX rate-limit
1464             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1465             return False
1466
1467         insert.real = uuid_
1468         return True
1469
1470
1471 class SchemaHelper(object):
1472     """IDL Schema helper.
1473
1474     This class encapsulates the logic required to generate schemas suitable
1475     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1476     they are interested in using register_columns().  When finished, the
1477     get_idl_schema() function may be called.
1478
1479     The location on disk of the schema used may be found in the
1480     'schema_location' variable."""
1481
1482     def __init__(self, location=None, schema_json=None):
1483         """Creates a new Schema object.
1484
1485         'location' file path to ovs schema. None means default location
1486         'schema_json' schema in json preresentation in memory
1487         """
1488
1489         if location and schema_json:
1490             raise ValueError("both location and schema_json can't be "
1491                              "specified. it's ambiguous.")
1492         if schema_json is None:
1493             if location is None:
1494                 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1495             schema_json = ovs.json.from_file(location)
1496
1497         self.schema_json = schema_json
1498         self._tables = {}
1499         self._readonly = {}
1500         self._all = False
1501
1502     def register_columns(self, table, columns, readonly=[]):
1503         """Registers interest in the given 'columns' of 'table'.  Future calls
1504         to get_idl_schema() will include 'table':column for each column in
1505         'columns'. This function automatically avoids adding duplicate entries
1506         to the schema.
1507         A subset of 'columns' can be specified as 'readonly'. The readonly
1508         columns are not replicated but can be fetched on-demand by the user
1509         with Row.fetch().
1510
1511         'table' must be a string.
1512         'columns' must be a list of strings.
1513         'readonly' must be a list of strings.
1514         """
1515
1516         assert isinstance(table, six.string_types)
1517         assert isinstance(columns, list)
1518
1519         columns = set(columns) | self._tables.get(table, set())
1520         self._tables[table] = columns
1521         self._readonly[table] = readonly
1522
1523     def register_table(self, table):
1524         """Registers interest in the given all columns of 'table'. Future calls
1525         to get_idl_schema() will include all columns of 'table'.
1526
1527         'table' must be a string
1528         """
1529         assert isinstance(table, six.string_types)
1530         self._tables[table] = set()  # empty set means all columns in the table
1531
1532     def register_all(self):
1533         """Registers interest in every column of every table."""
1534         self._all = True
1535
1536     def get_idl_schema(self):
1537         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1538         object based on columns registered using the register_columns()
1539         function."""
1540
1541         schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1542         self.schema_json = None
1543
1544         if not self._all:
1545             schema_tables = {}
1546             for table, columns in six.iteritems(self._tables):
1547                 schema_tables[table] = (
1548                     self._keep_table_columns(schema, table, columns))
1549
1550             schema.tables = schema_tables
1551         schema.readonly = self._readonly
1552         return schema
1553
1554     def _keep_table_columns(self, schema, table_name, columns):
1555         assert table_name in schema.tables
1556         table = schema.tables[table_name]
1557
1558         if not columns:
1559             # empty set means all columns in the table
1560             return table
1561
1562         new_columns = {}
1563         for column_name in columns:
1564             assert isinstance(column_name, six.string_types)
1565             assert column_name in table.columns
1566
1567             new_columns[column_name] = table.columns[column_name]
1568
1569         table.columns = new_columns
1570         return table