-# Copyright (c) 2009, 2010, 2011 Nicira Networks
+# Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
+import functools
import uuid
+import six
+
import ovs.jsonrpc
import ovs.db.parser
import ovs.db.schema
from ovs.db import error
import ovs.ovsuuid
import ovs.poller
+import ovs.vlog
+
+vlog = ovs.vlog.Vlog("idl")
+
+__pychecker__ = 'no-classattr no-objattrs'
+
+ROW_CREATE = "create"
+ROW_UPDATE = "update"
+ROW_DELETE = "delete"
-class Idl:
+OVSDB_UPDATE = 0
+OVSDB_UPDATE2 = 1
+
+
+class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
'rows' map values. Refer to Row for more details.
- 'change_seqno': A number that represents the IDL's state. When the IDL
- is updated (by Idl.run()), its value changes.
+ is updated (by Idl.run()), its value changes. The sequence number can
+ occasionally change even if the database does not. This happens if the
+ connection to the database drops and reconnects, which causes the
+ database contents to be reloaded even if they didn't change. (It could
+ also happen if the database server sends out a "change" that reflects
+ what the IDL already thought was in the database. The database server is
+ not supposed to do that, but bugs could in theory cause it to do so.)
- 'lock_name': The name of the lock configured with Idl.set_lock(), or None
if no lock is configured.
currently being constructed, if there is one, or None otherwise.
"""
+ IDL_S_INITIAL = 0
+ IDL_S_MONITOR_REQUESTED = 1
+ IDL_S_MONITOR_COND_REQUESTED = 2
+
def __init__(self, remote, schema):
"""Creates and returns a connection to the database named 'db_name' on
'remote', which should be in a form acceptable to
purpose of the return value of Idl.run() and Idl.change_seqno. This is
useful for columns that the IDL's client will write but not read.
+ As a convenience to users, 'schema' may also be an instance of the
+ SchemaHelper class.
+
The IDL uses and modifies 'schema' directly."""
+ assert isinstance(schema, SchemaHelper)
+ schema = schema.get_idl_schema()
+
self.tables = schema.tables
+ self.readonly = schema.readonly
self._db = schema
self._session = ovs.jsonrpc.Session.open(remote)
self._monitor_request_id = None
self._last_seqno = None
self.change_seqno = 0
+ self.uuid = uuid.uuid1()
+ self.state = self.IDL_S_INITIAL
# Database locking.
self.lock_name = None # Name of lock we need, None if none.
self.has_lock = False # Has db server said we have the lock?
- self.is_lock_contended = False # Has db server said we can't get lock?
+ self.is_lock_contended = False # Has db server said we can't get lock?
self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
# Transaction support.
self.txn = None
self._outstanding_txns = {}
- for table in schema.tables.itervalues():
- for column in table.columns.itervalues():
+ for table in six.itervalues(schema.tables):
+ for column in six.itervalues(table.columns):
if not hasattr(column, 'alert'):
column.alert = True
table.need_table = False
table.rows = {}
table.idl = self
+ table.condition = []
+ table.cond_changed = False
def close(self):
"""Closes the connection to the database. The IDL will no longer
for changes in self.change_seqno."""
assert not self.txn
initial_change_seqno = self.change_seqno
+
+ self.send_cond_change()
self._session.run()
i = 0
while i < 50:
if msg is None:
break
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
- and msg.method == "update"
- and len(msg.params) == 2
- and msg.params[0] == None):
+ and msg.method == "update2"
+ and len(msg.params) == 2):
+ # Database contents changed.
+ self.__parse_update(msg.params[1], OVSDB_UPDATE2)
+ elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+ and msg.method == "update"
+ and len(msg.params) == 2):
# Database contents changed.
- self.__parse_update(msg.params[1])
+ self.__parse_update(msg.params[1], OVSDB_UPDATE)
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._monitor_request_id is not None
and self._monitor_request_id == msg.id):
self.change_seqno += 1
self._monitor_request_id = None
self.__clear()
- self.__parse_update(msg.result)
- except error.Error, e:
- logging.error("%s: parse error in received schema: %s"
- % (self._session.get_name(), e))
+ if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+ self.__parse_update(msg.result, OVSDB_UPDATE2)
+ else:
+ assert self.state == self.IDL_S_MONITOR_REQUESTED
+ self.__parse_update(msg.result, OVSDB_UPDATE)
+
+ except error.Error as e:
+ vlog.err("%s: parse error in received schema: %s"
+ % (self._session.get_name(), e))
self.__error()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._lock_request_id is not None
elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
# Reply to our echo request. Ignore it.
pass
+ elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+ self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+ self._monitor_request_id == msg.id):
+ if msg.error == "unknown method":
+ self.__send_monitor_request()
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
ovs.jsonrpc.Message.T_REPLY)
and self.__txn_process_reply(msg)):
else:
# This can happen if a transaction is destroyed before we
# receive the reply, so keep the log level low.
- logging.debug("%s: received unexpected %s message"
- % (self._session.get_name(),
- ovs.jsonrpc.Message.type_to_string(msg.type)))
+ vlog.dbg("%s: received unexpected %s message"
+ % (self._session.get_name(),
+ ovs.jsonrpc.Message.type_to_string(msg.type)))
return initial_change_seqno != self.change_seqno
+ def send_cond_change(self):
+ if not self._session.is_connected():
+ return
+
+ for table in six.itervalues(self.tables):
+ if table.cond_changed:
+ self.__send_cond_change(table, table.condition)
+ table.cond_changed = False
+
+ def cond_change(self, table_name, add_cmd, cond):
+ """Change conditions for this IDL session. If session is not already
+ connected, add condtion to table and submit it on send_monitor_request.
+ Otherwise send monitor_cond_change method with the requested
+ changes."""
+
+ table = self.tables.get(table_name)
+ if not table:
+ raise error.Error('Unknown table "%s"' % table_name)
+
+ if add_cmd:
+ table.condition += cond
+ else:
+ for c in cond:
+ table.condition.remove(c)
+
+ table.cond_changed = True
+
def wait(self, poller):
"""Arranges for poller.block() to wake up when self.run() has something
to do or when activity occurs on a transaction on 'self'."""
self.lock_name = lock_name
self.__send_lock_request()
+ def notify(self, event, row, updates=None):
+ """Hook for implementing create/update/delete notifications
+
+ :param event: The event that was triggered
+ :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
+ :param row: The row as it is after the operation has occured
+ :type row: Row
+ :param updates: For updates, row with only updated columns
+ :type updates: Row
+ """
+
+ def __send_cond_change(self, table, cond):
+ monitor_cond_change = {table.name: [{"where": cond}]}
+ old_uuid = str(self.uuid)
+ self.uuid = uuid.uuid1()
+ params = [old_uuid, str(self.uuid), monitor_cond_change]
+ msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
+ self._session.send(msg)
+
def __clear(self):
changed = False
- for table in self.tables.itervalues():
+ for table in six.itervalues(self.tables):
if table.rows:
changed = True
table.rows = {}
def __parse_lock_reply(self, result):
self._lock_request_id = None
- got_lock = type(result) == dict and result.get("locked") is True
+ got_lock = isinstance(result, dict) and result.get("locked") is True
self.__update_has_lock(got_lock)
if not got_lock:
self.is_lock_contended = True
def __parse_lock_notify(self, params, new_has_lock):
if (self.lock_name is not None
- and type(params) in (list, tuple)
+ and isinstance(params, (list, tuple))
and params
and params[0] == self.lock_name):
- self.__update_has_lock(self, new_has_lock)
+ self.__update_has_lock(new_has_lock)
if not new_has_lock:
self.is_lock_contended = True
def __send_monitor_request(self):
+ if self.state == self.IDL_S_INITIAL:
+ self.state = self.IDL_S_MONITOR_COND_REQUESTED
+ method = "monitor_cond"
+ else:
+ self.state = self.IDL_S_MONITOR_REQUESTED
+ method = "monitor"
+
monitor_requests = {}
- for table in self.tables.itervalues():
- monitor_requests[table.name] = {"columns": table.columns.keys()}
+ for table in six.itervalues(self.tables):
+ columns = []
+ for column in six.iterkeys(table.columns):
+ if ((table.name not in self.readonly) or
+ (table.name in self.readonly) and
+ (column not in self.readonly[table.name])):
+ columns.append(column)
+ monitor_requests[table.name] = {"columns": columns}
+ if method == "monitor_cond" and table.cond_changed and \
+ table.condition:
+ monitor_requests[table.name]["where"] = table.condition
+ table.cond_change = False
+
msg = ovs.jsonrpc.Message.create_request(
- "monitor", [self._db.name, None, monitor_requests])
+ method, [self._db.name, str(self.uuid), monitor_requests])
self._monitor_request_id = msg.id
self._session.send(msg)
- def __parse_update(self, update):
+ def __parse_update(self, update, version):
try:
- self.__do_parse_update(update)
- except error.Error, e:
- logging.error("%s: error parsing update: %s"
- % (self._session.get_name(), e))
+ self.__do_parse_update(update, version)
+ except error.Error as e:
+ vlog.err("%s: error parsing update: %s"
+ % (self._session.get_name(), e))
- def __do_parse_update(self, table_updates):
- if type(table_updates) != dict:
+ def __do_parse_update(self, table_updates, version):
+ if not isinstance(table_updates, dict):
raise error.Error("<table-updates> is not an object",
table_updates)
- for table_name, table_update in table_updates.iteritems():
+ for table_name, table_update in six.iteritems(table_updates):
table = self.tables.get(table_name)
if not table:
raise error.Error('<table-updates> includes unknown '
'table "%s"' % table_name)
- if type(table_update) != dict:
+ if not isinstance(table_update, dict):
raise error.Error('<table-update> for table "%s" is not '
'an object' % table_name, table_update)
- for uuid_string, row_update in table_update.iteritems():
+ for uuid_string, row_update in six.iteritems(table_update):
if not ovs.ovsuuid.is_valid_string(uuid_string):
raise error.Error('<table-update> for table "%s" '
'contains bad UUID "%s" as member '
table_update)
uuid = ovs.ovsuuid.from_string(uuid_string)
- if type(row_update) != dict:
+ if not isinstance(row_update, dict):
raise error.Error('<table-update> for table "%s" '
'contains <row-update> for %s that '
'is not an object'
% (table_name, uuid_string))
+ if version == OVSDB_UPDATE2:
+ if self.__process_update2(table, uuid, row_update):
+ self.change_seqno += 1
+ continue
+
parser = ovs.db.parser.Parser(row_update, "row-update")
old = parser.get_optional("old", [dict])
new = parser.get_optional("new", [dict])
if self.__process_update(table, uuid, old, new):
self.change_seqno += 1
+ def __process_update2(self, table, uuid, row_update):
+ row = table.rows.get(uuid)
+ changed = False
+ if "delete" in row_update:
+ if row:
+ del table.rows[uuid]
+ self.notify(ROW_DELETE, row)
+ changed = True
+ else:
+ # XXX rate-limit
+ vlog.warn("cannot delete missing row %s from table"
+ "%s" % (uuid, table.name))
+ elif "insert" in row_update or "initial" in row_update:
+ if row:
+ vlog.warn("cannot add existing row %s from table"
+ " %s" % (uuid, table.name))
+ del table.rows[uuid]
+ row = self.__create_row(table, uuid)
+ if "insert" in row_update:
+ row_update = row_update['insert']
+ else:
+ row_update = row_update['initial']
+ self.__add_default(table, row_update)
+ if self.__row_update(table, row, row_update):
+ changed = True
+ self.notify(ROW_CREATE, row)
+ elif "modify" in row_update:
+ if not row:
+ raise error.Error('Modify non-existing row')
+
+ self.__apply_diff(table, row, row_update['modify'])
+ self.notify(ROW_UPDATE, row,
+ Row.from_json(self, table, uuid, row_update['modify']))
+ changed = True
+ else:
+ raise error.Error('<row-update> unknown operation',
+ row_update)
+ return changed
+
def __process_update(self, table, uuid, old, new):
"""Returns True if a column changed, False otherwise."""
row = table.rows.get(uuid)
if row:
del table.rows[uuid]
changed = True
+ self.notify(ROW_DELETE, row)
else:
# XXX rate-limit
- logging.warning("cannot delete missing row %s from table %s"
- % (uuid, table.name))
+ vlog.warn("cannot delete missing row %s from table %s"
+ % (uuid, table.name))
elif not old:
# Insert row.
if not row:
changed = True
else:
# XXX rate-limit
- logging.warning("cannot add existing row %s to table %s"
- % (uuid, table.name))
+ vlog.warn("cannot add existing row %s to table %s"
+ % (uuid, table.name))
if self.__row_update(table, row, new):
changed = True
+ self.notify(ROW_CREATE, row)
else:
+ op = ROW_UPDATE
if not row:
row = self.__create_row(table, uuid)
changed = True
+ op = ROW_CREATE
# XXX rate-limit
- logging.warning("cannot modify missing row %s in table %s"
- % (uuid, table.name))
+ vlog.warn("cannot modify missing row %s in table %s"
+ % (uuid, table.name))
if self.__row_update(table, row, new):
changed = True
+ self.notify(op, row, Row.from_json(self, table, uuid, old))
return changed
+ def __column_name(self, column):
+ if column.type.key.type == ovs.db.types.UuidType:
+ return ovs.ovsuuid.to_json(column.type.key.type.default)
+ else:
+ return column.type.key.type.default
+
+ def __add_default(self, table, row_update):
+ for column in six.itervalues(table.columns):
+ if column.name not in row_update:
+ if ((table.name not in self.readonly) or
+ (table.name in self.readonly) and
+ (column.name not in self.readonly[table.name])):
+ if column.type.n_min != 0 and not column.type.is_map():
+ row_update[column.name] = self.__column_name(column)
+
+ def __apply_diff(self, table, row, row_diff):
+ for column_name, datum_json in six.iteritems(row_diff):
+ column = table.columns.get(column_name)
+ if not column:
+ # XXX rate-limit
+ vlog.warn("unknown column %s updating table %s"
+ % (column_name, table.name))
+ continue
+
+ try:
+ datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+ except error.Error as e:
+ # XXX rate-limit
+ vlog.warn("error parsing column %s in table %s: %s"
+ % (column_name, table.name, e))
+ continue
+
+ datum = row._data[column_name].diff(datum)
+ if datum != row._data[column_name]:
+ row._data[column_name] = datum
+
def __row_update(self, table, row, row_json):
changed = False
- for column_name, datum_json in row_json.iteritems():
+ for column_name, datum_json in six.iteritems(row_json):
column = table.columns.get(column_name)
if not column:
# XXX rate-limit
- logging.warning("unknown column %s updating table %s"
- % (column_name, table.name))
+ vlog.warn("unknown column %s updating table %s"
+ % (column_name, table.name))
continue
try:
datum = ovs.db.data.Datum.from_json(column.type, datum_json)
- except error.Error, e:
+ except error.Error as e:
# XXX rate-limit
- logging.warning("error parsing column %s in table %s: %s"
- % (column_name, table.name, e))
+ vlog.warn("error parsing column %s in table %s: %s"
+ % (column_name, table.name, e))
continue
if datum != row._data[column_name]:
def __create_row(self, table, uuid):
data = {}
- for column in table.columns.itervalues():
+ for column in six.itervalues(table.columns):
data[column.name] = ovs.db.data.Datum.default(column.type)
row = table.rows[uuid] = Row(self, table, uuid, data)
return row
if txn:
txn._process_reply(msg)
+
def _uuid_to_row(atom, base):
if base.ref_table:
return base.ref_table.rows.get(atom)
else:
return atom
+
def _row_to_uuid(value):
- if type(value) == Row:
+ if isinstance(value, Row):
return value.uuid
else:
return value
+
+@functools.total_ordering
class Row(object):
"""A row within an IDL.
# in the dictionary are all None.
self.__dict__["_prereqs"] = {}
+ def __lt__(self, other):
+ if not isinstance(other, Row):
+ return NotImplemented
+ return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
+
+ def __eq__(self, other):
+ if not isinstance(other, Row):
+ return NotImplemented
+ return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
+
+ def __hash__(self):
+ return int(self.__dict__['uuid'])
+
def __getattr__(self, column_name):
assert self._changes is not None
datum = self._changes.get(column_name)
if datum is None:
- datum = self._data[column_name]
+ if self._data is None:
+ raise AttributeError("%s instance has no attribute '%s'" %
+ (self.__class__.__name__, column_name))
+ if column_name in self._data:
+ datum = self._data[column_name]
+ else:
+ raise AttributeError("%s instance has no attribute '%s'" %
+ (self.__class__.__name__, column_name))
return datum.to_python(_uuid_to_row)
assert self._changes is not None
assert self._idl.txn
+ if ((self._table.name in self._idl.readonly) and
+ (column_name in self._idl.readonly[self._table.name])):
+ vlog.warn("attempting to write to readonly column %s"
+ % column_name)
+ return
+
column = self._table.columns[column_name]
try:
datum = ovs.db.data.Datum.from_python(column.type, value,
_row_to_uuid)
- except error.Error, e:
+ except error.Error as e:
# XXX rate-limit
- logging.error("attempting to write bad value to column %s (%s)"
- % (column_name, e))
+ vlog.err("attempting to write bad value to column %s (%s)"
+ % (column_name, e))
return
self._idl.txn._write(self, column, datum)
+ @classmethod
+ def from_json(cls, idl, table, uuid, row_json):
+ data = {}
+ for column_name, datum_json in six.iteritems(row_json):
+ column = table.columns.get(column_name)
+ if not column:
+ # XXX rate-limit
+ vlog.warn("unknown column %s in table %s"
+ % (column_name, table.name))
+ continue
+ try:
+ datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+ except error.Error as e:
+ # XXX rate-limit
+ vlog.warn("error parsing column %s in table %s: %s"
+ % (column_name, table.name, e))
+ continue
+ data[column_name] = datum
+ return cls(idl, table, uuid, data)
+
def verify(self, column_name):
"""Causes the original contents of column 'column_name' in this row to
be verified as a prerequisite to completing the transaction. That is,
assert self._changes is not None
if self._data is None:
del self._idl.txn._txn_rows[self.uuid]
+ else:
+ self._idl.txn._txn_rows[self.uuid] = self
self.__dict__["_changes"] = None
del self._table.rows[self.uuid]
+ def fetch(self, column_name):
+ self._idl.txn._fetch(self, column_name)
+
+ def increment(self, column_name):
+ """Causes the transaction, when committed, to increment the value of
+ 'column_name' within this row by 1. 'column_name' must have an integer
+ type. After the transaction commits successfully, the client may
+ retrieve the final (incremented) value of 'column_name' with
+ Transaction.get_increment_new_value().
+
+ The client could accomplish something similar by reading and writing
+ and verify()ing columns. However, increment() will never (by itself)
+ cause a transaction to fail because of a verify error.
+
+ The intended use is for incrementing the "next_cfg" column in
+ the Open_vSwitch table."""
+ self._idl.txn._increment(self, column_name)
+
+
def _uuid_name_from_uuid(uuid):
return "row%s" % str(uuid).replace("-", "_")
+
def _where_uuid_equals(uuid):
return [["_uuid", "==", ["uuid", str(uuid)]]]
+
class _InsertedRow(object):
def __init__(self, op_index):
self.op_index = op_index
self.real = None
+
class Transaction(object):
+ """A transaction may modify the contents of a database by modifying the
+ values of columns, deleting rows, inserting rows, or adding checks that
+ columns in the database have not changed ("verify" operations), through
+ Row methods.
+
+ Reading and writing columns and inserting and deleting rows are all
+ straightforward. The reasons to verify columns are less obvious.
+ Verification is the key to maintaining transactional integrity. Because
+ OVSDB handles multiple clients, it can happen that between the time that
+ OVSDB client A reads a column and writes a new value, OVSDB client B has
+ written that column. Client A's write should not ordinarily overwrite
+ client B's, especially if the column in question is a "map" column that
+ contains several more or less independent data items. If client A adds a
+ "verify" operation before it writes the column, then the transaction fails
+ in case client B modifies it first. Client A will then see the new value
+ of the column and compose a new transaction based on the new contents
+ written by client B.
+
+ When a transaction is complete, which must be before the next call to
+ Idl.run(), call Transaction.commit() or Transaction.abort().
+
+ The life-cycle of a transaction looks like this:
+
+ 1. Create the transaction and record the initial sequence number:
+
+ seqno = idl.change_seqno(idl)
+ txn = Transaction(idl)
+
+ 2. Modify the database with Row and Transaction methods.
+
+ 3. Commit the transaction by calling Transaction.commit(). The first call
+ to this function probably returns Transaction.INCOMPLETE. The client
+ must keep calling again along as this remains true, calling Idl.run() in
+ between to let the IDL do protocol processing. (If the client doesn't
+ have anything else to do in the meantime, it can use
+ Transaction.commit_block() to avoid having to loop itself.)
+
+ 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
+ to change from the saved 'seqno' (it's possible that it's already
+ changed, in which case the client should not wait at all), then start
+ over from step 1. Only a call to Idl.run() will change the return value
+ of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
+
# Status values that Transaction.commit() can return.
- UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
- UNCHANGED = "unchanged" # Transaction didn't include any changes.
- INCOMPLETE = "incomplete" # Commit in progress, please wait.
- ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
- SUCCESS = "success" # Commit successful.
- TRY_AGAIN = "try again" # Commit failed because a "verify" operation
- # reported an inconsistency, due to a network
- # problem, or other transient failure.
- NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
- ERROR = "error" # Commit failed due to a hard error.
+
+ # Not yet committed or aborted.
+ UNCOMMITTED = "uncommitted"
+ # Transaction didn't include any changes.
+ UNCHANGED = "unchanged"
+ # Commit in progress, please wait.
+ INCOMPLETE = "incomplete"
+ # ovsdb_idl_txn_abort() called.
+ ABORTED = "aborted"
+ # Commit successful.
+ SUCCESS = "success"
+ # Commit failed because a "verify" operation
+ # reported an inconsistency, due to a network
+ # problem, or other transient failure. Wait
+ # for a change, then try again.
+ TRY_AGAIN = "try again"
+ # Server hasn't given us the lock yet.
+ NOT_LOCKED = "not locked"
+ # Commit failed due to a hard error.
+ ERROR = "error"
@staticmethod
def status_to_string(status):
self._error = None
self._comments = []
- self._inc_table = None
+ self._inc_row = None
self._inc_column = None
- self._inc_where = None
- self._inserted_rows = {} # Map from UUID to _InsertedRow
+ self._fetch_requests = []
+
+ self._inserted_rows = {} # Map from UUID to _InsertedRow
def add_comment(self, comment):
- """Appens 'comment' to the comments that will be passed to the OVSDB
+ """Appends 'comment' to the comments that will be passed to the OVSDB
server when this transaction is committed. (The comment will be
committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
relatively human-readable form.)"""
self._comments.append(comment)
- def increment(self, table, column, where):
- assert not self._inc_table
- self._inc_table = table
- self._inc_column = column
- self._inc_where = where
-
def wait(self, poller):
+ """Causes poll_block() to wake up if this transaction has completed
+ committing."""
if self._status not in (Transaction.UNCOMMITTED,
Transaction.INCOMPLETE):
poller.immediate_wake()
def _substitute_uuids(self, json):
- if type(json) in (list, tuple):
+ if isinstance(json, (list, tuple)):
if (len(json) == 2
- and json[0] == 'uuid'
- and ovs.ovsuuid.is_valid_string(json[1])):
+ and json[0] == 'uuid'
+ and ovs.ovsuuid.is_valid_string(json[1])):
uuid = ovs.ovsuuid.from_string(json[1])
row = self._txn_rows.get(uuid, None)
if row and row._data is None:
return ["named-uuid", _uuid_name_from_uuid(uuid)]
+ else:
+ return [self._substitute_uuids(elem) for elem in json]
return json
def __disassemble(self):
self.idl.txn = None
- for row in self._txn_rows.itervalues():
+ for row in six.itervalues(self._txn_rows):
if row._changes is None:
row._table.rows[row.uuid] = row
elif row._data is None:
self._txn_rows = {}
def commit(self):
- """Attempts to commit this transaction and returns the status of the
- commit operation, one of the constants declared as class attributes.
- If the return value is Transaction.INCOMPLETE, then the transaction is
- not yet complete and the caller should try calling again later, after
- calling Idl.run() to run the Idl.
+ """Attempts to commit 'txn'. Returns the status of the commit
+ operation, one of the following constants:
+
+ Transaction.INCOMPLETE:
+
+ The transaction is in progress, but not yet complete. The caller
+ should call again later, after calling Idl.run() to let the
+ IDL do OVSDB protocol processing.
+
+ Transaction.UNCHANGED:
+
+ The transaction is complete. (It didn't actually change the
+ database, so the IDL didn't send any request to the database
+ server.)
+
+ Transaction.ABORTED:
+
+ The caller previously called Transaction.abort().
+
+ Transaction.SUCCESS:
+
+ The transaction was successful. The update made by the
+ transaction (and possibly other changes made by other database
+ clients) should already be visible in the IDL.
+
+ Transaction.TRY_AGAIN:
+
+ The transaction failed for some transient reason, e.g. because a
+ "verify" operation reported an inconsistency or due to a network
+ problem. The caller should wait for a change to the database,
+ then compose a new transaction, and commit the new transaction.
+
+ Use Idl.change_seqno to wait for a change in the database. It is
+ important to use its value *before* the initial call to
+ Transaction.commit() as the baseline for this purpose, because
+ the change that one should wait for can happen after the initial
+ call but before the call that returns Transaction.TRY_AGAIN, and
+ using some other baseline value in that situation could cause an
+ indefinite wait if the database rarely changes.
+
+ Transaction.NOT_LOCKED:
+
+ The transaction failed because the IDL has been configured to
+ require a database lock (with Idl.set_lock()) but didn't
+ get it yet or has already lost it.
Committing a transaction rolls back all of the changes that it made to
- the Idl's copy of the database. If the transaction commits
+ the IDL's copy of the database. If the transaction commits
successfully, then the database server will send an update and, thus,
- the Idl will be updated with the committed changes."""
+ the IDL will be updated with the committed changes."""
# The status can only change if we're the active transaction.
# (Otherwise, our status will change only in Idl.run().)
if self != self.idl.txn:
return self._status
# If we need a lock but don't have it, give up quickly.
- if self.idl.lock_name and not self.idl.has_lock():
+ if self.idl.lock_name and not self.idl.has_lock:
self._status = Transaction.NOT_LOCKED
self.__disassemble()
return self._status
"lock": self.idl.lock_name})
# Add prerequisites and declarations of new rows.
- for row in self._txn_rows.itervalues():
+ for row in six.itervalues(self._txn_rows):
if row._prereqs:
rows = {}
columns = []
# Add updates.
any_updates = False
- for row in self._txn_rows.itervalues():
+ for row in six.itervalues(self._txn_rows):
if row._changes is None:
if row._table.is_root:
operations.append({"op": "delete",
row_json = {}
op["row"] = row_json
- for column_name, datum in row._changes.iteritems():
+ for column_name, datum in six.iteritems(row._changes):
if row._data is not None or not datum.is_default():
- row_json[column_name] = self._substitute_uuids(datum.to_json())
+ row_json[column_name] = (
+ self._substitute_uuids(datum.to_json()))
# If anything really changed, consider it an update.
# We can't suppress not-really-changed values earlier
# or transactions would become nonatomic (see the big
# comment inside Transaction._write()).
if (not any_updates and row._data is not None and
- row._data[column_name] != datum):
+ row._data[column_name] != datum):
any_updates = True
if row._data is None or row_json:
operations.append(op)
+ if self._fetch_requests:
+ for fetch in self._fetch_requests:
+ fetch["index"] = len(operations) - 1
+ operations.append({"op": "select",
+ "table": fetch["row"]._table.name,
+ "where": self._substitute_uuids(
+ _where_uuid_equals(fetch["row"].uuid)),
+ "columns": [fetch["column_name"]]})
+ any_updates = True
+
# Add increment.
- if self._inc_table and any_updates:
+ if self._inc_row and any_updates:
self._inc_index = len(operations) - 1
operations.append({"op": "mutate",
- "table": self._inc_table,
- "where": self._substitute_uuids(self._inc_where),
+ "table": self._inc_row._table.name,
+ "where": self._substitute_uuids(
+ _where_uuid_equals(self._inc_row.uuid)),
"mutations": [[self._inc_column, "+=", 1]]})
operations.append({"op": "select",
- "table": self._inc_table,
- "where": self._substitute_uuids(self._inc_where),
+ "table": self._inc_row._table.name,
+ "where": self._substitute_uuids(
+ _where_uuid_equals(self._inc_row.uuid)),
"columns": [self._inc_column]})
# Add comment.
return self._status
def commit_block(self):
+ """Attempts to commit this transaction, blocking until the commit
+ either succeeds or fails. Returns the final commit status, which may
+ be any Transaction.* value other than Transaction.INCOMPLETE.
+
+ This function calls Idl.run() on this transaction'ss IDL, so it may
+ cause Idl.change_seqno to change."""
while True:
status = self.commit()
if status != Transaction.INCOMPLETE:
poller.block()
def get_increment_new_value(self):
+ """Returns the final (incremented) value of the column in this
+ transaction that was set to be incremented by Row.increment. This
+ transaction must have committed successfully."""
assert self._status == Transaction.SUCCESS
return self._inc_new_value
return inserted_row.real
return None
+ def _increment(self, row, column):
+ assert not self._inc_row
+ self._inc_row = row
+ self._inc_column = column
+
+ def _fetch(self, row, column_name):
+ self._fetch_requests.append({"row": row, "column_name": column_name})
+
def _write(self, row, column, datum):
assert row._changes is not None
# transaction only does writes of existing values, without making any
# real changes, we will drop the whole transaction later in
# ovsdb_idl_txn_commit().)
- if not column.alert and row._data.get(column.name) == datum:
+ if (not column.alert and row._data and
+ row._data.get(column.name) == datum):
new_value = row._changes.get(column.name)
if new_value is None or new_value == datum:
return
def _process_reply(self, msg):
if msg.type == ovs.jsonrpc.Message.T_ERROR:
self._status = Transaction.ERROR
- elif type(msg.result) not in (list, tuple):
+ elif not isinstance(msg.result, (list, tuple)):
# XXX rate-limit
- logging.warning('reply to "transact" is not JSON array')
+ vlog.warn('reply to "transact" is not JSON array')
else:
hard_errors = False
soft_errors = False
# prior operation failed, so make sure that we know about
# it.
soft_errors = True
- elif type(op) == dict:
+ elif isinstance(op, dict):
error = op.get("error")
if error is not None:
if error == "timed out":
hard_errors = True
self.__set_error_json(op)
# XXX rate-limit
- logging.warning("operation reply is not JSON null or "
- "object")
+ vlog.warn("operation reply is not JSON null or object")
if not soft_errors and not hard_errors and not lock_errors:
- if self._inc_table and not self.__process_inc_reply(ops):
+ if self._inc_row and not self.__process_inc_reply(ops):
hard_errors = True
+ if self._fetch_requests:
+ if self.__process_fetch_reply(ops):
+ self.idl.change_seqno += 1
+ else:
+ hard_errors = True
- for insert in self._inserted_rows.itervalues():
+ for insert in six.itervalues(self._inserted_rows):
if not self.__process_insert_reply(insert, ops):
hard_errors = True
def __check_json_type(json, types, name):
if not json:
# XXX rate-limit
- logging.warning("%s is missing" % name)
+ vlog.warn("%s is missing" % name)
return False
- elif type(json) not in types:
+ elif not isinstance(json, tuple(types)):
# XXX rate-limit
- logging.warning("%s has unexpected type %s" % (name, type(json)))
+ vlog.warn("%s has unexpected type %s" % (name, type(json)))
return False
else:
return True
+ def __process_fetch_reply(self, ops):
+ update = False
+ for fetch_request in self._fetch_requests:
+ row = fetch_request["row"]
+ column_name = fetch_request["column_name"]
+ index = fetch_request["index"]
+ table = row._table
+
+ select = ops[index]
+ fetched_rows = select.get("rows")
+ if not Transaction.__check_json_type(fetched_rows, (list, tuple),
+ '"select" reply "rows"'):
+ return False
+ if len(fetched_rows) != 1:
+ # XXX rate-limit
+ vlog.warn('"select" reply "rows" has %d elements '
+ 'instead of 1' % len(fetched_rows))
+ continue
+ fetched_row = fetched_rows[0]
+ if not Transaction.__check_json_type(fetched_row, (dict,),
+ '"select" reply row'):
+ continue
+
+ column = table.columns.get(column_name)
+ datum_json = fetched_row.get(column_name)
+ datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+
+ row._data[column_name] = datum
+ update = True
+
+ return update
+
def __process_inc_reply(self, ops):
if self._inc_index + 2 > len(ops):
# XXX rate-limit
- logging.warning("reply does not contain enough operations for "
- "increment (has %d, needs %d)" %
- (len(ops), self._inc_index + 2))
+ vlog.warn("reply does not contain enough operations for "
+ "increment (has %d, needs %d)" %
+ (len(ops), self._inc_index + 2))
# We know that this is a JSON object because the loop in
# __process_reply() already checked.
mutate = ops[self._inc_index]
count = mutate.get("count")
- if not Transaction.__check_json_type(count, (int, long),
+ if not Transaction.__check_json_type(count, six.integer_types,
'"mutate" reply "count"'):
return False
if count != 1:
# XXX rate-limit
- logging.warning('"mutate" reply "count" is %d instead of 1'
- % count)
+ vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
return False
select = ops[self._inc_index + 1]
return False
if len(rows) != 1:
# XXX rate-limit
- logging.warning('"select" reply "rows" has %d elements '
- 'instead of 1' % len(rows))
+ vlog.warn('"select" reply "rows" has %d elements '
+ 'instead of 1' % len(rows))
return False
row = rows[0]
if not Transaction.__check_json_type(row, (dict,),
'"select" reply row'):
return False
column = row.get(self._inc_column)
- if not Transaction.__check_json_type(column, (int, long),
+ if not Transaction.__check_json_type(column, six.integer_types,
'"select" reply inc column'):
return False
self._inc_new_value = column
def __process_insert_reply(self, insert, ops):
if insert.op_index >= len(ops):
# XXX rate-limit
- logging.warning("reply does not contain enough operations "
- "for insert (has %d, needs %d)"
- % (len(ops), insert.op_index))
+ vlog.warn("reply does not contain enough operations "
+ "for insert (has %d, needs %d)"
+ % (len(ops), insert.op_index))
return False
# We know that this is a JSON object because the loop in
uuid_ = ovs.ovsuuid.from_json(json_uuid)
except error.Error:
# XXX rate-limit
- logging.warning('"insert" reply "uuid" is not a JSON UUID')
+ vlog.warn('"insert" reply "uuid" is not a JSON UUID')
return False
insert.real = uuid_
return True
+
+
+class SchemaHelper(object):
+ """IDL Schema helper.
+
+ This class encapsulates the logic required to generate schemas suitable
+ for creating 'ovs.db.idl.Idl' objects. Clients should register columns
+ they are interested in using register_columns(). When finished, the
+ get_idl_schema() function may be called.
+
+ The location on disk of the schema used may be found in the
+ 'schema_location' variable."""
+
+ def __init__(self, location=None, schema_json=None):
+ """Creates a new Schema object.
+
+ 'location' file path to ovs schema. None means default location
+ 'schema_json' schema in json preresentation in memory
+ """
+
+ if location and schema_json:
+ raise ValueError("both location and schema_json can't be "
+ "specified. it's ambiguous.")
+ if schema_json is None:
+ if location is None:
+ location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
+ schema_json = ovs.json.from_file(location)
+
+ self.schema_json = schema_json
+ self._tables = {}
+ self._readonly = {}
+ self._all = False
+
+ def register_columns(self, table, columns, readonly=[]):
+ """Registers interest in the given 'columns' of 'table'. Future calls
+ to get_idl_schema() will include 'table':column for each column in
+ 'columns'. This function automatically avoids adding duplicate entries
+ to the schema.
+ A subset of 'columns' can be specified as 'readonly'. The readonly
+ columns are not replicated but can be fetched on-demand by the user
+ with Row.fetch().
+
+ 'table' must be a string.
+ 'columns' must be a list of strings.
+ 'readonly' must be a list of strings.
+ """
+
+ assert isinstance(table, six.string_types)
+ assert isinstance(columns, list)
+
+ columns = set(columns) | self._tables.get(table, set())
+ self._tables[table] = columns
+ self._readonly[table] = readonly
+
+ def register_table(self, table):
+ """Registers interest in the given all columns of 'table'. Future calls
+ to get_idl_schema() will include all columns of 'table'.
+
+ 'table' must be a string
+ """
+ assert isinstance(table, six.string_types)
+ self._tables[table] = set() # empty set means all columns in the table
+
+ def register_all(self):
+ """Registers interest in every column of every table."""
+ self._all = True
+
+ def get_idl_schema(self):
+ """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
+ object based on columns registered using the register_columns()
+ function."""
+
+ schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
+ self.schema_json = None
+
+ if not self._all:
+ schema_tables = {}
+ for table, columns in six.iteritems(self._tables):
+ schema_tables[table] = (
+ self._keep_table_columns(schema, table, columns))
+
+ schema.tables = schema_tables
+ schema.readonly = self._readonly
+ return schema
+
+ def _keep_table_columns(self, schema, table_name, columns):
+ assert table_name in schema.tables
+ table = schema.tables[table_name]
+
+ if not columns:
+ # empty set means all columns in the table
+ return table
+
+ new_columns = {}
+ for column_name in columns:
+ assert isinstance(column_name, six.string_types)
+ assert column_name in table.columns
+
+ new_columns[column_name] = table.columns[column_name]
+
+ table.columns = new_columns
+ return table