1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from ovs.db import error
28 vlog = ovs.vlog.Vlog("idl")
30 __pychecker__ = 'no-classattr no-objattrs'
38 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
40 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
41 requests to an OVSDB database server and parses the responses, converting
42 raw JSON into data structures that are easier for clients to digest.
44 The IDL also assists with issuing database transactions. The client
45 creates a transaction, manipulates the IDL data structures, and commits or
46 aborts the transaction. The IDL then composes and issues the necessary
47 JSON-RPC requests and reports to the client whether the transaction
48 completed successfully.
50 The client is allowed to access the following attributes directly, in a
53 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
54 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
55 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
58 The client may directly read and write the Row objects referenced by the
59 'rows' map values. Refer to Row for more details.
61 - 'change_seqno': A number that represents the IDL's state. When the IDL
62 is updated (by Idl.run()), its value changes. The sequence number can
63 occasionally change even if the database does not. This happens if the
64 connection to the database drops and reconnects, which causes the
65 database contents to be reloaded even if they didn't change. (It could
66 also happen if the database server sends out a "change" that reflects
67 what the IDL already thought was in the database. The database server is
68 not supposed to do that, but bugs could in theory cause it to do so.)
70 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
71 if no lock is configured.
73 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
74 lock, and False otherwise.
76 Locking and unlocking happens asynchronously from the database client's
77 point of view, so the information is only useful for optimization
78 (e.g. if the client doesn't have the lock then there's no point in trying
79 to write to the database).
81 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
82 the database server has indicated that some other client already owns the
83 requested lock, and False otherwise.
85 - 'txn': The ovs.db.idl.Transaction object for the database transaction
86 currently being constructed, if there is one, or None otherwise.
89 def __init__(self, remote, schema):
90 """Creates and returns a connection to the database named 'db_name' on
91 'remote', which should be in a form acceptable to
92 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
93 replica of the remote database.
95 'schema' should be the schema for the remote database. The caller may
96 have cut it down by removing tables or columns that are not of
97 interest. The IDL will only replicate the tables and columns that
98 remain. The caller may also add a attribute named 'alert' to selected
99 remaining columns, setting its value to False; if so, then changes to
100 those columns will not be considered changes to the database for the
101 purpose of the return value of Idl.run() and Idl.change_seqno. This is
102 useful for columns that the IDL's client will write but not read.
104 As a convenience to users, 'schema' may also be an instance of the
107 The IDL uses and modifies 'schema' directly."""
109 assert isinstance(schema, SchemaHelper)
110 schema = schema.get_idl_schema()
112 self.tables = schema.tables
113 self.readonly = schema.readonly
115 self._session = ovs.jsonrpc.Session.open(remote)
116 self._monitor_request_id = None
117 self._last_seqno = None
118 self.change_seqno = 0
121 self.lock_name = None # Name of lock we need, None if none.
122 self.has_lock = False # Has db server said we have the lock?
123 self.is_lock_contended = False # Has db server said we can't get lock?
124 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
126 # Transaction support.
128 self._outstanding_txns = {}
130 for table in six.itervalues(schema.tables):
131 for column in six.itervalues(table.columns):
132 if not hasattr(column, 'alert'):
134 table.need_table = False
139 """Closes the connection to the database. The IDL will no longer
141 self._session.close()
144 """Processes a batch of messages from the database server. Returns
145 True if the database as seen through the IDL changed, False if it did
146 not change. The initial fetch of the entire contents of the remote
147 database is considered to be one kind of change. If the IDL has been
148 configured to acquire a database lock (with Idl.set_lock()), then
149 successfully acquiring the lock is also considered to be a change.
151 This function can return occasional false positives, that is, report
152 that the database changed even though it didn't. This happens if the
153 connection to the database drops and reconnects, which causes the
154 database contents to be reloaded even if they didn't change. (It could
155 also happen if the database server sends out a "change" that reflects
156 what we already thought was in the database, but the database server is
157 not supposed to do that.)
159 As an alternative to checking the return value, the client may check
160 for changes in self.change_seqno."""
162 initial_change_seqno = self.change_seqno
167 if not self._session.is_connected():
170 seqno = self._session.get_seqno()
171 if seqno != self._last_seqno:
172 self._last_seqno = seqno
173 self.__txn_abort_all()
174 self.__send_monitor_request()
176 self.__send_lock_request()
179 msg = self._session.recv()
182 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
183 and msg.method == "update"
184 and len(msg.params) == 2
185 and msg.params[0] is None):
186 # Database contents changed.
187 self.__parse_update(msg.params[1])
188 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
189 and self._monitor_request_id is not None
190 and self._monitor_request_id == msg.id):
191 # Reply to our "monitor" request.
193 self.change_seqno += 1
194 self._monitor_request_id = None
196 self.__parse_update(msg.result)
197 except error.Error as e:
198 vlog.err("%s: parse error in received schema: %s"
199 % (self._session.get_name(), e))
201 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
202 and self._lock_request_id is not None
203 and self._lock_request_id == msg.id):
204 # Reply to our "lock" request.
205 self.__parse_lock_reply(msg.result)
206 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
207 and msg.method == "locked"):
209 self.__parse_lock_notify(msg.params, True)
210 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
211 and msg.method == "stolen"):
212 # Someone else stole our lock.
213 self.__parse_lock_notify(msg.params, False)
214 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
215 # Reply to our echo request. Ignore it.
217 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
218 ovs.jsonrpc.Message.T_REPLY)
219 and self.__txn_process_reply(msg)):
220 # __txn_process_reply() did everything needed.
223 # This can happen if a transaction is destroyed before we
224 # receive the reply, so keep the log level low.
225 vlog.dbg("%s: received unexpected %s message"
226 % (self._session.get_name(),
227 ovs.jsonrpc.Message.type_to_string(msg.type)))
229 return initial_change_seqno != self.change_seqno
231 def wait(self, poller):
232 """Arranges for poller.block() to wake up when self.run() has something
233 to do or when activity occurs on a transaction on 'self'."""
234 self._session.wait(poller)
235 self._session.recv_wait(poller)
237 def has_ever_connected(self):
238 """Returns True, if the IDL successfully connected to the remote
239 database and retrieved its contents (even if the connection
240 subsequently dropped and is in the process of reconnecting). If so,
241 then the IDL contains an atomic snapshot of the database's contents
242 (but it might be arbitrarily old if the connection dropped).
244 Returns False if the IDL has never connected or retrieved the
245 database's contents. If so, the IDL is empty."""
246 return self.change_seqno != 0
248 def force_reconnect(self):
249 """Forces the IDL to drop its connection to the database and reconnect.
250 In the meantime, the contents of the IDL will not change."""
251 self._session.force_reconnect()
253 def set_lock(self, lock_name):
254 """If 'lock_name' is not None, configures the IDL to obtain the named
255 lock from the database server and to avoid modifying the database when
256 the lock cannot be acquired (that is, when another client has the same
259 If 'lock_name' is None, drops the locking requirement and releases the
262 assert not self._outstanding_txns
264 if self.lock_name and (not lock_name or lock_name != self.lock_name):
265 # Release previous lock.
266 self.__send_unlock_request()
267 self.lock_name = None
268 self.is_lock_contended = False
270 if lock_name and not self.lock_name:
272 self.lock_name = lock_name
273 self.__send_lock_request()
275 def notify(self, event, row, updates=None):
276 """Hook for implementing create/update/delete notifications
278 :param event: The event that was triggered
279 :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
280 :param row: The row as it is after the operation has occured
282 :param updates: For updates, a Row object with just the changed columns
289 for table in six.itervalues(self.tables):
295 self.change_seqno += 1
297 def __update_has_lock(self, new_has_lock):
298 if new_has_lock and not self.has_lock:
299 if self._monitor_request_id is None:
300 self.change_seqno += 1
302 # We're waiting for a monitor reply, so don't signal that the
303 # database changed. The monitor reply will increment
304 # change_seqno anyhow.
306 self.is_lock_contended = False
307 self.has_lock = new_has_lock
309 def __do_send_lock_request(self, method):
310 self.__update_has_lock(False)
311 self._lock_request_id = None
312 if self._session.is_connected():
313 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
315 self._session.send(msg)
320 def __send_lock_request(self):
321 self._lock_request_id = self.__do_send_lock_request("lock")
323 def __send_unlock_request(self):
324 self.__do_send_lock_request("unlock")
326 def __parse_lock_reply(self, result):
327 self._lock_request_id = None
328 got_lock = isinstance(result, dict) and result.get("locked") is True
329 self.__update_has_lock(got_lock)
331 self.is_lock_contended = True
333 def __parse_lock_notify(self, params, new_has_lock):
334 if (self.lock_name is not None
335 and isinstance(params, (list, tuple))
337 and params[0] == self.lock_name):
338 self.__update_has_lock(new_has_lock)
340 self.is_lock_contended = True
342 def __send_monitor_request(self):
343 monitor_requests = {}
344 for table in six.itervalues(self.tables):
346 for column in six.iterkeys(table.columns):
347 if ((table.name not in self.readonly) or
348 (table.name in self.readonly) and
349 (column not in self.readonly[table.name])):
350 columns.append(column)
351 monitor_requests[table.name] = {"columns": columns}
352 msg = ovs.jsonrpc.Message.create_request(
353 "monitor", [self._db.name, None, monitor_requests])
354 self._monitor_request_id = msg.id
355 self._session.send(msg)
357 def __parse_update(self, update):
359 self.__do_parse_update(update)
360 except error.Error as e:
361 vlog.err("%s: error parsing update: %s"
362 % (self._session.get_name(), e))
364 def __do_parse_update(self, table_updates):
365 if not isinstance(table_updates, dict):
366 raise error.Error("<table-updates> is not an object",
369 for table_name, table_update in six.iteritems(table_updates):
370 table = self.tables.get(table_name)
372 raise error.Error('<table-updates> includes unknown '
373 'table "%s"' % table_name)
375 if not isinstance(table_update, dict):
376 raise error.Error('<table-update> for table "%s" is not '
377 'an object' % table_name, table_update)
379 for uuid_string, row_update in six.iteritems(table_update):
380 if not ovs.ovsuuid.is_valid_string(uuid_string):
381 raise error.Error('<table-update> for table "%s" '
382 'contains bad UUID "%s" as member '
383 'name' % (table_name, uuid_string),
385 uuid = ovs.ovsuuid.from_string(uuid_string)
387 if not isinstance(row_update, dict):
388 raise error.Error('<table-update> for table "%s" '
389 'contains <row-update> for %s that '
391 % (table_name, uuid_string))
393 parser = ovs.db.parser.Parser(row_update, "row-update")
394 old = parser.get_optional("old", [dict])
395 new = parser.get_optional("new", [dict])
398 if not old and not new:
399 raise error.Error('<row-update> missing "old" and '
400 '"new" members', row_update)
402 if self.__process_update(table, uuid, old, new):
403 self.change_seqno += 1
405 def __process_update(self, table, uuid, old, new):
406 """Returns True if a column changed, False otherwise."""
407 row = table.rows.get(uuid)
414 self.notify(ROW_DELETE, row)
417 vlog.warn("cannot delete missing row %s from table %s"
418 % (uuid, table.name))
422 row = self.__create_row(table, uuid)
426 vlog.warn("cannot add existing row %s to table %s"
427 % (uuid, table.name))
428 if self.__row_update(table, row, new):
430 self.notify(ROW_CREATE, row)
434 row = self.__create_row(table, uuid)
438 vlog.warn("cannot modify missing row %s in table %s"
439 % (uuid, table.name))
440 if self.__row_update(table, row, new):
442 self.notify(op, row, Row.from_json(self, table, uuid, old))
445 def __row_update(self, table, row, row_json):
447 for column_name, datum_json in six.iteritems(row_json):
448 column = table.columns.get(column_name)
451 vlog.warn("unknown column %s updating table %s"
452 % (column_name, table.name))
456 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
457 except error.Error as e:
459 vlog.warn("error parsing column %s in table %s: %s"
460 % (column_name, table.name, e))
463 if datum != row._data[column_name]:
464 row._data[column_name] = datum
468 # Didn't really change but the OVSDB monitor protocol always
469 # includes every value in a row.
473 def __create_row(self, table, uuid):
475 for column in six.itervalues(table.columns):
476 data[column.name] = ovs.db.data.Datum.default(column.type)
477 row = table.rows[uuid] = Row(self, table, uuid, data)
481 self._session.force_reconnect()
483 def __txn_abort_all(self):
484 while self._outstanding_txns:
485 txn = self._outstanding_txns.popitem()[1]
486 txn._status = Transaction.TRY_AGAIN
488 def __txn_process_reply(self, msg):
489 txn = self._outstanding_txns.pop(msg.id, None)
491 txn._process_reply(msg)
494 def _uuid_to_row(atom, base):
496 return base.ref_table.rows.get(atom)
501 def _row_to_uuid(value):
502 if isinstance(value, Row):
508 @functools.total_ordering
510 """A row within an IDL.
512 The client may access the following attributes directly:
514 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
516 - An attribute for each column in the Row's table, named for the column,
517 whose values are as returned by Datum.to_python() for the column's type.
519 If some error occurs (e.g. the database server's idea of the column is
520 different from the IDL's idea), then the attribute values is the
521 "default" value return by Datum.default() for the column's type. (It is
522 important to know this because the default value may violate constraints
523 for the column's type, e.g. the default integer value is 0 even if column
524 contraints require the column's value to be positive.)
526 When a transaction is active, column attributes may also be assigned new
527 values. Committing the transaction will then cause the new value to be
528 stored into the database.
530 *NOTE*: In the current implementation, the value of a column is a *copy*
531 of the value in the database. This means that modifying its value
532 directly will have no useful effect. For example, the following:
533 row.mycolumn["a"] = "b" # don't do this
534 will not change anything in the database, even after commit. To modify
535 the column, instead assign the modified column value back to the column:
540 def __init__(self, idl, table, uuid, data):
541 # All of the explicit references to self.__dict__ below are required
542 # to set real attributes with invoking self.__getattr__().
543 self.__dict__["uuid"] = uuid
545 self.__dict__["_idl"] = idl
546 self.__dict__["_table"] = table
548 # _data is the committed data. It takes the following values:
550 # - A dictionary that maps every column name to a Datum, if the row
551 # exists in the committed form of the database.
553 # - None, if this row is newly inserted within the active transaction
554 # and thus has no committed form.
555 self.__dict__["_data"] = data
557 # _changes describes changes to this row within the active transaction.
558 # It takes the following values:
560 # - {}, the empty dictionary, if no transaction is active or if the
561 # row has yet not been changed within this transaction.
563 # - A dictionary that maps a column name to its new Datum, if an
564 # active transaction changes those columns' values.
566 # - A dictionary that maps every column name to a Datum, if the row
567 # is newly inserted within the active transaction.
569 # - None, if this transaction deletes this row.
570 self.__dict__["_changes"] = {}
572 # A dictionary whose keys are the names of columns that must be
573 # verified as prerequisites when the transaction commits. The values
574 # in the dictionary are all None.
575 self.__dict__["_prereqs"] = {}
577 def __lt__(self, other):
578 if not isinstance(other, Row):
579 return NotImplemented
580 return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
582 def __eq__(self, other):
583 if not isinstance(other, Row):
584 return NotImplemented
585 return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
588 return int(self.__dict__['uuid'])
590 def __getattr__(self, column_name):
591 assert self._changes is not None
593 datum = self._changes.get(column_name)
595 if self._data is None:
596 raise AttributeError("%s instance has no attribute '%s'" %
597 (self.__class__.__name__, column_name))
598 if column_name in self._data:
599 datum = self._data[column_name]
601 raise AttributeError("%s instance has no attribute '%s'" %
602 (self.__class__.__name__, column_name))
604 return datum.to_python(_uuid_to_row)
606 def __setattr__(self, column_name, value):
607 assert self._changes is not None
610 if ((self._table.name in self._idl.readonly) and
611 (column_name in self._idl.readonly[self._table.name])):
612 vlog.warn("attempting to write to readonly column %s"
616 column = self._table.columns[column_name]
618 datum = ovs.db.data.Datum.from_python(column.type, value,
620 except error.Error as e:
622 vlog.err("attempting to write bad value to column %s (%s)"
625 self._idl.txn._write(self, column, datum)
628 def from_json(cls, idl, table, uuid, row_json):
630 for column_name, datum_json in six.iteritems(row_json):
631 column = table.columns.get(column_name)
634 vlog.warn("unknown column %s in table %s"
635 % (column_name, table.name))
638 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
639 except error.Error as e:
641 vlog.warn("error parsing column %s in table %s: %s"
642 % (column_name, table.name, e))
644 data[column_name] = datum
645 return cls(idl, table, uuid, data)
647 def verify(self, column_name):
648 """Causes the original contents of column 'column_name' in this row to
649 be verified as a prerequisite to completing the transaction. That is,
650 if 'column_name' changed in this row (or if this row was deleted)
651 between the time that the IDL originally read its contents and the time
652 that the transaction commits, then the transaction aborts and
653 Transaction.commit() returns Transaction.TRY_AGAIN.
655 The intention is that, to ensure that no transaction commits based on
656 dirty reads, an application should call Row.verify() on each data item
657 read as part of a read-modify-write operation.
659 In some cases Row.verify() reduces to a no-op, because the current
660 value of the column is already known:
662 - If this row is a row created by the current transaction (returned
663 by Transaction.insert()).
665 - If the column has already been modified within the current
668 Because of the latter property, always call Row.verify() *before*
669 modifying the column, for a given read-modify-write.
671 A transaction must be in progress."""
673 assert self._changes is not None
674 if not self._data or column_name in self._changes:
677 self._prereqs[column_name] = None
680 """Deletes this row from its table.
682 A transaction must be in progress."""
684 assert self._changes is not None
685 if self._data is None:
686 del self._idl.txn._txn_rows[self.uuid]
688 self._idl.txn._txn_rows[self.uuid] = self
689 self.__dict__["_changes"] = None
690 del self._table.rows[self.uuid]
692 def fetch(self, column_name):
693 self._idl.txn._fetch(self, column_name)
695 def increment(self, column_name):
696 """Causes the transaction, when committed, to increment the value of
697 'column_name' within this row by 1. 'column_name' must have an integer
698 type. After the transaction commits successfully, the client may
699 retrieve the final (incremented) value of 'column_name' with
700 Transaction.get_increment_new_value().
702 The client could accomplish something similar by reading and writing
703 and verify()ing columns. However, increment() will never (by itself)
704 cause a transaction to fail because of a verify error.
706 The intended use is for incrementing the "next_cfg" column in
707 the Open_vSwitch table."""
708 self._idl.txn._increment(self, column_name)
711 def _uuid_name_from_uuid(uuid):
712 return "row%s" % str(uuid).replace("-", "_")
715 def _where_uuid_equals(uuid):
716 return [["_uuid", "==", ["uuid", str(uuid)]]]
719 class _InsertedRow(object):
720 def __init__(self, op_index):
721 self.op_index = op_index
725 class Transaction(object):
726 """A transaction may modify the contents of a database by modifying the
727 values of columns, deleting rows, inserting rows, or adding checks that
728 columns in the database have not changed ("verify" operations), through
731 Reading and writing columns and inserting and deleting rows are all
732 straightforward. The reasons to verify columns are less obvious.
733 Verification is the key to maintaining transactional integrity. Because
734 OVSDB handles multiple clients, it can happen that between the time that
735 OVSDB client A reads a column and writes a new value, OVSDB client B has
736 written that column. Client A's write should not ordinarily overwrite
737 client B's, especially if the column in question is a "map" column that
738 contains several more or less independent data items. If client A adds a
739 "verify" operation before it writes the column, then the transaction fails
740 in case client B modifies it first. Client A will then see the new value
741 of the column and compose a new transaction based on the new contents
744 When a transaction is complete, which must be before the next call to
745 Idl.run(), call Transaction.commit() or Transaction.abort().
747 The life-cycle of a transaction looks like this:
749 1. Create the transaction and record the initial sequence number:
751 seqno = idl.change_seqno(idl)
752 txn = Transaction(idl)
754 2. Modify the database with Row and Transaction methods.
756 3. Commit the transaction by calling Transaction.commit(). The first call
757 to this function probably returns Transaction.INCOMPLETE. The client
758 must keep calling again along as this remains true, calling Idl.run() in
759 between to let the IDL do protocol processing. (If the client doesn't
760 have anything else to do in the meantime, it can use
761 Transaction.commit_block() to avoid having to loop itself.)
763 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
764 to change from the saved 'seqno' (it's possible that it's already
765 changed, in which case the client should not wait at all), then start
766 over from step 1. Only a call to Idl.run() will change the return value
767 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
769 # Status values that Transaction.commit() can return.
771 # Not yet committed or aborted.
772 UNCOMMITTED = "uncommitted"
773 # Transaction didn't include any changes.
774 UNCHANGED = "unchanged"
775 # Commit in progress, please wait.
776 INCOMPLETE = "incomplete"
777 # ovsdb_idl_txn_abort() called.
781 # Commit failed because a "verify" operation
782 # reported an inconsistency, due to a network
783 # problem, or other transient failure. Wait
784 # for a change, then try again.
785 TRY_AGAIN = "try again"
786 # Server hasn't given us the lock yet.
787 NOT_LOCKED = "not locked"
788 # Commit failed due to a hard error.
792 def status_to_string(status):
793 """Converts one of the status values that Transaction.commit() can
794 return into a human-readable string.
796 (The status values are in fact such strings already, so
797 there's nothing to do.)"""
800 def __init__(self, idl):
801 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
802 A given Idl may only have a single active transaction at a time.
804 A Transaction may modify the contents of a database by assigning new
805 values to columns (attributes of Row), deleting rows (with
806 Row.delete()), or inserting rows (with Transaction.insert()). It may
807 also check that columns in the database have not changed with
810 When a transaction is complete (which must be before the next call to
811 Idl.run()), call Transaction.commit() or Transaction.abort()."""
812 assert idl.txn is None
815 self._request_id = None
819 self._status = Transaction.UNCOMMITTED
824 self._inc_column = None
826 self._fetch_requests = []
828 self._inserted_rows = {} # Map from UUID to _InsertedRow
830 def add_comment(self, comment):
831 """Appends 'comment' to the comments that will be passed to the OVSDB
832 server when this transaction is committed. (The comment will be
833 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
834 relatively human-readable form.)"""
835 self._comments.append(comment)
837 def wait(self, poller):
838 """Causes poll_block() to wake up if this transaction has completed
840 if self._status not in (Transaction.UNCOMMITTED,
841 Transaction.INCOMPLETE):
842 poller.immediate_wake()
844 def _substitute_uuids(self, json):
845 if isinstance(json, (list, tuple)):
847 and json[0] == 'uuid'
848 and ovs.ovsuuid.is_valid_string(json[1])):
849 uuid = ovs.ovsuuid.from_string(json[1])
850 row = self._txn_rows.get(uuid, None)
851 if row and row._data is None:
852 return ["named-uuid", _uuid_name_from_uuid(uuid)]
854 return [self._substitute_uuids(elem) for elem in json]
857 def __disassemble(self):
860 for row in six.itervalues(self._txn_rows):
861 if row._changes is None:
862 row._table.rows[row.uuid] = row
863 elif row._data is None:
864 del row._table.rows[row.uuid]
865 row.__dict__["_changes"] = {}
866 row.__dict__["_prereqs"] = {}
870 """Attempts to commit 'txn'. Returns the status of the commit
871 operation, one of the following constants:
873 Transaction.INCOMPLETE:
875 The transaction is in progress, but not yet complete. The caller
876 should call again later, after calling Idl.run() to let the
877 IDL do OVSDB protocol processing.
879 Transaction.UNCHANGED:
881 The transaction is complete. (It didn't actually change the
882 database, so the IDL didn't send any request to the database
887 The caller previously called Transaction.abort().
891 The transaction was successful. The update made by the
892 transaction (and possibly other changes made by other database
893 clients) should already be visible in the IDL.
895 Transaction.TRY_AGAIN:
897 The transaction failed for some transient reason, e.g. because a
898 "verify" operation reported an inconsistency or due to a network
899 problem. The caller should wait for a change to the database,
900 then compose a new transaction, and commit the new transaction.
902 Use Idl.change_seqno to wait for a change in the database. It is
903 important to use its value *before* the initial call to
904 Transaction.commit() as the baseline for this purpose, because
905 the change that one should wait for can happen after the initial
906 call but before the call that returns Transaction.TRY_AGAIN, and
907 using some other baseline value in that situation could cause an
908 indefinite wait if the database rarely changes.
910 Transaction.NOT_LOCKED:
912 The transaction failed because the IDL has been configured to
913 require a database lock (with Idl.set_lock()) but didn't
914 get it yet or has already lost it.
916 Committing a transaction rolls back all of the changes that it made to
917 the IDL's copy of the database. If the transaction commits
918 successfully, then the database server will send an update and, thus,
919 the IDL will be updated with the committed changes."""
920 # The status can only change if we're the active transaction.
921 # (Otherwise, our status will change only in Idl.run().)
922 if self != self.idl.txn:
925 # If we need a lock but don't have it, give up quickly.
926 if self.idl.lock_name and not self.idl.has_lock:
927 self._status = Transaction.NOT_LOCKED
931 operations = [self.idl._db.name]
933 # Assert that we have the required lock (avoiding a race).
934 if self.idl.lock_name:
935 operations.append({"op": "assert",
936 "lock": self.idl.lock_name})
938 # Add prerequisites and declarations of new rows.
939 for row in six.itervalues(self._txn_rows):
943 for column_name in row._prereqs:
944 columns.append(column_name)
945 rows[column_name] = row._data[column_name].to_json()
946 operations.append({"op": "wait",
947 "table": row._table.name,
949 "where": _where_uuid_equals(row.uuid),
956 for row in six.itervalues(self._txn_rows):
957 if row._changes is None:
958 if row._table.is_root:
959 operations.append({"op": "delete",
960 "table": row._table.name,
961 "where": _where_uuid_equals(row.uuid)})
964 # Let ovsdb-server decide whether to really delete it.
967 op = {"table": row._table.name}
968 if row._data is None:
970 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
973 op_index = len(operations) - 1
974 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
977 op["where"] = _where_uuid_equals(row.uuid)
982 for column_name, datum in six.iteritems(row._changes):
983 if row._data is not None or not datum.is_default():
984 row_json[column_name] = (
985 self._substitute_uuids(datum.to_json()))
987 # If anything really changed, consider it an update.
988 # We can't suppress not-really-changed values earlier
989 # or transactions would become nonatomic (see the big
990 # comment inside Transaction._write()).
991 if (not any_updates and row._data is not None and
992 row._data[column_name] != datum):
995 if row._data is None or row_json:
996 operations.append(op)
998 if self._fetch_requests:
999 for fetch in self._fetch_requests:
1000 fetch["index"] = len(operations) - 1
1001 operations.append({"op": "select",
1002 "table": fetch["row"]._table.name,
1003 "where": self._substitute_uuids(
1004 _where_uuid_equals(fetch["row"].uuid)),
1005 "columns": [fetch["column_name"]]})
1009 if self._inc_row and any_updates:
1010 self._inc_index = len(operations) - 1
1012 operations.append({"op": "mutate",
1013 "table": self._inc_row._table.name,
1014 "where": self._substitute_uuids(
1015 _where_uuid_equals(self._inc_row.uuid)),
1016 "mutations": [[self._inc_column, "+=", 1]]})
1017 operations.append({"op": "select",
1018 "table": self._inc_row._table.name,
1019 "where": self._substitute_uuids(
1020 _where_uuid_equals(self._inc_row.uuid)),
1021 "columns": [self._inc_column]})
1025 operations.append({"op": "comment",
1026 "comment": "\n".join(self._comments)})
1030 operations.append({"op": "abort"})
1033 self._status = Transaction.UNCHANGED
1035 msg = ovs.jsonrpc.Message.create_request("transact", operations)
1036 self._request_id = msg.id
1037 if not self.idl._session.send(msg):
1038 self.idl._outstanding_txns[self._request_id] = self
1039 self._status = Transaction.INCOMPLETE
1041 self._status = Transaction.TRY_AGAIN
1043 self.__disassemble()
1046 def commit_block(self):
1047 """Attempts to commit this transaction, blocking until the commit
1048 either succeeds or fails. Returns the final commit status, which may
1049 be any Transaction.* value other than Transaction.INCOMPLETE.
1051 This function calls Idl.run() on this transaction'ss IDL, so it may
1052 cause Idl.change_seqno to change."""
1054 status = self.commit()
1055 if status != Transaction.INCOMPLETE:
1060 poller = ovs.poller.Poller()
1061 self.idl.wait(poller)
1065 def get_increment_new_value(self):
1066 """Returns the final (incremented) value of the column in this
1067 transaction that was set to be incremented by Row.increment. This
1068 transaction must have committed successfully."""
1069 assert self._status == Transaction.SUCCESS
1070 return self._inc_new_value
1073 """Aborts this transaction. If Transaction.commit() has already been
1074 called then the transaction might get committed anyhow."""
1075 self.__disassemble()
1076 if self._status in (Transaction.UNCOMMITTED,
1077 Transaction.INCOMPLETE):
1078 self._status = Transaction.ABORTED
1080 def get_error(self):
1081 """Returns a string representing this transaction's current status,
1082 suitable for use in log messages."""
1083 if self._status != Transaction.ERROR:
1084 return Transaction.status_to_string(self._status)
1088 return "no error details available"
1090 def __set_error_json(self, json):
1091 if self._error is None:
1092 self._error = ovs.json.to_string(json)
1094 def get_insert_uuid(self, uuid):
1095 """Finds and returns the permanent UUID that the database assigned to a
1096 newly inserted row, given the UUID that Transaction.insert() assigned
1097 locally to that row.
1099 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1100 or if it was assigned by that function and then deleted by Row.delete()
1101 within the same transaction. (Rows that are inserted and then deleted
1102 within a single transaction are never sent to the database server, so
1103 it never assigns them a permanent UUID.)
1105 This transaction must have completed successfully."""
1106 assert self._status in (Transaction.SUCCESS,
1107 Transaction.UNCHANGED)
1108 inserted_row = self._inserted_rows.get(uuid)
1110 return inserted_row.real
1113 def _increment(self, row, column):
1114 assert not self._inc_row
1116 self._inc_column = column
1118 def _fetch(self, row, column_name):
1119 self._fetch_requests.append({"row": row, "column_name": column_name})
1121 def _write(self, row, column, datum):
1122 assert row._changes is not None
1126 # If this is a write-only column and the datum being written is the
1127 # same as the one already there, just skip the update entirely. This
1128 # is worth optimizing because we have a lot of columns that get
1129 # periodically refreshed into the database but don't actually change
1132 # We don't do this for read/write columns because that would break
1133 # atomicity of transactions--some other client might have written a
1134 # different value in that column since we read it. (But if a whole
1135 # transaction only does writes of existing values, without making any
1136 # real changes, we will drop the whole transaction later in
1137 # ovsdb_idl_txn_commit().)
1138 if (not column.alert and row._data and
1139 row._data.get(column.name) == datum):
1140 new_value = row._changes.get(column.name)
1141 if new_value is None or new_value == datum:
1144 txn._txn_rows[row.uuid] = row
1145 row._changes[column.name] = datum.copy()
1147 def insert(self, table, new_uuid=None):
1148 """Inserts and returns a new row in 'table', which must be one of the
1149 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1151 The new row is assigned a provisional UUID. If 'uuid' is None then one
1152 is randomly generated; otherwise 'uuid' should specify a randomly
1153 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1154 different UUID when 'txn' is committed, but the IDL will replace any
1155 uses of the provisional UUID in the data to be to be committed by the
1156 UUID assigned by ovsdb-server."""
1157 assert self._status == Transaction.UNCOMMITTED
1158 if new_uuid is None:
1159 new_uuid = uuid.uuid4()
1160 row = Row(self.idl, table, new_uuid, None)
1161 table.rows[row.uuid] = row
1162 self._txn_rows[row.uuid] = row
1165 def _process_reply(self, msg):
1166 if msg.type == ovs.jsonrpc.Message.T_ERROR:
1167 self._status = Transaction.ERROR
1168 elif not isinstance(msg.result, (list, tuple)):
1170 vlog.warn('reply to "transact" is not JSON array')
1179 # This isn't an error in itself but indicates that some
1180 # prior operation failed, so make sure that we know about
1183 elif isinstance(op, dict):
1184 error = op.get("error")
1185 if error is not None:
1186 if error == "timed out":
1188 elif error == "not owner":
1190 elif error == "aborted":
1194 self.__set_error_json(op)
1197 self.__set_error_json(op)
1199 vlog.warn("operation reply is not JSON null or object")
1201 if not soft_errors and not hard_errors and not lock_errors:
1202 if self._inc_row and not self.__process_inc_reply(ops):
1204 if self._fetch_requests:
1205 if self.__process_fetch_reply(ops):
1206 self.idl.change_seqno += 1
1210 for insert in six.itervalues(self._inserted_rows):
1211 if not self.__process_insert_reply(insert, ops):
1215 self._status = Transaction.ERROR
1217 self._status = Transaction.NOT_LOCKED
1219 self._status = Transaction.TRY_AGAIN
1221 self._status = Transaction.SUCCESS
1224 def __check_json_type(json, types, name):
1227 vlog.warn("%s is missing" % name)
1229 elif not isinstance(json, tuple(types)):
1231 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1236 def __process_fetch_reply(self, ops):
1238 for fetch_request in self._fetch_requests:
1239 row = fetch_request["row"]
1240 column_name = fetch_request["column_name"]
1241 index = fetch_request["index"]
1245 fetched_rows = select.get("rows")
1246 if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1247 '"select" reply "rows"'):
1249 if len(fetched_rows) != 1:
1251 vlog.warn('"select" reply "rows" has %d elements '
1252 'instead of 1' % len(fetched_rows))
1254 fetched_row = fetched_rows[0]
1255 if not Transaction.__check_json_type(fetched_row, (dict,),
1256 '"select" reply row'):
1259 column = table.columns.get(column_name)
1260 datum_json = fetched_row.get(column_name)
1261 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1263 row._data[column_name] = datum
1268 def __process_inc_reply(self, ops):
1269 if self._inc_index + 2 > len(ops):
1271 vlog.warn("reply does not contain enough operations for "
1272 "increment (has %d, needs %d)" %
1273 (len(ops), self._inc_index + 2))
1275 # We know that this is a JSON object because the loop in
1276 # __process_reply() already checked.
1277 mutate = ops[self._inc_index]
1278 count = mutate.get("count")
1279 if not Transaction.__check_json_type(count, six.integer_types,
1280 '"mutate" reply "count"'):
1284 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1287 select = ops[self._inc_index + 1]
1288 rows = select.get("rows")
1289 if not Transaction.__check_json_type(rows, (list, tuple),
1290 '"select" reply "rows"'):
1294 vlog.warn('"select" reply "rows" has %d elements '
1295 'instead of 1' % len(rows))
1298 if not Transaction.__check_json_type(row, (dict,),
1299 '"select" reply row'):
1301 column = row.get(self._inc_column)
1302 if not Transaction.__check_json_type(column, six.integer_types,
1303 '"select" reply inc column'):
1305 self._inc_new_value = column
1308 def __process_insert_reply(self, insert, ops):
1309 if insert.op_index >= len(ops):
1311 vlog.warn("reply does not contain enough operations "
1312 "for insert (has %d, needs %d)"
1313 % (len(ops), insert.op_index))
1316 # We know that this is a JSON object because the loop in
1317 # __process_reply() already checked.
1318 reply = ops[insert.op_index]
1319 json_uuid = reply.get("uuid")
1320 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1321 '"insert" reply "uuid"'):
1325 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1328 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1335 class SchemaHelper(object):
1336 """IDL Schema helper.
1338 This class encapsulates the logic required to generate schemas suitable
1339 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1340 they are interested in using register_columns(). When finished, the
1341 get_idl_schema() function may be called.
1343 The location on disk of the schema used may be found in the
1344 'schema_location' variable."""
1346 def __init__(self, location=None, schema_json=None):
1347 """Creates a new Schema object.
1349 'location' file path to ovs schema. None means default location
1350 'schema_json' schema in json preresentation in memory
1353 if location and schema_json:
1354 raise ValueError("both location and schema_json can't be "
1355 "specified. it's ambiguous.")
1356 if schema_json is None:
1357 if location is None:
1358 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1359 schema_json = ovs.json.from_file(location)
1361 self.schema_json = schema_json
1366 def register_columns(self, table, columns, readonly=[]):
1367 """Registers interest in the given 'columns' of 'table'. Future calls
1368 to get_idl_schema() will include 'table':column for each column in
1369 'columns'. This function automatically avoids adding duplicate entries
1371 A subset of 'columns' can be specified as 'readonly'. The readonly
1372 columns are not replicated but can be fetched on-demand by the user
1375 'table' must be a string.
1376 'columns' must be a list of strings.
1377 'readonly' must be a list of strings.
1380 assert isinstance(table, six.string_types)
1381 assert isinstance(columns, list)
1383 columns = set(columns) | self._tables.get(table, set())
1384 self._tables[table] = columns
1385 self._readonly[table] = readonly
1387 def register_table(self, table):
1388 """Registers interest in the given all columns of 'table'. Future calls
1389 to get_idl_schema() will include all columns of 'table'.
1391 'table' must be a string
1393 assert isinstance(table, six.string_types)
1394 self._tables[table] = set() # empty set means all columns in the table
1396 def register_all(self):
1397 """Registers interest in every column of every table."""
1400 def get_idl_schema(self):
1401 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1402 object based on columns registered using the register_columns()
1405 schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1406 self.schema_json = None
1410 for table, columns in six.iteritems(self._tables):
1411 schema_tables[table] = (
1412 self._keep_table_columns(schema, table, columns))
1414 schema.tables = schema_tables
1415 schema.readonly = self._readonly
1418 def _keep_table_columns(self, schema, table_name, columns):
1419 assert table_name in schema.tables
1420 table = schema.tables[table_name]
1423 # empty set means all columns in the table
1427 for column_name in columns:
1428 assert isinstance(column_name, six.string_types)
1429 assert column_name in table.columns
1431 new_columns[column_name] = table.columns[column_name]
1433 table.columns = new_columns