# See the License for the specific language governing permissions and
# limitations under the License.
+import functools
import uuid
import six
ROW_UPDATE = "update"
ROW_DELETE = "delete"
+OVSDB_UPDATE = 0
+OVSDB_UPDATE2 = 1
+
class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
currently being constructed, if there is one, or None otherwise.
"""
+ IDL_S_INITIAL = 0
+ IDL_S_MONITOR_REQUESTED = 1
+ IDL_S_MONITOR_COND_REQUESTED = 2
+
def __init__(self, remote, schema):
"""Creates and returns a connection to the database named 'db_name' on
'remote', which should be in a form acceptable to
self._monitor_request_id = None
self._last_seqno = None
self.change_seqno = 0
+ self.uuid = uuid.uuid1()
+ self.state = self.IDL_S_INITIAL
# Database locking.
self.lock_name = None # Name of lock we need, None if none.
table.need_table = False
table.rows = {}
table.idl = self
+ table.condition = []
+ table.cond_changed = False
def close(self):
"""Closes the connection to the database. The IDL will no longer
for changes in self.change_seqno."""
assert not self.txn
initial_change_seqno = self.change_seqno
+
+ self.send_cond_change()
self._session.run()
i = 0
while i < 50:
if msg is None:
break
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
- and msg.method == "update"
- and len(msg.params) == 2
- and msg.params[0] is None):
+ and msg.method == "update2"
+ and len(msg.params) == 2):
+ # Database contents changed.
+ self.__parse_update(msg.params[1], OVSDB_UPDATE2)
+ elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+ and msg.method == "update"
+ and len(msg.params) == 2):
# Database contents changed.
- self.__parse_update(msg.params[1])
+ self.__parse_update(msg.params[1], OVSDB_UPDATE)
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._monitor_request_id is not None
and self._monitor_request_id == msg.id):
self.change_seqno += 1
self._monitor_request_id = None
self.__clear()
- self.__parse_update(msg.result)
+ if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+ self.__parse_update(msg.result, OVSDB_UPDATE2)
+ else:
+ assert self.state == self.IDL_S_MONITOR_REQUESTED
+ self.__parse_update(msg.result, OVSDB_UPDATE)
+
except error.Error as e:
vlog.err("%s: parse error in received schema: %s"
- % (self._session.get_name(), e))
+ % (self._session.get_name(), e))
self.__error()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._lock_request_id is not None
elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
# Reply to our echo request. Ignore it.
pass
+ elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+ self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+ self._monitor_request_id == msg.id):
+ if msg.error == "unknown method":
+ self.__send_monitor_request()
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
ovs.jsonrpc.Message.T_REPLY)
and self.__txn_process_reply(msg)):
return initial_change_seqno != self.change_seqno
+ def send_cond_change(self):
+ if not self._session.is_connected():
+ return
+
+ for table in six.itervalues(self.tables):
+ if table.cond_changed:
+ self.__send_cond_change(table, table.condition)
+ table.cond_changed = False
+
+ def cond_change(self, table_name, add_cmd, cond):
+ """Change conditions for this IDL session. If session is not already
+ connected, add condtion to table and submit it on send_monitor_request.
+ Otherwise send monitor_cond_change method with the requested
+ changes."""
+
+ table = self.tables.get(table_name)
+ if not table:
+ raise error.Error('Unknown table "%s"' % table_name)
+
+ if add_cmd:
+ table.condition += cond
+ else:
+ for c in cond:
+ table.condition.remove(c)
+
+ table.cond_changed = True
+
def wait(self, poller):
"""Arranges for poller.block() to wake up when self.run() has something
to do or when activity occurs on a transaction on 'self'."""
:type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
:param row: The row as it is after the operation has occured
:type row: Row
- :param updates: For updates, a Row object with just the changed columns
+ :param updates: For updates, row with only updated columns
:type updates: Row
"""
+ def __send_cond_change(self, table, cond):
+ monitor_cond_change = {table.name: [{"where": cond}]}
+ old_uuid = str(self.uuid)
+ self.uuid = uuid.uuid1()
+ params = [old_uuid, str(self.uuid), monitor_cond_change]
+ msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
+ self._session.send(msg)
+
def __clear(self):
changed = False
def __parse_lock_reply(self, result):
self._lock_request_id = None
- got_lock = type(result) == dict and result.get("locked") is True
+ got_lock = isinstance(result, dict) and result.get("locked") is True
self.__update_has_lock(got_lock)
if not got_lock:
self.is_lock_contended = True
def __parse_lock_notify(self, params, new_has_lock):
if (self.lock_name is not None
- and type(params) in (list, tuple)
+ and isinstance(params, (list, tuple))
and params
and params[0] == self.lock_name):
self.__update_has_lock(new_has_lock)
self.is_lock_contended = True
def __send_monitor_request(self):
+ if self.state == self.IDL_S_INITIAL:
+ self.state = self.IDL_S_MONITOR_COND_REQUESTED
+ method = "monitor_cond"
+ else:
+ self.state = self.IDL_S_MONITOR_REQUESTED
+ method = "monitor"
+
monitor_requests = {}
for table in six.itervalues(self.tables):
columns = []
for column in six.iterkeys(table.columns):
if ((table.name not in self.readonly) or
- (table.name in self.readonly) and
- (column not in self.readonly[table.name])):
+ (table.name in self.readonly) and
+ (column not in self.readonly[table.name])):
columns.append(column)
monitor_requests[table.name] = {"columns": columns}
+ if method == "monitor_cond" and table.cond_changed and \
+ table.condition:
+ monitor_requests[table.name]["where"] = table.condition
+ table.cond_change = False
+
msg = ovs.jsonrpc.Message.create_request(
- "monitor", [self._db.name, None, monitor_requests])
+ method, [self._db.name, str(self.uuid), monitor_requests])
self._monitor_request_id = msg.id
self._session.send(msg)
- def __parse_update(self, update):
+ def __parse_update(self, update, version):
try:
- self.__do_parse_update(update)
+ self.__do_parse_update(update, version)
except error.Error as e:
vlog.err("%s: error parsing update: %s"
% (self._session.get_name(), e))
- def __do_parse_update(self, table_updates):
- if type(table_updates) != dict:
+ def __do_parse_update(self, table_updates, version):
+ if not isinstance(table_updates, dict):
raise error.Error("<table-updates> is not an object",
table_updates)
raise error.Error('<table-updates> includes unknown '
'table "%s"' % table_name)
- if type(table_update) != dict:
+ if not isinstance(table_update, dict):
raise error.Error('<table-update> for table "%s" is not '
'an object' % table_name, table_update)
table_update)
uuid = ovs.ovsuuid.from_string(uuid_string)
- if type(row_update) != dict:
+ if not isinstance(row_update, dict):
raise error.Error('<table-update> for table "%s" '
'contains <row-update> for %s that '
'is not an object'
% (table_name, uuid_string))
+ if version == OVSDB_UPDATE2:
+ if self.__process_update2(table, uuid, row_update):
+ self.change_seqno += 1
+ continue
+
parser = ovs.db.parser.Parser(row_update, "row-update")
old = parser.get_optional("old", [dict])
new = parser.get_optional("new", [dict])
if self.__process_update(table, uuid, old, new):
self.change_seqno += 1
+ def __process_update2(self, table, uuid, row_update):
+ row = table.rows.get(uuid)
+ changed = False
+ if "delete" in row_update:
+ if row:
+ del table.rows[uuid]
+ self.notify(ROW_DELETE, row)
+ changed = True
+ else:
+ # XXX rate-limit
+ vlog.warn("cannot delete missing row %s from table"
+ "%s" % (uuid, table.name))
+ elif "insert" in row_update or "initial" in row_update:
+ if row:
+ vlog.warn("cannot add existing row %s from table"
+ " %s" % (uuid, table.name))
+ del table.rows[uuid]
+ row = self.__create_row(table, uuid)
+ if "insert" in row_update:
+ row_update = row_update['insert']
+ else:
+ row_update = row_update['initial']
+ self.__add_default(table, row_update)
+ if self.__row_update(table, row, row_update):
+ changed = True
+ self.notify(ROW_CREATE, row)
+ elif "modify" in row_update:
+ if not row:
+ raise error.Error('Modify non-existing row')
+
+ self.__apply_diff(table, row, row_update['modify'])
+ self.notify(ROW_UPDATE, row,
+ Row.from_json(self, table, uuid, row_update['modify']))
+ changed = True
+ else:
+ raise error.Error('<row-update> unknown operation',
+ row_update)
+ return changed
+
def __process_update(self, table, uuid, old, new):
"""Returns True if a column changed, False otherwise."""
row = table.rows.get(uuid)
self.notify(op, row, Row.from_json(self, table, uuid, old))
return changed
+ def __column_name(self, column):
+ if column.type.key.type == ovs.db.types.UuidType:
+ return ovs.ovsuuid.to_json(column.type.key.type.default)
+ else:
+ return column.type.key.type.default
+
+ def __add_default(self, table, row_update):
+ for column in six.itervalues(table.columns):
+ if column.name not in row_update:
+ if ((table.name not in self.readonly) or
+ (table.name in self.readonly) and
+ (column.name not in self.readonly[table.name])):
+ if column.type.n_min != 0 and not column.type.is_map():
+ row_update[column.name] = self.__column_name(column)
+
+ def __apply_diff(self, table, row, row_diff):
+ for column_name, datum_json in six.iteritems(row_diff):
+ column = table.columns.get(column_name)
+ if not column:
+ # XXX rate-limit
+ vlog.warn("unknown column %s updating table %s"
+ % (column_name, table.name))
+ continue
+
+ try:
+ datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+ except error.Error as e:
+ # XXX rate-limit
+ vlog.warn("error parsing column %s in table %s: %s"
+ % (column_name, table.name, e))
+ continue
+
+ datum = row._data[column_name].diff(datum)
+ if datum != row._data[column_name]:
+ row._data[column_name] = datum
+
def __row_update(self, table, row, row_json):
changed = False
for column_name, datum_json in six.iteritems(row_json):
def _row_to_uuid(value):
- if type(value) == Row:
+ if isinstance(value, Row):
return value.uuid
else:
return value
+@functools.total_ordering
class Row(object):
"""A row within an IDL.
# in the dictionary are all None.
self.__dict__["_prereqs"] = {}
+ def __lt__(self, other):
+ if not isinstance(other, Row):
+ return NotImplemented
+ return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
+
+ def __eq__(self, other):
+ if not isinstance(other, Row):
+ return NotImplemented
+ return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
+
+ def __hash__(self):
+ return int(self.__dict__['uuid'])
+
def __getattr__(self, column_name):
assert self._changes is not None
assert self._idl.txn
if ((self._table.name in self._idl.readonly) and
- (column_name in self._idl.readonly[self._table.name])):
+ (column_name in self._idl.readonly[self._table.name])):
vlog.warn("attempting to write to readonly column %s"
% column_name)
return
poller.immediate_wake()
def _substitute_uuids(self, json):
- if type(json) in (list, tuple):
+ if isinstance(json, (list, tuple)):
if (len(json) == 2
- and json[0] == 'uuid'
- and ovs.ovsuuid.is_valid_string(json[1])):
+ and json[0] == 'uuid'
+ and ovs.ovsuuid.is_valid_string(json[1])):
uuid = ovs.ovsuuid.from_string(json[1])
row = self._txn_rows.get(uuid, None)
if row and row._data is None:
for column_name, datum in six.iteritems(row._changes):
if row._data is not None or not datum.is_default():
row_json[column_name] = (
- self._substitute_uuids(datum.to_json()))
+ self._substitute_uuids(datum.to_json()))
# If anything really changed, consider it an update.
# We can't suppress not-really-changed values earlier
# or transactions would become nonatomic (see the big
# comment inside Transaction._write()).
if (not any_updates and row._data is not None and
- row._data[column_name] != datum):
+ row._data[column_name] != datum):
any_updates = True
if row._data is None or row_json:
def _process_reply(self, msg):
if msg.type == ovs.jsonrpc.Message.T_ERROR:
self._status = Transaction.ERROR
- elif type(msg.result) not in (list, tuple):
+ elif not isinstance(msg.result, (list, tuple)):
# XXX rate-limit
vlog.warn('reply to "transact" is not JSON array')
else:
# prior operation failed, so make sure that we know about
# it.
soft_errors = True
- elif type(op) == dict:
+ elif isinstance(op, dict):
error = op.get("error")
if error is not None:
if error == "timed out":
# XXX rate-limit
vlog.warn("%s is missing" % name)
return False
- elif type(json) not in types:
+ elif not isinstance(json, tuple(types)):
# XXX rate-limit
vlog.warn("%s has unexpected type %s" % (name, type(json)))
return False
'readonly' must be a list of strings.
"""
- assert type(table) is str
- assert type(columns) is list
+ assert isinstance(table, six.string_types)
+ assert isinstance(columns, list)
columns = set(columns) | self._tables.get(table, set())
self._tables[table] = columns
'table' must be a string
"""
- assert type(table) is str
+ assert isinstance(table, six.string_types)
self._tables[table] = set() # empty set means all columns in the table
def register_all(self):
new_columns = {}
for column_name in columns:
- assert type(column_name) is str
+ assert isinstance(column_name, six.string_types)
assert column_name in table.columns
new_columns[column_name] = table.columns[column_name]