ovsdb-idl: Support for readonly columns that are fetched on-demand
authorShad Ansari <shad.ansari@hp.com>
Thu, 22 Oct 2015 21:35:24 +0000 (14:35 -0700)
committerBen Pfaff <blp@ovn.org>
Mon, 23 Nov 2015 16:34:54 +0000 (08:34 -0800)
There is currently no mechanism in IDL to fetch specific column values
on-demand without having to register them for monitoring. In the case
where the column represent a frequently changing entity (e.g. counter),
and the reads are relatively infrequent (e.g. CLI client), there is a
significant overhead in replication.

This patch adds support in the Python IDL to register a subset of the
columns of a table as "readonly". Readonly columns are not replicated.
Users may "fetch" the readonly columns of a row on-demand. Once fetched,
the columns are not updated until the next fetch by the user. Writes by
the user to readonly columns does not change the value (both locally or
on the server).

The two main user visible changes in this patch are:
  - The SchemaHelper.register_columns() method now takes an optionaly
    argument to specify the subset of readonly column(s)
  - A new Row.fetch(columns) method to fetch values of readonly columns(s)

Usage:
------

    # Schema file includes all columns, including readonly
    schema_helper = ovs.db.idl.SchemaHelper(schema_file)

    # Register interest in columns with 'r' and 's' as readonly
    schema_helper.register_columns("simple", [i, r, s], [r, s])

    # Create Idl and jsonrpc, and wait for update, as usual
    ...

    # Fetch value of column 'r' for a specific row
    row.fetch('r')
    txn.commit_block()

    print row.r
    print getattr(row, 'r')

    # Writing to readonly column has no effect (locally or on server)
    row.r = 3
    print row.r     # prints fetched value not 3

Signed-off-by: Shad Ansari <shad.ansari@hp.com>
Signed-off-by: Ben Pfaff <blp@ovn.org>
python/ovs/db/idl.py
tests/ovsdb-idl.at
tests/test-ovsdb.py

index f074dbf..c8990c7 100644 (file)
@@ -107,6 +107,7 @@ class Idl(object):
         schema = schema.get_idl_schema()
 
         self.tables = schema.tables
+        self.readonly = schema.readonly
         self._db = schema
         self._session = ovs.jsonrpc.Session.open(remote)
         self._monitor_request_id = None
@@ -338,7 +339,13 @@ class Idl(object):
     def __send_monitor_request(self):
         monitor_requests = {}
         for table in self.tables.itervalues():
-            monitor_requests[table.name] = {"columns": table.columns.keys()}
+            columns = []
+            for column in table.columns.keys():
+                if ((table.name not in self.readonly) or
+                    (table.name in self.readonly) and
+                    (column not in self.readonly[table.name])):
+                    columns.append(column)
+            monitor_requests[table.name] = {"columns": columns}
         msg = ovs.jsonrpc.Message.create_request(
             "monitor", [self._db.name, None, monitor_requests])
         self._monitor_request_id = msg.id
@@ -571,7 +578,11 @@ class Row(object):
             if self._data is None:
                 raise AttributeError("%s instance has no attribute '%s'" %
                                      (self.__class__.__name__, column_name))
-            datum = self._data[column_name]
+            if column_name in self._data:
+                datum = self._data[column_name]
+            else:
+                raise AttributeError("%s instance has no attribute '%s'" %
+                                     (self.__class__.__name__, column_name))
 
         return datum.to_python(_uuid_to_row)
 
@@ -579,6 +590,11 @@ class Row(object):
         assert self._changes is not None
         assert self._idl.txn
 
+        if ((self._table.name in self._idl.readonly) and
+            (column_name in self._idl.readonly[self._table.name])):
+            vlog.warn("attempting to write to readonly column %s" % column_name)
+            return
+
         column = self._table.columns[column_name]
         try:
             datum = ovs.db.data.Datum.from_python(column.type, value,
@@ -655,6 +671,9 @@ class Row(object):
         self.__dict__["_changes"] = None
         del self._table.rows[self.uuid]
 
+    def fetch(self, column_name):
+        self._idl.txn._fetch(self, column_name)
+
     def increment(self, column_name):
         """Causes the transaction, when committed, to increment the value of
         'column_name' within this row by 1.  'column_name' must have an integer
@@ -777,10 +796,12 @@ class Transaction(object):
         self._inc_row = None
         self._inc_column = None
 
+        self._fetch_requests = []
+
         self._inserted_rows = {}  # Map from UUID to _InsertedRow
 
     def add_comment(self, comment):
-        """Appens 'comment' to the comments that will be passed to the OVSDB
+        """Appends 'comment' to the comments that will be passed to the OVSDB
         server when this transaction is committed.  (The comment will be
         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
         relatively human-readable form.)"""
@@ -947,6 +968,16 @@ class Transaction(object):
                 if row._data is None or row_json:
                     operations.append(op)
 
+        if self._fetch_requests:
+            for fetch in self._fetch_requests:
+                fetch["index"] = len(operations) - 1
+                operations.append({"op": "select",
+                                   "table": fetch["row"]._table.name,
+                                   "where": self._substitute_uuids(
+                                       _where_uuid_equals(fetch["row"].uuid)),
+                                   "columns": [fetch["column_name"]]})
+            any_updates = True
+
         # Add increment.
         if self._inc_row and any_updates:
             self._inc_index = len(operations) - 1
@@ -1057,6 +1088,9 @@ class Transaction(object):
         self._inc_row = row
         self._inc_column = column
 
+    def _fetch(self, row, column_name):
+        self._fetch_requests.append({"row":row, "column_name":column_name})
+
     def _write(self, row, column, datum):
         assert row._changes is not None
 
@@ -1139,6 +1173,11 @@ class Transaction(object):
             if not soft_errors and not hard_errors and not lock_errors:
                 if self._inc_row and not self.__process_inc_reply(ops):
                     hard_errors = True
+                if self._fetch_requests:
+                    if self.__process_fetch_reply(ops):
+                        self.idl.change_seqno += 1
+                    else:
+                        hard_errors = True
 
                 for insert in self._inserted_rows.itervalues():
                     if not self.__process_insert_reply(insert, ops):
@@ -1166,6 +1205,38 @@ class Transaction(object):
         else:
             return True
 
+    def __process_fetch_reply(self, ops):
+        update = False
+        for fetch_request in self._fetch_requests:
+            row = fetch_request["row"]
+            column_name = fetch_request["column_name"]
+            index = fetch_request["index"]
+            table = row._table
+
+            select = ops[index]
+            fetched_rows = select.get("rows")
+            if not Transaction.__check_json_type(fetched_rows, (list, tuple),
+                                                 '"select" reply "rows"'):
+                return False
+            if len(fetched_rows) != 1:
+                # XXX rate-limit
+                vlog.warn('"select" reply "rows" has %d elements '
+                          'instead of 1' % len(rows))
+                continue
+            fetched_row = fetched_rows[0]
+            if not Transaction.__check_json_type(fetched_row, (dict,),
+                                                 '"select" reply row'):
+                continue
+
+            column = table.columns.get(column_name)
+            datum_json = fetched_row.get(column_name)
+            datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+
+            row._data[column_name] = datum
+            update = True
+
+        return update
+
     def __process_inc_reply(self, ops):
         if self._inc_index + 2 > len(ops):
             # XXX rate-limit
@@ -1261,16 +1332,21 @@ class SchemaHelper(object):
 
         self.schema_json = schema_json
         self._tables = {}
+        self._readonly = {}
         self._all = False
 
-    def register_columns(self, table, columns):
+    def register_columns(self, table, columns, readonly=[]):
         """Registers interest in the given 'columns' of 'table'.  Future calls
         to get_idl_schema() will include 'table':column for each column in
         'columns'. This function automatically avoids adding duplicate entries
         to the schema.
+        A subset of 'columns' can be specified as 'readonly'. The readonly
+        columns are not replicated but can be fetched on-demand by the user
+        with Row.fetch().
 
         'table' must be a string.
         'columns' must be a list of strings.
+        'readonly' must be a list of strings.
         """
 
         assert type(table) is str
@@ -1278,6 +1354,7 @@ class SchemaHelper(object):
 
         columns = set(columns) | self._tables.get(table, set())
         self._tables[table] = columns
+        self._readonly[table] = readonly
 
     def register_table(self, table):
         """Registers interest in the given all columns of 'table'. Future calls
@@ -1307,6 +1384,7 @@ class SchemaHelper(object):
                     self._keep_table_columns(schema, table, columns))
 
             schema.tables = schema_tables
+        schema.readonly = self._readonly
         return schema
 
     def _keep_table_columns(self, schema, table_name, columns):
index d3d2aeb..c7b2582 100644 (file)
@@ -598,3 +598,50 @@ AT_CHECK([grep '"monitor"' stderr | grep -c '"ua"'], [0], [1
 ])
 OVSDB_SERVER_SHUTDOWN
 AT_CLEANUP
+
+m4_define([OVSDB_CHECK_IDL_FETCH_COLUMNS_PY],
+  [AT_SETUP([$1 - Python fetch])
+   AT_SKIP_IF([test $HAVE_PYTHON = no])
+   AT_KEYWORDS([ovsdb server idl positive Python increment fetch $6])
+   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
+                  [0], [stdout], [ignore])
+   AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
+   m4_if([$2], [], [],
+     [AT_CHECK([ovsdb-client transact unix:socket $2], [0], [ignore], [ignore], [kill `cat pid`])])
+   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py  -t10 idl $srcdir/idltest.ovsschema unix:socket [$3] $4],
+            [0], [stdout], [ignore], [kill `cat pid`])
+   AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$7],,, [[| $7]]),
+            [0], [$5], [], [kill `cat pid`])
+   OVSDB_SERVER_SHUTDOWN
+   AT_CLEANUP])
+
+m4_define([OVSDB_CHECK_IDL_FETCH_COLUMNS],
+   [OVSDB_CHECK_IDL_FETCH_COLUMNS_PY($@)])
+
+OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially populated],
+  [['["idltest",
+      {"op": "insert",
+       "table": "simple",
+       "row": {"i": 1,
+               "r": 2.0,
+               "b": true,
+               "s": "mystring",
+               "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"],
+               "ia": ["set", [1, 2, 3]],
+               "ra": ["set", [-0.5]],
+               "ba": ["set", [true]],
+               "sa": ["set", ["abc", "def"]],
+               "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
+                              ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
+      {"op": "insert",
+       "table": "simple",
+       "row": {}}]']],
+  [?simple:i,r!],
+  ['fetch 0 r'],
+  [[000: i=0 uuid=<0>
+000: i=1 uuid=<1>
+001: commit, status=success
+002: i=0 r=0 uuid=<0>
+002: i=1 uuid=<1>
+003: done
+]])
index ab951f9..a6897f3 100644 (file)
@@ -146,44 +146,53 @@ def do_parse_schema(schema_string):
 
 
 def print_idl(idl, step):
-    simple = idl.tables["simple"].rows
-    l1 = idl.tables["link1"].rows
-    l2 = idl.tables["link2"].rows
-
     n = 0
-    for row in simple.itervalues():
-        s = ("%03d: i=%s r=%s b=%s s=%s u=%s "
-             "ia=%s ra=%s ba=%s sa=%s ua=%s uuid=%s"
-             % (step, row.i, row.r, row.b, row.s, row.u,
-                row.ia, row.ra, row.ba, row.sa, row.ua, row.uuid))
-        s = re.sub('""|,|u?\'', "", s)
-        s = re.sub('UUID\(([^)]+)\)', r'\1', s)
-        s = re.sub('False', 'false', s)
-        s = re.sub('True', 'true', s)
-        s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s)
-        print(s)
-        n += 1
-
-    for row in l1.itervalues():
-        s = ["%03d: i=%s k=" % (step, row.i)]
-        if row.k:
-            s.append(str(row.k.i))
-        s.append(" ka=[")
-        s.append(' '.join(sorted(str(ka.i) for ka in row.ka)))
-        s.append("] l2=")
-        if row.l2:
-            s.append(str(row.l2[0].i))
-        s.append(" uuid=%s" % row.uuid)
-        print(''.join(s))
-        n += 1
-
-    for row in l2.itervalues():
-        s = ["%03d: i=%s l1=" % (step, row.i)]
-        if row.l1:
-            s.append(str(row.l1[0].i))
-        s.append(" uuid=%s" % row.uuid)
-        print(''.join(s))
-        n += 1
+    if "simple" in idl.tables:
+        simple_columns = ["i", "r", "b", "s", "u", "ia",
+                          "ra", "ba", "sa", "ua", "uuid"]
+        simple = idl.tables["simple"].rows
+        for row in simple.itervalues():
+            s = "%03d:" % step
+            for column in simple_columns:
+                if hasattr(row, column) and not (type(getattr(row, column))
+                                                 is ovs.db.data.Atom):
+                    s += " %s=%s" % (column, getattr(row, column))
+            s = re.sub('""|,|u?\'', "", s)
+            s = re.sub('UUID\(([^)]+)\)', r'\1', s)
+            s = re.sub('False', 'false', s)
+            s = re.sub('True', 'true', s)
+            s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s)
+            print(s)
+            n += 1
+
+    if "link1" in idl.tables:
+        l1 = idl.tables["link1"].rows
+        for row in l1.itervalues():
+            s = ["%03d: i=%s k=" % (step, row.i)]
+            if hasattr(row, "k") and row.k:
+                s.append(str(row.k.i))
+            if hasattr(row, "ka"):
+                s.append(" ka=[")
+                s.append(' '.join(sorted(str(ka.i) for ka in row.ka)))
+                s.append("] l2=")
+            if hasattr(row, "l2") and row.l2:
+                s.append(str(row.l2[0].i))
+            if hasattr(row, "uuid"):
+                s.append(" uuid=%s" % row.uuid)
+            print(''.join(s))
+            n += 1
+
+    if "link2" in idl.tables:
+        l2 = idl.tables["link2"].rows
+        for row in l2.itervalues():
+            s = ["%03d:" % step]
+            s.append(" i=%s l1=" % row.i)
+            if hasattr(row, "l1") and row.l1:
+                s.append(str(row.l1[0].i))
+            if hasattr(row, "uuid"):
+                s.append(" uuid=%s" % row.uuid)
+            print(''.join(s))
+            n += 1
 
     if not n:
         print("%03d: empty" % step)
@@ -228,6 +237,7 @@ def idltest_find_simple(idl, i):
 def idl_set(idl, commands, step):
     txn = ovs.db.idl.Transaction(idl)
     increment = False
+    fetch_cmds = []
     events = []
     for command in commands.split(','):
         words = command.split()
@@ -307,6 +317,20 @@ def idl_set(idl, commands, step):
                 sys.stderr.write('"verify" command asks for unknown column '
                                  '"%s"\n' % args[1])
                 sys.exit(1)
+        elif name == "fetch":
+            if len(args) != 2:
+                sys.stderr.write('"fetch" command requires 2 argument\n')
+                sys.exit(1)
+
+            row = idltest_find_simple(idl, int(args[0]))
+            if not row:
+                sys.stderr.write('"fetch" command asks for nonexistent i=%d\n'
+                                 % int(args[0]))
+                sys.exit(1)
+
+            column = args[1]
+            row.fetch(column)
+            fetch_cmds.append([row, column])
         elif name == "increment":
             if len(args) != 1:
                 sys.stderr.write('"increment" command requires 1 argument\n')
@@ -366,10 +390,16 @@ def do_idl(schema_file, remote, *commands):
     schema_helper = ovs.db.idl.SchemaHelper(schema_file)
     if commands and commands[0].startswith("?"):
         monitor = {}
+        readonly = {}
         for x in commands[0][1:].split("?"):
+            readonly = []
             table, columns = x.split(":")
-            monitor[table] = columns.split(",")
-            schema_helper.register_columns(table, monitor[table])
+            columns = columns.split(",")
+            for index, column in enumerate(columns):
+                if column[-1] == '!':
+                    columns[index] = columns[index][:-1]
+                    readonly.append(columns[index])
+            schema_helper.register_columns(table, columns, readonly)
         commands = commands[1:]
     else:
         schema_helper.register_all()
@@ -499,6 +529,12 @@ idl SCHEMA SERVER [?T1:C1,C2...[?T2:C1,C2,...]...] [TRANSACTION...]
   e.g.:
       ?simple:b?link1:i,k - Monitor column "b" in table "simple",
                             and column "i", "k" in table "link1"
+  Readonly columns: Suffixing a "!" after a column indicates that the
+  column is to be registered "readonly".
+  e.g.:
+      ?simple:i,b!  - Register interest in column "i" (monitoring) and
+                      column "b" (readonly).
+
 
 The following options are also available:
   -t, --timeout=SECS          give up after SECS seconds