lib: add monitor_cond_change API to C IDL lib
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index d01fde8..437e9b0 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2009, 2010, 2011 Nicira Networks
+# Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
+import functools
 import uuid
 
+import six
+
 import ovs.jsonrpc
 import ovs.db.parser
 import ovs.db.schema
 from ovs.db import error
 import ovs.ovsuuid
 import ovs.poller
+import ovs.vlog
+
+vlog = ovs.vlog.Vlog("idl")
+
+__pychecker__ = 'no-classattr no-objattrs'
+
+ROW_CREATE = "create"
+ROW_UPDATE = "update"
+ROW_DELETE = "delete"
 
-class Idl:
+OVSDB_UPDATE = 0
+OVSDB_UPDATE2 = 1
+
+
+class Idl(object):
     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
 
     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
@@ -47,7 +62,13 @@ class Idl:
       'rows' map values.  Refer to Row for more details.
 
     - 'change_seqno': A number that represents the IDL's state.  When the IDL
-      is updated (by Idl.run()), its value changes.
+      is updated (by Idl.run()), its value changes.  The sequence number can
+      occasionally change even if the database does not.  This happens if the
+      connection to the database drops and reconnects, which causes the
+      database contents to be reloaded even if they didn't change.  (It could
+      also happen if the database server sends out a "change" that reflects
+      what the IDL already thought was in the database.  The database server is
+      not supposed to do that, but bugs could in theory cause it to do so.)
 
     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
       if no lock is configured.
@@ -68,6 +89,10 @@ class Idl:
       currently being constructed, if there is one, or None otherwise.
 """
 
+    IDL_S_INITIAL = 0
+    IDL_S_MONITOR_REQUESTED = 1
+    IDL_S_MONITOR_COND_REQUESTED = 2
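    # State machine sketch (derived from run() and __send_monitor_request()
    # below): the IDL starts in IDL_S_INITIAL and first issues a
    # "monitor_cond" request (IDL_S_MONITOR_COND_REQUESTED); if the server
    # answers that request with an "unknown method" error, it falls back to a
    # plain "monitor" request (IDL_S_MONITOR_REQUESTED).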
+
     def __init__(self, remote, schema):
         """Creates and returns a connection to the database named 'db_name' on
         'remote', which should be in a form acceptable to
@@ -83,32 +108,43 @@ class Idl:
         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
         useful for columns that the IDL's client will write but not read.
 
+        'schema' must be an instance of the SchemaHelper class; the IDL calls
+        its get_idl_schema() method to obtain the actual schema to replicate.
+
         The IDL uses and modifies 'schema' directly."""
 
+        assert isinstance(schema, SchemaHelper)
+        schema = schema.get_idl_schema()
+
         self.tables = schema.tables
+        self.readonly = schema.readonly
         self._db = schema
         self._session = ovs.jsonrpc.Session.open(remote)
         self._monitor_request_id = None
         self._last_seqno = None
         self.change_seqno = 0
+        self.uuid = uuid.uuid1()
+        self.state = self.IDL_S_INITIAL
 
         # Database locking.
         self.lock_name = None          # Name of lock we need, None if none.
         self.has_lock = False          # Has db server said we have the lock?
-        self.is_lock_contended = False # Has db server said we can't get lock?
+        self.is_lock_contended = False  # Has db server said we can't get lock?
         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
 
         # Transaction support.
         self.txn = None
         self._outstanding_txns = {}
 
-        for table in schema.tables.itervalues():
-            for column in table.columns.itervalues():
+        for table in six.itervalues(schema.tables):
+            for column in six.itervalues(table.columns):
                 if not hasattr(column, 'alert'):
                     column.alert = True
             table.need_table = False
             table.rows = {}
             table.idl = self
+            table.condition = []
+            table.cond_changed = False
 
     def close(self):
         """Closes the connection to the database.  The IDL will no longer
@@ -135,6 +171,8 @@ class Idl:
         for changes in self.change_seqno."""
         assert not self.txn
         initial_change_seqno = self.change_seqno
+
+        self.send_cond_change()
         self._session.run()
         i = 0
         while i < 50:
@@ -155,11 +193,15 @@ class Idl:
             if msg is None:
                 break
             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
-                and msg.method == "update"
-                and len(msg.params) == 2
-                and msg.params[0] == None):
+                    and msg.method == "update2"
+                    and len(msg.params) == 2):
+                # Database contents changed.
+                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
+            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+                    and msg.method == "update"
+                    and len(msg.params) == 2):
                 # Database contents changed.
-                self.__parse_update(msg.params[1])
+                self.__parse_update(msg.params[1], OVSDB_UPDATE)
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._monitor_request_id is not None
                   and self._monitor_request_id == msg.id):
@@ -168,10 +210,15 @@ class Idl:
                     self.change_seqno += 1
                     self._monitor_request_id = None
                     self.__clear()
-                    self.__parse_update(msg.result)
-                except error.Error, e:
-                    logging.error("%s: parse error in received schema: %s"
-                                  % (self._session.get_name(), e))
+                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+                        self.__parse_update(msg.result, OVSDB_UPDATE2)
+                    else:
+                        assert self.state == self.IDL_S_MONITOR_REQUESTED
+                        self.__parse_update(msg.result, OVSDB_UPDATE)
+
+                except error.Error as e:
+                    vlog.err("%s: parse error in received schema: %s"
+                             % (self._session.get_name(), e))
                     self.__error()
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._lock_request_id is not None
@@ -189,6 +236,11 @@ class Idl:
             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                 # Reply to our echo request.  Ignore it.
                 pass
+            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+                  self._monitor_request_id == msg.id):
+                if msg.error == "unknown method":
+                    self.__send_monitor_request()
             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                                ovs.jsonrpc.Message.T_REPLY)
                   and self.__txn_process_reply(msg)):
@@ -197,12 +249,39 @@ class Idl:
             else:
                 # This can happen if a transaction is destroyed before we
                 # receive the reply, so keep the log level low.
-                logging.debug("%s: received unexpected %s message"
-                              % (self._session.get_name(),
-                                 ovs.jsonrpc.Message.type_to_string(msg.type)))
+                vlog.dbg("%s: received unexpected %s message"
+                         % (self._session.get_name(),
+                             ovs.jsonrpc.Message.type_to_string(msg.type)))
 
         return initial_change_seqno != self.change_seqno
 
+    def send_cond_change(self):
+        if not self._session.is_connected():
+            return
+
+        for table in six.itervalues(self.tables):
+            if table.cond_changed:
+                self.__send_cond_change(table, table.condition)
+                table.cond_changed = False
+
+    def cond_change(self, table_name, add_cmd, cond):
+        """Change conditions for this IDL session. If session is not already
+        connected, add condtion to table and submit it on send_monitor_request.
+        Otherwise  send monitor_cond_change method with the requested
+        changes."""
+
+        table = self.tables.get(table_name)
+        if not table:
+            raise error.Error('Unknown table "%s"' % table_name)
+
+        if add_cmd:
+            table.condition += cond
+        else:
+            for c in cond:
+                table.condition.remove(c)
+
+        table.cond_changed = True
+
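# A minimal usage sketch for cond_change() (not part of the patch): 'idl' is
# assumed to be a connected Idl built against the Open vSwitch schema, and the
# table and column names are only examples.  The new condition reaches the
# server on the next Idl.run(), which calls send_cond_change().
idl.cond_change("Interface", add_cmd=True, cond=[["name", "==", "eth0"]])
idl.run()
# Removing the same clause later restores the previous condition.
idl.cond_change("Interface", add_cmd=False, cond=[["name", "==", "eth0"]])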
     def wait(self, poller):
         """Arranges for poller.block() to wake up when self.run() has something
         to do or when activity occurs on a transaction on 'self'."""
@@ -247,10 +326,29 @@ class Idl:
             self.lock_name = lock_name
             self.__send_lock_request()
 
+    def notify(self, event, row, updates=None):
+        """Hook for implementing create/update/delete notifications
+
+        :param event:   The event that was triggered
+        :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
+    :param row:     The row as it is after the operation has occurred
+        :type row:      Row
+        :param updates: For updates, row with only updated columns
+        :type updates:  Row
+        """
+
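# A sketch of the notification hook in use (not part of the patch): a client
# subclasses Idl and overrides notify() to react to changes in the replica.
# The vlog messages are purely illustrative.
class NotifyingIdl(Idl):
    def notify(self, event, row, updates=None):
        if event == ROW_CREATE:
            vlog.info("row %s created in %s" % (row.uuid, row._table.name))
        elif event == ROW_DELETE:
            vlog.info("row %s deleted from %s" % (row.uuid, row._table.name))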
+    def __send_cond_change(self, table, cond):
+        monitor_cond_change = {table.name: [{"where": cond}]}
+        old_uuid = str(self.uuid)
+        self.uuid = uuid.uuid1()
+        params = [old_uuid, str(self.uuid), monitor_cond_change]
+        msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
+        self._session.send(msg)
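# For reference, the request built above looks like this on the wire
# (illustrative values):
#   {"id": ..., "method": "monitor_cond_change",
#    "params": ["<old monitor uuid>", "<new monitor uuid>",
#               {"Interface": [{"where": [["name", "==", "eth0"]]}]}]}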
+
     def __clear(self):
         changed = False
 
-        for table in self.tables.itervalues():
+        for table in six.itervalues(self.tables):
             if table.rows:
                 changed = True
                 table.rows = {}
@@ -289,52 +387,70 @@ class Idl:
 
     def __parse_lock_reply(self, result):
         self._lock_request_id = None
-        got_lock = type(result) == dict and result.get("locked") is True
+        got_lock = isinstance(result, dict) and result.get("locked") is True
         self.__update_has_lock(got_lock)
         if not got_lock:
             self.is_lock_contended = True
 
     def __parse_lock_notify(self, params, new_has_lock):
         if (self.lock_name is not None
-            and type(params) in (list, tuple)
+            and isinstance(params, (list, tuple))
             and params
             and params[0] == self.lock_name):
-            self.__update_has_lock(self, new_has_lock)
+            self.__update_has_lock(new_has_lock)
             if not new_has_lock:
                 self.is_lock_contended = True
 
     def __send_monitor_request(self):
+        if self.state == self.IDL_S_INITIAL:
+            self.state = self.IDL_S_MONITOR_COND_REQUESTED
+            method = "monitor_cond"
+        else:
+            self.state = self.IDL_S_MONITOR_REQUESTED
+            method = "monitor"
+
         monitor_requests = {}
-        for table in self.tables.itervalues():
-            monitor_requests[table.name] = {"columns": table.columns.keys()}
+        for table in six.itervalues(self.tables):
+            columns = []
+            for column in six.iterkeys(table.columns):
+                if ((table.name not in self.readonly) or
+                        (table.name in self.readonly) and
+                        (column not in self.readonly[table.name])):
+                    columns.append(column)
+            monitor_requests[table.name] = {"columns": columns}
+            if method == "monitor_cond" and table.cond_changed and \
+                   table.condition:
+                monitor_requests[table.name]["where"] = table.condition
+                table.cond_changed = False
+
         msg = ovs.jsonrpc.Message.create_request(
-            "monitor", [self._db.name, None, monitor_requests])
+            method, [self._db.name, str(self.uuid), monitor_requests])
         self._monitor_request_id = msg.id
         self._session.send(msg)
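# The resulting "monitor_cond" (or fallback "monitor") params have the form
# [<db-name>, <monitor-id>, {<table>: {"columns": [...], "where": [...]}}],
# where "where" only appears for tables with a requested condition.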
 
-    def __parse_update(self, update):
+    def __parse_update(self, update, version):
         try:
-            self.__do_parse_update(update)
-        except error.Error, e:
-            logging.error("%s: error parsing update: %s"
-                          % (self._session.get_name(), e))
+            self.__do_parse_update(update, version)
+        except error.Error as e:
+            vlog.err("%s: error parsing update: %s"
+                     % (self._session.get_name(), e))
 
-    def __do_parse_update(self, table_updates):
-        if type(table_updates) != dict:
+    def __do_parse_update(self, table_updates, version):
+        if not isinstance(table_updates, dict):
             raise error.Error("<table-updates> is not an object",
                               table_updates)
 
-        for table_name, table_update in table_updates.iteritems():
+        for table_name, table_update in six.iteritems(table_updates):
             table = self.tables.get(table_name)
             if not table:
                 raise error.Error('<table-updates> includes unknown '
                                   'table "%s"' % table_name)
 
-            if type(table_update) != dict:
+            if not isinstance(table_update, dict):
                 raise error.Error('<table-update> for table "%s" is not '
                                   'an object' % table_name, table_update)
 
-            for uuid_string, row_update in table_update.iteritems():
+            for uuid_string, row_update in six.iteritems(table_update):
                 if not ovs.ovsuuid.is_valid_string(uuid_string):
                     raise error.Error('<table-update> for table "%s" '
                                       'contains bad UUID "%s" as member '
@@ -342,12 +458,17 @@ class Idl:
                                       table_update)
                 uuid = ovs.ovsuuid.from_string(uuid_string)
 
-                if type(row_update) != dict:
+                if not isinstance(row_update, dict):
                     raise error.Error('<table-update> for table "%s" '
                                       'contains <row-update> for %s that '
                                       'is not an object'
                                       % (table_name, uuid_string))
 
+                if version == OVSDB_UPDATE2:
+                    if self.__process_update2(table, uuid, row_update):
+                        self.change_seqno += 1
+                    continue
+
                 parser = ovs.db.parser.Parser(row_update, "row-update")
                 old = parser.get_optional("old", [dict])
                 new = parser.get_optional("new", [dict])
@@ -360,6 +481,45 @@ class Idl:
                 if self.__process_update(table, uuid, old, new):
                     self.change_seqno += 1
 
+    def __process_update2(self, table, uuid, row_update):
+        row = table.rows.get(uuid)
+        changed = False
+        if "delete" in row_update:
+            if row:
+                del table.rows[uuid]
+                self.notify(ROW_DELETE, row)
+                changed = True
+            else:
+                # XXX rate-limit
+                vlog.warn("cannot delete missing row %s from table"
+                          "%s" % (uuid, table.name))
+        elif "insert" in row_update or "initial" in row_update:
+            if row:
+                vlog.warn("cannot add existing row %s from table"
+                          " %s" % (uuid, table.name))
+                del table.rows[uuid]
+            row = self.__create_row(table, uuid)
+            if "insert" in row_update:
+                row_update = row_update['insert']
+            else:
+                row_update = row_update['initial']
+            self.__add_default(table, row_update)
+            if self.__row_update(table, row, row_update):
+                changed = True
+                self.notify(ROW_CREATE, row)
+        elif "modify" in row_update:
+            if not row:
+                raise error.Error('Modify non-existing row')
+
+            self.__apply_diff(table, row, row_update['modify'])
+            self.notify(ROW_UPDATE, row,
+                        Row.from_json(self, table, uuid, row_update['modify']))
+            changed = True
+        else:
+            raise error.Error('<row-update> unknown operation',
+                              row_update)
+        return changed
+
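# For reference, each <row-update2> handled above carries exactly one of the
# following members (per the monitor_cond / "update2" protocol):
#   {"initial": {<column>: <value>, ...}}   row present when monitoring began
#   {"insert":  {<column>: <value>, ...}}   row added
#   {"delete":  null}                       row removed (the value is ignored)
#   {"modify":  {<column>: <diff>, ...}}    row changed; values are diffs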
     def __process_update(self, table, uuid, old, new):
         """Returns True if a column changed, False otherwise."""
         row = table.rows.get(uuid)
@@ -369,10 +529,11 @@ class Idl:
             if row:
                 del table.rows[uuid]
                 changed = True
+                self.notify(ROW_DELETE, row)
             else:
                 # XXX rate-limit
-                logging.warning("cannot delete missing row %s from table %s"
-                                % (uuid, table.name))
+                vlog.warn("cannot delete missing row %s from table %s"
+                          % (uuid, table.name))
         elif not old:
             # Insert row.
             if not row:
@@ -380,37 +541,77 @@ class Idl:
                 changed = True
             else:
                 # XXX rate-limit
-                logging.warning("cannot add existing row %s to table %s"
-                                % (uuid, table.name))
+                vlog.warn("cannot add existing row %s to table %s"
+                          % (uuid, table.name))
             if self.__row_update(table, row, new):
                 changed = True
+                self.notify(ROW_CREATE, row)
         else:
+            op = ROW_UPDATE
             if not row:
                 row = self.__create_row(table, uuid)
                 changed = True
+                op = ROW_CREATE
                 # XXX rate-limit
-                logging.warning("cannot modify missing row %s in table %s"
-                                % (uuid, table.name))
+                vlog.warn("cannot modify missing row %s in table %s"
+                          % (uuid, table.name))
             if self.__row_update(table, row, new):
                 changed = True
+                self.notify(op, row, Row.from_json(self, table, uuid, old))
         return changed
 
+    def __column_name(self, column):
+        if column.type.key.type == ovs.db.types.UuidType:
+            return ovs.ovsuuid.to_json(column.type.key.type.default)
+        else:
+            return column.type.key.type.default
+
+    def __add_default(self, table, row_update):
+        for column in six.itervalues(table.columns):
+            if column.name not in row_update:
+                if ((table.name not in self.readonly) or
+                        (table.name in self.readonly) and
+                        (column.name not in self.readonly[table.name])):
+                    if column.type.n_min != 0 and not column.type.is_map():
+                        row_update[column.name] = self.__column_name(column)
+
+    def __apply_diff(self, table, row, row_diff):
+        for column_name, datum_json in six.iteritems(row_diff):
+            column = table.columns.get(column_name)
+            if not column:
+                # XXX rate-limit
+                vlog.warn("unknown column %s updating table %s"
+                          % (column_name, table.name))
+                continue
+
+            try:
+                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+            except error.Error as e:
+                # XXX rate-limit
+                vlog.warn("error parsing column %s in table %s: %s"
+                          % (column_name, table.name, e))
+                continue
+
+            datum = row._data[column_name].diff(datum)
+            if datum != row._data[column_name]:
+                row._data[column_name] = datum
+
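# Note on the "modify" diffs applied above: for scalar columns the diff is
# simply the new value, while for set and map columns it lists only the
# elements that were added or removed; Datum.diff() applies that difference
# to the current datum to produce the new contents.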
     def __row_update(self, table, row, row_json):
         changed = False
-        for column_name, datum_json in row_json.iteritems():
+        for column_name, datum_json in six.iteritems(row_json):
             column = table.columns.get(column_name)
             if not column:
                 # XXX rate-limit
-                logging.warning("unknown column %s updating table %s"
-                                % (column_name, table.name))
+                vlog.warn("unknown column %s updating table %s"
+                          % (column_name, table.name))
                 continue
 
             try:
                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
-            except error.Error, e:
+            except error.Error as e:
                 # XXX rate-limit
-                logging.warning("error parsing column %s in table %s: %s"
-                                % (column_name, table.name, e))
+                vlog.warn("error parsing column %s in table %s: %s"
+                          % (column_name, table.name, e))
                 continue
 
             if datum != row._data[column_name]:
@@ -425,7 +626,7 @@ class Idl:
 
     def __create_row(self, table, uuid):
         data = {}
-        for column in table.columns.itervalues():
+        for column in six.itervalues(table.columns):
             data[column.name] = ovs.db.data.Datum.default(column.type)
         row = table.rows[uuid] = Row(self, table, uuid, data)
         return row
@@ -443,18 +644,22 @@ class Idl:
         if txn:
             txn._process_reply(msg)
 
+
 def _uuid_to_row(atom, base):
     if base.ref_table:
         return base.ref_table.rows.get(atom)
     else:
         return atom
 
+
 def _row_to_uuid(value):
-    if type(value) == Row:
+    if isinstance(value, Row):
         return value.uuid
     else:
         return value
 
+
+@functools.total_ordering
 class Row(object):
     """A row within an IDL.
 
@@ -523,12 +728,32 @@ class Row(object):
         # in the dictionary are all None.
         self.__dict__["_prereqs"] = {}
 
+    def __lt__(self, other):
+        if not isinstance(other, Row):
+            return NotImplemented
+        return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
+
+    def __eq__(self, other):
+        if not isinstance(other, Row):
+            return NotImplemented
+        return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
+
+    def __hash__(self):
+        return int(self.__dict__['uuid'])
+
     def __getattr__(self, column_name):
         assert self._changes is not None
 
         datum = self._changes.get(column_name)
         if datum is None:
-            datum = self._data[column_name]
+            if self._data is None:
+                raise AttributeError("%s instance has no attribute '%s'" %
+                                     (self.__class__.__name__, column_name))
+            if column_name in self._data:
+                datum = self._data[column_name]
+            else:
+                raise AttributeError("%s instance has no attribute '%s'" %
+                                     (self.__class__.__name__, column_name))
 
         return datum.to_python(_uuid_to_row)
 
@@ -536,17 +761,43 @@ class Row(object):
         assert self._changes is not None
         assert self._idl.txn
 
+        if ((self._table.name in self._idl.readonly) and
+                (column_name in self._idl.readonly[self._table.name])):
+            vlog.warn("attempting to write to readonly column %s"
+                      % column_name)
+            return
+
         column = self._table.columns[column_name]
         try:
             datum = ovs.db.data.Datum.from_python(column.type, value,
                                                   _row_to_uuid)
-        except error.Error, e:
+        except error.Error as e:
             # XXX rate-limit
-            logging.error("attempting to write bad value to column %s (%s)"
-                          % (column_name, e))
+            vlog.err("attempting to write bad value to column %s (%s)"
+                     % (column_name, e))
             return
         self._idl.txn._write(self, column, datum)
 
+    @classmethod
+    def from_json(cls, idl, table, uuid, row_json):
+        data = {}
+        for column_name, datum_json in six.iteritems(row_json):
+            column = table.columns.get(column_name)
+            if not column:
+                # XXX rate-limit
+                vlog.warn("unknown column %s in table %s"
+                          % (column_name, table.name))
+                continue
+            try:
+                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+            except error.Error as e:
+                # XXX rate-limit
+                vlog.warn("error parsing column %s in table %s: %s"
+                          % (column_name, table.name, e))
+                continue
+            data[column_name] = datum
+        return cls(idl, table, uuid, data)
+
     def verify(self, column_name):
         """Causes the original contents of column 'column_name' in this row to
         be verified as a prerequisite to completing the transaction.  That is,
@@ -587,32 +838,109 @@ class Row(object):
         assert self._changes is not None
         if self._data is None:
             del self._idl.txn._txn_rows[self.uuid]
+        else:
+            self._idl.txn._txn_rows[self.uuid] = self
         self.__dict__["_changes"] = None
         del self._table.rows[self.uuid]
 
+    def fetch(self, column_name):
+        self._idl.txn._fetch(self, column_name)
+
+    def increment(self, column_name):
+        """Causes the transaction, when committed, to increment the value of
+        'column_name' within this row by 1.  'column_name' must have an integer
+        type.  After the transaction commits successfully, the client may
+        retrieve the final (incremented) value of 'column_name' with
+        Transaction.get_increment_new_value().
+
+        The client could accomplish something similar by reading and writing
+        and verify()ing columns.  However, increment() will never (by itself)
+        cause a transaction to fail because of a verify error.
+
+        The intended use is for incrementing the "next_cfg" column in
+        the Open_vSwitch table."""
+        self._idl.txn._increment(self, column_name)
+
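# A minimal sketch of Row.increment() (not part of the patch), assuming 'idl'
# is a connected Idl whose schema includes the Open_vSwitch table with its
# external_ids and next_cfg columns.  The increment is only sent when the
# transaction also contains a real update, hence the example write.
txn = Transaction(idl)
ovs_row = list(idl.tables["Open_vSwitch"].rows.values())[0]
ovs_row.external_ids = {"example-key": "example-value"}
ovs_row.increment("next_cfg")
if txn.commit_block() == Transaction.SUCCESS:
    new_next_cfg = txn.get_increment_new_value()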
+
 def _uuid_name_from_uuid(uuid):
     return "row%s" % str(uuid).replace("-", "_")
 
+
 def _where_uuid_equals(uuid):
     return [["_uuid", "==", ["uuid", str(uuid)]]]
 
+
 class _InsertedRow(object):
     def __init__(self, op_index):
         self.op_index = op_index
         self.real = None
 
+
 class Transaction(object):
+    """A transaction may modify the contents of a database by modifying the
+    values of columns, deleting rows, inserting rows, or adding checks that
+    columns in the database have not changed ("verify" operations), through
+    Row methods.
+
+    Reading and writing columns and inserting and deleting rows are all
+    straightforward.  The reasons to verify columns are less obvious.
+    Verification is the key to maintaining transactional integrity.  Because
+    OVSDB handles multiple clients, it can happen that between the time that
+    OVSDB client A reads a column and writes a new value, OVSDB client B has
+    written that column.  Client A's write should not ordinarily overwrite
+    client B's, especially if the column in question is a "map" column that
+    contains several more or less independent data items.  If client A adds a
+    "verify" operation before it writes the column, then the transaction fails
+    in case client B modifies it first.  Client A will then see the new value
+    of the column and compose a new transaction based on the new contents
+    written by client B.
+
+    When a transaction is complete, which must be before the next call to
+    Idl.run(), call Transaction.commit() or Transaction.abort().
+
+    The life-cycle of a transaction looks like this:
+
+    1. Create the transaction and record the initial sequence number:
+
+        seqno = idl.change_seqno
+        txn = Transaction(idl)
+
+    2. Modify the database with Row and Transaction methods.
+
+    3. Commit the transaction by calling Transaction.commit().  The first call
+       to this function probably returns Transaction.INCOMPLETE.  The client
+       must keep calling again as long as this remains true, calling Idl.run()
+       between to let the IDL do protocol processing.  (If the client doesn't
+       have anything else to do in the meantime, it can use
+       Transaction.commit_block() to avoid having to loop itself.)
+
+    4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
+       to change from the saved 'seqno' (it's possible that it's already
+       changed, in which case the client should not wait at all), then start
+       over from step 1.  Only a call to Idl.run() will change the return value
+       of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
+
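# A sketch of that life-cycle as code (not part of the patch).  'idl' is
# assumed to be an Idl that has already received its initial contents, and
# make_changes() is a hypothetical helper that modifies rows through Row and
# Transaction methods.
while True:
    seqno = idl.change_seqno
    txn = Transaction(idl)
    make_changes(idl, txn)
    if txn.commit_block() != Transaction.TRY_AGAIN:
        break
    # Wait for the replica to move past the saved 'seqno', then retry.
    while idl.change_seqno == seqno:
        poller = ovs.poller.Poller()
        idl.wait(poller)
        poller.block()
        idl.run()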
     # Status values that Transaction.commit() can return.
-    UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
-    UNCHANGED = "unchanged"     # Transaction didn't include any changes.
-    INCOMPLETE = "incomplete"   # Commit in progress, please wait.
-    ABORTED = "aborted"         # ovsdb_idl_txn_abort() called.
-    SUCCESS = "success"         # Commit successful.
-    TRY_AGAIN = "try again"     # Commit failed because a "verify" operation
-                                # reported an inconsistency, due to a network
-                                # problem, or other transient failure.
-    NOT_LOCKED = "not locked"   # Server hasn't given us the lock yet.
-    ERROR = "error"             # Commit failed due to a hard error.
+
+    # Not yet committed or aborted.
+    UNCOMMITTED = "uncommitted"
+    # Transaction didn't include any changes.
+    UNCHANGED = "unchanged"
+    # Commit in progress, please wait.
+    INCOMPLETE = "incomplete"
+    # ovsdb_idl_txn_abort() called.
+    ABORTED = "aborted"
+    # Commit successful.
+    SUCCESS = "success"
+    # Commit failed because a "verify" operation
+    # reported an inconsistency, due to a network
+    # problem, or other transient failure.  Wait
+    # for a change, then try again.
+    TRY_AGAIN = "try again"
+    # Server hasn't given us the lock yet.
+    NOT_LOCKED = "not locked"
+    # Commit failed due to a hard error.
+    ERROR = "error"
 
     @staticmethod
     def status_to_string(status):
@@ -646,45 +974,44 @@ class Transaction(object):
         self._error = None
         self._comments = []
 
-        self._inc_table = None
+        self._inc_row = None
         self._inc_column = None
-        self._inc_where = None
 
-        self._inserted_rows = {} # Map from UUID to _InsertedRow
+        self._fetch_requests = []
+
+        self._inserted_rows = {}  # Map from UUID to _InsertedRow
 
     def add_comment(self, comment):
-        """Appens 'comment' to the comments that will be passed to the OVSDB
+        """Appends 'comment' to the comments that will be passed to the OVSDB
         server when this transaction is committed.  (The comment will be
         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
         relatively human-readable form.)"""
         self._comments.append(comment)
 
-    def increment(self, table, column, where):
-        assert not self._inc_table
-        self._inc_table = table
-        self._inc_column = column
-        self._inc_where = where
-
     def wait(self, poller):
+        """Causes poll_block() to wake up if this transaction has completed
+        committing."""
         if self._status not in (Transaction.UNCOMMITTED,
                                 Transaction.INCOMPLETE):
             poller.immediate_wake()
 
     def _substitute_uuids(self, json):
-        if type(json) in (list, tuple):
+        if isinstance(json, (list, tuple)):
             if (len(json) == 2
-                and json[0] == 'uuid'
-                and ovs.ovsuuid.is_valid_string(json[1])):
+                    and json[0] == 'uuid'
+                    and ovs.ovsuuid.is_valid_string(json[1])):
                 uuid = ovs.ovsuuid.from_string(json[1])
                 row = self._txn_rows.get(uuid, None)
                 if row and row._data is None:
                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
+            else:
+                return [self._substitute_uuids(elem) for elem in json]
         return json
 
     def __disassemble(self):
         self.idl.txn = None
 
-        for row in self._txn_rows.itervalues():
+        for row in six.itervalues(self._txn_rows):
             if row._changes is None:
                 row._table.rows[row.uuid] = row
             elif row._data is None:
@@ -694,23 +1021,63 @@ class Transaction(object):
         self._txn_rows = {}
 
     def commit(self):
-        """Attempts to commit this transaction and returns the status of the
-        commit operation, one of the constants declared as class attributes.
-        If the return value is Transaction.INCOMPLETE, then the transaction is
-        not yet complete and the caller should try calling again later, after
-        calling Idl.run() to run the Idl.
+        """Attempts to commit 'txn'.  Returns the status of the commit
+        operation, one of the following constants:
+
+          Transaction.INCOMPLETE:
+
+              The transaction is in progress, but not yet complete.  The caller
+              should call again later, after calling Idl.run() to let the
+              IDL do OVSDB protocol processing.
+
+          Transaction.UNCHANGED:
+
+              The transaction is complete.  (It didn't actually change the
+              database, so the IDL didn't send any request to the database
+              server.)
+
+          Transaction.ABORTED:
+
+              The caller previously called Transaction.abort().
+
+          Transaction.SUCCESS:
+
+              The transaction was successful.  The update made by the
+              transaction (and possibly other changes made by other database
+              clients) should already be visible in the IDL.
+
+          Transaction.TRY_AGAIN:
+
+              The transaction failed for some transient reason, e.g. because a
+              "verify" operation reported an inconsistency or due to a network
+              problem.  The caller should wait for a change to the database,
+              then compose a new transaction, and commit the new transaction.
+
+              Use Idl.change_seqno to wait for a change in the database.  It is
+              important to use its value *before* the initial call to
+              Transaction.commit() as the baseline for this purpose, because
+              the change that one should wait for can happen after the initial
+              call but before the call that returns Transaction.TRY_AGAIN, and
+              using some other baseline value in that situation could cause an
+              indefinite wait if the database rarely changes.
+
+          Transaction.NOT_LOCKED:
+
+              The transaction failed because the IDL has been configured to
+              require a database lock (with Idl.set_lock()) but didn't
+              get it yet or has already lost it.
 
         Committing a transaction rolls back all of the changes that it made to
-        the Idl's copy of the database.  If the transaction commits
+        the IDL's copy of the database.  If the transaction commits
         successfully, then the database server will send an update and, thus,
-        the Idl will be updated with the committed changes."""
+        the IDL will be updated with the committed changes."""
         # The status can only change if we're the active transaction.
         # (Otherwise, our status will change only in Idl.run().)
         if self != self.idl.txn:
             return self._status
 
         # If we need a lock but don't have it, give up quickly.
-        if self.idl.lock_name and not self.idl.has_lock():
+        if self.idl.lock_name and not self.idl.has_lock:
             self._status = Transaction.NOT_LOCKED
             self.__disassemble()
             return self._status
@@ -723,7 +1090,7 @@ class Transaction(object):
                                "lock": self.idl.lock_name})
 
         # Add prerequisites and declarations of new rows.
-        for row in self._txn_rows.itervalues():
+        for row in six.itervalues(self._txn_rows):
             if row._prereqs:
                 rows = {}
                 columns = []
@@ -740,7 +1107,7 @@ class Transaction(object):
 
         # Add updates.
         any_updates = False
-        for row in self._txn_rows.itervalues():
+        for row in six.itervalues(self._txn_rows):
             if row._changes is None:
                 if row._table.is_root:
                     operations.append({"op": "delete",
@@ -766,32 +1133,45 @@ class Transaction(object):
                 row_json = {}
                 op["row"] = row_json
 
-                for column_name, datum in row._changes.iteritems():
+                for column_name, datum in six.iteritems(row._changes):
                     if row._data is not None or not datum.is_default():
-                        row_json[column_name] = self._substitute_uuids(datum.to_json())
+                        row_json[column_name] = (
+                            self._substitute_uuids(datum.to_json()))
 
                         # If anything really changed, consider it an update.
                         # We can't suppress not-really-changed values earlier
                         # or transactions would become nonatomic (see the big
                         # comment inside Transaction._write()).
                         if (not any_updates and row._data is not None and
-                            row._data[column_name] != datum):
+                                row._data[column_name] != datum):
                             any_updates = True
 
                 if row._data is None or row_json:
                     operations.append(op)
 
+        if self._fetch_requests:
+            for fetch in self._fetch_requests:
+                fetch["index"] = len(operations) - 1
+                operations.append({"op": "select",
+                                   "table": fetch["row"]._table.name,
+                                   "where": self._substitute_uuids(
+                                       _where_uuid_equals(fetch["row"].uuid)),
+                                   "columns": [fetch["column_name"]]})
+            any_updates = True
+
         # Add increment.
-        if self._inc_table and any_updates:
+        if self._inc_row and any_updates:
             self._inc_index = len(operations) - 1
 
             operations.append({"op": "mutate",
-                               "table": self._inc_table,
-                               "where": self._substitute_uuids(self._inc_where),
+                               "table": self._inc_row._table.name,
+                               "where": self._substitute_uuids(
+                                   _where_uuid_equals(self._inc_row.uuid)),
                                "mutations": [[self._inc_column, "+=", 1]]})
             operations.append({"op": "select",
-                               "table": self._inc_table,
-                               "where": self._substitute_uuids(self._inc_where),
+                               "table": self._inc_row._table.name,
+                               "where": self._substitute_uuids(
+                                   _where_uuid_equals(self._inc_row.uuid)),
                                "columns": [self._inc_column]})
 
         # Add comment.
@@ -818,6 +1198,12 @@ class Transaction(object):
         return self._status
 
     def commit_block(self):
+        """Attempts to commit this transaction, blocking until the commit
+        either succeeds or fails.  Returns the final commit status, which may
+        be any Transaction.* value other than Transaction.INCOMPLETE.
+
+        This function calls Idl.run() on this transaction's IDL, so it may
+        cause Idl.change_seqno to change."""
         while True:
             status = self.commit()
             if status != Transaction.INCOMPLETE:
@@ -831,6 +1217,9 @@ class Transaction(object):
             poller.block()
 
     def get_increment_new_value(self):
+        """Returns the final (incremented) value of the column in this
+        transaction that was set to be incremented by Row.increment.  This
+        transaction must have committed successfully."""
         assert self._status == Transaction.SUCCESS
         return self._inc_new_value
 
@@ -875,6 +1264,14 @@ class Transaction(object):
             return inserted_row.real
         return None
 
+    def _increment(self, row, column):
+        assert not self._inc_row
+        self._inc_row = row
+        self._inc_column = column
+
+    def _fetch(self, row, column_name):
+        self._fetch_requests.append({"row": row, "column_name": column_name})
+
     def _write(self, row, column, datum):
         assert row._changes is not None
 
@@ -892,7 +1289,8 @@ class Transaction(object):
         # transaction only does writes of existing values, without making any
         # real changes, we will drop the whole transaction later in
         # ovsdb_idl_txn_commit().)
-        if not column.alert and row._data.get(column.name) == datum:
+        if (not column.alert and row._data and
+                row._data.get(column.name) == datum):
             new_value = row._changes.get(column.name)
             if new_value is None or new_value == datum:
                 return
@@ -921,9 +1319,9 @@ class Transaction(object):
     def _process_reply(self, msg):
         if msg.type == ovs.jsonrpc.Message.T_ERROR:
             self._status = Transaction.ERROR
-        elif type(msg.result) not in (list, tuple):
+        elif not isinstance(msg.result, (list, tuple)):
             # XXX rate-limit
-            logging.warning('reply to "transact" is not JSON array')
+            vlog.warn('reply to "transact" is not JSON array')
         else:
             hard_errors = False
             soft_errors = False
@@ -936,7 +1334,7 @@ class Transaction(object):
                     # prior operation failed, so make sure that we know about
                     # it.
                     soft_errors = True
-                elif type(op) == dict:
+                elif isinstance(op, dict):
                     error = op.get("error")
                     if error is not None:
                         if error == "timed out":
@@ -952,14 +1350,18 @@ class Transaction(object):
                     hard_errors = True
                     self.__set_error_json(op)
                     # XXX rate-limit
-                    logging.warning("operation reply is not JSON null or "
-                                    "object")
+                    vlog.warn("operation reply is not JSON null or object")
 
             if not soft_errors and not hard_errors and not lock_errors:
-                if self._inc_table and not self.__process_inc_reply(ops):
+                if self._inc_row and not self.__process_inc_reply(ops):
                     hard_errors = True
+                if self._fetch_requests:
+                    if self.__process_fetch_reply(ops):
+                        self.idl.change_seqno += 1
+                    else:
+                        hard_errors = True
 
-                for insert in self._inserted_rows.itervalues():
+                for insert in six.itervalues(self._inserted_rows):
                     if not self.__process_insert_reply(insert, ops):
                         hard_errors = True
 
@@ -976,33 +1378,64 @@ class Transaction(object):
     def __check_json_type(json, types, name):
         if not json:
             # XXX rate-limit
-            logging.warning("%s is missing" % name)
+            vlog.warn("%s is missing" % name)
             return False
-        elif type(json) not in types:
+        elif not isinstance(json, tuple(types)):
             # XXX rate-limit
-            logging.warning("%s has unexpected type %s" % (name, type(json)))
+            vlog.warn("%s has unexpected type %s" % (name, type(json)))
             return False
         else:
             return True
 
+    def __process_fetch_reply(self, ops):
+        update = False
+        for fetch_request in self._fetch_requests:
+            row = fetch_request["row"]
+            column_name = fetch_request["column_name"]
+            index = fetch_request["index"]
+            table = row._table
+
+            select = ops[index]
+            fetched_rows = select.get("rows")
+            if not Transaction.__check_json_type(fetched_rows, (list, tuple),
+                                                 '"select" reply "rows"'):
+                return False
+            if len(fetched_rows) != 1:
+                # XXX rate-limit
+                vlog.warn('"select" reply "rows" has %d elements '
+                          'instead of 1' % len(fetched_rows))
+                continue
+            fetched_row = fetched_rows[0]
+            if not Transaction.__check_json_type(fetched_row, (dict,),
+                                                 '"select" reply row'):
+                continue
+
+            column = table.columns.get(column_name)
+            datum_json = fetched_row.get(column_name)
+            datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+
+            row._data[column_name] = datum
+            update = True
+
+        return update
+
     def __process_inc_reply(self, ops):
         if self._inc_index + 2 > len(ops):
             # XXX rate-limit
-            logging.warning("reply does not contain enough operations for "
-                            "increment (has %d, needs %d)" %
-                            (len(ops), self._inc_index + 2))
+            vlog.warn("reply does not contain enough operations for "
+                      "increment (has %d, needs %d)" %
+                      (len(ops), self._inc_index + 2))
 
         # We know that this is a JSON object because the loop in
         # __process_reply() already checked.
         mutate = ops[self._inc_index]
         count = mutate.get("count")
-        if not Transaction.__check_json_type(count, (int, long),
+        if not Transaction.__check_json_type(count, six.integer_types,
                                              '"mutate" reply "count"'):
             return False
         if count != 1:
             # XXX rate-limit
-            logging.warning('"mutate" reply "count" is %d instead of 1'
-                            % count)
+            vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
             return False
 
         select = ops[self._inc_index + 1]
@@ -1012,15 +1445,15 @@ class Transaction(object):
             return False
         if len(rows) != 1:
             # XXX rate-limit
-            logging.warning('"select" reply "rows" has %d elements '
-                            'instead of 1' % len(rows))
+            vlog.warn('"select" reply "rows" has %d elements '
+                      'instead of 1' % len(rows))
             return False
         row = rows[0]
         if not Transaction.__check_json_type(row, (dict,),
                                              '"select" reply row'):
             return False
         column = row.get(self._inc_column)
-        if not Transaction.__check_json_type(column, (int, long),
+        if not Transaction.__check_json_type(column, six.integer_types,
                                              '"select" reply inc column'):
             return False
         self._inc_new_value = column
@@ -1029,9 +1462,9 @@ class Transaction(object):
     def __process_insert_reply(self, insert, ops):
         if insert.op_index >= len(ops):
             # XXX rate-limit
-            logging.warning("reply does not contain enough operations "
-                            "for insert (has %d, needs %d)"
-                            % (len(ops), insert.op_index))
+            vlog.warn("reply does not contain enough operations "
+                      "for insert (has %d, needs %d)"
+                      % (len(ops), insert.op_index))
             return False
 
         # We know that this is a JSON object because the loop in
@@ -1046,8 +1479,110 @@ class Transaction(object):
             uuid_ = ovs.ovsuuid.from_json(json_uuid)
         except error.Error:
             # XXX rate-limit
-            logging.warning('"insert" reply "uuid" is not a JSON UUID')
+            vlog.warn('"insert" reply "uuid" is not a JSON UUID')
             return False
 
         insert.real = uuid_
         return True
+
+
+class SchemaHelper(object):
+    """IDL Schema helper.
+
+    This class encapsulates the logic required to generate schemas suitable
+    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
+    they are interested in using register_columns().  When finished, the
+    get_idl_schema() function may be called.
+
+    The schema is read from the file path given as 'location', or taken
+    directly from 'schema_json' when that is supplied instead."""
+
+    def __init__(self, location=None, schema_json=None):
+        """Creates a new Schema object.
+
+        'location' file path to the OVS schema; None means the default location
+        'schema_json' schema in JSON representation, in memory
+        """
+
+        if location and schema_json:
+            raise ValueError("both location and schema_json can't be "
+                             "specified. it's ambiguous.")
+        if schema_json is None:
+            if location is None:
+                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
+            schema_json = ovs.json.from_file(location)
+
+        self.schema_json = schema_json
+        self._tables = {}
+        self._readonly = {}
+        self._all = False
+
+    def register_columns(self, table, columns, readonly=[]):
+        """Registers interest in the given 'columns' of 'table'.  Future calls
+        to get_idl_schema() will include 'table':column for each column in
+        'columns'. This function automatically avoids adding duplicate entries
+        to the schema.
+        A subset of 'columns' can be specified as 'readonly'. The readonly
+        columns are not replicated but can be fetched on-demand by the user
+        with Row.fetch().
+
+        'table' must be a string.
+        'columns' must be a list of strings.
+        'readonly' must be a list of strings.
+        """
+
+        assert isinstance(table, six.string_types)
+        assert isinstance(columns, list)
+
+        columns = set(columns) | self._tables.get(table, set())
+        self._tables[table] = columns
+        self._readonly[table] = readonly
+
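# An end-to-end sketch (not part of the patch): build a cut-down schema with a
# read-only column, create the Idl, and fetch that column on demand.  Table
# and column names assume the Open vSwitch schema; the socket path is only an
# example.
helper = SchemaHelper()          # defaults to the installed vswitch.ovsschema
helper.register_columns("Interface", ["name", "statistics"],
                        readonly=["statistics"])
idl = Idl("unix:/var/run/openvswitch/db.sock", helper)

idl.run()
while idl.change_seqno == 0:     # wait for the initial database contents
    poller = ovs.poller.Poller()
    idl.wait(poller)
    poller.block()
    idl.run()

txn = Transaction(idl)
for row in idl.tables["Interface"].rows.values():
    row.fetch("statistics")      # read-only columns are fetched on demand
txn.commit_block()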
+    def register_table(self, table):
+        """Registers interest in the given all columns of 'table'. Future calls
+        to get_idl_schema() will include all columns of 'table'.
+
+        'table' must be a string
+        """
+        assert isinstance(table, six.string_types)
+        self._tables[table] = set()  # empty set means all columns in the table
+
+    def register_all(self):
+        """Registers interest in every column of every table."""
+        self._all = True
+
+    def get_idl_schema(self):
+        """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
+        object based on columns registered using the register_columns()
+        function."""
+
+        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
+        self.schema_json = None
+
+        if not self._all:
+            schema_tables = {}
+            for table, columns in six.iteritems(self._tables):
+                schema_tables[table] = (
+                    self._keep_table_columns(schema, table, columns))
+
+            schema.tables = schema_tables
+        schema.readonly = self._readonly
+        return schema
+
+    def _keep_table_columns(self, schema, table_name, columns):
+        assert table_name in schema.tables
+        table = schema.tables[table_name]
+
+        if not columns:
+            # empty set means all columns in the table
+            return table
+
+        new_columns = {}
+        for column_name in columns:
+            assert isinstance(column_name, six.string_types)
+            assert column_name in table.columns
+
+            new_columns[column_name] = table.columns[column_name]
+
+        table.columns = new_columns
+        return table