d9ee1b96549ab8dcab03773b2bd7884353b51823
[cascardo/linux.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94
95 /*
96  * Lock compatibilty matrix - thanks Steve
97  * UN = Unlocked state. Not really a state, used as a flag
98  * PD = Padding. Used to make the matrix a nice power of two in size
99  * Other states are the same as the VMS DLM.
100  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
101  */
102
103 static const int __dlm_compat_matrix[8][8] = {
104       /* UN NL CR CW PR PW EX PD */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
106         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
107         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
108         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
109         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
110         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
111         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
112         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
113 };
114
115 /*
116  * This defines the direction of transfer of LVB data.
117  * Granted mode is the row; requested mode is the column.
118  * Usage: matrix[grmode+1][rqmode+1]
119  * 1 = LVB is returned to the caller
120  * 0 = LVB is written to the resource
121  * -1 = nothing happens to the LVB
122  */
123
124 const int dlm_lvb_operations[8][8] = {
125         /* UN   NL  CR  CW  PR  PW  EX  PD*/
126         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
127         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
128         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
129         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
130         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
131         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
133         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
134 };
135
136 #define modes_compat(gr, rq) \
137         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143
144 /*
145  * Compatibility matrix for conversions with QUECVT set.
146  * Granted mode is the row; requested mode is the column.
147  * Usage: matrix[grmode+1][rqmode+1]
148  */
149
150 static const int __quecvt_compat_matrix[8][8] = {
151       /* UN NL CR CW PR PW EX PD */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
153         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
154         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
155         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
156         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
157         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
158         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
159         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
160 };
161
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169                (unsigned long long)lkb->lkb_recover_seq);
170 }
171
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175                "rlc %d name %s\n",
176                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178                r->res_name);
179 }
180
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183         struct dlm_lkb *lkb;
184
185         dlm_print_rsb(r);
186
187         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189         printk(KERN_ERR "rsb lookup list\n");
190         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb grant queue:\n");
193         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195         printk(KERN_ERR "rsb convert queue:\n");
196         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197                 dlm_print_lkb(lkb);
198         printk(KERN_ERR "rsb wait queue:\n");
199         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200                 dlm_print_lkb(lkb);
201 }
202
203 /* Threads cannot use the lockspace while it's being recovered */
204
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207         down_read(&ls->ls_in_recovery);
208 }
209
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212         up_read(&ls->ls_in_recovery);
213 }
214
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217         return down_read_trylock(&ls->ls_in_recovery);
218 }
219
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248         return !!r->res_nodeid;
249 }
250
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287                                   DLM_IFL_OVERLAP_CANCEL));
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         del_timeout(lkb);
296
297         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298
299         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300            timeout caused the cancel then return -ETIMEDOUT */
301         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303                 rv = -ETIMEDOUT;
304         }
305
306         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308                 rv = -EDEADLK;
309         }
310
311         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316         queue_cast(r, lkb,
317                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322         if (is_master_copy(lkb)) {
323                 send_bast(r, lkb, rqmode);
324         } else {
325                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 /* This is only called to add a reference when the code already holds
334    a valid reference to the rsb, so there's no need for locking. */
335
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338         kref_get(&r->res_ref);
339 }
340
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343         hold_rsb(r);
344 }
345
346 /* When all references to the rsb are gone it's transferred to
347    the tossed list for later disposal. */
348
349 static void put_rsb(struct dlm_rsb *r)
350 {
351         struct dlm_ls *ls = r->res_ls;
352         uint32_t bucket = r->res_bucket;
353
354         spin_lock(&ls->ls_rsbtbl[bucket].lock);
355         kref_put(&r->res_ref, toss_rsb);
356         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357 }
358
359 void dlm_put_rsb(struct dlm_rsb *r)
360 {
361         put_rsb(r);
362 }
363
364 static int pre_rsb_struct(struct dlm_ls *ls)
365 {
366         struct dlm_rsb *r1, *r2;
367         int count = 0;
368
369         spin_lock(&ls->ls_new_rsb_spin);
370         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371                 spin_unlock(&ls->ls_new_rsb_spin);
372                 return 0;
373         }
374         spin_unlock(&ls->ls_new_rsb_spin);
375
376         r1 = dlm_allocate_rsb(ls);
377         r2 = dlm_allocate_rsb(ls);
378
379         spin_lock(&ls->ls_new_rsb_spin);
380         if (r1) {
381                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382                 ls->ls_new_rsb_count++;
383         }
384         if (r2) {
385                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386                 ls->ls_new_rsb_count++;
387         }
388         count = ls->ls_new_rsb_count;
389         spin_unlock(&ls->ls_new_rsb_spin);
390
391         if (!count)
392                 return -ENOMEM;
393         return 0;
394 }
395
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397    unlock any spinlocks, go back and call pre_rsb_struct again.
398    Otherwise, take an rsb off the list and return it. */
399
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401                           struct dlm_rsb **r_ret)
402 {
403         struct dlm_rsb *r;
404         int count;
405
406         spin_lock(&ls->ls_new_rsb_spin);
407         if (list_empty(&ls->ls_new_rsb)) {
408                 count = ls->ls_new_rsb_count;
409                 spin_unlock(&ls->ls_new_rsb_spin);
410                 log_debug(ls, "find_rsb retry %d %d %s",
411                           count, dlm_config.ci_new_rsb_count, name);
412                 return -EAGAIN;
413         }
414
415         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416         list_del(&r->res_hashchain);
417         /* Convert the empty list_head to a NULL rb_node for tree usage: */
418         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419         ls->ls_new_rsb_count--;
420         spin_unlock(&ls->ls_new_rsb_spin);
421
422         r->res_ls = ls;
423         r->res_length = len;
424         memcpy(r->res_name, name, len);
425         mutex_init(&r->res_mutex);
426
427         INIT_LIST_HEAD(&r->res_lookup);
428         INIT_LIST_HEAD(&r->res_grantqueue);
429         INIT_LIST_HEAD(&r->res_convertqueue);
430         INIT_LIST_HEAD(&r->res_waitqueue);
431         INIT_LIST_HEAD(&r->res_root_list);
432         INIT_LIST_HEAD(&r->res_recover_list);
433
434         *r_ret = r;
435         return 0;
436 }
437
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
439 {
440         char maxname[DLM_RESNAME_MAXLEN];
441
442         memset(maxname, 0, DLM_RESNAME_MAXLEN);
443         memcpy(maxname, name, nlen);
444         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
445 }
446
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448                         struct dlm_rsb **r_ret)
449 {
450         struct rb_node *node = tree->rb_node;
451         struct dlm_rsb *r;
452         int rc;
453
454         while (node) {
455                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
456                 rc = rsb_cmp(r, name, len);
457                 if (rc < 0)
458                         node = node->rb_left;
459                 else if (rc > 0)
460                         node = node->rb_right;
461                 else
462                         goto found;
463         }
464         *r_ret = NULL;
465         return -EBADR;
466
467  found:
468         *r_ret = r;
469         return 0;
470 }
471
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
473 {
474         struct rb_node **newn = &tree->rb_node;
475         struct rb_node *parent = NULL;
476         int rc;
477
478         while (*newn) {
479                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
480                                                res_hashnode);
481
482                 parent = *newn;
483                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
484                 if (rc < 0)
485                         newn = &parent->rb_left;
486                 else if (rc > 0)
487                         newn = &parent->rb_right;
488                 else {
489                         log_print("rsb_insert match");
490                         dlm_dump_rsb(rsb);
491                         dlm_dump_rsb(cur);
492                         return -EEXIST;
493                 }
494         }
495
496         rb_link_node(&rsb->res_hashnode, parent, newn);
497         rb_insert_color(&rsb->res_hashnode, tree);
498         return 0;
499 }
500
501 /*
502  * Find rsb in rsbtbl and potentially create/add one
503  *
504  * Delaying the release of rsb's has a similar benefit to applications keeping
505  * NL locks on an rsb, but without the guarantee that the cached master value
506  * will still be valid when the rsb is reused.  Apps aren't always smart enough
507  * to keep NL locks on an rsb that they may lock again shortly; this can lead
508  * to excessive master lookups and removals if we don't delay the release.
509  *
510  * Searching for an rsb means looking through both the normal list and toss
511  * list.  When found on the toss list the rsb is moved to the normal list with
512  * ref count of 1; when found on normal list the ref count is incremented.
513  *
514  * rsb's on the keep list are being used locally and refcounted.
515  * rsb's on the toss list are not being used locally, and are not refcounted.
516  *
517  * The toss list rsb's were either
518  * - previously used locally but not any more (were on keep list, then
519  *   moved to toss list when last refcount dropped)
520  * - created and put on toss list as a directory record for a lookup
521  *   (we are the dir node for the res, but are not using the res right now,
522  *   but some other node is)
523  *
524  * The purpose of find_rsb() is to return a refcounted rsb for local use.
525  * So, if the given rsb is on the toss list, it is moved to the keep list
526  * before being returned.
527  *
528  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529  * more refcounts exist, so the rsb is moved from the keep list to the
530  * toss list.
531  *
532  * rsb's on both keep and toss lists are used for doing a name to master
533  * lookups.  rsb's that are in use locally (and being refcounted) are on
534  * the keep list, rsb's that are not in use locally (not refcounted) and
535  * only exist for name/master lookups are on the toss list.
536  *
537  * rsb's on the toss list who's dir_nodeid is not local can have stale
538  * name/master mappings.  So, remote requests on such rsb's can potentially
539  * return with an error, which means the mapping is stale and needs to
540  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
541  * first_lkid is to keep only a single outstanding request on an rsb
542  * while that rsb has a potentially stale master.)
543  */
544
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546                         uint32_t hash, uint32_t b,
547                         int dir_nodeid, int from_nodeid,
548                         unsigned int flags, struct dlm_rsb **r_ret)
549 {
550         struct dlm_rsb *r = NULL;
551         int our_nodeid = dlm_our_nodeid();
552         int from_local = 0;
553         int from_other = 0;
554         int from_dir = 0;
555         int create = 0;
556         int error;
557
558         if (flags & R_RECEIVE_REQUEST) {
559                 if (from_nodeid == dir_nodeid)
560                         from_dir = 1;
561                 else
562                         from_other = 1;
563         } else if (flags & R_REQUEST) {
564                 from_local = 1;
565         }
566
567         /*
568          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569          * from_nodeid has sent us a lock in dlm_recover_locks, believing
570          * we're the new master.  Our local recovery may not have set
571          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
572          * create the rsb; dlm_recover_process_copy() will handle EBADR
573          * by resending.
574          *
575          * If someone sends us a request, we are the dir node, and we do
576          * not find the rsb anywhere, then recreate it.  This happens if
577          * someone sends us a request after we have removed/freed an rsb
578          * from our toss list.  (They sent a request instead of lookup
579          * because they are using an rsb from their toss list.)
580          */
581
582         if (from_local || from_dir ||
583             (from_other && (dir_nodeid == our_nodeid))) {
584                 create = 1;
585         }
586
587  retry:
588         if (create) {
589                 error = pre_rsb_struct(ls);
590                 if (error < 0)
591                         goto out;
592         }
593
594         spin_lock(&ls->ls_rsbtbl[b].lock);
595
596         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597         if (error)
598                 goto do_toss;
599         
600         /*
601          * rsb is active, so we can't check master_nodeid without lock_rsb.
602          */
603
604         kref_get(&r->res_ref);
605         error = 0;
606         goto out_unlock;
607
608
609  do_toss:
610         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611         if (error)
612                 goto do_new;
613
614         /*
615          * rsb found inactive (master_nodeid may be out of date unless
616          * we are the dir_nodeid or were the master)  No other thread
617          * is using this rsb because it's on the toss list, so we can
618          * look at or update res_master_nodeid without lock_rsb.
619          */
620
621         if ((r->res_master_nodeid != our_nodeid) && from_other) {
622                 /* our rsb was not master, and another node (not the dir node)
623                    has sent us a request */
624                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625                           from_nodeid, r->res_master_nodeid, dir_nodeid,
626                           r->res_name);
627                 error = -ENOTBLK;
628                 goto out_unlock;
629         }
630
631         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632                 /* don't think this should ever happen */
633                 log_error(ls, "find_rsb toss from_dir %d master %d",
634                           from_nodeid, r->res_master_nodeid);
635                 dlm_print_rsb(r);
636                 /* fix it and go on */
637                 r->res_master_nodeid = our_nodeid;
638                 r->res_nodeid = 0;
639                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640                 r->res_first_lkid = 0;
641         }
642
643         if (from_local && (r->res_master_nodeid != our_nodeid)) {
644                 /* Because we have held no locks on this rsb,
645                    res_master_nodeid could have become stale. */
646                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647                 r->res_first_lkid = 0;
648         }
649
650         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652         goto out_unlock;
653
654
655  do_new:
656         /*
657          * rsb not found
658          */
659
660         if (error == -EBADR && !create)
661                 goto out_unlock;
662
663         error = get_rsb_struct(ls, name, len, &r);
664         if (error == -EAGAIN) {
665                 spin_unlock(&ls->ls_rsbtbl[b].lock);
666                 goto retry;
667         }
668         if (error)
669                 goto out_unlock;
670
671         r->res_hash = hash;
672         r->res_bucket = b;
673         r->res_dir_nodeid = dir_nodeid;
674         kref_init(&r->res_ref);
675
676         if (from_dir) {
677                 /* want to see how often this happens */
678                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679                           from_nodeid, r->res_name);
680                 r->res_master_nodeid = our_nodeid;
681                 r->res_nodeid = 0;
682                 goto out_add;
683         }
684
685         if (from_other && (dir_nodeid != our_nodeid)) {
686                 /* should never happen */
687                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689                 dlm_free_rsb(r);
690                 error = -ENOTBLK;
691                 goto out_unlock;
692         }
693
694         if (from_other) {
695                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
696                           from_nodeid, dir_nodeid, r->res_name);
697         }
698
699         if (dir_nodeid == our_nodeid) {
700                 /* When we are the dir nodeid, we can set the master
701                    node immediately */
702                 r->res_master_nodeid = our_nodeid;
703                 r->res_nodeid = 0;
704         } else {
705                 /* set_master will send_lookup to dir_nodeid */
706                 r->res_master_nodeid = 0;
707                 r->res_nodeid = -1;
708         }
709
710  out_add:
711         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
712  out_unlock:
713         spin_unlock(&ls->ls_rsbtbl[b].lock);
714  out:
715         *r_ret = r;
716         return error;
717 }
718
719 /* During recovery, other nodes can send us new MSTCPY locks (from
720    dlm_recover_locks) before we've made ourself master (in
721    dlm_recover_masters). */
722
723 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
724                           uint32_t hash, uint32_t b,
725                           int dir_nodeid, int from_nodeid,
726                           unsigned int flags, struct dlm_rsb **r_ret)
727 {
728         struct dlm_rsb *r = NULL;
729         int our_nodeid = dlm_our_nodeid();
730         int recover = (flags & R_RECEIVE_RECOVER);
731         int error;
732
733  retry:
734         error = pre_rsb_struct(ls);
735         if (error < 0)
736                 goto out;
737
738         spin_lock(&ls->ls_rsbtbl[b].lock);
739
740         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
741         if (error)
742                 goto do_toss;
743
744         /*
745          * rsb is active, so we can't check master_nodeid without lock_rsb.
746          */
747
748         kref_get(&r->res_ref);
749         goto out_unlock;
750
751
752  do_toss:
753         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
754         if (error)
755                 goto do_new;
756
757         /*
758          * rsb found inactive. No other thread is using this rsb because
759          * it's on the toss list, so we can look at or update
760          * res_master_nodeid without lock_rsb.
761          */
762
763         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
764                 /* our rsb is not master, and another node has sent us a
765                    request; this should never happen */
766                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
767                           from_nodeid, r->res_master_nodeid, dir_nodeid);
768                 dlm_print_rsb(r);
769                 error = -ENOTBLK;
770                 goto out_unlock;
771         }
772
773         if (!recover && (r->res_master_nodeid != our_nodeid) &&
774             (dir_nodeid == our_nodeid)) {
775                 /* our rsb is not master, and we are dir; may as well fix it;
776                    this should never happen */
777                 log_error(ls, "find_rsb toss our %d master %d dir %d",
778                           our_nodeid, r->res_master_nodeid, dir_nodeid);
779                 dlm_print_rsb(r);
780                 r->res_master_nodeid = our_nodeid;
781                 r->res_nodeid = 0;
782         }
783
784         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
785         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
786         goto out_unlock;
787
788
789  do_new:
790         /*
791          * rsb not found
792          */
793
794         error = get_rsb_struct(ls, name, len, &r);
795         if (error == -EAGAIN) {
796                 spin_unlock(&ls->ls_rsbtbl[b].lock);
797                 goto retry;
798         }
799         if (error)
800                 goto out_unlock;
801
802         r->res_hash = hash;
803         r->res_bucket = b;
804         r->res_dir_nodeid = dir_nodeid;
805         r->res_master_nodeid = dir_nodeid;
806         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
807         kref_init(&r->res_ref);
808
809         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
810  out_unlock:
811         spin_unlock(&ls->ls_rsbtbl[b].lock);
812  out:
813         *r_ret = r;
814         return error;
815 }
816
817 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
818                     unsigned int flags, struct dlm_rsb **r_ret)
819 {
820         uint32_t hash, b;
821         int dir_nodeid;
822
823         if (len > DLM_RESNAME_MAXLEN)
824                 return -EINVAL;
825
826         hash = jhash(name, len, 0);
827         b = hash & (ls->ls_rsbtbl_size - 1);
828
829         dir_nodeid = dlm_hash2nodeid(ls, hash);
830
831         if (dlm_no_directory(ls))
832                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
833                                       from_nodeid, flags, r_ret);
834         else
835                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836                                       from_nodeid, flags, r_ret);
837 }
838
839 /* we have received a request and found that res_master_nodeid != our_nodeid,
840    so we need to return an error or make ourself the master */
841
842 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
843                                   int from_nodeid)
844 {
845         if (dlm_no_directory(ls)) {
846                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
847                           from_nodeid, r->res_master_nodeid,
848                           r->res_dir_nodeid);
849                 dlm_print_rsb(r);
850                 return -ENOTBLK;
851         }
852
853         if (from_nodeid != r->res_dir_nodeid) {
854                 /* our rsb is not master, and another node (not the dir node)
855                    has sent us a request.  this is much more common when our
856                    master_nodeid is zero, so limit debug to non-zero.  */
857
858                 if (r->res_master_nodeid) {
859                         log_debug(ls, "validate master from_other %d master %d "
860                                   "dir %d first %x %s", from_nodeid,
861                                   r->res_master_nodeid, r->res_dir_nodeid,
862                                   r->res_first_lkid, r->res_name);
863                 }
864                 return -ENOTBLK;
865         } else {
866                 /* our rsb is not master, but the dir nodeid has sent us a
867                    request; this could happen with master 0 / res_nodeid -1 */
868
869                 if (r->res_master_nodeid) {
870                         log_error(ls, "validate master from_dir %d master %d "
871                                   "first %x %s",
872                                   from_nodeid, r->res_master_nodeid,
873                                   r->res_first_lkid, r->res_name);
874                 }
875
876                 r->res_master_nodeid = dlm_our_nodeid();
877                 r->res_nodeid = 0;
878                 return 0;
879         }
880 }
881
882 /*
883  * We're the dir node for this res and another node wants to know the
884  * master nodeid.  During normal operation (non recovery) this is only
885  * called from receive_lookup(); master lookups when the local node is
886  * the dir node are done by find_rsb().
887  *
888  * normal operation, we are the dir node for a resource
889  * . _request_lock
890  * . set_master
891  * . send_lookup
892  * . receive_lookup
893  * . dlm_master_lookup flags 0
894  *
895  * recover directory, we are rebuilding dir for all resources
896  * . dlm_recover_directory
897  * . dlm_rcom_names
898  *   remote node sends back the rsb names it is master of and we are dir of
899  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
900  *   we either create new rsb setting remote node as master, or find existing
901  *   rsb and set master to be the remote node.
902  *
903  * recover masters, we are finding the new master for resources
904  * . dlm_recover_masters
905  * . recover_master
906  * . dlm_send_rcom_lookup
907  * . receive_rcom_lookup
908  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
909  */
910
911 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
912                       unsigned int flags, int *r_nodeid, int *result)
913 {
914         struct dlm_rsb *r = NULL;
915         uint32_t hash, b;
916         int from_master = (flags & DLM_LU_RECOVER_DIR);
917         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
918         int our_nodeid = dlm_our_nodeid();
919         int dir_nodeid, error, toss_list = 0;
920
921         if (len > DLM_RESNAME_MAXLEN)
922                 return -EINVAL;
923
924         if (from_nodeid == our_nodeid) {
925                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
926                           our_nodeid, flags);
927                 return -EINVAL;
928         }
929
930         hash = jhash(name, len, 0);
931         b = hash & (ls->ls_rsbtbl_size - 1);
932
933         dir_nodeid = dlm_hash2nodeid(ls, hash);
934         if (dir_nodeid != our_nodeid) {
935                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
936                           from_nodeid, dir_nodeid, our_nodeid, hash,
937                           ls->ls_num_nodes);
938                 *r_nodeid = -1;
939                 return -EINVAL;
940         }
941
942  retry:
943         error = pre_rsb_struct(ls);
944         if (error < 0)
945                 return error;
946
947         spin_lock(&ls->ls_rsbtbl[b].lock);
948         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
949         if (!error) {
950                 /* because the rsb is active, we need to lock_rsb before
951                    checking/changing re_master_nodeid */
952
953                 hold_rsb(r);
954                 spin_unlock(&ls->ls_rsbtbl[b].lock);
955                 lock_rsb(r);
956                 goto found;
957         }
958
959         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
960         if (error)
961                 goto not_found;
962
963         /* because the rsb is inactive (on toss list), it's not refcounted
964            and lock_rsb is not used, but is protected by the rsbtbl lock */
965
966         toss_list = 1;
967  found:
968         if (r->res_dir_nodeid != our_nodeid) {
969                 /* should not happen, but may as well fix it and carry on */
970                 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
971                           r->res_dir_nodeid, our_nodeid, r->res_name);
972                 r->res_dir_nodeid = our_nodeid;
973         }
974
975         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
976                 /* Recovery uses this function to set a new master when
977                    the previous master failed.  Setting NEW_MASTER will
978                    force dlm_recover_masters to call recover_master on this
979                    rsb even though the res_nodeid is no longer removed. */
980
981                 r->res_master_nodeid = from_nodeid;
982                 r->res_nodeid = from_nodeid;
983                 rsb_set_flag(r, RSB_NEW_MASTER);
984
985                 if (toss_list) {
986                         /* I don't think we should ever find it on toss list. */
987                         log_error(ls, "dlm_master_lookup fix_master on toss");
988                         dlm_dump_rsb(r);
989                 }
990         }
991
992         if (from_master && (r->res_master_nodeid != from_nodeid)) {
993                 /* this will happen if from_nodeid became master during
994                    a previous recovery cycle, and we aborted the previous
995                    cycle before recovering this master value */
996
997                 log_limit(ls, "dlm_master_lookup from_master %d "
998                           "master_nodeid %d res_nodeid %d first %x %s",
999                           from_nodeid, r->res_master_nodeid, r->res_nodeid,
1000                           r->res_first_lkid, r->res_name);
1001
1002                 if (r->res_master_nodeid == our_nodeid) {
1003                         log_error(ls, "from_master %d our_master", from_nodeid);
1004                         dlm_dump_rsb(r);
1005                         dlm_send_rcom_lookup_dump(r, from_nodeid);
1006                         goto out_found;
1007                 }
1008
1009                 r->res_master_nodeid = from_nodeid;
1010                 r->res_nodeid = from_nodeid;
1011                 rsb_set_flag(r, RSB_NEW_MASTER);
1012         }
1013
1014         if (!r->res_master_nodeid) {
1015                 /* this will happen if recovery happens while we're looking
1016                    up the master for this rsb */
1017
1018                 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019                           from_nodeid, r->res_first_lkid, r->res_name);
1020                 r->res_master_nodeid = from_nodeid;
1021                 r->res_nodeid = from_nodeid;
1022         }
1023
1024         if (!from_master && !fix_master &&
1025             (r->res_master_nodeid == from_nodeid)) {
1026                 /* this can happen when the master sends remove, the dir node
1027                    finds the rsb on the keep list and ignores the remove,
1028                    and the former master sends a lookup */
1029
1030                 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031                           "first %x %s", from_nodeid, flags,
1032                           r->res_first_lkid, r->res_name);
1033         }
1034
1035  out_found:
1036         *r_nodeid = r->res_master_nodeid;
1037         if (result)
1038                 *result = DLM_LU_MATCH;
1039
1040         if (toss_list) {
1041                 r->res_toss_time = jiffies;
1042                 /* the rsb was inactive (on toss list) */
1043                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1044         } else {
1045                 /* the rsb was active */
1046                 unlock_rsb(r);
1047                 put_rsb(r);
1048         }
1049         return 0;
1050
1051  not_found:
1052         error = get_rsb_struct(ls, name, len, &r);
1053         if (error == -EAGAIN) {
1054                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1055                 goto retry;
1056         }
1057         if (error)
1058                 goto out_unlock;
1059
1060         r->res_hash = hash;
1061         r->res_bucket = b;
1062         r->res_dir_nodeid = our_nodeid;
1063         r->res_master_nodeid = from_nodeid;
1064         r->res_nodeid = from_nodeid;
1065         kref_init(&r->res_ref);
1066         r->res_toss_time = jiffies;
1067
1068         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1069         if (error) {
1070                 /* should never happen */
1071                 dlm_free_rsb(r);
1072                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1073                 goto retry;
1074         }
1075
1076         if (result)
1077                 *result = DLM_LU_ADD;
1078         *r_nodeid = from_nodeid;
1079         error = 0;
1080  out_unlock:
1081         spin_unlock(&ls->ls_rsbtbl[b].lock);
1082         return error;
1083 }
1084
1085 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1086 {
1087         struct rb_node *n;
1088         struct dlm_rsb *r;
1089         int i;
1090
1091         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1092                 spin_lock(&ls->ls_rsbtbl[i].lock);
1093                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1094                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1095                         if (r->res_hash == hash)
1096                                 dlm_dump_rsb(r);
1097                 }
1098                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1099         }
1100 }
1101
1102 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1103 {
1104         struct dlm_rsb *r = NULL;
1105         uint32_t hash, b;
1106         int error;
1107
1108         hash = jhash(name, len, 0);
1109         b = hash & (ls->ls_rsbtbl_size - 1);
1110
1111         spin_lock(&ls->ls_rsbtbl[b].lock);
1112         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1113         if (!error)
1114                 goto out_dump;
1115
1116         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1117         if (error)
1118                 goto out;
1119  out_dump:
1120         dlm_dump_rsb(r);
1121  out:
1122         spin_unlock(&ls->ls_rsbtbl[b].lock);
1123 }
1124
1125 static void toss_rsb(struct kref *kref)
1126 {
1127         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1128         struct dlm_ls *ls = r->res_ls;
1129
1130         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1131         kref_init(&r->res_ref);
1132         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1133         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1134         r->res_toss_time = jiffies;
1135         if (r->res_lvbptr) {
1136                 dlm_free_lvb(r->res_lvbptr);
1137                 r->res_lvbptr = NULL;
1138         }
1139 }
1140
1141 /* See comment for unhold_lkb */
1142
1143 static void unhold_rsb(struct dlm_rsb *r)
1144 {
1145         int rv;
1146         rv = kref_put(&r->res_ref, toss_rsb);
1147         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1148 }
1149
1150 static void kill_rsb(struct kref *kref)
1151 {
1152         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1153
1154         /* All work is done after the return from kref_put() so we
1155            can release the write_lock before the remove and free. */
1156
1157         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1158         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1159         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1160         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1161         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1162         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1163 }
1164
1165 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1166    The rsb must exist as long as any lkb's for it do. */
1167
1168 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1169 {
1170         hold_rsb(r);
1171         lkb->lkb_resource = r;
1172 }
1173
1174 static void detach_lkb(struct dlm_lkb *lkb)
1175 {
1176         if (lkb->lkb_resource) {
1177                 put_rsb(lkb->lkb_resource);
1178                 lkb->lkb_resource = NULL;
1179         }
1180 }
1181
1182 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1183 {
1184         struct dlm_lkb *lkb;
1185         int rv, id;
1186
1187         lkb = dlm_allocate_lkb(ls);
1188         if (!lkb)
1189                 return -ENOMEM;
1190
1191         lkb->lkb_nodeid = -1;
1192         lkb->lkb_grmode = DLM_LOCK_IV;
1193         kref_init(&lkb->lkb_ref);
1194         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1195         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1196         INIT_LIST_HEAD(&lkb->lkb_time_list);
1197         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1198         mutex_init(&lkb->lkb_cb_mutex);
1199         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1200
1201  retry:
1202         rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
1203         if (!rv)
1204                 return -ENOMEM;
1205
1206         spin_lock(&ls->ls_lkbidr_spin);
1207         rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
1208         if (!rv)
1209                 lkb->lkb_id = id;
1210         spin_unlock(&ls->ls_lkbidr_spin);
1211
1212         if (rv == -EAGAIN)
1213                 goto retry;
1214
1215         if (rv < 0) {
1216                 log_error(ls, "create_lkb idr error %d", rv);
1217                 return rv;
1218         }
1219
1220         *lkb_ret = lkb;
1221         return 0;
1222 }
1223
1224 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1225 {
1226         struct dlm_lkb *lkb;
1227
1228         spin_lock(&ls->ls_lkbidr_spin);
1229         lkb = idr_find(&ls->ls_lkbidr, lkid);
1230         if (lkb)
1231                 kref_get(&lkb->lkb_ref);
1232         spin_unlock(&ls->ls_lkbidr_spin);
1233
1234         *lkb_ret = lkb;
1235         return lkb ? 0 : -ENOENT;
1236 }
1237
1238 static void kill_lkb(struct kref *kref)
1239 {
1240         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1241
1242         /* All work is done after the return from kref_put() so we
1243            can release the write_lock before the detach_lkb */
1244
1245         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1246 }
1247
1248 /* __put_lkb() is used when an lkb may not have an rsb attached to
1249    it so we need to provide the lockspace explicitly */
1250
1251 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1252 {
1253         uint32_t lkid = lkb->lkb_id;
1254
1255         spin_lock(&ls->ls_lkbidr_spin);
1256         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1257                 idr_remove(&ls->ls_lkbidr, lkid);
1258                 spin_unlock(&ls->ls_lkbidr_spin);
1259
1260                 detach_lkb(lkb);
1261
1262                 /* for local/process lkbs, lvbptr points to caller's lksb */
1263                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1264                         dlm_free_lvb(lkb->lkb_lvbptr);
1265                 dlm_free_lkb(lkb);
1266                 return 1;
1267         } else {
1268                 spin_unlock(&ls->ls_lkbidr_spin);
1269                 return 0;
1270         }
1271 }
1272
1273 int dlm_put_lkb(struct dlm_lkb *lkb)
1274 {
1275         struct dlm_ls *ls;
1276
1277         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1278         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1279
1280         ls = lkb->lkb_resource->res_ls;
1281         return __put_lkb(ls, lkb);
1282 }
1283
1284 /* This is only called to add a reference when the code already holds
1285    a valid reference to the lkb, so there's no need for locking. */
1286
1287 static inline void hold_lkb(struct dlm_lkb *lkb)
1288 {
1289         kref_get(&lkb->lkb_ref);
1290 }
1291
1292 /* This is called when we need to remove a reference and are certain
1293    it's not the last ref.  e.g. del_lkb is always called between a
1294    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1295    put_lkb would work fine, but would involve unnecessary locking */
1296
1297 static inline void unhold_lkb(struct dlm_lkb *lkb)
1298 {
1299         int rv;
1300         rv = kref_put(&lkb->lkb_ref, kill_lkb);
1301         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1302 }
1303
1304 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1305                             int mode)
1306 {
1307         struct dlm_lkb *lkb = NULL;
1308
1309         list_for_each_entry(lkb, head, lkb_statequeue)
1310                 if (lkb->lkb_rqmode < mode)
1311                         break;
1312
1313         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1314 }
1315
1316 /* add/remove lkb to rsb's grant/convert/wait queue */
1317
1318 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1319 {
1320         kref_get(&lkb->lkb_ref);
1321
1322         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1323
1324         lkb->lkb_timestamp = ktime_get();
1325
1326         lkb->lkb_status = status;
1327
1328         switch (status) {
1329         case DLM_LKSTS_WAITING:
1330                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1331                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1332                 else
1333                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1334                 break;
1335         case DLM_LKSTS_GRANTED:
1336                 /* convention says granted locks kept in order of grmode */
1337                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1338                                 lkb->lkb_grmode);
1339                 break;
1340         case DLM_LKSTS_CONVERT:
1341                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1342                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1343                 else
1344                         list_add_tail(&lkb->lkb_statequeue,
1345                                       &r->res_convertqueue);
1346                 break;
1347         default:
1348                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1349         }
1350 }
1351
1352 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1353 {
1354         lkb->lkb_status = 0;
1355         list_del(&lkb->lkb_statequeue);
1356         unhold_lkb(lkb);
1357 }
1358
1359 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1360 {
1361         hold_lkb(lkb);
1362         del_lkb(r, lkb);
1363         add_lkb(r, lkb, sts);
1364         unhold_lkb(lkb);
1365 }
1366
1367 static int msg_reply_type(int mstype)
1368 {
1369         switch (mstype) {
1370         case DLM_MSG_REQUEST:
1371                 return DLM_MSG_REQUEST_REPLY;
1372         case DLM_MSG_CONVERT:
1373                 return DLM_MSG_CONVERT_REPLY;
1374         case DLM_MSG_UNLOCK:
1375                 return DLM_MSG_UNLOCK_REPLY;
1376         case DLM_MSG_CANCEL:
1377                 return DLM_MSG_CANCEL_REPLY;
1378         case DLM_MSG_LOOKUP:
1379                 return DLM_MSG_LOOKUP_REPLY;
1380         }
1381         return -1;
1382 }
1383
1384 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1385 {
1386         int i;
1387
1388         for (i = 0; i < num_nodes; i++) {
1389                 if (!warned[i]) {
1390                         warned[i] = nodeid;
1391                         return 0;
1392                 }
1393                 if (warned[i] == nodeid)
1394                         return 1;
1395         }
1396         return 0;
1397 }
1398
1399 void dlm_scan_waiters(struct dlm_ls *ls)
1400 {
1401         struct dlm_lkb *lkb;
1402         ktime_t zero = ktime_set(0, 0);
1403         s64 us;
1404         s64 debug_maxus = 0;
1405         u32 debug_scanned = 0;
1406         u32 debug_expired = 0;
1407         int num_nodes = 0;
1408         int *warned = NULL;
1409
1410         if (!dlm_config.ci_waitwarn_us)
1411                 return;
1412
1413         mutex_lock(&ls->ls_waiters_mutex);
1414
1415         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1416                 if (ktime_equal(lkb->lkb_wait_time, zero))
1417                         continue;
1418
1419                 debug_scanned++;
1420
1421                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1422
1423                 if (us < dlm_config.ci_waitwarn_us)
1424                         continue;
1425
1426                 lkb->lkb_wait_time = zero;
1427
1428                 debug_expired++;
1429                 if (us > debug_maxus)
1430                         debug_maxus = us;
1431
1432                 if (!num_nodes) {
1433                         num_nodes = ls->ls_num_nodes;
1434                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
1435                 }
1436                 if (!warned)
1437                         continue;
1438                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1439                         continue;
1440
1441                 log_error(ls, "waitwarn %x %lld %d us check connection to "
1442                           "node %d", lkb->lkb_id, (long long)us,
1443                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1444         }
1445         mutex_unlock(&ls->ls_waiters_mutex);
1446         kfree(warned);
1447
1448         if (debug_expired)
1449                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1450                           debug_scanned, debug_expired,
1451                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1452 }
1453
1454 /* add/remove lkb from global waiters list of lkb's waiting for
1455    a reply from a remote node */
1456
1457 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1458 {
1459         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1460         int error = 0;
1461
1462         mutex_lock(&ls->ls_waiters_mutex);
1463
1464         if (is_overlap_unlock(lkb) ||
1465             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1466                 error = -EINVAL;
1467                 goto out;
1468         }
1469
1470         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1471                 switch (mstype) {
1472                 case DLM_MSG_UNLOCK:
1473                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1474                         break;
1475                 case DLM_MSG_CANCEL:
1476                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1477                         break;
1478                 default:
1479                         error = -EBUSY;
1480                         goto out;
1481                 }
1482                 lkb->lkb_wait_count++;
1483                 hold_lkb(lkb);
1484
1485                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1486                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1487                           lkb->lkb_wait_count, lkb->lkb_flags);
1488                 goto out;
1489         }
1490
1491         DLM_ASSERT(!lkb->lkb_wait_count,
1492                    dlm_print_lkb(lkb);
1493                    printk("wait_count %d\n", lkb->lkb_wait_count););
1494
1495         lkb->lkb_wait_count++;
1496         lkb->lkb_wait_type = mstype;
1497         lkb->lkb_wait_time = ktime_get();
1498         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1499         hold_lkb(lkb);
1500         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1501  out:
1502         if (error)
1503                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1504                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1505                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1506         mutex_unlock(&ls->ls_waiters_mutex);
1507         return error;
1508 }
1509
1510 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1511    list as part of process_requestqueue (e.g. a lookup that has an optimized
1512    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1513    set RESEND and dlm_recover_waiters_post() */
1514
1515 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1516                                 struct dlm_message *ms)
1517 {
1518         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1519         int overlap_done = 0;
1520
1521         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1522                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1523                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1524                 overlap_done = 1;
1525                 goto out_del;
1526         }
1527
1528         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1529                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1530                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1531                 overlap_done = 1;
1532                 goto out_del;
1533         }
1534
1535         /* Cancel state was preemptively cleared by a successful convert,
1536            see next comment, nothing to do. */
1537
1538         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1539             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1540                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1541                           lkb->lkb_id, lkb->lkb_wait_type);
1542                 return -1;
1543         }
1544
1545         /* Remove for the convert reply, and premptively remove for the
1546            cancel reply.  A convert has been granted while there's still
1547            an outstanding cancel on it (the cancel is moot and the result
1548            in the cancel reply should be 0).  We preempt the cancel reply
1549            because the app gets the convert result and then can follow up
1550            with another op, like convert.  This subsequent op would see the
1551            lingering state of the cancel and fail with -EBUSY. */
1552
1553         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1554             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1555             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1556                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1557                           lkb->lkb_id);
1558                 lkb->lkb_wait_type = 0;
1559                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1560                 lkb->lkb_wait_count--;
1561                 goto out_del;
1562         }
1563
1564         /* N.B. type of reply may not always correspond to type of original
1565            msg due to lookup->request optimization, verify others? */
1566
1567         if (lkb->lkb_wait_type) {
1568                 lkb->lkb_wait_type = 0;
1569                 goto out_del;
1570         }
1571
1572         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1573                   lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1574                   mstype, lkb->lkb_flags);
1575         return -1;
1576
1577  out_del:
1578         /* the force-unlock/cancel has completed and we haven't recvd a reply
1579            to the op that was in progress prior to the unlock/cancel; we
1580            give up on any reply to the earlier op.  FIXME: not sure when/how
1581            this would happen */
1582
1583         if (overlap_done && lkb->lkb_wait_type) {
1584                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1585                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1586                 lkb->lkb_wait_count--;
1587                 lkb->lkb_wait_type = 0;
1588         }
1589
1590         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1591
1592         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1593         lkb->lkb_wait_count--;
1594         if (!lkb->lkb_wait_count)
1595                 list_del_init(&lkb->lkb_wait_reply);
1596         unhold_lkb(lkb);
1597         return 0;
1598 }
1599
1600 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1601 {
1602         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1603         int error;
1604
1605         mutex_lock(&ls->ls_waiters_mutex);
1606         error = _remove_from_waiters(lkb, mstype, NULL);
1607         mutex_unlock(&ls->ls_waiters_mutex);
1608         return error;
1609 }
1610
1611 /* Handles situations where we might be processing a "fake" or "stub" reply in
1612    which we can't try to take waiters_mutex again. */
1613
1614 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1615 {
1616         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1617         int error;
1618
1619         if (ms->m_flags != DLM_IFL_STUB_MS)
1620                 mutex_lock(&ls->ls_waiters_mutex);
1621         error = _remove_from_waiters(lkb, ms->m_type, ms);
1622         if (ms->m_flags != DLM_IFL_STUB_MS)
1623                 mutex_unlock(&ls->ls_waiters_mutex);
1624         return error;
1625 }
1626
1627 /* FIXME: make this more efficient */
1628
1629 static int shrink_bucket(struct dlm_ls *ls, int b)
1630 {
1631         struct rb_node *n;
1632         struct dlm_rsb *r;
1633         int our_nodeid = dlm_our_nodeid();
1634         int count = 0, found;
1635
1636         for (;;) {
1637                 found = 0;
1638                 spin_lock(&ls->ls_rsbtbl[b].lock);
1639                 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
1640                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1641
1642                         /* If we're the directory record for this rsb, and
1643                            we're not the master of it, then we need to wait
1644                            for the master node to send us a dir remove for
1645                            before removing the dir record. */
1646
1647                         if (!dlm_no_directory(ls) && !is_master(r) &&
1648                             (dlm_dir_nodeid(r) == our_nodeid)) {
1649                                 continue;
1650                         }
1651
1652                         if (!time_after_eq(jiffies, r->res_toss_time +
1653                                            dlm_config.ci_toss_secs * HZ))
1654                                 continue;
1655                         found = 1;
1656                         break;
1657                 }
1658
1659                 if (!found) {
1660                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1661                         break;
1662                 }
1663
1664                 if (kref_put(&r->res_ref, kill_rsb)) {
1665                         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1666                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1667
1668                         /* We're the master of this rsb but we're not
1669                            the directory record, so we need to tell the
1670                            dir node to remove the dir record. */
1671
1672                         if (!dlm_no_directory(ls) && is_master(r) &&
1673                             (dlm_dir_nodeid(r) != our_nodeid)) {
1674                                 send_remove(r);
1675                         }
1676
1677                         dlm_free_rsb(r);
1678                         count++;
1679                 } else {
1680                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1681                         log_error(ls, "tossed rsb in use %s", r->res_name);
1682                 }
1683         }
1684
1685         return count;
1686 }
1687
1688 void dlm_scan_rsbs(struct dlm_ls *ls)
1689 {
1690         int i;
1691
1692         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1693                 shrink_bucket(ls, i);
1694                 if (dlm_locking_stopped(ls))
1695                         break;
1696                 cond_resched();
1697         }
1698 }
1699
1700 static void add_timeout(struct dlm_lkb *lkb)
1701 {
1702         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1703
1704         if (is_master_copy(lkb))
1705                 return;
1706
1707         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1708             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1709                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1710                 goto add_it;
1711         }
1712         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1713                 goto add_it;
1714         return;
1715
1716  add_it:
1717         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1718         mutex_lock(&ls->ls_timeout_mutex);
1719         hold_lkb(lkb);
1720         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1721         mutex_unlock(&ls->ls_timeout_mutex);
1722 }
1723
1724 static void del_timeout(struct dlm_lkb *lkb)
1725 {
1726         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1727
1728         mutex_lock(&ls->ls_timeout_mutex);
1729         if (!list_empty(&lkb->lkb_time_list)) {
1730                 list_del_init(&lkb->lkb_time_list);
1731                 unhold_lkb(lkb);
1732         }
1733         mutex_unlock(&ls->ls_timeout_mutex);
1734 }
1735
1736 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1737    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1738    and then lock rsb because of lock ordering in add_timeout.  We may need
1739    to specify some special timeout-related bits in the lkb that are just to
1740    be accessed under the timeout_mutex. */
1741
1742 void dlm_scan_timeout(struct dlm_ls *ls)
1743 {
1744         struct dlm_rsb *r;
1745         struct dlm_lkb *lkb;
1746         int do_cancel, do_warn;
1747         s64 wait_us;
1748
1749         for (;;) {
1750                 if (dlm_locking_stopped(ls))
1751                         break;
1752
1753                 do_cancel = 0;
1754                 do_warn = 0;
1755                 mutex_lock(&ls->ls_timeout_mutex);
1756                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1757
1758                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1759                                                         lkb->lkb_timestamp));
1760
1761                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1762                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1763                                 do_cancel = 1;
1764
1765                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1766                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1767                                 do_warn = 1;
1768
1769                         if (!do_cancel && !do_warn)
1770                                 continue;
1771                         hold_lkb(lkb);
1772                         break;
1773                 }
1774                 mutex_unlock(&ls->ls_timeout_mutex);
1775
1776                 if (!do_cancel && !do_warn)
1777                         break;
1778
1779                 r = lkb->lkb_resource;
1780                 hold_rsb(r);
1781                 lock_rsb(r);
1782
1783                 if (do_warn) {
1784                         /* clear flag so we only warn once */
1785                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1786                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1787                                 del_timeout(lkb);
1788                         dlm_timeout_warn(lkb);
1789                 }
1790
1791                 if (do_cancel) {
1792                         log_debug(ls, "timeout cancel %x node %d %s",
1793                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1794                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1795                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1796                         del_timeout(lkb);
1797                         _cancel_lock(r, lkb);
1798                 }
1799
1800                 unlock_rsb(r);
1801                 unhold_rsb(r);
1802                 dlm_put_lkb(lkb);
1803         }
1804 }
1805
1806 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1807    dlm_recoverd before checking/setting ls_recover_begin. */
1808
1809 void dlm_adjust_timeouts(struct dlm_ls *ls)
1810 {
1811         struct dlm_lkb *lkb;
1812         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1813
1814         ls->ls_recover_begin = 0;
1815         mutex_lock(&ls->ls_timeout_mutex);
1816         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1817                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1818         mutex_unlock(&ls->ls_timeout_mutex);
1819
1820         if (!dlm_config.ci_waitwarn_us)
1821                 return;
1822
1823         mutex_lock(&ls->ls_waiters_mutex);
1824         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1825                 if (ktime_to_us(lkb->lkb_wait_time))
1826                         lkb->lkb_wait_time = ktime_get();
1827         }
1828         mutex_unlock(&ls->ls_waiters_mutex);
1829 }
1830
1831 /* lkb is master or local copy */
1832
1833 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1834 {
1835         int b, len = r->res_ls->ls_lvblen;
1836
1837         /* b=1 lvb returned to caller
1838            b=0 lvb written to rsb or invalidated
1839            b=-1 do nothing */
1840
1841         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1842
1843         if (b == 1) {
1844                 if (!lkb->lkb_lvbptr)
1845                         return;
1846
1847                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1848                         return;
1849
1850                 if (!r->res_lvbptr)
1851                         return;
1852
1853                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1854                 lkb->lkb_lvbseq = r->res_lvbseq;
1855
1856         } else if (b == 0) {
1857                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1858                         rsb_set_flag(r, RSB_VALNOTVALID);
1859                         return;
1860                 }
1861
1862                 if (!lkb->lkb_lvbptr)
1863                         return;
1864
1865                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1866                         return;
1867
1868                 if (!r->res_lvbptr)
1869                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1870
1871                 if (!r->res_lvbptr)
1872                         return;
1873
1874                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1875                 r->res_lvbseq++;
1876                 lkb->lkb_lvbseq = r->res_lvbseq;
1877                 rsb_clear_flag(r, RSB_VALNOTVALID);
1878         }
1879
1880         if (rsb_flag(r, RSB_VALNOTVALID))
1881                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1882 }
1883
1884 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1885 {
1886         if (lkb->lkb_grmode < DLM_LOCK_PW)
1887                 return;
1888
1889         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1890                 rsb_set_flag(r, RSB_VALNOTVALID);
1891                 return;
1892         }
1893
1894         if (!lkb->lkb_lvbptr)
1895                 return;
1896
1897         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1898                 return;
1899
1900         if (!r->res_lvbptr)
1901                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1902
1903         if (!r->res_lvbptr)
1904                 return;
1905
1906         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1907         r->res_lvbseq++;
1908         rsb_clear_flag(r, RSB_VALNOTVALID);
1909 }
1910
1911 /* lkb is process copy (pc) */
1912
1913 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1914                             struct dlm_message *ms)
1915 {
1916         int b;
1917
1918         if (!lkb->lkb_lvbptr)
1919                 return;
1920
1921         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1922                 return;
1923
1924         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1925         if (b == 1) {
1926                 int len = receive_extralen(ms);
1927                 if (len > DLM_RESNAME_MAXLEN)
1928                         len = DLM_RESNAME_MAXLEN;
1929                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1930                 lkb->lkb_lvbseq = ms->m_lvbseq;
1931         }
1932 }
1933
1934 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1935    remove_lock -- used for unlock, removes lkb from granted
1936    revert_lock -- used for cancel, moves lkb from convert to granted
1937    grant_lock  -- used for request and convert, adds lkb to granted or
1938                   moves lkb from convert or waiting to granted
1939
1940    Each of these is used for master or local copy lkb's.  There is
1941    also a _pc() variation used to make the corresponding change on
1942    a process copy (pc) lkb. */
1943
1944 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1945 {
1946         del_lkb(r, lkb);
1947         lkb->lkb_grmode = DLM_LOCK_IV;
1948         /* this unhold undoes the original ref from create_lkb()
1949            so this leads to the lkb being freed */
1950         unhold_lkb(lkb);
1951 }
1952
1953 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1954 {
1955         set_lvb_unlock(r, lkb);
1956         _remove_lock(r, lkb);
1957 }
1958
1959 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1960 {
1961         _remove_lock(r, lkb);
1962 }
1963
1964 /* returns: 0 did nothing
1965             1 moved lock to granted
1966            -1 removed lock */
1967
1968 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1969 {
1970         int rv = 0;
1971
1972         lkb->lkb_rqmode = DLM_LOCK_IV;
1973
1974         switch (lkb->lkb_status) {
1975         case DLM_LKSTS_GRANTED:
1976                 break;
1977         case DLM_LKSTS_CONVERT:
1978                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1979                 rv = 1;
1980                 break;
1981         case DLM_LKSTS_WAITING:
1982                 del_lkb(r, lkb);
1983                 lkb->lkb_grmode = DLM_LOCK_IV;
1984                 /* this unhold undoes the original ref from create_lkb()
1985                    so this leads to the lkb being freed */
1986                 unhold_lkb(lkb);
1987                 rv = -1;
1988                 break;
1989         default:
1990                 log_print("invalid status for revert %d", lkb->lkb_status);
1991         }
1992         return rv;
1993 }
1994
1995 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1996 {
1997         return revert_lock(r, lkb);
1998 }
1999
2000 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2001 {
2002         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2003                 lkb->lkb_grmode = lkb->lkb_rqmode;
2004                 if (lkb->lkb_status)
2005                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2006                 else
2007                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2008         }
2009
2010         lkb->lkb_rqmode = DLM_LOCK_IV;
2011         lkb->lkb_highbast = 0;
2012 }
2013
2014 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2015 {
2016         set_lvb_lock(r, lkb);
2017         _grant_lock(r, lkb);
2018 }
2019
2020 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2021                           struct dlm_message *ms)
2022 {
2023         set_lvb_lock_pc(r, lkb, ms);
2024         _grant_lock(r, lkb);
2025 }
2026
2027 /* called by grant_pending_locks() which means an async grant message must
2028    be sent to the requesting node in addition to granting the lock if the
2029    lkb belongs to a remote node. */
2030
2031 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2032 {
2033         grant_lock(r, lkb);
2034         if (is_master_copy(lkb))
2035                 send_grant(r, lkb);
2036         else
2037                 queue_cast(r, lkb, 0);
2038 }
2039
2040 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2041    change the granted/requested modes.  We're munging things accordingly in
2042    the process copy.
2043    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2044    conversion deadlock
2045    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2046    compatible with other granted locks */
2047
2048 static void munge_demoted(struct dlm_lkb *lkb)
2049 {
2050         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2051                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2052                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2053                 return;
2054         }
2055
2056         lkb->lkb_grmode = DLM_LOCK_NL;
2057 }
2058
2059 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2060 {
2061         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2062             ms->m_type != DLM_MSG_GRANT) {
2063                 log_print("munge_altmode %x invalid reply type %d",
2064                           lkb->lkb_id, ms->m_type);
2065                 return;
2066         }
2067
2068         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2069                 lkb->lkb_rqmode = DLM_LOCK_PR;
2070         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2071                 lkb->lkb_rqmode = DLM_LOCK_CW;
2072         else {
2073                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2074                 dlm_print_lkb(lkb);
2075         }
2076 }
2077
2078 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2079 {
2080         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2081                                            lkb_statequeue);
2082         if (lkb->lkb_id == first->lkb_id)
2083                 return 1;
2084
2085         return 0;
2086 }
2087
2088 /* Check if the given lkb conflicts with another lkb on the queue. */
2089
2090 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2091 {
2092         struct dlm_lkb *this;
2093
2094         list_for_each_entry(this, head, lkb_statequeue) {
2095                 if (this == lkb)
2096                         continue;
2097                 if (!modes_compat(this, lkb))
2098                         return 1;
2099         }
2100         return 0;
2101 }
2102
2103 /*
2104  * "A conversion deadlock arises with a pair of lock requests in the converting
2105  * queue for one resource.  The granted mode of each lock blocks the requested
2106  * mode of the other lock."
2107  *
2108  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2109  * convert queue from being granted, then deadlk/demote lkb.
2110  *
2111  * Example:
2112  * Granted Queue: empty
2113  * Convert Queue: NL->EX (first lock)
2114  *                PR->EX (second lock)
2115  *
2116  * The first lock can't be granted because of the granted mode of the second
2117  * lock and the second lock can't be granted because it's not first in the
2118  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2119  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2120  * flag set and return DEMOTED in the lksb flags.
2121  *
2122  * Originally, this function detected conv-deadlk in a more limited scope:
2123  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2124  * - if lkb1 was the first entry in the queue (not just earlier), and was
2125  *   blocked by the granted mode of lkb2, and there was nothing on the
2126  *   granted queue preventing lkb1 from being granted immediately, i.e.
2127  *   lkb2 was the only thing preventing lkb1 from being granted.
2128  *
2129  * That second condition meant we'd only say there was conv-deadlk if
2130  * resolving it (by demotion) would lead to the first lock on the convert
2131  * queue being granted right away.  It allowed conversion deadlocks to exist
2132  * between locks on the convert queue while they couldn't be granted anyway.
2133  *
2134  * Now, we detect and take action on conversion deadlocks immediately when
2135  * they're created, even if they may not be immediately consequential.  If
2136  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2137  * mode that would prevent lkb1's conversion from being granted, we do a
2138  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2139  * I think this means that the lkb_is_ahead condition below should always
2140  * be zero, i.e. there will never be conv-deadlk between two locks that are
2141  * both already on the convert queue.
2142  */
2143
2144 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2145 {
2146         struct dlm_lkb *lkb1;
2147         int lkb_is_ahead = 0;
2148
2149         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2150                 if (lkb1 == lkb2) {
2151                         lkb_is_ahead = 1;
2152                         continue;
2153                 }
2154
2155                 if (!lkb_is_ahead) {
2156                         if (!modes_compat(lkb2, lkb1))
2157                                 return 1;
2158                 } else {
2159                         if (!modes_compat(lkb2, lkb1) &&
2160                             !modes_compat(lkb1, lkb2))
2161                                 return 1;
2162                 }
2163         }
2164         return 0;
2165 }
2166
2167 /*
2168  * Return 1 if the lock can be granted, 0 otherwise.
2169  * Also detect and resolve conversion deadlocks.
2170  *
2171  * lkb is the lock to be granted
2172  *
2173  * now is 1 if the function is being called in the context of the
2174  * immediate request, it is 0 if called later, after the lock has been
2175  * queued.
2176  *
2177  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2178  */
2179
2180 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
2181 {
2182         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2183
2184         /*
2185          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2186          * a new request for a NL mode lock being blocked.
2187          *
2188          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2189          * request, then it would be granted.  In essence, the use of this flag
2190          * tells the Lock Manager to expedite theis request by not considering
2191          * what may be in the CONVERTING or WAITING queues...  As of this
2192          * writing, the EXPEDITE flag can be used only with new requests for NL
2193          * mode locks.  This flag is not valid for conversion requests.
2194          *
2195          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2196          * conversion or used with a non-NL requested mode.  We also know an
2197          * EXPEDITE request is always granted immediately, so now must always
2198          * be 1.  The full condition to grant an expedite request: (now &&
2199          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2200          * therefore be shortened to just checking the flag.
2201          */
2202
2203         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2204                 return 1;
2205
2206         /*
2207          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2208          * added to the remaining conditions.
2209          */
2210
2211         if (queue_conflict(&r->res_grantqueue, lkb))
2212                 goto out;
2213
2214         /*
2215          * 6-3: By default, a conversion request is immediately granted if the
2216          * requested mode is compatible with the modes of all other granted
2217          * locks
2218          */
2219
2220         if (queue_conflict(&r->res_convertqueue, lkb))
2221                 goto out;
2222
2223         /*
2224          * 6-5: But the default algorithm for deciding whether to grant or
2225          * queue conversion requests does not by itself guarantee that such
2226          * requests are serviced on a "first come first serve" basis.  This, in
2227          * turn, can lead to a phenomenon known as "indefinate postponement".
2228          *
2229          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2230          * the system service employed to request a lock conversion.  This flag
2231          * forces certain conversion requests to be queued, even if they are
2232          * compatible with the granted modes of other locks on the same
2233          * resource.  Thus, the use of this flag results in conversion requests
2234          * being ordered on a "first come first servce" basis.
2235          *
2236          * DCT: This condition is all about new conversions being able to occur
2237          * "in place" while the lock remains on the granted queue (assuming
2238          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2239          * doesn't _have_ to go onto the convert queue where it's processed in
2240          * order.  The "now" variable is necessary to distinguish converts
2241          * being received and processed for the first time now, because once a
2242          * convert is moved to the conversion queue the condition below applies
2243          * requiring fifo granting.
2244          */
2245
2246         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2247                 return 1;
2248
2249         /*
2250          * Even if the convert is compat with all granted locks,
2251          * QUECVT forces it behind other locks on the convert queue.
2252          */
2253
2254         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2255                 if (list_empty(&r->res_convertqueue))
2256                         return 1;
2257                 else
2258                         goto out;
2259         }
2260
2261         /*
2262          * The NOORDER flag is set to avoid the standard vms rules on grant
2263          * order.
2264          */
2265
2266         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2267                 return 1;
2268
2269         /*
2270          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2271          * granted until all other conversion requests ahead of it are granted
2272          * and/or canceled.
2273          */
2274
2275         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2276                 return 1;
2277
2278         /*
2279          * 6-4: By default, a new request is immediately granted only if all
2280          * three of the following conditions are satisfied when the request is
2281          * issued:
2282          * - The queue of ungranted conversion requests for the resource is
2283          *   empty.
2284          * - The queue of ungranted new requests for the resource is empty.
2285          * - The mode of the new request is compatible with the most
2286          *   restrictive mode of all granted locks on the resource.
2287          */
2288
2289         if (now && !conv && list_empty(&r->res_convertqueue) &&
2290             list_empty(&r->res_waitqueue))
2291                 return 1;
2292
2293         /*
2294          * 6-4: Once a lock request is in the queue of ungranted new requests,
2295          * it cannot be granted until the queue of ungranted conversion
2296          * requests is empty, all ungranted new requests ahead of it are
2297          * granted and/or canceled, and it is compatible with the granted mode
2298          * of the most restrictive lock granted on the resource.
2299          */
2300
2301         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2302             first_in_list(lkb, &r->res_waitqueue))
2303                 return 1;
2304  out:
2305         return 0;
2306 }
2307
2308 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2309                           int *err)
2310 {
2311         int rv;
2312         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2313         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2314
2315         if (err)
2316                 *err = 0;
2317
2318         rv = _can_be_granted(r, lkb, now);
2319         if (rv)
2320                 goto out;
2321
2322         /*
2323          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2324          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2325          * cancels one of the locks.
2326          */
2327
2328         if (is_convert && can_be_queued(lkb) &&
2329             conversion_deadlock_detect(r, lkb)) {
2330                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2331                         lkb->lkb_grmode = DLM_LOCK_NL;
2332                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2333                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2334                         if (err)
2335                                 *err = -EDEADLK;
2336                         else {
2337                                 log_print("can_be_granted deadlock %x now %d",
2338                                           lkb->lkb_id, now);
2339                                 dlm_dump_rsb(r);
2340                         }
2341                 }
2342                 goto out;
2343         }
2344
2345         /*
2346          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2347          * to grant a request in a mode other than the normal rqmode.  It's a
2348          * simple way to provide a big optimization to applications that can
2349          * use them.
2350          */
2351
2352         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2353                 alt = DLM_LOCK_PR;
2354         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2355                 alt = DLM_LOCK_CW;
2356
2357         if (alt) {
2358                 lkb->lkb_rqmode = alt;
2359                 rv = _can_be_granted(r, lkb, now);
2360                 if (rv)
2361                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2362                 else
2363                         lkb->lkb_rqmode = rqmode;
2364         }
2365  out:
2366         return rv;
2367 }
2368
2369 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2370    for locks pending on the convert list.  Once verified (watch for these
2371    log_prints), we should be able to just call _can_be_granted() and not
2372    bother with the demote/deadlk cases here (and there's no easy way to deal
2373    with a deadlk here, we'd have to generate something like grant_lock with
2374    the deadlk error.) */
2375
2376 /* Returns the highest requested mode of all blocked conversions; sets
2377    cw if there's a blocked conversion to DLM_LOCK_CW. */
2378
2379 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2380                                  unsigned int *count)
2381 {
2382         struct dlm_lkb *lkb, *s;
2383         int hi, demoted, quit, grant_restart, demote_restart;
2384         int deadlk;
2385
2386         quit = 0;
2387  restart:
2388         grant_restart = 0;
2389         demote_restart = 0;
2390         hi = DLM_LOCK_IV;
2391
2392         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2393                 demoted = is_demoted(lkb);
2394                 deadlk = 0;
2395
2396                 if (can_be_granted(r, lkb, 0, &deadlk)) {
2397                         grant_lock_pending(r, lkb);
2398                         grant_restart = 1;
2399                         if (count)
2400                                 (*count)++;
2401                         continue;
2402                 }
2403
2404                 if (!demoted && is_demoted(lkb)) {
2405                         log_print("WARN: pending demoted %x node %d %s",
2406                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2407                         demote_restart = 1;
2408                         continue;
2409                 }
2410
2411                 if (deadlk) {
2412                         log_print("WARN: pending deadlock %x node %d %s",
2413                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2414                         dlm_dump_rsb(r);
2415                         continue;
2416                 }
2417
2418                 hi = max_t(int, lkb->lkb_rqmode, hi);
2419
2420                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2421                         *cw = 1;
2422         }
2423
2424         if (grant_restart)
2425                 goto restart;
2426         if (demote_restart && !quit) {
2427                 quit = 1;
2428                 goto restart;
2429         }
2430
2431         return max_t(int, high, hi);
2432 }
2433
2434 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2435                               unsigned int *count)
2436 {
2437         struct dlm_lkb *lkb, *s;
2438
2439         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2440                 if (can_be_granted(r, lkb, 0, NULL)) {
2441                         grant_lock_pending(r, lkb);
2442                         if (count)
2443                                 (*count)++;
2444                 } else {
2445                         high = max_t(int, lkb->lkb_rqmode, high);
2446                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2447                                 *cw = 1;
2448                 }
2449         }
2450
2451         return high;
2452 }
2453
2454 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2455    on either the convert or waiting queue.
2456    high is the largest rqmode of all locks blocked on the convert or
2457    waiting queue. */
2458
2459 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2460 {
2461         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2462                 if (gr->lkb_highbast < DLM_LOCK_EX)
2463                         return 1;
2464                 return 0;
2465         }
2466
2467         if (gr->lkb_highbast < high &&
2468             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2469                 return 1;
2470         return 0;
2471 }
2472
2473 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2474 {
2475         struct dlm_lkb *lkb, *s;
2476         int high = DLM_LOCK_IV;
2477         int cw = 0;
2478
2479         if (!is_master(r)) {
2480                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2481                 dlm_dump_rsb(r);
2482                 return;
2483         }
2484
2485         high = grant_pending_convert(r, high, &cw, count);
2486         high = grant_pending_wait(r, high, &cw, count);
2487
2488         if (high == DLM_LOCK_IV)
2489                 return;
2490
2491         /*
2492          * If there are locks left on the wait/convert queue then send blocking
2493          * ASTs to granted locks based on the largest requested mode (high)
2494          * found above.
2495          */
2496
2497         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2498                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2499                         if (cw && high == DLM_LOCK_PR &&
2500                             lkb->lkb_grmode == DLM_LOCK_PR)
2501                                 queue_bast(r, lkb, DLM_LOCK_CW);
2502                         else
2503                                 queue_bast(r, lkb, high);
2504                         lkb->lkb_highbast = high;
2505                 }
2506         }
2507 }
2508
2509 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2510 {
2511         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2512             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2513                 if (gr->lkb_highbast < DLM_LOCK_EX)
2514                         return 1;
2515                 return 0;
2516         }
2517
2518         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2519                 return 1;
2520         return 0;
2521 }
2522
2523 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2524                             struct dlm_lkb *lkb)
2525 {
2526         struct dlm_lkb *gr;
2527
2528         list_for_each_entry(gr, head, lkb_statequeue) {
2529                 /* skip self when sending basts to convertqueue */
2530                 if (gr == lkb)
2531                         continue;
2532                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2533                         queue_bast(r, gr, lkb->lkb_rqmode);
2534                         gr->lkb_highbast = lkb->lkb_rqmode;
2535                 }
2536         }
2537 }
2538
2539 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2540 {
2541         send_bast_queue(r, &r->res_grantqueue, lkb);
2542 }
2543
2544 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2545 {
2546         send_bast_queue(r, &r->res_grantqueue, lkb);
2547         send_bast_queue(r, &r->res_convertqueue, lkb);
2548 }
2549
2550 /* set_master(r, lkb) -- set the master nodeid of a resource
2551
2552    The purpose of this function is to set the nodeid field in the given
2553    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2554    known, it can just be copied to the lkb and the function will return
2555    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2556    before it can be copied to the lkb.
2557
2558    When the rsb nodeid is being looked up remotely, the initial lkb
2559    causing the lookup is kept on the ls_waiters list waiting for the
2560    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2561    on the rsb's res_lookup list until the master is verified.
2562
2563    Return values:
2564    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2565    1: the rsb master is not available and the lkb has been placed on
2566       a wait queue
2567 */
2568
2569 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2570 {
2571         int our_nodeid = dlm_our_nodeid();
2572
2573         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2574                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2575                 r->res_first_lkid = lkb->lkb_id;
2576                 lkb->lkb_nodeid = r->res_nodeid;
2577                 return 0;
2578         }
2579
2580         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2581                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2582                 return 1;
2583         }
2584
2585         if (r->res_master_nodeid == our_nodeid) {
2586                 lkb->lkb_nodeid = 0;
2587                 return 0;
2588         }
2589
2590         if (r->res_master_nodeid) {
2591                 lkb->lkb_nodeid = r->res_master_nodeid;
2592                 return 0;
2593         }
2594
2595         if (dlm_dir_nodeid(r) == our_nodeid) {
2596                 /* This is a somewhat unusual case; find_rsb will usually
2597                    have set res_master_nodeid when dir nodeid is local, but
2598                    there are cases where we become the dir node after we've
2599                    past find_rsb and go through _request_lock again.
2600                    confirm_master() or process_lookup_list() needs to be
2601                    called after this. */
2602                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2603                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2604                           r->res_name);
2605                 r->res_master_nodeid = our_nodeid;
2606                 r->res_nodeid = 0;
2607                 lkb->lkb_nodeid = 0;
2608                 return 0;
2609         }
2610
2611         r->res_first_lkid = lkb->lkb_id;
2612         send_lookup(r, lkb);
2613         return 1;
2614 }
2615
2616 static void process_lookup_list(struct dlm_rsb *r)
2617 {
2618         struct dlm_lkb *lkb, *safe;
2619
2620         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2621                 list_del_init(&lkb->lkb_rsb_lookup);
2622                 _request_lock(r, lkb);
2623                 schedule();
2624         }
2625 }
2626
2627 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2628
2629 static void confirm_master(struct dlm_rsb *r, int error)
2630 {
2631         struct dlm_lkb *lkb;
2632
2633         if (!r->res_first_lkid)
2634                 return;
2635
2636         switch (error) {
2637         case 0:
2638         case -EINPROGRESS:
2639                 r->res_first_lkid = 0;
2640                 process_lookup_list(r);
2641                 break;
2642
2643         case -EAGAIN:
2644         case -EBADR:
2645         case -ENOTBLK:
2646                 /* the remote request failed and won't be retried (it was
2647                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2648                    lkb the first_lkid */
2649
2650                 r->res_first_lkid = 0;
2651
2652                 if (!list_empty(&r->res_lookup)) {
2653                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2654                                          lkb_rsb_lookup);
2655                         list_del_init(&lkb->lkb_rsb_lookup);
2656                         r->res_first_lkid = lkb->lkb_id;
2657                         _request_lock(r, lkb);
2658                 }
2659                 break;
2660
2661         default:
2662                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2663         }
2664 }
2665
2666 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2667                          int namelen, unsigned long timeout_cs,
2668                          void (*ast) (void *astparam),
2669                          void *astparam,
2670                          void (*bast) (void *astparam, int mode),
2671                          struct dlm_args *args)
2672 {
2673         int rv = -EINVAL;
2674
2675         /* check for invalid arg usage */
2676
2677         if (mode < 0 || mode > DLM_LOCK_EX)
2678                 goto out;
2679
2680         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2681                 goto out;
2682
2683         if (flags & DLM_LKF_CANCEL)
2684                 goto out;
2685
2686         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2687                 goto out;
2688
2689         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2690                 goto out;
2691
2692         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2693                 goto out;
2694
2695         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2696                 goto out;
2697
2698         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2699                 goto out;
2700
2701         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2702                 goto out;
2703
2704         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2705                 goto out;
2706
2707         if (!ast || !lksb)
2708                 goto out;
2709
2710         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2711                 goto out;
2712
2713         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2714                 goto out;
2715
2716         /* these args will be copied to the lkb in validate_lock_args,
2717            it cannot be done now because when converting locks, fields in
2718            an active lkb cannot be modified before locking the rsb */
2719
2720         args->flags = flags;
2721         args->astfn = ast;
2722         args->astparam = astparam;
2723         args->bastfn = bast;
2724         args->timeout = timeout_cs;
2725         args->mode = mode;
2726         args->lksb = lksb;
2727         rv = 0;
2728  out:
2729         return rv;
2730 }
2731
2732 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2733 {
2734         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2735                       DLM_LKF_FORCEUNLOCK))
2736                 return -EINVAL;
2737
2738         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2739                 return -EINVAL;
2740
2741         args->flags = flags;
2742         args->astparam = astarg;
2743         return 0;
2744 }
2745
2746 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2747                               struct dlm_args *args)
2748 {
2749         int rv = -EINVAL;
2750
2751         if (args->flags & DLM_LKF_CONVERT) {
2752                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2753                         goto out;
2754
2755                 if (args->flags & DLM_LKF_QUECVT &&
2756                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2757                         goto out;
2758
2759                 rv = -EBUSY;
2760                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2761                         goto out;
2762
2763                 if (lkb->lkb_wait_type)
2764                         goto out;
2765
2766                 if (is_overlap(lkb))
2767                         goto out;
2768         }
2769
2770         lkb->lkb_exflags = args->flags;
2771         lkb->lkb_sbflags = 0;
2772         lkb->lkb_astfn = args->astfn;
2773         lkb->lkb_astparam = args->astparam;
2774         lkb->lkb_bastfn = args->bastfn;
2775         lkb->lkb_rqmode = args->mode;
2776         lkb->lkb_lksb = args->lksb;
2777         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2778         lkb->lkb_ownpid = (int) current->pid;
2779         lkb->lkb_timeout_cs = args->timeout;
2780         rv = 0;
2781  out:
2782         if (rv)
2783                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2784                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2785                           lkb->lkb_status, lkb->lkb_wait_type,
2786                           lkb->lkb_resource->res_name);
2787         return rv;
2788 }
2789
2790 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2791    for success */
2792
2793 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2794    because there may be a lookup in progress and it's valid to do
2795    cancel/unlockf on it */
2796
2797 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2798 {
2799         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2800         int rv = -EINVAL;
2801
2802         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2803                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2804                 dlm_print_lkb(lkb);
2805                 goto out;
2806         }
2807
2808         /* an lkb may still exist even though the lock is EOL'ed due to a
2809            cancel, unlock or failed noqueue request; an app can't use these
2810            locks; return same error as if the lkid had not been found at all */
2811
2812         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2813                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2814                 rv = -ENOENT;
2815                 goto out;
2816         }
2817
2818         /* an lkb may be waiting for an rsb lookup to complete where the
2819            lookup was initiated by another lock */
2820
2821         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2822                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2823                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2824                         list_del_init(&lkb->lkb_rsb_lookup);
2825                         queue_cast(lkb->lkb_resource, lkb,
2826                                    args->flags & DLM_LKF_CANCEL ?
2827                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2828                         unhold_lkb(lkb); /* undoes create_lkb() */
2829                 }
2830                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2831                 rv = -EBUSY;
2832                 goto out;
2833         }
2834
2835         /* cancel not allowed with another cancel/unlock in progress */
2836
2837         if (args->flags & DLM_LKF_CANCEL) {
2838                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2839                         goto out;
2840
2841                 if (is_overlap(lkb))
2842                         goto out;
2843
2844                 /* don't let scand try to do a cancel */
2845                 del_timeout(lkb);
2846
2847                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2848                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2849                         rv = -EBUSY;
2850                         goto out;
2851                 }
2852
2853                 /* there's nothing to cancel */
2854                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2855                     !lkb->lkb_wait_type) {
2856                         rv = -EBUSY;
2857                         goto out;
2858                 }
2859
2860                 switch (lkb->lkb_wait_type) {
2861                 case DLM_MSG_LOOKUP:
2862                 case DLM_MSG_REQUEST:
2863                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2864                         rv = -EBUSY;
2865                         goto out;
2866                 case DLM_MSG_UNLOCK:
2867                 case DLM_MSG_CANCEL:
2868                         goto out;
2869                 }
2870                 /* add_to_waiters() will set OVERLAP_CANCEL */
2871                 goto out_ok;
2872         }
2873
2874         /* do we need to allow a force-unlock if there's a normal unlock
2875            already in progress?  in what conditions could the normal unlock
2876            fail such that we'd want to send a force-unlock to be sure? */
2877
2878         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2879                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2880                         goto out;
2881
2882                 if (is_overlap_unlock(lkb))
2883                         goto out;
2884
2885                 /* don't let scand try to do a cancel */
2886                 del_timeout(lkb);
2887
2888                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2889                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2890                         rv = -EBUSY;
2891                         goto out;
2892                 }
2893
2894                 switch (lkb->lkb_wait_type) {
2895                 case DLM_MSG_LOOKUP:
2896                 case DLM_MSG_REQUEST:
2897                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2898                         rv = -EBUSY;
2899                         goto out;
2900                 case DLM_MSG_UNLOCK:
2901                         goto out;
2902                 }
2903                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2904                 goto out_ok;
2905         }
2906
2907         /* normal unlock not allowed if there's any op in progress */
2908         rv = -EBUSY;
2909         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2910                 goto out;
2911
2912  out_ok:
2913         /* an overlapping op shouldn't blow away exflags from other op */
2914         lkb->lkb_exflags |= args->flags;
2915         lkb->lkb_sbflags = 0;
2916         lkb->lkb_astparam = args->astparam;
2917         rv = 0;
2918  out:
2919         if (rv)
2920                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2921                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2922                           args->flags, lkb->lkb_wait_type,
2923                           lkb->lkb_resource->res_name);
2924         return rv;
2925 }
2926
2927 /*
2928  * Four stage 4 varieties:
2929  * do_request(), do_convert(), do_unlock(), do_cancel()
2930  * These are called on the master node for the given lock and
2931  * from the central locking logic.
2932  */
2933
2934 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2935 {
2936         int error = 0;
2937
2938         if (can_be_granted(r, lkb, 1, NULL)) {
2939                 grant_lock(r, lkb);
2940                 queue_cast(r, lkb, 0);
2941                 goto out;
2942         }
2943
2944         if (can_be_queued(lkb)) {
2945                 error = -EINPROGRESS;
2946                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2947                 add_timeout(lkb);
2948                 goto out;
2949         }
2950
2951         error = -EAGAIN;
2952         queue_cast(r, lkb, -EAGAIN);
2953  out:
2954         return error;
2955 }
2956
2957 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2958                                int error)
2959 {
2960         switch (error) {
2961         case -EAGAIN:
2962                 if (force_blocking_asts(lkb))
2963                         send_blocking_asts_all(r, lkb);
2964                 break;
2965         case -EINPROGRESS:
2966                 send_blocking_asts(r, lkb);
2967                 break;
2968         }
2969 }
2970
2971 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2972 {
2973         int error = 0;
2974         int deadlk = 0;
2975
2976         /* changing an existing lock may allow others to be granted */
2977
2978         if (can_be_granted(r, lkb, 1, &deadlk)) {
2979                 grant_lock(r, lkb);
2980                 queue_cast(r, lkb, 0);
2981                 goto out;
2982         }
2983
2984         /* can_be_granted() detected that this lock would block in a conversion
2985            deadlock, so we leave it on the granted queue and return EDEADLK in
2986            the ast for the convert. */
2987
2988         if (deadlk) {
2989                 /* it's left on the granted queue */
2990                 revert_lock(r, lkb);
2991                 queue_cast(r, lkb, -EDEADLK);
2992                 error = -EDEADLK;
2993                 goto out;
2994         }
2995
2996         /* is_demoted() means the can_be_granted() above set the grmode
2997            to NL, and left us on the granted queue.  This auto-demotion
2998            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2999            now grantable.  We have to try to grant other converting locks
3000            before we try again to grant this one. */
3001
3002         if (is_demoted(lkb)) {
3003                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3004                 if (_can_be_granted(r, lkb, 1)) {
3005                         grant_lock(r, lkb);
3006                         queue_cast(r, lkb, 0);
3007                         goto out;
3008                 }
3009                 /* else fall through and move to convert queue */
3010         }
3011
3012         if (can_be_queued(lkb)) {
3013                 error = -EINPROGRESS;
3014                 del_lkb(r, lkb);
3015                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3016                 add_timeout(lkb);
3017                 goto out;
3018         }
3019
3020         error = -EAGAIN;
3021         queue_cast(r, lkb, -EAGAIN);
3022  out:
3023         return error;
3024 }
3025
3026 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3027                                int error)
3028 {
3029         switch (error) {
3030         case 0:
3031                 grant_pending_locks(r, NULL);
3032                 /* grant_pending_locks also sends basts */
3033                 break;
3034         case -EAGAIN:
3035                 if (force_blocking_asts(lkb))
3036                         send_blocking_asts_all(r, lkb);
3037                 break;
3038         case -EINPROGRESS:
3039                 send_blocking_asts(r, lkb);
3040                 break;
3041         }
3042 }
3043
3044 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3045 {
3046         remove_lock(r, lkb);
3047         queue_cast(r, lkb, -DLM_EUNLOCK);
3048         return -DLM_EUNLOCK;
3049 }
3050
3051 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3052                               int error)
3053 {
3054         grant_pending_locks(r, NULL);
3055 }
3056
3057 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3058
3059 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3060 {
3061         int error;
3062
3063         error = revert_lock(r, lkb);
3064         if (error) {
3065                 queue_cast(r, lkb, -DLM_ECANCEL);
3066                 return -DLM_ECANCEL;
3067         }
3068         return 0;
3069 }
3070
3071 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3072                               int error)
3073 {
3074         if (error)
3075                 grant_pending_locks(r, NULL);
3076 }
3077
3078 /*
3079  * Four stage 3 varieties:
3080  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3081  */
3082
3083 /* add a new lkb to a possibly new rsb, called by requesting process */
3084
3085 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3086 {
3087         int error;
3088
3089         /* set_master: sets lkb nodeid from r */
3090
3091         error = set_master(r, lkb);
3092         if (error < 0)
3093                 goto out;
3094         if (error) {
3095                 error = 0;
3096                 goto out;
3097         }
3098
3099         if (is_remote(r)) {
3100                 /* receive_request() calls do_request() on remote node */
3101                 error = send_request(r, lkb);
3102         } else {
3103                 error = do_request(r, lkb);
3104                 /* for remote locks the request_reply is sent
3105                    between do_request and do_request_effects */
3106                 do_request_effects(r, lkb, error);
3107         }
3108  out:
3109         return error;
3110 }
3111
3112 /* change some property of an existing lkb, e.g. mode */
3113
3114 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3115 {
3116         int error;
3117
3118         if (is_remote(r)) {
3119                 /* receive_convert() calls do_convert() on remote node */
3120                 error = send_convert(r, lkb);
3121         } else {
3122                 error = do_convert(r, lkb);
3123                 /* for remote locks the convert_reply is sent
3124                    between do_convert and do_convert_effects */
3125                 do_convert_effects(r, lkb, error);
3126         }
3127
3128         return error;
3129 }
3130
3131 /* remove an existing lkb from the granted queue */
3132
3133 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3134 {
3135         int error;
3136
3137         if (is_remote(r)) {
3138                 /* receive_unlock() calls do_unlock() on remote node */
3139                 error = send_unlock(r, lkb);
3140         } else {
3141                 error = do_unlock(r, lkb);
3142                 /* for remote locks the unlock_reply is sent
3143                    between do_unlock and do_unlock_effects */
3144                 do_unlock_effects(r, lkb, error);
3145         }
3146
3147         return error;
3148 }
3149
3150 /* remove an existing lkb from the convert or wait queue */
3151
3152 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3153 {
3154         int error;
3155
3156         if (is_remote(r)) {
3157                 /* receive_cancel() calls do_cancel() on remote node */
3158                 error = send_cancel(r, lkb);
3159         } else {
3160                 error = do_cancel(r, lkb);
3161                 /* for remote locks the cancel_reply is sent
3162                    between do_cancel and do_cancel_effects */
3163                 do_cancel_effects(r, lkb, error);
3164         }
3165
3166         return error;
3167 }
3168
3169 /*
3170  * Four stage 2 varieties:
3171  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3172  */
3173
3174 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3175                         int len, struct dlm_args *args)
3176 {
3177         struct dlm_rsb *r;
3178         int error;
3179
3180         error = validate_lock_args(ls, lkb, args);
3181         if (error)
3182                 return error;
3183
3184         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3185         if (error)
3186                 return error;
3187
3188         lock_rsb(r);
3189
3190         attach_lkb(r, lkb);
3191         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3192
3193         error = _request_lock(r, lkb);
3194
3195         unlock_rsb(r);
3196         put_rsb(r);
3197         return error;
3198 }
3199
3200 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3201                         struct dlm_args *args)
3202 {
3203         struct dlm_rsb *r;
3204         int error;
3205
3206         r = lkb->lkb_resource;
3207
3208         hold_rsb(r);
3209         lock_rsb(r);
3210
3211         error = validate_lock_args(ls, lkb, args);
3212         if (error)
3213                 goto out;
3214
3215         error = _convert_lock(r, lkb);
3216  out:
3217         unlock_rsb(r);
3218         put_rsb(r);
3219         return error;
3220 }
3221
3222 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3223                        struct dlm_args *args)
3224 {
3225         struct dlm_rsb *r;
3226         int error;
3227
3228         r = lkb->lkb_resource;
3229
3230         hold_rsb(r);
3231         lock_rsb(r);
3232
3233         error = validate_unlock_args(lkb, args);
3234         if (error)
3235                 goto out;
3236
3237         error = _unlock_lock(r, lkb);
3238  out:
3239         unlock_rsb(r);
3240         put_rsb(r);
3241         return error;
3242 }
3243
3244 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3245                        struct dlm_args *args)
3246 {
3247         struct dlm_rsb *r;
3248         int error;
3249
3250         r = lkb->lkb_resource;
3251
3252         hold_rsb(r);
3253         lock_rsb(r);
3254
3255         error = validate_unlock_args(lkb, args);
3256         if (error)
3257                 goto out;
3258
3259         error = _cancel_lock(r, lkb);
3260  out:
3261         unlock_rsb(r);
3262         put_rsb(r);
3263         return error;
3264 }
3265
3266 /*
3267  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3268  */
3269
3270 int dlm_lock(dlm_lockspace_t *lockspace,
3271              int mode,
3272              struct dlm_lksb *lksb,
3273              uint32_t flags,
3274              void *name,
3275              unsigned int namelen,
3276              uint32_t parent_lkid,
3277              void (*ast) (void *astarg),
3278              void *astarg,
3279              void (*bast) (void *astarg, int mode))
3280 {
3281         struct dlm_ls *ls;
3282         struct dlm_lkb *lkb;
3283         struct dlm_args args;
3284         int error, convert = flags & DLM_LKF_CONVERT;
3285
3286         ls = dlm_find_lockspace_local(lockspace);
3287         if (!ls)
3288                 return -EINVAL;
3289
3290         dlm_lock_recovery(ls);
3291
3292         if (convert)
3293                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3294         else
3295                 error = create_lkb(ls, &lkb);
3296
3297         if (error)
3298                 goto out;
3299
3300         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3301                               astarg, bast, &args);
3302         if (error)
3303                 goto out_put;
3304
3305         if (convert)
3306                 error = convert_lock(ls, lkb, &args);
3307         else
3308                 error = request_lock(ls, lkb, name, namelen, &args);
3309
3310         if (error == -EINPROGRESS)
3311                 error = 0;
3312  out_put:
3313         if (convert || error)
3314                 __put_lkb(ls, lkb);
3315         if (error == -EAGAIN || error == -EDEADLK)
3316                 error = 0;
3317  out:
3318         dlm_unlock_recovery(ls);
3319         dlm_put_lockspace(ls);
3320         return error;
3321 }
3322
3323 int dlm_unlock(dlm_lockspace_t *lockspace,
3324                uint32_t lkid,
3325                uint32_t flags,
3326                struct dlm_lksb *lksb,
3327                void *astarg)
3328 {
3329         struct dlm_ls *ls;
3330         struct dlm_lkb *lkb;
3331         struct dlm_args args;
3332         int error;
3333
3334         ls = dlm_find_lockspace_local(lockspace);
3335         if (!ls)
3336                 return -EINVAL;
3337
3338         dlm_lock_recovery(ls);
3339
3340         error = find_lkb(ls, lkid, &lkb);
3341         if (error)
3342                 goto out;
3343
3344         error = set_unlock_args(flags, astarg, &args);
3345         if (error)
3346                 goto out_put;
3347
3348         if (flags & DLM_LKF_CANCEL)
3349                 error = cancel_lock(ls, lkb, &args);
3350         else
3351                 error = unlock_lock(ls, lkb, &args);
3352
3353         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3354                 error = 0;
3355         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3356                 error = 0;
3357  out_put:
3358         dlm_put_lkb(lkb);
3359  out:
3360         dlm_unlock_recovery(ls);
3361         dlm_put_lockspace(ls);
3362         return error;
3363 }
3364
3365 /*
3366  * send/receive routines for remote operations and replies
3367  *
3368  * send_args
3369  * send_common
3370  * send_request                 receive_request
3371  * send_convert                 receive_convert
3372  * send_unlock                  receive_unlock
3373  * send_cancel                  receive_cancel
3374  * send_grant                   receive_grant
3375  * send_bast                    receive_bast
3376  * send_lookup                  receive_lookup
3377  * send_remove                  receive_remove
3378  *
3379  *                              send_common_reply
3380  * receive_request_reply        send_request_reply
3381  * receive_convert_reply        send_convert_reply
3382  * receive_unlock_reply         send_unlock_reply
3383  * receive_cancel_reply         send_cancel_reply
3384  * receive_lookup_reply         send_lookup_reply
3385  */
3386
3387 static int _create_message(struct dlm_ls *ls, int mb_len,
3388                            int to_nodeid, int mstype,
3389                            struct dlm_message **ms_ret,
3390                            struct dlm_mhandle **mh_ret)
3391 {
3392         struct dlm_message *ms;
3393         struct dlm_mhandle *mh;
3394         char *mb;
3395
3396         /* get_buffer gives us a message handle (mh) that we need to
3397            pass into lowcomms_commit and a message buffer (mb) that we
3398            write our data into */
3399
3400         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3401         if (!mh)
3402                 return -ENOBUFS;
3403
3404         memset(mb, 0, mb_len);
3405
3406         ms = (struct dlm_message *) mb;
3407
3408         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3409         ms->m_header.h_lockspace = ls->ls_global_id;
3410         ms->m_header.h_nodeid = dlm_our_nodeid();
3411         ms->m_header.h_length = mb_len;
3412         ms->m_header.h_cmd = DLM_MSG;
3413
3414         ms->m_type = mstype;
3415
3416         *mh_ret = mh;
3417         *ms_ret = ms;
3418         return 0;
3419 }
3420
3421 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3422                           int to_nodeid, int mstype,
3423                           struct dlm_message **ms_ret,
3424                           struct dlm_mhandle **mh_ret)
3425 {
3426         int mb_len = sizeof(struct dlm_message);
3427
3428         switch (mstype) {
3429         case DLM_MSG_REQUEST:
3430         case DLM_MSG_LOOKUP:
3431         case DLM_MSG_REMOVE:
3432                 mb_len += r->res_length;
3433                 break;
3434         case DLM_MSG_CONVERT:
3435         case DLM_MSG_UNLOCK:
3436         case DLM_MSG_REQUEST_REPLY:
3437         case DLM_MSG_CONVERT_REPLY:
3438         case DLM_MSG_GRANT:
3439                 if (lkb && lkb->lkb_lvbptr)
3440                         mb_len += r->res_ls->ls_lvblen;
3441                 break;
3442         }
3443
3444         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3445                                ms_ret, mh_ret);
3446 }
3447
3448 /* further lowcomms enhancements or alternate implementations may make
3449    the return value from this function useful at some point */
3450
3451 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3452 {
3453         dlm_message_out(ms);
3454         dlm_lowcomms_commit_buffer(mh);
3455         return 0;
3456 }
3457
3458 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3459                       struct dlm_message *ms)
3460 {
3461         ms->m_nodeid   = lkb->lkb_nodeid;
3462         ms->m_pid      = lkb->lkb_ownpid;
3463         ms->m_lkid     = lkb->lkb_id;
3464         ms->m_remid    = lkb->lkb_remid;
3465         ms->m_exflags  = lkb->lkb_exflags;
3466         ms->m_sbflags  = lkb->lkb_sbflags;
3467         ms->m_flags    = lkb->lkb_flags;
3468         ms->m_lvbseq   = lkb->lkb_lvbseq;
3469         ms->m_status   = lkb->lkb_status;
3470         ms->m_grmode   = lkb->lkb_grmode;
3471         ms->m_rqmode   = lkb->lkb_rqmode;
3472         ms->m_hash     = r->res_hash;
3473
3474         /* m_result and m_bastmode are set from function args,
3475            not from lkb fields */
3476
3477         if (lkb->lkb_bastfn)
3478                 ms->m_asts |= DLM_CB_BAST;
3479         if (lkb->lkb_astfn)
3480                 ms->m_asts |= DLM_CB_CAST;
3481
3482         /* compare with switch in create_message; send_remove() doesn't
3483            use send_args() */
3484
3485         switch (ms->m_type) {
3486         case DLM_MSG_REQUEST:
3487         case DLM_MSG_LOOKUP:
3488                 memcpy(ms->m_extra, r->res_name, r->res_length);
3489                 break;
3490         case DLM_MSG_CONVERT:
3491         case DLM_MSG_UNLOCK:
3492         case DLM_MSG_REQUEST_REPLY:
3493         case DLM_MSG_CONVERT_REPLY:
3494         case DLM_MSG_GRANT:
3495                 if (!lkb->lkb_lvbptr)
3496                         break;
3497                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3498                 break;
3499         }
3500 }
3501
3502 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3503 {
3504         struct dlm_message *ms;
3505         struct dlm_mhandle *mh;
3506         int to_nodeid, error;
3507
3508         to_nodeid = r->res_nodeid;
3509
3510         error = add_to_waiters(lkb, mstype, to_nodeid);
3511         if (error)
3512                 return error;
3513
3514         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3515         if (error)
3516                 goto fail;
3517
3518         send_args(r, lkb, ms);
3519
3520         error = send_message(mh, ms);
3521         if (error)
3522                 goto fail;
3523         return 0;
3524
3525  fail:
3526         remove_from_waiters(lkb, msg_reply_type(mstype));
3527         return error;
3528 }
3529
3530 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3531 {
3532         return send_common(r, lkb, DLM_MSG_REQUEST);
3533 }
3534
3535 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3536 {
3537         int error;
3538
3539         error = send_common(r, lkb, DLM_MSG_CONVERT);
3540
3541         /* down conversions go without a reply from the master */
3542         if (!error && down_conversion(lkb)) {
3543                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3544                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3545                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3546                 r->res_ls->ls_stub_ms.m_result = 0;
3547                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3548         }
3549
3550         return error;
3551 }
3552
3553 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3554    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3555    that the master is still correct. */
3556
3557 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3558 {
3559         return send_common(r, lkb, DLM_MSG_UNLOCK);
3560 }
3561
3562 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3563 {
3564         return send_common(r, lkb, DLM_MSG_CANCEL);
3565 }
3566
3567 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3568 {
3569         struct dlm_message *ms;
3570         struct dlm_mhandle *mh;
3571         int to_nodeid, error;
3572
3573         to_nodeid = lkb->lkb_nodeid;
3574
3575         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3576         if (error)
3577                 goto out;
3578
3579         send_args(r, lkb, ms);
3580
3581         ms->m_result = 0;
3582
3583         error = send_message(mh, ms);
3584  out:
3585         return error;
3586 }
3587
3588 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3589 {
3590         struct dlm_message *ms;
3591         struct dlm_mhandle *mh;
3592         int to_nodeid, error;
3593
3594         to_nodeid = lkb->lkb_nodeid;
3595
3596         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3597         if (error)
3598                 goto out;
3599
3600         send_args(r, lkb, ms);
3601
3602         ms->m_bastmode = mode;
3603
3604         error = send_message(mh, ms);
3605  out:
3606         return error;
3607 }
3608
3609 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3610 {
3611         struct dlm_message *ms;
3612         struct dlm_mhandle *mh;
3613         int to_nodeid, error;
3614
3615         to_nodeid = dlm_dir_nodeid(r);
3616
3617         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3618         if (error)
3619                 return error;
3620
3621         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3622         if (error)
3623                 goto fail;
3624
3625         send_args(r, lkb, ms);
3626
3627         error = send_message(mh, ms);
3628         if (error)
3629                 goto fail;
3630         return 0;
3631
3632  fail:
3633         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3634         return error;
3635 }
3636
3637 static int send_remove(struct dlm_rsb *r)
3638 {
3639         struct dlm_message *ms;
3640         struct dlm_mhandle *mh;
3641         int to_nodeid, error;
3642
3643         to_nodeid = dlm_dir_nodeid(r);
3644
3645         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3646         if (error)
3647                 goto out;
3648
3649         memcpy(ms->m_extra, r->res_name, r->res_length);
3650         ms->m_hash = r->res_hash;
3651
3652         error = send_message(mh, ms);
3653  out:
3654         return error;
3655 }
3656
3657 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3658                              int mstype, int rv)
3659 {
3660         struct dlm_message *ms;
3661         struct dlm_mhandle *mh;
3662         int to_nodeid, error;
3663
3664         to_nodeid = lkb->lkb_nodeid;
3665
3666         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3667         if (error)
3668                 goto out;
3669
3670         send_args(r, lkb, ms);
3671
3672         ms->m_result = rv;
3673
3674         error = send_message(mh, ms);
3675  out:
3676         return error;
3677 }
3678
3679 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3680 {
3681         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3682 }
3683
3684 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3685 {
3686         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3687 }
3688
3689 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3690 {
3691         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3692 }
3693
3694 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3695 {
3696         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3697 }
3698
3699 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3700                              int ret_nodeid, int rv)
3701 {
3702         struct dlm_rsb *r = &ls->ls_stub_rsb;
3703         struct dlm_message *ms;
3704         struct dlm_mhandle *mh;
3705         int error, nodeid = ms_in->m_header.h_nodeid;
3706
3707         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3708         if (error)
3709                 goto out;
3710
3711         ms->m_lkid = ms_in->m_lkid;
3712         ms->m_result = rv;
3713         ms->m_nodeid = ret_nodeid;
3714
3715         error = send_message(mh, ms);
3716  out:
3717         return error;
3718 }
3719
3720 /* which args we save from a received message depends heavily on the type
3721    of message, unlike the send side where we can safely send everything about
3722    the lkb for any type of message */
3723
3724 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3725 {
3726         lkb->lkb_exflags = ms->m_exflags;
3727         lkb->lkb_sbflags = ms->m_sbflags;
3728         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3729                          (ms->m_flags & 0x0000FFFF);
3730 }
3731
3732 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3733 {
3734         if (ms->m_flags == DLM_IFL_STUB_MS)
3735                 return;
3736
3737         lkb->lkb_sbflags = ms->m_sbflags;
3738         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3739                          (ms->m_flags & 0x0000FFFF);
3740 }
3741
3742 static int receive_extralen(struct dlm_message *ms)
3743 {
3744         return (ms->m_header.h_length - sizeof(struct dlm_message));
3745 }
3746
3747 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3748                        struct dlm_message *ms)
3749 {
3750         int len;
3751
3752         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3753                 if (!lkb->lkb_lvbptr)
3754                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3755                 if (!lkb->lkb_lvbptr)
3756                         return -ENOMEM;
3757                 len = receive_extralen(ms);
3758                 if (len > DLM_RESNAME_MAXLEN)
3759                         len = DLM_RESNAME_MAXLEN;
3760                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3761         }
3762         return 0;
3763 }
3764
3765 static void fake_bastfn(void *astparam, int mode)
3766 {
3767         log_print("fake_bastfn should not be called");
3768 }
3769
3770 static void fake_astfn(void *astparam)
3771 {
3772         log_print("fake_astfn should not be called");
3773 }
3774
3775 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3776                                 struct dlm_message *ms)
3777 {
3778         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3779         lkb->lkb_ownpid = ms->m_pid;
3780         lkb->lkb_remid = ms->m_lkid;
3781         lkb->lkb_grmode = DLM_LOCK_IV;
3782         lkb->lkb_rqmode = ms->m_rqmode;
3783
3784         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3785         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3786
3787         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3788                 /* lkb was just created so there won't be an lvb yet */
3789                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3790                 if (!lkb->lkb_lvbptr)
3791                         return -ENOMEM;
3792         }
3793
3794         return 0;
3795 }
3796
3797 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3798                                 struct dlm_message *ms)
3799 {
3800         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3801                 return -EBUSY;
3802
3803         if (receive_lvb(ls, lkb, ms))
3804                 return -ENOMEM;
3805
3806         lkb->lkb_rqmode = ms->m_rqmode;
3807         lkb->lkb_lvbseq = ms->m_lvbseq;
3808
3809         return 0;
3810 }
3811
3812 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3813                                struct dlm_message *ms)
3814 {
3815         if (receive_lvb(ls, lkb, ms))
3816                 return -ENOMEM;
3817         return 0;
3818 }
3819
3820 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3821    uses to send a reply and that the remote end uses to process the reply. */
3822
3823 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3824 {
3825         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3826         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3827         lkb->lkb_remid = ms->m_lkid;
3828 }
3829
3830 /* This is called after the rsb is locked so that we can safely inspect
3831    fields in the lkb. */
3832
3833 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3834 {
3835         int from = ms->m_header.h_nodeid;
3836         int error = 0;
3837
3838         switch (ms->m_type) {
3839         case DLM_MSG_CONVERT:
3840         case DLM_MSG_UNLOCK:
3841         case DLM_MSG_CANCEL:
3842                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3843                         error = -EINVAL;
3844                 break;
3845
3846         case DLM_MSG_CONVERT_REPLY:
3847         case DLM_MSG_UNLOCK_REPLY:
3848         case DLM_MSG_CANCEL_REPLY:
3849         case DLM_MSG_GRANT:
3850         case DLM_MSG_BAST:
3851                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3852                         error = -EINVAL;
3853                 break;
3854
3855         case DLM_MSG_REQUEST_REPLY:
3856                 if (!is_process_copy(lkb))
3857                         error = -EINVAL;
3858                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3859                         error = -EINVAL;
3860                 break;
3861
3862         default:
3863                 error = -EINVAL;
3864         }
3865
3866         if (error)
3867                 log_error(lkb->lkb_resource->res_ls,
3868                           "ignore invalid message %d from %d %x %x %x %d",
3869                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3870                           lkb->lkb_flags, lkb->lkb_nodeid);
3871         return error;
3872 }
3873
3874 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3875 {
3876         struct dlm_lkb *lkb;
3877         struct dlm_rsb *r;
3878         int from_nodeid;
3879         int error, namelen;
3880
3881         from_nodeid = ms->m_header.h_nodeid;
3882
3883         error = create_lkb(ls, &lkb);
3884         if (error)
3885                 goto fail;
3886
3887         receive_flags(lkb, ms);
3888         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3889         error = receive_request_args(ls, lkb, ms);
3890         if (error) {
3891                 __put_lkb(ls, lkb);
3892                 goto fail;
3893         }
3894
3895         /* The dir node is the authority on whether we are the master
3896            for this rsb or not, so if the master sends us a request, we should
3897            recreate the rsb if we've destroyed it.   This race happens when we
3898            send a remove message to the dir node at the same time that the dir
3899            node sends us a request for the rsb. */
3900
3901         namelen = receive_extralen(ms);
3902
3903         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
3904                          R_RECEIVE_REQUEST, &r);
3905         if (error) {
3906                 __put_lkb(ls, lkb);
3907                 goto fail;
3908         }
3909
3910         lock_rsb(r);
3911
3912         if (r->res_master_nodeid != dlm_our_nodeid()) {
3913                 error = validate_master_nodeid(ls, r, from_nodeid);
3914                 if (error) {
3915                         unlock_rsb(r);
3916                         put_rsb(r);
3917                         __put_lkb(ls, lkb);
3918                         goto fail;
3919                 }
3920         }
3921
3922         attach_lkb(r, lkb);
3923         error = do_request(r, lkb);
3924         send_request_reply(r, lkb, error);
3925         do_request_effects(r, lkb, error);
3926
3927         unlock_rsb(r);
3928         put_rsb(r);
3929
3930         if (error == -EINPROGRESS)
3931                 error = 0;
3932         if (error)
3933                 dlm_put_lkb(lkb);
3934         return 0;
3935
3936  fail:
3937         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
3938            and do this receive_request again from process_lookup_list once
3939            we get the lookup reply.  This would avoid a many repeated
3940            ENOTBLK request failures when the lookup reply designating us
3941            as master is delayed. */
3942
3943         /* We could repeatedly return -EBADR here if our send_remove() is
3944            delayed in being sent/arriving/being processed on the dir node.
3945            Another node would repeatedly lookup up the master, and the dir
3946            node would continue returning our nodeid until our send_remove
3947            took effect. */
3948
3949         if (error != -ENOTBLK) {
3950                 log_limit(ls, "receive_request %x from %d %d",
3951                           ms->m_lkid, from_nodeid, error);
3952         }
3953
3954         setup_stub_lkb(ls, ms);
3955         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3956         return error;
3957 }
3958
3959 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3960 {
3961         struct dlm_lkb *lkb;
3962         struct dlm_rsb *r;
3963         int error, reply = 1;
3964
3965         error = find_lkb(ls, ms->m_remid, &lkb);
3966         if (error)
3967                 goto fail;
3968
3969         if (lkb->lkb_remid != ms->m_lkid) {
3970                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3971                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3972                           (unsigned long long)lkb->lkb_recover_seq,
3973                           ms->m_header.h_nodeid, ms->m_lkid);
3974                 error = -ENOENT;
3975                 goto fail;
3976         }
3977
3978         r = lkb->lkb_resource;
3979
3980         hold_rsb(r);
3981         lock_rsb(r);
3982
3983         error = validate_message(lkb, ms);
3984         if (error)
3985                 goto out;
3986
3987         receive_flags(lkb, ms);
3988
3989         error = receive_convert_args(ls, lkb, ms);
3990         if (error) {
3991                 send_convert_reply(r, lkb, error);
3992                 goto out;
3993         }
3994
3995         reply = !down_conversion(lkb);
3996
3997         error = do_convert(r, lkb);
3998         if (reply)
3999                 send_convert_reply(r, lkb, error);
4000         do_convert_effects(r, lkb, error);
4001  out:
4002         unlock_rsb(r);
4003         put_rsb(r);
4004         dlm_put_lkb(lkb);
4005         return 0;
4006
4007  fail:
4008         setup_stub_lkb(ls, ms);
4009         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4010         return error;
4011 }
4012
4013 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4014 {
4015         struct dlm_lkb *lkb;
4016         struct dlm_rsb *r;
4017         int error;
4018
4019         error = find_lkb(ls, ms->m_remid, &lkb);
4020         if (error)
4021                 goto fail;
4022
4023         if (lkb->lkb_remid != ms->m_lkid) {
4024                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4025                           lkb->lkb_id, lkb->lkb_remid,
4026                           ms->m_header.h_nodeid, ms->m_lkid);
4027                 error = -ENOENT;
4028                 goto fail;
4029         }
4030
4031         r = lkb->lkb_resource;
4032
4033         hold_rsb(r);
4034         lock_rsb(r);
4035
4036         error = validate_message(lkb, ms);
4037         if (error)
4038                 goto out;
4039
4040         receive_flags(lkb, ms);
4041
4042         error = receive_unlock_args(ls, lkb, ms);
4043         if (error) {
4044                 send_unlock_reply(r, lkb, error);
4045                 goto out;
4046         }
4047
4048         error = do_unlock(r, lkb);
4049         send_unlock_reply(r, lkb, error);
4050         do_unlock_effects(r, lkb, error);
4051  out:
4052         unlock_rsb(r);
4053         put_rsb(r);
4054         dlm_put_lkb(lkb);
4055         return 0;
4056
4057  fail:
4058         setup_stub_lkb(ls, ms);
4059         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4060         return error;
4061 }
4062
4063 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4064 {
4065         struct dlm_lkb *lkb;
4066         struct dlm_rsb *r;
4067         int error;
4068
4069         error = find_lkb(ls, ms->m_remid, &lkb);
4070         if (error)
4071                 goto fail;
4072
4073         receive_flags(lkb, ms);
4074
4075         r = lkb->lkb_resource;
4076
4077         hold_rsb(r);
4078         lock_rsb(r);
4079
4080         error = validate_message(lkb, ms);
4081         if (error)
4082                 goto out;
4083
4084         error = do_cancel(r, lkb);
4085         send_cancel_reply(r, lkb, error);
4086         do_cancel_effects(r, lkb, error);
4087  out:
4088         unlock_rsb(r);
4089         put_rsb(r);
4090         dlm_put_lkb(lkb);
4091         return 0;
4092
4093  fail:
4094         setup_stub_lkb(ls, ms);
4095         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4096         return error;
4097 }
4098
4099 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4100 {
4101         struct dlm_lkb *lkb;
4102         struct dlm_rsb *r;
4103         int error;
4104
4105         error = find_lkb(ls, ms->m_remid, &lkb);
4106         if (error)
4107                 return error;
4108
4109         r = lkb->lkb_resource;
4110
4111         hold_rsb(r);
4112         lock_rsb(r);
4113
4114         error = validate_message(lkb, ms);
4115         if (error)
4116                 goto out;
4117
4118         receive_flags_reply(lkb, ms);
4119         if (is_altmode(lkb))
4120                 munge_altmode(lkb, ms);
4121         grant_lock_pc(r, lkb, ms);
4122         queue_cast(r, lkb, 0);
4123  out:
4124         unlock_rsb(r);
4125         put_rsb(r);
4126         dlm_put_lkb(lkb);
4127         return 0;
4128 }
4129
4130 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4131 {
4132         struct dlm_lkb *lkb;
4133         struct dlm_rsb *r;
4134         int error;
4135
4136         error = find_lkb(ls, ms->m_remid, &lkb);
4137         if (error)
4138                 return error;
4139
4140         r = lkb->lkb_resource;
4141
4142         hold_rsb(r);
4143         lock_rsb(r);
4144
4145         error = validate_message(lkb, ms);
4146         if (error)
4147                 goto out;
4148
4149         queue_bast(r, lkb, ms->m_bastmode);
4150         lkb->lkb_highbast = ms->m_bastmode;
4151  out:
4152         unlock_rsb(r);
4153         put_rsb(r);
4154         dlm_put_lkb(lkb);
4155         return 0;
4156 }
4157
4158 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4159 {
4160         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4161
4162         from_nodeid = ms->m_header.h_nodeid;
4163         our_nodeid = dlm_our_nodeid();
4164
4165         len = receive_extralen(ms);
4166
4167         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4168                                   &ret_nodeid, NULL);
4169
4170         /* Optimization: we're master so treat lookup as a request */
4171         if (!error && ret_nodeid == our_nodeid) {
4172                 receive_request(ls, ms);
4173                 return;
4174         }
4175         send_lookup_reply(ls, ms, ret_nodeid, error);
4176 }
4177
4178 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4179 {
4180         char name[DLM_RESNAME_MAXLEN+1];
4181         struct dlm_rsb *r;
4182         uint32_t hash, b;
4183         int rv, len, dir_nodeid, from_nodeid;
4184
4185         from_nodeid = ms->m_header.h_nodeid;
4186
4187         len = receive_extralen(ms);
4188
4189         if (len > DLM_RESNAME_MAXLEN) {
4190                 log_error(ls, "receive_remove from %d bad len %d",
4191                           from_nodeid, len);
4192                 return;
4193         }
4194
4195         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4196         if (dir_nodeid != dlm_our_nodeid()) {
4197                 log_error(ls, "receive_remove from %d bad nodeid %d",
4198                           from_nodeid, dir_nodeid);
4199                 return;
4200         }
4201
4202         /* Look for name on rsbtbl.toss, if it's there, kill it.
4203            If it's on rsbtbl.keep, it's being used, and we should ignore this
4204            message.  This is an expected race between the dir node sending a
4205            request to the master node at the same time as the master node sends
4206            a remove to the dir node.  The resolution to that race is for the
4207            dir node to ignore the remove message, and the master node to
4208            recreate the master rsb when it gets a request from the dir node for
4209            an rsb it doesn't have. */
4210
4211         memset(name, 0, sizeof(name));
4212         memcpy(name, ms->m_extra, len);
4213
4214         hash = jhash(name, len, 0);
4215         b = hash & (ls->ls_rsbtbl_size - 1);
4216
4217         spin_lock(&ls->ls_rsbtbl[b].lock);
4218
4219         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4220         if (rv) {
4221                 /* verify the rsb is on keep list per comment above */
4222                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4223                 if (rv) {
4224                         /* should not happen */
4225                         log_error(ls, "receive_remove from %d not found %s",
4226                                   from_nodeid, name);
4227                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4228                         return;
4229                 }
4230                 if (r->res_master_nodeid != from_nodeid) {
4231                         /* should not happen */
4232                         log_error(ls, "receive_remove keep from %d master %d",
4233                                   from_nodeid, r->res_master_nodeid);
4234                         dlm_print_rsb(r);
4235                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4236                         return;
4237                 }
4238
4239                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4240                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4241                           name);
4242                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4243                 return;
4244         }
4245
4246         if (r->res_master_nodeid != from_nodeid) {
4247                 log_error(ls, "receive_remove toss from %d master %d",
4248                           from_nodeid, r->res_master_nodeid);
4249                 dlm_print_rsb(r);
4250                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4251                 return;
4252         }
4253
4254         if (kref_put(&r->res_ref, kill_rsb)) {
4255                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4256                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4257                 dlm_free_rsb(r);
4258         } else {
4259                 log_error(ls, "receive_remove from %d rsb ref error",
4260                           from_nodeid);
4261                 dlm_print_rsb(r);
4262                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4263         }
4264 }
4265
4266 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4267 {
4268         do_purge(ls, ms->m_nodeid, ms->m_pid);
4269 }
4270
4271 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4272 {
4273         struct dlm_lkb *lkb;
4274         struct dlm_rsb *r;
4275         int error, mstype, result;
4276         int from_nodeid = ms->m_header.h_nodeid;
4277
4278         error = find_lkb(ls, ms->m_remid, &lkb);
4279         if (error)
4280                 return error;
4281
4282         r = lkb->lkb_resource;
4283         hold_rsb(r);
4284         lock_rsb(r);
4285
4286         error = validate_message(lkb, ms);
4287         if (error)
4288                 goto out;
4289
4290         mstype = lkb->lkb_wait_type;
4291         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4292         if (error) {
4293                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4294                           lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4295                 dlm_dump_rsb(r);
4296                 goto out;
4297         }
4298
4299         /* Optimization: the dir node was also the master, so it took our
4300            lookup as a request and sent request reply instead of lookup reply */
4301         if (mstype == DLM_MSG_LOOKUP) {
4302                 r->res_master_nodeid = from_nodeid;
4303                 r->res_nodeid = from_nodeid;
4304                 lkb->lkb_nodeid = from_nodeid;
4305         }
4306
4307         /* this is the value returned from do_request() on the master */
4308         result = ms->m_result;
4309
4310         switch (result) {
4311         case -EAGAIN:
4312                 /* request would block (be queued) on remote master */
4313                 queue_cast(r, lkb, -EAGAIN);
4314                 confirm_master(r, -EAGAIN);
4315                 unhold_lkb(lkb); /* undoes create_lkb() */
4316                 break;
4317
4318         case -EINPROGRESS:
4319         case 0:
4320                 /* request was queued or granted on remote master */
4321                 receive_flags_reply(lkb, ms);
4322                 lkb->lkb_remid = ms->m_lkid;
4323                 if (is_altmode(lkb))
4324                         munge_altmode(lkb, ms);
4325                 if (result) {
4326                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4327                         add_timeout(lkb);
4328                 } else {
4329                         grant_lock_pc(r, lkb, ms);
4330                         queue_cast(r, lkb, 0);
4331                 }
4332                 confirm_master(r, result);
4333                 break;
4334
4335         case -EBADR:
4336         case -ENOTBLK:
4337                 /* find_rsb failed to find rsb or rsb wasn't master */
4338                 log_limit(ls, "receive_request_reply %x from %d %d "
4339                           "master %d dir %d first %x %s", lkb->lkb_id,
4340                           from_nodeid, result, r->res_master_nodeid,
4341                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4342
4343                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4344                     r->res_master_nodeid != dlm_our_nodeid()) {
4345                         /* cause _request_lock->set_master->send_lookup */
4346                         r->res_master_nodeid = 0;
4347                         r->res_nodeid = -1;
4348                         lkb->lkb_nodeid = -1;
4349                 }
4350
4351                 if (is_overlap(lkb)) {
4352                         /* we'll ignore error in cancel/unlock reply */
4353                         queue_cast_overlap(r, lkb);
4354                         confirm_master(r, result);
4355                         unhold_lkb(lkb); /* undoes create_lkb() */
4356                 } else {
4357                         _request_lock(r, lkb);
4358
4359                         if (r->res_master_nodeid == dlm_our_nodeid())
4360                                 confirm_master(r, 0);
4361                 }
4362                 break;
4363
4364         default:
4365                 log_error(ls, "receive_request_reply %x error %d",
4366                           lkb->lkb_id, result);
4367         }
4368
4369         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4370                 log_debug(ls, "receive_request_reply %x result %d unlock",
4371                           lkb->lkb_id, result);
4372                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4373                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4374                 send_unlock(r, lkb);
4375         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4376                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4377                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4378                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4379                 send_cancel(r, lkb);
4380         } else {
4381                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4382                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4383         }
4384  out:
4385         unlock_rsb(r);
4386         put_rsb(r);
4387         dlm_put_lkb(lkb);
4388         return 0;
4389 }
4390
4391 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4392                                     struct dlm_message *ms)
4393 {
4394         /* this is the value returned from do_convert() on the master */
4395         switch (ms->m_result) {
4396         case -EAGAIN:
4397                 /* convert would block (be queued) on remote master */
4398                 queue_cast(r, lkb, -EAGAIN);
4399                 break;
4400
4401         case -EDEADLK:
4402                 receive_flags_reply(lkb, ms);
4403                 revert_lock_pc(r, lkb);
4404                 queue_cast(r, lkb, -EDEADLK);
4405                 break;
4406
4407         case -EINPROGRESS:
4408                 /* convert was queued on remote master */
4409                 receive_flags_reply(lkb, ms);
4410                 if (is_demoted(lkb))
4411                         munge_demoted(lkb);
4412                 del_lkb(r, lkb);
4413                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4414                 add_timeout(lkb);
4415                 break;
4416
4417         case 0:
4418                 /* convert was granted on remote master */
4419                 receive_flags_reply(lkb, ms);
4420                 if (is_demoted(lkb))
4421                         munge_demoted(lkb);
4422                 grant_lock_pc(r, lkb, ms);
4423                 queue_cast(r, lkb, 0);
4424                 break;
4425
4426         default:
4427                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4428                           lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4429                           ms->m_result);
4430                 dlm_print_rsb(r);
4431                 dlm_print_lkb(lkb);
4432         }
4433 }
4434
4435 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4436 {
4437         struct dlm_rsb *r = lkb->lkb_resource;
4438         int error;
4439
4440         hold_rsb(r);
4441         lock_rsb(r);
4442
4443         error = validate_message(lkb, ms);
4444         if (error)
4445                 goto out;
4446
4447         /* stub reply can happen with waiters_mutex held */
4448         error = remove_from_waiters_ms(lkb, ms);
4449         if (error)
4450                 goto out;
4451
4452         __receive_convert_reply(r, lkb, ms);
4453  out:
4454         unlock_rsb(r);
4455         put_rsb(r);
4456 }
4457
4458 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4459 {
4460         struct dlm_lkb *lkb;
4461         int error;
4462
4463         error = find_lkb(ls, ms->m_remid, &lkb);
4464         if (error)
4465                 return error;
4466
4467         _receive_convert_reply(lkb, ms);
4468         dlm_put_lkb(lkb);
4469         return 0;
4470 }
4471
4472 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4473 {
4474         struct dlm_rsb *r = lkb->lkb_resource;
4475         int error;
4476
4477         hold_rsb(r);
4478         lock_rsb(r);
4479
4480         error = validate_message(lkb, ms);
4481         if (error)
4482                 goto out;
4483
4484         /* stub reply can happen with waiters_mutex held */
4485         error = remove_from_waiters_ms(lkb, ms);
4486         if (error)
4487                 goto out;
4488
4489         /* this is the value returned from do_unlock() on the master */
4490
4491         switch (ms->m_result) {
4492         case -DLM_EUNLOCK:
4493                 receive_flags_reply(lkb, ms);
4494                 remove_lock_pc(r, lkb);
4495                 queue_cast(r, lkb, -DLM_EUNLOCK);
4496                 break;
4497         case -ENOENT:
4498                 break;
4499         default:
4500                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4501                           lkb->lkb_id, ms->m_result);
4502         }
4503  out:
4504         unlock_rsb(r);
4505         put_rsb(r);
4506 }
4507
4508 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4509 {
4510         struct dlm_lkb *lkb;
4511         int error;
4512
4513         error = find_lkb(ls, ms->m_remid, &lkb);
4514         if (error)
4515                 return error;
4516
4517         _receive_unlock_reply(lkb, ms);
4518         dlm_put_lkb(lkb);
4519         return 0;
4520 }
4521
4522 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4523 {
4524         struct dlm_rsb *r = lkb->lkb_resource;
4525         int error;
4526
4527         hold_rsb(r);
4528         lock_rsb(r);
4529
4530         error = validate_message(lkb, ms);
4531         if (error)
4532                 goto out;
4533
4534         /* stub reply can happen with waiters_mutex held */
4535         error = remove_from_waiters_ms(lkb, ms);
4536         if (error)
4537                 goto out;
4538
4539         /* this is the value returned from do_cancel() on the master */
4540
4541         switch (ms->m_result) {
4542         case -DLM_ECANCEL:
4543                 receive_flags_reply(lkb, ms);
4544                 revert_lock_pc(r, lkb);
4545                 queue_cast(r, lkb, -DLM_ECANCEL);
4546                 break;
4547         case 0:
4548                 break;
4549         default:
4550                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4551                           lkb->lkb_id, ms->m_result);
4552         }
4553  out:
4554         unlock_rsb(r);
4555         put_rsb(r);
4556 }
4557
4558 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4559 {
4560         struct dlm_lkb *lkb;
4561         int error;
4562
4563         error = find_lkb(ls, ms->m_remid, &lkb);
4564         if (error)
4565                 return error;
4566
4567         _receive_cancel_reply(lkb, ms);
4568         dlm_put_lkb(lkb);
4569         return 0;
4570 }
4571
4572 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4573 {
4574         struct dlm_lkb *lkb;
4575         struct dlm_rsb *r;
4576         int error, ret_nodeid;
4577         int do_lookup_list = 0;
4578
4579         error = find_lkb(ls, ms->m_lkid, &lkb);
4580         if (error) {
4581                 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4582                 return;
4583         }
4584
4585         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4586            FIXME: will a non-zero error ever be returned? */
4587
4588         r = lkb->lkb_resource;
4589         hold_rsb(r);
4590         lock_rsb(r);
4591
4592         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4593         if (error)
4594                 goto out;
4595
4596         ret_nodeid = ms->m_nodeid;
4597
4598         /* We sometimes receive a request from the dir node for this
4599            rsb before we've received the dir node's loookup_reply for it.
4600            The request from the dir node implies we're the master, so we set
4601            ourself as master in receive_request_reply, and verify here that
4602            we are indeed the master. */
4603
4604         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4605                 /* This should never happen */
4606                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4607                           "master %d dir %d our %d first %x %s",
4608                           lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4609                           r->res_master_nodeid, r->res_dir_nodeid,
4610                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4611         }
4612
4613         if (ret_nodeid == dlm_our_nodeid()) {
4614                 r->res_master_nodeid = ret_nodeid;
4615                 r->res_nodeid = 0;
4616                 do_lookup_list = 1;
4617                 r->res_first_lkid = 0;
4618         } else if (ret_nodeid == -1) {
4619                 /* the remote node doesn't believe it's the dir node */
4620                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4621                           lkb->lkb_id, ms->m_header.h_nodeid);
4622                 r->res_master_nodeid = 0;
4623                 r->res_nodeid = -1;
4624                 lkb->lkb_nodeid = -1;
4625         } else {
4626                 /* set_master() will set lkb_nodeid from r */
4627                 r->res_master_nodeid = ret_nodeid;
4628                 r->res_nodeid = ret_nodeid;
4629         }
4630
4631         if (is_overlap(lkb)) {
4632                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4633                           lkb->lkb_id, lkb->lkb_flags);
4634                 queue_cast_overlap(r, lkb);
4635                 unhold_lkb(lkb); /* undoes create_lkb() */
4636                 goto out_list;
4637         }
4638
4639         _request_lock(r, lkb);
4640
4641  out_list:
4642         if (do_lookup_list)
4643                 process_lookup_list(r);
4644  out:
4645         unlock_rsb(r);
4646         put_rsb(r);
4647         dlm_put_lkb(lkb);
4648 }
4649
4650 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4651                              uint32_t saved_seq)
4652 {
4653         int error = 0, noent = 0;
4654
4655         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4656                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4657                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4658                           ms->m_remid, ms->m_result);
4659                 return;
4660         }
4661
4662         switch (ms->m_type) {
4663
4664         /* messages sent to a master node */
4665
4666         case DLM_MSG_REQUEST:
4667                 error = receive_request(ls, ms);
4668                 break;
4669
4670         case DLM_MSG_CONVERT:
4671                 error = receive_convert(ls, ms);
4672                 break;
4673
4674         case DLM_MSG_UNLOCK:
4675                 error = receive_unlock(ls, ms);
4676                 break;
4677
4678         case DLM_MSG_CANCEL:
4679                 noent = 1;
4680                 error = receive_cancel(ls, ms);
4681                 break;
4682
4683         /* messages sent from a master node (replies to above) */
4684
4685         case DLM_MSG_REQUEST_REPLY:
4686                 error = receive_request_reply(ls, ms);
4687                 break;
4688
4689         case DLM_MSG_CONVERT_REPLY:
4690                 error = receive_convert_reply(ls, ms);
4691                 break;
4692
4693         case DLM_MSG_UNLOCK_REPLY:
4694                 error = receive_unlock_reply(ls, ms);
4695                 break;
4696
4697         case DLM_MSG_CANCEL_REPLY:
4698                 error = receive_cancel_reply(ls, ms);
4699                 break;
4700
4701         /* messages sent from a master node (only two types of async msg) */
4702
4703         case DLM_MSG_GRANT:
4704                 noent = 1;
4705                 error = receive_grant(ls, ms);
4706                 break;
4707
4708         case DLM_MSG_BAST:
4709                 noent = 1;
4710                 error = receive_bast(ls, ms);
4711                 break;
4712
4713         /* messages sent to a dir node */
4714
4715         case DLM_MSG_LOOKUP:
4716                 receive_lookup(ls, ms);
4717                 break;
4718
4719         case DLM_MSG_REMOVE:
4720                 receive_remove(ls, ms);
4721                 break;
4722
4723         /* messages sent from a dir node (remove has no reply) */
4724
4725         case DLM_MSG_LOOKUP_REPLY:
4726                 receive_lookup_reply(ls, ms);
4727                 break;
4728
4729         /* other messages */
4730
4731         case DLM_MSG_PURGE:
4732                 receive_purge(ls, ms);
4733                 break;
4734
4735         default:
4736                 log_error(ls, "unknown message type %d", ms->m_type);
4737         }
4738
4739         /*
4740          * When checking for ENOENT, we're checking the result of
4741          * find_lkb(m_remid):
4742          *
4743          * The lock id referenced in the message wasn't found.  This may
4744          * happen in normal usage for the async messages and cancel, so
4745          * only use log_debug for them.
4746          *
4747          * Some errors are expected and normal.
4748          */
4749
4750         if (error == -ENOENT && noent) {
4751                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4752                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4753                           ms->m_lkid, saved_seq);
4754         } else if (error == -ENOENT) {
4755                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4756                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4757                           ms->m_lkid, saved_seq);
4758
4759                 if (ms->m_type == DLM_MSG_CONVERT)
4760                         dlm_dump_rsb_hash(ls, ms->m_hash);
4761         }
4762
4763         if (error == -EINVAL) {
4764                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4765                           "saved_seq %u",
4766                           ms->m_type, ms->m_header.h_nodeid,
4767                           ms->m_lkid, ms->m_remid, saved_seq);
4768         }
4769 }
4770
4771 /* If the lockspace is in recovery mode (locking stopped), then normal
4772    messages are saved on the requestqueue for processing after recovery is
4773    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4774    messages off the requestqueue before we process new ones. This occurs right
4775    after recovery completes when we transition from saving all messages on
4776    requestqueue, to processing all the saved messages, to processing new
4777    messages as they arrive. */
4778
4779 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4780                                 int nodeid)
4781 {
4782         if (dlm_locking_stopped(ls)) {
4783                 /* If we were a member of this lockspace, left, and rejoined,
4784                    other nodes may still be sending us messages from the
4785                    lockspace generation before we left. */
4786                 if (!ls->ls_generation) {
4787                         log_limit(ls, "receive %d from %d ignore old gen",
4788                                   ms->m_type, nodeid);
4789                         return;
4790                 }
4791
4792                 dlm_add_requestqueue(ls, nodeid, ms);
4793         } else {
4794                 dlm_wait_requestqueue(ls);
4795                 _receive_message(ls, ms, 0);
4796         }
4797 }
4798
4799 /* This is called by dlm_recoverd to process messages that were saved on
4800    the requestqueue. */
4801
4802 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4803                                uint32_t saved_seq)
4804 {
4805         _receive_message(ls, ms, saved_seq);
4806 }
4807
4808 /* This is called by the midcomms layer when something is received for
4809    the lockspace.  It could be either a MSG (normal message sent as part of
4810    standard locking activity) or an RCOM (recovery message sent as part of
4811    lockspace recovery). */
4812
4813 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4814 {
4815         struct dlm_header *hd = &p->header;
4816         struct dlm_ls *ls;
4817         int type = 0;
4818
4819         switch (hd->h_cmd) {
4820         case DLM_MSG:
4821                 dlm_message_in(&p->message);
4822                 type = p->message.m_type;
4823                 break;
4824         case DLM_RCOM:
4825                 dlm_rcom_in(&p->rcom);
4826                 type = p->rcom.rc_type;
4827                 break;
4828         default:
4829                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4830                 return;
4831         }
4832
4833         if (hd->h_nodeid != nodeid) {
4834                 log_print("invalid h_nodeid %d from %d lockspace %x",
4835                           hd->h_nodeid, nodeid, hd->h_lockspace);
4836                 return;
4837         }
4838
4839         ls = dlm_find_lockspace_global(hd->h_lockspace);
4840         if (!ls) {
4841                 if (dlm_config.ci_log_debug) {
4842                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4843                                 "%u from %d cmd %d type %d\n",
4844                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
4845                 }
4846
4847                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4848                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4849                 return;
4850         }
4851
4852         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4853            be inactive (in this ls) before transitioning to recovery mode */
4854
4855         down_read(&ls->ls_recv_active);
4856         if (hd->h_cmd == DLM_MSG)
4857                 dlm_receive_message(ls, &p->message, nodeid);
4858         else
4859                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4860         up_read(&ls->ls_recv_active);
4861
4862         dlm_put_lockspace(ls);
4863 }
4864
4865 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4866                                    struct dlm_message *ms_stub)
4867 {
4868         if (middle_conversion(lkb)) {
4869                 hold_lkb(lkb);
4870                 memset(ms_stub, 0, sizeof(struct dlm_message));
4871                 ms_stub->m_flags = DLM_IFL_STUB_MS;
4872                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4873                 ms_stub->m_result = -EINPROGRESS;
4874                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4875                 _receive_convert_reply(lkb, ms_stub);
4876
4877                 /* Same special case as in receive_rcom_lock_args() */
4878                 lkb->lkb_grmode = DLM_LOCK_IV;
4879                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4880                 unhold_lkb(lkb);
4881
4882         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4883                 lkb->lkb_flags |= DLM_IFL_RESEND;
4884         }
4885
4886         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4887            conversions are async; there's no reply from the remote master */
4888 }
4889
4890 /* A waiting lkb needs recovery if the master node has failed, or
4891    the master node is changing (only when no directory is used) */
4892
4893 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4894                                  int dir_nodeid)
4895 {
4896         if (dlm_no_directory(ls))
4897                 return 1;
4898
4899         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4900                 return 1;
4901
4902         return 0;
4903 }
4904
4905 /* Recovery for locks that are waiting for replies from nodes that are now
4906    gone.  We can just complete unlocks and cancels by faking a reply from the
4907    dead node.  Requests and up-conversions we flag to be resent after
4908    recovery.  Down-conversions can just be completed with a fake reply like
4909    unlocks.  Conversions between PR and CW need special attention. */
4910
4911 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4912 {
4913         struct dlm_lkb *lkb, *safe;
4914         struct dlm_message *ms_stub;
4915         int wait_type, stub_unlock_result, stub_cancel_result;
4916         int dir_nodeid;
4917
4918         ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4919         if (!ms_stub) {
4920                 log_error(ls, "dlm_recover_waiters_pre no mem");
4921                 return;
4922         }
4923
4924         mutex_lock(&ls->ls_waiters_mutex);
4925
4926         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4927
4928                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4929
4930                 /* exclude debug messages about unlocks because there can be so
4931                    many and they aren't very interesting */
4932
4933                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4934                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4935                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4936                                   lkb->lkb_id,
4937                                   lkb->lkb_remid,
4938                                   lkb->lkb_wait_type,
4939                                   lkb->lkb_resource->res_nodeid,
4940                                   lkb->lkb_nodeid,
4941                                   lkb->lkb_wait_nodeid,
4942                                   dir_nodeid);
4943                 }
4944
4945                 /* all outstanding lookups, regardless of destination  will be
4946                    resent after recovery is done */
4947
4948                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4949                         lkb->lkb_flags |= DLM_IFL_RESEND;
4950                         continue;
4951                 }
4952
4953                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4954                         continue;
4955
4956                 wait_type = lkb->lkb_wait_type;
4957                 stub_unlock_result = -DLM_EUNLOCK;
4958                 stub_cancel_result = -DLM_ECANCEL;
4959
4960                 /* Main reply may have been received leaving a zero wait_type,
4961                    but a reply for the overlapping op may not have been
4962                    received.  In that case we need to fake the appropriate
4963                    reply for the overlap op. */
4964
4965                 if (!wait_type) {
4966                         if (is_overlap_cancel(lkb)) {
4967                                 wait_type = DLM_MSG_CANCEL;
4968                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4969                                         stub_cancel_result = 0;
4970                         }
4971                         if (is_overlap_unlock(lkb)) {
4972                                 wait_type = DLM_MSG_UNLOCK;
4973                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4974                                         stub_unlock_result = -ENOENT;
4975                         }
4976
4977                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4978                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4979                                   stub_cancel_result, stub_unlock_result);
4980                 }
4981
4982                 switch (wait_type) {
4983
4984                 case DLM_MSG_REQUEST:
4985                         lkb->lkb_flags |= DLM_IFL_RESEND;
4986                         break;
4987
4988                 case DLM_MSG_CONVERT:
4989                         recover_convert_waiter(ls, lkb, ms_stub);
4990                         break;
4991
4992                 case DLM_MSG_UNLOCK:
4993                         hold_lkb(lkb);
4994                         memset(ms_stub, 0, sizeof(struct dlm_message));
4995                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4996                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4997                         ms_stub->m_result = stub_unlock_result;
4998                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4999                         _receive_unlock_reply(lkb, ms_stub);
5000                         dlm_put_lkb(lkb);
5001                         break;
5002
5003                 case DLM_MSG_CANCEL:
5004                         hold_lkb(lkb);
5005                         memset(ms_stub, 0, sizeof(struct dlm_message));
5006                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5007                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5008                         ms_stub->m_result = stub_cancel_result;
5009                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5010                         _receive_cancel_reply(lkb, ms_stub);
5011                         dlm_put_lkb(lkb);
5012                         break;
5013
5014                 default:
5015                         log_error(ls, "invalid lkb wait_type %d %d",
5016                                   lkb->lkb_wait_type, wait_type);
5017                 }
5018                 schedule();
5019         }
5020         mutex_unlock(&ls->ls_waiters_mutex);
5021         kfree(ms_stub);
5022 }
5023
5024 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5025 {
5026         struct dlm_lkb *lkb;
5027         int found = 0;
5028
5029         mutex_lock(&ls->ls_waiters_mutex);
5030         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5031                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
5032                         hold_lkb(lkb);
5033                         found = 1;
5034                         break;
5035                 }
5036         }
5037         mutex_unlock(&ls->ls_waiters_mutex);
5038
5039         if (!found)
5040                 lkb = NULL;
5041         return lkb;
5042 }
5043
5044 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5045    master or dir-node for r.  Processing the lkb may result in it being placed
5046    back on waiters. */
5047
5048 /* We do this after normal locking has been enabled and any saved messages
5049    (in requestqueue) have been processed.  We should be confident that at
5050    this point we won't get or process a reply to any of these waiting
5051    operations.  But, new ops may be coming in on the rsbs/locks here from
5052    userspace or remotely. */
5053
5054 /* there may have been an overlap unlock/cancel prior to recovery or after
5055    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5056    overlap flag would just have been set and nothing new sent.  we can be
5057    confident here than any replies to either the initial op or overlap ops
5058    prior to recovery have been received. */
5059
5060 int dlm_recover_waiters_post(struct dlm_ls *ls)
5061 {
5062         struct dlm_lkb *lkb;
5063         struct dlm_rsb *r;
5064         int error = 0, mstype, err, oc, ou;
5065
5066         while (1) {
5067                 if (dlm_locking_stopped(ls)) {
5068                         log_debug(ls, "recover_waiters_post aborted");
5069                         error = -EINTR;
5070                         break;
5071                 }
5072
5073                 lkb = find_resend_waiter(ls);
5074                 if (!lkb)
5075                         break;
5076
5077                 r = lkb->lkb_resource;
5078                 hold_rsb(r);
5079                 lock_rsb(r);
5080
5081                 mstype = lkb->lkb_wait_type;
5082                 oc = is_overlap_cancel(lkb);
5083                 ou = is_overlap_unlock(lkb);
5084                 err = 0;
5085
5086                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5087                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5088                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5089                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5090                           dlm_dir_nodeid(r), oc, ou);
5091
5092                 /* At this point we assume that we won't get a reply to any
5093                    previous op or overlap op on this lock.  First, do a big
5094                    remove_from_waiters() for all previous ops. */
5095
5096                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5097                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5098                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5099                 lkb->lkb_wait_type = 0;
5100                 lkb->lkb_wait_count = 0;
5101                 mutex_lock(&ls->ls_waiters_mutex);
5102                 list_del_init(&lkb->lkb_wait_reply);
5103                 mutex_unlock(&ls->ls_waiters_mutex);
5104                 unhold_lkb(lkb); /* for waiters list */
5105
5106                 if (oc || ou) {
5107                         /* do an unlock or cancel instead of resending */
5108                         switch (mstype) {
5109                         case DLM_MSG_LOOKUP:
5110                         case DLM_MSG_REQUEST:
5111                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5112                                                         -DLM_ECANCEL);
5113                                 unhold_lkb(lkb); /* undoes create_lkb() */
5114                                 break;
5115                         case DLM_MSG_CONVERT:
5116                                 if (oc) {
5117                                         queue_cast(r, lkb, -DLM_ECANCEL);
5118                                 } else {
5119                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5120                                         _unlock_lock(r, lkb);
5121                                 }
5122                                 break;
5123                         default:
5124                                 err = 1;
5125                         }
5126                 } else {
5127                         switch (mstype) {
5128                         case DLM_MSG_LOOKUP:
5129                         case DLM_MSG_REQUEST:
5130                                 _request_lock(r, lkb);
5131                                 if (is_master(r))
5132                                         confirm_master(r, 0);
5133                                 break;
5134                         case DLM_MSG_CONVERT:
5135                                 _convert_lock(r, lkb);
5136                                 break;
5137                         default:
5138                                 err = 1;
5139                         }
5140                 }
5141
5142                 if (err) {
5143                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5144                                   "dir_nodeid %d overlap %d %d",
5145                                   lkb->lkb_id, mstype, r->res_nodeid,
5146                                   dlm_dir_nodeid(r), oc, ou);
5147                 }
5148                 unlock_rsb(r);
5149                 put_rsb(r);
5150                 dlm_put_lkb(lkb);
5151         }
5152
5153         return error;
5154 }
5155
5156 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5157                               struct list_head *list)
5158 {
5159         struct dlm_lkb *lkb, *safe;
5160
5161         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5162                 if (!is_master_copy(lkb))
5163                         continue;
5164
5165                 /* don't purge lkbs we've added in recover_master_copy for
5166                    the current recovery seq */
5167
5168                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5169                         continue;
5170
5171                 del_lkb(r, lkb);
5172
5173                 /* this put should free the lkb */
5174                 if (!dlm_put_lkb(lkb))
5175                         log_error(ls, "purged mstcpy lkb not released");
5176         }
5177 }
5178
5179 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5180 {
5181         struct dlm_ls *ls = r->res_ls;
5182
5183         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5184         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5185         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5186 }
5187
5188 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5189                             struct list_head *list,
5190                             int nodeid_gone, unsigned int *count)
5191 {
5192         struct dlm_lkb *lkb, *safe;
5193
5194         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5195                 if (!is_master_copy(lkb))
5196                         continue;
5197
5198                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5199                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5200
5201                         del_lkb(r, lkb);
5202
5203                         /* this put should free the lkb */
5204                         if (!dlm_put_lkb(lkb))
5205                                 log_error(ls, "purged dead lkb not released");
5206
5207                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5208
5209                         (*count)++;
5210                 }
5211         }
5212 }
5213
5214 /* Get rid of locks held by nodes that are gone. */
5215
5216 void dlm_recover_purge(struct dlm_ls *ls)
5217 {
5218         struct dlm_rsb *r;
5219         struct dlm_member *memb;
5220         int nodes_count = 0;
5221         int nodeid_gone = 0;
5222         unsigned int lkb_count = 0;
5223
5224         /* cache one removed nodeid to optimize the common
5225            case of a single node removed */
5226
5227         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5228                 nodes_count++;
5229                 nodeid_gone = memb->nodeid;
5230         }
5231
5232         if (!nodes_count)
5233                 return;
5234
5235         down_write(&ls->ls_root_sem);
5236         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5237                 hold_rsb(r);
5238                 lock_rsb(r);
5239                 if (is_master(r)) {
5240                         purge_dead_list(ls, r, &r->res_grantqueue,
5241                                         nodeid_gone, &lkb_count);
5242                         purge_dead_list(ls, r, &r->res_convertqueue,
5243                                         nodeid_gone, &lkb_count);
5244                         purge_dead_list(ls, r, &r->res_waitqueue,
5245                                         nodeid_gone, &lkb_count);
5246                 }
5247                 unlock_rsb(r);
5248                 unhold_rsb(r);
5249                 cond_resched();
5250         }
5251         up_write(&ls->ls_root_sem);
5252
5253         if (lkb_count)
5254                 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
5255                           lkb_count, nodes_count);
5256 }
5257
5258 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5259 {
5260         struct rb_node *n;
5261         struct dlm_rsb *r;
5262
5263         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5264         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5265                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5266
5267                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5268                         continue;
5269                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5270                 if (!is_master(r))
5271                         continue;
5272                 hold_rsb(r);
5273                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5274                 return r;
5275         }
5276         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5277         return NULL;
5278 }
5279
5280 /*
5281  * Attempt to grant locks on resources that we are the master of.
5282  * Locks may have become grantable during recovery because locks
5283  * from departed nodes have been purged (or not rebuilt), allowing
5284  * previously blocked locks to now be granted.  The subset of rsb's
5285  * we are interested in are those with lkb's on either the convert or
5286  * waiting queues.
5287  *
5288  * Simplest would be to go through each master rsb and check for non-empty
5289  * convert or waiting queues, and attempt to grant on those rsbs.
5290  * Checking the queues requires lock_rsb, though, for which we'd need
5291  * to release the rsbtbl lock.  This would make iterating through all
5292  * rsb's very inefficient.  So, we rely on earlier recovery routines
5293  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5294  * locks for.
5295  */
5296
5297 void dlm_recover_grant(struct dlm_ls *ls)
5298 {
5299         struct dlm_rsb *r;
5300         int bucket = 0;
5301         unsigned int count = 0;
5302         unsigned int rsb_count = 0;
5303         unsigned int lkb_count = 0;
5304
5305         while (1) {
5306                 r = find_grant_rsb(ls, bucket);
5307                 if (!r) {
5308                         if (bucket == ls->ls_rsbtbl_size - 1)
5309                                 break;
5310                         bucket++;
5311                         continue;
5312                 }
5313                 rsb_count++;
5314                 count = 0;
5315                 lock_rsb(r);
5316                 grant_pending_locks(r, &count);
5317                 lkb_count += count;
5318                 confirm_master(r, 0);
5319                 unlock_rsb(r);
5320                 put_rsb(r);
5321                 cond_resched();
5322         }
5323
5324         if (lkb_count)
5325                 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
5326                           lkb_count, rsb_count);
5327 }
5328
5329 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5330                                          uint32_t remid)
5331 {
5332         struct dlm_lkb *lkb;
5333
5334         list_for_each_entry(lkb, head, lkb_statequeue) {
5335                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5336                         return lkb;
5337         }
5338         return NULL;
5339 }
5340
5341 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5342                                     uint32_t remid)
5343 {
5344         struct dlm_lkb *lkb;
5345
5346         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5347         if (lkb)
5348                 return lkb;
5349         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5350         if (lkb)
5351                 return lkb;
5352         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5353         if (lkb)
5354                 return lkb;
5355         return NULL;
5356 }
5357
5358 /* needs at least dlm_rcom + rcom_lock */
5359 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5360                                   struct dlm_rsb *r, struct dlm_rcom *rc)
5361 {
5362         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5363
5364         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5365         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5366         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5367         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5368         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5369         lkb->lkb_flags |= DLM_IFL_MSTCPY;
5370         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5371         lkb->lkb_rqmode = rl->rl_rqmode;
5372         lkb->lkb_grmode = rl->rl_grmode;
5373         /* don't set lkb_status because add_lkb wants to itself */
5374
5375         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5376         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5377
5378         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5379                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5380                          sizeof(struct rcom_lock);
5381                 if (lvblen > ls->ls_lvblen)
5382                         return -EINVAL;
5383                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5384                 if (!lkb->lkb_lvbptr)
5385                         return -ENOMEM;
5386                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5387         }
5388
5389         /* Conversions between PR and CW (middle modes) need special handling.
5390            The real granted mode of these converting locks cannot be determined
5391            until all locks have been rebuilt on the rsb (recover_conversion) */
5392
5393         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5394             middle_conversion(lkb)) {
5395                 rl->rl_status = DLM_LKSTS_CONVERT;
5396                 lkb->lkb_grmode = DLM_LOCK_IV;
5397                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5398         }
5399
5400         return 0;
5401 }
5402
5403 /* This lkb may have been recovered in a previous aborted recovery so we need
5404    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5405    If so we just send back a standard reply.  If not, we create a new lkb with
5406    the given values and send back our lkid.  We send back our lkid by sending
5407    back the rcom_lock struct we got but with the remid field filled in. */
5408
5409 /* needs at least dlm_rcom + rcom_lock */
5410 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5411 {
5412         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5413         struct dlm_rsb *r;
5414         struct dlm_lkb *lkb;
5415         uint32_t remid = 0;
5416         int from_nodeid = rc->rc_header.h_nodeid;
5417         int error;
5418
5419         if (rl->rl_parent_lkid) {
5420                 error = -EOPNOTSUPP;
5421                 goto out;
5422         }
5423
5424         remid = le32_to_cpu(rl->rl_lkid);
5425
5426         /* In general we expect the rsb returned to be R_MASTER, but we don't
5427            have to require it.  Recovery of masters on one node can overlap
5428            recovery of locks on another node, so one node can send us MSTCPY
5429            locks before we've made ourselves master of this rsb.  We can still
5430            add new MSTCPY locks that we receive here without any harm; when
5431            we make ourselves master, dlm_recover_masters() won't touch the
5432            MSTCPY locks we've received early. */
5433
5434         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5435                          from_nodeid, R_RECEIVE_RECOVER, &r);
5436         if (error)
5437                 goto out;
5438
5439         lock_rsb(r);
5440
5441         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5442                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5443                           from_nodeid, remid);
5444                 error = -EBADR;
5445                 goto out_unlock;
5446         }
5447
5448         lkb = search_remid(r, from_nodeid, remid);
5449         if (lkb) {
5450                 error = -EEXIST;
5451                 goto out_remid;
5452         }
5453
5454         error = create_lkb(ls, &lkb);
5455         if (error)
5456                 goto out_unlock;
5457
5458         error = receive_rcom_lock_args(ls, lkb, r, rc);
5459         if (error) {
5460                 __put_lkb(ls, lkb);
5461                 goto out_unlock;
5462         }
5463
5464         attach_lkb(r, lkb);
5465         add_lkb(r, lkb, rl->rl_status);
5466         error = 0;
5467         ls->ls_recover_locks_in++;
5468
5469         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5470                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5471
5472  out_remid:
5473         /* this is the new value returned to the lock holder for
5474            saving in its process-copy lkb */
5475         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5476
5477         lkb->lkb_recover_seq = ls->ls_recover_seq;
5478
5479  out_unlock:
5480         unlock_rsb(r);
5481         put_rsb(r);
5482  out:
5483         if (error && error != -EEXIST)
5484                 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
5485                           from_nodeid, remid, error);
5486         rl->rl_result = cpu_to_le32(error);
5487         return error;
5488 }
5489
5490 /* needs at least dlm_rcom + rcom_lock */
5491 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5492 {
5493         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5494         struct dlm_rsb *r;
5495         struct dlm_lkb *lkb;
5496         uint32_t lkid, remid;
5497         int error, result;
5498
5499         lkid = le32_to_cpu(rl->rl_lkid);
5500         remid = le32_to_cpu(rl->rl_remid);
5501         result = le32_to_cpu(rl->rl_result);
5502
5503         error = find_lkb(ls, lkid, &lkb);
5504         if (error) {
5505                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5506                           lkid, rc->rc_header.h_nodeid, remid, result);
5507                 return error;
5508         }
5509
5510         r = lkb->lkb_resource;
5511         hold_rsb(r);
5512         lock_rsb(r);
5513
5514         if (!is_process_copy(lkb)) {
5515                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5516                           lkid, rc->rc_header.h_nodeid, remid, result);
5517                 dlm_dump_rsb(r);
5518                 unlock_rsb(r);
5519                 put_rsb(r);
5520                 dlm_put_lkb(lkb);
5521                 return -EINVAL;
5522         }
5523
5524         switch (result) {
5525         case -EBADR:
5526                 /* There's a chance the new master received our lock before
5527                    dlm_recover_master_reply(), this wouldn't happen if we did
5528                    a barrier between recover_masters and recover_locks. */
5529
5530                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5531                           lkid, rc->rc_header.h_nodeid, remid, result);
5532         
5533                 dlm_send_rcom_lock(r, lkb);
5534                 goto out;
5535         case -EEXIST:
5536         case 0:
5537                 lkb->lkb_remid = remid;
5538                 break;
5539         default:
5540                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5541                           lkid, rc->rc_header.h_nodeid, remid, result);
5542         }
5543
5544         /* an ack for dlm_recover_locks() which waits for replies from
5545            all the locks it sends to new masters */
5546         dlm_recovered_lock(r);
5547  out:
5548         unlock_rsb(r);
5549         put_rsb(r);
5550         dlm_put_lkb(lkb);
5551
5552         return 0;
5553 }
5554
5555 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5556                      int mode, uint32_t flags, void *name, unsigned int namelen,
5557                      unsigned long timeout_cs)
5558 {
5559         struct dlm_lkb *lkb;
5560         struct dlm_args args;
5561         int error;
5562
5563         dlm_lock_recovery(ls);
5564
5565         error = create_lkb(ls, &lkb);
5566         if (error) {
5567                 kfree(ua);
5568                 goto out;
5569         }
5570
5571         if (flags & DLM_LKF_VALBLK) {
5572                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5573                 if (!ua->lksb.sb_lvbptr) {
5574                         kfree(ua);
5575                         __put_lkb(ls, lkb);
5576                         error = -ENOMEM;
5577                         goto out;
5578                 }
5579         }
5580
5581         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5582            When DLM_IFL_USER is set, the dlm knows that this is a userspace
5583            lock and that lkb_astparam is the dlm_user_args structure. */
5584
5585         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5586                               fake_astfn, ua, fake_bastfn, &args);
5587         lkb->lkb_flags |= DLM_IFL_USER;
5588
5589         if (error) {
5590                 __put_lkb(ls, lkb);
5591                 goto out;
5592         }
5593
5594         error = request_lock(ls, lkb, name, namelen, &args);
5595
5596         switch (error) {
5597         case 0:
5598                 break;
5599         case -EINPROGRESS:
5600                 error = 0;
5601                 break;
5602         case -EAGAIN:
5603                 error = 0;
5604                 /* fall through */
5605         default:
5606                 __put_lkb(ls, lkb);
5607                 goto out;
5608         }
5609
5610         /* add this new lkb to the per-process list of locks */
5611         spin_lock(&ua->proc->locks_spin);
5612         hold_lkb(lkb);
5613         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5614         spin_unlock(&ua->proc->locks_spin);
5615  out:
5616         dlm_unlock_recovery(ls);
5617         return error;
5618 }
5619
5620 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5621                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5622                      unsigned long timeout_cs)
5623 {
5624         struct dlm_lkb *lkb;
5625         struct dlm_args args;
5626         struct dlm_user_args *ua;
5627         int error;
5628
5629         dlm_lock_recovery(ls);
5630
5631         error = find_lkb(ls, lkid, &lkb);
5632         if (error)
5633                 goto out;
5634
5635         /* user can change the params on its lock when it converts it, or
5636            add an lvb that didn't exist before */
5637
5638         ua = lkb->lkb_ua;
5639
5640         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5641                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5642                 if (!ua->lksb.sb_lvbptr) {
5643                         error = -ENOMEM;
5644                         goto out_put;
5645                 }
5646         }
5647         if (lvb_in && ua->lksb.sb_lvbptr)
5648                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5649
5650         ua->xid = ua_tmp->xid;
5651         ua->castparam = ua_tmp->castparam;
5652         ua->castaddr = ua_tmp->castaddr;
5653         ua->bastparam = ua_tmp->bastparam;
5654         ua->bastaddr = ua_tmp->bastaddr;
5655         ua->user_lksb = ua_tmp->user_lksb;
5656
5657         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5658                               fake_astfn, ua, fake_bastfn, &args);
5659         if (error)
5660                 goto out_put;
5661
5662         error = convert_lock(ls, lkb, &args);
5663
5664         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5665                 error = 0;
5666  out_put:
5667         dlm_put_lkb(lkb);
5668  out:
5669         dlm_unlock_recovery(ls);
5670         kfree(ua_tmp);
5671         return error;
5672 }
5673
5674 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5675                     uint32_t flags, uint32_t lkid, char *lvb_in)
5676 {
5677         struct dlm_lkb *lkb;
5678         struct dlm_args args;
5679         struct dlm_user_args *ua;
5680         int error;
5681
5682         dlm_lock_recovery(ls);
5683
5684         error = find_lkb(ls, lkid, &lkb);
5685         if (error)
5686                 goto out;
5687
5688         ua = lkb->lkb_ua;
5689
5690         if (lvb_in && ua->lksb.sb_lvbptr)
5691                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5692         if (ua_tmp->castparam)
5693                 ua->castparam = ua_tmp->castparam;
5694         ua->user_lksb = ua_tmp->user_lksb;
5695
5696         error = set_unlock_args(flags, ua, &args);
5697         if (error)
5698                 goto out_put;
5699
5700         error = unlock_lock(ls, lkb, &args);
5701
5702         if (error == -DLM_EUNLOCK)
5703                 error = 0;
5704         /* from validate_unlock_args() */
5705         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5706                 error = 0;
5707         if (error)
5708                 goto out_put;
5709
5710         spin_lock(&ua->proc->locks_spin);
5711         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5712         if (!list_empty(&lkb->lkb_ownqueue))
5713                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5714         spin_unlock(&ua->proc->locks_spin);
5715  out_put:
5716         dlm_put_lkb(lkb);
5717  out:
5718         dlm_unlock_recovery(ls);
5719         kfree(ua_tmp);
5720         return error;
5721 }
5722
5723 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5724                     uint32_t flags, uint32_t lkid)
5725 {
5726         struct dlm_lkb *lkb;
5727         struct dlm_args args;
5728         struct dlm_user_args *ua;
5729         int error;
5730
5731         dlm_lock_recovery(ls);
5732
5733         error = find_lkb(ls, lkid, &lkb);
5734         if (error)
5735                 goto out;
5736
5737         ua = lkb->lkb_ua;
5738         if (ua_tmp->castparam)
5739                 ua->castparam = ua_tmp->castparam;
5740         ua->user_lksb = ua_tmp->user_lksb;
5741
5742         error = set_unlock_args(flags, ua, &args);
5743         if (error)
5744                 goto out_put;
5745
5746         error = cancel_lock(ls, lkb, &args);
5747
5748         if (error == -DLM_ECANCEL)
5749                 error = 0;
5750         /* from validate_unlock_args() */
5751         if (error == -EBUSY)
5752                 error = 0;
5753  out_put:
5754         dlm_put_lkb(lkb);
5755  out:
5756         dlm_unlock_recovery(ls);
5757         kfree(ua_tmp);
5758         return error;
5759 }
5760
5761 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5762 {
5763         struct dlm_lkb *lkb;
5764         struct dlm_args args;
5765         struct dlm_user_args *ua;
5766         struct dlm_rsb *r;
5767         int error;
5768
5769         dlm_lock_recovery(ls);
5770
5771         error = find_lkb(ls, lkid, &lkb);
5772         if (error)
5773                 goto out;
5774
5775         ua = lkb->lkb_ua;
5776
5777         error = set_unlock_args(flags, ua, &args);
5778         if (error)
5779                 goto out_put;
5780
5781         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5782
5783         r = lkb->lkb_resource;
5784         hold_rsb(r);
5785         lock_rsb(r);
5786
5787         error = validate_unlock_args(lkb, &args);
5788         if (error)
5789                 goto out_r;
5790         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
5791
5792         error = _cancel_lock(r, lkb);
5793  out_r:
5794         unlock_rsb(r);
5795         put_rsb(r);
5796
5797         if (error == -DLM_ECANCEL)
5798                 error = 0;
5799         /* from validate_unlock_args() */
5800         if (error == -EBUSY)
5801                 error = 0;
5802  out_put:
5803         dlm_put_lkb(lkb);
5804  out:
5805         dlm_unlock_recovery(ls);
5806         return error;
5807 }
5808
5809 /* lkb's that are removed from the waiters list by revert are just left on the
5810    orphans list with the granted orphan locks, to be freed by purge */
5811
5812 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5813 {
5814         struct dlm_args args;
5815         int error;
5816
5817         hold_lkb(lkb);
5818         mutex_lock(&ls->ls_orphans_mutex);
5819         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5820         mutex_unlock(&ls->ls_orphans_mutex);
5821
5822         set_unlock_args(0, lkb->lkb_ua, &args);
5823
5824         error = cancel_lock(ls, lkb, &args);
5825         if (error == -DLM_ECANCEL)
5826                 error = 0;
5827         return error;
5828 }
5829
5830 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
5831    Regardless of what rsb queue the lock is on, it's removed and freed. */
5832
5833 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5834 {
5835         struct dlm_args args;
5836         int error;
5837
5838         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
5839
5840         error = unlock_lock(ls, lkb, &args);
5841         if (error == -DLM_EUNLOCK)
5842                 error = 0;
5843         return error;
5844 }
5845
5846 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5847    (which does lock_rsb) due to deadlock with receiving a message that does
5848    lock_rsb followed by dlm_user_add_cb() */
5849
5850 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5851                                      struct dlm_user_proc *proc)
5852 {
5853         struct dlm_lkb *lkb = NULL;
5854
5855         mutex_lock(&ls->ls_clear_proc_locks);
5856         if (list_empty(&proc->locks))
5857                 goto out;
5858
5859         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5860         list_del_init(&lkb->lkb_ownqueue);
5861
5862         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5863                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
5864         else
5865                 lkb->lkb_flags |= DLM_IFL_DEAD;
5866  out:
5867         mutex_unlock(&ls->ls_clear_proc_locks);
5868         return lkb;
5869 }
5870
5871 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5872    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5873    which we clear here. */
5874
5875 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5876    list, and no more device_writes should add lkb's to proc->locks list; so we
5877    shouldn't need to take asts_spin or locks_spin here.  this assumes that
5878    device reads/writes/closes are serialized -- FIXME: we may need to serialize
5879    them ourself. */
5880
5881 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5882 {
5883         struct dlm_lkb *lkb, *safe;
5884
5885         dlm_lock_recovery(ls);
5886
5887         while (1) {
5888                 lkb = del_proc_lock(ls, proc);
5889                 if (!lkb)
5890                         break;
5891                 del_timeout(lkb);
5892                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5893                         orphan_proc_lock(ls, lkb);
5894                 else
5895                         unlock_proc_lock(ls, lkb);
5896
5897                 /* this removes the reference for the proc->locks list
5898                    added by dlm_user_request, it may result in the lkb
5899                    being freed */
5900
5901                 dlm_put_lkb(lkb);
5902         }
5903
5904         mutex_lock(&ls->ls_clear_proc_locks);
5905
5906         /* in-progress unlocks */
5907         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5908                 list_del_init(&lkb->lkb_ownqueue);
5909                 lkb->lkb_flags |= DLM_IFL_DEAD;
5910                 dlm_put_lkb(lkb);
5911         }
5912
5913         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5914                 memset(&lkb->lkb_callbacks, 0,
5915                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5916                 list_del_init(&lkb->lkb_cb_list);
5917                 dlm_put_lkb(lkb);
5918         }
5919
5920         mutex_unlock(&ls->ls_clear_proc_locks);
5921         dlm_unlock_recovery(ls);
5922 }
5923
5924 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5925 {
5926         struct dlm_lkb *lkb, *safe;
5927
5928         while (1) {
5929                 lkb = NULL;
5930                 spin_lock(&proc->locks_spin);
5931                 if (!list_empty(&proc->locks)) {
5932                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
5933                                          lkb_ownqueue);
5934                         list_del_init(&lkb->lkb_ownqueue);
5935                 }
5936                 spin_unlock(&proc->locks_spin);
5937
5938                 if (!lkb)
5939                         break;
5940
5941                 lkb->lkb_flags |= DLM_IFL_DEAD;
5942                 unlock_proc_lock(ls, lkb);
5943                 dlm_put_lkb(lkb); /* ref from proc->locks list */
5944         }
5945
5946         spin_lock(&proc->locks_spin);
5947         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5948                 list_del_init(&lkb->lkb_ownqueue);
5949                 lkb->lkb_flags |= DLM_IFL_DEAD;
5950                 dlm_put_lkb(lkb);
5951         }
5952         spin_unlock(&proc->locks_spin);
5953
5954         spin_lock(&proc->asts_spin);
5955         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5956                 memset(&lkb->lkb_callbacks, 0,
5957                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5958                 list_del_init(&lkb->lkb_cb_list);
5959                 dlm_put_lkb(lkb);
5960         }
5961         spin_unlock(&proc->asts_spin);
5962 }
5963
5964 /* pid of 0 means purge all orphans */
5965
5966 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5967 {
5968         struct dlm_lkb *lkb, *safe;
5969
5970         mutex_lock(&ls->ls_orphans_mutex);
5971         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5972                 if (pid && lkb->lkb_ownpid != pid)
5973                         continue;
5974                 unlock_proc_lock(ls, lkb);
5975                 list_del_init(&lkb->lkb_ownqueue);
5976                 dlm_put_lkb(lkb);
5977         }
5978         mutex_unlock(&ls->ls_orphans_mutex);
5979 }
5980
5981 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5982 {
5983         struct dlm_message *ms;
5984         struct dlm_mhandle *mh;
5985         int error;
5986
5987         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5988                                 DLM_MSG_PURGE, &ms, &mh);
5989         if (error)
5990                 return error;
5991         ms->m_nodeid = nodeid;
5992         ms->m_pid = pid;
5993
5994         return send_message(mh, ms);
5995 }
5996
5997 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5998                    int nodeid, int pid)
5999 {
6000         int error = 0;
6001
6002         if (nodeid != dlm_our_nodeid()) {
6003                 error = send_purge(ls, nodeid, pid);
6004         } else {
6005                 dlm_lock_recovery(ls);
6006                 if (pid == current->pid)
6007                         purge_proc_locks(ls, proc);
6008                 else
6009                         do_purge(ls, nodeid, pid);
6010                 dlm_unlock_recovery(ls);
6011         }
6012         return error;
6013 }
6014