From 897c8064f55c180a7cbaec75cc2ba7fb48031d17 Mon Sep 17 00:00:00 2001 From: Liran Schour Date: Mon, 18 Jul 2016 11:45:57 +0300 Subject: [PATCH] python: move Python idl to work with monitor_cond Python idl works now with "monitor_cond" method. Add test for backward compatibility with old "monitor" method. Signed-off-by: Liran Schour Signed-off-by: Ben Pfaff --- python/ovs/db/data.py | 16 +++- python/ovs/db/idl.py | 172 +++++++++++++++++++++++++++++++++++++----- tests/ovsdb-idl.at | 97 ++++++++++++++++++++++++ 3 files changed, 265 insertions(+), 20 deletions(-) diff --git a/python/ovs/db/data.py b/python/ovs/db/data.py index 42e78fbcb..747acd5d6 100644 --- a/python/ovs/db/data.py +++ b/python/ovs/db/data.py @@ -162,7 +162,7 @@ class Atom(object): % (self.to_string(), base.enum.to_string())) elif base.type in [ovs.db.types.IntegerType, ovs.db.types.RealType]: if ((base.min is None or self.value >= base.min) and - (base.max is None or self.value <= base.max)): + (base.max is None or self.value <= base.max)): pass elif base.min is not None and base.max is not None: raise ConstraintViolation( @@ -171,7 +171,7 @@ class Atom(object): elif base.min is not None: raise ConstraintViolation( "%s is less than minimum allowed value %.15g" - % (self.to_string(), base.min)) + % (self.to_string(), base.min)) else: raise ConstraintViolation( "%s is greater than maximum allowed value %.15g" @@ -415,6 +415,18 @@ class Datum(object): s.append(tail) return ''.join(s) + def diff(self, datum): + if self.type.n_max > 1 or len(self.values) == 0: + for k, v in six.iteritems(datum.values): + if k in self.values and v == self.values[k]: + del self.values[k] + else: + self.values[k] = v + else: + return datum + + return self + def as_list(self): if self.type.is_map(): return [[k.value, v.value] for k, v in six.iteritems(self.values)] diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py index e69d35ea7..2f3645625 100644 --- a/python/ovs/db/idl.py +++ b/python/ovs/db/idl.py @@ -33,6 +33,9 @@ ROW_CREATE = "create" ROW_UPDATE = "update" ROW_DELETE = "delete" +OVSDB_UPDATE = 0 +OVSDB_UPDATE2 = 1 + class Idl(object): """Open vSwitch Database Interface Definition Language (OVSDB IDL). @@ -86,6 +89,10 @@ class Idl(object): currently being constructed, if there is one, or None otherwise. """ + IDL_S_INITIAL = 0 + IDL_S_MONITOR_REQUESTED = 1 + IDL_S_MONITOR_COND_REQUESTED = 2 + def __init__(self, remote, schema): """Creates and returns a connection to the database named 'db_name' on 'remote', which should be in a form acceptable to @@ -116,6 +123,8 @@ class Idl(object): self._monitor_request_id = None self._last_seqno = None self.change_seqno = 0 + self.uuid = uuid.uuid1() + self.state = self.IDL_S_INITIAL # Database locking. self.lock_name = None # Name of lock we need, None if none. @@ -134,6 +143,7 @@ class Idl(object): table.need_table = False table.rows = {} table.idl = self + table.condition = [] def close(self): """Closes the connection to the database. The IDL will no longer @@ -180,11 +190,15 @@ class Idl(object): if msg is None: break if (msg.type == ovs.jsonrpc.Message.T_NOTIFY - and msg.method == "update" - and len(msg.params) == 2 - and msg.params[0] is None): + and msg.method == "update2" + and len(msg.params) == 2): + # Database contents changed. + self.__parse_update(msg.params[1], OVSDB_UPDATE2) + elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY + and msg.method == "update" + and len(msg.params) == 2): # Database contents changed. - self.__parse_update(msg.params[1]) + self.__parse_update(msg.params[1], OVSDB_UPDATE) elif (msg.type == ovs.jsonrpc.Message.T_REPLY and self._monitor_request_id is not None and self._monitor_request_id == msg.id): @@ -193,10 +207,15 @@ class Idl(object): self.change_seqno += 1 self._monitor_request_id = None self.__clear() - self.__parse_update(msg.result) + if self.state == self.IDL_S_MONITOR_COND_REQUESTED: + self.__parse_update(msg.result, OVSDB_UPDATE2) + else: + assert self.state == self.IDL_S_MONITOR_REQUESTED + self.__parse_update(msg.result, OVSDB_UPDATE) + except error.Error as e: vlog.err("%s: parse error in received schema: %s" - % (self._session.get_name(), e)) + % (self._session.get_name(), e)) self.__error() elif (msg.type == ovs.jsonrpc.Message.T_REPLY and self._lock_request_id is not None @@ -214,6 +233,11 @@ class Idl(object): elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo": # Reply to our echo request. Ignore it. pass + elif (msg.type == ovs.jsonrpc.Message.T_ERROR and + self.state == self.IDL_S_MONITOR_COND_REQUESTED and + self._monitor_request_id == msg.id): + if msg.error == "unknown method": + self.__send_monitor_request() elif (msg.type in (ovs.jsonrpc.Message.T_ERROR, ovs.jsonrpc.Message.T_REPLY) and self.__txn_process_reply(msg)): @@ -228,6 +252,19 @@ class Idl(object): return initial_change_seqno != self.change_seqno + def cond_change(self, table_name, cond): + """Change conditions for this IDL session. If session is not already + connected, add condtion to table and submit it on send_monitor_request. + Otherwise send monitor_cond_change method with the requested + changes.""" + table = self.tables.get(table_name) + if not table: + raise error.Error('Unknown table "%s"' % table_name) + if self._session.is_connected(): + self.__send_cond_change(table, cond) + else: + table.condition = cond + def wait(self, poller): """Arranges for poller.block() to wake up when self.run() has something to do or when activity occurs on a transaction on 'self'.""" @@ -279,10 +316,18 @@ class Idl(object): :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE :param row: The row as it is after the operation has occured :type row: Row - :param updates: For updates, a Row object with just the changed columns + :param updates: For updates, row with only updated columns :type updates: Row """ + def __send_cond_change(self, table, cond): + monitor_cond_change = {table.name: [{"where": cond}]} + old_uuid = str(self.uuid) + self.uuid = uuid.uuid1() + params = [old_uuid, str(self.uuid), monitor_cond_change] + msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params) + self._session.send(msg) + def __clear(self): changed = False @@ -340,28 +385,39 @@ class Idl(object): self.is_lock_contended = True def __send_monitor_request(self): + if self.state == self.IDL_S_INITIAL: + self.state = self.IDL_S_MONITOR_COND_REQUESTED + method = "monitor_cond" + else: + self.state = self.IDL_S_MONITOR_REQUESTED + method = "monitor" + monitor_requests = {} for table in six.itervalues(self.tables): columns = [] for column in six.iterkeys(table.columns): if ((table.name not in self.readonly) or - (table.name in self.readonly) and - (column not in self.readonly[table.name])): + (table.name in self.readonly) and + (column not in self.readonly[table.name])): columns.append(column) monitor_requests[table.name] = {"columns": columns} + if method == "monitor_cond" and table.condition: + monitor_requests[table.name]["where"] = table.condition + table.condition = None + msg = ovs.jsonrpc.Message.create_request( - "monitor", [self._db.name, None, monitor_requests]) + method, [self._db.name, str(self.uuid), monitor_requests]) self._monitor_request_id = msg.id self._session.send(msg) - def __parse_update(self, update): + def __parse_update(self, update, version): try: - self.__do_parse_update(update) + self.__do_parse_update(update, version) except error.Error as e: vlog.err("%s: error parsing update: %s" % (self._session.get_name(), e)) - def __do_parse_update(self, table_updates): + def __do_parse_update(self, table_updates, version): if not isinstance(table_updates, dict): raise error.Error(" is not an object", table_updates) @@ -390,6 +446,11 @@ class Idl(object): 'is not an object' % (table_name, uuid_string)) + if version == OVSDB_UPDATE2: + if self.__process_update2(table, uuid, row_update): + self.change_seqno += 1 + continue + parser = ovs.db.parser.Parser(row_update, "row-update") old = parser.get_optional("old", [dict]) new = parser.get_optional("new", [dict]) @@ -402,6 +463,45 @@ class Idl(object): if self.__process_update(table, uuid, old, new): self.change_seqno += 1 + def __process_update2(self, table, uuid, row_update): + row = table.rows.get(uuid) + changed = False + if "delete" in row_update: + if row: + del table.rows[uuid] + self.notify(ROW_DELETE, row) + changed = True + else: + # XXX rate-limit + vlog.warn("cannot delete missing row %s from table" + "%s" % (uuid, table.name)) + elif "insert" in row_update or "initial" in row_update: + if row: + vlog.warn("cannot add existing row %s from table" + " %s" % (uuid, table.name)) + del table.rows[uuid] + row = self.__create_row(table, uuid) + if "insert" in row_update: + row_update = row_update['insert'] + else: + row_update = row_update['initial'] + self.__add_default(table, row_update) + if self.__row_update(table, row, row_update): + changed = True + self.notify(ROW_CREATE, row) + elif "modify" in row_update: + if not row: + raise error.Error('Modify non-existing row') + + self.__apply_diff(table, row, row_update['modify']) + self.notify(ROW_UPDATE, row, + Row.from_json(self, table, uuid, row_update['modify'])) + changed = True + else: + raise error.Error(' unknown operation', + row_update) + return changed + def __process_update(self, table, uuid, old, new): """Returns True if a column changed, False otherwise.""" row = table.rows.get(uuid) @@ -442,6 +542,42 @@ class Idl(object): self.notify(op, row, Row.from_json(self, table, uuid, old)) return changed + def __column_name(self, column): + if column.type.key.type == ovs.db.types.UuidType: + return ovs.ovsuuid.to_json(column.type.key.type.default) + else: + return column.type.key.type.default + + def __add_default(self, table, row_update): + for column in six.itervalues(table.columns): + if column.name not in row_update: + if ((table.name not in self.readonly) or + (table.name in self.readonly) and + (column.name not in self.readonly[table.name])): + if column.type.n_min != 0 and not column.type.is_map(): + row_update[column.name] = self.__column_name(column) + + def __apply_diff(self, table, row, row_diff): + for column_name, datum_json in six.iteritems(row_diff): + column = table.columns.get(column_name) + if not column: + # XXX rate-limit + vlog.warn("unknown column %s updating table %s" + % (column_name, table.name)) + continue + + try: + datum = ovs.db.data.Datum.from_json(column.type, datum_json) + except error.Error as e: + # XXX rate-limit + vlog.warn("error parsing column %s in table %s: %s" + % (column_name, table.name, e)) + continue + + datum = row._data[column_name].diff(datum) + if datum != row._data[column_name]: + row._data[column_name] = datum + def __row_update(self, table, row, row_json): changed = False for column_name, datum_json in six.iteritems(row_json): @@ -608,7 +744,7 @@ class Row(object): assert self._idl.txn if ((self._table.name in self._idl.readonly) and - (column_name in self._idl.readonly[self._table.name])): + (column_name in self._idl.readonly[self._table.name])): vlog.warn("attempting to write to readonly column %s" % column_name) return @@ -844,8 +980,8 @@ class Transaction(object): def _substitute_uuids(self, json): if isinstance(json, (list, tuple)): if (len(json) == 2 - and json[0] == 'uuid' - and ovs.ovsuuid.is_valid_string(json[1])): + and json[0] == 'uuid' + and ovs.ovsuuid.is_valid_string(json[1])): uuid = ovs.ovsuuid.from_string(json[1]) row = self._txn_rows.get(uuid, None) if row and row._data is None: @@ -982,14 +1118,14 @@ class Transaction(object): for column_name, datum in six.iteritems(row._changes): if row._data is not None or not datum.is_default(): row_json[column_name] = ( - self._substitute_uuids(datum.to_json())) + self._substitute_uuids(datum.to_json())) # If anything really changed, consider it an update. # We can't suppress not-really-changed values earlier # or transactions would become nonatomic (see the big # comment inside Transaction._write()). if (not any_updates and row._data is not None and - row._data[column_name] != datum): + row._data[column_name] != datum): any_updates = True if row._data is None or row_json: diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at index cd8d0a344..63008b3df 100644 --- a/tests/ovsdb-idl.at +++ b/tests/ovsdb-idl.at @@ -708,6 +708,103 @@ OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially populated], 003: done ]]) +m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND_PY], + [AT_SETUP([$1 - Python]) + AT_SKIP_IF([test $HAVE_PYTHON = no]) + AT_KEYWORDS([ovsdb server idl Python monitor $4]) + AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema], + [0], [stdout], [ignore]) + AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore]) + AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/disable-monitor-cond]) + AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema unix:socket $2], + [0], [stdout], [ignore], [kill `cat pid`]) + AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$5],,, [[| $5]]), + [0], [$3], [], [kill `cat pid`]) + OVSDB_SERVER_SHUTDOWN + AT_CLEANUP]) + + +m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND], + [OVSDB_CHECK_IDL_WO_MONITOR_COND_PY($@)]) + + +OVSDB_CHECK_IDL_WO_MONITOR_COND([simple idl disable monitor-cond], + [['["idltest", + {"op": "insert", + "table": "simple", + "row": {"i": 1, + "r": 2.0, + "b": true, + "s": "mystring", + "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"], + "ia": ["set", [1, 2, 3]], + "ra": ["set", [-0.5]], + "ba": ["set", [true]], + "sa": ["set", ["abc", "def"]], + "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"], + ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}}, + {"op": "insert", + "table": "simple", + "row": {}}]' \ + '["idltest", + {"op": "update", + "table": "simple", + "where": [], + "row": {"b": true}}]' \ + '["idltest", + {"op": "update", + "table": "simple", + "where": [], + "row": {"r": 123.5}}]' \ + '["idltest", + {"op": "insert", + "table": "simple", + "row": {"i": -1, + "r": 125, + "b": false, + "s": "", + "ia": ["set", [1]], + "ra": ["set", [1.5]], + "ba": ["set", [false]], + "sa": ["set", []], + "ua": ["set", []]}}]' \ + '["idltest", + {"op": "update", + "table": "simple", + "where": [["i", "<", 1]], + "row": {"s": "newstring"}}]' \ + '["idltest", + {"op": "delete", + "table": "simple", + "where": [["i", "==", 0]]}]' \ + 'reconnect']], + [[000: empty +001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]} +002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> +002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +003: {"error":null,"result":[{"count":2}]} +004: i=0 r=0 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> +004: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +005: {"error":null,"result":[{"count":2}]} +006: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> +006: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]} +008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> +008: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> +008: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +009: {"error":null,"result":[{"count":2}]} +010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> +010: i=0 r=123.5 b=true s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> +010: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +011: {"error":null,"result":[{"count":1}]} +012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> +012: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +013: reconnect +014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> +014: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +015: done +]]) + m4_define([OVSDB_CHECK_IDL_TRACK_C], [AT_SETUP([$1 - C]) AT_KEYWORDS([ovsdb server idl tracking positive $5]) -- 2.20.1