lib: add monitor_cond_change API to C IDL lib
[cascardo/ovs.git] / python / ovs / db / idl.py
index e181fef..437e9b0 100644 (file)
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import functools
 import uuid
 
 import six
@@ -32,6 +33,9 @@ ROW_CREATE = "create"
 ROW_UPDATE = "update"
 ROW_DELETE = "delete"
 
+OVSDB_UPDATE = 0
+OVSDB_UPDATE2 = 1
+
 
 class Idl(object):
     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -85,6 +89,10 @@ class Idl(object):
       currently being constructed, if there is one, or None otherwise.
 """
 
+    IDL_S_INITIAL = 0
+    IDL_S_MONITOR_REQUESTED = 1
+    IDL_S_MONITOR_COND_REQUESTED = 2
+
     def __init__(self, remote, schema):
         """Creates and returns a connection to the database named 'db_name' on
         'remote', which should be in a form acceptable to
@@ -115,6 +123,8 @@ class Idl(object):
         self._monitor_request_id = None
         self._last_seqno = None
         self.change_seqno = 0
+        self.uuid = uuid.uuid1()
+        self.state = self.IDL_S_INITIAL
 
         # Database locking.
         self.lock_name = None          # Name of lock we need, None if none.
@@ -133,6 +143,8 @@ class Idl(object):
             table.need_table = False
             table.rows = {}
             table.idl = self
+            table.condition = []
+            table.cond_changed = False
 
     def close(self):
         """Closes the connection to the database.  The IDL will no longer
@@ -159,6 +171,8 @@ class Idl(object):
         for changes in self.change_seqno."""
         assert not self.txn
         initial_change_seqno = self.change_seqno
+
+        self.send_cond_change()
         self._session.run()
         i = 0
         while i < 50:
@@ -179,11 +193,15 @@ class Idl(object):
             if msg is None:
                 break
             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
-                and msg.method == "update"
-                and len(msg.params) == 2
-                and msg.params[0] is None):
+                    and msg.method == "update2"
+                    and len(msg.params) == 2):
+                # Database contents changed.
+                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
+            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+                    and msg.method == "update"
+                    and len(msg.params) == 2):
                 # Database contents changed.
-                self.__parse_update(msg.params[1])
+                self.__parse_update(msg.params[1], OVSDB_UPDATE)
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._monitor_request_id is not None
                   and self._monitor_request_id == msg.id):
@@ -192,10 +210,15 @@ class Idl(object):
                     self.change_seqno += 1
                     self._monitor_request_id = None
                     self.__clear()
-                    self.__parse_update(msg.result)
+                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+                        self.__parse_update(msg.result, OVSDB_UPDATE2)
+                    else:
+                        assert self.state == self.IDL_S_MONITOR_REQUESTED
+                        self.__parse_update(msg.result, OVSDB_UPDATE)
+
                 except error.Error as e:
                     vlog.err("%s: parse error in received schema: %s"
-                              % (self._session.get_name(), e))
+                             % (self._session.get_name(), e))
                     self.__error()
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._lock_request_id is not None
@@ -213,6 +236,11 @@ class Idl(object):
             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                 # Reply to our echo request.  Ignore it.
                 pass
+            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+                  self._monitor_request_id == msg.id):
+                if msg.error == "unknown method":
+                    self.__send_monitor_request()
             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                                ovs.jsonrpc.Message.T_REPLY)
                   and self.__txn_process_reply(msg)):
@@ -227,6 +255,33 @@ class Idl(object):
 
         return initial_change_seqno != self.change_seqno
 
+    def send_cond_change(self):
+        if not self._session.is_connected():
+            return
+
+        for table in six.itervalues(self.tables):
+            if table.cond_changed:
+                self.__send_cond_change(table, table.condition)
+                table.cond_changed = False
+
+    def cond_change(self, table_name, add_cmd, cond):
+        """Change conditions for this IDL session. If session is not already
+        connected, add condtion to table and submit it on send_monitor_request.
+        Otherwise  send monitor_cond_change method with the requested
+        changes."""
+
+        table = self.tables.get(table_name)
+        if not table:
+            raise error.Error('Unknown table "%s"' % table_name)
+
+        if add_cmd:
+            table.condition += cond
+        else:
+            for c in cond:
+                table.condition.remove(c)
+
+        table.cond_changed = True
+
     def wait(self, poller):
         """Arranges for poller.block() to wake up when self.run() has something
         to do or when activity occurs on a transaction on 'self'."""
@@ -278,10 +333,18 @@ class Idl(object):
         :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
         :param row:     The row as it is after the operation has occured
         :type row:      Row
-        :param updates: For updates, a Row object with just the changed columns
+        :param updates: For updates, row with only updated columns
         :type updates:  Row
         """
 
+    def __send_cond_change(self, table, cond):
+        monitor_cond_change = {table.name: [{"where": cond}]}
+        old_uuid = str(self.uuid)
+        self.uuid = uuid.uuid1()
+        params = [old_uuid, str(self.uuid), monitor_cond_change]
+        msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
+        self._session.send(msg)
+
     def __clear(self):
         changed = False
 
@@ -324,14 +387,14 @@ class Idl(object):
 
     def __parse_lock_reply(self, result):
         self._lock_request_id = None
-        got_lock = type(result) == dict and result.get("locked") is True
+        got_lock = isinstance(result, dict) and result.get("locked") is True
         self.__update_has_lock(got_lock)
         if not got_lock:
             self.is_lock_contended = True
 
     def __parse_lock_notify(self, params, new_has_lock):
         if (self.lock_name is not None
-            and type(params) in (list, tuple)
+            and isinstance(params, (list, tuple))
             and params
             and params[0] == self.lock_name):
             self.__update_has_lock(new_has_lock)
@@ -339,29 +402,41 @@ class Idl(object):
                 self.is_lock_contended = True
 
     def __send_monitor_request(self):
+        if self.state == self.IDL_S_INITIAL:
+            self.state = self.IDL_S_MONITOR_COND_REQUESTED
+            method = "monitor_cond"
+        else:
+            self.state = self.IDL_S_MONITOR_REQUESTED
+            method = "monitor"
+
         monitor_requests = {}
         for table in six.itervalues(self.tables):
             columns = []
             for column in six.iterkeys(table.columns):
                 if ((table.name not in self.readonly) or
-                    (table.name in self.readonly) and
-                    (column not in self.readonly[table.name])):
+                        (table.name in self.readonly) and
+                        (column not in self.readonly[table.name])):
                     columns.append(column)
             monitor_requests[table.name] = {"columns": columns}
+            if method == "monitor_cond" and table.cond_changed and \
+                   table.condition:
+                monitor_requests[table.name]["where"] = table.condition
+                table.cond_change = False
+
         msg = ovs.jsonrpc.Message.create_request(
-            "monitor", [self._db.name, None, monitor_requests])
+            method, [self._db.name, str(self.uuid), monitor_requests])
         self._monitor_request_id = msg.id
         self._session.send(msg)
 
-    def __parse_update(self, update):
+    def __parse_update(self, update, version):
         try:
-            self.__do_parse_update(update)
+            self.__do_parse_update(update, version)
         except error.Error as e:
             vlog.err("%s: error parsing update: %s"
                      % (self._session.get_name(), e))
 
-    def __do_parse_update(self, table_updates):
-        if type(table_updates) != dict:
+    def __do_parse_update(self, table_updates, version):
+        if not isinstance(table_updates, dict):
             raise error.Error("<table-updates> is not an object",
                               table_updates)
 
@@ -371,7 +446,7 @@ class Idl(object):
                 raise error.Error('<table-updates> includes unknown '
                                   'table "%s"' % table_name)
 
-            if type(table_update) != dict:
+            if not isinstance(table_update, dict):
                 raise error.Error('<table-update> for table "%s" is not '
                                   'an object' % table_name, table_update)
 
@@ -383,12 +458,17 @@ class Idl(object):
                                       table_update)
                 uuid = ovs.ovsuuid.from_string(uuid_string)
 
-                if type(row_update) != dict:
+                if not isinstance(row_update, dict):
                     raise error.Error('<table-update> for table "%s" '
                                       'contains <row-update> for %s that '
                                       'is not an object'
                                       % (table_name, uuid_string))
 
+                if version == OVSDB_UPDATE2:
+                    if self.__process_update2(table, uuid, row_update):
+                        self.change_seqno += 1
+                    continue
+
                 parser = ovs.db.parser.Parser(row_update, "row-update")
                 old = parser.get_optional("old", [dict])
                 new = parser.get_optional("new", [dict])
@@ -401,6 +481,45 @@ class Idl(object):
                 if self.__process_update(table, uuid, old, new):
                     self.change_seqno += 1
 
+    def __process_update2(self, table, uuid, row_update):
+        row = table.rows.get(uuid)
+        changed = False
+        if "delete" in row_update:
+            if row:
+                del table.rows[uuid]
+                self.notify(ROW_DELETE, row)
+                changed = True
+            else:
+                # XXX rate-limit
+                vlog.warn("cannot delete missing row %s from table"
+                          "%s" % (uuid, table.name))
+        elif "insert" in row_update or "initial" in row_update:
+            if row:
+                vlog.warn("cannot add existing row %s from table"
+                          " %s" % (uuid, table.name))
+                del table.rows[uuid]
+            row = self.__create_row(table, uuid)
+            if "insert" in row_update:
+                row_update = row_update['insert']
+            else:
+                row_update = row_update['initial']
+            self.__add_default(table, row_update)
+            if self.__row_update(table, row, row_update):
+                changed = True
+                self.notify(ROW_CREATE, row)
+        elif "modify" in row_update:
+            if not row:
+                raise error.Error('Modify non-existing row')
+
+            self.__apply_diff(table, row, row_update['modify'])
+            self.notify(ROW_UPDATE, row,
+                        Row.from_json(self, table, uuid, row_update['modify']))
+            changed = True
+        else:
+            raise error.Error('<row-update> unknown operation',
+                              row_update)
+        return changed
+
     def __process_update(self, table, uuid, old, new):
         """Returns True if a column changed, False otherwise."""
         row = table.rows.get(uuid)
@@ -441,6 +560,42 @@ class Idl(object):
                 self.notify(op, row, Row.from_json(self, table, uuid, old))
         return changed
 
+    def __column_name(self, column):
+        if column.type.key.type == ovs.db.types.UuidType:
+            return ovs.ovsuuid.to_json(column.type.key.type.default)
+        else:
+            return column.type.key.type.default
+
+    def __add_default(self, table, row_update):
+        for column in six.itervalues(table.columns):
+            if column.name not in row_update:
+                if ((table.name not in self.readonly) or
+                        (table.name in self.readonly) and
+                        (column.name not in self.readonly[table.name])):
+                    if column.type.n_min != 0 and not column.type.is_map():
+                        row_update[column.name] = self.__column_name(column)
+
+    def __apply_diff(self, table, row, row_diff):
+        for column_name, datum_json in six.iteritems(row_diff):
+            column = table.columns.get(column_name)
+            if not column:
+                # XXX rate-limit
+                vlog.warn("unknown column %s updating table %s"
+                          % (column_name, table.name))
+                continue
+
+            try:
+                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+            except error.Error as e:
+                # XXX rate-limit
+                vlog.warn("error parsing column %s in table %s: %s"
+                          % (column_name, table.name, e))
+                continue
+
+            datum = row._data[column_name].diff(datum)
+            if datum != row._data[column_name]:
+                row._data[column_name] = datum
+
     def __row_update(self, table, row, row_json):
         changed = False
         for column_name, datum_json in six.iteritems(row_json):
@@ -498,12 +653,13 @@ def _uuid_to_row(atom, base):
 
 
 def _row_to_uuid(value):
-    if type(value) == Row:
+    if isinstance(value, Row):
         return value.uuid
     else:
         return value
 
 
+@functools.total_ordering
 class Row(object):
     """A row within an IDL.
 
@@ -572,6 +728,19 @@ class Row(object):
         # in the dictionary are all None.
         self.__dict__["_prereqs"] = {}
 
+    def __lt__(self, other):
+        if not isinstance(other, Row):
+            return NotImplemented
+        return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
+
+    def __eq__(self, other):
+        if not isinstance(other, Row):
+            return NotImplemented
+        return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
+
+    def __hash__(self):
+        return int(self.__dict__['uuid'])
+
     def __getattr__(self, column_name):
         assert self._changes is not None
 
@@ -593,7 +762,7 @@ class Row(object):
         assert self._idl.txn
 
         if ((self._table.name in self._idl.readonly) and
-            (column_name in self._idl.readonly[self._table.name])):
+                (column_name in self._idl.readonly[self._table.name])):
             vlog.warn("attempting to write to readonly column %s"
                       % column_name)
             return
@@ -827,10 +996,10 @@ class Transaction(object):
             poller.immediate_wake()
 
     def _substitute_uuids(self, json):
-        if type(json) in (list, tuple):
+        if isinstance(json, (list, tuple)):
             if (len(json) == 2
-                and json[0] == 'uuid'
-                and ovs.ovsuuid.is_valid_string(json[1])):
+                    and json[0] == 'uuid'
+                    and ovs.ovsuuid.is_valid_string(json[1])):
                 uuid = ovs.ovsuuid.from_string(json[1])
                 row = self._txn_rows.get(uuid, None)
                 if row and row._data is None:
@@ -967,14 +1136,14 @@ class Transaction(object):
                 for column_name, datum in six.iteritems(row._changes):
                     if row._data is not None or not datum.is_default():
                         row_json[column_name] = (
-                                self._substitute_uuids(datum.to_json()))
+                            self._substitute_uuids(datum.to_json()))
 
                         # If anything really changed, consider it an update.
                         # We can't suppress not-really-changed values earlier
                         # or transactions would become nonatomic (see the big
                         # comment inside Transaction._write()).
                         if (not any_updates and row._data is not None and
-                            row._data[column_name] != datum):
+                                row._data[column_name] != datum):
                             any_updates = True
 
                 if row._data is None or row_json:
@@ -1150,7 +1319,7 @@ class Transaction(object):
     def _process_reply(self, msg):
         if msg.type == ovs.jsonrpc.Message.T_ERROR:
             self._status = Transaction.ERROR
-        elif type(msg.result) not in (list, tuple):
+        elif not isinstance(msg.result, (list, tuple)):
             # XXX rate-limit
             vlog.warn('reply to "transact" is not JSON array')
         else:
@@ -1165,7 +1334,7 @@ class Transaction(object):
                     # prior operation failed, so make sure that we know about
                     # it.
                     soft_errors = True
-                elif type(op) == dict:
+                elif isinstance(op, dict):
                     error = op.get("error")
                     if error is not None:
                         if error == "timed out":
@@ -1211,7 +1380,7 @@ class Transaction(object):
             # XXX rate-limit
             vlog.warn("%s is missing" % name)
             return False
-        elif type(json) not in types:
+        elif not isinstance(json, tuple(types)):
             # XXX rate-limit
             vlog.warn("%s has unexpected type %s" % (name, type(json)))
             return False
@@ -1362,8 +1531,8 @@ class SchemaHelper(object):
         'readonly' must be a list of strings.
         """
 
-        assert type(table) is str
-        assert type(columns) is list
+        assert isinstance(table, six.string_types)
+        assert isinstance(columns, list)
 
         columns = set(columns) | self._tables.get(table, set())
         self._tables[table] = columns
@@ -1375,7 +1544,7 @@ class SchemaHelper(object):
 
         'table' must be a string
         """
-        assert type(table) is str
+        assert isinstance(table, six.string_types)
         self._tables[table] = set()  # empty set means all columns in the table
 
     def register_all(self):
@@ -1410,7 +1579,7 @@ class SchemaHelper(object):
 
         new_columns = {}
         for column_name in columns:
-            assert type(column_name) is str
+            assert isinstance(column_name, six.string_types)
             assert column_name in table.columns
 
             new_columns[column_name] = table.columns[column_name]