python: Start fixing some Python 3 issues.
[cascardo/ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import uuid
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22 import ovs.poller
23 import ovs.vlog
24
25 vlog = ovs.vlog.Vlog("idl")
26
27 __pychecker__ = 'no-classattr no-objattrs'
28
29 ROW_CREATE = "create"
30 ROW_UPDATE = "update"
31 ROW_DELETE = "delete"
32
33
34 class Idl(object):
35     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
36
37     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
38     requests to an OVSDB database server and parses the responses, converting
39     raw JSON into data structures that are easier for clients to digest.
40
41     The IDL also assists with issuing database transactions.  The client
42     creates a transaction, manipulates the IDL data structures, and commits or
43     aborts the transaction.  The IDL then composes and issues the necessary
44     JSON-RPC requests and reports to the client whether the transaction
45     completed successfully.
46
47     The client is allowed to access the following attributes directly, in a
48     read-only fashion:
49
50     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
51       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
52       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
53       to a Row object.
54
55       The client may directly read and write the Row objects referenced by the
56       'rows' map values.  Refer to Row for more details.
57
58     - 'change_seqno': A number that represents the IDL's state.  When the IDL
59       is updated (by Idl.run()), its value changes.  The sequence number can
60       occasionally change even if the database does not.  This happens if the
61       connection to the database drops and reconnects, which causes the
62       database contents to be reloaded even if they didn't change.  (It could
63       also happen if the database server sends out a "change" that reflects
64       what the IDL already thought was in the database.  The database server is
65       not supposed to do that, but bugs could in theory cause it to do so.)
66
67     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
68       if no lock is configured.
69
70     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
71       lock, and False otherwise.
72
73       Locking and unlocking happens asynchronously from the database client's
74       point of view, so the information is only useful for optimization
75       (e.g. if the client doesn't have the lock then there's no point in trying
76       to write to the database).
77
78     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
79       the database server has indicated that some other client already owns the
80       requested lock, and False otherwise.
81
82     - 'txn': The ovs.db.idl.Transaction object for the database transaction
83       currently being constructed, if there is one, or None otherwise.
84 """
85
86     def __init__(self, remote, schema):
87         """Creates and returns a connection to the database named 'db_name' on
88         'remote', which should be in a form acceptable to
89         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
90         replica of the remote database.
91
92         'schema' should be the schema for the remote database.  The caller may
93         have cut it down by removing tables or columns that are not of
94         interest.  The IDL will only replicate the tables and columns that
95         remain.  The caller may also add a attribute named 'alert' to selected
96         remaining columns, setting its value to False; if so, then changes to
97         those columns will not be considered changes to the database for the
98         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
99         useful for columns that the IDL's client will write but not read.
100
101         As a convenience to users, 'schema' may also be an instance of the
102         SchemaHelper class.
103
104         The IDL uses and modifies 'schema' directly."""
105
106         assert isinstance(schema, SchemaHelper)
107         schema = schema.get_idl_schema()
108
109         self.tables = schema.tables
110         self.readonly = schema.readonly
111         self._db = schema
112         self._session = ovs.jsonrpc.Session.open(remote)
113         self._monitor_request_id = None
114         self._last_seqno = None
115         self.change_seqno = 0
116
117         # Database locking.
118         self.lock_name = None          # Name of lock we need, None if none.
119         self.has_lock = False          # Has db server said we have the lock?
120         self.is_lock_contended = False  # Has db server said we can't get lock?
121         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
122
123         # Transaction support.
124         self.txn = None
125         self._outstanding_txns = {}
126
127         for table in schema.tables.itervalues():
128             for column in table.columns.itervalues():
129                 if not hasattr(column, 'alert'):
130                     column.alert = True
131             table.need_table = False
132             table.rows = {}
133             table.idl = self
134
135     def close(self):
136         """Closes the connection to the database.  The IDL will no longer
137         update."""
138         self._session.close()
139
140     def run(self):
141         """Processes a batch of messages from the database server.  Returns
142         True if the database as seen through the IDL changed, False if it did
143         not change.  The initial fetch of the entire contents of the remote
144         database is considered to be one kind of change.  If the IDL has been
145         configured to acquire a database lock (with Idl.set_lock()), then
146         successfully acquiring the lock is also considered to be a change.
147
148         This function can return occasional false positives, that is, report
149         that the database changed even though it didn't.  This happens if the
150         connection to the database drops and reconnects, which causes the
151         database contents to be reloaded even if they didn't change.  (It could
152         also happen if the database server sends out a "change" that reflects
153         what we already thought was in the database, but the database server is
154         not supposed to do that.)
155
156         As an alternative to checking the return value, the client may check
157         for changes in self.change_seqno."""
158         assert not self.txn
159         initial_change_seqno = self.change_seqno
160         self._session.run()
161         i = 0
162         while i < 50:
163             i += 1
164             if not self._session.is_connected():
165                 break
166
167             seqno = self._session.get_seqno()
168             if seqno != self._last_seqno:
169                 self._last_seqno = seqno
170                 self.__txn_abort_all()
171                 self.__send_monitor_request()
172                 if self.lock_name:
173                     self.__send_lock_request()
174                 break
175
176             msg = self._session.recv()
177             if msg is None:
178                 break
179             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
180                 and msg.method == "update"
181                 and len(msg.params) == 2
182                 and msg.params[0] is None):
183                 # Database contents changed.
184                 self.__parse_update(msg.params[1])
185             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
186                   and self._monitor_request_id is not None
187                   and self._monitor_request_id == msg.id):
188                 # Reply to our "monitor" request.
189                 try:
190                     self.change_seqno += 1
191                     self._monitor_request_id = None
192                     self.__clear()
193                     self.__parse_update(msg.result)
194                 except error.Error as e:
195                     vlog.err("%s: parse error in received schema: %s"
196                               % (self._session.get_name(), e))
197                     self.__error()
198             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
199                   and self._lock_request_id is not None
200                   and self._lock_request_id == msg.id):
201                 # Reply to our "lock" request.
202                 self.__parse_lock_reply(msg.result)
203             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
204                   and msg.method == "locked"):
205                 # We got our lock.
206                 self.__parse_lock_notify(msg.params, True)
207             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
208                   and msg.method == "stolen"):
209                 # Someone else stole our lock.
210                 self.__parse_lock_notify(msg.params, False)
211             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
212                 # Reply to our echo request.  Ignore it.
213                 pass
214             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
215                                ovs.jsonrpc.Message.T_REPLY)
216                   and self.__txn_process_reply(msg)):
217                 # __txn_process_reply() did everything needed.
218                 pass
219             else:
220                 # This can happen if a transaction is destroyed before we
221                 # receive the reply, so keep the log level low.
222                 vlog.dbg("%s: received unexpected %s message"
223                          % (self._session.get_name(),
224                              ovs.jsonrpc.Message.type_to_string(msg.type)))
225
226         return initial_change_seqno != self.change_seqno
227
228     def wait(self, poller):
229         """Arranges for poller.block() to wake up when self.run() has something
230         to do or when activity occurs on a transaction on 'self'."""
231         self._session.wait(poller)
232         self._session.recv_wait(poller)
233
234     def has_ever_connected(self):
235         """Returns True, if the IDL successfully connected to the remote
236         database and retrieved its contents (even if the connection
237         subsequently dropped and is in the process of reconnecting).  If so,
238         then the IDL contains an atomic snapshot of the database's contents
239         (but it might be arbitrarily old if the connection dropped).
240
241         Returns False if the IDL has never connected or retrieved the
242         database's contents.  If so, the IDL is empty."""
243         return self.change_seqno != 0
244
245     def force_reconnect(self):
246         """Forces the IDL to drop its connection to the database and reconnect.
247         In the meantime, the contents of the IDL will not change."""
248         self._session.force_reconnect()
249
250     def set_lock(self, lock_name):
251         """If 'lock_name' is not None, configures the IDL to obtain the named
252         lock from the database server and to avoid modifying the database when
253         the lock cannot be acquired (that is, when another client has the same
254         lock).
255
256         If 'lock_name' is None, drops the locking requirement and releases the
257         lock."""
258         assert not self.txn
259         assert not self._outstanding_txns
260
261         if self.lock_name and (not lock_name or lock_name != self.lock_name):
262             # Release previous lock.
263             self.__send_unlock_request()
264             self.lock_name = None
265             self.is_lock_contended = False
266
267         if lock_name and not self.lock_name:
268             # Acquire new lock.
269             self.lock_name = lock_name
270             self.__send_lock_request()
271
272     def notify(self, event, row, updates=None):
273         """Hook for implementing create/update/delete notifications
274
275         :param event:   The event that was triggered
276         :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
277         :param row:     The row as it is after the operation has occured
278         :type row:      Row
279         :param updates: For updates, a Row object with just the changed columns
280         :type updates:  Row
281         """
282
283     def __clear(self):
284         changed = False
285
286         for table in self.tables.itervalues():
287             if table.rows:
288                 changed = True
289                 table.rows = {}
290
291         if changed:
292             self.change_seqno += 1
293
294     def __update_has_lock(self, new_has_lock):
295         if new_has_lock and not self.has_lock:
296             if self._monitor_request_id is None:
297                 self.change_seqno += 1
298             else:
299                 # We're waiting for a monitor reply, so don't signal that the
300                 # database changed.  The monitor reply will increment
301                 # change_seqno anyhow.
302                 pass
303             self.is_lock_contended = False
304         self.has_lock = new_has_lock
305
306     def __do_send_lock_request(self, method):
307         self.__update_has_lock(False)
308         self._lock_request_id = None
309         if self._session.is_connected():
310             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
311             msg_id = msg.id
312             self._session.send(msg)
313         else:
314             msg_id = None
315         return msg_id
316
317     def __send_lock_request(self):
318         self._lock_request_id = self.__do_send_lock_request("lock")
319
320     def __send_unlock_request(self):
321         self.__do_send_lock_request("unlock")
322
323     def __parse_lock_reply(self, result):
324         self._lock_request_id = None
325         got_lock = type(result) == dict and result.get("locked") is True
326         self.__update_has_lock(got_lock)
327         if not got_lock:
328             self.is_lock_contended = True
329
330     def __parse_lock_notify(self, params, new_has_lock):
331         if (self.lock_name is not None
332             and type(params) in (list, tuple)
333             and params
334             and params[0] == self.lock_name):
335             self.__update_has_lock(new_has_lock)
336             if not new_has_lock:
337                 self.is_lock_contended = True
338
339     def __send_monitor_request(self):
340         monitor_requests = {}
341         for table in self.tables.itervalues():
342             columns = []
343             for column in table.columns.keys():
344                 if ((table.name not in self.readonly) or
345                     (table.name in self.readonly) and
346                     (column not in self.readonly[table.name])):
347                     columns.append(column)
348             monitor_requests[table.name] = {"columns": columns}
349         msg = ovs.jsonrpc.Message.create_request(
350             "monitor", [self._db.name, None, monitor_requests])
351         self._monitor_request_id = msg.id
352         self._session.send(msg)
353
354     def __parse_update(self, update):
355         try:
356             self.__do_parse_update(update)
357         except error.Error as e:
358             vlog.err("%s: error parsing update: %s"
359                      % (self._session.get_name(), e))
360
361     def __do_parse_update(self, table_updates):
362         if type(table_updates) != dict:
363             raise error.Error("<table-updates> is not an object",
364                               table_updates)
365
366         for table_name, table_update in table_updates.iteritems():
367             table = self.tables.get(table_name)
368             if not table:
369                 raise error.Error('<table-updates> includes unknown '
370                                   'table "%s"' % table_name)
371
372             if type(table_update) != dict:
373                 raise error.Error('<table-update> for table "%s" is not '
374                                   'an object' % table_name, table_update)
375
376             for uuid_string, row_update in table_update.iteritems():
377                 if not ovs.ovsuuid.is_valid_string(uuid_string):
378                     raise error.Error('<table-update> for table "%s" '
379                                       'contains bad UUID "%s" as member '
380                                       'name' % (table_name, uuid_string),
381                                       table_update)
382                 uuid = ovs.ovsuuid.from_string(uuid_string)
383
384                 if type(row_update) != dict:
385                     raise error.Error('<table-update> for table "%s" '
386                                       'contains <row-update> for %s that '
387                                       'is not an object'
388                                       % (table_name, uuid_string))
389
390                 parser = ovs.db.parser.Parser(row_update, "row-update")
391                 old = parser.get_optional("old", [dict])
392                 new = parser.get_optional("new", [dict])
393                 parser.finish()
394
395                 if not old and not new:
396                     raise error.Error('<row-update> missing "old" and '
397                                       '"new" members', row_update)
398
399                 if self.__process_update(table, uuid, old, new):
400                     self.change_seqno += 1
401
402     def __process_update(self, table, uuid, old, new):
403         """Returns True if a column changed, False otherwise."""
404         row = table.rows.get(uuid)
405         changed = False
406         if not new:
407             # Delete row.
408             if row:
409                 del table.rows[uuid]
410                 changed = True
411                 self.notify(ROW_DELETE, row)
412             else:
413                 # XXX rate-limit
414                 vlog.warn("cannot delete missing row %s from table %s"
415                           % (uuid, table.name))
416         elif not old:
417             # Insert row.
418             if not row:
419                 row = self.__create_row(table, uuid)
420                 changed = True
421             else:
422                 # XXX rate-limit
423                 vlog.warn("cannot add existing row %s to table %s"
424                           % (uuid, table.name))
425             if self.__row_update(table, row, new):
426                 changed = True
427                 self.notify(ROW_CREATE, row)
428         else:
429             op = ROW_UPDATE
430             if not row:
431                 row = self.__create_row(table, uuid)
432                 changed = True
433                 op = ROW_CREATE
434                 # XXX rate-limit
435                 vlog.warn("cannot modify missing row %s in table %s"
436                           % (uuid, table.name))
437             if self.__row_update(table, row, new):
438                 changed = True
439                 self.notify(op, row, Row.from_json(self, table, uuid, old))
440         return changed
441
442     def __row_update(self, table, row, row_json):
443         changed = False
444         for column_name, datum_json in row_json.iteritems():
445             column = table.columns.get(column_name)
446             if not column:
447                 # XXX rate-limit
448                 vlog.warn("unknown column %s updating table %s"
449                           % (column_name, table.name))
450                 continue
451
452             try:
453                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
454             except error.Error as e:
455                 # XXX rate-limit
456                 vlog.warn("error parsing column %s in table %s: %s"
457                           % (column_name, table.name, e))
458                 continue
459
460             if datum != row._data[column_name]:
461                 row._data[column_name] = datum
462                 if column.alert:
463                     changed = True
464             else:
465                 # Didn't really change but the OVSDB monitor protocol always
466                 # includes every value in a row.
467                 pass
468         return changed
469
470     def __create_row(self, table, uuid):
471         data = {}
472         for column in table.columns.itervalues():
473             data[column.name] = ovs.db.data.Datum.default(column.type)
474         row = table.rows[uuid] = Row(self, table, uuid, data)
475         return row
476
477     def __error(self):
478         self._session.force_reconnect()
479
480     def __txn_abort_all(self):
481         while self._outstanding_txns:
482             txn = self._outstanding_txns.popitem()[1]
483             txn._status = Transaction.TRY_AGAIN
484
485     def __txn_process_reply(self, msg):
486         txn = self._outstanding_txns.pop(msg.id, None)
487         if txn:
488             txn._process_reply(msg)
489
490
491 def _uuid_to_row(atom, base):
492     if base.ref_table:
493         return base.ref_table.rows.get(atom)
494     else:
495         return atom
496
497
498 def _row_to_uuid(value):
499     if type(value) == Row:
500         return value.uuid
501     else:
502         return value
503
504
505 class Row(object):
506     """A row within an IDL.
507
508     The client may access the following attributes directly:
509
510     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
511
512     - An attribute for each column in the Row's table, named for the column,
513       whose values are as returned by Datum.to_python() for the column's type.
514
515       If some error occurs (e.g. the database server's idea of the column is
516       different from the IDL's idea), then the attribute values is the
517       "default" value return by Datum.default() for the column's type.  (It is
518       important to know this because the default value may violate constraints
519       for the column's type, e.g. the default integer value is 0 even if column
520       contraints require the column's value to be positive.)
521
522       When a transaction is active, column attributes may also be assigned new
523       values.  Committing the transaction will then cause the new value to be
524       stored into the database.
525
526       *NOTE*: In the current implementation, the value of a column is a *copy*
527       of the value in the database.  This means that modifying its value
528       directly will have no useful effect.  For example, the following:
529         row.mycolumn["a"] = "b"              # don't do this
530       will not change anything in the database, even after commit.  To modify
531       the column, instead assign the modified column value back to the column:
532         d = row.mycolumn
533         d["a"] = "b"
534         row.mycolumn = d
535 """
536     def __init__(self, idl, table, uuid, data):
537         # All of the explicit references to self.__dict__ below are required
538         # to set real attributes with invoking self.__getattr__().
539         self.__dict__["uuid"] = uuid
540
541         self.__dict__["_idl"] = idl
542         self.__dict__["_table"] = table
543
544         # _data is the committed data.  It takes the following values:
545         #
546         #   - A dictionary that maps every column name to a Datum, if the row
547         #     exists in the committed form of the database.
548         #
549         #   - None, if this row is newly inserted within the active transaction
550         #     and thus has no committed form.
551         self.__dict__["_data"] = data
552
553         # _changes describes changes to this row within the active transaction.
554         # It takes the following values:
555         #
556         #   - {}, the empty dictionary, if no transaction is active or if the
557         #     row has yet not been changed within this transaction.
558         #
559         #   - A dictionary that maps a column name to its new Datum, if an
560         #     active transaction changes those columns' values.
561         #
562         #   - A dictionary that maps every column name to a Datum, if the row
563         #     is newly inserted within the active transaction.
564         #
565         #   - None, if this transaction deletes this row.
566         self.__dict__["_changes"] = {}
567
568         # A dictionary whose keys are the names of columns that must be
569         # verified as prerequisites when the transaction commits.  The values
570         # in the dictionary are all None.
571         self.__dict__["_prereqs"] = {}
572
573     def __getattr__(self, column_name):
574         assert self._changes is not None
575
576         datum = self._changes.get(column_name)
577         if datum is None:
578             if self._data is None:
579                 raise AttributeError("%s instance has no attribute '%s'" %
580                                      (self.__class__.__name__, column_name))
581             if column_name in self._data:
582                 datum = self._data[column_name]
583             else:
584                 raise AttributeError("%s instance has no attribute '%s'" %
585                                      (self.__class__.__name__, column_name))
586
587         return datum.to_python(_uuid_to_row)
588
589     def __setattr__(self, column_name, value):
590         assert self._changes is not None
591         assert self._idl.txn
592
593         if ((self._table.name in self._idl.readonly) and
594             (column_name in self._idl.readonly[self._table.name])):
595             vlog.warn("attempting to write to readonly column %s"
596                       % column_name)
597             return
598
599         column = self._table.columns[column_name]
600         try:
601             datum = ovs.db.data.Datum.from_python(column.type, value,
602                                                   _row_to_uuid)
603         except error.Error as e:
604             # XXX rate-limit
605             vlog.err("attempting to write bad value to column %s (%s)"
606                      % (column_name, e))
607             return
608         self._idl.txn._write(self, column, datum)
609
610     @classmethod
611     def from_json(cls, idl, table, uuid, row_json):
612         data = {}
613         for column_name, datum_json in row_json.iteritems():
614             column = table.columns.get(column_name)
615             if not column:
616                 # XXX rate-limit
617                 vlog.warn("unknown column %s in table %s"
618                           % (column_name, table.name))
619                 continue
620             try:
621                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
622             except error.Error as e:
623                 # XXX rate-limit
624                 vlog.warn("error parsing column %s in table %s: %s"
625                           % (column_name, table.name, e))
626                 continue
627             data[column_name] = datum
628         return cls(idl, table, uuid, data)
629
630     def verify(self, column_name):
631         """Causes the original contents of column 'column_name' in this row to
632         be verified as a prerequisite to completing the transaction.  That is,
633         if 'column_name' changed in this row (or if this row was deleted)
634         between the time that the IDL originally read its contents and the time
635         that the transaction commits, then the transaction aborts and
636         Transaction.commit() returns Transaction.TRY_AGAIN.
637
638         The intention is that, to ensure that no transaction commits based on
639         dirty reads, an application should call Row.verify() on each data item
640         read as part of a read-modify-write operation.
641
642         In some cases Row.verify() reduces to a no-op, because the current
643         value of the column is already known:
644
645           - If this row is a row created by the current transaction (returned
646             by Transaction.insert()).
647
648           - If the column has already been modified within the current
649             transaction.
650
651         Because of the latter property, always call Row.verify() *before*
652         modifying the column, for a given read-modify-write.
653
654         A transaction must be in progress."""
655         assert self._idl.txn
656         assert self._changes is not None
657         if not self._data or column_name in self._changes:
658             return
659
660         self._prereqs[column_name] = None
661
662     def delete(self):
663         """Deletes this row from its table.
664
665         A transaction must be in progress."""
666         assert self._idl.txn
667         assert self._changes is not None
668         if self._data is None:
669             del self._idl.txn._txn_rows[self.uuid]
670         else:
671             self._idl.txn._txn_rows[self.uuid] = self
672         self.__dict__["_changes"] = None
673         del self._table.rows[self.uuid]
674
675     def fetch(self, column_name):
676         self._idl.txn._fetch(self, column_name)
677
678     def increment(self, column_name):
679         """Causes the transaction, when committed, to increment the value of
680         'column_name' within this row by 1.  'column_name' must have an integer
681         type.  After the transaction commits successfully, the client may
682         retrieve the final (incremented) value of 'column_name' with
683         Transaction.get_increment_new_value().
684
685         The client could accomplish something similar by reading and writing
686         and verify()ing columns.  However, increment() will never (by itself)
687         cause a transaction to fail because of a verify error.
688
689         The intended use is for incrementing the "next_cfg" column in
690         the Open_vSwitch table."""
691         self._idl.txn._increment(self, column_name)
692
693
694 def _uuid_name_from_uuid(uuid):
695     return "row%s" % str(uuid).replace("-", "_")
696
697
698 def _where_uuid_equals(uuid):
699     return [["_uuid", "==", ["uuid", str(uuid)]]]
700
701
702 class _InsertedRow(object):
703     def __init__(self, op_index):
704         self.op_index = op_index
705         self.real = None
706
707
708 class Transaction(object):
709     """A transaction may modify the contents of a database by modifying the
710     values of columns, deleting rows, inserting rows, or adding checks that
711     columns in the database have not changed ("verify" operations), through
712     Row methods.
713
714     Reading and writing columns and inserting and deleting rows are all
715     straightforward.  The reasons to verify columns are less obvious.
716     Verification is the key to maintaining transactional integrity.  Because
717     OVSDB handles multiple clients, it can happen that between the time that
718     OVSDB client A reads a column and writes a new value, OVSDB client B has
719     written that column.  Client A's write should not ordinarily overwrite
720     client B's, especially if the column in question is a "map" column that
721     contains several more or less independent data items.  If client A adds a
722     "verify" operation before it writes the column, then the transaction fails
723     in case client B modifies it first.  Client A will then see the new value
724     of the column and compose a new transaction based on the new contents
725     written by client B.
726
727     When a transaction is complete, which must be before the next call to
728     Idl.run(), call Transaction.commit() or Transaction.abort().
729
730     The life-cycle of a transaction looks like this:
731
732     1. Create the transaction and record the initial sequence number:
733
734         seqno = idl.change_seqno(idl)
735         txn = Transaction(idl)
736
737     2. Modify the database with Row and Transaction methods.
738
739     3. Commit the transaction by calling Transaction.commit().  The first call
740        to this function probably returns Transaction.INCOMPLETE.  The client
741        must keep calling again along as this remains true, calling Idl.run() in
742        between to let the IDL do protocol processing.  (If the client doesn't
743        have anything else to do in the meantime, it can use
744        Transaction.commit_block() to avoid having to loop itself.)
745
746     4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
747        to change from the saved 'seqno' (it's possible that it's already
748        changed, in which case the client should not wait at all), then start
749        over from step 1.  Only a call to Idl.run() will change the return value
750        of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
751
752     # Status values that Transaction.commit() can return.
753
754     # Not yet committed or aborted.
755     UNCOMMITTED = "uncommitted"
756     # Transaction didn't include any changes.
757     UNCHANGED = "unchanged"
758     # Commit in progress, please wait.
759     INCOMPLETE = "incomplete"
760     # ovsdb_idl_txn_abort() called.
761     ABORTED = "aborted"
762     # Commit successful.
763     SUCCESS = "success"
764     # Commit failed because a "verify" operation
765     # reported an inconsistency, due to a network
766     # problem, or other transient failure.  Wait
767     # for a change, then try again.
768     TRY_AGAIN = "try again"
769     # Server hasn't given us the lock yet.
770     NOT_LOCKED = "not locked"
771     # Commit failed due to a hard error.
772     ERROR = "error"
773
774     @staticmethod
775     def status_to_string(status):
776         """Converts one of the status values that Transaction.commit() can
777         return into a human-readable string.
778
779         (The status values are in fact such strings already, so
780         there's nothing to do.)"""
781         return status
782
783     def __init__(self, idl):
784         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
785         A given Idl may only have a single active transaction at a time.
786
787         A Transaction may modify the contents of a database by assigning new
788         values to columns (attributes of Row), deleting rows (with
789         Row.delete()), or inserting rows (with Transaction.insert()).  It may
790         also check that columns in the database have not changed with
791         Row.verify().
792
793         When a transaction is complete (which must be before the next call to
794         Idl.run()), call Transaction.commit() or Transaction.abort()."""
795         assert idl.txn is None
796
797         idl.txn = self
798         self._request_id = None
799         self.idl = idl
800         self.dry_run = False
801         self._txn_rows = {}
802         self._status = Transaction.UNCOMMITTED
803         self._error = None
804         self._comments = []
805
806         self._inc_row = None
807         self._inc_column = None
808
809         self._fetch_requests = []
810
811         self._inserted_rows = {}  # Map from UUID to _InsertedRow
812
813     def add_comment(self, comment):
814         """Appends 'comment' to the comments that will be passed to the OVSDB
815         server when this transaction is committed.  (The comment will be
816         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
817         relatively human-readable form.)"""
818         self._comments.append(comment)
819
820     def wait(self, poller):
821         """Causes poll_block() to wake up if this transaction has completed
822         committing."""
823         if self._status not in (Transaction.UNCOMMITTED,
824                                 Transaction.INCOMPLETE):
825             poller.immediate_wake()
826
827     def _substitute_uuids(self, json):
828         if type(json) in (list, tuple):
829             if (len(json) == 2
830                 and json[0] == 'uuid'
831                 and ovs.ovsuuid.is_valid_string(json[1])):
832                 uuid = ovs.ovsuuid.from_string(json[1])
833                 row = self._txn_rows.get(uuid, None)
834                 if row and row._data is None:
835                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
836             else:
837                 return [self._substitute_uuids(elem) for elem in json]
838         return json
839
840     def __disassemble(self):
841         self.idl.txn = None
842
843         for row in self._txn_rows.itervalues():
844             if row._changes is None:
845                 row._table.rows[row.uuid] = row
846             elif row._data is None:
847                 del row._table.rows[row.uuid]
848             row.__dict__["_changes"] = {}
849             row.__dict__["_prereqs"] = {}
850         self._txn_rows = {}
851
852     def commit(self):
853         """Attempts to commit 'txn'.  Returns the status of the commit
854         operation, one of the following constants:
855
856           Transaction.INCOMPLETE:
857
858               The transaction is in progress, but not yet complete.  The caller
859               should call again later, after calling Idl.run() to let the
860               IDL do OVSDB protocol processing.
861
862           Transaction.UNCHANGED:
863
864               The transaction is complete.  (It didn't actually change the
865               database, so the IDL didn't send any request to the database
866               server.)
867
868           Transaction.ABORTED:
869
870               The caller previously called Transaction.abort().
871
872           Transaction.SUCCESS:
873
874               The transaction was successful.  The update made by the
875               transaction (and possibly other changes made by other database
876               clients) should already be visible in the IDL.
877
878           Transaction.TRY_AGAIN:
879
880               The transaction failed for some transient reason, e.g. because a
881               "verify" operation reported an inconsistency or due to a network
882               problem.  The caller should wait for a change to the database,
883               then compose a new transaction, and commit the new transaction.
884
885               Use Idl.change_seqno to wait for a change in the database.  It is
886               important to use its value *before* the initial call to
887               Transaction.commit() as the baseline for this purpose, because
888               the change that one should wait for can happen after the initial
889               call but before the call that returns Transaction.TRY_AGAIN, and
890               using some other baseline value in that situation could cause an
891               indefinite wait if the database rarely changes.
892
893           Transaction.NOT_LOCKED:
894
895               The transaction failed because the IDL has been configured to
896               require a database lock (with Idl.set_lock()) but didn't
897               get it yet or has already lost it.
898
899         Committing a transaction rolls back all of the changes that it made to
900         the IDL's copy of the database.  If the transaction commits
901         successfully, then the database server will send an update and, thus,
902         the IDL will be updated with the committed changes."""
903         # The status can only change if we're the active transaction.
904         # (Otherwise, our status will change only in Idl.run().)
905         if self != self.idl.txn:
906             return self._status
907
908         # If we need a lock but don't have it, give up quickly.
909         if self.idl.lock_name and not self.idl.has_lock:
910             self._status = Transaction.NOT_LOCKED
911             self.__disassemble()
912             return self._status
913
914         operations = [self.idl._db.name]
915
916         # Assert that we have the required lock (avoiding a race).
917         if self.idl.lock_name:
918             operations.append({"op": "assert",
919                                "lock": self.idl.lock_name})
920
921         # Add prerequisites and declarations of new rows.
922         for row in self._txn_rows.itervalues():
923             if row._prereqs:
924                 rows = {}
925                 columns = []
926                 for column_name in row._prereqs:
927                     columns.append(column_name)
928                     rows[column_name] = row._data[column_name].to_json()
929                 operations.append({"op": "wait",
930                                    "table": row._table.name,
931                                    "timeout": 0,
932                                    "where": _where_uuid_equals(row.uuid),
933                                    "until": "==",
934                                    "columns": columns,
935                                    "rows": [rows]})
936
937         # Add updates.
938         any_updates = False
939         for row in self._txn_rows.itervalues():
940             if row._changes is None:
941                 if row._table.is_root:
942                     operations.append({"op": "delete",
943                                        "table": row._table.name,
944                                        "where": _where_uuid_equals(row.uuid)})
945                     any_updates = True
946                 else:
947                     # Let ovsdb-server decide whether to really delete it.
948                     pass
949             elif row._changes:
950                 op = {"table": row._table.name}
951                 if row._data is None:
952                     op["op"] = "insert"
953                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
954                     any_updates = True
955
956                     op_index = len(operations) - 1
957                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
958                 else:
959                     op["op"] = "update"
960                     op["where"] = _where_uuid_equals(row.uuid)
961
962                 row_json = {}
963                 op["row"] = row_json
964
965                 for column_name, datum in row._changes.iteritems():
966                     if row._data is not None or not datum.is_default():
967                         row_json[column_name] = (
968                                 self._substitute_uuids(datum.to_json()))
969
970                         # If anything really changed, consider it an update.
971                         # We can't suppress not-really-changed values earlier
972                         # or transactions would become nonatomic (see the big
973                         # comment inside Transaction._write()).
974                         if (not any_updates and row._data is not None and
975                             row._data[column_name] != datum):
976                             any_updates = True
977
978                 if row._data is None or row_json:
979                     operations.append(op)
980
981         if self._fetch_requests:
982             for fetch in self._fetch_requests:
983                 fetch["index"] = len(operations) - 1
984                 operations.append({"op": "select",
985                                    "table": fetch["row"]._table.name,
986                                    "where": self._substitute_uuids(
987                                        _where_uuid_equals(fetch["row"].uuid)),
988                                    "columns": [fetch["column_name"]]})
989             any_updates = True
990
991         # Add increment.
992         if self._inc_row and any_updates:
993             self._inc_index = len(operations) - 1
994
995             operations.append({"op": "mutate",
996                                "table": self._inc_row._table.name,
997                                "where": self._substitute_uuids(
998                                    _where_uuid_equals(self._inc_row.uuid)),
999                                "mutations": [[self._inc_column, "+=", 1]]})
1000             operations.append({"op": "select",
1001                                "table": self._inc_row._table.name,
1002                                "where": self._substitute_uuids(
1003                                    _where_uuid_equals(self._inc_row.uuid)),
1004                                "columns": [self._inc_column]})
1005
1006         # Add comment.
1007         if self._comments:
1008             operations.append({"op": "comment",
1009                                "comment": "\n".join(self._comments)})
1010
1011         # Dry run?
1012         if self.dry_run:
1013             operations.append({"op": "abort"})
1014
1015         if not any_updates:
1016             self._status = Transaction.UNCHANGED
1017         else:
1018             msg = ovs.jsonrpc.Message.create_request("transact", operations)
1019             self._request_id = msg.id
1020             if not self.idl._session.send(msg):
1021                 self.idl._outstanding_txns[self._request_id] = self
1022                 self._status = Transaction.INCOMPLETE
1023             else:
1024                 self._status = Transaction.TRY_AGAIN
1025
1026         self.__disassemble()
1027         return self._status
1028
1029     def commit_block(self):
1030         """Attempts to commit this transaction, blocking until the commit
1031         either succeeds or fails.  Returns the final commit status, which may
1032         be any Transaction.* value other than Transaction.INCOMPLETE.
1033
1034         This function calls Idl.run() on this transaction'ss IDL, so it may
1035         cause Idl.change_seqno to change."""
1036         while True:
1037             status = self.commit()
1038             if status != Transaction.INCOMPLETE:
1039                 return status
1040
1041             self.idl.run()
1042
1043             poller = ovs.poller.Poller()
1044             self.idl.wait(poller)
1045             self.wait(poller)
1046             poller.block()
1047
1048     def get_increment_new_value(self):
1049         """Returns the final (incremented) value of the column in this
1050         transaction that was set to be incremented by Row.increment.  This
1051         transaction must have committed successfully."""
1052         assert self._status == Transaction.SUCCESS
1053         return self._inc_new_value
1054
1055     def abort(self):
1056         """Aborts this transaction.  If Transaction.commit() has already been
1057         called then the transaction might get committed anyhow."""
1058         self.__disassemble()
1059         if self._status in (Transaction.UNCOMMITTED,
1060                             Transaction.INCOMPLETE):
1061             self._status = Transaction.ABORTED
1062
1063     def get_error(self):
1064         """Returns a string representing this transaction's current status,
1065         suitable for use in log messages."""
1066         if self._status != Transaction.ERROR:
1067             return Transaction.status_to_string(self._status)
1068         elif self._error:
1069             return self._error
1070         else:
1071             return "no error details available"
1072
1073     def __set_error_json(self, json):
1074         if self._error is None:
1075             self._error = ovs.json.to_string(json)
1076
1077     def get_insert_uuid(self, uuid):
1078         """Finds and returns the permanent UUID that the database assigned to a
1079         newly inserted row, given the UUID that Transaction.insert() assigned
1080         locally to that row.
1081
1082         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1083         or if it was assigned by that function and then deleted by Row.delete()
1084         within the same transaction.  (Rows that are inserted and then deleted
1085         within a single transaction are never sent to the database server, so
1086         it never assigns them a permanent UUID.)
1087
1088         This transaction must have completed successfully."""
1089         assert self._status in (Transaction.SUCCESS,
1090                                 Transaction.UNCHANGED)
1091         inserted_row = self._inserted_rows.get(uuid)
1092         if inserted_row:
1093             return inserted_row.real
1094         return None
1095
1096     def _increment(self, row, column):
1097         assert not self._inc_row
1098         self._inc_row = row
1099         self._inc_column = column
1100
1101     def _fetch(self, row, column_name):
1102         self._fetch_requests.append({"row": row, "column_name": column_name})
1103
1104     def _write(self, row, column, datum):
1105         assert row._changes is not None
1106
1107         txn = row._idl.txn
1108
1109         # If this is a write-only column and the datum being written is the
1110         # same as the one already there, just skip the update entirely.  This
1111         # is worth optimizing because we have a lot of columns that get
1112         # periodically refreshed into the database but don't actually change
1113         # that often.
1114         #
1115         # We don't do this for read/write columns because that would break
1116         # atomicity of transactions--some other client might have written a
1117         # different value in that column since we read it.  (But if a whole
1118         # transaction only does writes of existing values, without making any
1119         # real changes, we will drop the whole transaction later in
1120         # ovsdb_idl_txn_commit().)
1121         if (not column.alert and row._data and
1122                 row._data.get(column.name) == datum):
1123             new_value = row._changes.get(column.name)
1124             if new_value is None or new_value == datum:
1125                 return
1126
1127         txn._txn_rows[row.uuid] = row
1128         row._changes[column.name] = datum.copy()
1129
1130     def insert(self, table, new_uuid=None):
1131         """Inserts and returns a new row in 'table', which must be one of the
1132         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1133
1134         The new row is assigned a provisional UUID.  If 'uuid' is None then one
1135         is randomly generated; otherwise 'uuid' should specify a randomly
1136         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
1137         different UUID when 'txn' is committed, but the IDL will replace any
1138         uses of the provisional UUID in the data to be to be committed by the
1139         UUID assigned by ovsdb-server."""
1140         assert self._status == Transaction.UNCOMMITTED
1141         if new_uuid is None:
1142             new_uuid = uuid.uuid4()
1143         row = Row(self.idl, table, new_uuid, None)
1144         table.rows[row.uuid] = row
1145         self._txn_rows[row.uuid] = row
1146         return row
1147
1148     def _process_reply(self, msg):
1149         if msg.type == ovs.jsonrpc.Message.T_ERROR:
1150             self._status = Transaction.ERROR
1151         elif type(msg.result) not in (list, tuple):
1152             # XXX rate-limit
1153             vlog.warn('reply to "transact" is not JSON array')
1154         else:
1155             hard_errors = False
1156             soft_errors = False
1157             lock_errors = False
1158
1159             ops = msg.result
1160             for op in ops:
1161                 if op is None:
1162                     # This isn't an error in itself but indicates that some
1163                     # prior operation failed, so make sure that we know about
1164                     # it.
1165                     soft_errors = True
1166                 elif type(op) == dict:
1167                     error = op.get("error")
1168                     if error is not None:
1169                         if error == "timed out":
1170                             soft_errors = True
1171                         elif error == "not owner":
1172                             lock_errors = True
1173                         elif error == "aborted":
1174                             pass
1175                         else:
1176                             hard_errors = True
1177                             self.__set_error_json(op)
1178                 else:
1179                     hard_errors = True
1180                     self.__set_error_json(op)
1181                     # XXX rate-limit
1182                     vlog.warn("operation reply is not JSON null or object")
1183
1184             if not soft_errors and not hard_errors and not lock_errors:
1185                 if self._inc_row and not self.__process_inc_reply(ops):
1186                     hard_errors = True
1187                 if self._fetch_requests:
1188                     if self.__process_fetch_reply(ops):
1189                         self.idl.change_seqno += 1
1190                     else:
1191                         hard_errors = True
1192
1193                 for insert in self._inserted_rows.itervalues():
1194                     if not self.__process_insert_reply(insert, ops):
1195                         hard_errors = True
1196
1197             if hard_errors:
1198                 self._status = Transaction.ERROR
1199             elif lock_errors:
1200                 self._status = Transaction.NOT_LOCKED
1201             elif soft_errors:
1202                 self._status = Transaction.TRY_AGAIN
1203             else:
1204                 self._status = Transaction.SUCCESS
1205
1206     @staticmethod
1207     def __check_json_type(json, types, name):
1208         if not json:
1209             # XXX rate-limit
1210             vlog.warn("%s is missing" % name)
1211             return False
1212         elif type(json) not in types:
1213             # XXX rate-limit
1214             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1215             return False
1216         else:
1217             return True
1218
1219     def __process_fetch_reply(self, ops):
1220         update = False
1221         for fetch_request in self._fetch_requests:
1222             row = fetch_request["row"]
1223             column_name = fetch_request["column_name"]
1224             index = fetch_request["index"]
1225             table = row._table
1226
1227             select = ops[index]
1228             fetched_rows = select.get("rows")
1229             if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1230                                                  '"select" reply "rows"'):
1231                 return False
1232             if len(fetched_rows) != 1:
1233                 # XXX rate-limit
1234                 vlog.warn('"select" reply "rows" has %d elements '
1235                           'instead of 1' % len(fetched_rows))
1236                 continue
1237             fetched_row = fetched_rows[0]
1238             if not Transaction.__check_json_type(fetched_row, (dict,),
1239                                                  '"select" reply row'):
1240                 continue
1241
1242             column = table.columns.get(column_name)
1243             datum_json = fetched_row.get(column_name)
1244             datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1245
1246             row._data[column_name] = datum
1247             update = True
1248
1249         return update
1250
1251     def __process_inc_reply(self, ops):
1252         if self._inc_index + 2 > len(ops):
1253             # XXX rate-limit
1254             vlog.warn("reply does not contain enough operations for "
1255                       "increment (has %d, needs %d)" %
1256                       (len(ops), self._inc_index + 2))
1257
1258         # We know that this is a JSON object because the loop in
1259         # __process_reply() already checked.
1260         mutate = ops[self._inc_index]
1261         count = mutate.get("count")
1262         if not Transaction.__check_json_type(count, (int, long),
1263                                              '"mutate" reply "count"'):
1264             return False
1265         if count != 1:
1266             # XXX rate-limit
1267             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1268             return False
1269
1270         select = ops[self._inc_index + 1]
1271         rows = select.get("rows")
1272         if not Transaction.__check_json_type(rows, (list, tuple),
1273                                              '"select" reply "rows"'):
1274             return False
1275         if len(rows) != 1:
1276             # XXX rate-limit
1277             vlog.warn('"select" reply "rows" has %d elements '
1278                       'instead of 1' % len(rows))
1279             return False
1280         row = rows[0]
1281         if not Transaction.__check_json_type(row, (dict,),
1282                                              '"select" reply row'):
1283             return False
1284         column = row.get(self._inc_column)
1285         if not Transaction.__check_json_type(column, (int, long),
1286                                              '"select" reply inc column'):
1287             return False
1288         self._inc_new_value = column
1289         return True
1290
1291     def __process_insert_reply(self, insert, ops):
1292         if insert.op_index >= len(ops):
1293             # XXX rate-limit
1294             vlog.warn("reply does not contain enough operations "
1295                       "for insert (has %d, needs %d)"
1296                       % (len(ops), insert.op_index))
1297             return False
1298
1299         # We know that this is a JSON object because the loop in
1300         # __process_reply() already checked.
1301         reply = ops[insert.op_index]
1302         json_uuid = reply.get("uuid")
1303         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1304                                              '"insert" reply "uuid"'):
1305             return False
1306
1307         try:
1308             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1309         except error.Error:
1310             # XXX rate-limit
1311             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1312             return False
1313
1314         insert.real = uuid_
1315         return True
1316
1317
1318 class SchemaHelper(object):
1319     """IDL Schema helper.
1320
1321     This class encapsulates the logic required to generate schemas suitable
1322     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1323     they are interested in using register_columns().  When finished, the
1324     get_idl_schema() function may be called.
1325
1326     The location on disk of the schema used may be found in the
1327     'schema_location' variable."""
1328
1329     def __init__(self, location=None, schema_json=None):
1330         """Creates a new Schema object.
1331
1332         'location' file path to ovs schema. None means default location
1333         'schema_json' schema in json preresentation in memory
1334         """
1335
1336         if location and schema_json:
1337             raise ValueError("both location and schema_json can't be "
1338                              "specified. it's ambiguous.")
1339         if schema_json is None:
1340             if location is None:
1341                 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1342             schema_json = ovs.json.from_file(location)
1343
1344         self.schema_json = schema_json
1345         self._tables = {}
1346         self._readonly = {}
1347         self._all = False
1348
1349     def register_columns(self, table, columns, readonly=[]):
1350         """Registers interest in the given 'columns' of 'table'.  Future calls
1351         to get_idl_schema() will include 'table':column for each column in
1352         'columns'. This function automatically avoids adding duplicate entries
1353         to the schema.
1354         A subset of 'columns' can be specified as 'readonly'. The readonly
1355         columns are not replicated but can be fetched on-demand by the user
1356         with Row.fetch().
1357
1358         'table' must be a string.
1359         'columns' must be a list of strings.
1360         'readonly' must be a list of strings.
1361         """
1362
1363         assert type(table) is str
1364         assert type(columns) is list
1365
1366         columns = set(columns) | self._tables.get(table, set())
1367         self._tables[table] = columns
1368         self._readonly[table] = readonly
1369
1370     def register_table(self, table):
1371         """Registers interest in the given all columns of 'table'. Future calls
1372         to get_idl_schema() will include all columns of 'table'.
1373
1374         'table' must be a string
1375         """
1376         assert type(table) is str
1377         self._tables[table] = set()  # empty set means all columns in the table
1378
1379     def register_all(self):
1380         """Registers interest in every column of every table."""
1381         self._all = True
1382
1383     def get_idl_schema(self):
1384         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1385         object based on columns registered using the register_columns()
1386         function."""
1387
1388         schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1389         self.schema_json = None
1390
1391         if not self._all:
1392             schema_tables = {}
1393             for table, columns in self._tables.iteritems():
1394                 schema_tables[table] = (
1395                     self._keep_table_columns(schema, table, columns))
1396
1397             schema.tables = schema_tables
1398         schema.readonly = self._readonly
1399         return schema
1400
1401     def _keep_table_columns(self, schema, table_name, columns):
1402         assert table_name in schema.tables
1403         table = schema.tables[table_name]
1404
1405         if not columns:
1406             # empty set means all columns in the table
1407             return table
1408
1409         new_columns = {}
1410         for column_name in columns:
1411             assert type(column_name) is str
1412             assert column_name in table.columns
1413
1414             new_columns[column_name] = table.columns[column_name]
1415
1416         table.columns = new_columns
1417         return table