#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "ovsdb.h"
+#include "condition.h"
#include "poll-loop.h"
#include "reconnect.h"
#include "row.h"
struct ovsdb_jsonrpc_remote;
struct ovsdb_jsonrpc_session;
-/* Set false to defeature monitor2, causing jsonrpc to respond to monitor2
- * method with an error. */
-static bool monitor2_enable__ = true;
+/* Set false to defeature monitor_cond, causing jsonrpc to respond to
+ * monitor_cond method with an error. */
+static bool monitor_cond_enable__ = true;
/* Message rate-limiting. */
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
enum ovsdb_monitor_version, const struct json *request_id);
+static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_change(
+ struct ovsdb_jsonrpc_session *s,
+ struct json *params,
+ const struct json *request_id);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
struct ovsdb_jsonrpc_session *,
struct json_array *params,
static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
static struct json *ovsdb_jsonrpc_monitor_compose_update(
struct ovsdb_jsonrpc_monitor *monitor, bool initial);
+static struct jsonrpc_msg * ovsdb_jsonrpc_create_notify(
+ const struct ovsdb_jsonrpc_monitor *m,
+ struct json *params);
\f
/* JSON-RPC database server. */
static void ovsdb_jsonrpc_session_get_memory_usage(
const struct ovsdb_jsonrpc_session *, struct simap *usage);
static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
- struct jsonrpc_msg *);
+ struct jsonrpc_msg *);
static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
struct jsonrpc_msg *);
reply = execute_transaction(s, db, request);
}
} else if (!strcmp(request->method, "monitor") ||
- (monitor2_enable__ && !strcmp(request->method, "monitor2"))) {
+ (monitor_cond_enable__ && !strcmp(request->method,
+ "monitor_cond"))) {
struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
if (!reply) {
int l = strlen(request->method) - strlen("monitor");
reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
version, request->id);
}
+ } else if (!strcmp(request->method, "monitor_cond_change")) {
+ reply = ovsdb_jsonrpc_monitor_cond_change(s, request->params,
+ request->id);
} else if (!strcmp(request->method, "monitor_cancel")) {
reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
request->id);
uint64_t unflushed; /* The first transaction that has not been
flushed to the jsonrpc remote client. */
enum ovsdb_monitor_version version;
+ struct ovsdb_monitor_session_condition *condition;/* Session's condition */
};
static struct ovsdb_jsonrpc_monitor *
}
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
-ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_monitor *dbmon,
- const struct ovsdb_table *table,
- const struct json *monitor_request,
- size_t *allocated_columns)
+ovsdb_jsonrpc_parse_monitor_request(
+ struct ovsdb_monitor *dbmon,
+ const struct ovsdb_table *table,
+ struct ovsdb_monitor_session_condition *cond,
+ const struct json *monitor_request)
{
const struct ovsdb_table_schema *ts = table->schema;
enum ovsdb_monitor_selection select;
- const struct json *columns, *select_json;
+ const struct json *columns, *select_json, *where = NULL;
struct ovsdb_parser parser;
struct ovsdb_error *error;
ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
+ if (cond) {
+ where = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
+ }
columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
+
select_json = ovsdb_parser_member(&parser, "select",
OP_OBJECT | OP_OPTIONAL);
+
error = ovsdb_parser_finish(&parser);
if (error) {
return error;
return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
"column name", s);
}
- ovsdb_monitor_add_column(dbmon, table, column, select,
- allocated_columns);
+ if (ovsdb_monitor_add_column(dbmon, table, column,
+ select, true)) {
+ return ovsdb_syntax_error(columns, NULL, "column %s "
+ "mentioned more than once",
+ column->name);
+ }
}
} else {
struct shash_node *node;
SHASH_FOR_EACH (node, &ts->columns) {
const struct ovsdb_column *column = node->data;
if (column->index != OVSDB_COL_UUID) {
- ovsdb_monitor_add_column(dbmon, table, column, select,
- allocated_columns);
+ if (ovsdb_monitor_add_column(dbmon, table, column,
+ select, true)) {
+ return ovsdb_syntax_error(columns, NULL, "column %s "
+ "mentioned more than once",
+ column->name);
+ }
}
}
}
+ if (cond) {
+ error = ovsdb_monitor_table_condition_create(cond, table, where);
+ if (error) {
+ return error;
+ }
+ }
return NULL;
}
m->session = s;
m->db = db;
m->dbmon = ovsdb_monitor_create(db, m);
+ if (version == OVSDB_MONITOR_V2) {
+ m->condition = ovsdb_monitor_session_condition_create();
+ }
m->unflushed = 0;
m->version = version;
hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
SHASH_FOR_EACH (node, json_object(monitor_requests)) {
const struct ovsdb_table *table;
- const char *column_name;
- size_t allocated_columns;
const struct json *mr_value;
size_t i;
/* Parse columns. */
mr_value = node->data;
- allocated_columns = 0;
if (mr_value->type == JSON_ARRAY) {
const struct json_array *array = &mr_value->u.array;
for (i = 0; i < array->n; i++) {
- error = ovsdb_jsonrpc_parse_monitor_request(
- m->dbmon, table, array->elems[i], &allocated_columns);
+ error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
+ table,
+ m->condition,
+ array->elems[i]);
if (error) {
goto error;
}
}
} else {
- error = ovsdb_jsonrpc_parse_monitor_request(
- m->dbmon, table, mr_value, &allocated_columns);
+ error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
+ table,
+ m->condition,
+ mr_value);
if (error) {
goto error;
}
}
-
- column_name = ovsdb_monitor_table_check_duplicates(m->dbmon, table);
-
- if (column_name) {
- error = ovsdb_syntax_error(mr_value, NULL, "column %s "
- "mentioned more than once",
- column_name);
- goto error;
- }
}
dbmon = ovsdb_monitor_add(m->dbmon);
m->dbmon = dbmon;
}
+ /* Only now we can bind session's condition to ovsdb_monitor */
+ if (m->condition) {
+ ovsdb_monitor_condition_bind(m->dbmon, m->condition);
+ }
+
ovsdb_monitor_get_initial(m->dbmon);
json = ovsdb_jsonrpc_monitor_compose_update(m, true);
json = json ? json : json_object_create();
return jsonrpc_create_error(json, request_id);
}
+static struct ovsdb_error *
+ovsdb_jsonrpc_parse_monitor_cond_change_request(
+ struct ovsdb_jsonrpc_monitor *m,
+ const struct ovsdb_table *table,
+ const struct json *cond_change_req)
+{
+ const struct ovsdb_table_schema *ts = table->schema;
+ const struct json *condition, *columns;
+ struct ovsdb_parser parser;
+ struct ovsdb_error *error;
+
+ ovsdb_parser_init(&parser, cond_change_req, "table %s", ts->name);
+ columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
+ condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
+
+ error = ovsdb_parser_finish(&parser);
+ if (error) {
+ return error;
+ }
+
+ if (columns) {
+ error = ovsdb_syntax_error(cond_change_req, NULL, "changing columns "
+ "is unsupported");
+ return error;
+ }
+ error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table,
+ condition);
+
+ return error;
+}
+
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
+ struct json *params,
+ const struct json *request_id)
+{
+ struct ovsdb_error *error;
+ struct ovsdb_jsonrpc_monitor *m;
+ struct json *monitor_cond_change_reqs;
+ struct shash_node *node;
+ struct json *json;
+
+ if (json_array(params)->n != 3) {
+ error = ovsdb_syntax_error(params, NULL, "invalid parameters");
+ goto error;
+ }
+
+ m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
+ if (!m) {
+ error = ovsdb_syntax_error(request_id, NULL,
+ "unknown monitor session");
+ goto error;
+ }
+
+ monitor_cond_change_reqs = params->u.array.elems[2];
+ if (monitor_cond_change_reqs->type != JSON_OBJECT) {
+ error =
+ ovsdb_syntax_error(NULL, NULL,
+ "monitor-cond-change-requests must be object");
+ goto error;
+ }
+
+ SHASH_FOR_EACH (node, json_object(monitor_cond_change_reqs)) {
+ const struct ovsdb_table *table;
+ const struct json *mr_value;
+ size_t i;
+
+ table = ovsdb_get_table(m->db, node->name);
+ if (!table) {
+ error = ovsdb_syntax_error(NULL, NULL,
+ "no table named %s", node->name);
+ goto error;
+ }
+ if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
+ error = ovsdb_syntax_error(NULL, NULL,
+ "no table named %s in monitor session",
+ node->name);
+ goto error;
+ }
+
+ mr_value = node->data;
+ if (mr_value->type == JSON_ARRAY) {
+ const struct json_array *array = &mr_value->u.array;
+
+ for (i = 0; i < array->n; i++) {
+ error = ovsdb_jsonrpc_parse_monitor_cond_change_request(
+ m, table, array->elems[i]);
+ if (error) {
+ goto error;
+ }
+ }
+ } else {
+ error = ovsdb_syntax_error(
+ NULL, NULL,
+ "table %s no monitor-cond-change JSON array",
+ node->name);
+ goto error;
+ }
+ }
+
+ /* Change monitor id */
+ hmap_remove(&s->monitors, &m->node);
+ json_destroy(m->monitor_id);
+ m->monitor_id = json_clone(params->u.array.elems[1]);
+ hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
+
+ /* Send the new update, if any, represents the difference from the old
+ * condition and the new one. */
+ struct json *update_json;
+
+ update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
+ &m->unflushed, m->condition, m->version);
+ if (update_json) {
+ struct jsonrpc_msg *msg;
+ struct json *params;
+
+ params = json_array_create_2(json_clone(m->monitor_id), update_json);
+ msg = ovsdb_jsonrpc_create_notify(m, params);
+ jsonrpc_session_send(s->js, msg);
+ }
+
+ return jsonrpc_create_reply(json_object_create(), request_id);
+
+error:
+
+ json = ovsdb_error_to_json(error);
+ ovsdb_error_destroy(error);
+ return jsonrpc_create_error(json, request_id);
+}
+
static struct jsonrpc_msg *
ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
struct json_array *params,
ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
bool initial)
{
+
if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
return NULL;
}
- return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed,
- m->version);
+ return ovsdb_monitor_get_update(m->dbmon, initial, false,
+ &m->unflushed, m->condition, m->version);
}
static bool
json_destroy(m->monitor_id);
hmap_remove(&m->session->monitors, &m->node);
ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
+ ovsdb_monitor_session_condition_destroy(m->condition);
free(m);
}
}
void
-ovsdb_jsonrpc_disable_monitor2(void)
+ovsdb_jsonrpc_disable_monitor_cond(void)
{
/* Once disabled, it is not possible to re-enable it. */
- monitor2_enable__ = false;
+ monitor_cond_enable__ = false;
}