bool need_revalidate; /* As indicated by 'reval_seq'. */
bool reval_exit; /* Set by leader on 'exit_latch. */
pthread_barrier_t reval_barrier; /* Barrier used by revalidators. */
- struct dpif_flow_dump dump; /* DPIF flow dump state. */
+ struct dpif_flow_dump *dump; /* DPIF flow dump state. */
long long int dump_duration; /* Duration of the last flow dump. */
struct seq *dump_seq; /* Increments each dump iteration. */
start_time = time_msec();
if (!udpif->reval_exit) {
- dpif_flow_dump_start(&udpif->dump, udpif->dpif);
+ udpif->dump = dpif_flow_dump_create(udpif->dpif);
}
}
if (leader) {
long long int duration;
- dpif_flow_dump_done(&udpif->dump);
+ dpif_flow_dump_destroy(udpif->dump);
seq_change(udpif->dump_seq);
duration = MAX(time_msec() - start_time, 1);
static bool
revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
- const struct nlattr *mask, size_t mask_len,
- const struct nlattr *actions, size_t actions_len,
- const struct dpif_flow_stats *stats)
+ const struct dpif_flow *f)
{
uint64_t slow_path_buf[128 / 8];
struct xlate_out xout, *xoutp;
ovs_mutex_lock(&ukey->mutex);
last_used = ukey->stats.used;
- push.used = stats->used;
- push.tcp_flags = stats->tcp_flags;
- push.n_packets = stats->n_packets > ukey->stats.n_packets
- ? stats->n_packets - ukey->stats.n_packets
- : 0;
- push.n_bytes = stats->n_bytes > ukey->stats.n_bytes
- ? stats->n_bytes - ukey->stats.n_bytes
- : 0;
+ push.used = f->stats.used;
+ push.tcp_flags = f->stats.tcp_flags;
+ push.n_packets = (f->stats.n_packets > ukey->stats.n_packets
+ ? f->stats.n_packets - ukey->stats.n_packets
+ : 0);
+ push.n_bytes = (f->stats.n_bytes > ukey->stats.n_bytes
+ ? f->stats.n_bytes - ukey->stats.n_bytes
+ : 0);
if (!ukey->flow_exists) {
/* Don't bother revalidating if the flow was already deleted. */
}
/* We will push the stats, so update the ukey stats cache. */
- ukey->stats = *stats;
+ ukey->stats = f->stats;
if (!push.n_packets && !udpif->need_revalidate) {
ok = true;
goto exit;
compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
}
- if (actions_len != ofpbuf_size(&xout_actions)
- || memcmp(ofpbuf_data(&xout_actions), actions, actions_len)) {
+ if (f->actions_len != ofpbuf_size(&xout_actions)
+ || memcmp(ofpbuf_data(&xout_actions), f->actions, f->actions_len)) {
goto exit;
}
- if (odp_flow_key_to_mask(mask, mask_len, &dp_mask, &flow)
+ if (odp_flow_key_to_mask(f->mask, f->mask_len, &dp_mask, &flow)
== ODP_FIT_ERROR) {
goto exit;
}
revalidate(struct revalidator *revalidator)
{
struct udpif *udpif = revalidator->udpif;
-
- struct dump_op ops[REVALIDATE_MAX_BATCH];
- const struct nlattr *key, *mask, *actions;
- size_t key_len, mask_len, actions_len;
- const struct dpif_flow_stats *stats;
- long long int now;
+ struct dpif_flow_dump_thread *dump_thread;
unsigned int flow_limit;
- size_t n_ops;
- void *state;
- n_ops = 0;
- now = time_msec();
atomic_read(&udpif->flow_limit, &flow_limit);
+ dump_thread = dpif_flow_dump_thread_create(udpif->dump);
+ for (;;) {
+ struct dump_op ops[REVALIDATE_MAX_BATCH];
+ int n_ops = 0;
- dpif_flow_dump_state_init(udpif->dpif, &state);
- while (dpif_flow_dump_next(&udpif->dump, state, &key, &key_len, &mask,
- &mask_len, &actions, &actions_len, &stats)) {
- struct udpif_key *ukey;
- bool mark, may_destroy;
- long long int used, max_idle;
- uint32_t hash;
- size_t n_flows;
-
- hash = hash_bytes(key, key_len, udpif->secret);
- ukey = ukey_lookup(udpif, key, key_len, hash);
-
- used = stats->used;
- if (!used && ukey) {
- ovs_mutex_lock(&ukey->mutex);
-
- if (ukey->mark || !ukey->flow_exists) {
- /* The flow has already been dumped. This can occasionally
- * occur if the datapath is changed in the middle of a flow
- * dump. Rather than perform the same work twice, skip the
- * flow this time. */
- ovs_mutex_unlock(&ukey->mutex);
- COVERAGE_INC(upcall_duplicate_flow);
- goto next;
- }
+ struct dpif_flow flows[REVALIDATE_MAX_BATCH];
+ const struct dpif_flow *f;
+ int n_dumped;
- used = ukey->created;
- ovs_mutex_unlock(&ukey->mutex);
- }
+ long long int max_idle;
+ long long int now;
+ size_t n_dp_flows;
+ bool kill_them_all;
- n_flows = udpif_get_n_flows(udpif);
- max_idle = ofproto_max_idle;
- if (n_flows > flow_limit) {
- max_idle = 100;
+ n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
+ if (!n_dumped) {
+ break;
}
- if ((used && used < now - max_idle) || n_flows > flow_limit * 2) {
- mark = false;
- } else {
- if (!ukey) {
- ukey = ukey_create(key, key_len, used);
- if (!udpif_insert_ukey(udpif, ukey, hash)) {
- /* The same ukey has already been created. This means that
- * another revalidator is processing this flow
- * concurrently, so don't bother processing it. */
- ukey_delete(NULL, ukey);
- goto next;
+ now = time_msec();
+
+ /* In normal operation we want to keep flows around until they have
+ * been idle for 'ofproto_max_idle' milliseconds. However:
+ *
+ * - If the number of datapath flows climbs above 'flow_limit',
+ * drop that down to 100 ms to try to bring the flows down to
+ * the limit.
+ *
+ * - If the number of datapath flows climbs above twice
+ * 'flow_limit', delete all the datapath flows as an emergency
+ * measure. (We reassess this condition for the next batch of
+ * datapath flows, so we will recover before all the flows are
+ * gone.) */
+ n_dp_flows = udpif_get_n_flows(udpif);
+ kill_them_all = n_dp_flows > flow_limit * 2;
+ max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
+
+ for (f = flows; f < &flows[n_dumped]; f++) {
+ long long int used = f->stats.used;
+ uint32_t hash = hash_bytes(f->key, f->key_len, udpif->secret);
+ struct udpif_key *ukey = ukey_lookup(udpif, f->key, f->key_len,
+ hash);
+ bool mark;
+
+ if (!used && ukey) {
+ bool already_dumped;
+
+ ovs_mutex_lock(&ukey->mutex);
+ already_dumped = ukey->mark || !ukey->flow_exists;
+ used = ukey->created;
+ ovs_mutex_unlock(&ukey->mutex);
+
+ if (already_dumped) {
+ /* The flow has already been dumped. This can occasionally
+ * occur if the datapath is changed in the middle of a flow
+ * dump. Rather than perform the same work twice, skip the
+ * flow this time. */
+ COVERAGE_INC(upcall_duplicate_flow);
+ continue;
}
}
- mark = revalidate_ukey(udpif, ukey, mask, mask_len, actions,
- actions_len, stats);
- }
-
- if (ukey) {
- ovs_mutex_lock(&ukey->mutex);
- ukey->mark = ukey->flow_exists = mark;
- ovs_mutex_unlock(&ukey->mutex);
- }
+ if (kill_them_all || (used && used < now - max_idle)) {
+ mark = false;
+ } else {
+ if (!ukey) {
+ ukey = ukey_create(f->key, f->key_len, used);
+ if (!udpif_insert_ukey(udpif, ukey, hash)) {
+ /* The same ukey has already been created. This means
+ * that another revalidator is processing this flow
+ * concurrently, so don't bother processing it. */
+ ukey_delete(NULL, ukey);
+ continue;
+ }
+ }
- if (!mark) {
- dump_op_init(&ops[n_ops++], key, key_len, ukey);
- }
+ mark = revalidate_ukey(udpif, ukey, f);
+ }
- next:
- may_destroy = dpif_flow_dump_next_may_destroy_keys(&udpif->dump,
- state);
+ if (ukey) {
+ ovs_mutex_lock(&ukey->mutex);
+ ukey->mark = ukey->flow_exists = mark;
+ ovs_mutex_unlock(&ukey->mutex);
+ }
- /* Only update 'now' immediately before 'buffer' will be updated.
- * This gives us the current time relative to the time the datapath
- * will write into 'stats'. */
- if (may_destroy) {
- now = time_msec();
+ if (!mark) {
+ dump_op_init(&ops[n_ops++], f->key, f->key_len, ukey);
+ }
}
- /* Only do a dpif_operate when we've hit our maximum batch, or when our
- * memory is about to be clobbered by the next call to
- * dpif_flow_dump_next(). */
- if (n_ops == REVALIDATE_MAX_BATCH || (n_ops && may_destroy)) {
+ if (n_ops) {
push_dump_ops__(udpif, ops, n_ops);
- n_ops = 0;
}
}
-
- if (n_ops) {
- push_dump_ops__(udpif, ops, n_ops);
- }
-
- dpif_flow_dump_state_uninit(udpif->dpif, state);
+ dpif_flow_dump_thread_destroy(dump_thread);
}
static void