netdev-dpdk: fix mbuf leaks
[cascardo/ovs.git] / lib / seq.c
1 /*
2  * Copyright (c) 2013, 2014 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "seq.h"
20
21 #include <stdbool.h>
22
23 #include "coverage.h"
24 #include "hash.h"
25 #include "hmap.h"
26 #include "latch.h"
27 #include "list.h"
28 #include "ovs-thread.h"
29 #include "poll-loop.h"
30
31 COVERAGE_DEFINE(seq_change);
32
33 /* A sequence number object. */
34 struct seq {
35     uint64_t value OVS_GUARDED;
36     struct hmap waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */
37 };
38
39 /* A thread waiting on a particular seq. */
40 struct seq_waiter {
41     struct seq *seq OVS_GUARDED;            /* Seq being waited for. */
42     struct hmap_node hmap_node OVS_GUARDED; /* In 'seq->waiters'. */
43     unsigned int ovsthread_id OVS_GUARDED;  /* Key in 'waiters' hmap. */
44
45     struct seq_thread *thread OVS_GUARDED;  /* Thread preparing to wait. */
46     struct ovs_list list_node OVS_GUARDED;  /* In 'thread->waiters'. */
47
48     uint64_t value OVS_GUARDED; /* seq->value we're waiting to change. */
49 };
50
51 /* A thread that might be waiting on one or more seqs. */
52 struct seq_thread {
53     struct ovs_list waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */
54     struct latch latch OVS_GUARDED;  /* Wakeup latch for this thread. */
55     bool waiting OVS_GUARDED;        /* True if latch_wait() already called. */
56 };
57
58 static struct ovs_mutex seq_mutex = OVS_MUTEX_INITIALIZER;
59
60 static uint64_t seq_next OVS_GUARDED_BY(seq_mutex) = 1;
61
62 static pthread_key_t seq_thread_key;
63
64 static void seq_init(void);
65 static struct seq_thread *seq_thread_get(void) OVS_REQUIRES(seq_mutex);
66 static void seq_thread_exit(void *thread_) OVS_EXCLUDED(seq_mutex);
67 static void seq_thread_woke(struct seq_thread *) OVS_REQUIRES(seq_mutex);
68 static void seq_waiter_destroy(struct seq_waiter *) OVS_REQUIRES(seq_mutex);
69 static void seq_wake_waiters(struct seq *) OVS_REQUIRES(seq_mutex);
70
71 /* Creates and returns a new 'seq' object. */
72 struct seq * OVS_EXCLUDED(seq_mutex)
73 seq_create(void)
74 {
75     struct seq *seq;
76
77     seq_init();
78
79     seq = xmalloc(sizeof *seq);
80
81     COVERAGE_INC(seq_change);
82
83     ovs_mutex_lock(&seq_mutex);
84     seq->value = seq_next++;
85     hmap_init(&seq->waiters);
86     ovs_mutex_unlock(&seq_mutex);
87
88     return seq;
89 }
90
91 /* Destroys 'seq', waking up threads that were waiting on it, if any. */
92 void
93 seq_destroy(struct seq *seq)
94      OVS_EXCLUDED(seq_mutex)
95 {
96     ovs_mutex_lock(&seq_mutex);
97     seq_wake_waiters(seq);
98     hmap_destroy(&seq->waiters);
99     free(seq);
100     ovs_mutex_unlock(&seq_mutex);
101 }
102
103 /* Increments 'seq''s sequence number, waking up any threads that are waiting
104  * on 'seq'. */
105 void
106 seq_change(struct seq *seq)
107     OVS_EXCLUDED(seq_mutex)
108 {
109     COVERAGE_INC(seq_change);
110
111     ovs_mutex_lock(&seq_mutex);
112     seq->value = seq_next++;
113     seq_wake_waiters(seq);
114     ovs_mutex_unlock(&seq_mutex);
115 }
116
117 /* Returns 'seq''s current sequence number (which could change immediately).
118  *
119  * seq_read() and seq_wait() can be used together to yield a race-free wakeup
120  * when an object changes, even without an ability to lock the object.  See
121  * Usage in seq.h for details. */
122 uint64_t
123 seq_read(const struct seq *seq)
124     OVS_EXCLUDED(seq_mutex)
125 {
126     uint64_t value;
127
128     ovs_mutex_lock(&seq_mutex);
129     value = seq->value;
130     ovs_mutex_unlock(&seq_mutex);
131
132     return value;
133 }
134
135 static void
136 seq_wait__(struct seq *seq, uint64_t value, const char *where)
137     OVS_REQUIRES(seq_mutex)
138 {
139     unsigned int id = ovsthread_id_self();
140     uint32_t hash = hash_int(id, 0);
141     struct seq_waiter *waiter;
142
143     HMAP_FOR_EACH_IN_BUCKET (waiter, hmap_node, hash, &seq->waiters) {
144         if (waiter->ovsthread_id == id) {
145             if (waiter->value != value) {
146                 /* The current value is different from the value we've already
147                  * waited for, */
148                 poll_immediate_wake_at(where);
149             } else {
150                 /* Already waiting on 'value', nothing more to do. */
151             }
152             return;
153         }
154     }
155
156     waiter = xmalloc(sizeof *waiter);
157     waiter->seq = seq;
158     hmap_insert(&seq->waiters, &waiter->hmap_node, hash);
159     waiter->ovsthread_id = id;
160     waiter->value = value;
161     waiter->thread = seq_thread_get();
162     list_push_back(&waiter->thread->waiters, &waiter->list_node);
163
164     if (!waiter->thread->waiting) {
165         latch_wait_at(&waiter->thread->latch, where);
166         waiter->thread->waiting = true;
167     }
168 }
169
170 /* Causes the following poll_block() to wake up when 'seq''s sequence number
171  * changes from 'value'.  (If 'seq''s sequence number isn't 'value', then
172  * poll_block() won't block at all.)
173  *
174  * seq_read() and seq_wait() can be used together to yield a race-free wakeup
175  * when an object changes, even without an ability to lock the object.  See
176  * Usage in seq.h for details.
177  *
178  * ('where' is used in debug logging.  Commonly one would use seq_wait() to
179  * automatically provide the caller's source file and line number for
180  * 'where'.) */
181 void
182 seq_wait_at(const struct seq *seq_, uint64_t value, const char *where)
183     OVS_EXCLUDED(seq_mutex)
184 {
185     struct seq *seq = CONST_CAST(struct seq *, seq_);
186
187     ovs_mutex_lock(&seq_mutex);
188     if (value == seq->value) {
189         seq_wait__(seq, value, where);
190     } else {
191         poll_immediate_wake_at(where);
192     }
193     ovs_mutex_unlock(&seq_mutex);
194 }
195
196 /* Called by poll_block() just before it returns, this function destroys any
197  * seq_waiter objects associated with the current thread. */
198 void
199 seq_woke(void)
200     OVS_EXCLUDED(seq_mutex)
201 {
202     struct seq_thread *thread;
203
204     seq_init();
205
206     thread = pthread_getspecific(seq_thread_key);
207     if (thread) {
208         ovs_mutex_lock(&seq_mutex);
209         seq_thread_woke(thread);
210         thread->waiting = false;
211         ovs_mutex_unlock(&seq_mutex);
212     }
213 }
214 \f
215 static void
216 seq_init(void)
217 {
218     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
219
220     if (ovsthread_once_start(&once)) {
221         xpthread_key_create(&seq_thread_key, seq_thread_exit);
222         ovsthread_once_done(&once);
223     }
224 }
225
226 static struct seq_thread *
227 seq_thread_get(void)
228     OVS_REQUIRES(seq_mutex)
229 {
230     struct seq_thread *thread = pthread_getspecific(seq_thread_key);
231     if (!thread) {
232         thread = xmalloc(sizeof *thread);
233         list_init(&thread->waiters);
234         latch_init(&thread->latch);
235         thread->waiting = false;
236
237         xpthread_setspecific(seq_thread_key, thread);
238     }
239     return thread;
240 }
241
242 static void
243 seq_thread_exit(void *thread_)
244     OVS_EXCLUDED(seq_mutex)
245 {
246     struct seq_thread *thread = thread_;
247
248     ovs_mutex_lock(&seq_mutex);
249     seq_thread_woke(thread);
250     latch_destroy(&thread->latch);
251     free(thread);
252     ovs_mutex_unlock(&seq_mutex);
253 }
254
255 static void
256 seq_thread_woke(struct seq_thread *thread)
257     OVS_REQUIRES(seq_mutex)
258 {
259     struct seq_waiter *waiter, *next_waiter;
260
261     LIST_FOR_EACH_SAFE (waiter, next_waiter, list_node, &thread->waiters) {
262         ovs_assert(waiter->thread == thread);
263         seq_waiter_destroy(waiter);
264     }
265     latch_poll(&thread->latch);
266 }
267
268 static void
269 seq_waiter_destroy(struct seq_waiter *waiter)
270     OVS_REQUIRES(seq_mutex)
271 {
272     hmap_remove(&waiter->seq->waiters, &waiter->hmap_node);
273     list_remove(&waiter->list_node);
274     free(waiter);
275 }
276
277 static void
278 seq_wake_waiters(struct seq *seq)
279     OVS_REQUIRES(seq_mutex)
280 {
281     struct seq_waiter *waiter, *next_waiter;
282
283     HMAP_FOR_EACH_SAFE (waiter, next_waiter, hmap_node, &seq->waiters) {
284         latch_set(&waiter->thread->latch);
285         seq_waiter_destroy(waiter);
286     }
287 }