python: Don't compare None and int.
[cascardo/ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import uuid
16
17 import six
18
19 import ovs.jsonrpc
20 import ovs.db.parser
21 import ovs.db.schema
22 from ovs.db import error
23 import ovs.ovsuuid
24 import ovs.poller
25 import ovs.vlog
26
27 vlog = ovs.vlog.Vlog("idl")
28
29 __pychecker__ = 'no-classattr no-objattrs'
30
31 ROW_CREATE = "create"
32 ROW_UPDATE = "update"
33 ROW_DELETE = "delete"
34
35
36 class Idl(object):
37     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
38
39     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
40     requests to an OVSDB database server and parses the responses, converting
41     raw JSON into data structures that are easier for clients to digest.
42
43     The IDL also assists with issuing database transactions.  The client
44     creates a transaction, manipulates the IDL data structures, and commits or
45     aborts the transaction.  The IDL then composes and issues the necessary
46     JSON-RPC requests and reports to the client whether the transaction
47     completed successfully.
48
49     The client is allowed to access the following attributes directly, in a
50     read-only fashion:
51
52     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
53       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
54       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
55       to a Row object.
56
57       The client may directly read and write the Row objects referenced by the
58       'rows' map values.  Refer to Row for more details.
59
60     - 'change_seqno': A number that represents the IDL's state.  When the IDL
61       is updated (by Idl.run()), its value changes.  The sequence number can
62       occasionally change even if the database does not.  This happens if the
63       connection to the database drops and reconnects, which causes the
64       database contents to be reloaded even if they didn't change.  (It could
65       also happen if the database server sends out a "change" that reflects
66       what the IDL already thought was in the database.  The database server is
67       not supposed to do that, but bugs could in theory cause it to do so.)
68
69     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
70       if no lock is configured.
71
72     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
73       lock, and False otherwise.
74
75       Locking and unlocking happens asynchronously from the database client's
76       point of view, so the information is only useful for optimization
77       (e.g. if the client doesn't have the lock then there's no point in trying
78       to write to the database).
79
80     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
81       the database server has indicated that some other client already owns the
82       requested lock, and False otherwise.
83
84     - 'txn': The ovs.db.idl.Transaction object for the database transaction
85       currently being constructed, if there is one, or None otherwise.
86 """
87
88     def __init__(self, remote, schema):
89         """Creates and returns a connection to the database named 'db_name' on
90         'remote', which should be in a form acceptable to
91         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
92         replica of the remote database.
93
94         'schema' should be the schema for the remote database.  The caller may
95         have cut it down by removing tables or columns that are not of
96         interest.  The IDL will only replicate the tables and columns that
97         remain.  The caller may also add a attribute named 'alert' to selected
98         remaining columns, setting its value to False; if so, then changes to
99         those columns will not be considered changes to the database for the
100         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
101         useful for columns that the IDL's client will write but not read.
102
103         As a convenience to users, 'schema' may also be an instance of the
104         SchemaHelper class.
105
106         The IDL uses and modifies 'schema' directly."""
107
108         assert isinstance(schema, SchemaHelper)
109         schema = schema.get_idl_schema()
110
111         self.tables = schema.tables
112         self.readonly = schema.readonly
113         self._db = schema
114         self._session = ovs.jsonrpc.Session.open(remote)
115         self._monitor_request_id = None
116         self._last_seqno = None
117         self.change_seqno = 0
118
119         # Database locking.
120         self.lock_name = None          # Name of lock we need, None if none.
121         self.has_lock = False          # Has db server said we have the lock?
122         self.is_lock_contended = False  # Has db server said we can't get lock?
123         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
124
125         # Transaction support.
126         self.txn = None
127         self._outstanding_txns = {}
128
129         for table in six.itervalues(schema.tables):
130             for column in six.itervalues(table.columns):
131                 if not hasattr(column, 'alert'):
132                     column.alert = True
133             table.need_table = False
134             table.rows = {}
135             table.idl = self
136
137     def close(self):
138         """Closes the connection to the database.  The IDL will no longer
139         update."""
140         self._session.close()
141
142     def run(self):
143         """Processes a batch of messages from the database server.  Returns
144         True if the database as seen through the IDL changed, False if it did
145         not change.  The initial fetch of the entire contents of the remote
146         database is considered to be one kind of change.  If the IDL has been
147         configured to acquire a database lock (with Idl.set_lock()), then
148         successfully acquiring the lock is also considered to be a change.
149
150         This function can return occasional false positives, that is, report
151         that the database changed even though it didn't.  This happens if the
152         connection to the database drops and reconnects, which causes the
153         database contents to be reloaded even if they didn't change.  (It could
154         also happen if the database server sends out a "change" that reflects
155         what we already thought was in the database, but the database server is
156         not supposed to do that.)
157
158         As an alternative to checking the return value, the client may check
159         for changes in self.change_seqno."""
160         assert not self.txn
161         initial_change_seqno = self.change_seqno
162         self._session.run()
163         i = 0
164         while i < 50:
165             i += 1
166             if not self._session.is_connected():
167                 break
168
169             seqno = self._session.get_seqno()
170             if seqno != self._last_seqno:
171                 self._last_seqno = seqno
172                 self.__txn_abort_all()
173                 self.__send_monitor_request()
174                 if self.lock_name:
175                     self.__send_lock_request()
176                 break
177
178             msg = self._session.recv()
179             if msg is None:
180                 break
181             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
182                 and msg.method == "update"
183                 and len(msg.params) == 2
184                 and msg.params[0] is None):
185                 # Database contents changed.
186                 self.__parse_update(msg.params[1])
187             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
188                   and self._monitor_request_id is not None
189                   and self._monitor_request_id == msg.id):
190                 # Reply to our "monitor" request.
191                 try:
192                     self.change_seqno += 1
193                     self._monitor_request_id = None
194                     self.__clear()
195                     self.__parse_update(msg.result)
196                 except error.Error as e:
197                     vlog.err("%s: parse error in received schema: %s"
198                               % (self._session.get_name(), e))
199                     self.__error()
200             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
201                   and self._lock_request_id is not None
202                   and self._lock_request_id == msg.id):
203                 # Reply to our "lock" request.
204                 self.__parse_lock_reply(msg.result)
205             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
206                   and msg.method == "locked"):
207                 # We got our lock.
208                 self.__parse_lock_notify(msg.params, True)
209             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
210                   and msg.method == "stolen"):
211                 # Someone else stole our lock.
212                 self.__parse_lock_notify(msg.params, False)
213             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
214                 # Reply to our echo request.  Ignore it.
215                 pass
216             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
217                                ovs.jsonrpc.Message.T_REPLY)
218                   and self.__txn_process_reply(msg)):
219                 # __txn_process_reply() did everything needed.
220                 pass
221             else:
222                 # This can happen if a transaction is destroyed before we
223                 # receive the reply, so keep the log level low.
224                 vlog.dbg("%s: received unexpected %s message"
225                          % (self._session.get_name(),
226                              ovs.jsonrpc.Message.type_to_string(msg.type)))
227
228         return initial_change_seqno != self.change_seqno
229
230     def wait(self, poller):
231         """Arranges for poller.block() to wake up when self.run() has something
232         to do or when activity occurs on a transaction on 'self'."""
233         self._session.wait(poller)
234         self._session.recv_wait(poller)
235
236     def has_ever_connected(self):
237         """Returns True, if the IDL successfully connected to the remote
238         database and retrieved its contents (even if the connection
239         subsequently dropped and is in the process of reconnecting).  If so,
240         then the IDL contains an atomic snapshot of the database's contents
241         (but it might be arbitrarily old if the connection dropped).
242
243         Returns False if the IDL has never connected or retrieved the
244         database's contents.  If so, the IDL is empty."""
245         return self.change_seqno != 0
246
247     def force_reconnect(self):
248         """Forces the IDL to drop its connection to the database and reconnect.
249         In the meantime, the contents of the IDL will not change."""
250         self._session.force_reconnect()
251
252     def set_lock(self, lock_name):
253         """If 'lock_name' is not None, configures the IDL to obtain the named
254         lock from the database server and to avoid modifying the database when
255         the lock cannot be acquired (that is, when another client has the same
256         lock).
257
258         If 'lock_name' is None, drops the locking requirement and releases the
259         lock."""
260         assert not self.txn
261         assert not self._outstanding_txns
262
263         if self.lock_name and (not lock_name or lock_name != self.lock_name):
264             # Release previous lock.
265             self.__send_unlock_request()
266             self.lock_name = None
267             self.is_lock_contended = False
268
269         if lock_name and not self.lock_name:
270             # Acquire new lock.
271             self.lock_name = lock_name
272             self.__send_lock_request()
273
274     def notify(self, event, row, updates=None):
275         """Hook for implementing create/update/delete notifications
276
277         :param event:   The event that was triggered
278         :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
279         :param row:     The row as it is after the operation has occured
280         :type row:      Row
281         :param updates: For updates, a Row object with just the changed columns
282         :type updates:  Row
283         """
284
285     def __clear(self):
286         changed = False
287
288         for table in six.itervalues(self.tables):
289             if table.rows:
290                 changed = True
291                 table.rows = {}
292
293         if changed:
294             self.change_seqno += 1
295
296     def __update_has_lock(self, new_has_lock):
297         if new_has_lock and not self.has_lock:
298             if self._monitor_request_id is None:
299                 self.change_seqno += 1
300             else:
301                 # We're waiting for a monitor reply, so don't signal that the
302                 # database changed.  The monitor reply will increment
303                 # change_seqno anyhow.
304                 pass
305             self.is_lock_contended = False
306         self.has_lock = new_has_lock
307
308     def __do_send_lock_request(self, method):
309         self.__update_has_lock(False)
310         self._lock_request_id = None
311         if self._session.is_connected():
312             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
313             msg_id = msg.id
314             self._session.send(msg)
315         else:
316             msg_id = None
317         return msg_id
318
319     def __send_lock_request(self):
320         self._lock_request_id = self.__do_send_lock_request("lock")
321
322     def __send_unlock_request(self):
323         self.__do_send_lock_request("unlock")
324
325     def __parse_lock_reply(self, result):
326         self._lock_request_id = None
327         got_lock = isinstance(result, dict) and result.get("locked") is True
328         self.__update_has_lock(got_lock)
329         if not got_lock:
330             self.is_lock_contended = True
331
332     def __parse_lock_notify(self, params, new_has_lock):
333         if (self.lock_name is not None
334             and isinstance(params, (list, tuple))
335             and params
336             and params[0] == self.lock_name):
337             self.__update_has_lock(new_has_lock)
338             if not new_has_lock:
339                 self.is_lock_contended = True
340
341     def __send_monitor_request(self):
342         monitor_requests = {}
343         for table in six.itervalues(self.tables):
344             columns = []
345             for column in six.iterkeys(table.columns):
346                 if ((table.name not in self.readonly) or
347                     (table.name in self.readonly) and
348                     (column not in self.readonly[table.name])):
349                     columns.append(column)
350             monitor_requests[table.name] = {"columns": columns}
351         msg = ovs.jsonrpc.Message.create_request(
352             "monitor", [self._db.name, None, monitor_requests])
353         self._monitor_request_id = msg.id
354         self._session.send(msg)
355
356     def __parse_update(self, update):
357         try:
358             self.__do_parse_update(update)
359         except error.Error as e:
360             vlog.err("%s: error parsing update: %s"
361                      % (self._session.get_name(), e))
362
363     def __do_parse_update(self, table_updates):
364         if not isinstance(table_updates, dict):
365             raise error.Error("<table-updates> is not an object",
366                               table_updates)
367
368         for table_name, table_update in six.iteritems(table_updates):
369             table = self.tables.get(table_name)
370             if not table:
371                 raise error.Error('<table-updates> includes unknown '
372                                   'table "%s"' % table_name)
373
374             if not isinstance(table_update, dict):
375                 raise error.Error('<table-update> for table "%s" is not '
376                                   'an object' % table_name, table_update)
377
378             for uuid_string, row_update in six.iteritems(table_update):
379                 if not ovs.ovsuuid.is_valid_string(uuid_string):
380                     raise error.Error('<table-update> for table "%s" '
381                                       'contains bad UUID "%s" as member '
382                                       'name' % (table_name, uuid_string),
383                                       table_update)
384                 uuid = ovs.ovsuuid.from_string(uuid_string)
385
386                 if not isinstance(row_update, dict):
387                     raise error.Error('<table-update> for table "%s" '
388                                       'contains <row-update> for %s that '
389                                       'is not an object'
390                                       % (table_name, uuid_string))
391
392                 parser = ovs.db.parser.Parser(row_update, "row-update")
393                 old = parser.get_optional("old", [dict])
394                 new = parser.get_optional("new", [dict])
395                 parser.finish()
396
397                 if not old and not new:
398                     raise error.Error('<row-update> missing "old" and '
399                                       '"new" members', row_update)
400
401                 if self.__process_update(table, uuid, old, new):
402                     self.change_seqno += 1
403
404     def __process_update(self, table, uuid, old, new):
405         """Returns True if a column changed, False otherwise."""
406         row = table.rows.get(uuid)
407         changed = False
408         if not new:
409             # Delete row.
410             if row:
411                 del table.rows[uuid]
412                 changed = True
413                 self.notify(ROW_DELETE, row)
414             else:
415                 # XXX rate-limit
416                 vlog.warn("cannot delete missing row %s from table %s"
417                           % (uuid, table.name))
418         elif not old:
419             # Insert row.
420             if not row:
421                 row = self.__create_row(table, uuid)
422                 changed = True
423             else:
424                 # XXX rate-limit
425                 vlog.warn("cannot add existing row %s to table %s"
426                           % (uuid, table.name))
427             if self.__row_update(table, row, new):
428                 changed = True
429                 self.notify(ROW_CREATE, row)
430         else:
431             op = ROW_UPDATE
432             if not row:
433                 row = self.__create_row(table, uuid)
434                 changed = True
435                 op = ROW_CREATE
436                 # XXX rate-limit
437                 vlog.warn("cannot modify missing row %s in table %s"
438                           % (uuid, table.name))
439             if self.__row_update(table, row, new):
440                 changed = True
441                 self.notify(op, row, Row.from_json(self, table, uuid, old))
442         return changed
443
444     def __row_update(self, table, row, row_json):
445         changed = False
446         for column_name, datum_json in six.iteritems(row_json):
447             column = table.columns.get(column_name)
448             if not column:
449                 # XXX rate-limit
450                 vlog.warn("unknown column %s updating table %s"
451                           % (column_name, table.name))
452                 continue
453
454             try:
455                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
456             except error.Error as e:
457                 # XXX rate-limit
458                 vlog.warn("error parsing column %s in table %s: %s"
459                           % (column_name, table.name, e))
460                 continue
461
462             if datum != row._data[column_name]:
463                 row._data[column_name] = datum
464                 if column.alert:
465                     changed = True
466             else:
467                 # Didn't really change but the OVSDB monitor protocol always
468                 # includes every value in a row.
469                 pass
470         return changed
471
472     def __create_row(self, table, uuid):
473         data = {}
474         for column in six.itervalues(table.columns):
475             data[column.name] = ovs.db.data.Datum.default(column.type)
476         row = table.rows[uuid] = Row(self, table, uuid, data)
477         return row
478
479     def __error(self):
480         self._session.force_reconnect()
481
482     def __txn_abort_all(self):
483         while self._outstanding_txns:
484             txn = self._outstanding_txns.popitem()[1]
485             txn._status = Transaction.TRY_AGAIN
486
487     def __txn_process_reply(self, msg):
488         txn = self._outstanding_txns.pop(msg.id, None)
489         if txn:
490             txn._process_reply(msg)
491
492
493 def _uuid_to_row(atom, base):
494     if base.ref_table:
495         return base.ref_table.rows.get(atom)
496     else:
497         return atom
498
499
500 def _row_to_uuid(value):
501     if isinstance(value, Row):
502         return value.uuid
503     else:
504         return value
505
506
507 class Row(object):
508     """A row within an IDL.
509
510     The client may access the following attributes directly:
511
512     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
513
514     - An attribute for each column in the Row's table, named for the column,
515       whose values are as returned by Datum.to_python() for the column's type.
516
517       If some error occurs (e.g. the database server's idea of the column is
518       different from the IDL's idea), then the attribute values is the
519       "default" value return by Datum.default() for the column's type.  (It is
520       important to know this because the default value may violate constraints
521       for the column's type, e.g. the default integer value is 0 even if column
522       contraints require the column's value to be positive.)
523
524       When a transaction is active, column attributes may also be assigned new
525       values.  Committing the transaction will then cause the new value to be
526       stored into the database.
527
528       *NOTE*: In the current implementation, the value of a column is a *copy*
529       of the value in the database.  This means that modifying its value
530       directly will have no useful effect.  For example, the following:
531         row.mycolumn["a"] = "b"              # don't do this
532       will not change anything in the database, even after commit.  To modify
533       the column, instead assign the modified column value back to the column:
534         d = row.mycolumn
535         d["a"] = "b"
536         row.mycolumn = d
537 """
538     def __init__(self, idl, table, uuid, data):
539         # All of the explicit references to self.__dict__ below are required
540         # to set real attributes with invoking self.__getattr__().
541         self.__dict__["uuid"] = uuid
542
543         self.__dict__["_idl"] = idl
544         self.__dict__["_table"] = table
545
546         # _data is the committed data.  It takes the following values:
547         #
548         #   - A dictionary that maps every column name to a Datum, if the row
549         #     exists in the committed form of the database.
550         #
551         #   - None, if this row is newly inserted within the active transaction
552         #     and thus has no committed form.
553         self.__dict__["_data"] = data
554
555         # _changes describes changes to this row within the active transaction.
556         # It takes the following values:
557         #
558         #   - {}, the empty dictionary, if no transaction is active or if the
559         #     row has yet not been changed within this transaction.
560         #
561         #   - A dictionary that maps a column name to its new Datum, if an
562         #     active transaction changes those columns' values.
563         #
564         #   - A dictionary that maps every column name to a Datum, if the row
565         #     is newly inserted within the active transaction.
566         #
567         #   - None, if this transaction deletes this row.
568         self.__dict__["_changes"] = {}
569
570         # A dictionary whose keys are the names of columns that must be
571         # verified as prerequisites when the transaction commits.  The values
572         # in the dictionary are all None.
573         self.__dict__["_prereqs"] = {}
574
575     def __getattr__(self, column_name):
576         assert self._changes is not None
577
578         datum = self._changes.get(column_name)
579         if datum is None:
580             if self._data is None:
581                 raise AttributeError("%s instance has no attribute '%s'" %
582                                      (self.__class__.__name__, column_name))
583             if column_name in self._data:
584                 datum = self._data[column_name]
585             else:
586                 raise AttributeError("%s instance has no attribute '%s'" %
587                                      (self.__class__.__name__, column_name))
588
589         return datum.to_python(_uuid_to_row)
590
591     def __setattr__(self, column_name, value):
592         assert self._changes is not None
593         assert self._idl.txn
594
595         if ((self._table.name in self._idl.readonly) and
596             (column_name in self._idl.readonly[self._table.name])):
597             vlog.warn("attempting to write to readonly column %s"
598                       % column_name)
599             return
600
601         column = self._table.columns[column_name]
602         try:
603             datum = ovs.db.data.Datum.from_python(column.type, value,
604                                                   _row_to_uuid)
605         except error.Error as e:
606             # XXX rate-limit
607             vlog.err("attempting to write bad value to column %s (%s)"
608                      % (column_name, e))
609             return
610         self._idl.txn._write(self, column, datum)
611
612     @classmethod
613     def from_json(cls, idl, table, uuid, row_json):
614         data = {}
615         for column_name, datum_json in six.iteritems(row_json):
616             column = table.columns.get(column_name)
617             if not column:
618                 # XXX rate-limit
619                 vlog.warn("unknown column %s in table %s"
620                           % (column_name, table.name))
621                 continue
622             try:
623                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
624             except error.Error as e:
625                 # XXX rate-limit
626                 vlog.warn("error parsing column %s in table %s: %s"
627                           % (column_name, table.name, e))
628                 continue
629             data[column_name] = datum
630         return cls(idl, table, uuid, data)
631
632     def verify(self, column_name):
633         """Causes the original contents of column 'column_name' in this row to
634         be verified as a prerequisite to completing the transaction.  That is,
635         if 'column_name' changed in this row (or if this row was deleted)
636         between the time that the IDL originally read its contents and the time
637         that the transaction commits, then the transaction aborts and
638         Transaction.commit() returns Transaction.TRY_AGAIN.
639
640         The intention is that, to ensure that no transaction commits based on
641         dirty reads, an application should call Row.verify() on each data item
642         read as part of a read-modify-write operation.
643
644         In some cases Row.verify() reduces to a no-op, because the current
645         value of the column is already known:
646
647           - If this row is a row created by the current transaction (returned
648             by Transaction.insert()).
649
650           - If the column has already been modified within the current
651             transaction.
652
653         Because of the latter property, always call Row.verify() *before*
654         modifying the column, for a given read-modify-write.
655
656         A transaction must be in progress."""
657         assert self._idl.txn
658         assert self._changes is not None
659         if not self._data or column_name in self._changes:
660             return
661
662         self._prereqs[column_name] = None
663
664     def delete(self):
665         """Deletes this row from its table.
666
667         A transaction must be in progress."""
668         assert self._idl.txn
669         assert self._changes is not None
670         if self._data is None:
671             del self._idl.txn._txn_rows[self.uuid]
672         else:
673             self._idl.txn._txn_rows[self.uuid] = self
674         self.__dict__["_changes"] = None
675         del self._table.rows[self.uuid]
676
677     def fetch(self, column_name):
678         self._idl.txn._fetch(self, column_name)
679
680     def increment(self, column_name):
681         """Causes the transaction, when committed, to increment the value of
682         'column_name' within this row by 1.  'column_name' must have an integer
683         type.  After the transaction commits successfully, the client may
684         retrieve the final (incremented) value of 'column_name' with
685         Transaction.get_increment_new_value().
686
687         The client could accomplish something similar by reading and writing
688         and verify()ing columns.  However, increment() will never (by itself)
689         cause a transaction to fail because of a verify error.
690
691         The intended use is for incrementing the "next_cfg" column in
692         the Open_vSwitch table."""
693         self._idl.txn._increment(self, column_name)
694
695
696 def _uuid_name_from_uuid(uuid):
697     return "row%s" % str(uuid).replace("-", "_")
698
699
700 def _where_uuid_equals(uuid):
701     return [["_uuid", "==", ["uuid", str(uuid)]]]
702
703
704 class _InsertedRow(object):
705     def __init__(self, op_index):
706         self.op_index = op_index
707         self.real = None
708
709
710 class Transaction(object):
711     """A transaction may modify the contents of a database by modifying the
712     values of columns, deleting rows, inserting rows, or adding checks that
713     columns in the database have not changed ("verify" operations), through
714     Row methods.
715
716     Reading and writing columns and inserting and deleting rows are all
717     straightforward.  The reasons to verify columns are less obvious.
718     Verification is the key to maintaining transactional integrity.  Because
719     OVSDB handles multiple clients, it can happen that between the time that
720     OVSDB client A reads a column and writes a new value, OVSDB client B has
721     written that column.  Client A's write should not ordinarily overwrite
722     client B's, especially if the column in question is a "map" column that
723     contains several more or less independent data items.  If client A adds a
724     "verify" operation before it writes the column, then the transaction fails
725     in case client B modifies it first.  Client A will then see the new value
726     of the column and compose a new transaction based on the new contents
727     written by client B.
728
729     When a transaction is complete, which must be before the next call to
730     Idl.run(), call Transaction.commit() or Transaction.abort().
731
732     The life-cycle of a transaction looks like this:
733
734     1. Create the transaction and record the initial sequence number:
735
736         seqno = idl.change_seqno(idl)
737         txn = Transaction(idl)
738
739     2. Modify the database with Row and Transaction methods.
740
741     3. Commit the transaction by calling Transaction.commit().  The first call
742        to this function probably returns Transaction.INCOMPLETE.  The client
743        must keep calling again along as this remains true, calling Idl.run() in
744        between to let the IDL do protocol processing.  (If the client doesn't
745        have anything else to do in the meantime, it can use
746        Transaction.commit_block() to avoid having to loop itself.)
747
748     4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
749        to change from the saved 'seqno' (it's possible that it's already
750        changed, in which case the client should not wait at all), then start
751        over from step 1.  Only a call to Idl.run() will change the return value
752        of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
753
754     # Status values that Transaction.commit() can return.
755
756     # Not yet committed or aborted.
757     UNCOMMITTED = "uncommitted"
758     # Transaction didn't include any changes.
759     UNCHANGED = "unchanged"
760     # Commit in progress, please wait.
761     INCOMPLETE = "incomplete"
762     # ovsdb_idl_txn_abort() called.
763     ABORTED = "aborted"
764     # Commit successful.
765     SUCCESS = "success"
766     # Commit failed because a "verify" operation
767     # reported an inconsistency, due to a network
768     # problem, or other transient failure.  Wait
769     # for a change, then try again.
770     TRY_AGAIN = "try again"
771     # Server hasn't given us the lock yet.
772     NOT_LOCKED = "not locked"
773     # Commit failed due to a hard error.
774     ERROR = "error"
775
776     @staticmethod
777     def status_to_string(status):
778         """Converts one of the status values that Transaction.commit() can
779         return into a human-readable string.
780
781         (The status values are in fact such strings already, so
782         there's nothing to do.)"""
783         return status
784
785     def __init__(self, idl):
786         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
787         A given Idl may only have a single active transaction at a time.
788
789         A Transaction may modify the contents of a database by assigning new
790         values to columns (attributes of Row), deleting rows (with
791         Row.delete()), or inserting rows (with Transaction.insert()).  It may
792         also check that columns in the database have not changed with
793         Row.verify().
794
795         When a transaction is complete (which must be before the next call to
796         Idl.run()), call Transaction.commit() or Transaction.abort()."""
797         assert idl.txn is None
798
799         idl.txn = self
800         self._request_id = None
801         self.idl = idl
802         self.dry_run = False
803         self._txn_rows = {}
804         self._status = Transaction.UNCOMMITTED
805         self._error = None
806         self._comments = []
807
808         self._inc_row = None
809         self._inc_column = None
810
811         self._fetch_requests = []
812
813         self._inserted_rows = {}  # Map from UUID to _InsertedRow
814
815     def add_comment(self, comment):
816         """Appends 'comment' to the comments that will be passed to the OVSDB
817         server when this transaction is committed.  (The comment will be
818         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
819         relatively human-readable form.)"""
820         self._comments.append(comment)
821
822     def wait(self, poller):
823         """Causes poll_block() to wake up if this transaction has completed
824         committing."""
825         if self._status not in (Transaction.UNCOMMITTED,
826                                 Transaction.INCOMPLETE):
827             poller.immediate_wake()
828
829     def _substitute_uuids(self, json):
830         if isinstance(json, (list, tuple)):
831             if (len(json) == 2
832                 and json[0] == 'uuid'
833                 and ovs.ovsuuid.is_valid_string(json[1])):
834                 uuid = ovs.ovsuuid.from_string(json[1])
835                 row = self._txn_rows.get(uuid, None)
836                 if row and row._data is None:
837                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
838             else:
839                 return [self._substitute_uuids(elem) for elem in json]
840         return json
841
842     def __disassemble(self):
843         self.idl.txn = None
844
845         for row in six.itervalues(self._txn_rows):
846             if row._changes is None:
847                 row._table.rows[row.uuid] = row
848             elif row._data is None:
849                 del row._table.rows[row.uuid]
850             row.__dict__["_changes"] = {}
851             row.__dict__["_prereqs"] = {}
852         self._txn_rows = {}
853
854     def commit(self):
855         """Attempts to commit 'txn'.  Returns the status of the commit
856         operation, one of the following constants:
857
858           Transaction.INCOMPLETE:
859
860               The transaction is in progress, but not yet complete.  The caller
861               should call again later, after calling Idl.run() to let the
862               IDL do OVSDB protocol processing.
863
864           Transaction.UNCHANGED:
865
866               The transaction is complete.  (It didn't actually change the
867               database, so the IDL didn't send any request to the database
868               server.)
869
870           Transaction.ABORTED:
871
872               The caller previously called Transaction.abort().
873
874           Transaction.SUCCESS:
875
876               The transaction was successful.  The update made by the
877               transaction (and possibly other changes made by other database
878               clients) should already be visible in the IDL.
879
880           Transaction.TRY_AGAIN:
881
882               The transaction failed for some transient reason, e.g. because a
883               "verify" operation reported an inconsistency or due to a network
884               problem.  The caller should wait for a change to the database,
885               then compose a new transaction, and commit the new transaction.
886
887               Use Idl.change_seqno to wait for a change in the database.  It is
888               important to use its value *before* the initial call to
889               Transaction.commit() as the baseline for this purpose, because
890               the change that one should wait for can happen after the initial
891               call but before the call that returns Transaction.TRY_AGAIN, and
892               using some other baseline value in that situation could cause an
893               indefinite wait if the database rarely changes.
894
895           Transaction.NOT_LOCKED:
896
897               The transaction failed because the IDL has been configured to
898               require a database lock (with Idl.set_lock()) but didn't
899               get it yet or has already lost it.
900
901         Committing a transaction rolls back all of the changes that it made to
902         the IDL's copy of the database.  If the transaction commits
903         successfully, then the database server will send an update and, thus,
904         the IDL will be updated with the committed changes."""
905         # The status can only change if we're the active transaction.
906         # (Otherwise, our status will change only in Idl.run().)
907         if self != self.idl.txn:
908             return self._status
909
910         # If we need a lock but don't have it, give up quickly.
911         if self.idl.lock_name and not self.idl.has_lock:
912             self._status = Transaction.NOT_LOCKED
913             self.__disassemble()
914             return self._status
915
916         operations = [self.idl._db.name]
917
918         # Assert that we have the required lock (avoiding a race).
919         if self.idl.lock_name:
920             operations.append({"op": "assert",
921                                "lock": self.idl.lock_name})
922
923         # Add prerequisites and declarations of new rows.
924         for row in six.itervalues(self._txn_rows):
925             if row._prereqs:
926                 rows = {}
927                 columns = []
928                 for column_name in row._prereqs:
929                     columns.append(column_name)
930                     rows[column_name] = row._data[column_name].to_json()
931                 operations.append({"op": "wait",
932                                    "table": row._table.name,
933                                    "timeout": 0,
934                                    "where": _where_uuid_equals(row.uuid),
935                                    "until": "==",
936                                    "columns": columns,
937                                    "rows": [rows]})
938
939         # Add updates.
940         any_updates = False
941         for row in six.itervalues(self._txn_rows):
942             if row._changes is None:
943                 if row._table.is_root:
944                     operations.append({"op": "delete",
945                                        "table": row._table.name,
946                                        "where": _where_uuid_equals(row.uuid)})
947                     any_updates = True
948                 else:
949                     # Let ovsdb-server decide whether to really delete it.
950                     pass
951             elif row._changes:
952                 op = {"table": row._table.name}
953                 if row._data is None:
954                     op["op"] = "insert"
955                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
956                     any_updates = True
957
958                     op_index = len(operations) - 1
959                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
960                 else:
961                     op["op"] = "update"
962                     op["where"] = _where_uuid_equals(row.uuid)
963
964                 row_json = {}
965                 op["row"] = row_json
966
967                 for column_name, datum in six.iteritems(row._changes):
968                     if row._data is not None or not datum.is_default():
969                         row_json[column_name] = (
970                                 self._substitute_uuids(datum.to_json()))
971
972                         # If anything really changed, consider it an update.
973                         # We can't suppress not-really-changed values earlier
974                         # or transactions would become nonatomic (see the big
975                         # comment inside Transaction._write()).
976                         if (not any_updates and row._data is not None and
977                             row._data[column_name] != datum):
978                             any_updates = True
979
980                 if row._data is None or row_json:
981                     operations.append(op)
982
983         if self._fetch_requests:
984             for fetch in self._fetch_requests:
985                 fetch["index"] = len(operations) - 1
986                 operations.append({"op": "select",
987                                    "table": fetch["row"]._table.name,
988                                    "where": self._substitute_uuids(
989                                        _where_uuid_equals(fetch["row"].uuid)),
990                                    "columns": [fetch["column_name"]]})
991             any_updates = True
992
993         # Add increment.
994         if self._inc_row and any_updates:
995             self._inc_index = len(operations) - 1
996
997             operations.append({"op": "mutate",
998                                "table": self._inc_row._table.name,
999                                "where": self._substitute_uuids(
1000                                    _where_uuid_equals(self._inc_row.uuid)),
1001                                "mutations": [[self._inc_column, "+=", 1]]})
1002             operations.append({"op": "select",
1003                                "table": self._inc_row._table.name,
1004                                "where": self._substitute_uuids(
1005                                    _where_uuid_equals(self._inc_row.uuid)),
1006                                "columns": [self._inc_column]})
1007
1008         # Add comment.
1009         if self._comments:
1010             operations.append({"op": "comment",
1011                                "comment": "\n".join(self._comments)})
1012
1013         # Dry run?
1014         if self.dry_run:
1015             operations.append({"op": "abort"})
1016
1017         if not any_updates:
1018             self._status = Transaction.UNCHANGED
1019         else:
1020             msg = ovs.jsonrpc.Message.create_request("transact", operations)
1021             self._request_id = msg.id
1022             if not self.idl._session.send(msg):
1023                 self.idl._outstanding_txns[self._request_id] = self
1024                 self._status = Transaction.INCOMPLETE
1025             else:
1026                 self._status = Transaction.TRY_AGAIN
1027
1028         self.__disassemble()
1029         return self._status
1030
1031     def commit_block(self):
1032         """Attempts to commit this transaction, blocking until the commit
1033         either succeeds or fails.  Returns the final commit status, which may
1034         be any Transaction.* value other than Transaction.INCOMPLETE.
1035
1036         This function calls Idl.run() on this transaction'ss IDL, so it may
1037         cause Idl.change_seqno to change."""
1038         while True:
1039             status = self.commit()
1040             if status != Transaction.INCOMPLETE:
1041                 return status
1042
1043             self.idl.run()
1044
1045             poller = ovs.poller.Poller()
1046             self.idl.wait(poller)
1047             self.wait(poller)
1048             poller.block()
1049
1050     def get_increment_new_value(self):
1051         """Returns the final (incremented) value of the column in this
1052         transaction that was set to be incremented by Row.increment.  This
1053         transaction must have committed successfully."""
1054         assert self._status == Transaction.SUCCESS
1055         return self._inc_new_value
1056
1057     def abort(self):
1058         """Aborts this transaction.  If Transaction.commit() has already been
1059         called then the transaction might get committed anyhow."""
1060         self.__disassemble()
1061         if self._status in (Transaction.UNCOMMITTED,
1062                             Transaction.INCOMPLETE):
1063             self._status = Transaction.ABORTED
1064
1065     def get_error(self):
1066         """Returns a string representing this transaction's current status,
1067         suitable for use in log messages."""
1068         if self._status != Transaction.ERROR:
1069             return Transaction.status_to_string(self._status)
1070         elif self._error:
1071             return self._error
1072         else:
1073             return "no error details available"
1074
1075     def __set_error_json(self, json):
1076         if self._error is None:
1077             self._error = ovs.json.to_string(json)
1078
1079     def get_insert_uuid(self, uuid):
1080         """Finds and returns the permanent UUID that the database assigned to a
1081         newly inserted row, given the UUID that Transaction.insert() assigned
1082         locally to that row.
1083
1084         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1085         or if it was assigned by that function and then deleted by Row.delete()
1086         within the same transaction.  (Rows that are inserted and then deleted
1087         within a single transaction are never sent to the database server, so
1088         it never assigns them a permanent UUID.)
1089
1090         This transaction must have completed successfully."""
1091         assert self._status in (Transaction.SUCCESS,
1092                                 Transaction.UNCHANGED)
1093         inserted_row = self._inserted_rows.get(uuid)
1094         if inserted_row:
1095             return inserted_row.real
1096         return None
1097
1098     def _increment(self, row, column):
1099         assert not self._inc_row
1100         self._inc_row = row
1101         self._inc_column = column
1102
1103     def _fetch(self, row, column_name):
1104         self._fetch_requests.append({"row": row, "column_name": column_name})
1105
1106     def _write(self, row, column, datum):
1107         assert row._changes is not None
1108
1109         txn = row._idl.txn
1110
1111         # If this is a write-only column and the datum being written is the
1112         # same as the one already there, just skip the update entirely.  This
1113         # is worth optimizing because we have a lot of columns that get
1114         # periodically refreshed into the database but don't actually change
1115         # that often.
1116         #
1117         # We don't do this for read/write columns because that would break
1118         # atomicity of transactions--some other client might have written a
1119         # different value in that column since we read it.  (But if a whole
1120         # transaction only does writes of existing values, without making any
1121         # real changes, we will drop the whole transaction later in
1122         # ovsdb_idl_txn_commit().)
1123         if (not column.alert and row._data and
1124                 row._data.get(column.name) == datum):
1125             new_value = row._changes.get(column.name)
1126             if new_value is None or new_value == datum:
1127                 return
1128
1129         txn._txn_rows[row.uuid] = row
1130         row._changes[column.name] = datum.copy()
1131
1132     def insert(self, table, new_uuid=None):
1133         """Inserts and returns a new row in 'table', which must be one of the
1134         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1135
1136         The new row is assigned a provisional UUID.  If 'uuid' is None then one
1137         is randomly generated; otherwise 'uuid' should specify a randomly
1138         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
1139         different UUID when 'txn' is committed, but the IDL will replace any
1140         uses of the provisional UUID in the data to be to be committed by the
1141         UUID assigned by ovsdb-server."""
1142         assert self._status == Transaction.UNCOMMITTED
1143         if new_uuid is None:
1144             new_uuid = uuid.uuid4()
1145         row = Row(self.idl, table, new_uuid, None)
1146         table.rows[row.uuid] = row
1147         self._txn_rows[row.uuid] = row
1148         return row
1149
1150     def _process_reply(self, msg):
1151         if msg.type == ovs.jsonrpc.Message.T_ERROR:
1152             self._status = Transaction.ERROR
1153         elif not isinstance(msg.result, (list, tuple)):
1154             # XXX rate-limit
1155             vlog.warn('reply to "transact" is not JSON array')
1156         else:
1157             hard_errors = False
1158             soft_errors = False
1159             lock_errors = False
1160
1161             ops = msg.result
1162             for op in ops:
1163                 if op is None:
1164                     # This isn't an error in itself but indicates that some
1165                     # prior operation failed, so make sure that we know about
1166                     # it.
1167                     soft_errors = True
1168                 elif isinstance(op, dict):
1169                     error = op.get("error")
1170                     if error is not None:
1171                         if error == "timed out":
1172                             soft_errors = True
1173                         elif error == "not owner":
1174                             lock_errors = True
1175                         elif error == "aborted":
1176                             pass
1177                         else:
1178                             hard_errors = True
1179                             self.__set_error_json(op)
1180                 else:
1181                     hard_errors = True
1182                     self.__set_error_json(op)
1183                     # XXX rate-limit
1184                     vlog.warn("operation reply is not JSON null or object")
1185
1186             if not soft_errors and not hard_errors and not lock_errors:
1187                 if self._inc_row and not self.__process_inc_reply(ops):
1188                     hard_errors = True
1189                 if self._fetch_requests:
1190                     if self.__process_fetch_reply(ops):
1191                         self.idl.change_seqno += 1
1192                     else:
1193                         hard_errors = True
1194
1195                 for insert in six.itervalues(self._inserted_rows):
1196                     if not self.__process_insert_reply(insert, ops):
1197                         hard_errors = True
1198
1199             if hard_errors:
1200                 self._status = Transaction.ERROR
1201             elif lock_errors:
1202                 self._status = Transaction.NOT_LOCKED
1203             elif soft_errors:
1204                 self._status = Transaction.TRY_AGAIN
1205             else:
1206                 self._status = Transaction.SUCCESS
1207
1208     @staticmethod
1209     def __check_json_type(json, types, name):
1210         if not json:
1211             # XXX rate-limit
1212             vlog.warn("%s is missing" % name)
1213             return False
1214         elif not isinstance(json, tuple(types)):
1215             # XXX rate-limit
1216             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1217             return False
1218         else:
1219             return True
1220
1221     def __process_fetch_reply(self, ops):
1222         update = False
1223         for fetch_request in self._fetch_requests:
1224             row = fetch_request["row"]
1225             column_name = fetch_request["column_name"]
1226             index = fetch_request["index"]
1227             table = row._table
1228
1229             select = ops[index]
1230             fetched_rows = select.get("rows")
1231             if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1232                                                  '"select" reply "rows"'):
1233                 return False
1234             if len(fetched_rows) != 1:
1235                 # XXX rate-limit
1236                 vlog.warn('"select" reply "rows" has %d elements '
1237                           'instead of 1' % len(fetched_rows))
1238                 continue
1239             fetched_row = fetched_rows[0]
1240             if not Transaction.__check_json_type(fetched_row, (dict,),
1241                                                  '"select" reply row'):
1242                 continue
1243
1244             column = table.columns.get(column_name)
1245             datum_json = fetched_row.get(column_name)
1246             datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1247
1248             row._data[column_name] = datum
1249             update = True
1250
1251         return update
1252
1253     def __process_inc_reply(self, ops):
1254         if self._inc_index + 2 > len(ops):
1255             # XXX rate-limit
1256             vlog.warn("reply does not contain enough operations for "
1257                       "increment (has %d, needs %d)" %
1258                       (len(ops), self._inc_index + 2))
1259
1260         # We know that this is a JSON object because the loop in
1261         # __process_reply() already checked.
1262         mutate = ops[self._inc_index]
1263         count = mutate.get("count")
1264         if not Transaction.__check_json_type(count, six.integer_types,
1265                                              '"mutate" reply "count"'):
1266             return False
1267         if count != 1:
1268             # XXX rate-limit
1269             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1270             return False
1271
1272         select = ops[self._inc_index + 1]
1273         rows = select.get("rows")
1274         if not Transaction.__check_json_type(rows, (list, tuple),
1275                                              '"select" reply "rows"'):
1276             return False
1277         if len(rows) != 1:
1278             # XXX rate-limit
1279             vlog.warn('"select" reply "rows" has %d elements '
1280                       'instead of 1' % len(rows))
1281             return False
1282         row = rows[0]
1283         if not Transaction.__check_json_type(row, (dict,),
1284                                              '"select" reply row'):
1285             return False
1286         column = row.get(self._inc_column)
1287         if not Transaction.__check_json_type(column, six.integer_types,
1288                                              '"select" reply inc column'):
1289             return False
1290         self._inc_new_value = column
1291         return True
1292
1293     def __process_insert_reply(self, insert, ops):
1294         if insert.op_index >= len(ops):
1295             # XXX rate-limit
1296             vlog.warn("reply does not contain enough operations "
1297                       "for insert (has %d, needs %d)"
1298                       % (len(ops), insert.op_index))
1299             return False
1300
1301         # We know that this is a JSON object because the loop in
1302         # __process_reply() already checked.
1303         reply = ops[insert.op_index]
1304         json_uuid = reply.get("uuid")
1305         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1306                                              '"insert" reply "uuid"'):
1307             return False
1308
1309         try:
1310             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1311         except error.Error:
1312             # XXX rate-limit
1313             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1314             return False
1315
1316         insert.real = uuid_
1317         return True
1318
1319
1320 class SchemaHelper(object):
1321     """IDL Schema helper.
1322
1323     This class encapsulates the logic required to generate schemas suitable
1324     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1325     they are interested in using register_columns().  When finished, the
1326     get_idl_schema() function may be called.
1327
1328     The location on disk of the schema used may be found in the
1329     'schema_location' variable."""
1330
1331     def __init__(self, location=None, schema_json=None):
1332         """Creates a new Schema object.
1333
1334         'location' file path to ovs schema. None means default location
1335         'schema_json' schema in json preresentation in memory
1336         """
1337
1338         if location and schema_json:
1339             raise ValueError("both location and schema_json can't be "
1340                              "specified. it's ambiguous.")
1341         if schema_json is None:
1342             if location is None:
1343                 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1344             schema_json = ovs.json.from_file(location)
1345
1346         self.schema_json = schema_json
1347         self._tables = {}
1348         self._readonly = {}
1349         self._all = False
1350
1351     def register_columns(self, table, columns, readonly=[]):
1352         """Registers interest in the given 'columns' of 'table'.  Future calls
1353         to get_idl_schema() will include 'table':column for each column in
1354         'columns'. This function automatically avoids adding duplicate entries
1355         to the schema.
1356         A subset of 'columns' can be specified as 'readonly'. The readonly
1357         columns are not replicated but can be fetched on-demand by the user
1358         with Row.fetch().
1359
1360         'table' must be a string.
1361         'columns' must be a list of strings.
1362         'readonly' must be a list of strings.
1363         """
1364
1365         assert isinstance(table, six.string_types)
1366         assert isinstance(columns, list)
1367
1368         columns = set(columns) | self._tables.get(table, set())
1369         self._tables[table] = columns
1370         self._readonly[table] = readonly
1371
1372     def register_table(self, table):
1373         """Registers interest in the given all columns of 'table'. Future calls
1374         to get_idl_schema() will include all columns of 'table'.
1375
1376         'table' must be a string
1377         """
1378         assert isinstance(table, six.string_types)
1379         self._tables[table] = set()  # empty set means all columns in the table
1380
1381     def register_all(self):
1382         """Registers interest in every column of every table."""
1383         self._all = True
1384
1385     def get_idl_schema(self):
1386         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1387         object based on columns registered using the register_columns()
1388         function."""
1389
1390         schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1391         self.schema_json = None
1392
1393         if not self._all:
1394             schema_tables = {}
1395             for table, columns in six.iteritems(self._tables):
1396                 schema_tables[table] = (
1397                     self._keep_table_columns(schema, table, columns))
1398
1399             schema.tables = schema_tables
1400         schema.readonly = self._readonly
1401         return schema
1402
1403     def _keep_table_columns(self, schema, table_name, columns):
1404         assert table_name in schema.tables
1405         table = schema.tables[table_name]
1406
1407         if not columns:
1408             # empty set means all columns in the table
1409             return table
1410
1411         new_columns = {}
1412         for column_name in columns:
1413             assert isinstance(column_name, six.string_types)
1414             assert column_name in table.columns
1415
1416             new_columns[column_name] = table.columns[column_name]
1417
1418         table.columns = new_columns
1419         return table