1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from ovs.db import error
25 vlog = ovs.vlog.Vlog("idl")
27 __pychecker__ = 'no-classattr no-objattrs'
35 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
37 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
38 requests to an OVSDB database server and parses the responses, converting
39 raw JSON into data structures that are easier for clients to digest.
41 The IDL also assists with issuing database transactions. The client
42 creates a transaction, manipulates the IDL data structures, and commits or
43 aborts the transaction. The IDL then composes and issues the necessary
44 JSON-RPC requests and reports to the client whether the transaction
45 completed successfully.
47 The client is allowed to access the following attributes directly, in a
50 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
51 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
52 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
55 The client may directly read and write the Row objects referenced by the
56 'rows' map values. Refer to Row for more details.
58 - 'change_seqno': A number that represents the IDL's state. When the IDL
59 is updated (by Idl.run()), its value changes. The sequence number can
60 occasionally change even if the database does not. This happens if the
61 connection to the database drops and reconnects, which causes the
62 database contents to be reloaded even if they didn't change. (It could
63 also happen if the database server sends out a "change" that reflects
64 what the IDL already thought was in the database. The database server is
65 not supposed to do that, but bugs could in theory cause it to do so.)
67 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
68 if no lock is configured.
70 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
71 lock, and False otherwise.
73 Locking and unlocking happens asynchronously from the database client's
74 point of view, so the information is only useful for optimization
75 (e.g. if the client doesn't have the lock then there's no point in trying
76 to write to the database).
78 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
79 the database server has indicated that some other client already owns the
80 requested lock, and False otherwise.
82 - 'txn': The ovs.db.idl.Transaction object for the database transaction
83 currently being constructed, if there is one, or None otherwise.
86 def __init__(self, remote, schema):
87 """Creates and returns a connection to the database named 'db_name' on
88 'remote', which should be in a form acceptable to
89 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
90 replica of the remote database.
92 'schema' should be the schema for the remote database. The caller may
93 have cut it down by removing tables or columns that are not of
94 interest. The IDL will only replicate the tables and columns that
95 remain. The caller may also add a attribute named 'alert' to selected
96 remaining columns, setting its value to False; if so, then changes to
97 those columns will not be considered changes to the database for the
98 purpose of the return value of Idl.run() and Idl.change_seqno. This is
99 useful for columns that the IDL's client will write but not read.
101 As a convenience to users, 'schema' may also be an instance of the
104 The IDL uses and modifies 'schema' directly."""
106 assert isinstance(schema, SchemaHelper)
107 schema = schema.get_idl_schema()
109 self.tables = schema.tables
111 self._session = ovs.jsonrpc.Session.open(remote)
112 self._monitor_request_id = None
113 self._last_seqno = None
114 self.change_seqno = 0
117 self.lock_name = None # Name of lock we need, None if none.
118 self.has_lock = False # Has db server said we have the lock?
119 self.is_lock_contended = False # Has db server said we can't get lock?
120 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
122 # Transaction support.
124 self._outstanding_txns = {}
126 for table in schema.tables.itervalues():
127 for column in table.columns.itervalues():
128 if not hasattr(column, 'alert'):
130 table.need_table = False
135 """Closes the connection to the database. The IDL will no longer
137 self._session.close()
140 """Processes a batch of messages from the database server. Returns
141 True if the database as seen through the IDL changed, False if it did
142 not change. The initial fetch of the entire contents of the remote
143 database is considered to be one kind of change. If the IDL has been
144 configured to acquire a database lock (with Idl.set_lock()), then
145 successfully acquiring the lock is also considered to be a change.
147 This function can return occasional false positives, that is, report
148 that the database changed even though it didn't. This happens if the
149 connection to the database drops and reconnects, which causes the
150 database contents to be reloaded even if they didn't change. (It could
151 also happen if the database server sends out a "change" that reflects
152 what we already thought was in the database, but the database server is
153 not supposed to do that.)
155 As an alternative to checking the return value, the client may check
156 for changes in self.change_seqno."""
158 initial_change_seqno = self.change_seqno
163 if not self._session.is_connected():
166 seqno = self._session.get_seqno()
167 if seqno != self._last_seqno:
168 self._last_seqno = seqno
169 self.__txn_abort_all()
170 self.__send_monitor_request()
172 self.__send_lock_request()
175 msg = self._session.recv()
178 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
179 and msg.method == "update"
180 and len(msg.params) == 2
181 and msg.params[0] == None):
182 # Database contents changed.
183 self.__parse_update(msg.params[1])
184 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
185 and self._monitor_request_id is not None
186 and self._monitor_request_id == msg.id):
187 # Reply to our "monitor" request.
189 self.change_seqno += 1
190 self._monitor_request_id = None
192 self.__parse_update(msg.result)
193 except error.Error, e:
194 vlog.err("%s: parse error in received schema: %s"
195 % (self._session.get_name(), e))
197 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
198 and self._lock_request_id is not None
199 and self._lock_request_id == msg.id):
200 # Reply to our "lock" request.
201 self.__parse_lock_reply(msg.result)
202 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
203 and msg.method == "locked"):
205 self.__parse_lock_notify(msg.params, True)
206 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
207 and msg.method == "stolen"):
208 # Someone else stole our lock.
209 self.__parse_lock_notify(msg.params, False)
210 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
211 # Reply to our echo request. Ignore it.
213 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
214 ovs.jsonrpc.Message.T_REPLY)
215 and self.__txn_process_reply(msg)):
216 # __txn_process_reply() did everything needed.
219 # This can happen if a transaction is destroyed before we
220 # receive the reply, so keep the log level low.
221 vlog.dbg("%s: received unexpected %s message"
222 % (self._session.get_name(),
223 ovs.jsonrpc.Message.type_to_string(msg.type)))
225 return initial_change_seqno != self.change_seqno
227 def wait(self, poller):
228 """Arranges for poller.block() to wake up when self.run() has something
229 to do or when activity occurs on a transaction on 'self'."""
230 self._session.wait(poller)
231 self._session.recv_wait(poller)
233 def has_ever_connected(self):
234 """Returns True, if the IDL successfully connected to the remote
235 database and retrieved its contents (even if the connection
236 subsequently dropped and is in the process of reconnecting). If so,
237 then the IDL contains an atomic snapshot of the database's contents
238 (but it might be arbitrarily old if the connection dropped).
240 Returns False if the IDL has never connected or retrieved the
241 database's contents. If so, the IDL is empty."""
242 return self.change_seqno != 0
244 def force_reconnect(self):
245 """Forces the IDL to drop its connection to the database and reconnect.
246 In the meantime, the contents of the IDL will not change."""
247 self._session.force_reconnect()
249 def set_lock(self, lock_name):
250 """If 'lock_name' is not None, configures the IDL to obtain the named
251 lock from the database server and to avoid modifying the database when
252 the lock cannot be acquired (that is, when another client has the same
255 If 'lock_name' is None, drops the locking requirement and releases the
258 assert not self._outstanding_txns
260 if self.lock_name and (not lock_name or lock_name != self.lock_name):
261 # Release previous lock.
262 self.__send_unlock_request()
263 self.lock_name = None
264 self.is_lock_contended = False
266 if lock_name and not self.lock_name:
268 self.lock_name = lock_name
269 self.__send_lock_request()
271 def notify(self, event, row, updates=None):
272 """Hook for implementing create/update/delete notifications
274 :param event: The event that was triggered
275 :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
276 :param row: The row as it is after the operation has occured
278 :param updates: For updates, a Row object with just the changed columns
285 for table in self.tables.itervalues():
291 self.change_seqno += 1
293 def __update_has_lock(self, new_has_lock):
294 if new_has_lock and not self.has_lock:
295 if self._monitor_request_id is None:
296 self.change_seqno += 1
298 # We're waiting for a monitor reply, so don't signal that the
299 # database changed. The monitor reply will increment
300 # change_seqno anyhow.
302 self.is_lock_contended = False
303 self.has_lock = new_has_lock
305 def __do_send_lock_request(self, method):
306 self.__update_has_lock(False)
307 self._lock_request_id = None
308 if self._session.is_connected():
309 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
311 self._session.send(msg)
316 def __send_lock_request(self):
317 self._lock_request_id = self.__do_send_lock_request("lock")
319 def __send_unlock_request(self):
320 self.__do_send_lock_request("unlock")
322 def __parse_lock_reply(self, result):
323 self._lock_request_id = None
324 got_lock = type(result) == dict and result.get("locked") is True
325 self.__update_has_lock(got_lock)
327 self.is_lock_contended = True
329 def __parse_lock_notify(self, params, new_has_lock):
330 if (self.lock_name is not None
331 and type(params) in (list, tuple)
333 and params[0] == self.lock_name):
334 self.__update_has_lock(self, new_has_lock)
336 self.is_lock_contended = True
338 def __send_monitor_request(self):
339 monitor_requests = {}
340 for table in self.tables.itervalues():
341 monitor_requests[table.name] = {"columns": table.columns.keys()}
342 msg = ovs.jsonrpc.Message.create_request(
343 "monitor", [self._db.name, None, monitor_requests])
344 self._monitor_request_id = msg.id
345 self._session.send(msg)
347 def __parse_update(self, update):
349 self.__do_parse_update(update)
350 except error.Error, e:
351 vlog.err("%s: error parsing update: %s"
352 % (self._session.get_name(), e))
354 def __do_parse_update(self, table_updates):
355 if type(table_updates) != dict:
356 raise error.Error("<table-updates> is not an object",
359 for table_name, table_update in table_updates.iteritems():
360 table = self.tables.get(table_name)
362 raise error.Error('<table-updates> includes unknown '
363 'table "%s"' % table_name)
365 if type(table_update) != dict:
366 raise error.Error('<table-update> for table "%s" is not '
367 'an object' % table_name, table_update)
369 for uuid_string, row_update in table_update.iteritems():
370 if not ovs.ovsuuid.is_valid_string(uuid_string):
371 raise error.Error('<table-update> for table "%s" '
372 'contains bad UUID "%s" as member '
373 'name' % (table_name, uuid_string),
375 uuid = ovs.ovsuuid.from_string(uuid_string)
377 if type(row_update) != dict:
378 raise error.Error('<table-update> for table "%s" '
379 'contains <row-update> for %s that '
381 % (table_name, uuid_string))
383 parser = ovs.db.parser.Parser(row_update, "row-update")
384 old = parser.get_optional("old", [dict])
385 new = parser.get_optional("new", [dict])
388 if not old and not new:
389 raise error.Error('<row-update> missing "old" and '
390 '"new" members', row_update)
392 if self.__process_update(table, uuid, old, new):
393 self.change_seqno += 1
395 def __process_update(self, table, uuid, old, new):
396 """Returns True if a column changed, False otherwise."""
397 row = table.rows.get(uuid)
404 self.notify(ROW_DELETE, row)
407 vlog.warn("cannot delete missing row %s from table %s"
408 % (uuid, table.name))
412 row = self.__create_row(table, uuid)
416 vlog.warn("cannot add existing row %s to table %s"
417 % (uuid, table.name))
418 if self.__row_update(table, row, new):
420 self.notify(ROW_CREATE, row)
424 row = self.__create_row(table, uuid)
428 vlog.warn("cannot modify missing row %s in table %s"
429 % (uuid, table.name))
430 if self.__row_update(table, row, new):
432 self.notify(op, row, Row.from_json(self, table, uuid, old))
435 def __row_update(self, table, row, row_json):
437 for column_name, datum_json in row_json.iteritems():
438 column = table.columns.get(column_name)
441 vlog.warn("unknown column %s updating table %s"
442 % (column_name, table.name))
446 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
447 except error.Error, e:
449 vlog.warn("error parsing column %s in table %s: %s"
450 % (column_name, table.name, e))
453 if datum != row._data[column_name]:
454 row._data[column_name] = datum
458 # Didn't really change but the OVSDB monitor protocol always
459 # includes every value in a row.
463 def __create_row(self, table, uuid):
465 for column in table.columns.itervalues():
466 data[column.name] = ovs.db.data.Datum.default(column.type)
467 row = table.rows[uuid] = Row(self, table, uuid, data)
471 self._session.force_reconnect()
473 def __txn_abort_all(self):
474 while self._outstanding_txns:
475 txn = self._outstanding_txns.popitem()[1]
476 txn._status = Transaction.TRY_AGAIN
478 def __txn_process_reply(self, msg):
479 txn = self._outstanding_txns.pop(msg.id, None)
481 txn._process_reply(msg)
484 def _uuid_to_row(atom, base):
486 return base.ref_table.rows.get(atom)
491 def _row_to_uuid(value):
492 if type(value) == Row:
499 """A row within an IDL.
501 The client may access the following attributes directly:
503 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
505 - An attribute for each column in the Row's table, named for the column,
506 whose values are as returned by Datum.to_python() for the column's type.
508 If some error occurs (e.g. the database server's idea of the column is
509 different from the IDL's idea), then the attribute values is the
510 "default" value return by Datum.default() for the column's type. (It is
511 important to know this because the default value may violate constraints
512 for the column's type, e.g. the default integer value is 0 even if column
513 contraints require the column's value to be positive.)
515 When a transaction is active, column attributes may also be assigned new
516 values. Committing the transaction will then cause the new value to be
517 stored into the database.
519 *NOTE*: In the current implementation, the value of a column is a *copy*
520 of the value in the database. This means that modifying its value
521 directly will have no useful effect. For example, the following:
522 row.mycolumn["a"] = "b" # don't do this
523 will not change anything in the database, even after commit. To modify
524 the column, instead assign the modified column value back to the column:
529 def __init__(self, idl, table, uuid, data):
530 # All of the explicit references to self.__dict__ below are required
531 # to set real attributes with invoking self.__getattr__().
532 self.__dict__["uuid"] = uuid
534 self.__dict__["_idl"] = idl
535 self.__dict__["_table"] = table
537 # _data is the committed data. It takes the following values:
539 # - A dictionary that maps every column name to a Datum, if the row
540 # exists in the committed form of the database.
542 # - None, if this row is newly inserted within the active transaction
543 # and thus has no committed form.
544 self.__dict__["_data"] = data
546 # _changes describes changes to this row within the active transaction.
547 # It takes the following values:
549 # - {}, the empty dictionary, if no transaction is active or if the
550 # row has yet not been changed within this transaction.
552 # - A dictionary that maps a column name to its new Datum, if an
553 # active transaction changes those columns' values.
555 # - A dictionary that maps every column name to a Datum, if the row
556 # is newly inserted within the active transaction.
558 # - None, if this transaction deletes this row.
559 self.__dict__["_changes"] = {}
561 # A dictionary whose keys are the names of columns that must be
562 # verified as prerequisites when the transaction commits. The values
563 # in the dictionary are all None.
564 self.__dict__["_prereqs"] = {}
566 def __getattr__(self, column_name):
567 assert self._changes is not None
569 datum = self._changes.get(column_name)
571 if self._data is None:
572 raise AttributeError("%s instance has no attribute '%s'" %
573 (self.__class__.__name__, column_name))
574 datum = self._data[column_name]
576 return datum.to_python(_uuid_to_row)
578 def __setattr__(self, column_name, value):
579 assert self._changes is not None
582 column = self._table.columns[column_name]
584 datum = ovs.db.data.Datum.from_python(column.type, value,
586 except error.Error, e:
588 vlog.err("attempting to write bad value to column %s (%s)"
591 self._idl.txn._write(self, column, datum)
594 def from_json(cls, idl, table, uuid, row_json):
596 for column_name, datum_json in row_json.iteritems():
597 column = table.columns.get(column_name)
600 vlog.warn("unknown column %s in table %s"
601 % (column_name, table.name))
604 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
605 except error.Error, e:
607 vlog.warn("error parsing column %s in table %s: %s"
608 % (column_name, table.name, e))
610 data[column_name] = datum
611 return cls(idl, table, uuid, data)
613 def verify(self, column_name):
614 """Causes the original contents of column 'column_name' in this row to
615 be verified as a prerequisite to completing the transaction. That is,
616 if 'column_name' changed in this row (or if this row was deleted)
617 between the time that the IDL originally read its contents and the time
618 that the transaction commits, then the transaction aborts and
619 Transaction.commit() returns Transaction.TRY_AGAIN.
621 The intention is that, to ensure that no transaction commits based on
622 dirty reads, an application should call Row.verify() on each data item
623 read as part of a read-modify-write operation.
625 In some cases Row.verify() reduces to a no-op, because the current
626 value of the column is already known:
628 - If this row is a row created by the current transaction (returned
629 by Transaction.insert()).
631 - If the column has already been modified within the current
634 Because of the latter property, always call Row.verify() *before*
635 modifying the column, for a given read-modify-write.
637 A transaction must be in progress."""
639 assert self._changes is not None
640 if not self._data or column_name in self._changes:
643 self._prereqs[column_name] = None
646 """Deletes this row from its table.
648 A transaction must be in progress."""
650 assert self._changes is not None
651 if self._data is None:
652 del self._idl.txn._txn_rows[self.uuid]
654 self._idl.txn._txn_rows[self.uuid] = self
655 self.__dict__["_changes"] = None
656 del self._table.rows[self.uuid]
658 def increment(self, column_name):
659 """Causes the transaction, when committed, to increment the value of
660 'column_name' within this row by 1. 'column_name' must have an integer
661 type. After the transaction commits successfully, the client may
662 retrieve the final (incremented) value of 'column_name' with
663 Transaction.get_increment_new_value().
665 The client could accomplish something similar by reading and writing
666 and verify()ing columns. However, increment() will never (by itself)
667 cause a transaction to fail because of a verify error.
669 The intended use is for incrementing the "next_cfg" column in
670 the Open_vSwitch table."""
671 self._idl.txn._increment(self, column_name)
674 def _uuid_name_from_uuid(uuid):
675 return "row%s" % str(uuid).replace("-", "_")
678 def _where_uuid_equals(uuid):
679 return [["_uuid", "==", ["uuid", str(uuid)]]]
682 class _InsertedRow(object):
683 def __init__(self, op_index):
684 self.op_index = op_index
688 class Transaction(object):
689 """A transaction may modify the contents of a database by modifying the
690 values of columns, deleting rows, inserting rows, or adding checks that
691 columns in the database have not changed ("verify" operations), through
694 Reading and writing columns and inserting and deleting rows are all
695 straightforward. The reasons to verify columns are less obvious.
696 Verification is the key to maintaining transactional integrity. Because
697 OVSDB handles multiple clients, it can happen that between the time that
698 OVSDB client A reads a column and writes a new value, OVSDB client B has
699 written that column. Client A's write should not ordinarily overwrite
700 client B's, especially if the column in question is a "map" column that
701 contains several more or less independent data items. If client A adds a
702 "verify" operation before it writes the column, then the transaction fails
703 in case client B modifies it first. Client A will then see the new value
704 of the column and compose a new transaction based on the new contents
707 When a transaction is complete, which must be before the next call to
708 Idl.run(), call Transaction.commit() or Transaction.abort().
710 The life-cycle of a transaction looks like this:
712 1. Create the transaction and record the initial sequence number:
714 seqno = idl.change_seqno(idl)
715 txn = Transaction(idl)
717 2. Modify the database with Row and Transaction methods.
719 3. Commit the transaction by calling Transaction.commit(). The first call
720 to this function probably returns Transaction.INCOMPLETE. The client
721 must keep calling again along as this remains true, calling Idl.run() in
722 between to let the IDL do protocol processing. (If the client doesn't
723 have anything else to do in the meantime, it can use
724 Transaction.commit_block() to avoid having to loop itself.)
726 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
727 to change from the saved 'seqno' (it's possible that it's already
728 changed, in which case the client should not wait at all), then start
729 over from step 1. Only a call to Idl.run() will change the return value
730 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
732 # Status values that Transaction.commit() can return.
733 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
734 UNCHANGED = "unchanged" # Transaction didn't include any changes.
735 INCOMPLETE = "incomplete" # Commit in progress, please wait.
736 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
737 SUCCESS = "success" # Commit successful.
738 TRY_AGAIN = "try again" # Commit failed because a "verify" operation
739 # reported an inconsistency, due to a network
740 # problem, or other transient failure. Wait
741 # for a change, then try again.
742 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
743 ERROR = "error" # Commit failed due to a hard error.
746 def status_to_string(status):
747 """Converts one of the status values that Transaction.commit() can
748 return into a human-readable string.
750 (The status values are in fact such strings already, so
751 there's nothing to do.)"""
754 def __init__(self, idl):
755 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
756 A given Idl may only have a single active transaction at a time.
758 A Transaction may modify the contents of a database by assigning new
759 values to columns (attributes of Row), deleting rows (with
760 Row.delete()), or inserting rows (with Transaction.insert()). It may
761 also check that columns in the database have not changed with
764 When a transaction is complete (which must be before the next call to
765 Idl.run()), call Transaction.commit() or Transaction.abort()."""
766 assert idl.txn is None
769 self._request_id = None
773 self._status = Transaction.UNCOMMITTED
778 self._inc_column = None
780 self._inserted_rows = {} # Map from UUID to _InsertedRow
782 def add_comment(self, comment):
783 """Appens 'comment' to the comments that will be passed to the OVSDB
784 server when this transaction is committed. (The comment will be
785 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
786 relatively human-readable form.)"""
787 self._comments.append(comment)
789 def wait(self, poller):
790 """Causes poll_block() to wake up if this transaction has completed
792 if self._status not in (Transaction.UNCOMMITTED,
793 Transaction.INCOMPLETE):
794 poller.immediate_wake()
796 def _substitute_uuids(self, json):
797 if type(json) in (list, tuple):
799 and json[0] == 'uuid'
800 and ovs.ovsuuid.is_valid_string(json[1])):
801 uuid = ovs.ovsuuid.from_string(json[1])
802 row = self._txn_rows.get(uuid, None)
803 if row and row._data is None:
804 return ["named-uuid", _uuid_name_from_uuid(uuid)]
806 return [self._substitute_uuids(elem) for elem in json]
809 def __disassemble(self):
812 for row in self._txn_rows.itervalues():
813 if row._changes is None:
814 row._table.rows[row.uuid] = row
815 elif row._data is None:
816 del row._table.rows[row.uuid]
817 row.__dict__["_changes"] = {}
818 row.__dict__["_prereqs"] = {}
822 """Attempts to commit 'txn'. Returns the status of the commit
823 operation, one of the following constants:
825 Transaction.INCOMPLETE:
827 The transaction is in progress, but not yet complete. The caller
828 should call again later, after calling Idl.run() to let the
829 IDL do OVSDB protocol processing.
831 Transaction.UNCHANGED:
833 The transaction is complete. (It didn't actually change the
834 database, so the IDL didn't send any request to the database
839 The caller previously called Transaction.abort().
843 The transaction was successful. The update made by the
844 transaction (and possibly other changes made by other database
845 clients) should already be visible in the IDL.
847 Transaction.TRY_AGAIN:
849 The transaction failed for some transient reason, e.g. because a
850 "verify" operation reported an inconsistency or due to a network
851 problem. The caller should wait for a change to the database,
852 then compose a new transaction, and commit the new transaction.
854 Use Idl.change_seqno to wait for a change in the database. It is
855 important to use its value *before* the initial call to
856 Transaction.commit() as the baseline for this purpose, because
857 the change that one should wait for can happen after the initial
858 call but before the call that returns Transaction.TRY_AGAIN, and
859 using some other baseline value in that situation could cause an
860 indefinite wait if the database rarely changes.
862 Transaction.NOT_LOCKED:
864 The transaction failed because the IDL has been configured to
865 require a database lock (with Idl.set_lock()) but didn't
866 get it yet or has already lost it.
868 Committing a transaction rolls back all of the changes that it made to
869 the IDL's copy of the database. If the transaction commits
870 successfully, then the database server will send an update and, thus,
871 the IDL will be updated with the committed changes."""
872 # The status can only change if we're the active transaction.
873 # (Otherwise, our status will change only in Idl.run().)
874 if self != self.idl.txn:
877 # If we need a lock but don't have it, give up quickly.
878 if self.idl.lock_name and not self.idl.has_lock():
879 self._status = Transaction.NOT_LOCKED
883 operations = [self.idl._db.name]
885 # Assert that we have the required lock (avoiding a race).
886 if self.idl.lock_name:
887 operations.append({"op": "assert",
888 "lock": self.idl.lock_name})
890 # Add prerequisites and declarations of new rows.
891 for row in self._txn_rows.itervalues():
895 for column_name in row._prereqs:
896 columns.append(column_name)
897 rows[column_name] = row._data[column_name].to_json()
898 operations.append({"op": "wait",
899 "table": row._table.name,
901 "where": _where_uuid_equals(row.uuid),
908 for row in self._txn_rows.itervalues():
909 if row._changes is None:
910 if row._table.is_root:
911 operations.append({"op": "delete",
912 "table": row._table.name,
913 "where": _where_uuid_equals(row.uuid)})
916 # Let ovsdb-server decide whether to really delete it.
919 op = {"table": row._table.name}
920 if row._data is None:
922 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
925 op_index = len(operations) - 1
926 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
929 op["where"] = _where_uuid_equals(row.uuid)
934 for column_name, datum in row._changes.iteritems():
935 if row._data is not None or not datum.is_default():
936 row_json[column_name] = (
937 self._substitute_uuids(datum.to_json()))
939 # If anything really changed, consider it an update.
940 # We can't suppress not-really-changed values earlier
941 # or transactions would become nonatomic (see the big
942 # comment inside Transaction._write()).
943 if (not any_updates and row._data is not None and
944 row._data[column_name] != datum):
947 if row._data is None or row_json:
948 operations.append(op)
951 if self._inc_row and any_updates:
952 self._inc_index = len(operations) - 1
954 operations.append({"op": "mutate",
955 "table": self._inc_row._table.name,
956 "where": self._substitute_uuids(
957 _where_uuid_equals(self._inc_row.uuid)),
958 "mutations": [[self._inc_column, "+=", 1]]})
959 operations.append({"op": "select",
960 "table": self._inc_row._table.name,
961 "where": self._substitute_uuids(
962 _where_uuid_equals(self._inc_row.uuid)),
963 "columns": [self._inc_column]})
967 operations.append({"op": "comment",
968 "comment": "\n".join(self._comments)})
972 operations.append({"op": "abort"})
975 self._status = Transaction.UNCHANGED
977 msg = ovs.jsonrpc.Message.create_request("transact", operations)
978 self._request_id = msg.id
979 if not self.idl._session.send(msg):
980 self.idl._outstanding_txns[self._request_id] = self
981 self._status = Transaction.INCOMPLETE
983 self._status = Transaction.TRY_AGAIN
988 def commit_block(self):
989 """Attempts to commit this transaction, blocking until the commit
990 either succeeds or fails. Returns the final commit status, which may
991 be any Transaction.* value other than Transaction.INCOMPLETE.
993 This function calls Idl.run() on this transaction'ss IDL, so it may
994 cause Idl.change_seqno to change."""
996 status = self.commit()
997 if status != Transaction.INCOMPLETE:
1002 poller = ovs.poller.Poller()
1003 self.idl.wait(poller)
1007 def get_increment_new_value(self):
1008 """Returns the final (incremented) value of the column in this
1009 transaction that was set to be incremented by Row.increment. This
1010 transaction must have committed successfully."""
1011 assert self._status == Transaction.SUCCESS
1012 return self._inc_new_value
1015 """Aborts this transaction. If Transaction.commit() has already been
1016 called then the transaction might get committed anyhow."""
1017 self.__disassemble()
1018 if self._status in (Transaction.UNCOMMITTED,
1019 Transaction.INCOMPLETE):
1020 self._status = Transaction.ABORTED
1022 def get_error(self):
1023 """Returns a string representing this transaction's current status,
1024 suitable for use in log messages."""
1025 if self._status != Transaction.ERROR:
1026 return Transaction.status_to_string(self._status)
1030 return "no error details available"
1032 def __set_error_json(self, json):
1033 if self._error is None:
1034 self._error = ovs.json.to_string(json)
1036 def get_insert_uuid(self, uuid):
1037 """Finds and returns the permanent UUID that the database assigned to a
1038 newly inserted row, given the UUID that Transaction.insert() assigned
1039 locally to that row.
1041 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1042 or if it was assigned by that function and then deleted by Row.delete()
1043 within the same transaction. (Rows that are inserted and then deleted
1044 within a single transaction are never sent to the database server, so
1045 it never assigns them a permanent UUID.)
1047 This transaction must have completed successfully."""
1048 assert self._status in (Transaction.SUCCESS,
1049 Transaction.UNCHANGED)
1050 inserted_row = self._inserted_rows.get(uuid)
1052 return inserted_row.real
1055 def _increment(self, row, column):
1056 assert not self._inc_row
1058 self._inc_column = column
1060 def _write(self, row, column, datum):
1061 assert row._changes is not None
1065 # If this is a write-only column and the datum being written is the
1066 # same as the one already there, just skip the update entirely. This
1067 # is worth optimizing because we have a lot of columns that get
1068 # periodically refreshed into the database but don't actually change
1071 # We don't do this for read/write columns because that would break
1072 # atomicity of transactions--some other client might have written a
1073 # different value in that column since we read it. (But if a whole
1074 # transaction only does writes of existing values, without making any
1075 # real changes, we will drop the whole transaction later in
1076 # ovsdb_idl_txn_commit().)
1077 if not column.alert and row._data.get(column.name) == datum:
1078 new_value = row._changes.get(column.name)
1079 if new_value is None or new_value == datum:
1082 txn._txn_rows[row.uuid] = row
1083 row._changes[column.name] = datum.copy()
1085 def insert(self, table, new_uuid=None):
1086 """Inserts and returns a new row in 'table', which must be one of the
1087 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1089 The new row is assigned a provisional UUID. If 'uuid' is None then one
1090 is randomly generated; otherwise 'uuid' should specify a randomly
1091 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1092 different UUID when 'txn' is committed, but the IDL will replace any
1093 uses of the provisional UUID in the data to be to be committed by the
1094 UUID assigned by ovsdb-server."""
1095 assert self._status == Transaction.UNCOMMITTED
1096 if new_uuid is None:
1097 new_uuid = uuid.uuid4()
1098 row = Row(self.idl, table, new_uuid, None)
1099 table.rows[row.uuid] = row
1100 self._txn_rows[row.uuid] = row
1103 def _process_reply(self, msg):
1104 if msg.type == ovs.jsonrpc.Message.T_ERROR:
1105 self._status = Transaction.ERROR
1106 elif type(msg.result) not in (list, tuple):
1108 vlog.warn('reply to "transact" is not JSON array')
1117 # This isn't an error in itself but indicates that some
1118 # prior operation failed, so make sure that we know about
1121 elif type(op) == dict:
1122 error = op.get("error")
1123 if error is not None:
1124 if error == "timed out":
1126 elif error == "not owner":
1128 elif error == "aborted":
1132 self.__set_error_json(op)
1135 self.__set_error_json(op)
1137 vlog.warn("operation reply is not JSON null or object")
1139 if not soft_errors and not hard_errors and not lock_errors:
1140 if self._inc_row and not self.__process_inc_reply(ops):
1143 for insert in self._inserted_rows.itervalues():
1144 if not self.__process_insert_reply(insert, ops):
1148 self._status = Transaction.ERROR
1150 self._status = Transaction.NOT_LOCKED
1152 self._status = Transaction.TRY_AGAIN
1154 self._status = Transaction.SUCCESS
1157 def __check_json_type(json, types, name):
1160 vlog.warn("%s is missing" % name)
1162 elif type(json) not in types:
1164 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1169 def __process_inc_reply(self, ops):
1170 if self._inc_index + 2 > len(ops):
1172 vlog.warn("reply does not contain enough operations for "
1173 "increment (has %d, needs %d)" %
1174 (len(ops), self._inc_index + 2))
1176 # We know that this is a JSON object because the loop in
1177 # __process_reply() already checked.
1178 mutate = ops[self._inc_index]
1179 count = mutate.get("count")
1180 if not Transaction.__check_json_type(count, (int, long),
1181 '"mutate" reply "count"'):
1185 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1188 select = ops[self._inc_index + 1]
1189 rows = select.get("rows")
1190 if not Transaction.__check_json_type(rows, (list, tuple),
1191 '"select" reply "rows"'):
1195 vlog.warn('"select" reply "rows" has %d elements '
1196 'instead of 1' % len(rows))
1199 if not Transaction.__check_json_type(row, (dict,),
1200 '"select" reply row'):
1202 column = row.get(self._inc_column)
1203 if not Transaction.__check_json_type(column, (int, long),
1204 '"select" reply inc column'):
1206 self._inc_new_value = column
1209 def __process_insert_reply(self, insert, ops):
1210 if insert.op_index >= len(ops):
1212 vlog.warn("reply does not contain enough operations "
1213 "for insert (has %d, needs %d)"
1214 % (len(ops), insert.op_index))
1217 # We know that this is a JSON object because the loop in
1218 # __process_reply() already checked.
1219 reply = ops[insert.op_index]
1220 json_uuid = reply.get("uuid")
1221 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1222 '"insert" reply "uuid"'):
1226 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1229 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1236 class SchemaHelper(object):
1237 """IDL Schema helper.
1239 This class encapsulates the logic required to generate schemas suitable
1240 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1241 they are interested in using register_columns(). When finished, the
1242 get_idl_schema() function may be called.
1244 The location on disk of the schema used may be found in the
1245 'schema_location' variable."""
1247 def __init__(self, location=None, schema_json=None):
1248 """Creates a new Schema object.
1250 'location' file path to ovs schema. None means default location
1251 'schema_json' schema in json preresentation in memory
1254 if location and schema_json:
1255 raise ValueError("both location and schema_json can't be "
1256 "specified. it's ambiguous.")
1257 if schema_json is None:
1258 if location is None:
1259 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1260 schema_json = ovs.json.from_file(location)
1262 self.schema_json = schema_json
1266 def register_columns(self, table, columns):
1267 """Registers interest in the given 'columns' of 'table'. Future calls
1268 to get_idl_schema() will include 'table':column for each column in
1269 'columns'. This function automatically avoids adding duplicate entries
1272 'table' must be a string.
1273 'columns' must be a list of strings.
1276 assert type(table) is str
1277 assert type(columns) is list
1279 columns = set(columns) | self._tables.get(table, set())
1280 self._tables[table] = columns
1282 def register_table(self, table):
1283 """Registers interest in the given all columns of 'table'. Future calls
1284 to get_idl_schema() will include all columns of 'table'.
1286 'table' must be a string
1288 assert type(table) is str
1289 self._tables[table] = set() # empty set means all columns in the table
1291 def register_all(self):
1292 """Registers interest in every column of every table."""
1295 def get_idl_schema(self):
1296 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1297 object based on columns registered using the register_columns()
1300 schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1301 self.schema_json = None
1305 for table, columns in self._tables.iteritems():
1306 schema_tables[table] = (
1307 self._keep_table_columns(schema, table, columns))
1309 schema.tables = schema_tables
1312 def _keep_table_columns(self, schema, table_name, columns):
1313 assert table_name in schema.tables
1314 table = schema.tables[table_name]
1317 # empty set means all columns in the table
1321 for column_name in columns:
1322 assert type(column_name) is str
1323 assert column_name in table.columns
1325 new_columns[column_name] = table.columns[column_name]
1327 table.columns = new_columns