e69d35ea7d428631ce5248c489d4cfa4d80b95be
[cascardo/ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import functools
16 import uuid
17
18 import six
19
20 import ovs.jsonrpc
21 import ovs.db.parser
22 import ovs.db.schema
23 from ovs.db import error
24 import ovs.ovsuuid
25 import ovs.poller
26 import ovs.vlog
27
28 vlog = ovs.vlog.Vlog("idl")
29
30 __pychecker__ = 'no-classattr no-objattrs'
31
32 ROW_CREATE = "create"
33 ROW_UPDATE = "update"
34 ROW_DELETE = "delete"
35
36
37 class Idl(object):
38     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
39
40     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
41     requests to an OVSDB database server and parses the responses, converting
42     raw JSON into data structures that are easier for clients to digest.
43
44     The IDL also assists with issuing database transactions.  The client
45     creates a transaction, manipulates the IDL data structures, and commits or
46     aborts the transaction.  The IDL then composes and issues the necessary
47     JSON-RPC requests and reports to the client whether the transaction
48     completed successfully.
49
50     The client is allowed to access the following attributes directly, in a
51     read-only fashion:
52
53     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
54       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
55       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
56       to a Row object.
57
58       The client may directly read and write the Row objects referenced by the
59       'rows' map values.  Refer to Row for more details.
60
61     - 'change_seqno': A number that represents the IDL's state.  When the IDL
62       is updated (by Idl.run()), its value changes.  The sequence number can
63       occasionally change even if the database does not.  This happens if the
64       connection to the database drops and reconnects, which causes the
65       database contents to be reloaded even if they didn't change.  (It could
66       also happen if the database server sends out a "change" that reflects
67       what the IDL already thought was in the database.  The database server is
68       not supposed to do that, but bugs could in theory cause it to do so.)
69
70     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
71       if no lock is configured.
72
73     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
74       lock, and False otherwise.
75
76       Locking and unlocking happens asynchronously from the database client's
77       point of view, so the information is only useful for optimization
78       (e.g. if the client doesn't have the lock then there's no point in trying
79       to write to the database).
80
81     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
82       the database server has indicated that some other client already owns the
83       requested lock, and False otherwise.
84
85     - 'txn': The ovs.db.idl.Transaction object for the database transaction
86       currently being constructed, if there is one, or None otherwise.
87 """
88
89     def __init__(self, remote, schema):
90         """Creates and returns a connection to the database named 'db_name' on
91         'remote', which should be in a form acceptable to
92         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
93         replica of the remote database.
94
95         'schema' should be the schema for the remote database.  The caller may
96         have cut it down by removing tables or columns that are not of
97         interest.  The IDL will only replicate the tables and columns that
98         remain.  The caller may also add a attribute named 'alert' to selected
99         remaining columns, setting its value to False; if so, then changes to
100         those columns will not be considered changes to the database for the
101         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
102         useful for columns that the IDL's client will write but not read.
103
104         As a convenience to users, 'schema' may also be an instance of the
105         SchemaHelper class.
106
107         The IDL uses and modifies 'schema' directly."""
108
109         assert isinstance(schema, SchemaHelper)
110         schema = schema.get_idl_schema()
111
112         self.tables = schema.tables
113         self.readonly = schema.readonly
114         self._db = schema
115         self._session = ovs.jsonrpc.Session.open(remote)
116         self._monitor_request_id = None
117         self._last_seqno = None
118         self.change_seqno = 0
119
120         # Database locking.
121         self.lock_name = None          # Name of lock we need, None if none.
122         self.has_lock = False          # Has db server said we have the lock?
123         self.is_lock_contended = False  # Has db server said we can't get lock?
124         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
125
126         # Transaction support.
127         self.txn = None
128         self._outstanding_txns = {}
129
130         for table in six.itervalues(schema.tables):
131             for column in six.itervalues(table.columns):
132                 if not hasattr(column, 'alert'):
133                     column.alert = True
134             table.need_table = False
135             table.rows = {}
136             table.idl = self
137
138     def close(self):
139         """Closes the connection to the database.  The IDL will no longer
140         update."""
141         self._session.close()
142
143     def run(self):
144         """Processes a batch of messages from the database server.  Returns
145         True if the database as seen through the IDL changed, False if it did
146         not change.  The initial fetch of the entire contents of the remote
147         database is considered to be one kind of change.  If the IDL has been
148         configured to acquire a database lock (with Idl.set_lock()), then
149         successfully acquiring the lock is also considered to be a change.
150
151         This function can return occasional false positives, that is, report
152         that the database changed even though it didn't.  This happens if the
153         connection to the database drops and reconnects, which causes the
154         database contents to be reloaded even if they didn't change.  (It could
155         also happen if the database server sends out a "change" that reflects
156         what we already thought was in the database, but the database server is
157         not supposed to do that.)
158
159         As an alternative to checking the return value, the client may check
160         for changes in self.change_seqno."""
161         assert not self.txn
162         initial_change_seqno = self.change_seqno
163         self._session.run()
164         i = 0
165         while i < 50:
166             i += 1
167             if not self._session.is_connected():
168                 break
169
170             seqno = self._session.get_seqno()
171             if seqno != self._last_seqno:
172                 self._last_seqno = seqno
173                 self.__txn_abort_all()
174                 self.__send_monitor_request()
175                 if self.lock_name:
176                     self.__send_lock_request()
177                 break
178
179             msg = self._session.recv()
180             if msg is None:
181                 break
182             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
183                 and msg.method == "update"
184                 and len(msg.params) == 2
185                 and msg.params[0] is None):
186                 # Database contents changed.
187                 self.__parse_update(msg.params[1])
188             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
189                   and self._monitor_request_id is not None
190                   and self._monitor_request_id == msg.id):
191                 # Reply to our "monitor" request.
192                 try:
193                     self.change_seqno += 1
194                     self._monitor_request_id = None
195                     self.__clear()
196                     self.__parse_update(msg.result)
197                 except error.Error as e:
198                     vlog.err("%s: parse error in received schema: %s"
199                               % (self._session.get_name(), e))
200                     self.__error()
201             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
202                   and self._lock_request_id is not None
203                   and self._lock_request_id == msg.id):
204                 # Reply to our "lock" request.
205                 self.__parse_lock_reply(msg.result)
206             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
207                   and msg.method == "locked"):
208                 # We got our lock.
209                 self.__parse_lock_notify(msg.params, True)
210             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
211                   and msg.method == "stolen"):
212                 # Someone else stole our lock.
213                 self.__parse_lock_notify(msg.params, False)
214             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
215                 # Reply to our echo request.  Ignore it.
216                 pass
217             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
218                                ovs.jsonrpc.Message.T_REPLY)
219                   and self.__txn_process_reply(msg)):
220                 # __txn_process_reply() did everything needed.
221                 pass
222             else:
223                 # This can happen if a transaction is destroyed before we
224                 # receive the reply, so keep the log level low.
225                 vlog.dbg("%s: received unexpected %s message"
226                          % (self._session.get_name(),
227                              ovs.jsonrpc.Message.type_to_string(msg.type)))
228
229         return initial_change_seqno != self.change_seqno
230
231     def wait(self, poller):
232         """Arranges for poller.block() to wake up when self.run() has something
233         to do or when activity occurs on a transaction on 'self'."""
234         self._session.wait(poller)
235         self._session.recv_wait(poller)
236
237     def has_ever_connected(self):
238         """Returns True, if the IDL successfully connected to the remote
239         database and retrieved its contents (even if the connection
240         subsequently dropped and is in the process of reconnecting).  If so,
241         then the IDL contains an atomic snapshot of the database's contents
242         (but it might be arbitrarily old if the connection dropped).
243
244         Returns False if the IDL has never connected or retrieved the
245         database's contents.  If so, the IDL is empty."""
246         return self.change_seqno != 0
247
248     def force_reconnect(self):
249         """Forces the IDL to drop its connection to the database and reconnect.
250         In the meantime, the contents of the IDL will not change."""
251         self._session.force_reconnect()
252
253     def set_lock(self, lock_name):
254         """If 'lock_name' is not None, configures the IDL to obtain the named
255         lock from the database server and to avoid modifying the database when
256         the lock cannot be acquired (that is, when another client has the same
257         lock).
258
259         If 'lock_name' is None, drops the locking requirement and releases the
260         lock."""
261         assert not self.txn
262         assert not self._outstanding_txns
263
264         if self.lock_name and (not lock_name or lock_name != self.lock_name):
265             # Release previous lock.
266             self.__send_unlock_request()
267             self.lock_name = None
268             self.is_lock_contended = False
269
270         if lock_name and not self.lock_name:
271             # Acquire new lock.
272             self.lock_name = lock_name
273             self.__send_lock_request()
274
275     def notify(self, event, row, updates=None):
276         """Hook for implementing create/update/delete notifications
277
278         :param event:   The event that was triggered
279         :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
280         :param row:     The row as it is after the operation has occured
281         :type row:      Row
282         :param updates: For updates, a Row object with just the changed columns
283         :type updates:  Row
284         """
285
286     def __clear(self):
287         changed = False
288
289         for table in six.itervalues(self.tables):
290             if table.rows:
291                 changed = True
292                 table.rows = {}
293
294         if changed:
295             self.change_seqno += 1
296
297     def __update_has_lock(self, new_has_lock):
298         if new_has_lock and not self.has_lock:
299             if self._monitor_request_id is None:
300                 self.change_seqno += 1
301             else:
302                 # We're waiting for a monitor reply, so don't signal that the
303                 # database changed.  The monitor reply will increment
304                 # change_seqno anyhow.
305                 pass
306             self.is_lock_contended = False
307         self.has_lock = new_has_lock
308
309     def __do_send_lock_request(self, method):
310         self.__update_has_lock(False)
311         self._lock_request_id = None
312         if self._session.is_connected():
313             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
314             msg_id = msg.id
315             self._session.send(msg)
316         else:
317             msg_id = None
318         return msg_id
319
320     def __send_lock_request(self):
321         self._lock_request_id = self.__do_send_lock_request("lock")
322
323     def __send_unlock_request(self):
324         self.__do_send_lock_request("unlock")
325
326     def __parse_lock_reply(self, result):
327         self._lock_request_id = None
328         got_lock = isinstance(result, dict) and result.get("locked") is True
329         self.__update_has_lock(got_lock)
330         if not got_lock:
331             self.is_lock_contended = True
332
333     def __parse_lock_notify(self, params, new_has_lock):
334         if (self.lock_name is not None
335             and isinstance(params, (list, tuple))
336             and params
337             and params[0] == self.lock_name):
338             self.__update_has_lock(new_has_lock)
339             if not new_has_lock:
340                 self.is_lock_contended = True
341
342     def __send_monitor_request(self):
343         monitor_requests = {}
344         for table in six.itervalues(self.tables):
345             columns = []
346             for column in six.iterkeys(table.columns):
347                 if ((table.name not in self.readonly) or
348                     (table.name in self.readonly) and
349                     (column not in self.readonly[table.name])):
350                     columns.append(column)
351             monitor_requests[table.name] = {"columns": columns}
352         msg = ovs.jsonrpc.Message.create_request(
353             "monitor", [self._db.name, None, monitor_requests])
354         self._monitor_request_id = msg.id
355         self._session.send(msg)
356
357     def __parse_update(self, update):
358         try:
359             self.__do_parse_update(update)
360         except error.Error as e:
361             vlog.err("%s: error parsing update: %s"
362                      % (self._session.get_name(), e))
363
364     def __do_parse_update(self, table_updates):
365         if not isinstance(table_updates, dict):
366             raise error.Error("<table-updates> is not an object",
367                               table_updates)
368
369         for table_name, table_update in six.iteritems(table_updates):
370             table = self.tables.get(table_name)
371             if not table:
372                 raise error.Error('<table-updates> includes unknown '
373                                   'table "%s"' % table_name)
374
375             if not isinstance(table_update, dict):
376                 raise error.Error('<table-update> for table "%s" is not '
377                                   'an object' % table_name, table_update)
378
379             for uuid_string, row_update in six.iteritems(table_update):
380                 if not ovs.ovsuuid.is_valid_string(uuid_string):
381                     raise error.Error('<table-update> for table "%s" '
382                                       'contains bad UUID "%s" as member '
383                                       'name' % (table_name, uuid_string),
384                                       table_update)
385                 uuid = ovs.ovsuuid.from_string(uuid_string)
386
387                 if not isinstance(row_update, dict):
388                     raise error.Error('<table-update> for table "%s" '
389                                       'contains <row-update> for %s that '
390                                       'is not an object'
391                                       % (table_name, uuid_string))
392
393                 parser = ovs.db.parser.Parser(row_update, "row-update")
394                 old = parser.get_optional("old", [dict])
395                 new = parser.get_optional("new", [dict])
396                 parser.finish()
397
398                 if not old and not new:
399                     raise error.Error('<row-update> missing "old" and '
400                                       '"new" members', row_update)
401
402                 if self.__process_update(table, uuid, old, new):
403                     self.change_seqno += 1
404
405     def __process_update(self, table, uuid, old, new):
406         """Returns True if a column changed, False otherwise."""
407         row = table.rows.get(uuid)
408         changed = False
409         if not new:
410             # Delete row.
411             if row:
412                 del table.rows[uuid]
413                 changed = True
414                 self.notify(ROW_DELETE, row)
415             else:
416                 # XXX rate-limit
417                 vlog.warn("cannot delete missing row %s from table %s"
418                           % (uuid, table.name))
419         elif not old:
420             # Insert row.
421             if not row:
422                 row = self.__create_row(table, uuid)
423                 changed = True
424             else:
425                 # XXX rate-limit
426                 vlog.warn("cannot add existing row %s to table %s"
427                           % (uuid, table.name))
428             if self.__row_update(table, row, new):
429                 changed = True
430                 self.notify(ROW_CREATE, row)
431         else:
432             op = ROW_UPDATE
433             if not row:
434                 row = self.__create_row(table, uuid)
435                 changed = True
436                 op = ROW_CREATE
437                 # XXX rate-limit
438                 vlog.warn("cannot modify missing row %s in table %s"
439                           % (uuid, table.name))
440             if self.__row_update(table, row, new):
441                 changed = True
442                 self.notify(op, row, Row.from_json(self, table, uuid, old))
443         return changed
444
445     def __row_update(self, table, row, row_json):
446         changed = False
447         for column_name, datum_json in six.iteritems(row_json):
448             column = table.columns.get(column_name)
449             if not column:
450                 # XXX rate-limit
451                 vlog.warn("unknown column %s updating table %s"
452                           % (column_name, table.name))
453                 continue
454
455             try:
456                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
457             except error.Error as e:
458                 # XXX rate-limit
459                 vlog.warn("error parsing column %s in table %s: %s"
460                           % (column_name, table.name, e))
461                 continue
462
463             if datum != row._data[column_name]:
464                 row._data[column_name] = datum
465                 if column.alert:
466                     changed = True
467             else:
468                 # Didn't really change but the OVSDB monitor protocol always
469                 # includes every value in a row.
470                 pass
471         return changed
472
473     def __create_row(self, table, uuid):
474         data = {}
475         for column in six.itervalues(table.columns):
476             data[column.name] = ovs.db.data.Datum.default(column.type)
477         row = table.rows[uuid] = Row(self, table, uuid, data)
478         return row
479
480     def __error(self):
481         self._session.force_reconnect()
482
483     def __txn_abort_all(self):
484         while self._outstanding_txns:
485             txn = self._outstanding_txns.popitem()[1]
486             txn._status = Transaction.TRY_AGAIN
487
488     def __txn_process_reply(self, msg):
489         txn = self._outstanding_txns.pop(msg.id, None)
490         if txn:
491             txn._process_reply(msg)
492
493
494 def _uuid_to_row(atom, base):
495     if base.ref_table:
496         return base.ref_table.rows.get(atom)
497     else:
498         return atom
499
500
501 def _row_to_uuid(value):
502     if isinstance(value, Row):
503         return value.uuid
504     else:
505         return value
506
507
508 @functools.total_ordering
509 class Row(object):
510     """A row within an IDL.
511
512     The client may access the following attributes directly:
513
514     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
515
516     - An attribute for each column in the Row's table, named for the column,
517       whose values are as returned by Datum.to_python() for the column's type.
518
519       If some error occurs (e.g. the database server's idea of the column is
520       different from the IDL's idea), then the attribute values is the
521       "default" value return by Datum.default() for the column's type.  (It is
522       important to know this because the default value may violate constraints
523       for the column's type, e.g. the default integer value is 0 even if column
524       contraints require the column's value to be positive.)
525
526       When a transaction is active, column attributes may also be assigned new
527       values.  Committing the transaction will then cause the new value to be
528       stored into the database.
529
530       *NOTE*: In the current implementation, the value of a column is a *copy*
531       of the value in the database.  This means that modifying its value
532       directly will have no useful effect.  For example, the following:
533         row.mycolumn["a"] = "b"              # don't do this
534       will not change anything in the database, even after commit.  To modify
535       the column, instead assign the modified column value back to the column:
536         d = row.mycolumn
537         d["a"] = "b"
538         row.mycolumn = d
539 """
540     def __init__(self, idl, table, uuid, data):
541         # All of the explicit references to self.__dict__ below are required
542         # to set real attributes with invoking self.__getattr__().
543         self.__dict__["uuid"] = uuid
544
545         self.__dict__["_idl"] = idl
546         self.__dict__["_table"] = table
547
548         # _data is the committed data.  It takes the following values:
549         #
550         #   - A dictionary that maps every column name to a Datum, if the row
551         #     exists in the committed form of the database.
552         #
553         #   - None, if this row is newly inserted within the active transaction
554         #     and thus has no committed form.
555         self.__dict__["_data"] = data
556
557         # _changes describes changes to this row within the active transaction.
558         # It takes the following values:
559         #
560         #   - {}, the empty dictionary, if no transaction is active or if the
561         #     row has yet not been changed within this transaction.
562         #
563         #   - A dictionary that maps a column name to its new Datum, if an
564         #     active transaction changes those columns' values.
565         #
566         #   - A dictionary that maps every column name to a Datum, if the row
567         #     is newly inserted within the active transaction.
568         #
569         #   - None, if this transaction deletes this row.
570         self.__dict__["_changes"] = {}
571
572         # A dictionary whose keys are the names of columns that must be
573         # verified as prerequisites when the transaction commits.  The values
574         # in the dictionary are all None.
575         self.__dict__["_prereqs"] = {}
576
577     def __lt__(self, other):
578         if not isinstance(other, Row):
579             return NotImplemented
580         return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
581
582     def __eq__(self, other):
583         if not isinstance(other, Row):
584             return NotImplemented
585         return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
586
587     def __hash__(self):
588         return int(self.__dict__['uuid'])
589
590     def __getattr__(self, column_name):
591         assert self._changes is not None
592
593         datum = self._changes.get(column_name)
594         if datum is None:
595             if self._data is None:
596                 raise AttributeError("%s instance has no attribute '%s'" %
597                                      (self.__class__.__name__, column_name))
598             if column_name in self._data:
599                 datum = self._data[column_name]
600             else:
601                 raise AttributeError("%s instance has no attribute '%s'" %
602                                      (self.__class__.__name__, column_name))
603
604         return datum.to_python(_uuid_to_row)
605
606     def __setattr__(self, column_name, value):
607         assert self._changes is not None
608         assert self._idl.txn
609
610         if ((self._table.name in self._idl.readonly) and
611             (column_name in self._idl.readonly[self._table.name])):
612             vlog.warn("attempting to write to readonly column %s"
613                       % column_name)
614             return
615
616         column = self._table.columns[column_name]
617         try:
618             datum = ovs.db.data.Datum.from_python(column.type, value,
619                                                   _row_to_uuid)
620         except error.Error as e:
621             # XXX rate-limit
622             vlog.err("attempting to write bad value to column %s (%s)"
623                      % (column_name, e))
624             return
625         self._idl.txn._write(self, column, datum)
626
627     @classmethod
628     def from_json(cls, idl, table, uuid, row_json):
629         data = {}
630         for column_name, datum_json in six.iteritems(row_json):
631             column = table.columns.get(column_name)
632             if not column:
633                 # XXX rate-limit
634                 vlog.warn("unknown column %s in table %s"
635                           % (column_name, table.name))
636                 continue
637             try:
638                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
639             except error.Error as e:
640                 # XXX rate-limit
641                 vlog.warn("error parsing column %s in table %s: %s"
642                           % (column_name, table.name, e))
643                 continue
644             data[column_name] = datum
645         return cls(idl, table, uuid, data)
646
647     def verify(self, column_name):
648         """Causes the original contents of column 'column_name' in this row to
649         be verified as a prerequisite to completing the transaction.  That is,
650         if 'column_name' changed in this row (or if this row was deleted)
651         between the time that the IDL originally read its contents and the time
652         that the transaction commits, then the transaction aborts and
653         Transaction.commit() returns Transaction.TRY_AGAIN.
654
655         The intention is that, to ensure that no transaction commits based on
656         dirty reads, an application should call Row.verify() on each data item
657         read as part of a read-modify-write operation.
658
659         In some cases Row.verify() reduces to a no-op, because the current
660         value of the column is already known:
661
662           - If this row is a row created by the current transaction (returned
663             by Transaction.insert()).
664
665           - If the column has already been modified within the current
666             transaction.
667
668         Because of the latter property, always call Row.verify() *before*
669         modifying the column, for a given read-modify-write.
670
671         A transaction must be in progress."""
672         assert self._idl.txn
673         assert self._changes is not None
674         if not self._data or column_name in self._changes:
675             return
676
677         self._prereqs[column_name] = None
678
679     def delete(self):
680         """Deletes this row from its table.
681
682         A transaction must be in progress."""
683         assert self._idl.txn
684         assert self._changes is not None
685         if self._data is None:
686             del self._idl.txn._txn_rows[self.uuid]
687         else:
688             self._idl.txn._txn_rows[self.uuid] = self
689         self.__dict__["_changes"] = None
690         del self._table.rows[self.uuid]
691
692     def fetch(self, column_name):
693         self._idl.txn._fetch(self, column_name)
694
695     def increment(self, column_name):
696         """Causes the transaction, when committed, to increment the value of
697         'column_name' within this row by 1.  'column_name' must have an integer
698         type.  After the transaction commits successfully, the client may
699         retrieve the final (incremented) value of 'column_name' with
700         Transaction.get_increment_new_value().
701
702         The client could accomplish something similar by reading and writing
703         and verify()ing columns.  However, increment() will never (by itself)
704         cause a transaction to fail because of a verify error.
705
706         The intended use is for incrementing the "next_cfg" column in
707         the Open_vSwitch table."""
708         self._idl.txn._increment(self, column_name)
709
710
711 def _uuid_name_from_uuid(uuid):
712     return "row%s" % str(uuid).replace("-", "_")
713
714
715 def _where_uuid_equals(uuid):
716     return [["_uuid", "==", ["uuid", str(uuid)]]]
717
718
719 class _InsertedRow(object):
720     def __init__(self, op_index):
721         self.op_index = op_index
722         self.real = None
723
724
725 class Transaction(object):
726     """A transaction may modify the contents of a database by modifying the
727     values of columns, deleting rows, inserting rows, or adding checks that
728     columns in the database have not changed ("verify" operations), through
729     Row methods.
730
731     Reading and writing columns and inserting and deleting rows are all
732     straightforward.  The reasons to verify columns are less obvious.
733     Verification is the key to maintaining transactional integrity.  Because
734     OVSDB handles multiple clients, it can happen that between the time that
735     OVSDB client A reads a column and writes a new value, OVSDB client B has
736     written that column.  Client A's write should not ordinarily overwrite
737     client B's, especially if the column in question is a "map" column that
738     contains several more or less independent data items.  If client A adds a
739     "verify" operation before it writes the column, then the transaction fails
740     in case client B modifies it first.  Client A will then see the new value
741     of the column and compose a new transaction based on the new contents
742     written by client B.
743
744     When a transaction is complete, which must be before the next call to
745     Idl.run(), call Transaction.commit() or Transaction.abort().
746
747     The life-cycle of a transaction looks like this:
748
749     1. Create the transaction and record the initial sequence number:
750
751         seqno = idl.change_seqno(idl)
752         txn = Transaction(idl)
753
754     2. Modify the database with Row and Transaction methods.
755
756     3. Commit the transaction by calling Transaction.commit().  The first call
757        to this function probably returns Transaction.INCOMPLETE.  The client
758        must keep calling again along as this remains true, calling Idl.run() in
759        between to let the IDL do protocol processing.  (If the client doesn't
760        have anything else to do in the meantime, it can use
761        Transaction.commit_block() to avoid having to loop itself.)
762
763     4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
764        to change from the saved 'seqno' (it's possible that it's already
765        changed, in which case the client should not wait at all), then start
766        over from step 1.  Only a call to Idl.run() will change the return value
767        of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
768
769     # Status values that Transaction.commit() can return.
770
771     # Not yet committed or aborted.
772     UNCOMMITTED = "uncommitted"
773     # Transaction didn't include any changes.
774     UNCHANGED = "unchanged"
775     # Commit in progress, please wait.
776     INCOMPLETE = "incomplete"
777     # ovsdb_idl_txn_abort() called.
778     ABORTED = "aborted"
779     # Commit successful.
780     SUCCESS = "success"
781     # Commit failed because a "verify" operation
782     # reported an inconsistency, due to a network
783     # problem, or other transient failure.  Wait
784     # for a change, then try again.
785     TRY_AGAIN = "try again"
786     # Server hasn't given us the lock yet.
787     NOT_LOCKED = "not locked"
788     # Commit failed due to a hard error.
789     ERROR = "error"
790
791     @staticmethod
792     def status_to_string(status):
793         """Converts one of the status values that Transaction.commit() can
794         return into a human-readable string.
795
796         (The status values are in fact such strings already, so
797         there's nothing to do.)"""
798         return status
799
800     def __init__(self, idl):
801         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
802         A given Idl may only have a single active transaction at a time.
803
804         A Transaction may modify the contents of a database by assigning new
805         values to columns (attributes of Row), deleting rows (with
806         Row.delete()), or inserting rows (with Transaction.insert()).  It may
807         also check that columns in the database have not changed with
808         Row.verify().
809
810         When a transaction is complete (which must be before the next call to
811         Idl.run()), call Transaction.commit() or Transaction.abort()."""
812         assert idl.txn is None
813
814         idl.txn = self
815         self._request_id = None
816         self.idl = idl
817         self.dry_run = False
818         self._txn_rows = {}
819         self._status = Transaction.UNCOMMITTED
820         self._error = None
821         self._comments = []
822
823         self._inc_row = None
824         self._inc_column = None
825
826         self._fetch_requests = []
827
828         self._inserted_rows = {}  # Map from UUID to _InsertedRow
829
830     def add_comment(self, comment):
831         """Appends 'comment' to the comments that will be passed to the OVSDB
832         server when this transaction is committed.  (The comment will be
833         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
834         relatively human-readable form.)"""
835         self._comments.append(comment)
836
837     def wait(self, poller):
838         """Causes poll_block() to wake up if this transaction has completed
839         committing."""
840         if self._status not in (Transaction.UNCOMMITTED,
841                                 Transaction.INCOMPLETE):
842             poller.immediate_wake()
843
844     def _substitute_uuids(self, json):
845         if isinstance(json, (list, tuple)):
846             if (len(json) == 2
847                 and json[0] == 'uuid'
848                 and ovs.ovsuuid.is_valid_string(json[1])):
849                 uuid = ovs.ovsuuid.from_string(json[1])
850                 row = self._txn_rows.get(uuid, None)
851                 if row and row._data is None:
852                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
853             else:
854                 return [self._substitute_uuids(elem) for elem in json]
855         return json
856
857     def __disassemble(self):
858         self.idl.txn = None
859
860         for row in six.itervalues(self._txn_rows):
861             if row._changes is None:
862                 row._table.rows[row.uuid] = row
863             elif row._data is None:
864                 del row._table.rows[row.uuid]
865             row.__dict__["_changes"] = {}
866             row.__dict__["_prereqs"] = {}
867         self._txn_rows = {}
868
869     def commit(self):
870         """Attempts to commit 'txn'.  Returns the status of the commit
871         operation, one of the following constants:
872
873           Transaction.INCOMPLETE:
874
875               The transaction is in progress, but not yet complete.  The caller
876               should call again later, after calling Idl.run() to let the
877               IDL do OVSDB protocol processing.
878
879           Transaction.UNCHANGED:
880
881               The transaction is complete.  (It didn't actually change the
882               database, so the IDL didn't send any request to the database
883               server.)
884
885           Transaction.ABORTED:
886
887               The caller previously called Transaction.abort().
888
889           Transaction.SUCCESS:
890
891               The transaction was successful.  The update made by the
892               transaction (and possibly other changes made by other database
893               clients) should already be visible in the IDL.
894
895           Transaction.TRY_AGAIN:
896
897               The transaction failed for some transient reason, e.g. because a
898               "verify" operation reported an inconsistency or due to a network
899               problem.  The caller should wait for a change to the database,
900               then compose a new transaction, and commit the new transaction.
901
902               Use Idl.change_seqno to wait for a change in the database.  It is
903               important to use its value *before* the initial call to
904               Transaction.commit() as the baseline for this purpose, because
905               the change that one should wait for can happen after the initial
906               call but before the call that returns Transaction.TRY_AGAIN, and
907               using some other baseline value in that situation could cause an
908               indefinite wait if the database rarely changes.
909
910           Transaction.NOT_LOCKED:
911
912               The transaction failed because the IDL has been configured to
913               require a database lock (with Idl.set_lock()) but didn't
914               get it yet or has already lost it.
915
916         Committing a transaction rolls back all of the changes that it made to
917         the IDL's copy of the database.  If the transaction commits
918         successfully, then the database server will send an update and, thus,
919         the IDL will be updated with the committed changes."""
920         # The status can only change if we're the active transaction.
921         # (Otherwise, our status will change only in Idl.run().)
922         if self != self.idl.txn:
923             return self._status
924
925         # If we need a lock but don't have it, give up quickly.
926         if self.idl.lock_name and not self.idl.has_lock:
927             self._status = Transaction.NOT_LOCKED
928             self.__disassemble()
929             return self._status
930
931         operations = [self.idl._db.name]
932
933         # Assert that we have the required lock (avoiding a race).
934         if self.idl.lock_name:
935             operations.append({"op": "assert",
936                                "lock": self.idl.lock_name})
937
938         # Add prerequisites and declarations of new rows.
939         for row in six.itervalues(self._txn_rows):
940             if row._prereqs:
941                 rows = {}
942                 columns = []
943                 for column_name in row._prereqs:
944                     columns.append(column_name)
945                     rows[column_name] = row._data[column_name].to_json()
946                 operations.append({"op": "wait",
947                                    "table": row._table.name,
948                                    "timeout": 0,
949                                    "where": _where_uuid_equals(row.uuid),
950                                    "until": "==",
951                                    "columns": columns,
952                                    "rows": [rows]})
953
954         # Add updates.
955         any_updates = False
956         for row in six.itervalues(self._txn_rows):
957             if row._changes is None:
958                 if row._table.is_root:
959                     operations.append({"op": "delete",
960                                        "table": row._table.name,
961                                        "where": _where_uuid_equals(row.uuid)})
962                     any_updates = True
963                 else:
964                     # Let ovsdb-server decide whether to really delete it.
965                     pass
966             elif row._changes:
967                 op = {"table": row._table.name}
968                 if row._data is None:
969                     op["op"] = "insert"
970                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
971                     any_updates = True
972
973                     op_index = len(operations) - 1
974                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
975                 else:
976                     op["op"] = "update"
977                     op["where"] = _where_uuid_equals(row.uuid)
978
979                 row_json = {}
980                 op["row"] = row_json
981
982                 for column_name, datum in six.iteritems(row._changes):
983                     if row._data is not None or not datum.is_default():
984                         row_json[column_name] = (
985                                 self._substitute_uuids(datum.to_json()))
986
987                         # If anything really changed, consider it an update.
988                         # We can't suppress not-really-changed values earlier
989                         # or transactions would become nonatomic (see the big
990                         # comment inside Transaction._write()).
991                         if (not any_updates and row._data is not None and
992                             row._data[column_name] != datum):
993                             any_updates = True
994
995                 if row._data is None or row_json:
996                     operations.append(op)
997
998         if self._fetch_requests:
999             for fetch in self._fetch_requests:
1000                 fetch["index"] = len(operations) - 1
1001                 operations.append({"op": "select",
1002                                    "table": fetch["row"]._table.name,
1003                                    "where": self._substitute_uuids(
1004                                        _where_uuid_equals(fetch["row"].uuid)),
1005                                    "columns": [fetch["column_name"]]})
1006             any_updates = True
1007
1008         # Add increment.
1009         if self._inc_row and any_updates:
1010             self._inc_index = len(operations) - 1
1011
1012             operations.append({"op": "mutate",
1013                                "table": self._inc_row._table.name,
1014                                "where": self._substitute_uuids(
1015                                    _where_uuid_equals(self._inc_row.uuid)),
1016                                "mutations": [[self._inc_column, "+=", 1]]})
1017             operations.append({"op": "select",
1018                                "table": self._inc_row._table.name,
1019                                "where": self._substitute_uuids(
1020                                    _where_uuid_equals(self._inc_row.uuid)),
1021                                "columns": [self._inc_column]})
1022
1023         # Add comment.
1024         if self._comments:
1025             operations.append({"op": "comment",
1026                                "comment": "\n".join(self._comments)})
1027
1028         # Dry run?
1029         if self.dry_run:
1030             operations.append({"op": "abort"})
1031
1032         if not any_updates:
1033             self._status = Transaction.UNCHANGED
1034         else:
1035             msg = ovs.jsonrpc.Message.create_request("transact", operations)
1036             self._request_id = msg.id
1037             if not self.idl._session.send(msg):
1038                 self.idl._outstanding_txns[self._request_id] = self
1039                 self._status = Transaction.INCOMPLETE
1040             else:
1041                 self._status = Transaction.TRY_AGAIN
1042
1043         self.__disassemble()
1044         return self._status
1045
1046     def commit_block(self):
1047         """Attempts to commit this transaction, blocking until the commit
1048         either succeeds or fails.  Returns the final commit status, which may
1049         be any Transaction.* value other than Transaction.INCOMPLETE.
1050
1051         This function calls Idl.run() on this transaction'ss IDL, so it may
1052         cause Idl.change_seqno to change."""
1053         while True:
1054             status = self.commit()
1055             if status != Transaction.INCOMPLETE:
1056                 return status
1057
1058             self.idl.run()
1059
1060             poller = ovs.poller.Poller()
1061             self.idl.wait(poller)
1062             self.wait(poller)
1063             poller.block()
1064
1065     def get_increment_new_value(self):
1066         """Returns the final (incremented) value of the column in this
1067         transaction that was set to be incremented by Row.increment.  This
1068         transaction must have committed successfully."""
1069         assert self._status == Transaction.SUCCESS
1070         return self._inc_new_value
1071
1072     def abort(self):
1073         """Aborts this transaction.  If Transaction.commit() has already been
1074         called then the transaction might get committed anyhow."""
1075         self.__disassemble()
1076         if self._status in (Transaction.UNCOMMITTED,
1077                             Transaction.INCOMPLETE):
1078             self._status = Transaction.ABORTED
1079
1080     def get_error(self):
1081         """Returns a string representing this transaction's current status,
1082         suitable for use in log messages."""
1083         if self._status != Transaction.ERROR:
1084             return Transaction.status_to_string(self._status)
1085         elif self._error:
1086             return self._error
1087         else:
1088             return "no error details available"
1089
1090     def __set_error_json(self, json):
1091         if self._error is None:
1092             self._error = ovs.json.to_string(json)
1093
1094     def get_insert_uuid(self, uuid):
1095         """Finds and returns the permanent UUID that the database assigned to a
1096         newly inserted row, given the UUID that Transaction.insert() assigned
1097         locally to that row.
1098
1099         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1100         or if it was assigned by that function and then deleted by Row.delete()
1101         within the same transaction.  (Rows that are inserted and then deleted
1102         within a single transaction are never sent to the database server, so
1103         it never assigns them a permanent UUID.)
1104
1105         This transaction must have completed successfully."""
1106         assert self._status in (Transaction.SUCCESS,
1107                                 Transaction.UNCHANGED)
1108         inserted_row = self._inserted_rows.get(uuid)
1109         if inserted_row:
1110             return inserted_row.real
1111         return None
1112
1113     def _increment(self, row, column):
1114         assert not self._inc_row
1115         self._inc_row = row
1116         self._inc_column = column
1117
1118     def _fetch(self, row, column_name):
1119         self._fetch_requests.append({"row": row, "column_name": column_name})
1120
1121     def _write(self, row, column, datum):
1122         assert row._changes is not None
1123
1124         txn = row._idl.txn
1125
1126         # If this is a write-only column and the datum being written is the
1127         # same as the one already there, just skip the update entirely.  This
1128         # is worth optimizing because we have a lot of columns that get
1129         # periodically refreshed into the database but don't actually change
1130         # that often.
1131         #
1132         # We don't do this for read/write columns because that would break
1133         # atomicity of transactions--some other client might have written a
1134         # different value in that column since we read it.  (But if a whole
1135         # transaction only does writes of existing values, without making any
1136         # real changes, we will drop the whole transaction later in
1137         # ovsdb_idl_txn_commit().)
1138         if (not column.alert and row._data and
1139                 row._data.get(column.name) == datum):
1140             new_value = row._changes.get(column.name)
1141             if new_value is None or new_value == datum:
1142                 return
1143
1144         txn._txn_rows[row.uuid] = row
1145         row._changes[column.name] = datum.copy()
1146
1147     def insert(self, table, new_uuid=None):
1148         """Inserts and returns a new row in 'table', which must be one of the
1149         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1150
1151         The new row is assigned a provisional UUID.  If 'uuid' is None then one
1152         is randomly generated; otherwise 'uuid' should specify a randomly
1153         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
1154         different UUID when 'txn' is committed, but the IDL will replace any
1155         uses of the provisional UUID in the data to be to be committed by the
1156         UUID assigned by ovsdb-server."""
1157         assert self._status == Transaction.UNCOMMITTED
1158         if new_uuid is None:
1159             new_uuid = uuid.uuid4()
1160         row = Row(self.idl, table, new_uuid, None)
1161         table.rows[row.uuid] = row
1162         self._txn_rows[row.uuid] = row
1163         return row
1164
1165     def _process_reply(self, msg):
1166         if msg.type == ovs.jsonrpc.Message.T_ERROR:
1167             self._status = Transaction.ERROR
1168         elif not isinstance(msg.result, (list, tuple)):
1169             # XXX rate-limit
1170             vlog.warn('reply to "transact" is not JSON array')
1171         else:
1172             hard_errors = False
1173             soft_errors = False
1174             lock_errors = False
1175
1176             ops = msg.result
1177             for op in ops:
1178                 if op is None:
1179                     # This isn't an error in itself but indicates that some
1180                     # prior operation failed, so make sure that we know about
1181                     # it.
1182                     soft_errors = True
1183                 elif isinstance(op, dict):
1184                     error = op.get("error")
1185                     if error is not None:
1186                         if error == "timed out":
1187                             soft_errors = True
1188                         elif error == "not owner":
1189                             lock_errors = True
1190                         elif error == "aborted":
1191                             pass
1192                         else:
1193                             hard_errors = True
1194                             self.__set_error_json(op)
1195                 else:
1196                     hard_errors = True
1197                     self.__set_error_json(op)
1198                     # XXX rate-limit
1199                     vlog.warn("operation reply is not JSON null or object")
1200
1201             if not soft_errors and not hard_errors and not lock_errors:
1202                 if self._inc_row and not self.__process_inc_reply(ops):
1203                     hard_errors = True
1204                 if self._fetch_requests:
1205                     if self.__process_fetch_reply(ops):
1206                         self.idl.change_seqno += 1
1207                     else:
1208                         hard_errors = True
1209
1210                 for insert in six.itervalues(self._inserted_rows):
1211                     if not self.__process_insert_reply(insert, ops):
1212                         hard_errors = True
1213
1214             if hard_errors:
1215                 self._status = Transaction.ERROR
1216             elif lock_errors:
1217                 self._status = Transaction.NOT_LOCKED
1218             elif soft_errors:
1219                 self._status = Transaction.TRY_AGAIN
1220             else:
1221                 self._status = Transaction.SUCCESS
1222
1223     @staticmethod
1224     def __check_json_type(json, types, name):
1225         if not json:
1226             # XXX rate-limit
1227             vlog.warn("%s is missing" % name)
1228             return False
1229         elif not isinstance(json, tuple(types)):
1230             # XXX rate-limit
1231             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1232             return False
1233         else:
1234             return True
1235
1236     def __process_fetch_reply(self, ops):
1237         update = False
1238         for fetch_request in self._fetch_requests:
1239             row = fetch_request["row"]
1240             column_name = fetch_request["column_name"]
1241             index = fetch_request["index"]
1242             table = row._table
1243
1244             select = ops[index]
1245             fetched_rows = select.get("rows")
1246             if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1247                                                  '"select" reply "rows"'):
1248                 return False
1249             if len(fetched_rows) != 1:
1250                 # XXX rate-limit
1251                 vlog.warn('"select" reply "rows" has %d elements '
1252                           'instead of 1' % len(fetched_rows))
1253                 continue
1254             fetched_row = fetched_rows[0]
1255             if not Transaction.__check_json_type(fetched_row, (dict,),
1256                                                  '"select" reply row'):
1257                 continue
1258
1259             column = table.columns.get(column_name)
1260             datum_json = fetched_row.get(column_name)
1261             datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1262
1263             row._data[column_name] = datum
1264             update = True
1265
1266         return update
1267
1268     def __process_inc_reply(self, ops):
1269         if self._inc_index + 2 > len(ops):
1270             # XXX rate-limit
1271             vlog.warn("reply does not contain enough operations for "
1272                       "increment (has %d, needs %d)" %
1273                       (len(ops), self._inc_index + 2))
1274
1275         # We know that this is a JSON object because the loop in
1276         # __process_reply() already checked.
1277         mutate = ops[self._inc_index]
1278         count = mutate.get("count")
1279         if not Transaction.__check_json_type(count, six.integer_types,
1280                                              '"mutate" reply "count"'):
1281             return False
1282         if count != 1:
1283             # XXX rate-limit
1284             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1285             return False
1286
1287         select = ops[self._inc_index + 1]
1288         rows = select.get("rows")
1289         if not Transaction.__check_json_type(rows, (list, tuple),
1290                                              '"select" reply "rows"'):
1291             return False
1292         if len(rows) != 1:
1293             # XXX rate-limit
1294             vlog.warn('"select" reply "rows" has %d elements '
1295                       'instead of 1' % len(rows))
1296             return False
1297         row = rows[0]
1298         if not Transaction.__check_json_type(row, (dict,),
1299                                              '"select" reply row'):
1300             return False
1301         column = row.get(self._inc_column)
1302         if not Transaction.__check_json_type(column, six.integer_types,
1303                                              '"select" reply inc column'):
1304             return False
1305         self._inc_new_value = column
1306         return True
1307
1308     def __process_insert_reply(self, insert, ops):
1309         if insert.op_index >= len(ops):
1310             # XXX rate-limit
1311             vlog.warn("reply does not contain enough operations "
1312                       "for insert (has %d, needs %d)"
1313                       % (len(ops), insert.op_index))
1314             return False
1315
1316         # We know that this is a JSON object because the loop in
1317         # __process_reply() already checked.
1318         reply = ops[insert.op_index]
1319         json_uuid = reply.get("uuid")
1320         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1321                                              '"insert" reply "uuid"'):
1322             return False
1323
1324         try:
1325             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1326         except error.Error:
1327             # XXX rate-limit
1328             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1329             return False
1330
1331         insert.real = uuid_
1332         return True
1333
1334
1335 class SchemaHelper(object):
1336     """IDL Schema helper.
1337
1338     This class encapsulates the logic required to generate schemas suitable
1339     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1340     they are interested in using register_columns().  When finished, the
1341     get_idl_schema() function may be called.
1342
1343     The location on disk of the schema used may be found in the
1344     'schema_location' variable."""
1345
1346     def __init__(self, location=None, schema_json=None):
1347         """Creates a new Schema object.
1348
1349         'location' file path to ovs schema. None means default location
1350         'schema_json' schema in json preresentation in memory
1351         """
1352
1353         if location and schema_json:
1354             raise ValueError("both location and schema_json can't be "
1355                              "specified. it's ambiguous.")
1356         if schema_json is None:
1357             if location is None:
1358                 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1359             schema_json = ovs.json.from_file(location)
1360
1361         self.schema_json = schema_json
1362         self._tables = {}
1363         self._readonly = {}
1364         self._all = False
1365
1366     def register_columns(self, table, columns, readonly=[]):
1367         """Registers interest in the given 'columns' of 'table'.  Future calls
1368         to get_idl_schema() will include 'table':column for each column in
1369         'columns'. This function automatically avoids adding duplicate entries
1370         to the schema.
1371         A subset of 'columns' can be specified as 'readonly'. The readonly
1372         columns are not replicated but can be fetched on-demand by the user
1373         with Row.fetch().
1374
1375         'table' must be a string.
1376         'columns' must be a list of strings.
1377         'readonly' must be a list of strings.
1378         """
1379
1380         assert isinstance(table, six.string_types)
1381         assert isinstance(columns, list)
1382
1383         columns = set(columns) | self._tables.get(table, set())
1384         self._tables[table] = columns
1385         self._readonly[table] = readonly
1386
1387     def register_table(self, table):
1388         """Registers interest in the given all columns of 'table'. Future calls
1389         to get_idl_schema() will include all columns of 'table'.
1390
1391         'table' must be a string
1392         """
1393         assert isinstance(table, six.string_types)
1394         self._tables[table] = set()  # empty set means all columns in the table
1395
1396     def register_all(self):
1397         """Registers interest in every column of every table."""
1398         self._all = True
1399
1400     def get_idl_schema(self):
1401         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1402         object based on columns registered using the register_columns()
1403         function."""
1404
1405         schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1406         self.schema_json = None
1407
1408         if not self._all:
1409             schema_tables = {}
1410             for table, columns in six.iteritems(self._tables):
1411                 schema_tables[table] = (
1412                     self._keep_table_columns(schema, table, columns))
1413
1414             schema.tables = schema_tables
1415         schema.readonly = self._readonly
1416         return schema
1417
1418     def _keep_table_columns(self, schema, table_name, columns):
1419         assert table_name in schema.tables
1420         table = schema.tables[table_name]
1421
1422         if not columns:
1423             # empty set means all columns in the table
1424             return table
1425
1426         new_columns = {}
1427         for column_name in columns:
1428             assert isinstance(column_name, six.string_types)
1429             assert column_name in table.columns
1430
1431             new_columns[column_name] = table.columns[column_name]
1432
1433         table.columns = new_columns
1434         return table