From: Andy Zhou Date: Fri, 13 Mar 2015 23:35:49 +0000 (-0700) Subject: ovsdb-monitor: add ovsdb_monitor_changes X-Git-Tag: v2.4.0~158 X-Git-Url: http://git.cascardo.eti.br/?p=cascardo%2Fovs.git;a=commitdiff_plain;h=1158f320622954f4027a35260916f6a950529c27 ovsdb-monitor: add ovsdb_monitor_changes Currently, each monitor table contains a single hmap 'changes' to track updates. This patch introduces a new data structure 'ovsdb_monitor_changes' that stores the updates 'rows' tagged by its first commit transaction id. Each 'ovsdb_monitor_changes' is refenece counted allowing multiple jsonrpc_monitors to share them. The next patch will allow each ovsdb monitor table to store a list of 'ovsdb_monitor_changes'. This patch stores only one, same as before. Signed-off-by: Andy Zhou Acked-by: Ben Pfaff --- diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index 05aaf8718..efd83b8f0 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -1282,11 +1282,10 @@ ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s) } static struct json * -ovsdb_jsonrpc_monitor_compose_update( - struct ovsdb_jsonrpc_monitor *monitor, bool initial) +ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m, + bool initial) { - return ovsdb_monitor_compose_update(monitor->dbmon, initial, - &monitor->unflushed); + return ovsdb_monitor_compose_update(m->dbmon, initial, &m->unflushed); } static bool diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c index 0898808b4..b82941b38 100644 --- a/ovsdb/monitor.c +++ b/ovsdb/monitor.c @@ -71,6 +71,23 @@ struct ovsdb_monitor_row { struct ovsdb_datum *new; /* New data, NULL for a deleted row. */ }; +/* Contains 'struct ovsdb_monitor_row's for rows that have been + * updated but not yet flushed to all the jsonrpc connection. + * + * 'n_refs' represent the number of jsonrpc connections that have + * not received updates. Generate the update for the last jsonprc + * connection will also destroy the whole "struct ovsdb_monitor_changes" + * object. + * + * 'transaction' stores the first update's transaction id. + * */ +struct ovsdb_monitor_changes { + struct ovsdb_monitor_table *mt; + struct hmap rows; + int n_refs; + uint64_t transaction; +}; + /* A particular table being monitored. */ struct ovsdb_monitor_table { const struct ovsdb_table *table; @@ -85,10 +102,16 @@ struct ovsdb_monitor_table { /* Contains 'struct ovsdb_monitor_row's for rows that have been * updated but not yet flushed to the jsonrpc connection. */ - struct hmap changes; + struct ovsdb_monitor_changes *changes; }; static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon); +static void ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt, + uint64_t next_txn); +static void ovsdb_monitor_changes_destroy( + struct ovsdb_monitor_changes *changes); +static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt, + uint64_t transaction); static int compare_ovsdb_monitor_column(const void *a_, const void *b_) @@ -106,7 +129,7 @@ ovsdb_monitor_cast(struct ovsdb_replica *replica) return CONTAINER_OF(replica, struct ovsdb_monitor, replica); } -/* Finds and returns the ovsdb_monitor_row in 'mt->changes' for the +/* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the * given 'uuid', or NULL if there is no such row. */ static struct ovsdb_monitor_row * ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt, @@ -114,7 +137,8 @@ ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt, { struct ovsdb_monitor_row *row; - HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &mt->changes) { + HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), + &mt->changes->rows) { if (uuid_equals(uuid, &row->uuid)) { return row; } @@ -233,7 +257,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m, mt = xzalloc(sizeof *mt); mt->table = table; - hmap_init(&mt->changes); + mt->changes = NULL; shash_add(&m->tables, table->schema->name, mt); } @@ -286,6 +310,61 @@ ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m, return NULL; } +static void +ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt, + uint64_t next_txn) +{ + struct ovsdb_monitor_changes *changes; + + changes = xzalloc(sizeof *changes); + + changes->transaction = next_txn; + changes->mt = mt; + changes->n_refs = 1; + hmap_init(&changes->rows); + mt->changes = changes; +} + +/* Stop currently tracking changes to table 'mt' since 'transaction'. + * + * Return 'true' if the 'transaction' is being tracked. 'false' otherwise. */ +static void +ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt, + uint64_t transaction) +{ + struct ovsdb_monitor_changes *changes = mt->changes; + if (changes) { + ovs_assert(changes->transaction == transaction); + if (--changes->n_refs == 0) { + ovsdb_monitor_changes_destroy(changes); + mt->changes = NULL; + } + } +} + +/* Start tracking changes to table 'mt' begins from 'transaction' inclusive. + */ +static void +ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt, + uint64_t transaction) +{ + ovs_assert(!mt->changes); + ovsdb_monitor_table_add_changes(mt, transaction); +} + +static void +ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes) +{ + struct ovsdb_monitor_row *row, *next; + + HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) { + hmap_remove(&changes->rows, &row->hmap_node); + ovsdb_monitor_row_destroy(changes->mt, row); + } + hmap_destroy(&changes->rows); + free(changes); +} + /* Returns JSON for a (as described in RFC 7047) for 'row' within * 'mt', or NULL if no row update should be sent. * @@ -376,7 +455,13 @@ ovsdb_monitor_compose_row_update( * * The caller should specify 'initial' as true if the returned JSON is going to * be used as part of the initial reply to a "monitor" request, false if it is - * going to be used as part of an "update" notification. */ + * going to be used as part of an "update" notification. + * + * 'unflushed' should point to value that is the transaction ID that did + * was not updated. The update contains changes between + * ['unflushed, ovsdb->n_transcations]. Before the function returns, this + * value will be updated to ovsdb->n_transactions + 1, ready for the next + * update. */ struct json * ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon, bool initial, uint64_t *unflushed) @@ -385,8 +470,8 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon, unsigned long int *changed; struct json *json; size_t max_columns; - - *unflushed = dbmon->n_transactions + 1; + uint64_t prev_txn = *unflushed; + uint64_t next_txn = dbmon->n_transactions + 1; max_columns = 0; SHASH_FOR_EACH (node, &dbmon->tables) { @@ -402,7 +487,12 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon, struct ovsdb_monitor_row *row, *next; struct json *table_json = NULL; - HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) { + if (!mt->changes) { + ovsdb_monitor_table_track_changes(mt, next_txn); + continue; + } + + HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes->rows) { struct json *row_json; row_json = ovsdb_monitor_compose_row_update( @@ -426,11 +516,15 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon, json_object_put(table_json, uuid, row_json); } - hmap_remove(&mt->changes, &row->hmap_node); + hmap_remove(&mt->changes->rows, &row->hmap_node); ovsdb_monitor_row_destroy(mt, row); } + + ovsdb_monitor_table_untrack_changes(mt, prev_txn); + ovsdb_monitor_table_track_changes(mt, next_txn); } + *unflushed = next_txn; free(changed); return json; } @@ -492,8 +586,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, change = ovsdb_monitor_row_find(mt, uuid); if (!change) { - change = xmalloc(sizeof *change); - hmap_insert(&mt->changes, &change->hmap_node, uuid_hash(uuid)); + change = xzalloc(sizeof *change); + hmap_insert(&mt->changes->rows, &change->hmap_node, uuid_hash(uuid)); change->uuid = *uuid; change->old = clone_monitor_row_data(mt, old); change->new = clone_monitor_row_data(mt, new); @@ -506,7 +600,7 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, if (!change->old) { /* This row was added then deleted. Forget about it. */ - hmap_remove(&mt->changes, &change->hmap_node); + hmap_remove(&mt->changes->rows, &change->hmap_node); free(change); } } @@ -527,6 +621,10 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon) if (mt->select & OJMS_INITIAL) { struct ovsdb_row *row; + if (!mt->changes) { + ovsdb_monitor_table_add_changes(mt, 0); + } + HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) { ovsdb_monitor_change_cb(NULL, row, NULL, &aux); } @@ -568,14 +666,8 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon) SHASH_FOR_EACH (node, &dbmon->tables) { struct ovsdb_monitor_table *mt = node->data; - struct ovsdb_monitor_row *row, *next; - - HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) { - hmap_remove(&mt->changes, &row->hmap_node); - ovsdb_monitor_row_destroy(mt, row); - } - hmap_destroy(&mt->changes); + ovsdb_monitor_changes_destroy(mt->changes); free(mt->columns); free(mt); }