/*
 * Copyright (c) 2014 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <config.h>

#include <errno.h>

#include "ovs-rcu.h"
#include "fatal-signal.h"
#include "guarded-list.h"
#include "openvswitch/list.h"
#include "ovs-thread.h"
#include "poll-loop.h"
#include "seq.h"
#include "timeval.h"
#include "util.h"
#include "openvswitch/vlog.h"
VLOG_DEFINE_THIS_MODULE(ovs_rcu);
32 void (*function)(void *aux);
37 struct ovs_list list_node;
38 struct ovsrcu_cb cbs[16];
42 struct ovsrcu_perthread {
43 struct ovs_list list_node; /* In global list. */
45 struct ovs_mutex mutex;
47 struct ovsrcu_cbset *cbset;
48 char name[16]; /* This thread's name. */
51 static struct seq *global_seqno;
53 static pthread_key_t perthread_key;
54 static struct ovs_list ovsrcu_threads;
55 static struct ovs_mutex ovsrcu_threads_mutex;
57 static struct guarded_list flushed_cbsets;
58 static struct seq *flushed_cbsets_seq;
60 static void ovsrcu_init_module(void);
61 static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
62 static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
63 static void ovsrcu_unregister__(struct ovsrcu_perthread *);
64 static bool ovsrcu_call_postponed(void);
65 static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
67 static struct ovsrcu_perthread *
68 ovsrcu_perthread_get(void)
70 struct ovsrcu_perthread *perthread;
74 perthread = pthread_getspecific(perthread_key);
76 const char *name = get_subprogram_name();
78 perthread = xmalloc(sizeof *perthread);
79 ovs_mutex_init(&perthread->mutex);
80 perthread->seqno = seq_read(global_seqno);
81 perthread->cbset = NULL;
82 ovs_strlcpy(perthread->name, name[0] ? name : "main",
83 sizeof perthread->name);
85 ovs_mutex_lock(&ovsrcu_threads_mutex);
86 ovs_list_push_back(&ovsrcu_threads, &perthread->list_node);
87 ovs_mutex_unlock(&ovsrcu_threads_mutex);
89 pthread_setspecific(perthread_key, perthread);
/* Indicates the end of a quiescent state.  See "Details" near the top of
 * ovs-rcu.h.
 *
 * Quiescent states don't stack or nest, so this always ends a quiescent state
 * even if ovsrcu_quiesce_start() was called multiple times in a row. */
void
ovsrcu_quiesce_end(void)
{
    /* Re-registering the thread in thread-specific data is what marks it as
     * non-quiescent. */
    ovsrcu_perthread_get();
}
106 ovsrcu_quiesced(void)
108 if (single_threaded()) {
109 ovsrcu_call_postponed();
111 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
112 if (ovsthread_once_start(&once)) {
113 ovs_thread_create("urcu", ovsrcu_postpone_thread, NULL);
114 ovsthread_once_done(&once);
119 /* Indicates the beginning of a quiescent state. See "Details" near the top of
122 ovsrcu_quiesce_start(void)
124 struct ovsrcu_perthread *perthread;
126 ovsrcu_init_module();
127 perthread = pthread_getspecific(perthread_key);
129 pthread_setspecific(perthread_key, NULL);
130 ovsrcu_unregister__(perthread);
136 /* Indicates a momentary quiescent state. See "Details" near the top of
139 * Provides a full memory barrier via seq_change().
144 struct ovsrcu_perthread *perthread;
146 perthread = ovsrcu_perthread_get();
147 perthread->seqno = seq_read(global_seqno);
148 if (perthread->cbset) {
149 ovsrcu_flush_cbset(perthread);
151 seq_change(global_seqno);
157 ovsrcu_try_quiesce(void)
159 struct ovsrcu_perthread *perthread;
162 ovs_assert(!single_threaded());
163 perthread = ovsrcu_perthread_get();
164 if (!seq_try_lock()) {
165 perthread->seqno = seq_read_protected(global_seqno);
166 if (perthread->cbset) {
167 ovsrcu_flush_cbset__(perthread, true);
169 seq_change_protected(global_seqno);
178 ovsrcu_is_quiescent(void)
180 ovsrcu_init_module();
181 return pthread_getspecific(perthread_key) == NULL;
185 ovsrcu_synchronize(void)
187 unsigned int warning_threshold = 1000;
188 uint64_t target_seqno;
191 if (single_threaded()) {
195 target_seqno = seq_read(global_seqno);
196 ovsrcu_quiesce_start();
200 uint64_t cur_seqno = seq_read(global_seqno);
201 struct ovsrcu_perthread *perthread;
202 char stalled_thread[16];
203 unsigned int elapsed;
206 ovs_mutex_lock(&ovsrcu_threads_mutex);
207 LIST_FOR_EACH (perthread, list_node, &ovsrcu_threads) {
208 if (perthread->seqno <= target_seqno) {
209 ovs_strlcpy(stalled_thread, perthread->name,
210 sizeof stalled_thread);
215 ovs_mutex_unlock(&ovsrcu_threads_mutex);
221 elapsed = time_msec() - start;
222 if (elapsed >= warning_threshold) {
223 VLOG_WARN("blocked %u ms waiting for %s to quiesce",
224 elapsed, stalled_thread);
225 warning_threshold *= 2;
227 poll_timer_wait_until(start + warning_threshold);
229 seq_wait(global_seqno, cur_seqno);
232 ovsrcu_quiesce_end();
235 /* Registers 'function' to be called, passing 'aux' as argument, after the
238 * The call is guaranteed to happen after the next time all participating
239 * threads have quiesced at least once, but there is no quarantee that all
240 * registered functions are called as early as possible, or that the functions
241 * registered by different threads would be called in the order the
242 * registrations took place. In particular, even if two threads provably
243 * register a function each in a specific order, the functions may still be
244 * called in the opposite order, depending on the timing of when the threads
245 * call ovsrcu_quiesce(), how many functions they postpone, and when the
246 * ovs-rcu thread happens to grab the functions to be called.
248 * All functions registered by a single thread are guaranteed to execute in the
249 * registering order, however.
251 * This function is more conveniently called through the ovsrcu_postpone()
252 * macro, which provides a type-safe way to allow 'function''s parameter to be
253 * any pointer type. */
255 ovsrcu_postpone__(void (*function)(void *aux), void *aux)
257 struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
258 struct ovsrcu_cbset *cbset;
259 struct ovsrcu_cb *cb;
261 cbset = perthread->cbset;
263 cbset = perthread->cbset = xmalloc(sizeof *perthread->cbset);
267 cb = &cbset->cbs[cbset->n_cbs++];
268 cb->function = function;
271 if (cbset->n_cbs >= ARRAY_SIZE(cbset->cbs)) {
272 ovsrcu_flush_cbset(perthread);
277 ovsrcu_call_postponed(void)
279 struct ovsrcu_cbset *cbset;
280 struct ovs_list cbsets;
282 guarded_list_pop_all(&flushed_cbsets, &cbsets);
283 if (ovs_list_is_empty(&cbsets)) {
287 ovsrcu_synchronize();
289 LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
290 struct ovsrcu_cb *cb;
292 for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
293 cb->function(cb->aux);
302 ovsrcu_postpone_thread(void *arg OVS_UNUSED)
304 pthread_detach(pthread_self());
307 uint64_t seqno = seq_read(flushed_cbsets_seq);
308 if (!ovsrcu_call_postponed()) {
309 seq_wait(flushed_cbsets_seq, seqno);
318 ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
320 struct ovsrcu_cbset *cbset = perthread->cbset;
323 guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
324 perthread->cbset = NULL;
327 seq_change_protected(flushed_cbsets_seq);
329 seq_change(flushed_cbsets_seq);
335 ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
337 ovsrcu_flush_cbset__(perthread, false);
341 ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
343 if (perthread->cbset) {
344 ovsrcu_flush_cbset(perthread);
347 ovs_mutex_lock(&ovsrcu_threads_mutex);
348 ovs_list_remove(&perthread->list_node);
349 ovs_mutex_unlock(&ovsrcu_threads_mutex);
351 ovs_mutex_destroy(&perthread->mutex);
354 seq_change(global_seqno);
/* pthread TLS destructor: unregisters a thread's RCU state when the thread
 * exits. */
static void
ovsrcu_thread_exit_cb(void *perthread)
{
    ovsrcu_unregister__(perthread);
}
363 /* Cancels the callback to ovsrcu_thread_exit_cb().
365 * Cancelling the call to the destructor during the main thread exit
366 * is needed while using pthreads-win32 library in Windows. It has been
367 * observed that in pthreads-win32, a call to the destructor during
368 * main thread exit causes undefined behavior. */
370 ovsrcu_cancel_thread_exit_cb(void *aux OVS_UNUSED)
372 pthread_setspecific(perthread_key, NULL);
376 ovsrcu_init_module(void)
378 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
379 if (ovsthread_once_start(&once)) {
380 global_seqno = seq_create();
381 xpthread_key_create(&perthread_key, ovsrcu_thread_exit_cb);
382 fatal_signal_add_hook(ovsrcu_cancel_thread_exit_cb, NULL, NULL, true);
383 ovs_list_init(&ovsrcu_threads);
384 ovs_mutex_init(&ovsrcu_threads_mutex);
386 guarded_list_init(&flushed_cbsets);
387 flushed_cbsets_seq = seq_create();
389 ovsthread_once_done(&once);