ptr_ring: resize support
[cascardo/linux.git] / include / linux / ptr_ring.h
1 /*
2  *      Definitions for the 'struct ptr_ring' datastructure.
3  *
4  *      Author:
5  *              Michael S. Tsirkin <mst@redhat.com>
6  *
7  *      Copyright (C) 2016 Red Hat, Inc.
8  *
9  *      This program is free software; you can redistribute it and/or modify it
10  *      under the terms of the GNU General Public License as published by the
11  *      Free Software Foundation; either version 2 of the License, or (at your
12  *      option) any later version.
13  *
14  *      This is a limited-size FIFO maintaining pointers in FIFO order, with
15  *      one CPU producing entries and another consuming entries from a FIFO.
16  *
17  *      This implementation tries to minimize cache-contention when there is a
18  *      single producer and a single consumer CPU.
19  */
20
21 #ifndef _LINUX_PTR_RING_H
22 #define _LINUX_PTR_RING_H 1
23
24 #ifdef __KERNEL__
25 #include <linux/spinlock.h>
26 #include <linux/cache.h>
27 #include <linux/types.h>
28 #include <linux/compiler.h>
29 #include <linux/cache.h>
30 #include <linux/slab.h>
31 #include <asm/errno.h>
32 #endif
33
34 struct ptr_ring {
35         int producer ____cacheline_aligned_in_smp;
36         spinlock_t producer_lock;
37         int consumer ____cacheline_aligned_in_smp;
38         spinlock_t consumer_lock;
39         /* Shared consumer/producer data */
40         /* Read-only by both the producer and the consumer */
41         int size ____cacheline_aligned_in_smp; /* max entries in queue */
42         void **queue;
43 };
44
45 /* Note: callers invoking this in a loop must use a compiler barrier,
46  * for example cpu_relax().  If ring is ever resized, callers must hold
47  * producer_lock - see e.g. ptr_ring_full.  Otherwise, if callers don't hold
48  * producer_lock, the next call to __ptr_ring_produce may fail.
49  */
50 static inline bool __ptr_ring_full(struct ptr_ring *r)
51 {
52         return r->queue[r->producer];
53 }
54
55 static inline bool ptr_ring_full(struct ptr_ring *r)
56 {
57         bool ret;
58
59         spin_lock(&r->producer_lock);
60         ret = __ptr_ring_full(r);
61         spin_unlock(&r->producer_lock);
62
63         return ret;
64 }
65
66 static inline bool ptr_ring_full_irq(struct ptr_ring *r)
67 {
68         bool ret;
69
70         spin_lock_irq(&r->producer_lock);
71         ret = __ptr_ring_full(r);
72         spin_unlock_irq(&r->producer_lock);
73
74         return ret;
75 }
76
77 static inline bool ptr_ring_full_any(struct ptr_ring *r)
78 {
79         unsigned long flags;
80         bool ret;
81
82         spin_lock_irqsave(&r->producer_lock, flags);
83         ret = __ptr_ring_full(r);
84         spin_unlock_irqrestore(&r->producer_lock, flags);
85
86         return ret;
87 }
88
89 static inline bool ptr_ring_full_bh(struct ptr_ring *r)
90 {
91         bool ret;
92
93         spin_lock_bh(&r->producer_lock);
94         ret = __ptr_ring_full(r);
95         spin_unlock_bh(&r->producer_lock);
96
97         return ret;
98 }
99
100 /* Note: callers invoking this in a loop must use a compiler barrier,
101  * for example cpu_relax(). Callers must hold producer_lock.
102  */
103 static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
104 {
105         if (r->queue[r->producer])
106                 return -ENOSPC;
107
108         r->queue[r->producer++] = ptr;
109         if (unlikely(r->producer >= r->size))
110                 r->producer = 0;
111         return 0;
112 }
113
114 static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
115 {
116         int ret;
117
118         spin_lock(&r->producer_lock);
119         ret = __ptr_ring_produce(r, ptr);
120         spin_unlock(&r->producer_lock);
121
122         return ret;
123 }
124
125 static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
126 {
127         int ret;
128
129         spin_lock_irq(&r->producer_lock);
130         ret = __ptr_ring_produce(r, ptr);
131         spin_unlock_irq(&r->producer_lock);
132
133         return ret;
134 }
135
136 static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
137 {
138         unsigned long flags;
139         int ret;
140
141         spin_lock_irqsave(&r->producer_lock, flags);
142         ret = __ptr_ring_produce(r, ptr);
143         spin_unlock_irqrestore(&r->producer_lock, flags);
144
145         return ret;
146 }
147
148 static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
149 {
150         int ret;
151
152         spin_lock_bh(&r->producer_lock);
153         ret = __ptr_ring_produce(r, ptr);
154         spin_unlock_bh(&r->producer_lock);
155
156         return ret;
157 }
158
159 /* Note: callers invoking this in a loop must use a compiler barrier,
160  * for example cpu_relax(). Callers must take consumer_lock
161  * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
162  * If ring is never resized, and if the pointer is merely
163  * tested, there's no need to take the lock - see e.g.  __ptr_ring_empty.
164  */
165 static inline void *__ptr_ring_peek(struct ptr_ring *r)
166 {
167         return r->queue[r->consumer];
168 }
169
/* Report whether the consumer slot is currently free, i.e. whether
 * __ptr_ring_peek() yields NULL.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	void *head = __ptr_ring_peek(r);

	return head == NULL;
}
178
179 static inline bool ptr_ring_empty(struct ptr_ring *r)
180 {
181         bool ret;
182
183         spin_lock(&r->consumer_lock);
184         ret = __ptr_ring_empty(r);
185         spin_unlock(&r->consumer_lock);
186
187         return ret;
188 }
189
190 static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
191 {
192         bool ret;
193
194         spin_lock_irq(&r->consumer_lock);
195         ret = __ptr_ring_empty(r);
196         spin_unlock_irq(&r->consumer_lock);
197
198         return ret;
199 }
200
201 static inline bool ptr_ring_empty_any(struct ptr_ring *r)
202 {
203         unsigned long flags;
204         bool ret;
205
206         spin_lock_irqsave(&r->consumer_lock, flags);
207         ret = __ptr_ring_empty(r);
208         spin_unlock_irqrestore(&r->consumer_lock, flags);
209
210         return ret;
211 }
212
213 static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
214 {
215         bool ret;
216
217         spin_lock_bh(&r->consumer_lock);
218         ret = __ptr_ring_empty(r);
219         spin_unlock_bh(&r->consumer_lock);
220
221         return ret;
222 }
223
224 /* Must only be called after __ptr_ring_peek returned !NULL */
225 static inline void __ptr_ring_discard_one(struct ptr_ring *r)
226 {
227         r->queue[r->consumer++] = NULL;
228         if (unlikely(r->consumer >= r->size))
229                 r->consumer = 0;
230 }
231
/* Remove and return the entry at the consumer index, or return NULL when
 * that slot is free.  No locking is done here; the locked wrappers run
 * this under consumer_lock.
 */
static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *entry = __ptr_ring_peek(r);

	if (!entry)
		return NULL;

	__ptr_ring_discard_one(r);
	return entry;
}
242
243 static inline void *ptr_ring_consume(struct ptr_ring *r)
244 {
245         void *ptr;
246
247         spin_lock(&r->consumer_lock);
248         ptr = __ptr_ring_consume(r);
249         spin_unlock(&r->consumer_lock);
250
251         return ptr;
252 }
253
254 static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
255 {
256         void *ptr;
257
258         spin_lock_irq(&r->consumer_lock);
259         ptr = __ptr_ring_consume(r);
260         spin_unlock_irq(&r->consumer_lock);
261
262         return ptr;
263 }
264
265 static inline void *ptr_ring_consume_any(struct ptr_ring *r)
266 {
267         unsigned long flags;
268         void *ptr;
269
270         spin_lock_irqsave(&r->consumer_lock, flags);
271         ptr = __ptr_ring_consume(r);
272         spin_unlock_irqrestore(&r->consumer_lock, flags);
273
274         return ptr;
275 }
276
277 static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
278 {
279         void *ptr;
280
281         spin_lock_bh(&r->consumer_lock);
282         ptr = __ptr_ring_consume(r);
283         spin_unlock_bh(&r->consumer_lock);
284
285         return ptr;
286 }
287
/* Pass the entry at the consumer index (NULL if the slot is free) to @f
 * and evaluate to whatever @f returns.  The entry stays in the FIFO.
 * @f must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) \
	((f)(__ptr_ring_peek(r)))
293
/* Evaluate __PTR_RING_PEEK_CALL(r, f) under consumer_lock taken with plain
 * spin_lock(); the statement expression yields the value @f returned.
 */
#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __ptr_ring_peek_call_ret; \
	\
	spin_lock(&(r)->consumer_lock); \
	__ptr_ring_peek_call_ret = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__ptr_ring_peek_call_ret; \
})
302
/* Evaluate __PTR_RING_PEEK_CALL(r, f) under consumer_lock taken with
 * spin_lock_irq(); the statement expression yields the value @f returned.
 */
#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __ptr_ring_peek_call_ret; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__ptr_ring_peek_call_ret = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__ptr_ring_peek_call_ret; \
})
311
/* Evaluate __PTR_RING_PEEK_CALL(r, f) under consumer_lock taken with
 * spin_lock_bh(); the statement expression yields the value @f returned.
 */
#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __ptr_ring_peek_call_ret; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__ptr_ring_peek_call_ret = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__ptr_ring_peek_call_ret; \
})
320
/* Evaluate __PTR_RING_PEEK_CALL(r, f) under consumer_lock taken with
 * spin_lock_irqsave() and released with spin_unlock_irqrestore(); the
 * statement expression yields the value @f returned.
 */
#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __ptr_ring_peek_call_ret; \
	unsigned long __ptr_ring_peek_call_flags; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __ptr_ring_peek_call_flags); \
	__ptr_ring_peek_call_ret = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __ptr_ring_peek_call_flags); \
	__ptr_ring_peek_call_ret; \
})
330
331 static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
332 {
333         return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
334 }
335
336 static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
337 {
338         r->queue = __ptr_ring_init_queue_alloc(size, gfp);
339         if (!r->queue)
340                 return -ENOMEM;
341
342         r->size = size;
343         r->producer = r->consumer = 0;
344         spin_lock_init(&r->producer_lock);
345         spin_lock_init(&r->consumer_lock);
346
347         return 0;
348 }
349
350 static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
351                                   void (*destroy)(void *))
352 {
353         unsigned long flags;
354         int producer = 0;
355         void **queue = __ptr_ring_init_queue_alloc(size, gfp);
356         void **old;
357         void *ptr;
358
359         if (!queue)
360                 return -ENOMEM;
361
362         spin_lock_irqsave(&(r)->producer_lock, flags);
363
364         while ((ptr = ptr_ring_consume(r)))
365                 if (producer < size)
366                         queue[producer++] = ptr;
367                 else if (destroy)
368                         destroy(ptr);
369
370         r->size = size;
371         r->producer = producer;
372         r->consumer = 0;
373         old = r->queue;
374         r->queue = queue;
375
376         spin_unlock_irqrestore(&(r)->producer_lock, flags);
377
378         kfree(old);
379
380         return 0;
381 }
382
383 static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
384 {
385         void *ptr;
386
387         if (destroy)
388                 while ((ptr = ptr_ring_consume(r)))
389                         destroy(ptr);
390         kfree(r->queue);
391 }
392
393 #endif /* _LINUX_PTR_RING_H  */