+
+ /* Reset flow tracking data so the next interval starts fresh. */
+ nf_flow->packet_count = 0;
+ nf_flow->byte_count = 0;
+ nf_flow->tcp_flags = 0;
+}
+
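+ /* Expires 'flow' from 'nf', if 'nf' is tracking it, and discards its
+  * tracking state. */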
+void
+netflow_flow_clear(struct netflow *nf, const struct flow *flow)
+ OVS_EXCLUDED(mutex)
+{
+ struct netflow_flow *nf_flow;
+
+ ovs_mutex_lock(&mutex);
+ nf_flow = netflow_flow_lookup(nf, flow);
+ if (nf_flow) {
+ netflow_expire__(nf, nf_flow);
+ hmap_remove(&nf->flows, &nf_flow->hmap_node);
+ free(nf_flow);
+ }
+ ovs_mutex_unlock(&mutex);
+}
+
+ /* Sends any queued NetFlow messages and, if a round of active timeouts
+  * is due, expires each tracked flow, discarding flows that appear to
+  * have gone idle. */
+static void
+netflow_run__(struct netflow *nf) OVS_REQUIRES(mutex)
+{
+ long long int now = time_msec();
+ struct netflow_flow *nf_flow, *next;
+
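+     /* Flush any NetFlow messages that are queued for transmission. */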
+ if (nf->packet.size) {
+ collectors_send(nf->collectors, nf->packet.data, nf->packet.size);
+ nf->packet.size = 0;
+ }
+
+ if (!nf->active_timeout || now < nf->next_timeout) {
+ return;
+ }
+
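+     /* Rate-limit active timeouts to at most one pass per second. */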
+ nf->next_timeout = now + 1000;
+
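+     /* Expire every flow whose active timeout has elapsed. */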
+ HMAP_FOR_EACH_SAFE (nf_flow, next, hmap_node, &nf->flows) {
+ if (now > nf_flow->last_expired + nf->active_timeout) {
+ bool idle = nf_flow->used < nf_flow->last_expired;
+ netflow_expire__(nf, nf_flow);
+
+ if (idle) {
+ /* If the netflow_flow hasn't been used in a while, it's
+ * possible the upper layer lost track of it. */
+ hmap_remove(&nf->flows, &nf_flow->hmap_node);
+ free(nf_flow);
+ }
+ }
+ }
+}
+
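+ /* Performs periodic work that NetFlow is responsible for: sending any
+  * queued messages and expiring flows on the active timeout. */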
+void
+netflow_run(struct netflow *nf)
+{
+ ovs_mutex_lock(&mutex);
+ netflow_run__(nf);
+ ovs_mutex_unlock(&mutex);
+}
+
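+ /* Causes the poll loop to wake up when netflow_run() has work to do. */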
+void
+netflow_wait(struct netflow *nf) OVS_EXCLUDED(mutex)
+{
+ ovs_mutex_lock(&mutex);
+ if (nf->active_timeout) {
+ poll_timer_wait_until(nf->next_timeout);
+ }
+ if (nf->packet.size) {
+ poll_immediate_wake();
+ }
+ ovs_mutex_unlock(&mutex);