ovsdb: optimize match_any_clause() condition evaluation
[cascardo/ovs.git] / ovsdb / condition.c
index 7bd8ee2..39a0977 100644 (file)
@@ -192,6 +192,69 @@ compare_clauses_3way_with_data(const void *a_, const void *b_)
                                                 &a->column->type);
  }
 
+struct ovsdb_o_column {
+    const struct ovsdb_column *column;
+    struct hmap o_clauses;
+};
+
+struct ovsdb_o_clause {
+    struct ovsdb_datum *arg;
+    struct hmap_node hmap_node;
+};
+
+static void
+ovsdb_condition_optimize(struct ovsdb_condition *cnd)
+{
+    size_t i;
+    uint32_t hash;
+
+    if (!cnd->optimized) {
+        return;
+    }
+
+    for(i = 0; i < cnd->n_clauses; i++) {
+        struct ovsdb_clause *clause = &cnd->clauses[i];
+
+        if (clause->function != OVSDB_F_EQ) {
+            continue;
+        }
+
+        struct ovsdb_o_clause *o_clause = xzalloc(sizeof *o_clause);
+        struct ovsdb_o_column *o_column =
+            shash_find_data(&cnd->o_columns, clause->column->name);
+
+        if (!o_column) {
+            o_column = xzalloc(sizeof *o_column);
+            o_column->column = clause->column;
+            hmap_init(&o_column->o_clauses);
+            shash_add(&cnd->o_columns, clause->column->name, o_column);
+        }
+        o_clause->arg = &clause->arg;
+        hash = ovsdb_datum_hash(&clause->arg, &clause->column->type, 0);
+        hmap_insert(&o_column->o_clauses, &o_clause->hmap_node, hash);
+    }
+}
+
+static void
+ovsdb_condition_optimize_destroy(struct ovsdb_condition *cnd)
+{
+     struct shash_node *node, *next;
+
+     SHASH_FOR_EACH_SAFE (node, next, &cnd->o_columns) {
+         struct ovsdb_o_column *o_column = node->data;
+         struct ovsdb_o_clause *c, *c_next;
+
+         HMAP_FOR_EACH_SAFE(c, c_next, hmap_node, &o_column->o_clauses) {
+             hmap_remove(&o_column->o_clauses, &c->hmap_node);
+             free(c);
+         }
+         hmap_destroy(&o_column->o_clauses);
+         shash_delete(&cnd->o_columns, node);
+         free(o_column);
+     }
+     shash_destroy(&cnd->o_columns);
+}
+
 struct ovsdb_error *
 ovsdb_condition_from_json(const struct ovsdb_table_schema *ts,
                           const struct json *json,
@@ -201,8 +264,9 @@ ovsdb_condition_from_json(const struct ovsdb_table_schema *ts,
     const struct json_array *array = json_array(json);
     size_t i;
 
+    ovsdb_condition_init(cnd);
     cnd->clauses = xmalloc(array->n * sizeof *cnd->clauses);
-    cnd->n_clauses = 0;
+
     for (i = 0; i < array->n; i++) {
         struct ovsdb_error *error;
         error = ovsdb_clause_from_json(ts, array->elems[i], symtab,
@@ -214,12 +278,17 @@ ovsdb_condition_from_json(const struct ovsdb_table_schema *ts,
             return error;
         }
         cnd->n_clauses++;
+        if (cnd->clauses[i].function > OVSDB_F_EQ) {
+            cnd->optimized = false;
+        }
     }
 
     /* A real database would have a query optimizer here. */
     qsort(cnd->clauses, cnd->n_clauses, sizeof *cnd->clauses,
           compare_clauses_3way_with_data);
 
+    ovsdb_condition_optimize(cnd);
+
     return NULL;
 }
 
@@ -352,6 +421,34 @@ ovsdb_condition_match_every_clause(const struct ovsdb_row *row,
     return true;
 }
 
+static bool
+ovsdb_condition_match_any_clause_optimized(const struct ovsdb_datum *row_datum,
+                                           const struct ovsdb_condition *cnd,
+                                           unsigned int index_map[])
+{
+    if (ovsdb_condition_is_true(cnd)) {
+        return true;
+    }
+
+    struct shash_node *node;
+    SHASH_FOR_EACH (node, &cnd->o_columns) {
+        struct ovsdb_o_column *o_column = node->data;
+        const struct ovsdb_column *column = o_column->column;
+        const struct ovsdb_datum *arg = &row_datum[index_map ?
+                                                   index_map[column->index] :
+                                                   column->index];
+        uint32_t hash = ovsdb_datum_hash(arg, &column->type, 0);
+        struct ovsdb_o_clause *o_clause;
+
+        HMAP_FOR_EACH_WITH_HASH(o_clause, hmap_node, hash, &o_column->o_clauses) {
+            if (ovsdb_datum_equals(arg, o_clause->arg, &column->type)) {
+                return true;
+            }
+        }
+    }
+    return false;
+}
+
 /* Returns true if condition evaluation of one of the clauses is
  * true. index_map[] is an optional array that if exists indicates a mapping
  * between indexing row_datum to the indexes in ovsdb_column */
@@ -362,6 +459,11 @@ ovsdb_condition_match_any_clause(const struct ovsdb_datum *row_datum,
 {
     size_t i;
 
+    if (cnd->optimized) {
+        return ovsdb_condition_match_any_clause_optimized(row_datum, cnd,
+                                                          index_map);
+    }
+
     for (i = 0; i < cnd->n_clauses; i++) {
         if (ovsdb_clause_evaluate(row_datum, &cnd->clauses[i], index_map)) {
             return true;
@@ -381,6 +483,8 @@ ovsdb_condition_destroy(struct ovsdb_condition *cnd)
     }
     free(cnd->clauses);
     cnd->n_clauses = 0;
+
+    ovsdb_condition_optimize_destroy(cnd);
 }
 
 void
@@ -388,6 +492,8 @@ ovsdb_condition_init(struct ovsdb_condition *cnd)
 {
     cnd->clauses = NULL;
     cnd->n_clauses = 0;
+    cnd->optimized = true;
+    shash_init(&cnd->o_columns);
 }
 
 bool
@@ -424,12 +530,18 @@ ovsdb_condition_clone(struct ovsdb_condition *to,
 {
     size_t i;
 
+    ovsdb_condition_init(to);
+
     to->clauses = xzalloc(from->n_clauses * sizeof *to->clauses);
 
     for (i = 0; i < from->n_clauses; i++) {
         ovsdb_clause_clone(&to->clauses[i], &from->clauses[i]);
     }
     to->n_clauses = from->n_clauses;
+    to->optimized = from->optimized;
+    if (to->optimized) {
+        ovsdb_condition_optimize(to);
+    }
 }
 
 /* Return true if ovsdb_condition_match_any_clause() will return true on