ovsdb: generate update notifications for monitor_cond session
[cascardo/ovs.git] / ovsdb / monitor.c
index 5228a0a..b60ed67 100644 (file)
@@ -27,6 +27,7 @@
 #include "ovsdb-parser.h"
 #include "ovsdb.h"
 #include "row.h"
+#include "condition.h"
 #include "simap.h"
 #include "hash.h"
 #include "table.h"
 #include "monitor.h"
 #include "openvswitch/vlog.h"
 
+VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
 
 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
 static struct hmap ovsdb_monitors = HMAP_INITIALIZER(&ovsdb_monitors);
 
+/* Keep state of session's conditions */
+struct ovsdb_monitor_session_condition {
+    bool conditional;
+    size_t n_true_cnd;
+    struct shash tables;     /* Contains
+                              *   "struct ovsdb_monitor_table_condition *"s. */
+};
+
+/* Monitored table session's conditions */
+struct ovsdb_monitor_table_condition {
+    const struct ovsdb_table *table;
+    struct ovsdb_monitor_table *mt;
+    struct ovsdb_condition old_condition;
+    struct ovsdb_condition new_condition;
+};
+
 /*  Backend monitor.
  *
  *  ovsdb_monitor keep track of the ovsdb changes.
@@ -129,9 +147,11 @@ struct ovsdb_monitor_table {
 };
 
 typedef struct json *
-(*compose_row_update_cb_func)(const struct ovsdb_monitor_table *mt,
-                              const struct ovsdb_monitor_row *row,
-                              bool initial, unsigned long int *changed);
+(*compose_row_update_cb_func)
+    (const struct ovsdb_monitor_table *mt,
+     const struct ovsdb_monitor_session_condition * condition,
+     const struct ovsdb_monitor_row *row,
+     bool initial, unsigned long int *changed);
 
 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
 static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
@@ -421,6 +441,42 @@ ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
     return NULL;
 }
 
+static void
+ovsdb_monitor_condition_add_columns(struct ovsdb_monitor *dbmon,
+                                    const struct ovsdb_table *table,
+                                    struct ovsdb_condition *condition)
+{
+    size_t n_columns;
+    int i;
+    const struct ovsdb_column **columns =
+        ovsdb_condition_get_columns(condition, &n_columns);
+
+    for (i = 0; i < n_columns; i++) {
+        ovsdb_monitor_add_column(dbmon, table, columns[i],
+                                 OJMS_NONE, false);
+    }
+
+    free(columns);
+}
+
+/* Bind this session's condition to ovsdb_monitor */
+void
+ovsdb_monitor_condition_bind(struct ovsdb_monitor *dbmon,
+                          struct ovsdb_monitor_session_condition *cond)
+{
+    struct shash_node *node;
+
+    SHASH_FOR_EACH(node, &cond->tables) {
+        struct ovsdb_monitor_table_condition *mtc = node->data;
+        struct ovsdb_monitor_table *mt =
+            shash_find_data(&dbmon->tables, mtc->table->schema->name);
+
+        mtc->mt = mt;
+        ovsdb_monitor_condition_add_columns(dbmon, mtc->table,
+                                            &mtc->new_condition);
+    }
+}
+
 static struct ovsdb_monitor_changes *
 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
                                 uint64_t next_txn)
@@ -506,6 +562,161 @@ ovsdb_monitor_row_update_type(bool initial, const bool old, const bool new)
             : !new ? OJMS_DELETE
             : OJMS_MODIFY;
 }
+
+/* Set conditional monitoring mode only if we have non-empty condition in one
+ * of the tables at least */
+static inline void
+ovsdb_monitor_session_condition_set_mode(
+                                  struct ovsdb_monitor_session_condition *cond)
+{
+    cond->conditional = shash_count(&cond->tables) !=
+        cond->n_true_cnd;
+}
+
+/* Returnes an empty allocated session's condition state holder */
+struct ovsdb_monitor_session_condition *
+ovsdb_monitor_session_condition_create(void)
+{
+    struct ovsdb_monitor_session_condition *condition =
+        xzalloc(sizeof *condition);
+
+    condition->conditional = false;
+    shash_init(&condition->tables);
+    return condition;
+}
+
+void
+ovsdb_monitor_session_condition_destroy(
+                           struct ovsdb_monitor_session_condition *condition)
+{
+    struct shash_node *node, *next;
+
+    if (!condition) {
+        return;
+    }
+
+    SHASH_FOR_EACH_SAFE (node, next, &condition->tables) {
+        struct ovsdb_monitor_table_condition *mtc = node->data;
+
+        ovsdb_condition_destroy(&mtc->new_condition);
+        ovsdb_condition_destroy(&mtc->old_condition);
+        shash_delete(&condition->tables, node);
+        free(mtc);
+    }
+    free(condition);
+}
+
+struct ovsdb_error *
+ovsdb_monitor_table_condition_create(
+                         struct ovsdb_monitor_session_condition *condition,
+                         const struct ovsdb_table *table,
+                         const struct json *json_cnd)
+{
+    struct ovsdb_monitor_table_condition *mtc;
+    struct ovsdb_error *error;
+
+    mtc = xzalloc(sizeof *mtc);
+    mtc->table = table;
+    ovsdb_condition_init(&mtc->old_condition);
+    ovsdb_condition_init(&mtc->new_condition);
+
+    if (json_cnd) {
+        error = ovsdb_condition_from_json(table->schema,
+                                          json_cnd,
+                                          NULL,
+                                          &mtc->old_condition);
+        if (error) {
+            free(mtc);
+            return error;
+        }
+    }
+
+    shash_add(&condition->tables, table->schema->name, mtc);
+    /* On session startup old == new condition */
+    ovsdb_condition_clone(&mtc->new_condition, &mtc->old_condition);
+    if (ovsdb_condition_is_true(&mtc->old_condition)) {
+        condition->n_true_cnd++;
+        ovsdb_monitor_session_condition_set_mode(condition);
+    }
+
+    return NULL;
+}
+
+static bool
+ovsdb_monitor_get_table_conditions(
+                      const struct ovsdb_monitor_table *mt,
+                      const struct ovsdb_monitor_session_condition *condition,
+                      struct ovsdb_condition **old_condition,
+                      struct ovsdb_condition **new_condition)
+{
+    if (!condition) {
+        return false;
+    }
+
+    struct ovsdb_monitor_table_condition *mtc =
+        shash_find_data(&condition->tables, mt->table->schema->name);
+
+    if (!mtc) {
+        return false;
+    }
+    *old_condition = &mtc->old_condition;
+    *new_condition = &mtc->new_condition;
+
+    return true;
+}
+
+static enum ovsdb_monitor_selection
+ovsdb_monitor_row_update_type_condition(
+                      const struct ovsdb_monitor_table *mt,
+                      const struct ovsdb_monitor_session_condition *condition,
+                      bool initial,
+                      const struct ovsdb_datum *old,
+                      const struct ovsdb_datum *new)
+{
+    struct ovsdb_condition *old_condition, *new_condition;
+    enum ovsdb_monitor_selection type =
+        ovsdb_monitor_row_update_type(initial, old, new);
+
+    if (ovsdb_monitor_get_table_conditions(mt,
+                                           condition,
+                                           &old_condition,
+                                           &new_condition)) {
+        bool old_cond = !old ? false
+            : ovsdb_condition_empty_or_match_any(old,
+                                                 old_condition,
+                                                 mt->columns_index_map);
+        bool new_cond = !new ? false
+            : ovsdb_condition_empty_or_match_any(new,
+                                                 new_condition,
+                                                 mt->columns_index_map);
+
+        if (!old_cond && !new_cond) {
+            type = OJMS_NONE;
+        }
+
+        switch (type) {
+        case OJMS_INITIAL:
+        case OJMS_INSERT:
+            if (!new_cond) {
+                type = OJMS_NONE;
+            }
+            break;
+        case OJMS_MODIFY:
+            type = !old_cond ? OJMS_INSERT : !new_cond
+                ? OJMS_DELETE : OJMS_MODIFY;
+            break;
+        case OJMS_DELETE:
+            if (!old_cond) {
+                type = OJMS_NONE;
+            }
+            break;
+        case OJMS_NONE:
+            break;
+        }
+    }
+    return type;
+}
+
 static bool
 ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
                               const struct ovsdb_monitor_row *row,
@@ -550,6 +761,7 @@ ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
 static struct json *
 ovsdb_monitor_compose_row_update(
     const struct ovsdb_monitor_table *mt,
+    const struct ovsdb_monitor_session_condition *condition OVS_UNUSED,
     const struct ovsdb_monitor_row *row,
     bool initial, unsigned long int *changed)
 {
@@ -611,6 +823,7 @@ ovsdb_monitor_compose_row_update(
 static struct json *
 ovsdb_monitor_compose_row_update2(
     const struct ovsdb_monitor_table *mt,
+    const struct ovsdb_monitor_session_condition *condition,
     const struct ovsdb_monitor_row *row,
     bool initial, unsigned long int *changed)
 {
@@ -618,7 +831,8 @@ ovsdb_monitor_compose_row_update2(
     struct json *row_update2, *diff_json;
     size_t i;
 
-    type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
+    type = ovsdb_monitor_row_update_type_condition(mt, condition, initial,
+                                                   row->old, row->new);
     if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
         return NULL;
     }
@@ -688,9 +902,11 @@ ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
  * RFC 7047) for all the outstanding changes within 'monitor', starting from
  * 'transaction'.  */
 static struct json*
-ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
-                             bool initial, uint64_t transaction,
-                             compose_row_update_cb_func row_update)
+ovsdb_monitor_compose_update(
+                      struct ovsdb_monitor *dbmon,
+                      bool initial, uint64_t transaction,
+                      const struct ovsdb_monitor_session_condition *condition,
+                      compose_row_update_cb_func row_update)
 {
     struct shash_node *node;
     struct json *json;
@@ -712,7 +928,7 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
             struct json *row_json;
 
-            row_json = (*row_update)(mt, row, initial, changed);
+            row_json = (*row_update)(mt, condition, row, initial, changed);
             if (row_json) {
                 char uuid[UUID_LEN + 1];
 
@@ -746,11 +962,13 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
  * be used as part of the initial reply to a "monitor" request, false if it is
  * going to be used as part of an "update" notification. */
 struct json *
-ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
-                         bool initial, uint64_t *unflushed_,
-                         enum ovsdb_monitor_version version)
+ovsdb_monitor_get_update(
+             struct ovsdb_monitor *dbmon,
+             bool initial, uint64_t *unflushed_,
+             const struct ovsdb_monitor_session_condition *condition,
+             enum ovsdb_monitor_version version)
 {
-    struct ovsdb_monitor_json_cache_node *cache_node;
+    struct ovsdb_monitor_json_cache_node *cache_node = NULL;
     struct shash_node *node;
     struct json *json;
     const uint64_t unflushed = *unflushed_;
@@ -758,19 +976,28 @@ ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
 
     /* Return a clone of cached json if one exists. Otherwise,
      * generate a new one and add it to the cache.  */
-    cache_node = ovsdb_monitor_json_cache_search(dbmon, version, unflushed);
+    if (!condition || !condition->conditional) {
+        cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
+                                                     unflushed);
+    }
     if (cache_node) {
         json = cache_node->json ? json_clone(cache_node->json) : NULL;
     } else {
         if (version == OVSDB_MONITOR_V1) {
-            json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
-                                        ovsdb_monitor_compose_row_update);
+            json =
+               ovsdb_monitor_compose_update(dbmon, initial, unflushed,
+                                            condition,
+                                            ovsdb_monitor_compose_row_update);
         } else {
             ovs_assert(version == OVSDB_MONITOR_V2);
-            json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
-                                        ovsdb_monitor_compose_row_update2);
+            json =
+               ovsdb_monitor_compose_update(dbmon, initial, unflushed,
+                                            condition,
+                                            ovsdb_monitor_compose_row_update2);
+        }
+        if (!condition || !condition->conditional) {
+            ovsdb_monitor_json_cache_insert(dbmon, version, unflushed, json);
         }
-        ovsdb_monitor_json_cache_insert(dbmon, version, unflushed, json);
     }
 
     /* Maintain transaction id of 'changes'. */
@@ -908,6 +1135,11 @@ ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type,
         return OVSDB_CHANGES_NO_EFFECT;
     }
 
+    if (type == OJMS_MODIFY) {
+        /* Condition might turn a modify operation to insert or delete */
+        type |= OJMS_INSERT | OJMS_DELETE;
+    }
+
     return (mt->select & type)
                 ?  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
                 :  OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
@@ -935,19 +1167,18 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
     }
     mt = aux->mt;
 
-    HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
-        enum ovsdb_monitor_changes_efficacy efficacy;
-        enum ovsdb_monitor_selection type;
+    enum ovsdb_monitor_selection type =
+        ovsdb_monitor_row_update_type(false, old, new);
+    enum ovsdb_monitor_changes_efficacy efficacy =
+        ovsdb_monitor_changes_classify(type, mt, changed);
 
-        type = ovsdb_monitor_row_update_type(false, old, new);
-        efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
+    HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
         if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
             ovsdb_monitor_changes_update(old, new, mt, changes);
         }
-
-        if (aux->efficacy < efficacy) {
-            aux->efficacy = efficacy;
-        }
+    }
+    if (aux->efficacy < efficacy) {
+        aux->efficacy = efficacy;
     }
 
     return true;