#include "ovsdb.h"
#include "row.h"
#include "simap.h"
+#include "hash.h"
#include "table.h"
#include "timeval.h"
#include "transaction.h"
struct ovsdb_monitor {
struct ovsdb_replica replica;
struct shash tables; /* Holds "struct ovsdb_monitor_table"s. */
- struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
+ struct ovs_list jsonrpc_monitors; /* Contains "jsonrpc_monitor_node"s. */
struct ovsdb *db;
+ uint64_t n_transactions; /* Count number of committed transactions. */
+};
+
+struct jsonrpc_monitor_node {
+ struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
+ struct ovs_list node;
};
/* A particular column being monitored. */
struct ovsdb_datum *new; /* New data, NULL for a deleted row. */
};
+/* Contains 'struct ovsdb_monitor_row's for rows that have been
+ * updated but not yet flushed to all the jsonrpc connection.
+ *
+ * 'n_refs' represent the number of jsonrpc connections that have
+ * not received updates. Generate the update for the last jsonprc
+ * connection will also destroy the whole "struct ovsdb_monitor_changes"
+ * object.
+ *
+ * 'transaction' stores the first update's transaction id.
+ * */
+struct ovsdb_monitor_changes {
+ struct ovsdb_monitor_table *mt;
+ struct hmap rows;
+ int n_refs;
+ uint64_t transaction;
+ struct hmap_node hmap_node; /* Element in ovsdb_monitor_tables' changes
+ hmap. */
+};
+
/* A particular table being monitored. */
struct ovsdb_monitor_table {
const struct ovsdb_table *table;
struct ovsdb_monitor_column *columns;
size_t n_columns;
- /* Contains 'struct ovsdb_monitor_row's for rows that have been
- * updated but not yet flushed to the jsonrpc connection. */
+ /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
struct hmap changes;
};
+static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
+static void ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
+ uint64_t next_txn);
+static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
+ struct ovsdb_monitor_table *mt, uint64_t unflushed);
+static void ovsdb_monitor_changes_destroy(
+ struct ovsdb_monitor_changes *changes);
+static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+ uint64_t unflushed);
+
static int
compare_ovsdb_monitor_column(const void *a_, const void *b_)
{
return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
}
-/* Finds and returns the ovsdb_monitor_row in 'mt->changes' for the
+/* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
* given 'uuid', or NULL if there is no such row. */
static struct ovsdb_monitor_row *
-ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt,
- const struct uuid *uuid)
+ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
+ const struct uuid *uuid)
{
struct ovsdb_monitor_row *row;
- HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &mt->changes) {
+ HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
+ &changes->rows) {
if (uuid_equals(uuid, &row->uuid)) {
return row;
}
struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
{
struct ovsdb_monitor *dbmon;
+ struct jsonrpc_monitor_node *jm;
dbmon = xzalloc(sizeof *dbmon);
ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
ovsdb_add_replica(db, &dbmon->replica);
- dbmon->jsonrpc_monitor = jsonrpc_monitor;
+ list_init(&dbmon->jsonrpc_monitors);
dbmon->db = db;
+ dbmon->n_transactions = 0;
shash_init(&dbmon->tables);
+ jm = xzalloc(sizeof *jm);
+ jm->jsonrpc_monitor = jsonrpc_monitor;
+ list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
+
return dbmon;
}
mt = xzalloc(sizeof *mt);
mt->table = table;
- hmap_init(&mt->changes);
shash_add(&m->tables, table->schema->name, mt);
+ hmap_init(&mt->changes);
}
void
return NULL;
}
+static void
+ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
+ uint64_t next_txn)
+{
+ struct ovsdb_monitor_changes *changes;
+
+ changes = xzalloc(sizeof *changes);
+
+ changes->transaction = next_txn;
+ changes->mt = mt;
+ changes->n_refs = 1;
+ hmap_init(&changes->rows);
+ hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
+};
+
+static struct ovsdb_monitor_changes *
+ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
+ uint64_t transaction)
+{
+ struct ovsdb_monitor_changes *changes;
+ size_t hash = hash_uint64(transaction);
+
+ HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
+ if (changes->transaction == transaction) {
+ return changes;
+ }
+ }
+
+ return NULL;
+}
+
+/* Stop currently tracking changes to table 'mt' since 'transaction'.
+ *
+ * Return 'true' if the 'transaction' is being tracked. 'false' otherwise. */
+static void
+ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
+ uint64_t transaction)
+{
+ struct ovsdb_monitor_changes *changes =
+ ovsdb_monitor_table_find_changes(mt, transaction);
+ if (changes) {
+ ovs_assert(changes->transaction == transaction);
+ if (--changes->n_refs == 0) {
+ hmap_remove(&mt->changes, &changes->hmap_node);
+ ovsdb_monitor_changes_destroy(changes);
+ }
+ }
+}
+
+/* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
+ */
+static void
+ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+ uint64_t transaction)
+{
+ struct ovsdb_monitor_changes *changes;
+
+ changes = ovsdb_monitor_table_find_changes(mt, transaction);
+ if (changes) {
+ changes->n_refs++;
+ } else {
+ ovsdb_monitor_table_add_changes(mt, transaction);
+ }
+}
+
+static void
+ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
+{
+ struct ovsdb_monitor_row *row, *next;
+
+ HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
+ hmap_remove(&changes->rows, &row->hmap_node);
+ ovsdb_monitor_row_destroy(changes->mt, row);
+ }
+ hmap_destroy(&changes->rows);
+ free(changes);
+}
+
/* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
* 'mt', or NULL if no row update should be sent.
*
*
* The caller should specify 'initial' as true if the returned JSON is going to
* be used as part of the initial reply to a "monitor" request, false if it is
- * going to be used as part of an "update" notification. */
+ * going to be used as part of an "update" notification.
+ *
+ * 'unflushed' should point to value that is the transaction ID that did
+ * was not updated. The update contains changes between
+ * ['unflushed, ovsdb->n_transcations]. Before the function returns, this
+ * value will be updated to ovsdb->n_transactions + 1, ready for the next
+ * update. */
struct json *
-ovsdb_monitor_compose_table_update(
- const struct ovsdb_monitor *dbmon, bool initial)
+ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
+ bool initial, uint64_t *unflushed)
{
struct shash_node *node;
unsigned long int *changed;
struct json *json;
size_t max_columns;
+ uint64_t prev_txn = *unflushed;
+ uint64_t next_txn = dbmon->n_transactions + 1;
max_columns = 0;
SHASH_FOR_EACH (node, &dbmon->tables) {
SHASH_FOR_EACH (node, &dbmon->tables) {
struct ovsdb_monitor_table *mt = node->data;
struct ovsdb_monitor_row *row, *next;
+ struct ovsdb_monitor_changes *changes;
struct json *table_json = NULL;
- HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
+ changes = ovsdb_monitor_table_find_changes(mt, prev_txn);
+ if (!changes) {
+ ovsdb_monitor_table_track_changes(mt, next_txn);
+ continue;
+ }
+
+ HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
struct json *row_json;
row_json = ovsdb_monitor_compose_row_update(
json_object_put(table_json, uuid, row_json);
}
- hmap_remove(&mt->changes, &row->hmap_node);
+ hmap_remove(&changes->rows, &row->hmap_node);
ovsdb_monitor_row_destroy(mt, row);
}
+
+ ovsdb_monitor_table_untrack_changes(mt, prev_txn);
+ ovsdb_monitor_table_track_changes(mt, next_txn);
}
+ *unflushed = next_txn;
free(changed);
return json;
}
bool
-ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon)
+ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
+ uint64_t next_transaction)
{
- struct shash_node *node;
-
- SHASH_FOR_EACH (node, &dbmon->tables) {
- struct ovsdb_monitor_table *mt = node->data;
-
- if (!hmap_is_empty(&mt->changes)) {
- return true;
- }
- }
- return false;
+ ovs_assert(next_transaction <= dbmon->n_transactions + 1);
+ return (next_transaction <= dbmon->n_transactions);
}
void
aux->mt = NULL;
}
+static void
+ovsdb_monitor_changes_update(const struct ovsdb_row *old,
+ const struct ovsdb_row *new,
+ const struct ovsdb_monitor_table *mt,
+ struct ovsdb_monitor_changes *changes)
+{
+ const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
+ struct ovsdb_monitor_row *change;
+
+ change = ovsdb_monitor_changes_row_find(changes, uuid);
+ if (!change) {
+ change = xzalloc(sizeof *change);
+ hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
+ change->uuid = *uuid;
+ change->old = clone_monitor_row_data(mt, old);
+ change->new = clone_monitor_row_data(mt, new);
+ } else {
+ if (new) {
+ update_monitor_row_data(mt, new, change->new);
+ } else {
+ free_monitor_row_data(mt, change->new);
+ change->new = NULL;
+
+ if (!change->old) {
+ /* This row was added then deleted. Forget about it. */
+ hmap_remove(&changes->rows, &change->hmap_node);
+ free(change);
+ }
+ }
+ }
+}
+
static bool
ovsdb_monitor_change_cb(const struct ovsdb_row *old,
const struct ovsdb_row *new,
struct ovsdb_monitor_aux *aux = aux_;
const struct ovsdb_monitor *m = aux->monitor;
struct ovsdb_table *table = new ? new->table : old->table;
- const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
- struct ovsdb_monitor_row *change;
struct ovsdb_monitor_table *mt;
+ struct ovsdb_monitor_changes *changes;
if (!aux->mt || table != aux->mt->table) {
aux->mt = shash_find_data(&m->tables, table->schema->name);
}
mt = aux->mt;
- change = ovsdb_monitor_row_find(mt, uuid);
- if (!change) {
- change = xmalloc(sizeof *change);
- hmap_insert(&mt->changes, &change->hmap_node, uuid_hash(uuid));
- change->uuid = *uuid;
- change->old = clone_monitor_row_data(mt, old);
- change->new = clone_monitor_row_data(mt, new);
- } else {
- if (new) {
- update_monitor_row_data(mt, new, change->new);
- } else {
- free_monitor_row_data(mt, change->new);
- change->new = NULL;
-
- if (!change->old) {
- /* This row was added then deleted. Forget about it. */
- hmap_remove(&mt->changes, &change->hmap_node);
- free(change);
- }
- }
+ HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
+ ovsdb_monitor_changes_update(old, new, mt, changes);
}
return true;
}
-struct json *
+void
ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
{
struct ovsdb_monitor_aux aux;
struct shash_node *node;
- struct json *json;
ovsdb_monitor_init_aux(&aux, dbmon);
SHASH_FOR_EACH (node, &dbmon->tables) {
if (mt->select & OJMS_INITIAL) {
struct ovsdb_row *row;
+ if (hmap_is_empty(&mt->changes)) {
+ ovsdb_monitor_table_add_changes(mt, 0);
+ }
+
HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
ovsdb_monitor_change_cb(NULL, row, NULL, &aux);
}
}
}
- json = ovsdb_monitor_compose_table_update(dbmon, true);
- return json ? json : json_object_create();
}
void
+ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
+ struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
+{
+ struct jsonrpc_monitor_node *jm;
+
+ /* Find and remove the jsonrpc monitor from the list. */
+ LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
+ if (jm->jsonrpc_monitor == jsonrpc_monitor) {
+ list_remove(&jm->node);
+ free(jm);
+
+ /* Destroy ovsdb monitor if this is the last user. */
+ if (list_is_empty(&dbmon->jsonrpc_monitors)) {
+ ovsdb_monitor_destroy(dbmon);
+ }
+
+ return;
+ };
+ }
+
+ /* Should never reach here. jsonrpc_monitor should be on the list. */
+ OVS_NOT_REACHED();
+}
+
+static void
ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
{
struct shash_node *node;
SHASH_FOR_EACH (node, &dbmon->tables) {
struct ovsdb_monitor_table *mt = node->data;
- struct ovsdb_monitor_row *row, *next;
+ struct ovsdb_monitor_changes *changes, *next;
- HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
- hmap_remove(&mt->changes, &row->hmap_node);
- ovsdb_monitor_row_destroy(mt, row);
+ HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
+ hmap_remove(&mt->changes, &changes->hmap_node);
+ ovsdb_monitor_changes_destroy(changes);
}
- hmap_destroy(&mt->changes);
-
free(mt->columns);
free(mt);
}
ovsdb_monitor_init_aux(&aux, m);
ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
+ m->n_transactions++;
return NULL;
}
ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
{
struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
- struct ovsdb_jsonrpc_monitor *m = dbmon->jsonrpc_monitor;
+ struct jsonrpc_monitor_node *jm, *next;
- ovsdb_jsonrpc_monitor_destroy(m);
+ /* Delete all front end monitors. Removing the last front
+ * end monitor will also destroy the corresponding 'ovsdb_monitor'.
+ * ovsdb monitor will also be destroied. */
+ LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
+ ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
+ }
}
static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {