datapath: Add workqueue API to ovs compat workqueue.
datapath/linux/compat/workqueue.c
/*
 * Derived from the kernel's kernel/workqueue.c.
 *
 * This is a generic async execution mechanism: work items are
 * executed in process context by a single "ovs_workq" kernel thread.
 */

#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/idr.h>

/*
 * State for the single-threaded compat workqueue.  wq_lock protects
 * workq (the list of pending items) and current_work (the item the
 * worker thread is executing right now, if any).
 */
static spinlock_t wq_lock;
static struct list_head workq;
static wait_queue_head_t more_work;
static struct task_struct *workq_thread;
static struct work_struct *current_work;

/* Caller must hold wq_lock. */
static void add_work_to_ovs_wq(struct work_struct *work)
{
        list_add_tail(&work->entry, &workq);
        wake_up(&more_work);
}

static void __queue_work(struct work_struct *work)
{
        unsigned long flags;

        spin_lock_irqsave(&wq_lock, flags);
        add_work_to_ovs_wq(work);
        spin_unlock_irqrestore(&wq_lock, flags);
}

/*
 * Queue @work on the compat workqueue.  The PENDING bit keeps an item
 * from being queued twice before it has run.
 */
void queue_work(struct work_struct *work)
{
        if (test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
                return;
        __queue_work(work);
}
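
/*
 * Example usage (an illustrative sketch, not part of the original file):
 * a caller declares a work item with a handler and queues it; the
 * handler later runs in process context on the ovs_workq thread.  The
 * names example_handler and example_work are hypothetical.
 *
 *        static void example_handler(struct work_struct *work)
 *        {
 *                pr_info("ran on the ovs compat workqueue\n");
 *        }
 *        static DECLARE_WORK(example_work, example_handler);
 *
 *        queue_work(&example_work);
 */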

/* Timer callback: the delay has elapsed, hand the item to the worker. */
static void _delayed_work_timer_fn(unsigned long __data)
{
        struct delayed_work *dwork = (struct delayed_work *)__data;
        __queue_work(&dwork->work);
}

static void __queue_delayed_work(struct delayed_work *dwork,
                unsigned long delay)
{
        struct timer_list *timer = &dwork->timer;
        struct work_struct *work = &dwork->work;

        BUG_ON(timer_pending(timer));
        BUG_ON(!list_empty(&work->entry));

        timer->expires = jiffies + delay;
        timer->data = (unsigned long)dwork;
        timer->function = _delayed_work_timer_fn;

        add_timer(timer);
}

/*
 * Run @dwork's handler after at least @delay jiffies.  Returns 1 if the
 * work was newly scheduled, 0 if it was already pending.
 */
int schedule_delayed_work(struct delayed_work *dwork, unsigned long delay)
{
        if (test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(&dwork->work)))
                return 0;

        if (delay == 0)
                __queue_work(&dwork->work);
        else
                __queue_delayed_work(dwork, delay);

        return 1;
}
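
/*
 * Example usage (an illustrative sketch): delayed work carries an
 * embedded timer, so the handler runs on the ovs_workq thread once the
 * delay has elapsed.  Here the hypothetical example_dwork_fn rearms
 * itself, firing roughly once per second.
 *
 *        static void example_dwork_fn(struct work_struct *work)
 *        {
 *                struct delayed_work *dwork =
 *                        container_of(work, struct delayed_work, work);
 *
 *                schedule_delayed_work(dwork, HZ);
 *        }
 *        static DECLARE_DELAYED_WORK(example_dwork, example_dwork_fn);
 *
 *        schedule_delayed_work(&example_dwork, HZ);
 */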

struct wq_barrier {
        struct work_struct      work;
        struct completion       done;
};

static void wq_barrier_func(struct work_struct *work)
{
        struct wq_barrier *barr = container_of(work, struct wq_barrier, work);

        complete(&barr->done);
}

/*
 * Wait for @work to finish if the worker thread is executing it right
 * now.  A barrier item is appended to the queue; since the single
 * worker runs items in order, its completion implies @work has
 * returned.
 */
static void workqueue_barrier(struct work_struct *work)
{
        bool need_barrier;
        struct wq_barrier barr;

        spin_lock_irq(&wq_lock);
        if (current_work != work) {
                need_barrier = false;
        } else {
                INIT_WORK(&barr.work, wq_barrier_func);
                init_completion(&barr.done);
                add_work_to_ovs_wq(&barr.work);
                need_barrier = true;
        }
        spin_unlock_irq(&wq_lock);

        if (need_barrier)
                wait_for_completion(&barr.done);
}

/*
 * Claim @work's PENDING bit and take it off the queue.  Returns 0 once
 * the item can no longer run (it was idle or has been dequeued), or -1
 * if it is in flight to the queue and the caller must retry.
 */
static int try_to_grab_pending(struct work_struct *work)
{
        int ret;

        BUG_ON(in_interrupt());

        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
                return 0;

        spin_lock_irq(&wq_lock);
        if (!list_empty(&work->entry)) {
                list_del_init(&work->entry);
                ret = 0;
        } else {
                /* Being queued concurrently; retry. */
                ret = -1;
        }
        spin_unlock_irq(&wq_lock);

        return ret;
}

static int __cancel_work_timer(struct work_struct *work,
                               struct timer_list *timer)
{
        int ret;

        for (;;) {
                /* An active timer means the work had not yet run. */
                ret = (timer && likely(del_timer(timer)));
                if (ret)
                        break;

                /* Inactive timer: take the item off the queue. */
                ret = try_to_grab_pending(work);
                if (!ret)
                        break;
        }

        /* Wait out a handler the worker may be running right now. */
        workqueue_barrier(work);
        work_clear_pending(work);
        return ret;
}

int cancel_delayed_work_sync(struct delayed_work *dwork)
{
        return __cancel_work_timer(&dwork->work, &dwork->timer);
}

bool cancel_work_sync(struct work_struct *work)
{
        return __cancel_work_timer(work, NULL);
}
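
/*
 * Example teardown (an illustrative sketch): before freeing anything a
 * handler might touch, cancel outstanding items and wait for in-flight
 * execution to finish.  example_work and example_dwork refer to the
 * hypothetical items in the sketches above.
 *
 *        cancel_work_sync(&example_work);
 *        cancel_delayed_work_sync(&example_dwork);
 */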

static void run_workqueue(void)
{
        spin_lock_irq(&wq_lock);
        while (!list_empty(&workq)) {
                struct work_struct *work = list_entry(workq.next,
                                struct work_struct, entry);
                work_func_t f = work->func;

                list_del_init(workq.next);
                current_work = work;

                /* Drop the lock: handlers may sleep or queue more work. */
                spin_unlock_irq(&wq_lock);

                work_clear_pending(work);
                f(work);

                /* A handler must not return in interrupt context. */
                BUG_ON(in_interrupt());
                spin_lock_irq(&wq_lock);
                current_work = NULL;
        }
        spin_unlock_irq(&wq_lock);
}

static int worker_thread(void *dummy)
{
        for (;;) {
                wait_event_interruptible(more_work,
                                (kthread_should_stop() || !list_empty(&workq)));

                if (kthread_should_stop())
                        break;

                run_workqueue();
        }

        return 0;
}

int __init ovs_workqueues_init(void)
{
        spin_lock_init(&wq_lock);
        INIT_LIST_HEAD(&workq);
        init_waitqueue_head(&more_work);

        workq_thread = kthread_create(worker_thread, NULL, "ovs_workq");
        if (IS_ERR(workq_thread))
                return PTR_ERR(workq_thread);

        wake_up_process(workq_thread);
        return 0;
}

void ovs_workqueues_exit(void)
{
        BUG_ON(!list_empty(&workq));
        kthread_stop(workq_thread);
}
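
/*
 * Example lifecycle (an illustrative sketch): a module built against
 * this compat layer starts the worker thread before queueing any work
 * and stops it on unload, after every item has been cancelled.
 *
 *        static int __init example_init(void)
 *        {
 *                int err = ovs_workqueues_init();
 *
 *                if (err)
 *                        return err;
 *                queue_work(&example_work);
 *                return 0;
 *        }
 *
 *        static void __exit example_exit(void)
 *        {
 *                cancel_work_sync(&example_work);
 *                ovs_workqueues_exit();
 *        }
 */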