python: Resolve pep8 comparison errors.
[cascardo/ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import uuid
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22 import ovs.poller
23 import ovs.vlog
24
25 vlog = ovs.vlog.Vlog("idl")
26
27 __pychecker__ = 'no-classattr no-objattrs'
28
29 ROW_CREATE = "create"
30 ROW_UPDATE = "update"
31 ROW_DELETE = "delete"
32
33
34 class Idl(object):
35     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
36
37     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
38     requests to an OVSDB database server and parses the responses, converting
39     raw JSON into data structures that are easier for clients to digest.
40
41     The IDL also assists with issuing database transactions.  The client
42     creates a transaction, manipulates the IDL data structures, and commits or
43     aborts the transaction.  The IDL then composes and issues the necessary
44     JSON-RPC requests and reports to the client whether the transaction
45     completed successfully.
46
47     The client is allowed to access the following attributes directly, in a
48     read-only fashion:
49
50     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
51       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
52       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
53       to a Row object.
54
55       The client may directly read and write the Row objects referenced by the
56       'rows' map values.  Refer to Row for more details.
57
58     - 'change_seqno': A number that represents the IDL's state.  When the IDL
59       is updated (by Idl.run()), its value changes.  The sequence number can
60       occasionally change even if the database does not.  This happens if the
61       connection to the database drops and reconnects, which causes the
62       database contents to be reloaded even if they didn't change.  (It could
63       also happen if the database server sends out a "change" that reflects
64       what the IDL already thought was in the database.  The database server is
65       not supposed to do that, but bugs could in theory cause it to do so.)
66
67     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
68       if no lock is configured.
69
70     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
71       lock, and False otherwise.
72
73       Locking and unlocking happens asynchronously from the database client's
74       point of view, so the information is only useful for optimization
75       (e.g. if the client doesn't have the lock then there's no point in trying
76       to write to the database).
77
78     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
79       the database server has indicated that some other client already owns the
80       requested lock, and False otherwise.
81
82     - 'txn': The ovs.db.idl.Transaction object for the database transaction
83       currently being constructed, if there is one, or None otherwise.
84 """
85
86     def __init__(self, remote, schema):
87         """Creates and returns a connection to the database named 'db_name' on
88         'remote', which should be in a form acceptable to
89         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
90         replica of the remote database.
91
92         'schema' should be the schema for the remote database.  The caller may
93         have cut it down by removing tables or columns that are not of
94         interest.  The IDL will only replicate the tables and columns that
95         remain.  The caller may also add a attribute named 'alert' to selected
96         remaining columns, setting its value to False; if so, then changes to
97         those columns will not be considered changes to the database for the
98         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
99         useful for columns that the IDL's client will write but not read.
100
101         As a convenience to users, 'schema' may also be an instance of the
102         SchemaHelper class.
103
104         The IDL uses and modifies 'schema' directly."""
105
106         assert isinstance(schema, SchemaHelper)
107         schema = schema.get_idl_schema()
108
109         self.tables = schema.tables
110         self.readonly = schema.readonly
111         self._db = schema
112         self._session = ovs.jsonrpc.Session.open(remote)
113         self._monitor_request_id = None
114         self._last_seqno = None
115         self.change_seqno = 0
116
117         # Database locking.
118         self.lock_name = None          # Name of lock we need, None if none.
119         self.has_lock = False          # Has db server said we have the lock?
120         self.is_lock_contended = False  # Has db server said we can't get lock?
121         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
122
123         # Transaction support.
124         self.txn = None
125         self._outstanding_txns = {}
126
127         for table in schema.tables.itervalues():
128             for column in table.columns.itervalues():
129                 if not hasattr(column, 'alert'):
130                     column.alert = True
131             table.need_table = False
132             table.rows = {}
133             table.idl = self
134
135     def close(self):
136         """Closes the connection to the database.  The IDL will no longer
137         update."""
138         self._session.close()
139
140     def run(self):
141         """Processes a batch of messages from the database server.  Returns
142         True if the database as seen through the IDL changed, False if it did
143         not change.  The initial fetch of the entire contents of the remote
144         database is considered to be one kind of change.  If the IDL has been
145         configured to acquire a database lock (with Idl.set_lock()), then
146         successfully acquiring the lock is also considered to be a change.
147
148         This function can return occasional false positives, that is, report
149         that the database changed even though it didn't.  This happens if the
150         connection to the database drops and reconnects, which causes the
151         database contents to be reloaded even if they didn't change.  (It could
152         also happen if the database server sends out a "change" that reflects
153         what we already thought was in the database, but the database server is
154         not supposed to do that.)
155
156         As an alternative to checking the return value, the client may check
157         for changes in self.change_seqno."""
158         assert not self.txn
159         initial_change_seqno = self.change_seqno
160         self._session.run()
161         i = 0
162         while i < 50:
163             i += 1
164             if not self._session.is_connected():
165                 break
166
167             seqno = self._session.get_seqno()
168             if seqno != self._last_seqno:
169                 self._last_seqno = seqno
170                 self.__txn_abort_all()
171                 self.__send_monitor_request()
172                 if self.lock_name:
173                     self.__send_lock_request()
174                 break
175
176             msg = self._session.recv()
177             if msg is None:
178                 break
179             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
180                 and msg.method == "update"
181                 and len(msg.params) == 2
182                 and msg.params[0] is None):
183                 # Database contents changed.
184                 self.__parse_update(msg.params[1])
185             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
186                   and self._monitor_request_id is not None
187                   and self._monitor_request_id == msg.id):
188                 # Reply to our "monitor" request.
189                 try:
190                     self.change_seqno += 1
191                     self._monitor_request_id = None
192                     self.__clear()
193                     self.__parse_update(msg.result)
194                 except error.Error, e:
195                     vlog.err("%s: parse error in received schema: %s"
196                               % (self._session.get_name(), e))
197                     self.__error()
198             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
199                   and self._lock_request_id is not None
200                   and self._lock_request_id == msg.id):
201                 # Reply to our "lock" request.
202                 self.__parse_lock_reply(msg.result)
203             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
204                   and msg.method == "locked"):
205                 # We got our lock.
206                 self.__parse_lock_notify(msg.params, True)
207             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
208                   and msg.method == "stolen"):
209                 # Someone else stole our lock.
210                 self.__parse_lock_notify(msg.params, False)
211             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
212                 # Reply to our echo request.  Ignore it.
213                 pass
214             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
215                                ovs.jsonrpc.Message.T_REPLY)
216                   and self.__txn_process_reply(msg)):
217                 # __txn_process_reply() did everything needed.
218                 pass
219             else:
220                 # This can happen if a transaction is destroyed before we
221                 # receive the reply, so keep the log level low.
222                 vlog.dbg("%s: received unexpected %s message"
223                          % (self._session.get_name(),
224                              ovs.jsonrpc.Message.type_to_string(msg.type)))
225
226         return initial_change_seqno != self.change_seqno
227
228     def wait(self, poller):
229         """Arranges for poller.block() to wake up when self.run() has something
230         to do or when activity occurs on a transaction on 'self'."""
231         self._session.wait(poller)
232         self._session.recv_wait(poller)
233
234     def has_ever_connected(self):
235         """Returns True, if the IDL successfully connected to the remote
236         database and retrieved its contents (even if the connection
237         subsequently dropped and is in the process of reconnecting).  If so,
238         then the IDL contains an atomic snapshot of the database's contents
239         (but it might be arbitrarily old if the connection dropped).
240
241         Returns False if the IDL has never connected or retrieved the
242         database's contents.  If so, the IDL is empty."""
243         return self.change_seqno != 0
244
245     def force_reconnect(self):
246         """Forces the IDL to drop its connection to the database and reconnect.
247         In the meantime, the contents of the IDL will not change."""
248         self._session.force_reconnect()
249
250     def set_lock(self, lock_name):
251         """If 'lock_name' is not None, configures the IDL to obtain the named
252         lock from the database server and to avoid modifying the database when
253         the lock cannot be acquired (that is, when another client has the same
254         lock).
255
256         If 'lock_name' is None, drops the locking requirement and releases the
257         lock."""
258         assert not self.txn
259         assert not self._outstanding_txns
260
261         if self.lock_name and (not lock_name or lock_name != self.lock_name):
262             # Release previous lock.
263             self.__send_unlock_request()
264             self.lock_name = None
265             self.is_lock_contended = False
266
267         if lock_name and not self.lock_name:
268             # Acquire new lock.
269             self.lock_name = lock_name
270             self.__send_lock_request()
271
272     def notify(self, event, row, updates=None):
273         """Hook for implementing create/update/delete notifications
274
275         :param event:   The event that was triggered
276         :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
277         :param row:     The row as it is after the operation has occured
278         :type row:      Row
279         :param updates: For updates, a Row object with just the changed columns
280         :type updates:  Row
281         """
282
283     def __clear(self):
284         changed = False
285
286         for table in self.tables.itervalues():
287             if table.rows:
288                 changed = True
289                 table.rows = {}
290
291         if changed:
292             self.change_seqno += 1
293
294     def __update_has_lock(self, new_has_lock):
295         if new_has_lock and not self.has_lock:
296             if self._monitor_request_id is None:
297                 self.change_seqno += 1
298             else:
299                 # We're waiting for a monitor reply, so don't signal that the
300                 # database changed.  The monitor reply will increment
301                 # change_seqno anyhow.
302                 pass
303             self.is_lock_contended = False
304         self.has_lock = new_has_lock
305
306     def __do_send_lock_request(self, method):
307         self.__update_has_lock(False)
308         self._lock_request_id = None
309         if self._session.is_connected():
310             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
311             msg_id = msg.id
312             self._session.send(msg)
313         else:
314             msg_id = None
315         return msg_id
316
317     def __send_lock_request(self):
318         self._lock_request_id = self.__do_send_lock_request("lock")
319
320     def __send_unlock_request(self):
321         self.__do_send_lock_request("unlock")
322
323     def __parse_lock_reply(self, result):
324         self._lock_request_id = None
325         got_lock = type(result) == dict and result.get("locked") is True
326         self.__update_has_lock(got_lock)
327         if not got_lock:
328             self.is_lock_contended = True
329
330     def __parse_lock_notify(self, params, new_has_lock):
331         if (self.lock_name is not None
332             and type(params) in (list, tuple)
333             and params
334             and params[0] == self.lock_name):
335             self.__update_has_lock(self, new_has_lock)
336             if not new_has_lock:
337                 self.is_lock_contended = True
338
339     def __send_monitor_request(self):
340         monitor_requests = {}
341         for table in self.tables.itervalues():
342             columns = []
343             for column in table.columns.keys():
344                 if ((table.name not in self.readonly) or
345                     (table.name in self.readonly) and
346                     (column not in self.readonly[table.name])):
347                     columns.append(column)
348             monitor_requests[table.name] = {"columns": columns}
349         msg = ovs.jsonrpc.Message.create_request(
350             "monitor", [self._db.name, None, monitor_requests])
351         self._monitor_request_id = msg.id
352         self._session.send(msg)
353
354     def __parse_update(self, update):
355         try:
356             self.__do_parse_update(update)
357         except error.Error, e:
358             vlog.err("%s: error parsing update: %s"
359                      % (self._session.get_name(), e))
360
361     def __do_parse_update(self, table_updates):
362         if type(table_updates) != dict:
363             raise error.Error("<table-updates> is not an object",
364                               table_updates)
365
366         for table_name, table_update in table_updates.iteritems():
367             table = self.tables.get(table_name)
368             if not table:
369                 raise error.Error('<table-updates> includes unknown '
370                                   'table "%s"' % table_name)
371
372             if type(table_update) != dict:
373                 raise error.Error('<table-update> for table "%s" is not '
374                                   'an object' % table_name, table_update)
375
376             for uuid_string, row_update in table_update.iteritems():
377                 if not ovs.ovsuuid.is_valid_string(uuid_string):
378                     raise error.Error('<table-update> for table "%s" '
379                                       'contains bad UUID "%s" as member '
380                                       'name' % (table_name, uuid_string),
381                                       table_update)
382                 uuid = ovs.ovsuuid.from_string(uuid_string)
383
384                 if type(row_update) != dict:
385                     raise error.Error('<table-update> for table "%s" '
386                                       'contains <row-update> for %s that '
387                                       'is not an object'
388                                       % (table_name, uuid_string))
389
390                 parser = ovs.db.parser.Parser(row_update, "row-update")
391                 old = parser.get_optional("old", [dict])
392                 new = parser.get_optional("new", [dict])
393                 parser.finish()
394
395                 if not old and not new:
396                     raise error.Error('<row-update> missing "old" and '
397                                       '"new" members', row_update)
398
399                 if self.__process_update(table, uuid, old, new):
400                     self.change_seqno += 1
401
402     def __process_update(self, table, uuid, old, new):
403         """Returns True if a column changed, False otherwise."""
404         row = table.rows.get(uuid)
405         changed = False
406         if not new:
407             # Delete row.
408             if row:
409                 del table.rows[uuid]
410                 changed = True
411                 self.notify(ROW_DELETE, row)
412             else:
413                 # XXX rate-limit
414                 vlog.warn("cannot delete missing row %s from table %s"
415                           % (uuid, table.name))
416         elif not old:
417             # Insert row.
418             if not row:
419                 row = self.__create_row(table, uuid)
420                 changed = True
421             else:
422                 # XXX rate-limit
423                 vlog.warn("cannot add existing row %s to table %s"
424                           % (uuid, table.name))
425             if self.__row_update(table, row, new):
426                 changed = True
427                 self.notify(ROW_CREATE, row)
428         else:
429             op = ROW_UPDATE
430             if not row:
431                 row = self.__create_row(table, uuid)
432                 changed = True
433                 op = ROW_CREATE
434                 # XXX rate-limit
435                 vlog.warn("cannot modify missing row %s in table %s"
436                           % (uuid, table.name))
437             if self.__row_update(table, row, new):
438                 changed = True
439                 self.notify(op, row, Row.from_json(self, table, uuid, old))
440         return changed
441
442     def __row_update(self, table, row, row_json):
443         changed = False
444         for column_name, datum_json in row_json.iteritems():
445             column = table.columns.get(column_name)
446             if not column:
447                 # XXX rate-limit
448                 vlog.warn("unknown column %s updating table %s"
449                           % (column_name, table.name))
450                 continue
451
452             try:
453                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
454             except error.Error, e:
455                 # XXX rate-limit
456                 vlog.warn("error parsing column %s in table %s: %s"
457                           % (column_name, table.name, e))
458                 continue
459
460             if datum != row._data[column_name]:
461                 row._data[column_name] = datum
462                 if column.alert:
463                     changed = True
464             else:
465                 # Didn't really change but the OVSDB monitor protocol always
466                 # includes every value in a row.
467                 pass
468         return changed
469
470     def __create_row(self, table, uuid):
471         data = {}
472         for column in table.columns.itervalues():
473             data[column.name] = ovs.db.data.Datum.default(column.type)
474         row = table.rows[uuid] = Row(self, table, uuid, data)
475         return row
476
477     def __error(self):
478         self._session.force_reconnect()
479
480     def __txn_abort_all(self):
481         while self._outstanding_txns:
482             txn = self._outstanding_txns.popitem()[1]
483             txn._status = Transaction.TRY_AGAIN
484
485     def __txn_process_reply(self, msg):
486         txn = self._outstanding_txns.pop(msg.id, None)
487         if txn:
488             txn._process_reply(msg)
489
490
491 def _uuid_to_row(atom, base):
492     if base.ref_table:
493         return base.ref_table.rows.get(atom)
494     else:
495         return atom
496
497
498 def _row_to_uuid(value):
499     if type(value) == Row:
500         return value.uuid
501     else:
502         return value
503
504
505 class Row(object):
506     """A row within an IDL.
507
508     The client may access the following attributes directly:
509
510     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
511
512     - An attribute for each column in the Row's table, named for the column,
513       whose values are as returned by Datum.to_python() for the column's type.
514
515       If some error occurs (e.g. the database server's idea of the column is
516       different from the IDL's idea), then the attribute values is the
517       "default" value return by Datum.default() for the column's type.  (It is
518       important to know this because the default value may violate constraints
519       for the column's type, e.g. the default integer value is 0 even if column
520       contraints require the column's value to be positive.)
521
522       When a transaction is active, column attributes may also be assigned new
523       values.  Committing the transaction will then cause the new value to be
524       stored into the database.
525
526       *NOTE*: In the current implementation, the value of a column is a *copy*
527       of the value in the database.  This means that modifying its value
528       directly will have no useful effect.  For example, the following:
529         row.mycolumn["a"] = "b"              # don't do this
530       will not change anything in the database, even after commit.  To modify
531       the column, instead assign the modified column value back to the column:
532         d = row.mycolumn
533         d["a"] = "b"
534         row.mycolumn = d
535 """
536     def __init__(self, idl, table, uuid, data):
537         # All of the explicit references to self.__dict__ below are required
538         # to set real attributes with invoking self.__getattr__().
539         self.__dict__["uuid"] = uuid
540
541         self.__dict__["_idl"] = idl
542         self.__dict__["_table"] = table
543
544         # _data is the committed data.  It takes the following values:
545         #
546         #   - A dictionary that maps every column name to a Datum, if the row
547         #     exists in the committed form of the database.
548         #
549         #   - None, if this row is newly inserted within the active transaction
550         #     and thus has no committed form.
551         self.__dict__["_data"] = data
552
553         # _changes describes changes to this row within the active transaction.
554         # It takes the following values:
555         #
556         #   - {}, the empty dictionary, if no transaction is active or if the
557         #     row has yet not been changed within this transaction.
558         #
559         #   - A dictionary that maps a column name to its new Datum, if an
560         #     active transaction changes those columns' values.
561         #
562         #   - A dictionary that maps every column name to a Datum, if the row
563         #     is newly inserted within the active transaction.
564         #
565         #   - None, if this transaction deletes this row.
566         self.__dict__["_changes"] = {}
567
568         # A dictionary whose keys are the names of columns that must be
569         # verified as prerequisites when the transaction commits.  The values
570         # in the dictionary are all None.
571         self.__dict__["_prereqs"] = {}
572
573     def __getattr__(self, column_name):
574         assert self._changes is not None
575
576         datum = self._changes.get(column_name)
577         if datum is None:
578             if self._data is None:
579                 raise AttributeError("%s instance has no attribute '%s'" %
580                                      (self.__class__.__name__, column_name))
581             if column_name in self._data:
582                 datum = self._data[column_name]
583             else:
584                 raise AttributeError("%s instance has no attribute '%s'" %
585                                      (self.__class__.__name__, column_name))
586
587         return datum.to_python(_uuid_to_row)
588
589     def __setattr__(self, column_name, value):
590         assert self._changes is not None
591         assert self._idl.txn
592
593         if ((self._table.name in self._idl.readonly) and
594             (column_name in self._idl.readonly[self._table.name])):
595             vlog.warn("attempting to write to readonly column %s" % column_name)
596             return
597
598         column = self._table.columns[column_name]
599         try:
600             datum = ovs.db.data.Datum.from_python(column.type, value,
601                                                   _row_to_uuid)
602         except error.Error, e:
603             # XXX rate-limit
604             vlog.err("attempting to write bad value to column %s (%s)"
605                      % (column_name, e))
606             return
607         self._idl.txn._write(self, column, datum)
608
609     @classmethod
610     def from_json(cls, idl, table, uuid, row_json):
611         data = {}
612         for column_name, datum_json in row_json.iteritems():
613             column = table.columns.get(column_name)
614             if not column:
615                 # XXX rate-limit
616                 vlog.warn("unknown column %s in table %s"
617                           % (column_name, table.name))
618                 continue
619             try:
620                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
621             except error.Error, e:
622                 # XXX rate-limit
623                 vlog.warn("error parsing column %s in table %s: %s"
624                           % (column_name, table.name, e))
625                 continue
626             data[column_name] = datum
627         return cls(idl, table, uuid, data)
628
629     def verify(self, column_name):
630         """Causes the original contents of column 'column_name' in this row to
631         be verified as a prerequisite to completing the transaction.  That is,
632         if 'column_name' changed in this row (or if this row was deleted)
633         between the time that the IDL originally read its contents and the time
634         that the transaction commits, then the transaction aborts and
635         Transaction.commit() returns Transaction.TRY_AGAIN.
636
637         The intention is that, to ensure that no transaction commits based on
638         dirty reads, an application should call Row.verify() on each data item
639         read as part of a read-modify-write operation.
640
641         In some cases Row.verify() reduces to a no-op, because the current
642         value of the column is already known:
643
644           - If this row is a row created by the current transaction (returned
645             by Transaction.insert()).
646
647           - If the column has already been modified within the current
648             transaction.
649
650         Because of the latter property, always call Row.verify() *before*
651         modifying the column, for a given read-modify-write.
652
653         A transaction must be in progress."""
654         assert self._idl.txn
655         assert self._changes is not None
656         if not self._data or column_name in self._changes:
657             return
658
659         self._prereqs[column_name] = None
660
661     def delete(self):
662         """Deletes this row from its table.
663
664         A transaction must be in progress."""
665         assert self._idl.txn
666         assert self._changes is not None
667         if self._data is None:
668             del self._idl.txn._txn_rows[self.uuid]
669         else:
670             self._idl.txn._txn_rows[self.uuid] = self
671         self.__dict__["_changes"] = None
672         del self._table.rows[self.uuid]
673
674     def fetch(self, column_name):
675         self._idl.txn._fetch(self, column_name)
676
677     def increment(self, column_name):
678         """Causes the transaction, when committed, to increment the value of
679         'column_name' within this row by 1.  'column_name' must have an integer
680         type.  After the transaction commits successfully, the client may
681         retrieve the final (incremented) value of 'column_name' with
682         Transaction.get_increment_new_value().
683
684         The client could accomplish something similar by reading and writing
685         and verify()ing columns.  However, increment() will never (by itself)
686         cause a transaction to fail because of a verify error.
687
688         The intended use is for incrementing the "next_cfg" column in
689         the Open_vSwitch table."""
690         self._idl.txn._increment(self, column_name)
691
692
693 def _uuid_name_from_uuid(uuid):
694     return "row%s" % str(uuid).replace("-", "_")
695
696
697 def _where_uuid_equals(uuid):
698     return [["_uuid", "==", ["uuid", str(uuid)]]]
699
700
701 class _InsertedRow(object):
702     def __init__(self, op_index):
703         self.op_index = op_index
704         self.real = None
705
706
707 class Transaction(object):
708     """A transaction may modify the contents of a database by modifying the
709     values of columns, deleting rows, inserting rows, or adding checks that
710     columns in the database have not changed ("verify" operations), through
711     Row methods.
712
713     Reading and writing columns and inserting and deleting rows are all
714     straightforward.  The reasons to verify columns are less obvious.
715     Verification is the key to maintaining transactional integrity.  Because
716     OVSDB handles multiple clients, it can happen that between the time that
717     OVSDB client A reads a column and writes a new value, OVSDB client B has
718     written that column.  Client A's write should not ordinarily overwrite
719     client B's, especially if the column in question is a "map" column that
720     contains several more or less independent data items.  If client A adds a
721     "verify" operation before it writes the column, then the transaction fails
722     in case client B modifies it first.  Client A will then see the new value
723     of the column and compose a new transaction based on the new contents
724     written by client B.
725
726     When a transaction is complete, which must be before the next call to
727     Idl.run(), call Transaction.commit() or Transaction.abort().
728
729     The life-cycle of a transaction looks like this:
730
731     1. Create the transaction and record the initial sequence number:
732
733         seqno = idl.change_seqno(idl)
734         txn = Transaction(idl)
735
736     2. Modify the database with Row and Transaction methods.
737
738     3. Commit the transaction by calling Transaction.commit().  The first call
739        to this function probably returns Transaction.INCOMPLETE.  The client
740        must keep calling again along as this remains true, calling Idl.run() in
741        between to let the IDL do protocol processing.  (If the client doesn't
742        have anything else to do in the meantime, it can use
743        Transaction.commit_block() to avoid having to loop itself.)
744
745     4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
746        to change from the saved 'seqno' (it's possible that it's already
747        changed, in which case the client should not wait at all), then start
748        over from step 1.  Only a call to Idl.run() will change the return value
749        of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
750
751     # Status values that Transaction.commit() can return.
752     UNCOMMITTED = "uncommitted"  # Not yet committed or aborted.
753     UNCHANGED = "unchanged"      # Transaction didn't include any changes.
754     INCOMPLETE = "incomplete"    # Commit in progress, please wait.
755     ABORTED = "aborted"          # ovsdb_idl_txn_abort() called.
756     SUCCESS = "success"          # Commit successful.
757     TRY_AGAIN = "try again"      # Commit failed because a "verify" operation
758                                  # reported an inconsistency, due to a network
759                                  # problem, or other transient failure.  Wait
760                                  # for a change, then try again.
761     NOT_LOCKED = "not locked"    # Server hasn't given us the lock yet.
762     ERROR = "error"              # Commit failed due to a hard error.
763
764     @staticmethod
765     def status_to_string(status):
766         """Converts one of the status values that Transaction.commit() can
767         return into a human-readable string.
768
769         (The status values are in fact such strings already, so
770         there's nothing to do.)"""
771         return status
772
773     def __init__(self, idl):
774         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
775         A given Idl may only have a single active transaction at a time.
776
777         A Transaction may modify the contents of a database by assigning new
778         values to columns (attributes of Row), deleting rows (with
779         Row.delete()), or inserting rows (with Transaction.insert()).  It may
780         also check that columns in the database have not changed with
781         Row.verify().
782
783         When a transaction is complete (which must be before the next call to
784         Idl.run()), call Transaction.commit() or Transaction.abort()."""
785         assert idl.txn is None
786
787         idl.txn = self
788         self._request_id = None
789         self.idl = idl
790         self.dry_run = False
791         self._txn_rows = {}
792         self._status = Transaction.UNCOMMITTED
793         self._error = None
794         self._comments = []
795
796         self._inc_row = None
797         self._inc_column = None
798
799         self._fetch_requests = []
800
801         self._inserted_rows = {}  # Map from UUID to _InsertedRow
802
803     def add_comment(self, comment):
804         """Appends 'comment' to the comments that will be passed to the OVSDB
805         server when this transaction is committed.  (The comment will be
806         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
807         relatively human-readable form.)"""
808         self._comments.append(comment)
809
810     def wait(self, poller):
811         """Causes poll_block() to wake up if this transaction has completed
812         committing."""
813         if self._status not in (Transaction.UNCOMMITTED,
814                                 Transaction.INCOMPLETE):
815             poller.immediate_wake()
816
817     def _substitute_uuids(self, json):
818         if type(json) in (list, tuple):
819             if (len(json) == 2
820                 and json[0] == 'uuid'
821                 and ovs.ovsuuid.is_valid_string(json[1])):
822                 uuid = ovs.ovsuuid.from_string(json[1])
823                 row = self._txn_rows.get(uuid, None)
824                 if row and row._data is None:
825                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
826             else:
827                 return [self._substitute_uuids(elem) for elem in json]
828         return json
829
830     def __disassemble(self):
831         self.idl.txn = None
832
833         for row in self._txn_rows.itervalues():
834             if row._changes is None:
835                 row._table.rows[row.uuid] = row
836             elif row._data is None:
837                 del row._table.rows[row.uuid]
838             row.__dict__["_changes"] = {}
839             row.__dict__["_prereqs"] = {}
840         self._txn_rows = {}
841
842     def commit(self):
843         """Attempts to commit 'txn'.  Returns the status of the commit
844         operation, one of the following constants:
845
846           Transaction.INCOMPLETE:
847
848               The transaction is in progress, but not yet complete.  The caller
849               should call again later, after calling Idl.run() to let the
850               IDL do OVSDB protocol processing.
851
852           Transaction.UNCHANGED:
853
854               The transaction is complete.  (It didn't actually change the
855               database, so the IDL didn't send any request to the database
856               server.)
857
858           Transaction.ABORTED:
859
860               The caller previously called Transaction.abort().
861
862           Transaction.SUCCESS:
863
864               The transaction was successful.  The update made by the
865               transaction (and possibly other changes made by other database
866               clients) should already be visible in the IDL.
867
868           Transaction.TRY_AGAIN:
869
870               The transaction failed for some transient reason, e.g. because a
871               "verify" operation reported an inconsistency or due to a network
872               problem.  The caller should wait for a change to the database,
873               then compose a new transaction, and commit the new transaction.
874
875               Use Idl.change_seqno to wait for a change in the database.  It is
876               important to use its value *before* the initial call to
877               Transaction.commit() as the baseline for this purpose, because
878               the change that one should wait for can happen after the initial
879               call but before the call that returns Transaction.TRY_AGAIN, and
880               using some other baseline value in that situation could cause an
881               indefinite wait if the database rarely changes.
882
883           Transaction.NOT_LOCKED:
884
885               The transaction failed because the IDL has been configured to
886               require a database lock (with Idl.set_lock()) but didn't
887               get it yet or has already lost it.
888
889         Committing a transaction rolls back all of the changes that it made to
890         the IDL's copy of the database.  If the transaction commits
891         successfully, then the database server will send an update and, thus,
892         the IDL will be updated with the committed changes."""
893         # The status can only change if we're the active transaction.
894         # (Otherwise, our status will change only in Idl.run().)
895         if self != self.idl.txn:
896             return self._status
897
898         # If we need a lock but don't have it, give up quickly.
899         if self.idl.lock_name and not self.idl.has_lock:
900             self._status = Transaction.NOT_LOCKED
901             self.__disassemble()
902             return self._status
903
904         operations = [self.idl._db.name]
905
906         # Assert that we have the required lock (avoiding a race).
907         if self.idl.lock_name:
908             operations.append({"op": "assert",
909                                "lock": self.idl.lock_name})
910
911         # Add prerequisites and declarations of new rows.
912         for row in self._txn_rows.itervalues():
913             if row._prereqs:
914                 rows = {}
915                 columns = []
916                 for column_name in row._prereqs:
917                     columns.append(column_name)
918                     rows[column_name] = row._data[column_name].to_json()
919                 operations.append({"op": "wait",
920                                    "table": row._table.name,
921                                    "timeout": 0,
922                                    "where": _where_uuid_equals(row.uuid),
923                                    "until": "==",
924                                    "columns": columns,
925                                    "rows": [rows]})
926
927         # Add updates.
928         any_updates = False
929         for row in self._txn_rows.itervalues():
930             if row._changes is None:
931                 if row._table.is_root:
932                     operations.append({"op": "delete",
933                                        "table": row._table.name,
934                                        "where": _where_uuid_equals(row.uuid)})
935                     any_updates = True
936                 else:
937                     # Let ovsdb-server decide whether to really delete it.
938                     pass
939             elif row._changes:
940                 op = {"table": row._table.name}
941                 if row._data is None:
942                     op["op"] = "insert"
943                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
944                     any_updates = True
945
946                     op_index = len(operations) - 1
947                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
948                 else:
949                     op["op"] = "update"
950                     op["where"] = _where_uuid_equals(row.uuid)
951
952                 row_json = {}
953                 op["row"] = row_json
954
955                 for column_name, datum in row._changes.iteritems():
956                     if row._data is not None or not datum.is_default():
957                         row_json[column_name] = (
958                                 self._substitute_uuids(datum.to_json()))
959
960                         # If anything really changed, consider it an update.
961                         # We can't suppress not-really-changed values earlier
962                         # or transactions would become nonatomic (see the big
963                         # comment inside Transaction._write()).
964                         if (not any_updates and row._data is not None and
965                             row._data[column_name] != datum):
966                             any_updates = True
967
968                 if row._data is None or row_json:
969                     operations.append(op)
970
971         if self._fetch_requests:
972             for fetch in self._fetch_requests:
973                 fetch["index"] = len(operations) - 1
974                 operations.append({"op": "select",
975                                    "table": fetch["row"]._table.name,
976                                    "where": self._substitute_uuids(
977                                        _where_uuid_equals(fetch["row"].uuid)),
978                                    "columns": [fetch["column_name"]]})
979             any_updates = True
980
981         # Add increment.
982         if self._inc_row and any_updates:
983             self._inc_index = len(operations) - 1
984
985             operations.append({"op": "mutate",
986                                "table": self._inc_row._table.name,
987                                "where": self._substitute_uuids(
988                                    _where_uuid_equals(self._inc_row.uuid)),
989                                "mutations": [[self._inc_column, "+=", 1]]})
990             operations.append({"op": "select",
991                                "table": self._inc_row._table.name,
992                                "where": self._substitute_uuids(
993                                    _where_uuid_equals(self._inc_row.uuid)),
994                                "columns": [self._inc_column]})
995
996         # Add comment.
997         if self._comments:
998             operations.append({"op": "comment",
999                                "comment": "\n".join(self._comments)})
1000
1001         # Dry run?
1002         if self.dry_run:
1003             operations.append({"op": "abort"})
1004
1005         if not any_updates:
1006             self._status = Transaction.UNCHANGED
1007         else:
1008             msg = ovs.jsonrpc.Message.create_request("transact", operations)
1009             self._request_id = msg.id
1010             if not self.idl._session.send(msg):
1011                 self.idl._outstanding_txns[self._request_id] = self
1012                 self._status = Transaction.INCOMPLETE
1013             else:
1014                 self._status = Transaction.TRY_AGAIN
1015
1016         self.__disassemble()
1017         return self._status
1018
1019     def commit_block(self):
1020         """Attempts to commit this transaction, blocking until the commit
1021         either succeeds or fails.  Returns the final commit status, which may
1022         be any Transaction.* value other than Transaction.INCOMPLETE.
1023
1024         This function calls Idl.run() on this transaction'ss IDL, so it may
1025         cause Idl.change_seqno to change."""
1026         while True:
1027             status = self.commit()
1028             if status != Transaction.INCOMPLETE:
1029                 return status
1030
1031             self.idl.run()
1032
1033             poller = ovs.poller.Poller()
1034             self.idl.wait(poller)
1035             self.wait(poller)
1036             poller.block()
1037
1038     def get_increment_new_value(self):
1039         """Returns the final (incremented) value of the column in this
1040         transaction that was set to be incremented by Row.increment.  This
1041         transaction must have committed successfully."""
1042         assert self._status == Transaction.SUCCESS
1043         return self._inc_new_value
1044
1045     def abort(self):
1046         """Aborts this transaction.  If Transaction.commit() has already been
1047         called then the transaction might get committed anyhow."""
1048         self.__disassemble()
1049         if self._status in (Transaction.UNCOMMITTED,
1050                             Transaction.INCOMPLETE):
1051             self._status = Transaction.ABORTED
1052
1053     def get_error(self):
1054         """Returns a string representing this transaction's current status,
1055         suitable for use in log messages."""
1056         if self._status != Transaction.ERROR:
1057             return Transaction.status_to_string(self._status)
1058         elif self._error:
1059             return self._error
1060         else:
1061             return "no error details available"
1062
1063     def __set_error_json(self, json):
1064         if self._error is None:
1065             self._error = ovs.json.to_string(json)
1066
1067     def get_insert_uuid(self, uuid):
1068         """Finds and returns the permanent UUID that the database assigned to a
1069         newly inserted row, given the UUID that Transaction.insert() assigned
1070         locally to that row.
1071
1072         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1073         or if it was assigned by that function and then deleted by Row.delete()
1074         within the same transaction.  (Rows that are inserted and then deleted
1075         within a single transaction are never sent to the database server, so
1076         it never assigns them a permanent UUID.)
1077
1078         This transaction must have completed successfully."""
1079         assert self._status in (Transaction.SUCCESS,
1080                                 Transaction.UNCHANGED)
1081         inserted_row = self._inserted_rows.get(uuid)
1082         if inserted_row:
1083             return inserted_row.real
1084         return None
1085
1086     def _increment(self, row, column):
1087         assert not self._inc_row
1088         self._inc_row = row
1089         self._inc_column = column
1090
1091     def _fetch(self, row, column_name):
1092         self._fetch_requests.append({"row":row, "column_name":column_name})
1093
1094     def _write(self, row, column, datum):
1095         assert row._changes is not None
1096
1097         txn = row._idl.txn
1098
1099         # If this is a write-only column and the datum being written is the
1100         # same as the one already there, just skip the update entirely.  This
1101         # is worth optimizing because we have a lot of columns that get
1102         # periodically refreshed into the database but don't actually change
1103         # that often.
1104         #
1105         # We don't do this for read/write columns because that would break
1106         # atomicity of transactions--some other client might have written a
1107         # different value in that column since we read it.  (But if a whole
1108         # transaction only does writes of existing values, without making any
1109         # real changes, we will drop the whole transaction later in
1110         # ovsdb_idl_txn_commit().)
1111         if not column.alert and row._data and row._data.get(column.name) == datum:
1112             new_value = row._changes.get(column.name)
1113             if new_value is None or new_value == datum:
1114                 return
1115
1116         txn._txn_rows[row.uuid] = row
1117         row._changes[column.name] = datum.copy()
1118
1119     def insert(self, table, new_uuid=None):
1120         """Inserts and returns a new row in 'table', which must be one of the
1121         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1122
1123         The new row is assigned a provisional UUID.  If 'uuid' is None then one
1124         is randomly generated; otherwise 'uuid' should specify a randomly
1125         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
1126         different UUID when 'txn' is committed, but the IDL will replace any
1127         uses of the provisional UUID in the data to be to be committed by the
1128         UUID assigned by ovsdb-server."""
1129         assert self._status == Transaction.UNCOMMITTED
1130         if new_uuid is None:
1131             new_uuid = uuid.uuid4()
1132         row = Row(self.idl, table, new_uuid, None)
1133         table.rows[row.uuid] = row
1134         self._txn_rows[row.uuid] = row
1135         return row
1136
1137     def _process_reply(self, msg):
1138         if msg.type == ovs.jsonrpc.Message.T_ERROR:
1139             self._status = Transaction.ERROR
1140         elif type(msg.result) not in (list, tuple):
1141             # XXX rate-limit
1142             vlog.warn('reply to "transact" is not JSON array')
1143         else:
1144             hard_errors = False
1145             soft_errors = False
1146             lock_errors = False
1147
1148             ops = msg.result
1149             for op in ops:
1150                 if op is None:
1151                     # This isn't an error in itself but indicates that some
1152                     # prior operation failed, so make sure that we know about
1153                     # it.
1154                     soft_errors = True
1155                 elif type(op) == dict:
1156                     error = op.get("error")
1157                     if error is not None:
1158                         if error == "timed out":
1159                             soft_errors = True
1160                         elif error == "not owner":
1161                             lock_errors = True
1162                         elif error == "aborted":
1163                             pass
1164                         else:
1165                             hard_errors = True
1166                             self.__set_error_json(op)
1167                 else:
1168                     hard_errors = True
1169                     self.__set_error_json(op)
1170                     # XXX rate-limit
1171                     vlog.warn("operation reply is not JSON null or object")
1172
1173             if not soft_errors and not hard_errors and not lock_errors:
1174                 if self._inc_row and not self.__process_inc_reply(ops):
1175                     hard_errors = True
1176                 if self._fetch_requests:
1177                     if self.__process_fetch_reply(ops):
1178                         self.idl.change_seqno += 1
1179                     else:
1180                         hard_errors = True
1181
1182                 for insert in self._inserted_rows.itervalues():
1183                     if not self.__process_insert_reply(insert, ops):
1184                         hard_errors = True
1185
1186             if hard_errors:
1187                 self._status = Transaction.ERROR
1188             elif lock_errors:
1189                 self._status = Transaction.NOT_LOCKED
1190             elif soft_errors:
1191                 self._status = Transaction.TRY_AGAIN
1192             else:
1193                 self._status = Transaction.SUCCESS
1194
1195     @staticmethod
1196     def __check_json_type(json, types, name):
1197         if not json:
1198             # XXX rate-limit
1199             vlog.warn("%s is missing" % name)
1200             return False
1201         elif type(json) not in types:
1202             # XXX rate-limit
1203             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1204             return False
1205         else:
1206             return True
1207
1208     def __process_fetch_reply(self, ops):
1209         update = False
1210         for fetch_request in self._fetch_requests:
1211             row = fetch_request["row"]
1212             column_name = fetch_request["column_name"]
1213             index = fetch_request["index"]
1214             table = row._table
1215
1216             select = ops[index]
1217             fetched_rows = select.get("rows")
1218             if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1219                                                  '"select" reply "rows"'):
1220                 return False
1221             if len(fetched_rows) != 1:
1222                 # XXX rate-limit
1223                 vlog.warn('"select" reply "rows" has %d elements '
1224                           'instead of 1' % len(fetched_rows))
1225                 continue
1226             fetched_row = fetched_rows[0]
1227             if not Transaction.__check_json_type(fetched_row, (dict,),
1228                                                  '"select" reply row'):
1229                 continue
1230
1231             column = table.columns.get(column_name)
1232             datum_json = fetched_row.get(column_name)
1233             datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1234
1235             row._data[column_name] = datum
1236             update = True
1237
1238         return update
1239
1240     def __process_inc_reply(self, ops):
1241         if self._inc_index + 2 > len(ops):
1242             # XXX rate-limit
1243             vlog.warn("reply does not contain enough operations for "
1244                       "increment (has %d, needs %d)" %
1245                       (len(ops), self._inc_index + 2))
1246
1247         # We know that this is a JSON object because the loop in
1248         # __process_reply() already checked.
1249         mutate = ops[self._inc_index]
1250         count = mutate.get("count")
1251         if not Transaction.__check_json_type(count, (int, long),
1252                                              '"mutate" reply "count"'):
1253             return False
1254         if count != 1:
1255             # XXX rate-limit
1256             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1257             return False
1258
1259         select = ops[self._inc_index + 1]
1260         rows = select.get("rows")
1261         if not Transaction.__check_json_type(rows, (list, tuple),
1262                                              '"select" reply "rows"'):
1263             return False
1264         if len(rows) != 1:
1265             # XXX rate-limit
1266             vlog.warn('"select" reply "rows" has %d elements '
1267                       'instead of 1' % len(rows))
1268             return False
1269         row = rows[0]
1270         if not Transaction.__check_json_type(row, (dict,),
1271                                              '"select" reply row'):
1272             return False
1273         column = row.get(self._inc_column)
1274         if not Transaction.__check_json_type(column, (int, long),
1275                                              '"select" reply inc column'):
1276             return False
1277         self._inc_new_value = column
1278         return True
1279
1280     def __process_insert_reply(self, insert, ops):
1281         if insert.op_index >= len(ops):
1282             # XXX rate-limit
1283             vlog.warn("reply does not contain enough operations "
1284                       "for insert (has %d, needs %d)"
1285                       % (len(ops), insert.op_index))
1286             return False
1287
1288         # We know that this is a JSON object because the loop in
1289         # __process_reply() already checked.
1290         reply = ops[insert.op_index]
1291         json_uuid = reply.get("uuid")
1292         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1293                                              '"insert" reply "uuid"'):
1294             return False
1295
1296         try:
1297             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1298         except error.Error:
1299             # XXX rate-limit
1300             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1301             return False
1302
1303         insert.real = uuid_
1304         return True
1305
1306
1307 class SchemaHelper(object):
1308     """IDL Schema helper.
1309
1310     This class encapsulates the logic required to generate schemas suitable
1311     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1312     they are interested in using register_columns().  When finished, the
1313     get_idl_schema() function may be called.
1314
1315     The location on disk of the schema used may be found in the
1316     'schema_location' variable."""
1317
1318     def __init__(self, location=None, schema_json=None):
1319         """Creates a new Schema object.
1320
1321         'location' file path to ovs schema. None means default location
1322         'schema_json' schema in json preresentation in memory
1323         """
1324
1325         if location and schema_json:
1326             raise ValueError("both location and schema_json can't be "
1327                              "specified. it's ambiguous.")
1328         if schema_json is None:
1329             if location is None:
1330                 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1331             schema_json = ovs.json.from_file(location)
1332
1333         self.schema_json = schema_json
1334         self._tables = {}
1335         self._readonly = {}
1336         self._all = False
1337
1338     def register_columns(self, table, columns, readonly=[]):
1339         """Registers interest in the given 'columns' of 'table'.  Future calls
1340         to get_idl_schema() will include 'table':column for each column in
1341         'columns'. This function automatically avoids adding duplicate entries
1342         to the schema.
1343         A subset of 'columns' can be specified as 'readonly'. The readonly
1344         columns are not replicated but can be fetched on-demand by the user
1345         with Row.fetch().
1346
1347         'table' must be a string.
1348         'columns' must be a list of strings.
1349         'readonly' must be a list of strings.
1350         """
1351
1352         assert type(table) is str
1353         assert type(columns) is list
1354
1355         columns = set(columns) | self._tables.get(table, set())
1356         self._tables[table] = columns
1357         self._readonly[table] = readonly
1358
1359     def register_table(self, table):
1360         """Registers interest in the given all columns of 'table'. Future calls
1361         to get_idl_schema() will include all columns of 'table'.
1362
1363         'table' must be a string
1364         """
1365         assert type(table) is str
1366         self._tables[table] = set()  # empty set means all columns in the table
1367
1368     def register_all(self):
1369         """Registers interest in every column of every table."""
1370         self._all = True
1371
1372     def get_idl_schema(self):
1373         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1374         object based on columns registered using the register_columns()
1375         function."""
1376
1377         schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1378         self.schema_json = None
1379
1380         if not self._all:
1381             schema_tables = {}
1382             for table, columns in self._tables.iteritems():
1383                 schema_tables[table] = (
1384                     self._keep_table_columns(schema, table, columns))
1385
1386             schema.tables = schema_tables
1387         schema.readonly = self._readonly
1388         return schema
1389
1390     def _keep_table_columns(self, schema, table_name, columns):
1391         assert table_name in schema.tables
1392         table = schema.tables[table_name]
1393
1394         if not columns:
1395             # empty set means all columns in the table
1396             return table
1397
1398         new_columns = {}
1399         for column_name in columns:
1400             assert type(column_name) is str
1401             assert column_name in table.columns
1402
1403             new_columns[column_name] = table.columns[column_name]
1404
1405         table.columns = new_columns
1406         return table