lib: add monitor_cond_change API to C IDL lib
[cascardo/ovs.git] / python / ovs / db / idl.py
index 6150a02..437e9b0 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2009, 2010, 2011 Nicira Networks
+# Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import functools
 import uuid
 
+import six
+
 import ovs.jsonrpc
 import ovs.db.parser
 import ovs.db.schema
@@ -26,8 +29,15 @@ vlog = ovs.vlog.Vlog("idl")
 
 __pychecker__ = 'no-classattr no-objattrs'
 
+ROW_CREATE = "create"
+ROW_UPDATE = "update"
+ROW_DELETE = "delete"
+
+OVSDB_UPDATE = 0
+OVSDB_UPDATE2 = 1
 
-class Idl:
+
+class Idl(object):
     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
 
     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
@@ -52,7 +62,13 @@ class Idl:
       'rows' map values.  Refer to Row for more details.
 
     - 'change_seqno': A number that represents the IDL's state.  When the IDL
-      is updated (by Idl.run()), its value changes.
+      is updated (by Idl.run()), its value changes.  The sequence number can
+      occasionally change even if the database does not.  This happens if the
+      connection to the database drops and reconnects, which causes the
+      database contents to be reloaded even if they didn't change.  (It could
+      also happen if the database server sends out a "change" that reflects
+      what the IDL already thought was in the database.  The database server is
+      not supposed to do that, but bugs could in theory cause it to do so.)
 
     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
       if no lock is configured.
@@ -73,6 +89,10 @@ class Idl:
       currently being constructed, if there is one, or None otherwise.
 """
 
+    IDL_S_INITIAL = 0
+    IDL_S_MONITOR_REQUESTED = 1
+    IDL_S_MONITOR_COND_REQUESTED = 2
+
     def __init__(self, remote, schema):
         """Creates and returns a connection to the database named 'db_name' on
         'remote', which should be in a form acceptable to
@@ -88,14 +108,23 @@ class Idl:
         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
         useful for columns that the IDL's client will write but not read.
 
+        As a convenience to users, 'schema' may also be an instance of the
+        SchemaHelper class.
+
         The IDL uses and modifies 'schema' directly."""
 
+        assert isinstance(schema, SchemaHelper)
+        schema = schema.get_idl_schema()
+
         self.tables = schema.tables
+        self.readonly = schema.readonly
         self._db = schema
         self._session = ovs.jsonrpc.Session.open(remote)
         self._monitor_request_id = None
         self._last_seqno = None
         self.change_seqno = 0
+        self.uuid = uuid.uuid1()
+        self.state = self.IDL_S_INITIAL
 
         # Database locking.
         self.lock_name = None          # Name of lock we need, None if none.
@@ -107,13 +136,15 @@ class Idl:
         self.txn = None
         self._outstanding_txns = {}
 
-        for table in schema.tables.itervalues():
-            for column in table.columns.itervalues():
+        for table in six.itervalues(schema.tables):
+            for column in six.itervalues(table.columns):
                 if not hasattr(column, 'alert'):
                     column.alert = True
             table.need_table = False
             table.rows = {}
             table.idl = self
+            table.condition = []
+            table.cond_changed = False
 
     def close(self):
         """Closes the connection to the database.  The IDL will no longer
@@ -140,6 +171,8 @@ class Idl:
         for changes in self.change_seqno."""
         assert not self.txn
         initial_change_seqno = self.change_seqno
+
+        self.send_cond_change()
         self._session.run()
         i = 0
         while i < 50:
@@ -160,11 +193,15 @@ class Idl:
             if msg is None:
                 break
             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
-                and msg.method == "update"
-                and len(msg.params) == 2
-                and msg.params[0] == None):
+                    and msg.method == "update2"
+                    and len(msg.params) == 2):
                 # Database contents changed.
-                self.__parse_update(msg.params[1])
+                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
+            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+                    and msg.method == "update"
+                    and len(msg.params) == 2):
+                # Database contents changed.
+                self.__parse_update(msg.params[1], OVSDB_UPDATE)
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._monitor_request_id is not None
                   and self._monitor_request_id == msg.id):
@@ -173,10 +210,15 @@ class Idl:
                     self.change_seqno += 1
                     self._monitor_request_id = None
                     self.__clear()
-                    self.__parse_update(msg.result)
-                except error.Error, e:
+                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+                        self.__parse_update(msg.result, OVSDB_UPDATE2)
+                    else:
+                        assert self.state == self.IDL_S_MONITOR_REQUESTED
+                        self.__parse_update(msg.result, OVSDB_UPDATE)
+
+                except error.Error as e:
                     vlog.err("%s: parse error in received schema: %s"
-                              % (self._session.get_name(), e))
+                             % (self._session.get_name(), e))
                     self.__error()
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._lock_request_id is not None
@@ -194,6 +236,11 @@ class Idl:
             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                 # Reply to our echo request.  Ignore it.
                 pass
+            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+                  self._monitor_request_id == msg.id):
+                if msg.error == "unknown method":
+                    self.__send_monitor_request()
             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                                ovs.jsonrpc.Message.T_REPLY)
                   and self.__txn_process_reply(msg)):
@@ -208,6 +255,33 @@ class Idl:
 
         return initial_change_seqno != self.change_seqno
 
+    def send_cond_change(self):
+        if not self._session.is_connected():
+            return
+
+        for table in six.itervalues(self.tables):
+            if table.cond_changed:
+                self.__send_cond_change(table, table.condition)
+                table.cond_changed = False
+
+    def cond_change(self, table_name, add_cmd, cond):
+        """Change conditions for this IDL session. If session is not already
+        connected, add condtion to table and submit it on send_monitor_request.
+        Otherwise  send monitor_cond_change method with the requested
+        changes."""
+
+        table = self.tables.get(table_name)
+        if not table:
+            raise error.Error('Unknown table "%s"' % table_name)
+
+        if add_cmd:
+            table.condition += cond
+        else:
+            for c in cond:
+                table.condition.remove(c)
+
+        table.cond_changed = True
+
     def wait(self, poller):
         """Arranges for poller.block() to wake up when self.run() has something
         to do or when activity occurs on a transaction on 'self'."""
@@ -252,10 +326,29 @@ class Idl:
             self.lock_name = lock_name
             self.__send_lock_request()
 
+    def notify(self, event, row, updates=None):
+        """Hook for implementing create/update/delete notifications
+
+        :param event:   The event that was triggered
+        :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
+        :param row:     The row as it is after the operation has occured
+        :type row:      Row
+        :param updates: For updates, row with only updated columns
+        :type updates:  Row
+        """
+
+    def __send_cond_change(self, table, cond):
+        monitor_cond_change = {table.name: [{"where": cond}]}
+        old_uuid = str(self.uuid)
+        self.uuid = uuid.uuid1()
+        params = [old_uuid, str(self.uuid), monitor_cond_change]
+        msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
+        self._session.send(msg)
+
     def __clear(self):
         changed = False
 
-        for table in self.tables.itervalues():
+        for table in six.itervalues(self.tables):
             if table.rows:
                 changed = True
                 table.rows = {}
@@ -294,52 +387,70 @@ class Idl:
 
     def __parse_lock_reply(self, result):
         self._lock_request_id = None
-        got_lock = type(result) == dict and result.get("locked") is True
+        got_lock = isinstance(result, dict) and result.get("locked") is True
         self.__update_has_lock(got_lock)
         if not got_lock:
             self.is_lock_contended = True
 
     def __parse_lock_notify(self, params, new_has_lock):
         if (self.lock_name is not None
-            and type(params) in (list, tuple)
+            and isinstance(params, (list, tuple))
             and params
             and params[0] == self.lock_name):
-            self.__update_has_lock(self, new_has_lock)
+            self.__update_has_lock(new_has_lock)
             if not new_has_lock:
                 self.is_lock_contended = True
 
     def __send_monitor_request(self):
+        if self.state == self.IDL_S_INITIAL:
+            self.state = self.IDL_S_MONITOR_COND_REQUESTED
+            method = "monitor_cond"
+        else:
+            self.state = self.IDL_S_MONITOR_REQUESTED
+            method = "monitor"
+
         monitor_requests = {}
-        for table in self.tables.itervalues():
-            monitor_requests[table.name] = {"columns": table.columns.keys()}
+        for table in six.itervalues(self.tables):
+            columns = []
+            for column in six.iterkeys(table.columns):
+                if ((table.name not in self.readonly) or
+                        (table.name in self.readonly) and
+                        (column not in self.readonly[table.name])):
+                    columns.append(column)
+            monitor_requests[table.name] = {"columns": columns}
+            if method == "monitor_cond" and table.cond_changed and \
+                   table.condition:
+                monitor_requests[table.name]["where"] = table.condition
+                table.cond_change = False
+
         msg = ovs.jsonrpc.Message.create_request(
-            "monitor", [self._db.name, None, monitor_requests])
+            method, [self._db.name, str(self.uuid), monitor_requests])
         self._monitor_request_id = msg.id
         self._session.send(msg)
 
-    def __parse_update(self, update):
+    def __parse_update(self, update, version):
         try:
-            self.__do_parse_update(update)
-        except error.Error, e:
+            self.__do_parse_update(update, version)
+        except error.Error as e:
             vlog.err("%s: error parsing update: %s"
                      % (self._session.get_name(), e))
 
-    def __do_parse_update(self, table_updates):
-        if type(table_updates) != dict:
+    def __do_parse_update(self, table_updates, version):
+        if not isinstance(table_updates, dict):
             raise error.Error("<table-updates> is not an object",
                               table_updates)
 
-        for table_name, table_update in table_updates.iteritems():
+        for table_name, table_update in six.iteritems(table_updates):
             table = self.tables.get(table_name)
             if not table:
                 raise error.Error('<table-updates> includes unknown '
                                   'table "%s"' % table_name)
 
-            if type(table_update) != dict:
+            if not isinstance(table_update, dict):
                 raise error.Error('<table-update> for table "%s" is not '
                                   'an object' % table_name, table_update)
 
-            for uuid_string, row_update in table_update.iteritems():
+            for uuid_string, row_update in six.iteritems(table_update):
                 if not ovs.ovsuuid.is_valid_string(uuid_string):
                     raise error.Error('<table-update> for table "%s" '
                                       'contains bad UUID "%s" as member '
@@ -347,12 +458,17 @@ class Idl:
                                       table_update)
                 uuid = ovs.ovsuuid.from_string(uuid_string)
 
-                if type(row_update) != dict:
+                if not isinstance(row_update, dict):
                     raise error.Error('<table-update> for table "%s" '
                                       'contains <row-update> for %s that '
                                       'is not an object'
                                       % (table_name, uuid_string))
 
+                if version == OVSDB_UPDATE2:
+                    if self.__process_update2(table, uuid, row_update):
+                        self.change_seqno += 1
+                    continue
+
                 parser = ovs.db.parser.Parser(row_update, "row-update")
                 old = parser.get_optional("old", [dict])
                 new = parser.get_optional("new", [dict])
@@ -365,6 +481,45 @@ class Idl:
                 if self.__process_update(table, uuid, old, new):
                     self.change_seqno += 1
 
+    def __process_update2(self, table, uuid, row_update):
+        row = table.rows.get(uuid)
+        changed = False
+        if "delete" in row_update:
+            if row:
+                del table.rows[uuid]
+                self.notify(ROW_DELETE, row)
+                changed = True
+            else:
+                # XXX rate-limit
+                vlog.warn("cannot delete missing row %s from table"
+                          "%s" % (uuid, table.name))
+        elif "insert" in row_update or "initial" in row_update:
+            if row:
+                vlog.warn("cannot add existing row %s from table"
+                          " %s" % (uuid, table.name))
+                del table.rows[uuid]
+            row = self.__create_row(table, uuid)
+            if "insert" in row_update:
+                row_update = row_update['insert']
+            else:
+                row_update = row_update['initial']
+            self.__add_default(table, row_update)
+            if self.__row_update(table, row, row_update):
+                changed = True
+                self.notify(ROW_CREATE, row)
+        elif "modify" in row_update:
+            if not row:
+                raise error.Error('Modify non-existing row')
+
+            self.__apply_diff(table, row, row_update['modify'])
+            self.notify(ROW_UPDATE, row,
+                        Row.from_json(self, table, uuid, row_update['modify']))
+            changed = True
+        else:
+            raise error.Error('<row-update> unknown operation',
+                              row_update)
+        return changed
+
     def __process_update(self, table, uuid, old, new):
         """Returns True if a column changed, False otherwise."""
         row = table.rows.get(uuid)
@@ -374,6 +529,7 @@ class Idl:
             if row:
                 del table.rows[uuid]
                 changed = True
+                self.notify(ROW_DELETE, row)
             else:
                 # XXX rate-limit
                 vlog.warn("cannot delete missing row %s from table %s"
@@ -389,20 +545,60 @@ class Idl:
                           % (uuid, table.name))
             if self.__row_update(table, row, new):
                 changed = True
+                self.notify(ROW_CREATE, row)
         else:
+            op = ROW_UPDATE
             if not row:
                 row = self.__create_row(table, uuid)
                 changed = True
+                op = ROW_CREATE
                 # XXX rate-limit
                 vlog.warn("cannot modify missing row %s in table %s"
                           % (uuid, table.name))
             if self.__row_update(table, row, new):
                 changed = True
+                self.notify(op, row, Row.from_json(self, table, uuid, old))
         return changed
 
+    def __column_name(self, column):
+        if column.type.key.type == ovs.db.types.UuidType:
+            return ovs.ovsuuid.to_json(column.type.key.type.default)
+        else:
+            return column.type.key.type.default
+
+    def __add_default(self, table, row_update):
+        for column in six.itervalues(table.columns):
+            if column.name not in row_update:
+                if ((table.name not in self.readonly) or
+                        (table.name in self.readonly) and
+                        (column.name not in self.readonly[table.name])):
+                    if column.type.n_min != 0 and not column.type.is_map():
+                        row_update[column.name] = self.__column_name(column)
+
+    def __apply_diff(self, table, row, row_diff):
+        for column_name, datum_json in six.iteritems(row_diff):
+            column = table.columns.get(column_name)
+            if not column:
+                # XXX rate-limit
+                vlog.warn("unknown column %s updating table %s"
+                          % (column_name, table.name))
+                continue
+
+            try:
+                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+            except error.Error as e:
+                # XXX rate-limit
+                vlog.warn("error parsing column %s in table %s: %s"
+                          % (column_name, table.name, e))
+                continue
+
+            datum = row._data[column_name].diff(datum)
+            if datum != row._data[column_name]:
+                row._data[column_name] = datum
+
     def __row_update(self, table, row, row_json):
         changed = False
-        for column_name, datum_json in row_json.iteritems():
+        for column_name, datum_json in six.iteritems(row_json):
             column = table.columns.get(column_name)
             if not column:
                 # XXX rate-limit
@@ -412,7 +608,7 @@ class Idl:
 
             try:
                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
-            except error.Error, e:
+            except error.Error as e:
                 # XXX rate-limit
                 vlog.warn("error parsing column %s in table %s: %s"
                           % (column_name, table.name, e))
@@ -430,7 +626,7 @@ class Idl:
 
     def __create_row(self, table, uuid):
         data = {}
-        for column in table.columns.itervalues():
+        for column in six.itervalues(table.columns):
             data[column.name] = ovs.db.data.Datum.default(column.type)
         row = table.rows[uuid] = Row(self, table, uuid, data)
         return row
@@ -441,7 +637,7 @@ class Idl:
     def __txn_abort_all(self):
         while self._outstanding_txns:
             txn = self._outstanding_txns.popitem()[1]
-            txn._status = Transaction.AGAIN_WAIT
+            txn._status = Transaction.TRY_AGAIN
 
     def __txn_process_reply(self, msg):
         txn = self._outstanding_txns.pop(msg.id, None)
@@ -457,12 +653,13 @@ def _uuid_to_row(atom, base):
 
 
 def _row_to_uuid(value):
-    if type(value) == Row:
+    if isinstance(value, Row):
         return value.uuid
     else:
         return value
 
 
+@functools.total_ordering
 class Row(object):
     """A row within an IDL.
 
@@ -531,12 +728,32 @@ class Row(object):
         # in the dictionary are all None.
         self.__dict__["_prereqs"] = {}
 
+    def __lt__(self, other):
+        if not isinstance(other, Row):
+            return NotImplemented
+        return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
+
+    def __eq__(self, other):
+        if not isinstance(other, Row):
+            return NotImplemented
+        return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
+
+    def __hash__(self):
+        return int(self.__dict__['uuid'])
+
     def __getattr__(self, column_name):
         assert self._changes is not None
 
         datum = self._changes.get(column_name)
         if datum is None:
-            datum = self._data[column_name]
+            if self._data is None:
+                raise AttributeError("%s instance has no attribute '%s'" %
+                                     (self.__class__.__name__, column_name))
+            if column_name in self._data:
+                datum = self._data[column_name]
+            else:
+                raise AttributeError("%s instance has no attribute '%s'" %
+                                     (self.__class__.__name__, column_name))
 
         return datum.to_python(_uuid_to_row)
 
@@ -544,26 +761,50 @@ class Row(object):
         assert self._changes is not None
         assert self._idl.txn
 
+        if ((self._table.name in self._idl.readonly) and
+                (column_name in self._idl.readonly[self._table.name])):
+            vlog.warn("attempting to write to readonly column %s"
+                      % column_name)
+            return
+
         column = self._table.columns[column_name]
         try:
             datum = ovs.db.data.Datum.from_python(column.type, value,
                                                   _row_to_uuid)
-        except error.Error, e:
+        except error.Error as e:
             # XXX rate-limit
             vlog.err("attempting to write bad value to column %s (%s)"
                      % (column_name, e))
             return
         self._idl.txn._write(self, column, datum)
 
+    @classmethod
+    def from_json(cls, idl, table, uuid, row_json):
+        data = {}
+        for column_name, datum_json in six.iteritems(row_json):
+            column = table.columns.get(column_name)
+            if not column:
+                # XXX rate-limit
+                vlog.warn("unknown column %s in table %s"
+                          % (column_name, table.name))
+                continue
+            try:
+                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+            except error.Error as e:
+                # XXX rate-limit
+                vlog.warn("error parsing column %s in table %s: %s"
+                          % (column_name, table.name, e))
+                continue
+            data[column_name] = datum
+        return cls(idl, table, uuid, data)
+
     def verify(self, column_name):
         """Causes the original contents of column 'column_name' in this row to
         be verified as a prerequisite to completing the transaction.  That is,
         if 'column_name' changed in this row (or if this row was deleted)
         between the time that the IDL originally read its contents and the time
         that the transaction commits, then the transaction aborts and
-        Transaction.commit() returns Transaction.AGAIN_WAIT or
-        Transaction.AGAIN_NOW (depending on whether the database change has
-        already been received).
+        Transaction.commit() returns Transaction.TRY_AGAIN.
 
         The intention is that, to ensure that no transaction commits based on
         dirty reads, an application should call Row.verify() on each data item
@@ -597,9 +838,29 @@ class Row(object):
         assert self._changes is not None
         if self._data is None:
             del self._idl.txn._txn_rows[self.uuid]
+        else:
+            self._idl.txn._txn_rows[self.uuid] = self
         self.__dict__["_changes"] = None
         del self._table.rows[self.uuid]
 
+    def fetch(self, column_name):
+        self._idl.txn._fetch(self, column_name)
+
+    def increment(self, column_name):
+        """Causes the transaction, when committed, to increment the value of
+        'column_name' within this row by 1.  'column_name' must have an integer
+        type.  After the transaction commits successfully, the client may
+        retrieve the final (incremented) value of 'column_name' with
+        Transaction.get_increment_new_value().
+
+        The client could accomplish something similar by reading and writing
+        and verify()ing columns.  However, increment() will never (by itself)
+        cause a transaction to fail because of a verify error.
+
+        The intended use is for incrementing the "next_cfg" column in
+        the Open_vSwitch table."""
+        self._idl.txn._increment(self, column_name)
+
 
 def _uuid_name_from_uuid(uuid):
     return "row%s" % str(uuid).replace("-", "_")
@@ -616,20 +877,70 @@ class _InsertedRow(object):
 
 
 class Transaction(object):
+    """A transaction may modify the contents of a database by modifying the
+    values of columns, deleting rows, inserting rows, or adding checks that
+    columns in the database have not changed ("verify" operations), through
+    Row methods.
+
+    Reading and writing columns and inserting and deleting rows are all
+    straightforward.  The reasons to verify columns are less obvious.
+    Verification is the key to maintaining transactional integrity.  Because
+    OVSDB handles multiple clients, it can happen that between the time that
+    OVSDB client A reads a column and writes a new value, OVSDB client B has
+    written that column.  Client A's write should not ordinarily overwrite
+    client B's, especially if the column in question is a "map" column that
+    contains several more or less independent data items.  If client A adds a
+    "verify" operation before it writes the column, then the transaction fails
+    in case client B modifies it first.  Client A will then see the new value
+    of the column and compose a new transaction based on the new contents
+    written by client B.
+
+    When a transaction is complete, which must be before the next call to
+    Idl.run(), call Transaction.commit() or Transaction.abort().
+
+    The life-cycle of a transaction looks like this:
+
+    1. Create the transaction and record the initial sequence number:
+
+        seqno = idl.change_seqno(idl)
+        txn = Transaction(idl)
+
+    2. Modify the database with Row and Transaction methods.
+
+    3. Commit the transaction by calling Transaction.commit().  The first call
+       to this function probably returns Transaction.INCOMPLETE.  The client
+       must keep calling again along as this remains true, calling Idl.run() in
+       between to let the IDL do protocol processing.  (If the client doesn't
+       have anything else to do in the meantime, it can use
+       Transaction.commit_block() to avoid having to loop itself.)
+
+    4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
+       to change from the saved 'seqno' (it's possible that it's already
+       changed, in which case the client should not wait at all), then start
+       over from step 1.  Only a call to Idl.run() will change the return value
+       of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""
+
     # Status values that Transaction.commit() can return.
-    UNCOMMITTED = "uncommitted"  # Not yet committed or aborted.
-    UNCHANGED = "unchanged"      # Transaction didn't include any changes.
-    INCOMPLETE = "incomplete"    # Commit in progress, please wait.
-    ABORTED = "aborted"          # ovsdb_idl_txn_abort() called.
-    SUCCESS = "success"          # Commit successful.
-    AGAIN_WAIT = "wait then try again"
-                                 # Commit failed because a "verify" operation
-                                 # reported an inconsistency, due to a network
-                                 # problem, or other transient failure.  Wait
-                                 # for a change, then try again.
-    AGAIN_NOW = "try again now"  # Same as AGAIN_WAIT but try again right away.
-    NOT_LOCKED = "not locked"    # Server hasn't given us the lock yet.
-    ERROR = "error"              # Commit failed due to a hard error.
+
+    # Not yet committed or aborted.
+    UNCOMMITTED = "uncommitted"
+    # Transaction didn't include any changes.
+    UNCHANGED = "unchanged"
+    # Commit in progress, please wait.
+    INCOMPLETE = "incomplete"
+    # ovsdb_idl_txn_abort() called.
+    ABORTED = "aborted"
+    # Commit successful.
+    SUCCESS = "success"
+    # Commit failed because a "verify" operation
+    # reported an inconsistency, due to a network
+    # problem, or other transient failure.  Wait
+    # for a change, then try again.
+    TRY_AGAIN = "try again"
+    # Server hasn't given us the lock yet.
+    NOT_LOCKED = "not locked"
+    # Commit failed due to a hard error.
+    ERROR = "error"
 
     @staticmethod
     def status_to_string(status):
@@ -662,47 +973,45 @@ class Transaction(object):
         self._status = Transaction.UNCOMMITTED
         self._error = None
         self._comments = []
-        self._commit_seqno = self.idl.change_seqno
 
-        self._inc_table = None
+        self._inc_row = None
         self._inc_column = None
-        self._inc_where = None
+
+        self._fetch_requests = []
 
         self._inserted_rows = {}  # Map from UUID to _InsertedRow
 
     def add_comment(self, comment):
-        """Appens 'comment' to the comments that will be passed to the OVSDB
+        """Appends 'comment' to the comments that will be passed to the OVSDB
         server when this transaction is committed.  (The comment will be
         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
         relatively human-readable form.)"""
         self._comments.append(comment)
 
-    def increment(self, table, column, where):
-        assert not self._inc_table
-        self._inc_table = table
-        self._inc_column = column
-        self._inc_where = where
-
     def wait(self, poller):
+        """Causes poll_block() to wake up if this transaction has completed
+        committing."""
         if self._status not in (Transaction.UNCOMMITTED,
                                 Transaction.INCOMPLETE):
             poller.immediate_wake()
 
     def _substitute_uuids(self, json):
-        if type(json) in (list, tuple):
+        if isinstance(json, (list, tuple)):
             if (len(json) == 2
-                and json[0] == 'uuid'
-                and ovs.ovsuuid.is_valid_string(json[1])):
+                    and json[0] == 'uuid'
+                    and ovs.ovsuuid.is_valid_string(json[1])):
                 uuid = ovs.ovsuuid.from_string(json[1])
                 row = self._txn_rows.get(uuid, None)
                 if row and row._data is None:
                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
+            else:
+                return [self._substitute_uuids(elem) for elem in json]
         return json
 
     def __disassemble(self):
         self.idl.txn = None
 
-        for row in self._txn_rows.itervalues():
+        for row in six.itervalues(self._txn_rows):
             if row._changes is None:
                 row._table.rows[row.uuid] = row
             elif row._data is None:
@@ -712,23 +1021,63 @@ class Transaction(object):
         self._txn_rows = {}
 
     def commit(self):
-        """Attempts to commit this transaction and returns the status of the
-        commit operation, one of the constants declared as class attributes.
-        If the return value is Transaction.INCOMPLETE, then the transaction is
-        not yet complete and the caller should try calling again later, after
-        calling Idl.run() to run the Idl.
+        """Attempts to commit 'txn'.  Returns the status of the commit
+        operation, one of the following constants:
+
+          Transaction.INCOMPLETE:
+
+              The transaction is in progress, but not yet complete.  The caller
+              should call again later, after calling Idl.run() to let the
+              IDL do OVSDB protocol processing.
+
+          Transaction.UNCHANGED:
+
+              The transaction is complete.  (It didn't actually change the
+              database, so the IDL didn't send any request to the database
+              server.)
+
+          Transaction.ABORTED:
+
+              The caller previously called Transaction.abort().
+
+          Transaction.SUCCESS:
+
+              The transaction was successful.  The update made by the
+              transaction (and possibly other changes made by other database
+              clients) should already be visible in the IDL.
+
+          Transaction.TRY_AGAIN:
+
+              The transaction failed for some transient reason, e.g. because a
+              "verify" operation reported an inconsistency or due to a network
+              problem.  The caller should wait for a change to the database,
+              then compose a new transaction, and commit the new transaction.
+
+              Use Idl.change_seqno to wait for a change in the database.  It is
+              important to use its value *before* the initial call to
+              Transaction.commit() as the baseline for this purpose, because
+              the change that one should wait for can happen after the initial
+              call but before the call that returns Transaction.TRY_AGAIN, and
+              using some other baseline value in that situation could cause an
+              indefinite wait if the database rarely changes.
+
+          Transaction.NOT_LOCKED:
+
+              The transaction failed because the IDL has been configured to
+              require a database lock (with Idl.set_lock()) but didn't
+              get it yet or has already lost it.
 
         Committing a transaction rolls back all of the changes that it made to
-        the Idl's copy of the database.  If the transaction commits
+        the IDL's copy of the database.  If the transaction commits
         successfully, then the database server will send an update and, thus,
-        the Idl will be updated with the committed changes."""
+        the IDL will be updated with the committed changes."""
         # The status can only change if we're the active transaction.
         # (Otherwise, our status will change only in Idl.run().)
         if self != self.idl.txn:
             return self._status
 
         # If we need a lock but don't have it, give up quickly.
-        if self.idl.lock_name and not self.idl.has_lock():
+        if self.idl.lock_name and not self.idl.has_lock:
             self._status = Transaction.NOT_LOCKED
             self.__disassemble()
             return self._status
@@ -741,7 +1090,7 @@ class Transaction(object):
                                "lock": self.idl.lock_name})
 
         # Add prerequisites and declarations of new rows.
-        for row in self._txn_rows.itervalues():
+        for row in six.itervalues(self._txn_rows):
             if row._prereqs:
                 rows = {}
                 columns = []
@@ -758,7 +1107,7 @@ class Transaction(object):
 
         # Add updates.
         any_updates = False
-        for row in self._txn_rows.itervalues():
+        for row in six.itervalues(self._txn_rows):
             if row._changes is None:
                 if row._table.is_root:
                     operations.append({"op": "delete",
@@ -784,35 +1133,45 @@ class Transaction(object):
                 row_json = {}
                 op["row"] = row_json
 
-                for column_name, datum in row._changes.iteritems():
+                for column_name, datum in six.iteritems(row._changes):
                     if row._data is not None or not datum.is_default():
                         row_json[column_name] = (
-                                self._substitute_uuids(datum.to_json()))
+                            self._substitute_uuids(datum.to_json()))
 
                         # If anything really changed, consider it an update.
                         # We can't suppress not-really-changed values earlier
                         # or transactions would become nonatomic (see the big
                         # comment inside Transaction._write()).
                         if (not any_updates and row._data is not None and
-                            row._data[column_name] != datum):
+                                row._data[column_name] != datum):
                             any_updates = True
 
                 if row._data is None or row_json:
                     operations.append(op)
 
+        if self._fetch_requests:
+            for fetch in self._fetch_requests:
+                fetch["index"] = len(operations) - 1
+                operations.append({"op": "select",
+                                   "table": fetch["row"]._table.name,
+                                   "where": self._substitute_uuids(
+                                       _where_uuid_equals(fetch["row"].uuid)),
+                                   "columns": [fetch["column_name"]]})
+            any_updates = True
+
         # Add increment.
-        if self._inc_table and any_updates:
+        if self._inc_row and any_updates:
             self._inc_index = len(operations) - 1
 
             operations.append({"op": "mutate",
-                               "table": self._inc_table,
+                               "table": self._inc_row._table.name,
                                "where": self._substitute_uuids(
-                                   self._inc_where),
+                                   _where_uuid_equals(self._inc_row.uuid)),
                                "mutations": [[self._inc_column, "+=", 1]]})
             operations.append({"op": "select",
-                               "table": self._inc_table,
+                               "table": self._inc_row._table.name,
                                "where": self._substitute_uuids(
-                                   self._inc_where),
+                                   _where_uuid_equals(self._inc_row.uuid)),
                                "columns": [self._inc_column]})
 
         # Add comment.
@@ -833,12 +1192,18 @@ class Transaction(object):
                 self.idl._outstanding_txns[self._request_id] = self
                 self._status = Transaction.INCOMPLETE
             else:
-                self._status = Transaction.AGAIN_WAIT
+                self._status = Transaction.TRY_AGAIN
 
         self.__disassemble()
         return self._status
 
     def commit_block(self):
+        """Attempts to commit this transaction, blocking until the commit
+        either succeeds or fails.  Returns the final commit status, which may
+        be any Transaction.* value other than Transaction.INCOMPLETE.
+
+        This function calls Idl.run() on this transaction'ss IDL, so it may
+        cause Idl.change_seqno to change."""
         while True:
             status = self.commit()
             if status != Transaction.INCOMPLETE:
@@ -852,6 +1217,9 @@ class Transaction(object):
             poller.block()
 
     def get_increment_new_value(self):
+        """Returns the final (incremented) value of the column in this
+        transaction that was set to be incremented by Row.increment.  This
+        transaction must have committed successfully."""
         assert self._status == Transaction.SUCCESS
         return self._inc_new_value
 
@@ -896,6 +1264,14 @@ class Transaction(object):
             return inserted_row.real
         return None
 
+    def _increment(self, row, column):
+        assert not self._inc_row
+        self._inc_row = row
+        self._inc_column = column
+
+    def _fetch(self, row, column_name):
+        self._fetch_requests.append({"row": row, "column_name": column_name})
+
     def _write(self, row, column, datum):
         assert row._changes is not None
 
@@ -913,7 +1289,8 @@ class Transaction(object):
         # transaction only does writes of existing values, without making any
         # real changes, we will drop the whole transaction later in
         # ovsdb_idl_txn_commit().)
-        if not column.alert and row._data.get(column.name) == datum:
+        if (not column.alert and row._data and
+                row._data.get(column.name) == datum):
             new_value = row._changes.get(column.name)
             if new_value is None or new_value == datum:
                 return
@@ -942,7 +1319,7 @@ class Transaction(object):
     def _process_reply(self, msg):
         if msg.type == ovs.jsonrpc.Message.T_ERROR:
             self._status = Transaction.ERROR
-        elif type(msg.result) not in (list, tuple):
+        elif not isinstance(msg.result, (list, tuple)):
             # XXX rate-limit
             vlog.warn('reply to "transact" is not JSON array')
         else:
@@ -957,7 +1334,7 @@ class Transaction(object):
                     # prior operation failed, so make sure that we know about
                     # it.
                     soft_errors = True
-                elif type(op) == dict:
+                elif isinstance(op, dict):
                     error = op.get("error")
                     if error is not None:
                         if error == "timed out":
@@ -976,10 +1353,15 @@ class Transaction(object):
                     vlog.warn("operation reply is not JSON null or object")
 
             if not soft_errors and not hard_errors and not lock_errors:
-                if self._inc_table and not self.__process_inc_reply(ops):
+                if self._inc_row and not self.__process_inc_reply(ops):
                     hard_errors = True
+                if self._fetch_requests:
+                    if self.__process_fetch_reply(ops):
+                        self.idl.change_seqno += 1
+                    else:
+                        hard_errors = True
 
-                for insert in self._inserted_rows.itervalues():
+                for insert in six.itervalues(self._inserted_rows):
                     if not self.__process_insert_reply(insert, ops):
                         hard_errors = True
 
@@ -988,10 +1370,7 @@ class Transaction(object):
             elif lock_errors:
                 self._status = Transaction.NOT_LOCKED
             elif soft_errors:
-                if self._commit_seqno == self.idl.change_seqno:
-                    self._status = Transaction.AGAIN_WAIT
-                else:
-                    self._status = Transaction.AGAIN_NOW
+                self._status = Transaction.TRY_AGAIN
             else:
                 self._status = Transaction.SUCCESS
 
@@ -1001,13 +1380,45 @@ class Transaction(object):
             # XXX rate-limit
             vlog.warn("%s is missing" % name)
             return False
-        elif type(json) not in types:
+        elif not isinstance(json, tuple(types)):
             # XXX rate-limit
             vlog.warn("%s has unexpected type %s" % (name, type(json)))
             return False
         else:
             return True
 
+    def __process_fetch_reply(self, ops):
+        update = False
+        for fetch_request in self._fetch_requests:
+            row = fetch_request["row"]
+            column_name = fetch_request["column_name"]
+            index = fetch_request["index"]
+            table = row._table
+
+            select = ops[index]
+            fetched_rows = select.get("rows")
+            if not Transaction.__check_json_type(fetched_rows, (list, tuple),
+                                                 '"select" reply "rows"'):
+                return False
+            if len(fetched_rows) != 1:
+                # XXX rate-limit
+                vlog.warn('"select" reply "rows" has %d elements '
+                          'instead of 1' % len(fetched_rows))
+                continue
+            fetched_row = fetched_rows[0]
+            if not Transaction.__check_json_type(fetched_row, (dict,),
+                                                 '"select" reply row'):
+                continue
+
+            column = table.columns.get(column_name)
+            datum_json = fetched_row.get(column_name)
+            datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+
+            row._data[column_name] = datum
+            update = True
+
+        return update
+
     def __process_inc_reply(self, ops):
         if self._inc_index + 2 > len(ops):
             # XXX rate-limit
@@ -1019,7 +1430,7 @@ class Transaction(object):
         # __process_reply() already checked.
         mutate = ops[self._inc_index]
         count = mutate.get("count")
-        if not Transaction.__check_json_type(count, (int, long),
+        if not Transaction.__check_json_type(count, six.integer_types,
                                              '"mutate" reply "count"'):
             return False
         if count != 1:
@@ -1042,7 +1453,7 @@ class Transaction(object):
                                              '"select" reply row'):
             return False
         column = row.get(self._inc_column)
-        if not Transaction.__check_json_type(column, (int, long),
+        if not Transaction.__check_json_type(column, six.integer_types,
                                              '"select" reply inc column'):
             return False
         self._inc_new_value = column
@@ -1073,3 +1484,105 @@ class Transaction(object):
 
         insert.real = uuid_
         return True
+
+
+class SchemaHelper(object):
+    """IDL Schema helper.
+
+    This class encapsulates the logic required to generate schemas suitable
+    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
+    they are interested in using register_columns().  When finished, the
+    get_idl_schema() function may be called.
+
+    The location on disk of the schema used may be found in the
+    'schema_location' variable."""
+
+    def __init__(self, location=None, schema_json=None):
+        """Creates a new Schema object.
+
+        'location' file path to ovs schema. None means default location
+        'schema_json' schema in json preresentation in memory
+        """
+
+        if location and schema_json:
+            raise ValueError("both location and schema_json can't be "
+                             "specified. it's ambiguous.")
+        if schema_json is None:
+            if location is None:
+                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
+            schema_json = ovs.json.from_file(location)
+
+        self.schema_json = schema_json
+        self._tables = {}
+        self._readonly = {}
+        self._all = False
+
+    def register_columns(self, table, columns, readonly=[]):
+        """Registers interest in the given 'columns' of 'table'.  Future calls
+        to get_idl_schema() will include 'table':column for each column in
+        'columns'. This function automatically avoids adding duplicate entries
+        to the schema.
+        A subset of 'columns' can be specified as 'readonly'. The readonly
+        columns are not replicated but can be fetched on-demand by the user
+        with Row.fetch().
+
+        'table' must be a string.
+        'columns' must be a list of strings.
+        'readonly' must be a list of strings.
+        """
+
+        assert isinstance(table, six.string_types)
+        assert isinstance(columns, list)
+
+        columns = set(columns) | self._tables.get(table, set())
+        self._tables[table] = columns
+        self._readonly[table] = readonly
+
+    def register_table(self, table):
+        """Registers interest in the given all columns of 'table'. Future calls
+        to get_idl_schema() will include all columns of 'table'.
+
+        'table' must be a string
+        """
+        assert isinstance(table, six.string_types)
+        self._tables[table] = set()  # empty set means all columns in the table
+
+    def register_all(self):
+        """Registers interest in every column of every table."""
+        self._all = True
+
+    def get_idl_schema(self):
+        """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
+        object based on columns registered using the register_columns()
+        function."""
+
+        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
+        self.schema_json = None
+
+        if not self._all:
+            schema_tables = {}
+            for table, columns in six.iteritems(self._tables):
+                schema_tables[table] = (
+                    self._keep_table_columns(schema, table, columns))
+
+            schema.tables = schema_tables
+        schema.readonly = self._readonly
+        return schema
+
+    def _keep_table_columns(self, schema, table_name, columns):
+        assert table_name in schema.tables
+        table = schema.tables[table_name]
+
+        if not columns:
+            # empty set means all columns in the table
+            return table
+
+        new_columns = {}
+        for column_name in columns:
+            assert isinstance(column_name, six.string_types)
+            assert column_name in table.columns
+
+            new_columns[column_name] = table.columns[column_name]
+
+        table.columns = new_columns
+        return table