python: Resolve some indentation warnings.
[cascardo/ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import uuid
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22 import ovs.poller
23 import ovs.vlog
24
25 vlog = ovs.vlog.Vlog("idl")
26
27 __pychecker__ = 'no-classattr no-objattrs'
28
29 ROW_CREATE = "create"
30 ROW_UPDATE = "update"
31 ROW_DELETE = "delete"
32
33
34 class Idl(object):
35     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
36
37     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
38     requests to an OVSDB database server and parses the responses, converting
39     raw JSON into data structures that are easier for clients to digest.
40
41     The IDL also assists with issuing database transactions.  The client
42     creates a transaction, manipulates the IDL data structures, and commits or
43     aborts the transaction.  The IDL then composes and issues the necessary
44     JSON-RPC requests and reports to the client whether the transaction
45     completed successfully.
46
47     The client is allowed to access the following attributes directly, in a
48     read-only fashion:
49
50     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
51       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
52       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
53       to a Row object.
54
55       The client may directly read and write the Row objects referenced by the
56       'rows' map values.  Refer to Row for more details.
57
58     - 'change_seqno': A number that represents the IDL's state.  When the IDL
59       is updated (by Idl.run()), its value changes.  The sequence number can
60       occasionally change even if the database does not.  This happens if the
61       connection to the database drops and reconnects, which causes the
62       database contents to be reloaded even if they didn't change.  (It could
63       also happen if the database server sends out a "change" that reflects
64       what the IDL already thought was in the database.  The database server is
65       not supposed to do that, but bugs could in theory cause it to do so.)
66
67     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
68       if no lock is configured.
69
70     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
71       lock, and False otherwise.
72
73       Locking and unlocking happens asynchronously from the database client's
74       point of view, so the information is only useful for optimization
75       (e.g. if the client doesn't have the lock then there's no point in trying
76       to write to the database).
77
78     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
79       the database server has indicated that some other client already owns the
80       requested lock, and False otherwise.
81
82     - 'txn': The ovs.db.idl.Transaction object for the database transaction
83       currently being constructed, if there is one, or None otherwise.
84 """
85
86     def __init__(self, remote, schema):
87         """Creates and returns a connection to the database named 'db_name' on
88         'remote', which should be in a form acceptable to
89         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
90         replica of the remote database.
91
92         'schema' should be the schema for the remote database.  The caller may
93         have cut it down by removing tables or columns that are not of
94         interest.  The IDL will only replicate the tables and columns that
95         remain.  The caller may also add a attribute named 'alert' to selected
96         remaining columns, setting its value to False; if so, then changes to
97         those columns will not be considered changes to the database for the
98         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
99         useful for columns that the IDL's client will write but not read.
100
101         As a convenience to users, 'schema' may also be an instance of the
102         SchemaHelper class.
103
104         The IDL uses and modifies 'schema' directly."""
105
106         assert isinstance(schema, SchemaHelper)
107         schema = schema.get_idl_schema()
108
109         self.tables = schema.tables
110         self.readonly = schema.readonly
111         self._db = schema
112         self._session = ovs.jsonrpc.Session.open(remote)
113         self._monitor_request_id = None
114         self._last_seqno = None
115         self.change_seqno = 0
116
117         # Database locking.
118         self.lock_name = None          # Name of lock we need, None if none.
119         self.has_lock = False          # Has db server said we have the lock?
120         self.is_lock_contended = False  # Has db server said we can't get lock?
121         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
122
123         # Transaction support.
124         self.txn = None
125         self._outstanding_txns = {}
126
127         for table in schema.tables.itervalues():
128             for column in table.columns.itervalues():
129                 if not hasattr(column, 'alert'):
130                     column.alert = True
131             table.need_table = False
132             table.rows = {}
133             table.idl = self
134
135     def close(self):
136         """Closes the connection to the database.  The IDL will no longer
137         update."""
138         self._session.close()
139
140     def run(self):
141         """Processes a batch of messages from the database server.  Returns
142         True if the database as seen through the IDL changed, False if it did
143         not change.  The initial fetch of the entire contents of the remote
144         database is considered to be one kind of change.  If the IDL has been
145         configured to acquire a database lock (with Idl.set_lock()), then
146         successfully acquiring the lock is also considered to be a change.
147
148         This function can return occasional false positives, that is, report
149         that the database changed even though it didn't.  This happens if the
150         connection to the database drops and reconnects, which causes the
151         database contents to be reloaded even if they didn't change.  (It could
152         also happen if the database server sends out a "change" that reflects
153         what we already thought was in the database, but the database server is
154         not supposed to do that.)
155
156         As an alternative to checking the return value, the client may check
157         for changes in self.change_seqno."""
158         assert not self.txn
159         initial_change_seqno = self.change_seqno
160         self._session.run()
161         i = 0
162         while i < 50:
163             i += 1
164             if not self._session.is_connected():
165                 break
166
167             seqno = self._session.get_seqno()
168             if seqno != self._last_seqno:
169                 self._last_seqno = seqno
170                 self.__txn_abort_all()
171                 self.__send_monitor_request()
172                 if self.lock_name:
173                     self.__send_lock_request()
174                 break
175
176             msg = self._session.recv()
177             if msg is None:
178                 break
179             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
180                 and msg.method == "update"
181                 and len(msg.params) == 2
182                 and msg.params[0] is None):
183                 # Database contents changed.
184                 self.__parse_update(msg.params[1])
185             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
186                   and self._monitor_request_id is not None
187                   and self._monitor_request_id == msg.id):
188                 # Reply to our "monitor" request.
189                 try:
190                     self.change_seqno += 1
191                     self._monitor_request_id = None
192                     self.__clear()
193                     self.__parse_update(msg.result)
194                 except error.Error, e:
195                     vlog.err("%s: parse error in received schema: %s"
196                               % (self._session.get_name(), e))
197                     self.__error()
198             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
199                   and self._lock_request_id is not None
200                   and self._lock_request_id == msg.id):
201                 # Reply to our "lock" request.
202                 self.__parse_lock_reply(msg.result)
203             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
204                   and msg.method == "locked"):
205                 # We got our lock.
206                 self.__parse_lock_notify(msg.params, True)
207             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
208                   and msg.method == "stolen"):
209                 # Someone else stole our lock.
210                 self.__parse_lock_notify(msg.params, False)
211             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
212                 # Reply to our echo request.  Ignore it.
213                 pass
214             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
215                                ovs.jsonrpc.Message.T_REPLY)
216                   and self.__txn_process_reply(msg)):
217                 # __txn_process_reply() did everything needed.
218                 pass
219             else:
220                 # This can happen if a transaction is destroyed before we
221                 # receive the reply, so keep the log level low.
222                 vlog.dbg("%s: received unexpected %s message"
223                          % (self._session.get_name(),
224                              ovs.jsonrpc.Message.type_to_string(msg.type)))
225
226         return initial_change_seqno != self.change_seqno
227
228     def wait(self, poller):
229         """Arranges for poller.block() to wake up when self.run() has something
230         to do or when activity occurs on a transaction on 'self'."""
231         self._session.wait(poller)
232         self._session.recv_wait(poller)
233
234     def has_ever_connected(self):
235         """Returns True, if the IDL successfully connected to the remote
236         database and retrieved its contents (even if the connection
237         subsequently dropped and is in the process of reconnecting).  If so,
238         then the IDL contains an atomic snapshot of the database's contents
239         (but it might be arbitrarily old if the connection dropped).
240
241         Returns False if the IDL has never connected or retrieved the
242         database's contents.  If so, the IDL is empty."""
243         return self.change_seqno != 0
244
245     def force_reconnect(self):
246         """Forces the IDL to drop its connection to the database and reconnect.
247         In the meantime, the contents of the IDL will not change."""
248         self._session.force_reconnect()
249
250     def set_lock(self, lock_name):
251         """If 'lock_name' is not None, configures the IDL to obtain the named
252         lock from the database server and to avoid modifying the database when
253         the lock cannot be acquired (that is, when another client has the same
254         lock).
255
256         If 'lock_name' is None, drops the locking requirement and releases the
257         lock."""
258         assert not self.txn
259         assert not self._outstanding_txns
260
261         if self.lock_name and (not lock_name or lock_name != self.lock_name):
262             # Release previous lock.
263             self.__send_unlock_request()
264             self.lock_name = None
265             self.is_lock_contended = False
266
267         if lock_name and not self.lock_name:
268             # Acquire new lock.
269             self.lock_name = lock_name
270             self.__send_lock_request()
271
272     def notify(self, event, row, updates=None):
273         """Hook for implementing create/update/delete notifications
274
275         :param event:   The event that was triggered
276         :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
277         :param row:     The row as it is after the operation has occured
278         :type row:      Row
279         :param updates: For updates, a Row object with just the changed columns
280         :type updates:  Row
281         """
282
283     def __clear(self):
284         changed = False
285
286         for table in self.tables.itervalues():
287             if table.rows:
288                 changed = True
289                 table.rows = {}
290
291         if changed:
292             self.change_seqno += 1
293
294     def __update_has_lock(self, new_has_lock):
295         if new_has_lock and not self.has_lock:
296             if self._monitor_request_id is None:
297                 self.change_seqno += 1
298             else:
299                 # We're waiting for a monitor reply, so don't signal that the
300                 # database changed.  The monitor reply will increment
301                 # change_seqno anyhow.
302                 pass
303             self.is_lock_contended = False
304         self.has_lock = new_has_lock
305
306     def __do_send_lock_request(self, method):
307         self.__update_has_lock(False)
308         self._lock_request_id = None
309         if self._session.is_connected():
310             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
311             msg_id = msg.id
312             self._session.send(msg)
313         else:
314             msg_id = None
315         return msg_id
316
317     def __send_lock_request(self):
318         self._lock_request_id = self.__do_send_lock_request("lock")
319
320     def __send_unlock_request(self):
321         self.__do_send_lock_request("unlock")
322
323     def __parse_lock_reply(self, result):
324         self._lock_request_id = None
325         got_lock = type(result) == dict and result.get("locked") is True
326         self.__update_has_lock(got_lock)
327         if not got_lock:
328             self.is_lock_contended = True
329
330     def __parse_lock_notify(self, params, new_has_lock):
331         if (self.lock_name is not None
332             and type(params) in (list, tuple)
333             and params
334             and params[0] == self.lock_name):
335             self.__update_has_lock(self, new_has_lock)
336             if not new_has_lock:
337                 self.is_lock_contended = True
338
339     def __send_monitor_request(self):
340         monitor_requests = {}
341         for table in self.tables.itervalues():
342             columns = []
343             for column in table.columns.keys():
344                 if ((table.name not in self.readonly) or
345                     (table.name in self.readonly) and
346                     (column not in self.readonly[table.name])):
347                     columns.append(column)
348             monitor_requests[table.name] = {"columns": columns}
349         msg = ovs.jsonrpc.Message.create_request(
350             "monitor", [self._db.name, None, monitor_requests])
351         self._monitor_request_id = msg.id
352         self._session.send(msg)
353
354     def __parse_update(self, update):
355         try:
356             self.__do_parse_update(update)
357         except error.Error, e:
358             vlog.err("%s: error parsing update: %s"
359                      % (self._session.get_name(), e))
360
361     def __do_parse_update(self, table_updates):
362         if type(table_updates) != dict:
363             raise error.Error("<table-updates> is not an object",
364                               table_updates)
365
366         for table_name, table_update in table_updates.iteritems():
367             table = self.tables.get(table_name)
368             if not table:
369                 raise error.Error('<table-updates> includes unknown '
370                                   'table "%s"' % table_name)
371
372             if type(table_update) != dict:
373                 raise error.Error('<table-update> for table "%s" is not '
374                                   'an object' % table_name, table_update)
375
376             for uuid_string, row_update in table_update.iteritems():
377                 if not ovs.ovsuuid.is_valid_string(uuid_string):
378                     raise error.Error('<table-update> for table "%s" '
379                                       'contains bad UUID "%s" as member '
380                                       'name' % (table_name, uuid_string),
381                                       table_update)
382                 uuid = ovs.ovsuuid.from_string(uuid_string)
383
384                 if type(row_update) != dict:
385                     raise error.Error('<table-update> for table "%s" '
386                                       'contains <row-update> for %s that '
387                                       'is not an object'
388                                       % (table_name, uuid_string))
389
390                 parser = ovs.db.parser.Parser(row_update, "row-update")
391                 old = parser.get_optional("old", [dict])
392                 new = parser.get_optional("new", [dict])
393                 parser.finish()
394
395                 if not old and not new:
396                     raise error.Error('<row-update> missing "old" and '
397                                       '"new" members', row_update)
398
399                 if self.__process_update(table, uuid, old, new):
400                     self.change_seqno += 1
401
402     def __process_update(self, table, uuid, old, new):
403         """Returns True if a column changed, False otherwise."""
404         row = table.rows.get(uuid)
405         changed = False
406         if not new:
407             # Delete row.
408             if row:
409                 del table.rows[uuid]
410                 changed = True
411                 self.notify(ROW_DELETE, row)
412             else:
413                 # XXX rate-limit
414                 vlog.warn("cannot delete missing row %s from table %s"
415                           % (uuid, table.name))
416         elif not old:
417             # Insert row.
418             if not row:
419                 row = self.__create_row(table, uuid)
420                 changed = True
421             else:
422                 # XXX rate-limit
423                 vlog.warn("cannot add existing row %s to table %s"
424                           % (uuid, table.name))
425             if self.__row_update(table, row, new):
426                 changed = True
427                 self.notify(ROW_CREATE, row)
428         else:
429             op = ROW_UPDATE
430             if not row:
431                 row = self.__create_row(table, uuid)
432                 changed = True
433                 op = ROW_CREATE
434                 # XXX rate-limit
435                 vlog.warn("cannot modify missing row %s in table %s"
436                           % (uuid, table.name))
437             if self.__row_update(table, row, new):
438                 changed = True
439                 self.notify(op, row, Row.from_json(self, table, uuid, old))
440         return changed
441
442     def __row_update(self, table, row, row_json):
443         changed = False
444         for column_name, datum_json in row_json.iteritems():
445             column = table.columns.get(column_name)
446             if not column:
447                 # XXX rate-limit
448                 vlog.warn("unknown column %s updating table %s"
449                           % (column_name, table.name))
450                 continue
451
452             try:
453                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
454             except error.Error, e:
455                 # XXX rate-limit
456                 vlog.warn("error parsing column %s in table %s: %s"
457                           % (column_name, table.name, e))
458                 continue
459
460             if datum != row._data[column_name]:
461                 row._data[column_name] = datum
462                 if column.alert:
463                     changed = True
464             else:
465                 # Didn't really change but the OVSDB monitor protocol always
466                 # includes every value in a row.
467                 pass
468         return changed
469
470     def __create_row(self, table, uuid):
471         data = {}
472         for column in table.columns.itervalues():
473             data[column.name] = ovs.db.data.Datum.default(column.type)
474         row = table.rows[uuid] = Row(self, table, uuid, data)
475         return row
476
477     def __error(self):
478         self._session.force_reconnect()
479
480     def __txn_abort_all(self):
481         while self._outstanding_txns:
482             txn = self._outstanding_txns.popitem()[1]
483             txn._status = Transaction.TRY_AGAIN
484
485     def __txn_process_reply(self, msg):
486         txn = self._outstanding_txns.pop(msg.id, None)
487         if txn:
488             txn._process_reply(msg)
489
490
491 def _uuid_to_row(atom, base):
492     if base.ref_table:
493         return base.ref_table.rows.get(atom)
494     else:
495         return atom
496
497
498 def _row_to_uuid(value):
499     if type(value) == Row:
500         return value.uuid
501     else:
502         return value
503
504
505 class Row(object):
506     """A row within an IDL.
507
508     The client may access the following attributes directly:
509
510     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
511
512     - An attribute for each column in the Row's table, named for the column,
513       whose values are as returned by Datum.to_python() for the column's type.
514
515       If some error occurs (e.g. the database server's idea of the column is
516       different from the IDL's idea), then the attribute values is the
517       "default" value return by Datum.default() for the column's type.  (It is
518       important to know this because the default value may violate constraints
519       for the column's type, e.g. the default integer value is 0 even if column
520       contraints require the column's value to be positive.)
521
522       When a transaction is active, column attributes may also be assigned new
523       values.  Committing the transaction will then cause the new value to be
524       stored into the database.
525
526       *NOTE*: In the current implementation, the value of a column is a *copy*
527       of the value in the database.  This means that modifying its value
528       directly will have no useful effect.  For example, the following:
529         row.mycolumn["a"] = "b"              # don't do this
530       will not change anything in the database, even after commit.  To modify
531       the column, instead assign the modified column value back to the column:
532         d = row.mycolumn
533         d["a"] = "b"
534         row.mycolumn = d
535 """
536     def __init__(self, idl, table, uuid, data):
537         # All of the explicit references to self.__dict__ below are required
538         # to set real attributes with invoking self.__getattr__().
539         self.__dict__["uuid"] = uuid
540
541         self.__dict__["_idl"] = idl
542         self.__dict__["_table"] = table
543
544         # _data is the committed data.  It takes the following values:
545         #
546         #   - A dictionary that maps every column name to a Datum, if the row
547         #     exists in the committed form of the database.
548         #
549         #   - None, if this row is newly inserted within the active transaction
550         #     and thus has no committed form.
551         self.__dict__["_data"] = data
552
553         # _changes describes changes to this row within the active transaction.
554         # It takes the following values:
555         #
556         #   - {}, the empty dictionary, if no transaction is active or if the
557         #     row has yet not been changed within this transaction.
558         #
559         #   - A dictionary that maps a column name to its new Datum, if an
560         #     active transaction changes those columns' values.
561         #
562         #   - A dictionary that maps every column name to a Datum, if the row
563         #     is newly inserted within the active transaction.
564         #
565         #   - None, if this transaction deletes this row.
566         self.__dict__["_changes"] = {}
567
568         # A dictionary whose keys are the names of columns that must be
569         # verified as prerequisites when the transaction commits.  The values
570         # in the dictionary are all None.
571         self.__dict__["_prereqs"] = {}
572
573     def __getattr__(self, column_name):
574         assert self._changes is not None
575
576         datum = self._changes.get(column_name)
577         if datum is None:
578             if self._data is None:
579                 raise AttributeError("%s instance has no attribute '%s'" %
580                                      (self.__class__.__name__, column_name))
581             if column_name in self._data:
582                 datum = self._data[column_name]
583             else:
584                 raise AttributeError("%s instance has no attribute '%s'" %
585                                      (self.__class__.__name__, column_name))
586
587         return datum.to_python(_uuid_to_row)
588
589     def __setattr__(self, column_name, value):
590         assert self._changes is not None
591         assert self._idl.txn
592
593         if ((self._table.name in self._idl.readonly) and
594             (column_name in self._idl.readonly[self._table.name])):
595             vlog.warn("attempting to write to readonly column %s" % column_name)
596             return
597
598         column = self._table.columns[column_name]
599         try:
600             datum = ovs.db.data.Datum.from_python(column.type, value,
601                                                   _row_to_uuid)
602         except error.Error, e:
603             # XXX rate-limit
604             vlog.err("attempting to write bad value to column %s (%s)"
605                      % (column_name, e))
606             return
607         self._idl.txn._write(self, column, datum)
608
609     @classmethod
610     def from_json(cls, idl, table, uuid, row_json):
611         data = {}
612         for column_name, datum_json in row_json.iteritems():
613             column = table.columns.get(column_name)
614             if not column:
615                 # XXX rate-limit
616                 vlog.warn("unknown column %s in table %s"
617                           % (column_name, table.name))
618                 continue
619             try:
620                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
621             except error.Error, e:
622                 # XXX rate-limit
623                 vlog.warn("error parsing column %s in table %s: %s"
624                           % (column_name, table.name, e))
625                 continue
626             data[column_name] = datum
627         return cls(idl, table, uuid, data)
628
629     def verify(self, column_name):
630         """Causes the original contents of column 'column_name' in this row to
631         be verified as a prerequisite to completing the transaction.  That is,
632         if 'column_name' changed in this row (or if this row was deleted)
633         between the time that the IDL originally read its contents and the time
634         that the transaction commits, then the transaction aborts and
635         Transaction.commit() returns Transaction.TRY_AGAIN.
636
637         The intention is that, to ensure that no transaction commits based on
638         dirty reads, an application should call Row.verify() on each data item
639         read as part of a read-modify-write operation.
640
641         In some cases Row.verify() reduces to a no-op, because the current
642         value of the column is already known:
643
644           - If this row is a row created by the current transaction (returned
645             by Transaction.insert()).
646
647           - If the column has already been modified within the current
648             transaction.
649
650         Because of the latter property, always call Row.verify() *before*
651         modifying the column, for a given read-modify-write.
652
653         A transaction must be in progress."""
654         assert self._idl.txn
655         assert self._changes is not None
656         if not self._data or column_name in self._changes:
657             return
658
659         self._prereqs[column_name] = None
660
661     def delete(self):
662         """Deletes this row from its table.
663
664         A transaction must be in progress."""
665         assert self._idl.txn
666         assert self._changes is not None
667         if self._data is None:
668             del self._idl.txn._txn_rows[self.uuid]
669         else:
670             self._idl.txn._txn_rows[self.uuid] = self
671         self.__dict__["_changes"] = None
672         del self._table.rows[self.uuid]
673
674     def fetch(self, column_name):
675         self._idl.txn._fetch(self, column_name)
676
677     def increment(self, column_name):
678         """Causes the transaction, when committed, to increment the value of
679         'column_name' within this row by 1.  'column_name' must have an integer
680         type.  After the transaction commits successfully, the client may
681         retrieve the final (incremented) value of 'column_name' with
682         Transaction.get_increment_new_value().
683
684         The client could accomplish something similar by reading and writing
685         and verify()ing columns.  However, increment() will never (by itself)
686         cause a transaction to fail because of a verify error.
687
688         The intended use is for incrementing the "next_cfg" column in
689         the Open_vSwitch table."""
690         self._idl.txn._increment(self, column_name)
691
692
693 def _uuid_name_from_uuid(uuid):
694     return "row%s" % str(uuid).replace("-", "_")
695
696
697 def _where_uuid_equals(uuid):
698     return [["_uuid", "==", ["uuid", str(uuid)]]]
699
700
701 class _InsertedRow(object):
702     def __init__(self, op_index):
703         self.op_index = op_index
704         self.real = None
705
706
707 class Transaction(object):
708     """A transaction may modify the contents of a database by modifying the
709     values of columns, deleting rows, inserting rows, or adding checks that
710     columns in the database have not changed ("verify" operations), through
711     Row methods.
712
713     Reading and writing columns and inserting and deleting rows are all
714     straightforward.  The reasons to verify columns are less obvious.
715     Verification is the key to maintaining transactional integrity.  Because
716     OVSDB handles multiple clients, it can happen that between the time that
717     OVSDB client A reads a column and writes a new value, OVSDB client B has
718     written that column.  Client A's write should not ordinarily overwrite
719     client B's, especially if the column in question is a "map" column that
720     contains several more or less independent data items.  If client A adds a
721     "verify" operation before it writes the column, then the transaction fails
722     in case client B modifies it first.  Client A will then see the new value
723     of the column and compose a new transaction based on the new contents
724     written by client B.
725
726     When a transaction is complete, which must be before the next call to
727     Idl.run(), call Transaction.commit() or Transaction.abort().
728
729     The life-cycle of a transaction looks like this:
730
731     1. Create the transaction and record the initial sequence number:
732
733         seqno = idl.change_seqno(idl)
734         txn = Transaction(idl)
735
736     2. Modify the database with Row and Transaction methods.
737
738     3. Commit the transaction by calling Transaction.commit().  The first call
739        to this function probably returns Transaction.INCOMPLETE.  The client
740        must keep calling again along as this remains true, calling Idl.run() in
741        between to let the IDL do protocol processing.  (If the client doesn't
742        have anything else to do in the meantime, it can use
743        Transaction.commit_block() to avoid having to loop itself.)
744
745     4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
746        to change from the saved 'seqno' (it's possible that it's already
747        changed, in which case the client should not wait at all), then start
748        over from step 1.  Only a call to Idl.run() will change the return value
749        of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
750
751     # Status values that Transaction.commit() can return.
752
753     # Not yet committed or aborted.
754     UNCOMMITTED = "uncommitted"
755     # Transaction didn't include any changes.
756     UNCHANGED = "unchanged"
757     # Commit in progress, please wait.
758     INCOMPLETE = "incomplete"
759     # ovsdb_idl_txn_abort() called.
760     ABORTED = "aborted"
761     # Commit successful.
762     SUCCESS = "success"
763     # Commit failed because a "verify" operation
764     # reported an inconsistency, due to a network
765     # problem, or other transient failure.  Wait
766     # for a change, then try again.
767     TRY_AGAIN = "try again"
768     # Server hasn't given us the lock yet.
769     NOT_LOCKED = "not locked"
770     # Commit failed due to a hard error.
771     ERROR = "error"
772
773     @staticmethod
774     def status_to_string(status):
775         """Converts one of the status values that Transaction.commit() can
776         return into a human-readable string.
777
778         (The status values are in fact such strings already, so
779         there's nothing to do.)"""
780         return status
781
782     def __init__(self, idl):
783         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
784         A given Idl may only have a single active transaction at a time.
785
786         A Transaction may modify the contents of a database by assigning new
787         values to columns (attributes of Row), deleting rows (with
788         Row.delete()), or inserting rows (with Transaction.insert()).  It may
789         also check that columns in the database have not changed with
790         Row.verify().
791
792         When a transaction is complete (which must be before the next call to
793         Idl.run()), call Transaction.commit() or Transaction.abort()."""
794         assert idl.txn is None
795
796         idl.txn = self
797         self._request_id = None
798         self.idl = idl
799         self.dry_run = False
800         self._txn_rows = {}
801         self._status = Transaction.UNCOMMITTED
802         self._error = None
803         self._comments = []
804
805         self._inc_row = None
806         self._inc_column = None
807
808         self._fetch_requests = []
809
810         self._inserted_rows = {}  # Map from UUID to _InsertedRow
811
812     def add_comment(self, comment):
813         """Appends 'comment' to the comments that will be passed to the OVSDB
814         server when this transaction is committed.  (The comment will be
815         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
816         relatively human-readable form.)"""
817         self._comments.append(comment)
818
819     def wait(self, poller):
820         """Causes poll_block() to wake up if this transaction has completed
821         committing."""
822         if self._status not in (Transaction.UNCOMMITTED,
823                                 Transaction.INCOMPLETE):
824             poller.immediate_wake()
825
826     def _substitute_uuids(self, json):
827         if type(json) in (list, tuple):
828             if (len(json) == 2
829                 and json[0] == 'uuid'
830                 and ovs.ovsuuid.is_valid_string(json[1])):
831                 uuid = ovs.ovsuuid.from_string(json[1])
832                 row = self._txn_rows.get(uuid, None)
833                 if row and row._data is None:
834                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
835             else:
836                 return [self._substitute_uuids(elem) for elem in json]
837         return json
838
839     def __disassemble(self):
840         self.idl.txn = None
841
842         for row in self._txn_rows.itervalues():
843             if row._changes is None:
844                 row._table.rows[row.uuid] = row
845             elif row._data is None:
846                 del row._table.rows[row.uuid]
847             row.__dict__["_changes"] = {}
848             row.__dict__["_prereqs"] = {}
849         self._txn_rows = {}
850
851     def commit(self):
852         """Attempts to commit 'txn'.  Returns the status of the commit
853         operation, one of the following constants:
854
855           Transaction.INCOMPLETE:
856
857               The transaction is in progress, but not yet complete.  The caller
858               should call again later, after calling Idl.run() to let the
859               IDL do OVSDB protocol processing.
860
861           Transaction.UNCHANGED:
862
863               The transaction is complete.  (It didn't actually change the
864               database, so the IDL didn't send any request to the database
865               server.)
866
867           Transaction.ABORTED:
868
869               The caller previously called Transaction.abort().
870
871           Transaction.SUCCESS:
872
873               The transaction was successful.  The update made by the
874               transaction (and possibly other changes made by other database
875               clients) should already be visible in the IDL.
876
877           Transaction.TRY_AGAIN:
878
879               The transaction failed for some transient reason, e.g. because a
880               "verify" operation reported an inconsistency or due to a network
881               problem.  The caller should wait for a change to the database,
882               then compose a new transaction, and commit the new transaction.
883
884               Use Idl.change_seqno to wait for a change in the database.  It is
885               important to use its value *before* the initial call to
886               Transaction.commit() as the baseline for this purpose, because
887               the change that one should wait for can happen after the initial
888               call but before the call that returns Transaction.TRY_AGAIN, and
889               using some other baseline value in that situation could cause an
890               indefinite wait if the database rarely changes.
891
892           Transaction.NOT_LOCKED:
893
894               The transaction failed because the IDL has been configured to
895               require a database lock (with Idl.set_lock()) but didn't
896               get it yet or has already lost it.
897
898         Committing a transaction rolls back all of the changes that it made to
899         the IDL's copy of the database.  If the transaction commits
900         successfully, then the database server will send an update and, thus,
901         the IDL will be updated with the committed changes."""
902         # The status can only change if we're the active transaction.
903         # (Otherwise, our status will change only in Idl.run().)
904         if self != self.idl.txn:
905             return self._status
906
907         # If we need a lock but don't have it, give up quickly.
908         if self.idl.lock_name and not self.idl.has_lock:
909             self._status = Transaction.NOT_LOCKED
910             self.__disassemble()
911             return self._status
912
913         operations = [self.idl._db.name]
914
915         # Assert that we have the required lock (avoiding a race).
916         if self.idl.lock_name:
917             operations.append({"op": "assert",
918                                "lock": self.idl.lock_name})
919
920         # Add prerequisites and declarations of new rows.
921         for row in self._txn_rows.itervalues():
922             if row._prereqs:
923                 rows = {}
924                 columns = []
925                 for column_name in row._prereqs:
926                     columns.append(column_name)
927                     rows[column_name] = row._data[column_name].to_json()
928                 operations.append({"op": "wait",
929                                    "table": row._table.name,
930                                    "timeout": 0,
931                                    "where": _where_uuid_equals(row.uuid),
932                                    "until": "==",
933                                    "columns": columns,
934                                    "rows": [rows]})
935
936         # Add updates.
937         any_updates = False
938         for row in self._txn_rows.itervalues():
939             if row._changes is None:
940                 if row._table.is_root:
941                     operations.append({"op": "delete",
942                                        "table": row._table.name,
943                                        "where": _where_uuid_equals(row.uuid)})
944                     any_updates = True
945                 else:
946                     # Let ovsdb-server decide whether to really delete it.
947                     pass
948             elif row._changes:
949                 op = {"table": row._table.name}
950                 if row._data is None:
951                     op["op"] = "insert"
952                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
953                     any_updates = True
954
955                     op_index = len(operations) - 1
956                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
957                 else:
958                     op["op"] = "update"
959                     op["where"] = _where_uuid_equals(row.uuid)
960
961                 row_json = {}
962                 op["row"] = row_json
963
964                 for column_name, datum in row._changes.iteritems():
965                     if row._data is not None or not datum.is_default():
966                         row_json[column_name] = (
967                                 self._substitute_uuids(datum.to_json()))
968
969                         # If anything really changed, consider it an update.
970                         # We can't suppress not-really-changed values earlier
971                         # or transactions would become nonatomic (see the big
972                         # comment inside Transaction._write()).
973                         if (not any_updates and row._data is not None and
974                             row._data[column_name] != datum):
975                             any_updates = True
976
977                 if row._data is None or row_json:
978                     operations.append(op)
979
980         if self._fetch_requests:
981             for fetch in self._fetch_requests:
982                 fetch["index"] = len(operations) - 1
983                 operations.append({"op": "select",
984                                    "table": fetch["row"]._table.name,
985                                    "where": self._substitute_uuids(
986                                        _where_uuid_equals(fetch["row"].uuid)),
987                                    "columns": [fetch["column_name"]]})
988             any_updates = True
989
990         # Add increment.
991         if self._inc_row and any_updates:
992             self._inc_index = len(operations) - 1
993
994             operations.append({"op": "mutate",
995                                "table": self._inc_row._table.name,
996                                "where": self._substitute_uuids(
997                                    _where_uuid_equals(self._inc_row.uuid)),
998                                "mutations": [[self._inc_column, "+=", 1]]})
999             operations.append({"op": "select",
1000                                "table": self._inc_row._table.name,
1001                                "where": self._substitute_uuids(
1002                                    _where_uuid_equals(self._inc_row.uuid)),
1003                                "columns": [self._inc_column]})
1004
1005         # Add comment.
1006         if self._comments:
1007             operations.append({"op": "comment",
1008                                "comment": "\n".join(self._comments)})
1009
1010         # Dry run?
1011         if self.dry_run:
1012             operations.append({"op": "abort"})
1013
1014         if not any_updates:
1015             self._status = Transaction.UNCHANGED
1016         else:
1017             msg = ovs.jsonrpc.Message.create_request("transact", operations)
1018             self._request_id = msg.id
1019             if not self.idl._session.send(msg):
1020                 self.idl._outstanding_txns[self._request_id] = self
1021                 self._status = Transaction.INCOMPLETE
1022             else:
1023                 self._status = Transaction.TRY_AGAIN
1024
1025         self.__disassemble()
1026         return self._status
1027
1028     def commit_block(self):
1029         """Attempts to commit this transaction, blocking until the commit
1030         either succeeds or fails.  Returns the final commit status, which may
1031         be any Transaction.* value other than Transaction.INCOMPLETE.
1032
1033         This function calls Idl.run() on this transaction'ss IDL, so it may
1034         cause Idl.change_seqno to change."""
1035         while True:
1036             status = self.commit()
1037             if status != Transaction.INCOMPLETE:
1038                 return status
1039
1040             self.idl.run()
1041
1042             poller = ovs.poller.Poller()
1043             self.idl.wait(poller)
1044             self.wait(poller)
1045             poller.block()
1046
1047     def get_increment_new_value(self):
1048         """Returns the final (incremented) value of the column in this
1049         transaction that was set to be incremented by Row.increment.  This
1050         transaction must have committed successfully."""
1051         assert self._status == Transaction.SUCCESS
1052         return self._inc_new_value
1053
1054     def abort(self):
1055         """Aborts this transaction.  If Transaction.commit() has already been
1056         called then the transaction might get committed anyhow."""
1057         self.__disassemble()
1058         if self._status in (Transaction.UNCOMMITTED,
1059                             Transaction.INCOMPLETE):
1060             self._status = Transaction.ABORTED
1061
1062     def get_error(self):
1063         """Returns a string representing this transaction's current status,
1064         suitable for use in log messages."""
1065         if self._status != Transaction.ERROR:
1066             return Transaction.status_to_string(self._status)
1067         elif self._error:
1068             return self._error
1069         else:
1070             return "no error details available"
1071
1072     def __set_error_json(self, json):
1073         if self._error is None:
1074             self._error = ovs.json.to_string(json)
1075
1076     def get_insert_uuid(self, uuid):
1077         """Finds and returns the permanent UUID that the database assigned to a
1078         newly inserted row, given the UUID that Transaction.insert() assigned
1079         locally to that row.
1080
1081         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1082         or if it was assigned by that function and then deleted by Row.delete()
1083         within the same transaction.  (Rows that are inserted and then deleted
1084         within a single transaction are never sent to the database server, so
1085         it never assigns them a permanent UUID.)
1086
1087         This transaction must have completed successfully."""
1088         assert self._status in (Transaction.SUCCESS,
1089                                 Transaction.UNCHANGED)
1090         inserted_row = self._inserted_rows.get(uuid)
1091         if inserted_row:
1092             return inserted_row.real
1093         return None
1094
1095     def _increment(self, row, column):
1096         assert not self._inc_row
1097         self._inc_row = row
1098         self._inc_column = column
1099
1100     def _fetch(self, row, column_name):
1101         self._fetch_requests.append({"row": row, "column_name": column_name})
1102
1103     def _write(self, row, column, datum):
1104         assert row._changes is not None
1105
1106         txn = row._idl.txn
1107
1108         # If this is a write-only column and the datum being written is the
1109         # same as the one already there, just skip the update entirely.  This
1110         # is worth optimizing because we have a lot of columns that get
1111         # periodically refreshed into the database but don't actually change
1112         # that often.
1113         #
1114         # We don't do this for read/write columns because that would break
1115         # atomicity of transactions--some other client might have written a
1116         # different value in that column since we read it.  (But if a whole
1117         # transaction only does writes of existing values, without making any
1118         # real changes, we will drop the whole transaction later in
1119         # ovsdb_idl_txn_commit().)
1120         if not column.alert and row._data and row._data.get(column.name) == datum:
1121             new_value = row._changes.get(column.name)
1122             if new_value is None or new_value == datum:
1123                 return
1124
1125         txn._txn_rows[row.uuid] = row
1126         row._changes[column.name] = datum.copy()
1127
1128     def insert(self, table, new_uuid=None):
1129         """Inserts and returns a new row in 'table', which must be one of the
1130         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1131
1132         The new row is assigned a provisional UUID.  If 'uuid' is None then one
1133         is randomly generated; otherwise 'uuid' should specify a randomly
1134         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
1135         different UUID when 'txn' is committed, but the IDL will replace any
1136         uses of the provisional UUID in the data to be to be committed by the
1137         UUID assigned by ovsdb-server."""
1138         assert self._status == Transaction.UNCOMMITTED
1139         if new_uuid is None:
1140             new_uuid = uuid.uuid4()
1141         row = Row(self.idl, table, new_uuid, None)
1142         table.rows[row.uuid] = row
1143         self._txn_rows[row.uuid] = row
1144         return row
1145
1146     def _process_reply(self, msg):
1147         if msg.type == ovs.jsonrpc.Message.T_ERROR:
1148             self._status = Transaction.ERROR
1149         elif type(msg.result) not in (list, tuple):
1150             # XXX rate-limit
1151             vlog.warn('reply to "transact" is not JSON array')
1152         else:
1153             hard_errors = False
1154             soft_errors = False
1155             lock_errors = False
1156
1157             ops = msg.result
1158             for op in ops:
1159                 if op is None:
1160                     # This isn't an error in itself but indicates that some
1161                     # prior operation failed, so make sure that we know about
1162                     # it.
1163                     soft_errors = True
1164                 elif type(op) == dict:
1165                     error = op.get("error")
1166                     if error is not None:
1167                         if error == "timed out":
1168                             soft_errors = True
1169                         elif error == "not owner":
1170                             lock_errors = True
1171                         elif error == "aborted":
1172                             pass
1173                         else:
1174                             hard_errors = True
1175                             self.__set_error_json(op)
1176                 else:
1177                     hard_errors = True
1178                     self.__set_error_json(op)
1179                     # XXX rate-limit
1180                     vlog.warn("operation reply is not JSON null or object")
1181
1182             if not soft_errors and not hard_errors and not lock_errors:
1183                 if self._inc_row and not self.__process_inc_reply(ops):
1184                     hard_errors = True
1185                 if self._fetch_requests:
1186                     if self.__process_fetch_reply(ops):
1187                         self.idl.change_seqno += 1
1188                     else:
1189                         hard_errors = True
1190
1191                 for insert in self._inserted_rows.itervalues():
1192                     if not self.__process_insert_reply(insert, ops):
1193                         hard_errors = True
1194
1195             if hard_errors:
1196                 self._status = Transaction.ERROR
1197             elif lock_errors:
1198                 self._status = Transaction.NOT_LOCKED
1199             elif soft_errors:
1200                 self._status = Transaction.TRY_AGAIN
1201             else:
1202                 self._status = Transaction.SUCCESS
1203
1204     @staticmethod
1205     def __check_json_type(json, types, name):
1206         if not json:
1207             # XXX rate-limit
1208             vlog.warn("%s is missing" % name)
1209             return False
1210         elif type(json) not in types:
1211             # XXX rate-limit
1212             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1213             return False
1214         else:
1215             return True
1216
1217     def __process_fetch_reply(self, ops):
1218         update = False
1219         for fetch_request in self._fetch_requests:
1220             row = fetch_request["row"]
1221             column_name = fetch_request["column_name"]
1222             index = fetch_request["index"]
1223             table = row._table
1224
1225             select = ops[index]
1226             fetched_rows = select.get("rows")
1227             if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1228                                                  '"select" reply "rows"'):
1229                 return False
1230             if len(fetched_rows) != 1:
1231                 # XXX rate-limit
1232                 vlog.warn('"select" reply "rows" has %d elements '
1233                           'instead of 1' % len(fetched_rows))
1234                 continue
1235             fetched_row = fetched_rows[0]
1236             if not Transaction.__check_json_type(fetched_row, (dict,),
1237                                                  '"select" reply row'):
1238                 continue
1239
1240             column = table.columns.get(column_name)
1241             datum_json = fetched_row.get(column_name)
1242             datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1243
1244             row._data[column_name] = datum
1245             update = True
1246
1247         return update
1248
1249     def __process_inc_reply(self, ops):
1250         if self._inc_index + 2 > len(ops):
1251             # XXX rate-limit
1252             vlog.warn("reply does not contain enough operations for "
1253                       "increment (has %d, needs %d)" %
1254                       (len(ops), self._inc_index + 2))
1255
1256         # We know that this is a JSON object because the loop in
1257         # __process_reply() already checked.
1258         mutate = ops[self._inc_index]
1259         count = mutate.get("count")
1260         if not Transaction.__check_json_type(count, (int, long),
1261                                              '"mutate" reply "count"'):
1262             return False
1263         if count != 1:
1264             # XXX rate-limit
1265             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1266             return False
1267
1268         select = ops[self._inc_index + 1]
1269         rows = select.get("rows")
1270         if not Transaction.__check_json_type(rows, (list, tuple),
1271                                              '"select" reply "rows"'):
1272             return False
1273         if len(rows) != 1:
1274             # XXX rate-limit
1275             vlog.warn('"select" reply "rows" has %d elements '
1276                       'instead of 1' % len(rows))
1277             return False
1278         row = rows[0]
1279         if not Transaction.__check_json_type(row, (dict,),
1280                                              '"select" reply row'):
1281             return False
1282         column = row.get(self._inc_column)
1283         if not Transaction.__check_json_type(column, (int, long),
1284                                              '"select" reply inc column'):
1285             return False
1286         self._inc_new_value = column
1287         return True
1288
1289     def __process_insert_reply(self, insert, ops):
1290         if insert.op_index >= len(ops):
1291             # XXX rate-limit
1292             vlog.warn("reply does not contain enough operations "
1293                       "for insert (has %d, needs %d)"
1294                       % (len(ops), insert.op_index))
1295             return False
1296
1297         # We know that this is a JSON object because the loop in
1298         # __process_reply() already checked.
1299         reply = ops[insert.op_index]
1300         json_uuid = reply.get("uuid")
1301         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1302                                              '"insert" reply "uuid"'):
1303             return False
1304
1305         try:
1306             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1307         except error.Error:
1308             # XXX rate-limit
1309             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1310             return False
1311
1312         insert.real = uuid_
1313         return True
1314
1315
1316 class SchemaHelper(object):
1317     """IDL Schema helper.
1318
1319     This class encapsulates the logic required to generate schemas suitable
1320     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1321     they are interested in using register_columns().  When finished, the
1322     get_idl_schema() function may be called.
1323
1324     The location on disk of the schema used may be found in the
1325     'schema_location' variable."""
1326
1327     def __init__(self, location=None, schema_json=None):
1328         """Creates a new Schema object.
1329
1330         'location' file path to ovs schema. None means default location
1331         'schema_json' schema in json preresentation in memory
1332         """
1333
1334         if location and schema_json:
1335             raise ValueError("both location and schema_json can't be "
1336                              "specified. it's ambiguous.")
1337         if schema_json is None:
1338             if location is None:
1339                 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1340             schema_json = ovs.json.from_file(location)
1341
1342         self.schema_json = schema_json
1343         self._tables = {}
1344         self._readonly = {}
1345         self._all = False
1346
1347     def register_columns(self, table, columns, readonly=[]):
1348         """Registers interest in the given 'columns' of 'table'.  Future calls
1349         to get_idl_schema() will include 'table':column for each column in
1350         'columns'. This function automatically avoids adding duplicate entries
1351         to the schema.
1352         A subset of 'columns' can be specified as 'readonly'. The readonly
1353         columns are not replicated but can be fetched on-demand by the user
1354         with Row.fetch().
1355
1356         'table' must be a string.
1357         'columns' must be a list of strings.
1358         'readonly' must be a list of strings.
1359         """
1360
1361         assert type(table) is str
1362         assert type(columns) is list
1363
1364         columns = set(columns) | self._tables.get(table, set())
1365         self._tables[table] = columns
1366         self._readonly[table] = readonly
1367
1368     def register_table(self, table):
1369         """Registers interest in the given all columns of 'table'. Future calls
1370         to get_idl_schema() will include all columns of 'table'.
1371
1372         'table' must be a string
1373         """
1374         assert type(table) is str
1375         self._tables[table] = set()  # empty set means all columns in the table
1376
1377     def register_all(self):
1378         """Registers interest in every column of every table."""
1379         self._all = True
1380
1381     def get_idl_schema(self):
1382         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1383         object based on columns registered using the register_columns()
1384         function."""
1385
1386         schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1387         self.schema_json = None
1388
1389         if not self._all:
1390             schema_tables = {}
1391             for table, columns in self._tables.iteritems():
1392                 schema_tables[table] = (
1393                     self._keep_table_columns(schema, table, columns))
1394
1395             schema.tables = schema_tables
1396         schema.readonly = self._readonly
1397         return schema
1398
1399     def _keep_table_columns(self, schema, table_name, columns):
1400         assert table_name in schema.tables
1401         table = schema.tables[table_name]
1402
1403         if not columns:
1404             # empty set means all columns in the table
1405             return table
1406
1407         new_columns = {}
1408         for column_name in columns:
1409             assert type(column_name) is str
1410             assert column_name in table.columns
1411
1412             new_columns[column_name] = table.columns[column_name]
1413
1414         table.columns = new_columns
1415         return table