ovsdb: optimize match_any_clause() condition evaluation
[cascardo/ovs.git] / ovsdb / monitor.c
index 257959c..54c27c4 100644 (file)
 
 #include "bitmap.h"
 #include "column.h"
-#include "dynamic-string.h"
+#include "openvswitch/dynamic-string.h"
 #include "json.h"
 #include "jsonrpc.h"
 #include "ovsdb-error.h"
 #include "ovsdb-parser.h"
 #include "ovsdb.h"
 #include "row.h"
+#include "condition.h"
 #include "simap.h"
 #include "hash.h"
 #include "table.h"
 #include "monitor.h"
 #include "openvswitch/vlog.h"
 
+VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
 
 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
 static struct hmap ovsdb_monitors = HMAP_INITIALIZER(&ovsdb_monitors);
 
+/* Keep state of session's conditions */
+struct ovsdb_monitor_session_condition {
+    bool conditional;
+    size_t n_true_cnd;
+    struct shash tables;     /* Contains
+                              *   "struct ovsdb_monitor_table_condition *"s. */
+};
+
+/* Monitored table session's conditions */
+struct ovsdb_monitor_table_condition {
+    const struct ovsdb_table *table;
+    struct ovsdb_monitor_table *mt;
+    struct ovsdb_condition old_condition;
+    struct ovsdb_condition new_condition;
+};
+
 /*  Backend monitor.
  *
  *  ovsdb_monitor keep track of the ovsdb changes.
@@ -61,6 +79,7 @@ struct ovsdb_monitor {
  * inclusive.  */
 struct ovsdb_monitor_json_cache_node {
     struct hmap_node hmap_node;   /* Elements in json cache. */
+    enum ovsdb_monitor_version version;
     uint64_t from_txn;
     struct json *json;            /* Null, or a cloned of json */
 };
@@ -74,6 +93,7 @@ struct jsonrpc_monitor_node {
 struct ovsdb_monitor_column {
     const struct ovsdb_column *column;
     enum ovsdb_monitor_selection select;
+    bool monitored;
 };
 
 /* A row that has changed in a monitored table. */
@@ -114,11 +134,31 @@ struct ovsdb_monitor_table {
     /* Columns being monitored. */
     struct ovsdb_monitor_column *columns;
     size_t n_columns;
+    size_t n_monitored_columns;
+    size_t allocated_columns;
+
+    /* Columns in ovsdb_monitor_row have different indexes then in
+     * ovsdb_row. This field maps between column->index to the index in the
+     * ovsdb_monitor_row. It is used for condition evaluation. */
+    unsigned int *columns_index_map;
 
     /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
     struct hmap changes;
 };
 
+enum ovsdb_monitor_row_type {
+    OVSDB_ROW,
+    OVSDB_MONITOR_ROW
+};
+
+typedef struct json *
+(*compose_row_update_cb_func)
+    (const struct ovsdb_monitor_table *mt,
+     const struct ovsdb_monitor_session_condition * condition,
+     enum ovsdb_monitor_row_type row_type,
+     const void *,
+     bool initial, unsigned long int *changed);
+
 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
 static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
     struct ovsdb_monitor_table *mt, uint64_t next_txn);
@@ -129,15 +169,27 @@ static void ovsdb_monitor_changes_destroy(
 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
                                   uint64_t unflushed);
 
+static uint32_t
+json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
+{
+    uint32_t hash;
+
+    hash = hash_uint64(version);
+    hash = hash_uint64_basis(from_txn, hash);
+
+    return hash;
+}
+
 static struct ovsdb_monitor_json_cache_node *
 ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
+                                enum ovsdb_monitor_version version,
                                 uint64_t from_txn)
 {
     struct ovsdb_monitor_json_cache_node *node;
-    uint32_t hash = hash_uint64(from_txn);
+    uint32_t hash = json_cache_hash(version, from_txn);
 
     HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
-        if (node->from_txn == from_txn) {
+        if (node->from_txn == from_txn && node->version == version) {
             return node;
         }
     }
@@ -147,14 +199,15 @@ ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
 
 static void
 ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
+                                enum ovsdb_monitor_version version,
                                 uint64_t from_txn, struct json *json)
 {
     struct ovsdb_monitor_json_cache_node *node;
-    uint32_t hash;
+    uint32_t hash = json_cache_hash(version, from_txn);
 
     node = xmalloc(sizeof *node);
 
-    hash = hash_uint64(from_txn);
+    node->version = version;
     node->from_txn = from_txn;
     node->json = json ? json_clone(json) : NULL;
 
@@ -164,10 +217,9 @@ ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
 static void
 ovsdb_monitor_json_cache_flush(struct ovsdb_monitor *dbmon)
 {
-    struct ovsdb_monitor_json_cache_node *node, *next;
+    struct ovsdb_monitor_json_cache_node *node;
 
-    HMAP_FOR_EACH_SAFE(node, next, hmap_node, &dbmon->json_cache) {
-        hmap_remove(&dbmon->json_cache, &node->hmap_node);
+    HMAP_FOR_EACH_POP(node, hmap_node, &dbmon->json_cache) {
         json_destroy(node->json);
         free(node);
     }
@@ -179,6 +231,11 @@ compare_ovsdb_monitor_column(const void *a_, const void *b_)
     const struct ovsdb_monitor_column *a = a_;
     const struct ovsdb_monitor_column *b = b_;
 
+    /* put all monitored columns at the begining */
+    if (a->monitored != b->monitored) {
+        return a->monitored ? -1 : 1;
+    }
+
     return a->column < b->column ? -1 : a->column > b->column;
 }
 
@@ -286,6 +343,24 @@ ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
     }
 }
 
+static void
+ovsdb_monitor_columns_sort(struct ovsdb_monitor *dbmon)
+{
+    int i;
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &dbmon->tables) {
+        struct ovsdb_monitor_table *mt = node->data;
+
+        qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
+              compare_ovsdb_monitor_column);
+        for (i = 0; i < mt->n_columns; i++) {
+            /* re-set index map due to sort */
+            mt->columns_index_map[mt->columns[i].column->index] = i;
+        }
+    }
+}
+
 void
 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
                                   struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
@@ -294,7 +369,7 @@ ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
 
     jm = xzalloc(sizeof *jm);
     jm->jsonrpc_monitor = jsonrpc_monitor;
-    list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
+    ovs_list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
 }
 
 struct ovsdb_monitor *
@@ -307,7 +382,7 @@ ovsdb_monitor_create(struct ovsdb *db,
 
     ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
     ovsdb_add_replica(db, &dbmon->replica);
-    list_init(&dbmon->jsonrpc_monitors);
+    ovs_list_init(&dbmon->jsonrpc_monitors);
     dbmon->db = db;
     dbmon->n_transactions = 0;
     shash_init(&dbmon->tables);
@@ -323,60 +398,96 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m,
                         const struct ovsdb_table *table)
 {
     struct ovsdb_monitor_table *mt;
+    int i;
+    size_t n_columns = shash_count(&table->schema->columns);
 
     mt = xzalloc(sizeof *mt);
     mt->table = table;
     shash_add(&m->tables, table->schema->name, mt);
     hmap_init(&mt->changes);
+    mt->columns_index_map =
+        xmalloc(sizeof *mt->columns_index_map * n_columns);
+    for (i = 0; i < n_columns; i++) {
+        mt->columns_index_map[i] = -1;
+    }
 }
 
-void
+const char *
 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
                          const struct ovsdb_table *table,
                          const struct ovsdb_column *column,
                          enum ovsdb_monitor_selection select,
-                         size_t *allocated_columns)
+                         bool monitored)
 {
     struct ovsdb_monitor_table *mt;
     struct ovsdb_monitor_column *c;
 
     mt = shash_find_data(&dbmon->tables, table->schema->name);
 
-    if (mt->n_columns >= *allocated_columns) {
-        mt->columns = x2nrealloc(mt->columns, allocated_columns,
+    /* Check for column duplication. Return duplicated column name. */
+    if (mt->columns_index_map[column->index] != -1) {
+        return column->name;
+    }
+
+    if (mt->n_columns >= mt->allocated_columns) {
+        mt->columns = x2nrealloc(mt->columns, &mt->allocated_columns,
                                  sizeof *mt->columns);
     }
 
     mt->select |= select;
+    mt->columns_index_map[column->index] = mt->n_columns;
     c = &mt->columns[mt->n_columns++];
     c->column = column;
     c->select = select;
+    c->monitored = monitored;
+    if (monitored) {
+        mt->n_monitored_columns++;
+    }
+
+    return NULL;
 }
 
-/* Check for duplicated column names. Return the first
- * duplicated column's name if found. Otherwise return
- * NULL.  */
-const char * OVS_WARN_UNUSED_RESULT
-ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
-                                     const struct ovsdb_table *table)
+static void
+ovsdb_monitor_condition_add_columns(struct ovsdb_monitor *dbmon,
+                                    const struct ovsdb_table *table,
+                                    struct ovsdb_condition *condition)
 {
-    struct ovsdb_monitor_table *mt;
+    size_t n_columns;
     int i;
+    const struct ovsdb_column **columns =
+        ovsdb_condition_get_columns(condition, &n_columns);
 
-    mt = shash_find_data(&m->tables, table->schema->name);
+    for (i = 0; i < n_columns; i++) {
+        ovsdb_monitor_add_column(dbmon, table, columns[i],
+                                 OJMS_NONE, false);
+    }
 
-    if (mt) {
-        /* Check for duplicate columns. */
-        qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
-              compare_ovsdb_monitor_column);
-        for (i = 1; i < mt->n_columns; i++) {
-            if (mt->columns[i].column == mt->columns[i - 1].column) {
-                return mt->columns[i].column->name;
-            }
-        }
+    free(columns);
+}
+
+/* Bind this session's condition to ovsdb_monitor */
+void
+ovsdb_monitor_condition_bind(struct ovsdb_monitor *dbmon,
+                          struct ovsdb_monitor_session_condition *cond)
+{
+    struct shash_node *node;
+
+    SHASH_FOR_EACH(node, &cond->tables) {
+        struct ovsdb_monitor_table_condition *mtc = node->data;
+        struct ovsdb_monitor_table *mt =
+            shash_find_data(&dbmon->tables, mtc->table->schema->name);
+
+        mtc->mt = mt;
+        ovsdb_monitor_condition_add_columns(dbmon, mtc->table,
+                                            &mtc->new_condition);
     }
+}
 
-    return NULL;
+bool
+ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
+                           const struct ovsdb_table *table)
+{
+    return shash_find_data(&m->tables, table->schema->name);
 }
 
 static struct ovsdb_monitor_changes *
@@ -412,9 +523,7 @@ ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
     return NULL;
 }
 
-/* Stop currently tracking changes to table 'mt' since 'transaction'.
- *
- * Return 'true' if the 'transaction' is being tracked. 'false' otherwise. */
+/* Stop currently tracking changes to table 'mt' since 'transaction'. */
 static void
 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
                                     uint64_t transaction)
@@ -458,6 +567,267 @@ ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
     free(changes);
 }
 
+static enum ovsdb_monitor_selection
+ovsdb_monitor_row_update_type(bool initial, const bool old, const bool new)
+{
+    return initial ? OJMS_INITIAL
+            : !old ? OJMS_INSERT
+            : !new ? OJMS_DELETE
+            : OJMS_MODIFY;
+}
+
+/* Set conditional monitoring mode only if we have non-empty condition in one
+ * of the tables at least */
+static inline void
+ovsdb_monitor_session_condition_set_mode(
+                                  struct ovsdb_monitor_session_condition *cond)
+{
+    cond->conditional = shash_count(&cond->tables) !=
+        cond->n_true_cnd;
+}
+
+/* Returnes an empty allocated session's condition state holder */
+struct ovsdb_monitor_session_condition *
+ovsdb_monitor_session_condition_create(void)
+{
+    struct ovsdb_monitor_session_condition *condition =
+        xzalloc(sizeof *condition);
+
+    condition->conditional = false;
+    shash_init(&condition->tables);
+    return condition;
+}
+
+void
+ovsdb_monitor_session_condition_destroy(
+                           struct ovsdb_monitor_session_condition *condition)
+{
+    struct shash_node *node, *next;
+
+    if (!condition) {
+        return;
+    }
+
+    SHASH_FOR_EACH_SAFE (node, next, &condition->tables) {
+        struct ovsdb_monitor_table_condition *mtc = node->data;
+
+        ovsdb_condition_destroy(&mtc->new_condition);
+        ovsdb_condition_destroy(&mtc->old_condition);
+        shash_delete(&condition->tables, node);
+        free(mtc);
+    }
+    free(condition);
+}
+
+struct ovsdb_error *
+ovsdb_monitor_table_condition_create(
+                         struct ovsdb_monitor_session_condition *condition,
+                         const struct ovsdb_table *table,
+                         const struct json *json_cnd)
+{
+    struct ovsdb_monitor_table_condition *mtc;
+    struct ovsdb_error *error;
+
+    mtc = xzalloc(sizeof *mtc);
+    mtc->table = table;
+    ovsdb_condition_init(&mtc->old_condition);
+    ovsdb_condition_init(&mtc->new_condition);
+
+    if (json_cnd) {
+        error = ovsdb_condition_from_json(table->schema,
+                                          json_cnd,
+                                          NULL,
+                                          &mtc->old_condition);
+        if (error) {
+            free(mtc);
+            return error;
+        }
+    }
+
+    shash_add(&condition->tables, table->schema->name, mtc);
+    /* On session startup old == new condition */
+    ovsdb_condition_clone(&mtc->new_condition, &mtc->old_condition);
+    if (ovsdb_condition_is_true(&mtc->old_condition)) {
+        condition->n_true_cnd++;
+        ovsdb_monitor_session_condition_set_mode(condition);
+    }
+
+    return NULL;
+}
+
+static bool
+ovsdb_monitor_get_table_conditions(
+                      const struct ovsdb_monitor_table *mt,
+                      const struct ovsdb_monitor_session_condition *condition,
+                      struct ovsdb_condition **old_condition,
+                      struct ovsdb_condition **new_condition)
+{
+    if (!condition) {
+        return false;
+    }
+
+    struct ovsdb_monitor_table_condition *mtc =
+        shash_find_data(&condition->tables, mt->table->schema->name);
+
+    if (!mtc) {
+        return false;
+    }
+    *old_condition = &mtc->old_condition;
+    *new_condition = &mtc->new_condition;
+
+    return true;
+}
+
+struct ovsdb_error *
+ovsdb_monitor_table_condition_update(
+                            struct ovsdb_monitor *dbmon,
+                            struct ovsdb_monitor_session_condition *condition,
+                            const struct ovsdb_table *table,
+                            const struct json *cond_json)
+{
+    struct ovsdb_monitor_table_condition *mtc =
+        shash_find_data(&condition->tables, table->schema->name);
+    struct ovsdb_error *error;
+    struct ovsdb_condition cond = OVSDB_CONDITION_INITIALIZER(&cond);
+
+    if (!condition) {
+        return NULL;
+    }
+
+    error = ovsdb_condition_from_json(table->schema, cond_json,
+                                      NULL, &cond);
+    if (error) {
+        return error;
+    }
+    ovsdb_condition_destroy(&mtc->new_condition);
+    ovsdb_condition_clone(&mtc->new_condition, &cond);
+    ovsdb_condition_destroy(&cond);
+    ovsdb_monitor_condition_add_columns(dbmon,
+                                        table,
+                                        &mtc->new_condition);
+
+    return NULL;
+}
+
+static void
+ovsdb_monitor_table_condition_updated(struct ovsdb_monitor_table *mt,
+                    struct ovsdb_monitor_session_condition *condition)
+{
+    struct ovsdb_monitor_table_condition *mtc =
+        shash_find_data(&condition->tables, mt->table->schema->name);
+
+    if (mtc) {
+        /* If conditional monitoring - set old condition to new condition */
+        if (ovsdb_condition_cmp_3way(&mtc->old_condition,
+                                     &mtc->new_condition)) {
+            if (ovsdb_condition_is_true(&mtc->new_condition)) {
+                               if (!ovsdb_condition_is_true(&mtc->old_condition)) {
+                    condition->n_true_cnd++;
+                }
+            } else {
+                if (ovsdb_condition_is_true(&mtc->old_condition)) {
+                    condition->n_true_cnd--;
+                }
+            }
+            ovsdb_condition_destroy(&mtc->old_condition);
+            ovsdb_condition_clone(&mtc->old_condition, &mtc->new_condition);
+            ovsdb_monitor_session_condition_set_mode(condition);
+        }
+    }
+}
+
+static enum ovsdb_monitor_selection
+ovsdb_monitor_row_update_type_condition(
+                      const struct ovsdb_monitor_table *mt,
+                      const struct ovsdb_monitor_session_condition *condition,
+                      bool initial,
+                      enum ovsdb_monitor_row_type row_type,
+                      const struct ovsdb_datum *old,
+                      const struct ovsdb_datum *new)
+{
+    struct ovsdb_condition *old_condition, *new_condition;
+    enum ovsdb_monitor_selection type =
+        ovsdb_monitor_row_update_type(initial, old, new);
+
+    if (ovsdb_monitor_get_table_conditions(mt,
+                                           condition,
+                                           &old_condition,
+                                           &new_condition)) {
+        bool old_cond = !old ? false
+            : ovsdb_condition_empty_or_match_any(old,
+                                                old_condition,
+                                                row_type == OVSDB_MONITOR_ROW ?
+                                                mt->columns_index_map :
+                                                NULL);
+        bool new_cond = !new ? false
+            : ovsdb_condition_empty_or_match_any(new,
+                                                new_condition,
+                                                row_type == OVSDB_MONITOR_ROW ?
+                                                mt->columns_index_map :
+                                                NULL);
+
+        if (!old_cond && !new_cond) {
+            type = OJMS_NONE;
+        }
+
+        switch (type) {
+        case OJMS_INITIAL:
+        case OJMS_INSERT:
+            if (!new_cond) {
+                type = OJMS_NONE;
+            }
+            break;
+        case OJMS_MODIFY:
+            type = !old_cond ? OJMS_INSERT : !new_cond
+                ? OJMS_DELETE : OJMS_MODIFY;
+            break;
+        case OJMS_DELETE:
+            if (!old_cond) {
+                type = OJMS_NONE;
+            }
+            break;
+        case OJMS_NONE:
+            break;
+        }
+    }
+    return type;
+}
+
+static bool
+ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
+                              enum ovsdb_monitor_row_type row_type,
+                              const struct ovsdb_datum *old,
+                              const struct ovsdb_datum *new,
+                              enum ovsdb_monitor_selection type,
+                              unsigned long int *changed)
+{
+    if (!(mt->select & type)) {
+        return true;
+    }
+
+    if (type == OJMS_MODIFY) {
+        size_t i, n_changes;
+
+        n_changes = 0;
+        memset(changed, 0, bitmap_n_bytes(mt->n_columns));
+        for (i = 0; i < mt->n_columns; i++) {
+            const struct ovsdb_column *c = mt->columns[i].column;
+            size_t index = row_type == OVSDB_ROW ? c->index : i;
+            if (!ovsdb_datum_equals(&old[index], &new[index], &c->type)) {
+                bitmap_set1(changed, i);
+                n_changes++;
+            }
+        }
+        if (!n_changes) {
+            /* No actual changes: presumably a row changed and then
+             * changed back later. */
+            return true;
+        }
+    }
+
+    return false;
+}
+
 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
  * 'mt', or NULL if no row update should be sent.
  *
@@ -470,41 +840,24 @@ ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
 static struct json *
 ovsdb_monitor_compose_row_update(
     const struct ovsdb_monitor_table *mt,
-    const struct ovsdb_monitor_row *row,
+    const struct ovsdb_monitor_session_condition *condition OVS_UNUSED,
+    enum ovsdb_monitor_row_type row_type OVS_UNUSED,
+    const void *_row,
     bool initial, unsigned long int *changed)
 {
+    const struct ovsdb_monitor_row *row = _row;
     enum ovsdb_monitor_selection type;
     struct json *old_json, *new_json;
     struct json *row_json;
     size_t i;
 
-    type = (initial ? OJMS_INITIAL
-            : !row->old ? OJMS_INSERT
-            : !row->new ? OJMS_DELETE
-            : OJMS_MODIFY);
-    if (!(mt->select & type)) {
+    ovs_assert(row_type == OVSDB_MONITOR_ROW);
+    type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
+    if (ovsdb_monitor_row_skip_update(mt, row_type, row->old,
+                                      row->new, type, changed)) {
         return NULL;
     }
 
-    if (type == OJMS_MODIFY) {
-        size_t n_changes;
-
-        n_changes = 0;
-        memset(changed, 0, bitmap_n_bytes(mt->n_columns));
-        for (i = 0; i < mt->n_columns; i++) {
-            const struct ovsdb_column *c = mt->columns[i].column;
-            if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
-                bitmap_set1(changed, i);
-                n_changes++;
-            }
-        }
-        if (!n_changes) {
-            /* No actual changes: presumably a row changed and then
-             * changed back later. */
-            return NULL;
-        }
-    }
-
     row_json = json_object_create();
     old_json = new_json = NULL;
     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
@@ -515,10 +868,10 @@ ovsdb_monitor_compose_row_update(
         new_json = json_object_create();
         json_object_put(row_json, "new", new_json);
     }
-    for (i = 0; i < mt->n_columns; i++) {
+    for (i = 0; i < mt->n_monitored_columns; i++) {
         const struct ovsdb_monitor_column *c = &mt->columns[i];
 
-        if (!(type & c->select)) {
+        if (!c->monitored || !(type & c->select))  {
             /* We don't care about this type of change for this
              * particular column (but we will care about it for some
              * other column). */
@@ -541,25 +894,139 @@ ovsdb_monitor_compose_row_update(
     return row_json;
 }
 
-/* Constructs and returns JSON for a <table-updates> object (as described in
- * RFC 7047) for all the outstanding changes within 'monitor', starting from
- * 'transaction'.  */
-static struct json*
-ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
-                             bool initial, uint64_t transaction)
+/* Returns JSON for a <row-update2> (as described in ovsdb-server(1) mapage)
+ * for 'row' within * 'mt', or NULL if no row update should be sent.
+ *
+ * The caller should specify 'initial' as true if the returned JSON is
+ * going to be used as part of the initial reply to a "monitor_cond" request,
+ * false if it is going to be used as part of an "update2" notification.
+ *
+ * 'changed' must be a scratch buffer for internal use that is at least
+ * bitmap_n_bytes(mt->n_columns) bytes long. */
+static struct json *
+ovsdb_monitor_compose_row_update2(
+    const struct ovsdb_monitor_table *mt,
+    const struct ovsdb_monitor_session_condition *condition,
+    enum ovsdb_monitor_row_type row_type,
+    const void *_row,
+    bool initial, unsigned long int *changed)
+{
+    enum ovsdb_monitor_selection type;
+    struct json *row_update2, *diff_json;
+    const struct ovsdb_datum *old, *new;
+    size_t i;
+
+    if (row_type == OVSDB_MONITOR_ROW) {
+        old = ((const struct ovsdb_monitor_row *)_row)->old;;
+        new = ((const struct ovsdb_monitor_row *)_row)->new;
+    } else {
+        old = new = ((const struct ovsdb_row *)_row)->fields;
+    }
+
+    type = ovsdb_monitor_row_update_type_condition(mt, condition, initial,
+                                                   row_type, old, new);
+    if (ovsdb_monitor_row_skip_update(mt, row_type, old, new, type, changed)) {
+        return NULL;
+    }
+
+    row_update2 = json_object_create();
+    if (type == OJMS_DELETE) {
+        json_object_put(row_update2, "delete", json_null_create());
+    } else {
+        diff_json = json_object_create();
+        const char *op;
+
+        for (i = 0; i < mt->n_monitored_columns; i++) {
+            const struct ovsdb_monitor_column *c = &mt->columns[i];
+            size_t index = row_type == OVSDB_ROW ? c->column->index : i;
+            if (!c->monitored || !(type & c->select))  {
+                /* We don't care about this type of change for this
+                 * particular column (but we will care about it for some
+                 * other column). */
+                continue;
+            }
+
+            if (type == OJMS_MODIFY) {
+                struct ovsdb_datum diff;
+
+                if (!bitmap_is_set(changed, i)) {
+                    continue;
+                }
+
+                ovsdb_datum_diff(&diff ,&old[index], &new[index],
+                                        &c->column->type);
+                json_object_put(diff_json, c->column->name,
+                                ovsdb_datum_to_json(&diff, &c->column->type));
+                ovsdb_datum_destroy(&diff, &c->column->type);
+            } else {
+                if (!ovsdb_datum_is_default(&new[index], &c->column->type)) {
+                    json_object_put(diff_json, c->column->name,
+                                    ovsdb_datum_to_json(&new[index],
+                                                        &c->column->type));
+                }
+            }
+        }
+
+        op = type == OJMS_INITIAL ? "initial"
+                                  : type == OJMS_MODIFY ? "modify" : "insert";
+        json_object_put(row_update2, op, diff_json);
+    }
+
+    return row_update2;
+}
+
+static size_t
+ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
 {
     struct shash_node *node;
-    unsigned long int *changed;
-    struct json *json;
-    size_t max_columns;
+    size_t max_columns = 0;
 
-    max_columns = 0;
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
 
         max_columns = MAX(max_columns, mt->n_columns);
     }
-    changed = xmalloc(bitmap_n_bytes(max_columns));
+
+    return max_columns;
+}
+
+static void
+ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
+                           struct json **table_json, struct json *row_json,
+                           const struct uuid *row_uuid)
+{
+    char uuid[UUID_LEN + 1];
+
+    /* Create JSON object for transaction overall. */
+    if (!*json) {
+        *json = json_object_create();
+    }
+
+    /* Create JSON object for transaction on this table. */
+    if (!*table_json) {
+        *table_json = json_object_create();
+        json_object_put(*json, table_name, *table_json);
+    }
+
+    /* Add JSON row to JSON table. */
+    snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(row_uuid));
+    json_object_put(*table_json, uuid, row_json);
+}
+
+/* Constructs and returns JSON for a <table-updates> object (as described in
+ * RFC 7047) for all the outstanding changes within 'monitor', starting from
+ * 'transaction'.  */
+static struct json*
+ovsdb_monitor_compose_update(
+                      struct ovsdb_monitor *dbmon,
+                      bool initial, uint64_t transaction,
+                      const struct ovsdb_monitor_session_condition *condition,
+                      compose_row_update_cb_func row_update)
+{
+    struct shash_node *node;
+    struct json *json;
+    size_t max_columns = ovsdb_monitor_max_columns(dbmon);
+    unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
 
     json = NULL;
     SHASH_FOR_EACH (node, &dbmon->tables) {
@@ -573,30 +1040,61 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
             continue;
         }
 
-        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
-            struct json *row_json;
+               HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
+                       struct json *row_json;
+                       row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
+                                                                        initial, changed);
+                       if (row_json) {
+                               ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
+                                                                                  &table_json, row_json,
+                                                                                  &row->uuid);
+                       }
+               }
+       }
+    free(changed);
 
-            row_json = ovsdb_monitor_compose_row_update(
-                mt, row, initial, changed);
-            if (row_json) {
-                char uuid[UUID_LEN + 1];
+    return json;
+}
 
-                /* Create JSON object for transaction overall. */
-                if (!json) {
-                    json = json_object_create();
-                }
+static struct json*
+ovsdb_monitor_compose_cond_change_update(
+                    struct ovsdb_monitor *dbmon,
+                    struct ovsdb_monitor_session_condition *condition)
+{
+    struct shash_node *node;
+    struct json *json = NULL;
+    size_t max_columns = ovsdb_monitor_max_columns(dbmon);
+    unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
 
-                /* Create JSON object for transaction on this table. */
-                if (!table_json) {
-                    table_json = json_object_create();
-                    json_object_put(json, mt->table->schema->name, table_json);
-                }
+    SHASH_FOR_EACH (node, &dbmon->tables) {
+        struct ovsdb_monitor_table *mt = node->data;
+        struct ovsdb_row *row;
+        struct json *table_json = NULL;
+        struct ovsdb_condition *old_condition, *new_condition;
+
+        if (!ovsdb_monitor_get_table_conditions(mt,
+                                                condition,
+                                                &old_condition,
+                                                &new_condition) ||
+            !ovsdb_condition_cmp_3way(old_condition, new_condition)) {
+            /* Nothing to update on this table */
+            continue;
+        }
 
-                /* Add JSON row to JSON table. */
-                snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
-                json_object_put(table_json, uuid, row_json);
+        /* Iterate over all rows in table */
+        HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+            struct json *row_json;
+
+            row_json = ovsdb_monitor_compose_row_update2(mt, condition,
+                                                         OVSDB_ROW, row,
+                                                         false, changed);
+            if (row_json) {
+                ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
+                                           &table_json, row_json,
+                                           ovsdb_row_get_uuid(row));
             }
         }
+        ovsdb_monitor_table_condition_updated(mt, condition);
     }
     free(changed);
 
@@ -605,39 +1103,71 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
 
 /* Returns JSON for a <table-updates> object (as described in RFC 7047)
  * for all the outstanding changes within 'monitor' that starts from
- * '*unflushed' transaction id.
+ * '*unflushed'.
+ * If cond_updated is true all rows in the db that match conditions will be
+ * sent.
  *
  * The caller should specify 'initial' as true if the returned JSON is going to
  * be used as part of the initial reply to a "monitor" request, false if it is
  * going to be used as part of an "update" notification. */
 struct json *
-ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
-                         bool initial, uint64_t *unflushed)
+ovsdb_monitor_get_update(
+             struct ovsdb_monitor *dbmon,
+             bool initial, bool cond_updated,
+             uint64_t *unflushed_,
+             struct ovsdb_monitor_session_condition *condition,
+             enum ovsdb_monitor_version version)
 {
-    struct ovsdb_monitor_json_cache_node *cache_node;
+    struct ovsdb_monitor_json_cache_node *cache_node = NULL;
     struct shash_node *node;
     struct json *json;
-    uint64_t prev_txn = *unflushed;
-    uint64_t next_txn = dbmon->n_transactions + 1;
+    const uint64_t unflushed = *unflushed_;
+    const uint64_t next_unflushed = dbmon->n_transactions + 1;
+
+    ovs_assert(cond_updated ? unflushed == next_unflushed : true);
 
     /* Return a clone of cached json if one exists. Otherwise,
      * generate a new one and add it to the cache.  */
-    cache_node = ovsdb_monitor_json_cache_search(dbmon, prev_txn);
+    if (!condition || (!condition->conditional && !cond_updated)) {
+        cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
+                                                     unflushed);
+    }
     if (cache_node) {
         json = cache_node->json ? json_clone(cache_node->json) : NULL;
     } else {
-        json = ovsdb_monitor_compose_update(dbmon, initial, prev_txn);
-        ovsdb_monitor_json_cache_insert(dbmon, prev_txn, json);
+        if (version == OVSDB_MONITOR_V1) {
+            json =
+               ovsdb_monitor_compose_update(dbmon, initial, unflushed,
+                                            condition,
+                                            ovsdb_monitor_compose_row_update);
+        } else {
+            ovs_assert(version == OVSDB_MONITOR_V2);
+            if (!cond_updated) {
+                               json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
+                                                                                       condition,
+                                                                                       ovsdb_monitor_compose_row_update2);
+
+                               if (!condition || !condition->conditional) {
+                                       ovsdb_monitor_json_cache_insert(dbmon, version, unflushed,
+                                                                                                       json);
+                               }
+                       } else {
+                /* Compose update on whole db due to condition update.
+                   Session must be flushed (change list is empty)*/
+                               json =
+                                       ovsdb_monitor_compose_cond_change_update(dbmon, condition);
+                       }
+               }
     }
 
     /* Maintain transaction id of 'changes'. */
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
 
-        ovsdb_monitor_table_untrack_changes(mt, prev_txn);
-        ovsdb_monitor_table_track_changes(mt, next_txn);
+        ovsdb_monitor_table_untrack_changes(mt, unflushed);
+        ovsdb_monitor_table_track_changes(mt, next_unflushed);
     }
-    *unflushed = next_txn;
+    *unflushed_ = next_unflushed;
 
     return json;
 }
@@ -661,9 +1191,36 @@ ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
     mt->select |= select;
 }
 
+ /*
+ * If a row's change type (insert, delete or modify) matches that of
+ * the monitor, they should be sent to the monitor's clients as updates.
+ * Of cause, the monitor should also internally update with this change.
+ *
+ * When a change type does not require client side update, the monitor
+ * may still need to keep track of certain changes in order to generate
+ * correct future updates.  For example, the monitor internal state should
+ * be updated whenever a new row is inserted, in order to generate the
+ * correct initial state, regardless if a insert change type is being
+ * monitored.
+ *
+ * On the other hand, if a transaction only contains changes to columns
+ * that are not monitored, this transaction can be safely ignored by the
+ * monitor.
+ *
+ * Thus, the order of the declaration is important:
+ * 'OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE' always implies
+ * 'OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE', but not vice versa.  */
+enum ovsdb_monitor_changes_efficacy {
+    OVSDB_CHANGES_NO_EFFECT,                /* Monitor does not care about this
+                                               change.  */
+    OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE,  /* Monitor internal updates. */
+    OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE,  /* Client needs to be updated.  */
+};
+
 struct ovsdb_monitor_aux {
     const struct ovsdb_monitor *monitor;
     struct ovsdb_monitor_table *mt;
+    enum ovsdb_monitor_changes_efficacy efficacy;
 };
 
 static void
@@ -672,6 +1229,7 @@ ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
 {
     aux->monitor = m;
     aux->mt = NULL;
+    aux->efficacy = OVSDB_CHANGES_NO_EFFECT;
 }
 
 static void
@@ -706,10 +1264,51 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old,
     }
 }
 
+static bool
+ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
+                              const unsigned long int *changed)
+{
+    size_t i;
+
+    for (i = 0; i < mt->n_columns; i++) {
+        size_t column_index = mt->columns[i].column->index;
+
+        if (bitmap_is_set(changed, column_index)) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+/* Return the efficacy of a row's change to a monitor table.
+ *
+ * Please see the block comment above 'ovsdb_monitor_changes_efficacy'
+ * definition form more information.  */
+static enum ovsdb_monitor_changes_efficacy
+ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type,
+                               const struct ovsdb_monitor_table *mt,
+                               const unsigned long int *changed)
+{
+    if (type == OJMS_MODIFY &&
+        !ovsdb_monitor_columns_changed(mt, changed)) {
+        return OVSDB_CHANGES_NO_EFFECT;
+    }
+
+    if (type == OJMS_MODIFY) {
+        /* Condition might turn a modify operation to insert or delete */
+        type |= OJMS_INSERT | OJMS_DELETE;
+    }
+
+    return (mt->select & type)
+                ?  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
+                :  OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
+}
+
 static bool
 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
                         const struct ovsdb_row *new,
-                        const unsigned long int *changed OVS_UNUSED,
+                        const unsigned long int *changed,
                         void *aux_)
 {
     struct ovsdb_monitor_aux *aux = aux_;
@@ -728,19 +1327,28 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
     }
     mt = aux->mt;
 
+    enum ovsdb_monitor_selection type =
+        ovsdb_monitor_row_update_type(false, old, new);
+    enum ovsdb_monitor_changes_efficacy efficacy =
+        ovsdb_monitor_changes_classify(type, mt, changed);
+
     HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
-        ovsdb_monitor_changes_update(old, new, mt, changes);
+        if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
+            ovsdb_monitor_changes_update(old, new, mt, changes);
+        }
     }
+    if (aux->efficacy < efficacy) {
+        aux->efficacy = efficacy;
+    }
+
     return true;
 }
 
 void
 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
 {
-    struct ovsdb_monitor_aux aux;
     struct shash_node *node;
 
-    ovsdb_monitor_init_aux(&aux, dbmon);
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
 
@@ -763,11 +1371,12 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
 
 void
 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
-                   struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
+                   struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
+                   uint64_t unflushed)
 {
     struct jsonrpc_monitor_node *jm;
 
-    if (list_is_empty(&dbmon->jsonrpc_monitors)) {
+    if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
         ovsdb_monitor_destroy(dbmon);
         return;
     }
@@ -775,11 +1384,17 @@ ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
     /* Find and remove the jsonrpc monitor from the list.  */
     LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
         if (jm->jsonrpc_monitor == jsonrpc_monitor) {
-            list_remove(&jm->node);
+            /* Release the tracked changes. */
+            struct shash_node *node;
+            SHASH_FOR_EACH (node, &dbmon->tables) {
+                struct ovsdb_monitor_table *mt = node->data;
+                ovsdb_monitor_table_untrack_changes(mt, unflushed);
+            }
+            ovs_list_remove(&jm->node);
             free(jm);
 
             /* Destroy ovsdb monitor if this is the last user.  */
-            if (list_is_empty(&dbmon->jsonrpc_monitors)) {
+            if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
                 ovsdb_monitor_destroy(dbmon);
             }
 
@@ -797,19 +1412,21 @@ ovsdb_monitor_table_equal(const struct ovsdb_monitor_table *a,
 {
     size_t i;
 
+    ovs_assert(b->n_columns == b->n_monitored_columns);
+
     if ((a->table != b->table) ||
         (a->select != b->select) ||
-        (a->n_columns != b->n_columns)) {
+        (a->n_monitored_columns != b->n_monitored_columns)) {
         return false;
     }
 
-    for (i = 0; i < a->n_columns; i++) {
+    /* Compare only monitored columns that must be sorted already */
+    for (i = 0; i < a->n_monitored_columns; i++) {
         if ((a->columns[i].column != b->columns[i].column) ||
             (a->columns[i].select != b->columns[i].select)) {
             return false;
         }
     }
-
     return true;
 }
 
@@ -873,7 +1490,9 @@ ovsdb_monitor_add(struct ovsdb_monitor *new_dbmon)
 
     /* New_dbmon should be associated with only one jsonrpc
      * connections.  */
-    ovs_assert(list_is_singleton(&new_dbmon->jsonrpc_monitors));
+    ovs_assert(ovs_list_is_singleton(&new_dbmon->jsonrpc_monitors));
+
+    ovsdb_monitor_columns_sort(new_dbmon);
 
     hash = ovsdb_monitor_hash(new_dbmon, 0);
     HMAP_FOR_EACH_WITH_HASH(dbmon, hmap_node, hash, &ovsdb_monitors) {
@@ -891,7 +1510,7 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
 {
     struct shash_node *node;
 
-    list_remove(&dbmon->replica.node);
+    ovs_list_remove(&dbmon->replica.node);
 
     if (!hmap_node_is_null(&dbmon->hmap_node)) {
         hmap_remove(&ovsdb_monitors, &dbmon->hmap_node);
@@ -908,7 +1527,9 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
             hmap_remove(&mt->changes, &changes->hmap_node);
             ovsdb_monitor_changes_destroy(changes);
         }
+        hmap_destroy(&mt->changes);
         free(mt->columns);
+        free(mt->columns_index_map);
         free(mt);
     }
     shash_destroy(&dbmon->tables);
@@ -923,10 +1544,26 @@ ovsdb_monitor_commit(struct ovsdb_replica *replica,
     struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
     struct ovsdb_monitor_aux aux;
 
-    ovsdb_monitor_json_cache_flush(m);
     ovsdb_monitor_init_aux(&aux, m);
-    ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
+    /* Update ovsdb_monitor's transaction number for
+     * each transaction, before calling ovsdb_monitor_change_cb().  */
     m->n_transactions++;
+    ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
+
+    switch(aux.efficacy) {
+    case OVSDB_CHANGES_NO_EFFECT:
+        /* The transaction is ignored by the monitor.
+         * Roll back the 'n_transactions' as if the transaction
+         * has never happened. */
+        m->n_transactions--;
+        break;
+    case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE:
+        /* Nothing.  */
+        break;
+    case  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE:
+        ovsdb_monitor_json_cache_flush(m);
+        break;
+    }
 
     return NULL;
 }
@@ -945,6 +1582,19 @@ ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
     }
 }
 
+/* Add some memory usage statics for monitors into 'usage', for use with
+ * memory_report().  */
+void
+ovsdb_monitor_get_memory_usage(struct simap *usage)
+{
+    struct ovsdb_monitor *dbmon;
+    simap_put(usage, "monitors", hmap_count(&ovsdb_monitors));
+
+    HMAP_FOR_EACH(dbmon, hmap_node,  &ovsdb_monitors) {
+        simap_increase(usage, "json-caches", hmap_count(&dbmon->json_cache));
+    }
+}
+
 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
     ovsdb_monitor_commit,
     ovsdb_monitor_destroy_callback,