python: Fix attempt to use a bool as a function.
[cascardo/ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import uuid
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22 import ovs.poller
23 import ovs.vlog
24
25 vlog = ovs.vlog.Vlog("idl")
26
27 __pychecker__ = 'no-classattr no-objattrs'
28
29 ROW_CREATE = "create"
30 ROW_UPDATE = "update"
31 ROW_DELETE = "delete"
32
33
34 class Idl(object):
35     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
36
37     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
38     requests to an OVSDB database server and parses the responses, converting
39     raw JSON into data structures that are easier for clients to digest.
40
41     The IDL also assists with issuing database transactions.  The client
42     creates a transaction, manipulates the IDL data structures, and commits or
43     aborts the transaction.  The IDL then composes and issues the necessary
44     JSON-RPC requests and reports to the client whether the transaction
45     completed successfully.
46
47     The client is allowed to access the following attributes directly, in a
48     read-only fashion:
49
50     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
51       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
52       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
53       to a Row object.
54
55       The client may directly read and write the Row objects referenced by the
56       'rows' map values.  Refer to Row for more details.
57
58     - 'change_seqno': A number that represents the IDL's state.  When the IDL
59       is updated (by Idl.run()), its value changes.  The sequence number can
60       occasionally change even if the database does not.  This happens if the
61       connection to the database drops and reconnects, which causes the
62       database contents to be reloaded even if they didn't change.  (It could
63       also happen if the database server sends out a "change" that reflects
64       what the IDL already thought was in the database.  The database server is
65       not supposed to do that, but bugs could in theory cause it to do so.)
66
67     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
68       if no lock is configured.
69
70     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
71       lock, and False otherwise.
72
73       Locking and unlocking happens asynchronously from the database client's
74       point of view, so the information is only useful for optimization
75       (e.g. if the client doesn't have the lock then there's no point in trying
76       to write to the database).
77
78     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
79       the database server has indicated that some other client already owns the
80       requested lock, and False otherwise.
81
82     - 'txn': The ovs.db.idl.Transaction object for the database transaction
83       currently being constructed, if there is one, or None otherwise.
84 """
85
86     def __init__(self, remote, schema):
87         """Creates and returns a connection to the database named 'db_name' on
88         'remote', which should be in a form acceptable to
89         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
90         replica of the remote database.
91
92         'schema' should be the schema for the remote database.  The caller may
93         have cut it down by removing tables or columns that are not of
94         interest.  The IDL will only replicate the tables and columns that
95         remain.  The caller may also add a attribute named 'alert' to selected
96         remaining columns, setting its value to False; if so, then changes to
97         those columns will not be considered changes to the database for the
98         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
99         useful for columns that the IDL's client will write but not read.
100
101         As a convenience to users, 'schema' may also be an instance of the
102         SchemaHelper class.
103
104         The IDL uses and modifies 'schema' directly."""
105
106         assert isinstance(schema, SchemaHelper)
107         schema = schema.get_idl_schema()
108
109         self.tables = schema.tables
110         self._db = schema
111         self._session = ovs.jsonrpc.Session.open(remote)
112         self._monitor_request_id = None
113         self._last_seqno = None
114         self.change_seqno = 0
115
116         # Database locking.
117         self.lock_name = None          # Name of lock we need, None if none.
118         self.has_lock = False          # Has db server said we have the lock?
119         self.is_lock_contended = False  # Has db server said we can't get lock?
120         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
121
122         # Transaction support.
123         self.txn = None
124         self._outstanding_txns = {}
125
126         for table in schema.tables.itervalues():
127             for column in table.columns.itervalues():
128                 if not hasattr(column, 'alert'):
129                     column.alert = True
130             table.need_table = False
131             table.rows = {}
132             table.idl = self
133
134     def close(self):
135         """Closes the connection to the database.  The IDL will no longer
136         update."""
137         self._session.close()
138
139     def run(self):
140         """Processes a batch of messages from the database server.  Returns
141         True if the database as seen through the IDL changed, False if it did
142         not change.  The initial fetch of the entire contents of the remote
143         database is considered to be one kind of change.  If the IDL has been
144         configured to acquire a database lock (with Idl.set_lock()), then
145         successfully acquiring the lock is also considered to be a change.
146
147         This function can return occasional false positives, that is, report
148         that the database changed even though it didn't.  This happens if the
149         connection to the database drops and reconnects, which causes the
150         database contents to be reloaded even if they didn't change.  (It could
151         also happen if the database server sends out a "change" that reflects
152         what we already thought was in the database, but the database server is
153         not supposed to do that.)
154
155         As an alternative to checking the return value, the client may check
156         for changes in self.change_seqno."""
157         assert not self.txn
158         initial_change_seqno = self.change_seqno
159         self._session.run()
160         i = 0
161         while i < 50:
162             i += 1
163             if not self._session.is_connected():
164                 break
165
166             seqno = self._session.get_seqno()
167             if seqno != self._last_seqno:
168                 self._last_seqno = seqno
169                 self.__txn_abort_all()
170                 self.__send_monitor_request()
171                 if self.lock_name:
172                     self.__send_lock_request()
173                 break
174
175             msg = self._session.recv()
176             if msg is None:
177                 break
178             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
179                 and msg.method == "update"
180                 and len(msg.params) == 2
181                 and msg.params[0] == None):
182                 # Database contents changed.
183                 self.__parse_update(msg.params[1])
184             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
185                   and self._monitor_request_id is not None
186                   and self._monitor_request_id == msg.id):
187                 # Reply to our "monitor" request.
188                 try:
189                     self.change_seqno += 1
190                     self._monitor_request_id = None
191                     self.__clear()
192                     self.__parse_update(msg.result)
193                 except error.Error, e:
194                     vlog.err("%s: parse error in received schema: %s"
195                               % (self._session.get_name(), e))
196                     self.__error()
197             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
198                   and self._lock_request_id is not None
199                   and self._lock_request_id == msg.id):
200                 # Reply to our "lock" request.
201                 self.__parse_lock_reply(msg.result)
202             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
203                   and msg.method == "locked"):
204                 # We got our lock.
205                 self.__parse_lock_notify(msg.params, True)
206             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
207                   and msg.method == "stolen"):
208                 # Someone else stole our lock.
209                 self.__parse_lock_notify(msg.params, False)
210             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
211                 # Reply to our echo request.  Ignore it.
212                 pass
213             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
214                                ovs.jsonrpc.Message.T_REPLY)
215                   and self.__txn_process_reply(msg)):
216                 # __txn_process_reply() did everything needed.
217                 pass
218             else:
219                 # This can happen if a transaction is destroyed before we
220                 # receive the reply, so keep the log level low.
221                 vlog.dbg("%s: received unexpected %s message"
222                          % (self._session.get_name(),
223                              ovs.jsonrpc.Message.type_to_string(msg.type)))
224
225         return initial_change_seqno != self.change_seqno
226
227     def wait(self, poller):
228         """Arranges for poller.block() to wake up when self.run() has something
229         to do or when activity occurs on a transaction on 'self'."""
230         self._session.wait(poller)
231         self._session.recv_wait(poller)
232
233     def has_ever_connected(self):
234         """Returns True, if the IDL successfully connected to the remote
235         database and retrieved its contents (even if the connection
236         subsequently dropped and is in the process of reconnecting).  If so,
237         then the IDL contains an atomic snapshot of the database's contents
238         (but it might be arbitrarily old if the connection dropped).
239
240         Returns False if the IDL has never connected or retrieved the
241         database's contents.  If so, the IDL is empty."""
242         return self.change_seqno != 0
243
244     def force_reconnect(self):
245         """Forces the IDL to drop its connection to the database and reconnect.
246         In the meantime, the contents of the IDL will not change."""
247         self._session.force_reconnect()
248
249     def set_lock(self, lock_name):
250         """If 'lock_name' is not None, configures the IDL to obtain the named
251         lock from the database server and to avoid modifying the database when
252         the lock cannot be acquired (that is, when another client has the same
253         lock).
254
255         If 'lock_name' is None, drops the locking requirement and releases the
256         lock."""
257         assert not self.txn
258         assert not self._outstanding_txns
259
260         if self.lock_name and (not lock_name or lock_name != self.lock_name):
261             # Release previous lock.
262             self.__send_unlock_request()
263             self.lock_name = None
264             self.is_lock_contended = False
265
266         if lock_name and not self.lock_name:
267             # Acquire new lock.
268             self.lock_name = lock_name
269             self.__send_lock_request()
270
271     def notify(self, event, row, updates=None):
272         """Hook for implementing create/update/delete notifications
273
274         :param event:   The event that was triggered
275         :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
276         :param row:     The row as it is after the operation has occured
277         :type row:      Row
278         :param updates: For updates, a Row object with just the changed columns
279         :type updates:  Row
280         """
281
282     def __clear(self):
283         changed = False
284
285         for table in self.tables.itervalues():
286             if table.rows:
287                 changed = True
288                 table.rows = {}
289
290         if changed:
291             self.change_seqno += 1
292
293     def __update_has_lock(self, new_has_lock):
294         if new_has_lock and not self.has_lock:
295             if self._monitor_request_id is None:
296                 self.change_seqno += 1
297             else:
298                 # We're waiting for a monitor reply, so don't signal that the
299                 # database changed.  The monitor reply will increment
300                 # change_seqno anyhow.
301                 pass
302             self.is_lock_contended = False
303         self.has_lock = new_has_lock
304
305     def __do_send_lock_request(self, method):
306         self.__update_has_lock(False)
307         self._lock_request_id = None
308         if self._session.is_connected():
309             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
310             msg_id = msg.id
311             self._session.send(msg)
312         else:
313             msg_id = None
314         return msg_id
315
316     def __send_lock_request(self):
317         self._lock_request_id = self.__do_send_lock_request("lock")
318
319     def __send_unlock_request(self):
320         self.__do_send_lock_request("unlock")
321
322     def __parse_lock_reply(self, result):
323         self._lock_request_id = None
324         got_lock = type(result) == dict and result.get("locked") is True
325         self.__update_has_lock(got_lock)
326         if not got_lock:
327             self.is_lock_contended = True
328
329     def __parse_lock_notify(self, params, new_has_lock):
330         if (self.lock_name is not None
331             and type(params) in (list, tuple)
332             and params
333             and params[0] == self.lock_name):
334             self.__update_has_lock(self, new_has_lock)
335             if not new_has_lock:
336                 self.is_lock_contended = True
337
338     def __send_monitor_request(self):
339         monitor_requests = {}
340         for table in self.tables.itervalues():
341             monitor_requests[table.name] = {"columns": table.columns.keys()}
342         msg = ovs.jsonrpc.Message.create_request(
343             "monitor", [self._db.name, None, monitor_requests])
344         self._monitor_request_id = msg.id
345         self._session.send(msg)
346
347     def __parse_update(self, update):
348         try:
349             self.__do_parse_update(update)
350         except error.Error, e:
351             vlog.err("%s: error parsing update: %s"
352                      % (self._session.get_name(), e))
353
354     def __do_parse_update(self, table_updates):
355         if type(table_updates) != dict:
356             raise error.Error("<table-updates> is not an object",
357                               table_updates)
358
359         for table_name, table_update in table_updates.iteritems():
360             table = self.tables.get(table_name)
361             if not table:
362                 raise error.Error('<table-updates> includes unknown '
363                                   'table "%s"' % table_name)
364
365             if type(table_update) != dict:
366                 raise error.Error('<table-update> for table "%s" is not '
367                                   'an object' % table_name, table_update)
368
369             for uuid_string, row_update in table_update.iteritems():
370                 if not ovs.ovsuuid.is_valid_string(uuid_string):
371                     raise error.Error('<table-update> for table "%s" '
372                                       'contains bad UUID "%s" as member '
373                                       'name' % (table_name, uuid_string),
374                                       table_update)
375                 uuid = ovs.ovsuuid.from_string(uuid_string)
376
377                 if type(row_update) != dict:
378                     raise error.Error('<table-update> for table "%s" '
379                                       'contains <row-update> for %s that '
380                                       'is not an object'
381                                       % (table_name, uuid_string))
382
383                 parser = ovs.db.parser.Parser(row_update, "row-update")
384                 old = parser.get_optional("old", [dict])
385                 new = parser.get_optional("new", [dict])
386                 parser.finish()
387
388                 if not old and not new:
389                     raise error.Error('<row-update> missing "old" and '
390                                       '"new" members', row_update)
391
392                 if self.__process_update(table, uuid, old, new):
393                     self.change_seqno += 1
394
395     def __process_update(self, table, uuid, old, new):
396         """Returns True if a column changed, False otherwise."""
397         row = table.rows.get(uuid)
398         changed = False
399         if not new:
400             # Delete row.
401             if row:
402                 del table.rows[uuid]
403                 changed = True
404                 self.notify(ROW_DELETE, row)
405             else:
406                 # XXX rate-limit
407                 vlog.warn("cannot delete missing row %s from table %s"
408                           % (uuid, table.name))
409         elif not old:
410             # Insert row.
411             if not row:
412                 row = self.__create_row(table, uuid)
413                 changed = True
414             else:
415                 # XXX rate-limit
416                 vlog.warn("cannot add existing row %s to table %s"
417                           % (uuid, table.name))
418             if self.__row_update(table, row, new):
419                 changed = True
420                 self.notify(ROW_CREATE, row)
421         else:
422             op = ROW_UPDATE
423             if not row:
424                 row = self.__create_row(table, uuid)
425                 changed = True
426                 op = ROW_CREATE
427                 # XXX rate-limit
428                 vlog.warn("cannot modify missing row %s in table %s"
429                           % (uuid, table.name))
430             if self.__row_update(table, row, new):
431                 changed = True
432                 self.notify(op, row, Row.from_json(self, table, uuid, old))
433         return changed
434
435     def __row_update(self, table, row, row_json):
436         changed = False
437         for column_name, datum_json in row_json.iteritems():
438             column = table.columns.get(column_name)
439             if not column:
440                 # XXX rate-limit
441                 vlog.warn("unknown column %s updating table %s"
442                           % (column_name, table.name))
443                 continue
444
445             try:
446                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
447             except error.Error, e:
448                 # XXX rate-limit
449                 vlog.warn("error parsing column %s in table %s: %s"
450                           % (column_name, table.name, e))
451                 continue
452
453             if datum != row._data[column_name]:
454                 row._data[column_name] = datum
455                 if column.alert:
456                     changed = True
457             else:
458                 # Didn't really change but the OVSDB monitor protocol always
459                 # includes every value in a row.
460                 pass
461         return changed
462
463     def __create_row(self, table, uuid):
464         data = {}
465         for column in table.columns.itervalues():
466             data[column.name] = ovs.db.data.Datum.default(column.type)
467         row = table.rows[uuid] = Row(self, table, uuid, data)
468         return row
469
470     def __error(self):
471         self._session.force_reconnect()
472
473     def __txn_abort_all(self):
474         while self._outstanding_txns:
475             txn = self._outstanding_txns.popitem()[1]
476             txn._status = Transaction.TRY_AGAIN
477
478     def __txn_process_reply(self, msg):
479         txn = self._outstanding_txns.pop(msg.id, None)
480         if txn:
481             txn._process_reply(msg)
482
483
484 def _uuid_to_row(atom, base):
485     if base.ref_table:
486         return base.ref_table.rows.get(atom)
487     else:
488         return atom
489
490
491 def _row_to_uuid(value):
492     if type(value) == Row:
493         return value.uuid
494     else:
495         return value
496
497
498 class Row(object):
499     """A row within an IDL.
500
501     The client may access the following attributes directly:
502
503     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
504
505     - An attribute for each column in the Row's table, named for the column,
506       whose values are as returned by Datum.to_python() for the column's type.
507
508       If some error occurs (e.g. the database server's idea of the column is
509       different from the IDL's idea), then the attribute values is the
510       "default" value return by Datum.default() for the column's type.  (It is
511       important to know this because the default value may violate constraints
512       for the column's type, e.g. the default integer value is 0 even if column
513       contraints require the column's value to be positive.)
514
515       When a transaction is active, column attributes may also be assigned new
516       values.  Committing the transaction will then cause the new value to be
517       stored into the database.
518
519       *NOTE*: In the current implementation, the value of a column is a *copy*
520       of the value in the database.  This means that modifying its value
521       directly will have no useful effect.  For example, the following:
522         row.mycolumn["a"] = "b"              # don't do this
523       will not change anything in the database, even after commit.  To modify
524       the column, instead assign the modified column value back to the column:
525         d = row.mycolumn
526         d["a"] = "b"
527         row.mycolumn = d
528 """
529     def __init__(self, idl, table, uuid, data):
530         # All of the explicit references to self.__dict__ below are required
531         # to set real attributes with invoking self.__getattr__().
532         self.__dict__["uuid"] = uuid
533
534         self.__dict__["_idl"] = idl
535         self.__dict__["_table"] = table
536
537         # _data is the committed data.  It takes the following values:
538         #
539         #   - A dictionary that maps every column name to a Datum, if the row
540         #     exists in the committed form of the database.
541         #
542         #   - None, if this row is newly inserted within the active transaction
543         #     and thus has no committed form.
544         self.__dict__["_data"] = data
545
546         # _changes describes changes to this row within the active transaction.
547         # It takes the following values:
548         #
549         #   - {}, the empty dictionary, if no transaction is active or if the
550         #     row has yet not been changed within this transaction.
551         #
552         #   - A dictionary that maps a column name to its new Datum, if an
553         #     active transaction changes those columns' values.
554         #
555         #   - A dictionary that maps every column name to a Datum, if the row
556         #     is newly inserted within the active transaction.
557         #
558         #   - None, if this transaction deletes this row.
559         self.__dict__["_changes"] = {}
560
561         # A dictionary whose keys are the names of columns that must be
562         # verified as prerequisites when the transaction commits.  The values
563         # in the dictionary are all None.
564         self.__dict__["_prereqs"] = {}
565
566     def __getattr__(self, column_name):
567         assert self._changes is not None
568
569         datum = self._changes.get(column_name)
570         if datum is None:
571             if self._data is None:
572                 raise AttributeError("%s instance has no attribute '%s'" %
573                                      (self.__class__.__name__, column_name))
574             datum = self._data[column_name]
575
576         return datum.to_python(_uuid_to_row)
577
578     def __setattr__(self, column_name, value):
579         assert self._changes is not None
580         assert self._idl.txn
581
582         column = self._table.columns[column_name]
583         try:
584             datum = ovs.db.data.Datum.from_python(column.type, value,
585                                                   _row_to_uuid)
586         except error.Error, e:
587             # XXX rate-limit
588             vlog.err("attempting to write bad value to column %s (%s)"
589                      % (column_name, e))
590             return
591         self._idl.txn._write(self, column, datum)
592
593     @classmethod
594     def from_json(cls, idl, table, uuid, row_json):
595         data = {}
596         for column_name, datum_json in row_json.iteritems():
597             column = table.columns.get(column_name)
598             if not column:
599                 # XXX rate-limit
600                 vlog.warn("unknown column %s in table %s"
601                           % (column_name, table.name))
602                 continue
603             try:
604                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
605             except error.Error, e:
606                 # XXX rate-limit
607                 vlog.warn("error parsing column %s in table %s: %s"
608                           % (column_name, table.name, e))
609                 continue
610             data[column_name] = datum
611         return cls(idl, table, uuid, data)
612
613     def verify(self, column_name):
614         """Causes the original contents of column 'column_name' in this row to
615         be verified as a prerequisite to completing the transaction.  That is,
616         if 'column_name' changed in this row (or if this row was deleted)
617         between the time that the IDL originally read its contents and the time
618         that the transaction commits, then the transaction aborts and
619         Transaction.commit() returns Transaction.TRY_AGAIN.
620
621         The intention is that, to ensure that no transaction commits based on
622         dirty reads, an application should call Row.verify() on each data item
623         read as part of a read-modify-write operation.
624
625         In some cases Row.verify() reduces to a no-op, because the current
626         value of the column is already known:
627
628           - If this row is a row created by the current transaction (returned
629             by Transaction.insert()).
630
631           - If the column has already been modified within the current
632             transaction.
633
634         Because of the latter property, always call Row.verify() *before*
635         modifying the column, for a given read-modify-write.
636
637         A transaction must be in progress."""
638         assert self._idl.txn
639         assert self._changes is not None
640         if not self._data or column_name in self._changes:
641             return
642
643         self._prereqs[column_name] = None
644
645     def delete(self):
646         """Deletes this row from its table.
647
648         A transaction must be in progress."""
649         assert self._idl.txn
650         assert self._changes is not None
651         if self._data is None:
652             del self._idl.txn._txn_rows[self.uuid]
653         else:
654             self._idl.txn._txn_rows[self.uuid] = self
655         self.__dict__["_changes"] = None
656         del self._table.rows[self.uuid]
657
658     def increment(self, column_name):
659         """Causes the transaction, when committed, to increment the value of
660         'column_name' within this row by 1.  'column_name' must have an integer
661         type.  After the transaction commits successfully, the client may
662         retrieve the final (incremented) value of 'column_name' with
663         Transaction.get_increment_new_value().
664
665         The client could accomplish something similar by reading and writing
666         and verify()ing columns.  However, increment() will never (by itself)
667         cause a transaction to fail because of a verify error.
668
669         The intended use is for incrementing the "next_cfg" column in
670         the Open_vSwitch table."""
671         self._idl.txn._increment(self, column_name)
672
673
674 def _uuid_name_from_uuid(uuid):
675     return "row%s" % str(uuid).replace("-", "_")
676
677
678 def _where_uuid_equals(uuid):
679     return [["_uuid", "==", ["uuid", str(uuid)]]]
680
681
682 class _InsertedRow(object):
683     def __init__(self, op_index):
684         self.op_index = op_index
685         self.real = None
686
687
688 class Transaction(object):
689     """A transaction may modify the contents of a database by modifying the
690     values of columns, deleting rows, inserting rows, or adding checks that
691     columns in the database have not changed ("verify" operations), through
692     Row methods.
693
694     Reading and writing columns and inserting and deleting rows are all
695     straightforward.  The reasons to verify columns are less obvious.
696     Verification is the key to maintaining transactional integrity.  Because
697     OVSDB handles multiple clients, it can happen that between the time that
698     OVSDB client A reads a column and writes a new value, OVSDB client B has
699     written that column.  Client A's write should not ordinarily overwrite
700     client B's, especially if the column in question is a "map" column that
701     contains several more or less independent data items.  If client A adds a
702     "verify" operation before it writes the column, then the transaction fails
703     in case client B modifies it first.  Client A will then see the new value
704     of the column and compose a new transaction based on the new contents
705     written by client B.
706
707     When a transaction is complete, which must be before the next call to
708     Idl.run(), call Transaction.commit() or Transaction.abort().
709
710     The life-cycle of a transaction looks like this:
711
712     1. Create the transaction and record the initial sequence number:
713
714         seqno = idl.change_seqno(idl)
715         txn = Transaction(idl)
716
717     2. Modify the database with Row and Transaction methods.
718
719     3. Commit the transaction by calling Transaction.commit().  The first call
720        to this function probably returns Transaction.INCOMPLETE.  The client
721        must keep calling again along as this remains true, calling Idl.run() in
722        between to let the IDL do protocol processing.  (If the client doesn't
723        have anything else to do in the meantime, it can use
724        Transaction.commit_block() to avoid having to loop itself.)
725
726     4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
727        to change from the saved 'seqno' (it's possible that it's already
728        changed, in which case the client should not wait at all), then start
729        over from step 1.  Only a call to Idl.run() will change the return value
730        of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
731
732     # Status values that Transaction.commit() can return.
733     UNCOMMITTED = "uncommitted"  # Not yet committed or aborted.
734     UNCHANGED = "unchanged"      # Transaction didn't include any changes.
735     INCOMPLETE = "incomplete"    # Commit in progress, please wait.
736     ABORTED = "aborted"          # ovsdb_idl_txn_abort() called.
737     SUCCESS = "success"          # Commit successful.
738     TRY_AGAIN = "try again"      # Commit failed because a "verify" operation
739                                  # reported an inconsistency, due to a network
740                                  # problem, or other transient failure.  Wait
741                                  # for a change, then try again.
742     NOT_LOCKED = "not locked"    # Server hasn't given us the lock yet.
743     ERROR = "error"              # Commit failed due to a hard error.
744
745     @staticmethod
746     def status_to_string(status):
747         """Converts one of the status values that Transaction.commit() can
748         return into a human-readable string.
749
750         (The status values are in fact such strings already, so
751         there's nothing to do.)"""
752         return status
753
754     def __init__(self, idl):
755         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
756         A given Idl may only have a single active transaction at a time.
757
758         A Transaction may modify the contents of a database by assigning new
759         values to columns (attributes of Row), deleting rows (with
760         Row.delete()), or inserting rows (with Transaction.insert()).  It may
761         also check that columns in the database have not changed with
762         Row.verify().
763
764         When a transaction is complete (which must be before the next call to
765         Idl.run()), call Transaction.commit() or Transaction.abort()."""
766         assert idl.txn is None
767
768         idl.txn = self
769         self._request_id = None
770         self.idl = idl
771         self.dry_run = False
772         self._txn_rows = {}
773         self._status = Transaction.UNCOMMITTED
774         self._error = None
775         self._comments = []
776
777         self._inc_row = None
778         self._inc_column = None
779
780         self._inserted_rows = {}  # Map from UUID to _InsertedRow
781
782     def add_comment(self, comment):
783         """Appens 'comment' to the comments that will be passed to the OVSDB
784         server when this transaction is committed.  (The comment will be
785         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
786         relatively human-readable form.)"""
787         self._comments.append(comment)
788
789     def wait(self, poller):
790         """Causes poll_block() to wake up if this transaction has completed
791         committing."""
792         if self._status not in (Transaction.UNCOMMITTED,
793                                 Transaction.INCOMPLETE):
794             poller.immediate_wake()
795
796     def _substitute_uuids(self, json):
797         if type(json) in (list, tuple):
798             if (len(json) == 2
799                 and json[0] == 'uuid'
800                 and ovs.ovsuuid.is_valid_string(json[1])):
801                 uuid = ovs.ovsuuid.from_string(json[1])
802                 row = self._txn_rows.get(uuid, None)
803                 if row and row._data is None:
804                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
805             else:
806                 return [self._substitute_uuids(elem) for elem in json]
807         return json
808
809     def __disassemble(self):
810         self.idl.txn = None
811
812         for row in self._txn_rows.itervalues():
813             if row._changes is None:
814                 row._table.rows[row.uuid] = row
815             elif row._data is None:
816                 del row._table.rows[row.uuid]
817             row.__dict__["_changes"] = {}
818             row.__dict__["_prereqs"] = {}
819         self._txn_rows = {}
820
821     def commit(self):
822         """Attempts to commit 'txn'.  Returns the status of the commit
823         operation, one of the following constants:
824
825           Transaction.INCOMPLETE:
826
827               The transaction is in progress, but not yet complete.  The caller
828               should call again later, after calling Idl.run() to let the
829               IDL do OVSDB protocol processing.
830
831           Transaction.UNCHANGED:
832
833               The transaction is complete.  (It didn't actually change the
834               database, so the IDL didn't send any request to the database
835               server.)
836
837           Transaction.ABORTED:
838
839               The caller previously called Transaction.abort().
840
841           Transaction.SUCCESS:
842
843               The transaction was successful.  The update made by the
844               transaction (and possibly other changes made by other database
845               clients) should already be visible in the IDL.
846
847           Transaction.TRY_AGAIN:
848
849               The transaction failed for some transient reason, e.g. because a
850               "verify" operation reported an inconsistency or due to a network
851               problem.  The caller should wait for a change to the database,
852               then compose a new transaction, and commit the new transaction.
853
854               Use Idl.change_seqno to wait for a change in the database.  It is
855               important to use its value *before* the initial call to
856               Transaction.commit() as the baseline for this purpose, because
857               the change that one should wait for can happen after the initial
858               call but before the call that returns Transaction.TRY_AGAIN, and
859               using some other baseline value in that situation could cause an
860               indefinite wait if the database rarely changes.
861
862           Transaction.NOT_LOCKED:
863
864               The transaction failed because the IDL has been configured to
865               require a database lock (with Idl.set_lock()) but didn't
866               get it yet or has already lost it.
867
868         Committing a transaction rolls back all of the changes that it made to
869         the IDL's copy of the database.  If the transaction commits
870         successfully, then the database server will send an update and, thus,
871         the IDL will be updated with the committed changes."""
872         # The status can only change if we're the active transaction.
873         # (Otherwise, our status will change only in Idl.run().)
874         if self != self.idl.txn:
875             return self._status
876
877         # If we need a lock but don't have it, give up quickly.
878         if self.idl.lock_name and not self.idl.has_lock:
879             self._status = Transaction.NOT_LOCKED
880             self.__disassemble()
881             return self._status
882
883         operations = [self.idl._db.name]
884
885         # Assert that we have the required lock (avoiding a race).
886         if self.idl.lock_name:
887             operations.append({"op": "assert",
888                                "lock": self.idl.lock_name})
889
890         # Add prerequisites and declarations of new rows.
891         for row in self._txn_rows.itervalues():
892             if row._prereqs:
893                 rows = {}
894                 columns = []
895                 for column_name in row._prereqs:
896                     columns.append(column_name)
897                     rows[column_name] = row._data[column_name].to_json()
898                 operations.append({"op": "wait",
899                                    "table": row._table.name,
900                                    "timeout": 0,
901                                    "where": _where_uuid_equals(row.uuid),
902                                    "until": "==",
903                                    "columns": columns,
904                                    "rows": [rows]})
905
906         # Add updates.
907         any_updates = False
908         for row in self._txn_rows.itervalues():
909             if row._changes is None:
910                 if row._table.is_root:
911                     operations.append({"op": "delete",
912                                        "table": row._table.name,
913                                        "where": _where_uuid_equals(row.uuid)})
914                     any_updates = True
915                 else:
916                     # Let ovsdb-server decide whether to really delete it.
917                     pass
918             elif row._changes:
919                 op = {"table": row._table.name}
920                 if row._data is None:
921                     op["op"] = "insert"
922                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
923                     any_updates = True
924
925                     op_index = len(operations) - 1
926                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
927                 else:
928                     op["op"] = "update"
929                     op["where"] = _where_uuid_equals(row.uuid)
930
931                 row_json = {}
932                 op["row"] = row_json
933
934                 for column_name, datum in row._changes.iteritems():
935                     if row._data is not None or not datum.is_default():
936                         row_json[column_name] = (
937                                 self._substitute_uuids(datum.to_json()))
938
939                         # If anything really changed, consider it an update.
940                         # We can't suppress not-really-changed values earlier
941                         # or transactions would become nonatomic (see the big
942                         # comment inside Transaction._write()).
943                         if (not any_updates and row._data is not None and
944                             row._data[column_name] != datum):
945                             any_updates = True
946
947                 if row._data is None or row_json:
948                     operations.append(op)
949
950         # Add increment.
951         if self._inc_row and any_updates:
952             self._inc_index = len(operations) - 1
953
954             operations.append({"op": "mutate",
955                                "table": self._inc_row._table.name,
956                                "where": self._substitute_uuids(
957                                    _where_uuid_equals(self._inc_row.uuid)),
958                                "mutations": [[self._inc_column, "+=", 1]]})
959             operations.append({"op": "select",
960                                "table": self._inc_row._table.name,
961                                "where": self._substitute_uuids(
962                                    _where_uuid_equals(self._inc_row.uuid)),
963                                "columns": [self._inc_column]})
964
965         # Add comment.
966         if self._comments:
967             operations.append({"op": "comment",
968                                "comment": "\n".join(self._comments)})
969
970         # Dry run?
971         if self.dry_run:
972             operations.append({"op": "abort"})
973
974         if not any_updates:
975             self._status = Transaction.UNCHANGED
976         else:
977             msg = ovs.jsonrpc.Message.create_request("transact", operations)
978             self._request_id = msg.id
979             if not self.idl._session.send(msg):
980                 self.idl._outstanding_txns[self._request_id] = self
981                 self._status = Transaction.INCOMPLETE
982             else:
983                 self._status = Transaction.TRY_AGAIN
984
985         self.__disassemble()
986         return self._status
987
988     def commit_block(self):
989         """Attempts to commit this transaction, blocking until the commit
990         either succeeds or fails.  Returns the final commit status, which may
991         be any Transaction.* value other than Transaction.INCOMPLETE.
992
993         This function calls Idl.run() on this transaction'ss IDL, so it may
994         cause Idl.change_seqno to change."""
995         while True:
996             status = self.commit()
997             if status != Transaction.INCOMPLETE:
998                 return status
999
1000             self.idl.run()
1001
1002             poller = ovs.poller.Poller()
1003             self.idl.wait(poller)
1004             self.wait(poller)
1005             poller.block()
1006
1007     def get_increment_new_value(self):
1008         """Returns the final (incremented) value of the column in this
1009         transaction that was set to be incremented by Row.increment.  This
1010         transaction must have committed successfully."""
1011         assert self._status == Transaction.SUCCESS
1012         return self._inc_new_value
1013
1014     def abort(self):
1015         """Aborts this transaction.  If Transaction.commit() has already been
1016         called then the transaction might get committed anyhow."""
1017         self.__disassemble()
1018         if self._status in (Transaction.UNCOMMITTED,
1019                             Transaction.INCOMPLETE):
1020             self._status = Transaction.ABORTED
1021
1022     def get_error(self):
1023         """Returns a string representing this transaction's current status,
1024         suitable for use in log messages."""
1025         if self._status != Transaction.ERROR:
1026             return Transaction.status_to_string(self._status)
1027         elif self._error:
1028             return self._error
1029         else:
1030             return "no error details available"
1031
1032     def __set_error_json(self, json):
1033         if self._error is None:
1034             self._error = ovs.json.to_string(json)
1035
1036     def get_insert_uuid(self, uuid):
1037         """Finds and returns the permanent UUID that the database assigned to a
1038         newly inserted row, given the UUID that Transaction.insert() assigned
1039         locally to that row.
1040
1041         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1042         or if it was assigned by that function and then deleted by Row.delete()
1043         within the same transaction.  (Rows that are inserted and then deleted
1044         within a single transaction are never sent to the database server, so
1045         it never assigns them a permanent UUID.)
1046
1047         This transaction must have completed successfully."""
1048         assert self._status in (Transaction.SUCCESS,
1049                                 Transaction.UNCHANGED)
1050         inserted_row = self._inserted_rows.get(uuid)
1051         if inserted_row:
1052             return inserted_row.real
1053         return None
1054
1055     def _increment(self, row, column):
1056         assert not self._inc_row
1057         self._inc_row = row
1058         self._inc_column = column
1059
1060     def _write(self, row, column, datum):
1061         assert row._changes is not None
1062
1063         txn = row._idl.txn
1064
1065         # If this is a write-only column and the datum being written is the
1066         # same as the one already there, just skip the update entirely.  This
1067         # is worth optimizing because we have a lot of columns that get
1068         # periodically refreshed into the database but don't actually change
1069         # that often.
1070         #
1071         # We don't do this for read/write columns because that would break
1072         # atomicity of transactions--some other client might have written a
1073         # different value in that column since we read it.  (But if a whole
1074         # transaction only does writes of existing values, without making any
1075         # real changes, we will drop the whole transaction later in
1076         # ovsdb_idl_txn_commit().)
1077         if not column.alert and row._data.get(column.name) == datum:
1078             new_value = row._changes.get(column.name)
1079             if new_value is None or new_value == datum:
1080                 return
1081
1082         txn._txn_rows[row.uuid] = row
1083         row._changes[column.name] = datum.copy()
1084
1085     def insert(self, table, new_uuid=None):
1086         """Inserts and returns a new row in 'table', which must be one of the
1087         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1088
1089         The new row is assigned a provisional UUID.  If 'uuid' is None then one
1090         is randomly generated; otherwise 'uuid' should specify a randomly
1091         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
1092         different UUID when 'txn' is committed, but the IDL will replace any
1093         uses of the provisional UUID in the data to be to be committed by the
1094         UUID assigned by ovsdb-server."""
1095         assert self._status == Transaction.UNCOMMITTED
1096         if new_uuid is None:
1097             new_uuid = uuid.uuid4()
1098         row = Row(self.idl, table, new_uuid, None)
1099         table.rows[row.uuid] = row
1100         self._txn_rows[row.uuid] = row
1101         return row
1102
1103     def _process_reply(self, msg):
1104         if msg.type == ovs.jsonrpc.Message.T_ERROR:
1105             self._status = Transaction.ERROR
1106         elif type(msg.result) not in (list, tuple):
1107             # XXX rate-limit
1108             vlog.warn('reply to "transact" is not JSON array')
1109         else:
1110             hard_errors = False
1111             soft_errors = False
1112             lock_errors = False
1113
1114             ops = msg.result
1115             for op in ops:
1116                 if op is None:
1117                     # This isn't an error in itself but indicates that some
1118                     # prior operation failed, so make sure that we know about
1119                     # it.
1120                     soft_errors = True
1121                 elif type(op) == dict:
1122                     error = op.get("error")
1123                     if error is not None:
1124                         if error == "timed out":
1125                             soft_errors = True
1126                         elif error == "not owner":
1127                             lock_errors = True
1128                         elif error == "aborted":
1129                             pass
1130                         else:
1131                             hard_errors = True
1132                             self.__set_error_json(op)
1133                 else:
1134                     hard_errors = True
1135                     self.__set_error_json(op)
1136                     # XXX rate-limit
1137                     vlog.warn("operation reply is not JSON null or object")
1138
1139             if not soft_errors and not hard_errors and not lock_errors:
1140                 if self._inc_row and not self.__process_inc_reply(ops):
1141                     hard_errors = True
1142
1143                 for insert in self._inserted_rows.itervalues():
1144                     if not self.__process_insert_reply(insert, ops):
1145                         hard_errors = True
1146
1147             if hard_errors:
1148                 self._status = Transaction.ERROR
1149             elif lock_errors:
1150                 self._status = Transaction.NOT_LOCKED
1151             elif soft_errors:
1152                 self._status = Transaction.TRY_AGAIN
1153             else:
1154                 self._status = Transaction.SUCCESS
1155
1156     @staticmethod
1157     def __check_json_type(json, types, name):
1158         if not json:
1159             # XXX rate-limit
1160             vlog.warn("%s is missing" % name)
1161             return False
1162         elif type(json) not in types:
1163             # XXX rate-limit
1164             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1165             return False
1166         else:
1167             return True
1168
1169     def __process_inc_reply(self, ops):
1170         if self._inc_index + 2 > len(ops):
1171             # XXX rate-limit
1172             vlog.warn("reply does not contain enough operations for "
1173                       "increment (has %d, needs %d)" %
1174                       (len(ops), self._inc_index + 2))
1175
1176         # We know that this is a JSON object because the loop in
1177         # __process_reply() already checked.
1178         mutate = ops[self._inc_index]
1179         count = mutate.get("count")
1180         if not Transaction.__check_json_type(count, (int, long),
1181                                              '"mutate" reply "count"'):
1182             return False
1183         if count != 1:
1184             # XXX rate-limit
1185             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1186             return False
1187
1188         select = ops[self._inc_index + 1]
1189         rows = select.get("rows")
1190         if not Transaction.__check_json_type(rows, (list, tuple),
1191                                              '"select" reply "rows"'):
1192             return False
1193         if len(rows) != 1:
1194             # XXX rate-limit
1195             vlog.warn('"select" reply "rows" has %d elements '
1196                       'instead of 1' % len(rows))
1197             return False
1198         row = rows[0]
1199         if not Transaction.__check_json_type(row, (dict,),
1200                                              '"select" reply row'):
1201             return False
1202         column = row.get(self._inc_column)
1203         if not Transaction.__check_json_type(column, (int, long),
1204                                              '"select" reply inc column'):
1205             return False
1206         self._inc_new_value = column
1207         return True
1208
1209     def __process_insert_reply(self, insert, ops):
1210         if insert.op_index >= len(ops):
1211             # XXX rate-limit
1212             vlog.warn("reply does not contain enough operations "
1213                       "for insert (has %d, needs %d)"
1214                       % (len(ops), insert.op_index))
1215             return False
1216
1217         # We know that this is a JSON object because the loop in
1218         # __process_reply() already checked.
1219         reply = ops[insert.op_index]
1220         json_uuid = reply.get("uuid")
1221         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1222                                              '"insert" reply "uuid"'):
1223             return False
1224
1225         try:
1226             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1227         except error.Error:
1228             # XXX rate-limit
1229             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1230             return False
1231
1232         insert.real = uuid_
1233         return True
1234
1235
1236 class SchemaHelper(object):
1237     """IDL Schema helper.
1238
1239     This class encapsulates the logic required to generate schemas suitable
1240     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1241     they are interested in using register_columns().  When finished, the
1242     get_idl_schema() function may be called.
1243
1244     The location on disk of the schema used may be found in the
1245     'schema_location' variable."""
1246
1247     def __init__(self, location=None, schema_json=None):
1248         """Creates a new Schema object.
1249
1250         'location' file path to ovs schema. None means default location
1251         'schema_json' schema in json preresentation in memory
1252         """
1253
1254         if location and schema_json:
1255             raise ValueError("both location and schema_json can't be "
1256                              "specified. it's ambiguous.")
1257         if schema_json is None:
1258             if location is None:
1259                 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1260             schema_json = ovs.json.from_file(location)
1261
1262         self.schema_json = schema_json
1263         self._tables = {}
1264         self._all = False
1265
1266     def register_columns(self, table, columns):
1267         """Registers interest in the given 'columns' of 'table'.  Future calls
1268         to get_idl_schema() will include 'table':column for each column in
1269         'columns'. This function automatically avoids adding duplicate entries
1270         to the schema.
1271
1272         'table' must be a string.
1273         'columns' must be a list of strings.
1274         """
1275
1276         assert type(table) is str
1277         assert type(columns) is list
1278
1279         columns = set(columns) | self._tables.get(table, set())
1280         self._tables[table] = columns
1281
1282     def register_table(self, table):
1283         """Registers interest in the given all columns of 'table'. Future calls
1284         to get_idl_schema() will include all columns of 'table'.
1285
1286         'table' must be a string
1287         """
1288         assert type(table) is str
1289         self._tables[table] = set()  # empty set means all columns in the table
1290
1291     def register_all(self):
1292         """Registers interest in every column of every table."""
1293         self._all = True
1294
1295     def get_idl_schema(self):
1296         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1297         object based on columns registered using the register_columns()
1298         function."""
1299
1300         schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1301         self.schema_json = None
1302
1303         if not self._all:
1304             schema_tables = {}
1305             for table, columns in self._tables.iteritems():
1306                 schema_tables[table] = (
1307                     self._keep_table_columns(schema, table, columns))
1308
1309             schema.tables = schema_tables
1310         return schema
1311
1312     def _keep_table_columns(self, schema, table_name, columns):
1313         assert table_name in schema.tables
1314         table = schema.tables[table_name]
1315
1316         if not columns:
1317             # empty set means all columns in the table
1318             return table
1319
1320         new_columns = {}
1321         for column_name in columns:
1322             assert type(column_name) is str
1323             assert column_name in table.columns
1324
1325             new_columns[column_name] = table.columns[column_name]
1326
1327         table.columns = new_columns
1328         return table