static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
enum ovsdb_monitor_version, const struct json *request_id);
+static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_change(
+ struct ovsdb_jsonrpc_session *s,
+ struct json *params,
+ const struct json *request_id);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
struct ovsdb_jsonrpc_session *,
struct json_array *params,
static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
static struct json *ovsdb_jsonrpc_monitor_compose_update(
struct ovsdb_jsonrpc_monitor *monitor, bool initial);
+static struct jsonrpc_msg * ovsdb_jsonrpc_create_notify(
+ const struct ovsdb_jsonrpc_monitor *m,
+ struct json *params);
\f
/* JSON-RPC database server. */
static void ovsdb_jsonrpc_session_get_memory_usage(
const struct ovsdb_jsonrpc_session *, struct simap *usage);
static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
- struct jsonrpc_msg *);
+ struct jsonrpc_msg *);
static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
struct jsonrpc_msg *);
reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
version, request->id);
}
+ } else if (!strcmp(request->method, "monitor_cond_change")) {
+ reply = ovsdb_jsonrpc_monitor_cond_change(s, request->params,
+ request->id);
} else if (!strcmp(request->method, "monitor_cancel")) {
reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
request->id);
return jsonrpc_create_error(json, request_id);
}
+static struct ovsdb_error *
+ovsdb_jsonrpc_parse_monitor_cond_change_request(
+ struct ovsdb_jsonrpc_monitor *m,
+ const struct ovsdb_table *table,
+ const struct json *cond_change_req)
+{
+ const struct ovsdb_table_schema *ts = table->schema;
+ const struct json *condition, *columns;
+ struct ovsdb_parser parser;
+ struct ovsdb_error *error;
+
+ ovsdb_parser_init(&parser, cond_change_req, "table %s", ts->name);
+ columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
+ condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
+
+ error = ovsdb_parser_finish(&parser);
+ if (error) {
+ return error;
+ }
+
+ if (columns) {
+ error = ovsdb_syntax_error(cond_change_req, NULL, "changing columns "
+ "is unsupported");
+ return error;
+ }
+ error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table,
+ condition);
+
+ return error;
+}
+
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
+ struct json *params,
+ const struct json *request_id)
+{
+ struct ovsdb_error *error;
+ struct ovsdb_jsonrpc_monitor *m;
+ struct json *monitor_cond_change_reqs;
+ struct shash_node *node;
+ struct json *json;
+
+ if (json_array(params)->n != 3) {
+ error = ovsdb_syntax_error(params, NULL, "invalid parameters");
+ goto error;
+ }
+
+ m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
+ if (!m) {
+ error = ovsdb_syntax_error(request_id, NULL,
+ "unknown monitor session");
+ goto error;
+ }
+
+ monitor_cond_change_reqs = params->u.array.elems[2];
+ if (monitor_cond_change_reqs->type != JSON_OBJECT) {
+ error =
+ ovsdb_syntax_error(NULL, NULL,
+ "monitor-cond-change-requests must be object");
+ goto error;
+ }
+
+ SHASH_FOR_EACH (node, json_object(monitor_cond_change_reqs)) {
+ const struct ovsdb_table *table;
+ const struct json *mr_value;
+ size_t i;
+
+ table = ovsdb_get_table(m->db, node->name);
+ if (!table) {
+ error = ovsdb_syntax_error(NULL, NULL,
+ "no table named %s", node->name);
+ goto error;
+ }
+ if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
+ error = ovsdb_syntax_error(NULL, NULL,
+ "no table named %s in monitor session",
+ node->name);
+ goto error;
+ }
+
+ mr_value = node->data;
+ if (mr_value->type == JSON_ARRAY) {
+ const struct json_array *array = &mr_value->u.array;
+
+ for (i = 0; i < array->n; i++) {
+ error = ovsdb_jsonrpc_parse_monitor_cond_change_request(
+ m, table, array->elems[i]);
+ if (error) {
+ goto error;
+ }
+ }
+ } else {
+ error = ovsdb_syntax_error(
+ NULL, NULL,
+ "table %s no monitor-cond-change JSON array",
+ node->name);
+ goto error;
+ }
+ }
+
+ /* Change monitor id */
+ hmap_remove(&s->monitors, &m->node);
+ json_destroy(m->monitor_id);
+ m->monitor_id = json_clone(params->u.array.elems[1]);
+ hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
+
+ /* Send the new update, if any, represents the difference from the old
+ * condition and the new one. */
+ struct json *update_json;
+
+ update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
+ &m->unflushed, m->condition, m->version);
+ if (update_json) {
+ struct jsonrpc_msg *msg;
+ struct json *params;
+
+ params = json_array_create_2(json_clone(m->monitor_id), update_json);
+ msg = ovsdb_jsonrpc_create_notify(m, params);
+ jsonrpc_session_send(s->js, msg);
+ }
+
+ return jsonrpc_create_reply(json_object_create(), request_id);
+
+error:
+
+ json = ovsdb_error_to_json(error);
+ ovsdb_error_destroy(error);
+ return jsonrpc_create_error(json, request_id);
+}
+
static struct jsonrpc_msg *
ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
struct json_array *params,
ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
bool initial)
{
+
if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
return NULL;
}
- return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed,
- m->condition, m->version);
+ return ovsdb_monitor_get_update(m->dbmon, initial, false,
+ &m->unflushed, m->condition, m->version);
}
static bool
struct hmap changes;
};
+enum ovsdb_monitor_row_type {
+ OVSDB_ROW,
+ OVSDB_MONITOR_ROW
+};
+
typedef struct json *
(*compose_row_update_cb_func)
(const struct ovsdb_monitor_table *mt,
const struct ovsdb_monitor_session_condition * condition,
- const struct ovsdb_monitor_row *row,
+ enum ovsdb_monitor_row_type row_type,
+ const void *,
bool initial, unsigned long int *changed);
static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
}
}
+bool
+ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
+ const struct ovsdb_table *table)
+{
+ return shash_find_data(&m->tables, table->schema->name);
+}
+
static struct ovsdb_monitor_changes *
ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
uint64_t next_txn)
return true;
}
+struct ovsdb_error *
+ovsdb_monitor_table_condition_update(
+ struct ovsdb_monitor *dbmon,
+ struct ovsdb_monitor_session_condition *condition,
+ const struct ovsdb_table *table,
+ const struct json *cond_json)
+{
+ struct ovsdb_monitor_table_condition *mtc =
+ shash_find_data(&condition->tables, table->schema->name);
+ struct ovsdb_error *error;
+ struct ovsdb_condition cond = OVSDB_CONDITION_INITIALIZER;
+
+ if (!condition) {
+ return NULL;
+ }
+
+ error = ovsdb_condition_from_json(table->schema, cond_json,
+ NULL, &cond);
+ if (error) {
+ return error;
+ }
+ ovsdb_condition_destroy(&mtc->new_condition);
+ ovsdb_condition_clone(&mtc->new_condition, &cond);
+ ovsdb_condition_destroy(&cond);
+ ovsdb_monitor_condition_add_columns(dbmon,
+ table,
+ &mtc->new_condition);
+
+ return NULL;
+}
+
+static void
+ovsdb_monitor_table_condition_updated(struct ovsdb_monitor_table *mt,
+ struct ovsdb_monitor_session_condition *condition)
+{
+ struct ovsdb_monitor_table_condition *mtc =
+ shash_find_data(&condition->tables, mt->table->schema->name);
+
+ if (mtc) {
+ /* If conditional monitoring - set old condition to new condition */
+ if (ovsdb_condition_cmp_3way(&mtc->old_condition,
+ &mtc->new_condition)) {
+ if (ovsdb_condition_is_true(&mtc->new_condition)) {
+ if (!ovsdb_condition_is_true(&mtc->old_condition)) {
+ condition->n_true_cnd++;
+ }
+ } else {
+ if (ovsdb_condition_is_true(&mtc->old_condition)) {
+ condition->n_true_cnd--;
+ }
+ }
+ ovsdb_condition_destroy(&mtc->old_condition);
+ ovsdb_condition_clone(&mtc->old_condition, &mtc->new_condition);
+ ovsdb_monitor_session_condition_set_mode(condition);
+ }
+ }
+}
+
static enum ovsdb_monitor_selection
ovsdb_monitor_row_update_type_condition(
const struct ovsdb_monitor_table *mt,
const struct ovsdb_monitor_session_condition *condition,
bool initial,
+ enum ovsdb_monitor_row_type row_type,
const struct ovsdb_datum *old,
const struct ovsdb_datum *new)
{
&new_condition)) {
bool old_cond = !old ? false
: ovsdb_condition_empty_or_match_any(old,
- old_condition,
- mt->columns_index_map);
+ old_condition,
+ row_type == OVSDB_MONITOR_ROW ?
+ mt->columns_index_map :
+ NULL);
bool new_cond = !new ? false
: ovsdb_condition_empty_or_match_any(new,
- new_condition,
- mt->columns_index_map);
+ new_condition,
+ row_type == OVSDB_MONITOR_ROW ?
+ mt->columns_index_map :
+ NULL);
if (!old_cond && !new_cond) {
type = OJMS_NONE;
static bool
ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
- const struct ovsdb_monitor_row *row,
+ enum ovsdb_monitor_row_type row_type,
+ const struct ovsdb_datum *old,
+ const struct ovsdb_datum *new,
enum ovsdb_monitor_selection type,
unsigned long int *changed)
{
memset(changed, 0, bitmap_n_bytes(mt->n_columns));
for (i = 0; i < mt->n_columns; i++) {
const struct ovsdb_column *c = mt->columns[i].column;
- if (!ovsdb_datum_equals(&row->old[i], &row->new[i], &c->type)) {
+ size_t index = row_type == OVSDB_ROW ? c->index : i;
+ if (!ovsdb_datum_equals(&old[index], &new[index], &c->type)) {
bitmap_set1(changed, i);
n_changes++;
}
ovsdb_monitor_compose_row_update(
const struct ovsdb_monitor_table *mt,
const struct ovsdb_monitor_session_condition *condition OVS_UNUSED,
- const struct ovsdb_monitor_row *row,
+ enum ovsdb_monitor_row_type row_type OVS_UNUSED,
+ const void *_row,
bool initial, unsigned long int *changed)
{
+ const struct ovsdb_monitor_row *row = _row;
enum ovsdb_monitor_selection type;
struct json *old_json, *new_json;
struct json *row_json;
size_t i;
+ ovs_assert(row_type == OVSDB_MONITOR_ROW);
type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
- if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
+ if (ovsdb_monitor_row_skip_update(mt, row_type, row->old,
+ row->new, type, changed)) {
return NULL;
}
* for 'row' within * 'mt', or NULL if no row update should be sent.
*
* The caller should specify 'initial' as true if the returned JSON is
- * going to be used as part of the initial reply to a "monitor2" request,
+ * going to be used as part of the initial reply to a "monitor_cond" request,
* false if it is going to be used as part of an "update2" notification.
*
* 'changed' must be a scratch buffer for internal use that is at least
ovsdb_monitor_compose_row_update2(
const struct ovsdb_monitor_table *mt,
const struct ovsdb_monitor_session_condition *condition,
- const struct ovsdb_monitor_row *row,
+ enum ovsdb_monitor_row_type row_type,
+ const void *_row,
bool initial, unsigned long int *changed)
{
enum ovsdb_monitor_selection type;
struct json *row_update2, *diff_json;
+ const struct ovsdb_datum *old, *new;
size_t i;
+ if (row_type == OVSDB_MONITOR_ROW) {
+ old = ((const struct ovsdb_monitor_row *)_row)->old;;
+ new = ((const struct ovsdb_monitor_row *)_row)->new;
+ } else {
+ old = new = ((const struct ovsdb_row *)_row)->fields;
+ }
+
type = ovsdb_monitor_row_update_type_condition(mt, condition, initial,
- row->old, row->new);
- if (ovsdb_monitor_row_skip_update(mt, row, type, changed)) {
+ row_type, old, new);
+ if (ovsdb_monitor_row_skip_update(mt, row_type, old, new, type, changed)) {
return NULL;
}
for (i = 0; i < mt->n_monitored_columns; i++) {
const struct ovsdb_monitor_column *c = &mt->columns[i];
-
+ size_t index = row_type == OVSDB_ROW ? c->column->index : i;
if (!c->monitored || !(type & c->select)) {
/* We don't care about this type of change for this
* particular column (but we will care about it for some
continue;
}
- ovsdb_datum_diff(&diff ,&row->old[i], &row->new[i],
+ ovsdb_datum_diff(&diff ,&old[index], &new[index],
&c->column->type);
json_object_put(diff_json, c->column->name,
ovsdb_datum_to_json(&diff, &c->column->type));
ovsdb_datum_destroy(&diff, &c->column->type);
} else {
- if (!ovsdb_datum_is_default(&row->new[i], &c->column->type)) {
+ if (!ovsdb_datum_is_default(&new[index], &c->column->type)) {
json_object_put(diff_json, c->column->name,
- ovsdb_datum_to_json(&row->new[i],
+ ovsdb_datum_to_json(&new[index],
&c->column->type));
}
}
return max_columns;
}
+static void
+ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
+ struct json **table_json, struct json *row_json,
+ const struct uuid *row_uuid)
+{
+ char uuid[UUID_LEN + 1];
+
+ /* Create JSON object for transaction overall. */
+ if (!*json) {
+ *json = json_object_create();
+ }
+
+ /* Create JSON object for transaction on this table. */
+ if (!*table_json) {
+ *table_json = json_object_create();
+ json_object_put(*json, table_name, *table_json);
+ }
+
+ /* Add JSON row to JSON table. */
+ snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(row_uuid));
+ json_object_put(*table_json, uuid, row_json);
+}
+
/* Constructs and returns JSON for a <table-updates> object (as described in
* RFC 7047) for all the outstanding changes within 'monitor', starting from
* 'transaction'. */
continue;
}
- HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
- struct json *row_json;
+ HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
+ struct json *row_json;
+ row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
+ initial, changed);
+ if (row_json) {
+ ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
+ &table_json, row_json,
+ &row->uuid);
+ }
+ }
+ }
+ free(changed);
- row_json = (*row_update)(mt, condition, row, initial, changed);
- if (row_json) {
- char uuid[UUID_LEN + 1];
+ return json;
+}
- /* Create JSON object for transaction overall. */
- if (!json) {
- json = json_object_create();
- }
+static struct json*
+ovsdb_monitor_compose_cond_change_update(
+ struct ovsdb_monitor *dbmon,
+ struct ovsdb_monitor_session_condition *condition)
+{
+ struct shash_node *node;
+ struct json *json = NULL;
+ size_t max_columns = ovsdb_monitor_max_columns(dbmon);
+ unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
- /* Create JSON object for transaction on this table. */
- if (!table_json) {
- table_json = json_object_create();
- json_object_put(json, mt->table->schema->name, table_json);
- }
+ SHASH_FOR_EACH (node, &dbmon->tables) {
+ struct ovsdb_monitor_table *mt = node->data;
+ struct ovsdb_row *row;
+ struct json *table_json = NULL;
+ struct ovsdb_condition *old_condition, *new_condition;
+
+ if (!ovsdb_monitor_get_table_conditions(mt,
+ condition,
+ &old_condition,
+ &new_condition) ||
+ !ovsdb_condition_cmp_3way(old_condition, new_condition)) {
+ /* Nothing to update on this table */
+ continue;
+ }
+
+ /* Iterate over all rows in table */
+ HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+ struct json *row_json;
- /* Add JSON row to JSON table. */
- snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
- json_object_put(table_json, uuid, row_json);
+ row_json = ovsdb_monitor_compose_row_update2(mt, condition,
+ OVSDB_ROW, row,
+ false, changed);
+ if (row_json) {
+ ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
+ &table_json, row_json,
+ ovsdb_row_get_uuid(row));
}
}
+ ovsdb_monitor_table_condition_updated(mt, condition);
}
free(changed);
/* Returns JSON for a <table-updates> object (as described in RFC 7047)
* for all the outstanding changes within 'monitor' that starts from
- * '*unflushed' transaction id.
+ * '*unflushed'.
+ * If cond_updated is true all rows in the db that match conditions will be
+ * sent.
*
* The caller should specify 'initial' as true if the returned JSON is going to
* be used as part of the initial reply to a "monitor" request, false if it is
struct json *
ovsdb_monitor_get_update(
struct ovsdb_monitor *dbmon,
- bool initial, uint64_t *unflushed_,
- const struct ovsdb_monitor_session_condition *condition,
+ bool initial, bool cond_updated,
+ uint64_t *unflushed_,
+ struct ovsdb_monitor_session_condition *condition,
enum ovsdb_monitor_version version)
{
struct ovsdb_monitor_json_cache_node *cache_node = NULL;
const uint64_t unflushed = *unflushed_;
const uint64_t next_unflushed = dbmon->n_transactions + 1;
+ ovs_assert(cond_updated ? unflushed == next_unflushed : true);
+
/* Return a clone of cached json if one exists. Otherwise,
* generate a new one and add it to the cache. */
- if (!condition || !condition->conditional) {
+ if (!condition || (!condition->conditional && !cond_updated)) {
cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
unflushed);
}
ovsdb_monitor_compose_row_update);
} else {
ovs_assert(version == OVSDB_MONITOR_V2);
- json =
- ovsdb_monitor_compose_update(dbmon, initial, unflushed,
- condition,
- ovsdb_monitor_compose_row_update2);
- }
- if (!condition || !condition->conditional) {
- ovsdb_monitor_json_cache_insert(dbmon, version, unflushed, json);
- }
+ if (!cond_updated) {
+ json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
+ condition,
+ ovsdb_monitor_compose_row_update2);
+
+ if (!condition || !condition->conditional) {
+ ovsdb_monitor_json_cache_insert(dbmon, version, unflushed,
+ json);
+ }
+ } else {
+ /* Compose update on whole db due to condition update.
+ Session must be flushed (change list is empty)*/
+ json =
+ ovsdb_monitor_compose_cond_change_update(dbmon, condition);
+ }
+ }
}
/* Maintain transaction id of 'changes'. */
void
ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
{
- struct ovsdb_monitor_aux aux;
struct shash_node *node;
- ovsdb_monitor_init_aux(&aux, dbmon);
SHASH_FOR_EACH (node, &dbmon->tables) {
struct ovsdb_monitor_table *mt = node->data;