/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 * Copyright (C) 2014 Fujitsu.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include <linux/workqueue.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * Container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

static int __btrfs_start_workers(struct btrfs_workers *workers);

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queues.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

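/*
 * Illustrative sketch (not part of the original file): a filesystem would
 * typically wire up a dedicated single-thread pool as the async_helper of
 * its real pools, so that the thread-start work above never runs on a
 * worker that is busy cleaning dirty memory.  The fs_info fields below are
 * only assumed for illustration:
 *
 *	btrfs_init_workers(&fs_info->generic_worker, "genwork", 1, NULL);
 *	btrfs_init_workers(&fs_info->workers, "worker",
 *			   fs_info->thread_pool_size,
 *			   &fs_info->generic_worker);
 */
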
static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
	__btrfs_start_workers(start->queue);
	kfree(start);
}

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list) &&
		    !worker->workers->stopping) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list) &&
		    !worker->workers->stopping) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

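/*
 * Worked example of the hysteresis above (added note, using the default
 * idle_thresh of 32 that btrfs_init_workers sets below): a worker leaves
 * the idle list once it has 32 or more pending items (check_busy_worker)
 * and only returns to it after dropping below idle_thresh / 2 == 16
 * (check_idle_worker).  The gap keeps a worker from bouncing between the
 * busy and idle lists on every queued item.
 */
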
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	struct worker_start *start;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return;

	start->work.func = start_new_worker_func;
	start->queue = workers;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	btrfs_queue_worker(workers->atomic_worker_start, &start->work);
	return;

out:
	kfree(start);
	spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline void run_ordered_completions(struct btrfs_workers *workers,
					     struct btrfs_work *work)
{
	if (!workers->ordered)
		return;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		spin_unlock(&workers->order_lock);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
		work->ordered_free(work);
		spin_lock(&workers->order_lock);
	}

	spin_unlock(&workers->order_lock);
}

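/*
 * Illustrative example of the ordering guarantee above (added note): if
 * works A, B and C are queued on an ordered pool in that order and B's
 * ->func() happens to finish first, B only gets WORK_DONE_BIT set here.
 * Its ->ordered_func() will not run until A has completed and A's
 * ->ordered_func() has returned, because A stays at the head of the order
 * list as a barrier until WORK_ORDER_DONE_BIT is set on it.
 */
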
static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head)) {
		cur = prio_head->next;
		goto out;
	}

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head)) {
		cur = head->next;
		goto out;
	}

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
			cond_resched();
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			try_to_freeze();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					set_current_state(TASK_RUNNING);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
void btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	workers->stopping = 1;
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
	workers->stopping = 0;
}

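/*
 * Typical lifecycle of a pool (illustrative sketch, not from this file;
 * the "example" pool and some_work item are hypothetical):
 *
 *	struct btrfs_workers pool;
 *	int ret;
 *
 *	btrfs_init_workers(&pool, "example", 4, NULL);
 *	ret = btrfs_start_workers(&pool);
 *	if (!ret)
 *		btrfs_queue_worker(&pool, &some_work);
 *	...
 *	btrfs_stop_workers(&pool);
 */
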
/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;

	worker = kzalloc(sizeof(*worker), GFP_NOFS);
	if (!worker) {
		ret = -ENOMEM;
		goto fail;
	}

	INIT_LIST_HEAD(&worker->pending);
	INIT_LIST_HEAD(&worker->prio_pending);
	INIT_LIST_HEAD(&worker->worker_list);
	spin_lock_init(&worker->lock);

	atomic_set(&worker->num_pending, 0);
	atomic_set(&worker->refs, 1);
	worker->workers = workers;
	worker->task = kthread_create(worker_loop, worker,
				      "btrfs-%s-%d", workers->name,
				      workers->num_workers + 1);
	if (IS_ERR(worker->task)) {
		ret = PTR_ERR(worker->task);
		goto fail;
	}

	spin_lock_irq(&workers->lock);
	if (workers->stopping) {
		spin_unlock_irq(&workers->lock);
		ret = -EINVAL;
		goto fail_kthread;
	}
	list_add_tail(&worker->worker_list, &workers->idle_list);
	worker->idle = 1;
	workers->num_workers++;
	workers->num_workers_starting--;
	WARN_ON(workers->num_workers_starting < 0);
	spin_unlock_irq(&workers->lock);

	wake_up_process(worker->task);
	return 0;

fail_kthread:
	kthread_stop(worker->task);
fail:
	kfree(worker);
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting--;
	spin_unlock_irq(&workers->lock);
	return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting++;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * Hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;
	int ret;

	spin_lock_irqsave(&workers->lock, flags);
again:
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			ret = __btrfs_start_workers(workers);
			spin_lock_irqsave(&workers->lock, flags);
			if (ret)
				goto fallback;
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
		  struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
void btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}

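/*
 * Illustrative sketch of how a long running ->func() can use
 * btrfs_requeue_work (process_one_batch and finish_up are hypothetical
 * helpers, not from this file):
 *
 *	static void long_running_func(struct btrfs_work *work)
 *	{
 *		if (!process_one_batch(work)) {
 *			btrfs_requeue_work(work);
 *			return;
 *		}
 *		finish_up(work);
 *	}
 */
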
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}

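/*
 * Illustrative sketch of submitting to one of these pools (my_async_job and
 * its callbacks are hypothetical, not from this file): the caller embeds a
 * struct btrfs_work, fills in ->func, and optionally ->ordered_func and
 * ->ordered_free for ordered pools, then queues it:
 *
 *	struct my_async_job {
 *		struct btrfs_work work;
 *	};
 *
 *	job->work.func = my_job_func;
 *	job->work.ordered_func = my_job_ordered_func;
 *	job->work.ordered_free = my_job_free;
 *	job->work.flags = 0;
 *	btrfs_queue_worker(&fs_info->workers, &job->work);
 */
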
struct __btrfs_workqueue_struct {
	struct workqueue_struct *normal_wq;
	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;
};

struct btrfs_workqueue_struct {
	struct __btrfs_workqueue_struct *normal;
	struct __btrfs_workqueue_struct *high;
};

static inline struct __btrfs_workqueue_struct
*__btrfs_alloc_workqueue(char *name, int flags, int max_active)
{
	struct __btrfs_workqueue_struct *ret = kzalloc(sizeof(*ret), GFP_NOFS);

	if (unlikely(!ret))
		return NULL;

	if (flags & WQ_HIGHPRI)
		ret->normal_wq = alloc_workqueue("%s-%s-high", flags,
						 max_active, "btrfs", name);
	else
		ret->normal_wq = alloc_workqueue("%s-%s", flags,
						 max_active, "btrfs", name);
	if (unlikely(!ret->normal_wq)) {
		kfree(ret);
		return NULL;
	}

	INIT_LIST_HEAD(&ret->ordered_list);
	spin_lock_init(&ret->list_lock);
	return ret;
}

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue_struct *wq);

struct btrfs_workqueue_struct *btrfs_alloc_workqueue(char *name,
						     int flags,
						     int max_active)
{
	struct btrfs_workqueue_struct *ret = kzalloc(sizeof(*ret), GFP_NOFS);

	if (unlikely(!ret))
		return NULL;

	ret->normal = __btrfs_alloc_workqueue(name, flags & ~WQ_HIGHPRI,
					      max_active);
	if (unlikely(!ret->normal)) {
		kfree(ret);
		return NULL;
	}

	if (flags & WQ_HIGHPRI) {
		ret->high = __btrfs_alloc_workqueue(name, flags, max_active);
		if (unlikely(!ret->high)) {
			__btrfs_destroy_workqueue(ret->normal);
			kfree(ret);
			return NULL;
		}
	}
	return ret;
}

static void run_ordered_work(struct __btrfs_workqueue_struct *wq)
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work_struct *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_entry(list->next, struct btrfs_work_struct,
				  ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/*
		 * we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
		work->ordered_free(work);
	}
	spin_unlock_irqrestore(lock, flags);
}

static void normal_work_helper(struct work_struct *arg)
{
	struct btrfs_work_struct *work;
	struct __btrfs_workqueue_struct *wq;
	int need_order = 0;

	work = container_of(arg, struct btrfs_work_struct, normal_work);
	/*
	 * We should not touch things inside work in the following cases:
	 * 1) after work->func(), if it has no ordered_free, since the
	 *    struct is freed inside work->func().
	 * 2) after setting WORK_DONE_BIT, because the work may be freed
	 *    by other threads almost instantly.
	 * So we save the needed things here.
	 */
	if (work->ordered_func)
		need_order = 1;
	wq = work->wq;

	work->func(work);
	if (need_order) {
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq);
	}
}

void btrfs_init_work(struct btrfs_work_struct *work,
		     void (*func)(struct btrfs_work_struct *),
		     void (*ordered_func)(struct btrfs_work_struct *),
		     void (*ordered_free)(struct btrfs_work_struct *))
{
	work->func = func;
	work->ordered_func = ordered_func;
	work->ordered_free = ordered_free;
	INIT_WORK(&work->normal_work, normal_work_helper);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}

static inline void __btrfs_queue_work(struct __btrfs_workqueue_struct *wq,
				      struct btrfs_work_struct *work)
{
	unsigned long flags;

	work->wq = wq;
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	queue_work(wq->normal_wq, &work->normal_work);
}

void btrfs_queue_work(struct btrfs_workqueue_struct *wq,
		      struct btrfs_work_struct *work)
{
	struct __btrfs_workqueue_struct *dest_wq;

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags) && wq->high)
		dest_wq = wq->high;
	else
		dest_wq = wq->normal;
	__btrfs_queue_work(dest_wq, work);
}

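/*
 * Illustrative sketch of the workqueue-backed API above (the job and its
 * callbacks are hypothetical, not from this file):
 *
 *	struct btrfs_workqueue_struct *wq;
 *
 *	wq = btrfs_alloc_workqueue("example", WQ_UNBOUND | WQ_HIGHPRI, 4);
 *	if (!wq)
 *		return -ENOMEM;
 *	btrfs_init_work(&job->work, my_func, my_ordered_func, my_free);
 *	btrfs_queue_work(wq, &job->work);
 *	...
 *	btrfs_destroy_workqueue(wq);
 */
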
static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue_struct *wq)
{
	destroy_workqueue(wq->normal_wq);
	kfree(wq);
}

void btrfs_destroy_workqueue(struct btrfs_workqueue_struct *wq)
{
	if (!wq)
		return;
	if (wq->high)
		__btrfs_destroy_workqueue(wq->high);
	__btrfs_destroy_workqueue(wq->normal);
}

void btrfs_workqueue_set_max(struct btrfs_workqueue_struct *wq, int max)
{
	workqueue_set_max_active(wq->normal->normal_wq, max);
	if (wq->high)
		workqueue_set_max_active(wq->high->normal_wq, max);
}

void btrfs_set_work_high_priority(struct btrfs_work_struct *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}