4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
28 * Developed under the sponsorship of the US Government under
29 * Subcontract No. B514193
31 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
32 * Use is subject to license terms.
34 * Copyright (c) 2010, 2012, Intel Corporation.
37 * This file is part of Lustre, http://www.lustre.org/
38 * Lustre is a trademark of Sun Microsystems, Inc.
42 * This file implements POSIX lock type for Lustre.
43 * Its policy properties are start and end of extent and PID.
45 * These locks are only done through MDS due to POSIX semantics requiring
46 * e.g. that locks could be only partially released and as such split into
47 * two parts, and also that two adjacent locks from the same process may be
48 * merged into a single wider lock.
50 * Lock modes are mapped like this:
51 * PR and PW for READ and WRITE locks
52 * NL to request a releasing of a portion of the lock
54 * These flock locks never time out.
57 #define DEBUG_SUBSYSTEM S_LDLM
59 #include "../include/lustre_dlm.h"
60 #include "../include/obd_support.h"
61 #include "../include/obd_class.h"
62 #include "../include/lustre_lib.h"
63 #include <linux/list.h>
64 #include "ldlm_internal.h"
67 * list_for_remaining_safe - iterate over the remaining entries in a list
68 * and safeguard against removal of a list entry.
69 * \param pos the &struct list_head to use as a loop counter. pos MUST
70 * have been initialized prior to using it in this macro.
71 * \param n another &struct list_head to use as temporary storage
72 * \param head the head for your list.
/*
 * Unlike list_for_each_safe(), iteration starts at the caller-supplied
 * 'pos' (not at head->next), so only the entries *after* the starting
 * position are visited.  'n' caches pos->next before the loop body runs,
 * which makes it safe for the body to delete the entry at 'pos'.
 */
74 #define list_for_remaining_safe(pos, n, head) \
75 for (n = pos->next; pos != (head); pos = n, n = pos->next)
/*
 * Return nonzero iff @new and @lock belong to the same flock owner:
 * both the l_flock.owner identifier and the export (client connection)
 * must match.  NOTE(review): the declaration specifiers and braces of
 * this function are not visible in this excerpt.
 */
78 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
80 return((new->l_policy_data.l_flock.owner ==
81 lock->l_policy_data.l_flock.owner) &&
82 (new->l_export == lock->l_export));
/*
 * Return nonzero iff the [start, end] extents of @new and @lock overlap.
 * Standard interval-intersection test: new.start <= lock.end and
 * new.end >= lock.start (extents are inclusive at both ends).
 */
86 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
88 return((new->l_policy_data.l_flock.start <=
89 lock->l_policy_data.l_flock.end) &&
90 (new->l_policy_data.l_flock.end >=
91 lock->l_policy_data.l_flock.start));
/*
 * Remove a flock lock from its resource list and destroy it.
 * @mode is the mode whose reference is dropped; @flags equal to
 * LDLM_FL_WAIT_NOREPROC marks the client-side reprocess path, where the
 * lock is flagged LOCAL_ONLY|CBPENDING so no CANCEL RPC is sent for it.
 * Caller holds the resource lock, hence the *_nolock variants below.
 */
95 ldlm_flock_destroy(struct ldlm_lock *lock, enum ldlm_mode mode, __u64 flags)
97 LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
100 /* Safe to not lock here, since it should be empty anyway */
101 LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));
103 list_del_init(&lock->l_res_link);
104 if (flags == LDLM_FL_WAIT_NOREPROC && !ldlm_is_failed(lock)) {
105 /* client side - set a flag to prevent sending a CANCEL */
106 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
108 /* when reaching here, it is under lock_res_and_lock(). Thus,
109 * need call the nolock version of ldlm_lock_decref_internal
111 ldlm_lock_decref_internal_nolock(lock, mode);
114 ldlm_lock_destroy_nolock(lock);
118 * Process a granting attempt for flock lock.
119 * Must be called under ns lock held.
121 * This function looks for any conflicts for \a lock in the granted or
122 * waiting queues. The lock is granted if no conflicts are found in
125 * It is also responsible for splitting a lock if a portion of the lock
128 * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
129 * - blocking ASTs have already been sent
131 * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
132 * - blocking ASTs have not been sent yet, so list of conflicting locks
133 * would be collected and ASTs sent.
135 static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
136 int first_enq, enum ldlm_error *err,
137 struct list_head *work_list)
139 struct ldlm_resource *res = req->l_resource;
140 struct ldlm_namespace *ns = ldlm_res_to_ns(res);
141 struct list_head *tmp;
142 struct list_head *ownlocks = NULL;
143 struct ldlm_lock *lock = NULL;
144 struct ldlm_lock *new = req;
145 struct ldlm_lock *new2 = NULL;
146 enum ldlm_mode mode = req->l_req_mode;
/* LCK_NL requests are unlock operations: treated as already "added". */
147 int added = (mode == LCK_NL);
150 const struct ldlm_callback_suite null_cbs = { NULL };
153 "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
154 *flags, new->l_policy_data.l_flock.owner,
155 new->l_policy_data.l_flock.pid, mode,
156 req->l_policy_data.l_flock.start,
157 req->l_policy_data.l_flock.end);
161 /* No blocking ASTs are sent to the clients for
162 * Posix file & record locks
164 req->l_blocking_ast = NULL;
/*
 * Fast path: reprocess (WAIT_NOREPROC) or unlock (LCK_NL) skips conflict
 * detection and only locates this owner's locks in lr_granted.
 */
167 if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
168 /* This loop determines where this processes locks start
169 * in the resource lr_granted list.
171 list_for_each(tmp, &res->lr_granted) {
172 lock = list_entry(tmp, struct ldlm_lock,
174 if (ldlm_same_flock_owner(lock, req)) {
180 int reprocess_failed = 0;
182 lockmode_verify(mode);
184 /* This loop determines if there are existing locks
185 * that conflict with the new lock request.
187 list_for_each(tmp, &res->lr_granted) {
188 lock = list_entry(tmp, struct ldlm_lock,
/* A process never conflicts with its own locks. */
191 if (ldlm_same_flock_owner(lock, req)) {
197 /* locks are compatible, overlap doesn't matter */
198 if (lockmode_compat(lock->l_granted_mode, mode))
201 if (!ldlm_flocks_overlap(lock, req))
205 reprocess_failed = 1;
/* Conflict found: non-blocking request fails immediately. */
209 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
210 ldlm_flock_destroy(req, mode, *flags);
212 return LDLM_ITER_STOP;
/* F_GETLK: report the conflicting lock's mode/pid/extent in req. */
215 if (*flags & LDLM_FL_TEST_LOCK) {
216 ldlm_flock_destroy(req, mode, *flags);
217 req->l_req_mode = lock->l_granted_mode;
218 req->l_policy_data.l_flock.pid =
219 lock->l_policy_data.l_flock.pid;
220 req->l_policy_data.l_flock.start =
221 lock->l_policy_data.l_flock.start;
222 req->l_policy_data.l_flock.end =
223 lock->l_policy_data.l_flock.end;
224 *flags |= LDLM_FL_LOCK_CHANGED;
225 return LDLM_ITER_STOP;
/* Otherwise park the request on the waiting list until reprocess. */
228 ldlm_resource_add_lock(res, &res->lr_waiting, req);
229 *flags |= LDLM_FL_BLOCK_GRANTED;
230 return LDLM_ITER_STOP;
232 if (reprocess_failed)
233 return LDLM_ITER_CONTINUE;
/* F_GETLK with no conflict: answer F_UNLCK via mode LCK_NL. */
236 if (*flags & LDLM_FL_TEST_LOCK) {
237 ldlm_flock_destroy(req, mode, *flags);
238 req->l_req_mode = LCK_NL;
239 *flags |= LDLM_FL_LOCK_CHANGED;
240 return LDLM_ITER_STOP;
243 /* Scan the locks owned by this process that overlap this request.
244 * We may have to merge or split existing locks.
247 ownlocks = &res->lr_granted;
249 list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
250 lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
252 if (!ldlm_same_flock_owner(lock, new))
255 if (lock->l_granted_mode == mode) {
256 /* If the modes are the same then we need to process
257 * locks that overlap OR adjoin the new lock. The extra
258 * logic condition is necessary to deal with arithmetic
259 * overflow and underflow.
261 if ((new->l_policy_data.l_flock.start >
262 (lock->l_policy_data.l_flock.end + 1))
263 && (lock->l_policy_data.l_flock.end !=
267 if ((new->l_policy_data.l_flock.end <
268 (lock->l_policy_data.l_flock.start - 1))
269 && (lock->l_policy_data.l_flock.start != 0))
/* Same mode, touching extents: widen to the union of both. */
272 if (new->l_policy_data.l_flock.start <
273 lock->l_policy_data.l_flock.start) {
274 lock->l_policy_data.l_flock.start =
275 new->l_policy_data.l_flock.start;
277 new->l_policy_data.l_flock.start =
278 lock->l_policy_data.l_flock.start;
281 if (new->l_policy_data.l_flock.end >
282 lock->l_policy_data.l_flock.end) {
283 lock->l_policy_data.l_flock.end =
284 new->l_policy_data.l_flock.end;
286 new->l_policy_data.l_flock.end =
287 lock->l_policy_data.l_flock.end;
/* The old lock has been merged away; drop it. */
291 ldlm_flock_destroy(lock, mode, *flags);
/* Different mode below: only strictly overlapping locks matter. */
299 if (new->l_policy_data.l_flock.start >
300 lock->l_policy_data.l_flock.end)
303 if (new->l_policy_data.l_flock.end <
304 lock->l_policy_data.l_flock.start)
/* New extent covers the old lock's start: trim or replace it. */
309 if (new->l_policy_data.l_flock.start <=
310 lock->l_policy_data.l_flock.start) {
311 if (new->l_policy_data.l_flock.end <
312 lock->l_policy_data.l_flock.end) {
313 lock->l_policy_data.l_flock.start =
314 new->l_policy_data.l_flock.end + 1;
/* Old lock fully covered by the new extent: destroy it. */
317 ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
320 if (new->l_policy_data.l_flock.end >=
321 lock->l_policy_data.l_flock.end) {
322 lock->l_policy_data.l_flock.end =
323 new->l_policy_data.l_flock.start - 1;
327 /* split the existing lock into two locks */
329 /* if this is an F_UNLCK operation then we could avoid
330 * allocating a new lock and use the req lock passed in
331 * with the request but this would complicate the reply
332 * processing since updates to req get reflected in the
333 * reply. The client side replays the lock request so
334 * it must see the original lock data in the reply.
337 /* XXX - if ldlm_lock_new() can sleep we should
338 * release the lr_lock, allocate the new lock,
339 * and restart processing this lock.
/* Drop the resource lock around the (possibly sleeping) allocation. */
342 unlock_res_and_lock(req);
343 new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
344 lock->l_granted_mode, &null_cbs,
345 NULL, 0, LVB_T_NONE);
346 lock_res_and_lock(req);
/* NOTE(review): allocation-failure branch; error lines elided here. */
348 ldlm_flock_destroy(req, lock->l_granted_mode,
351 return LDLM_ITER_STOP;
/*
 * new2 becomes the lower half [lock.start, new.start-1] of the split;
 * the original lock keeps the upper half [new.end+1, lock.end].
 */
358 new2->l_granted_mode = lock->l_granted_mode;
359 new2->l_policy_data.l_flock.pid =
360 new->l_policy_data.l_flock.pid;
361 new2->l_policy_data.l_flock.owner =
362 new->l_policy_data.l_flock.owner;
363 new2->l_policy_data.l_flock.start =
364 lock->l_policy_data.l_flock.start;
365 new2->l_policy_data.l_flock.end =
366 new->l_policy_data.l_flock.start - 1;
367 lock->l_policy_data.l_flock.start =
368 new->l_policy_data.l_flock.end + 1;
369 new2->l_conn_export = lock->l_conn_export;
370 if (lock->l_export) {
371 new2->l_export = class_export_lock_get(lock->l_export,
373 if (new2->l_export->exp_lock_hash &&
374 hlist_unhashed(&new2->l_exp_hash))
375 cfs_hash_add(new2->l_export->exp_lock_hash,
376 &new2->l_remote_handle,
379 if (*flags == LDLM_FL_WAIT_NOREPROC)
380 ldlm_lock_addref_internal_nolock(new2,
381 lock->l_granted_mode);
383 /* insert new2 at lock */
384 ldlm_resource_add_lock(res, ownlocks, new2);
385 LDLM_LOCK_RELEASE(new2);
389 /* if new2 is created but never used, destroy it*/
390 if (splitted == 0 && new2)
391 ldlm_lock_destroy_nolock(new2);
393 /* At this point we're granting the lock request. */
394 req->l_granted_mode = req->l_req_mode;
397 list_del_init(&req->l_res_link);
398 /* insert new lock before ownlocks in list. */
399 ldlm_resource_add_lock(res, ownlocks, req);
402 if (*flags != LDLM_FL_WAIT_NOREPROC) {
403 /* The only one possible case for client-side calls flock
404 * policy function is ldlm_flock_completion_ast inside which
405 * carries LDLM_FL_WAIT_NOREPROC flag.
407 CERROR("Illegal parameter for client-side-only module.\n");
411 /* In case we're reprocessing the requested lock we can't destroy
412 * it until after calling ldlm_add_ast_work_item() above so that laawi()
413 * can bump the reference count on \a req. Otherwise \a req
414 * could be freed before the completion AST can be sent.
417 ldlm_flock_destroy(req, mode, *flags);
419 ldlm_resource_dump(D_INFO, res);
420 return LDLM_ITER_CONTINUE;
/*
 * Context handed to ldlm_flock_interrupted_wait() while a flock enqueue
 * sleeps in ldlm_flock_completion_ast().  NOTE(review): the full struct
 * also has an fwd_generation member (it is assigned from
 * imp->imp_generation in ldlm_flock_completion_ast below); the remaining
 * fields and closing brace are not visible in this excerpt.
 */
423 struct ldlm_flock_wait_data {
424 struct ldlm_lock *fwd_lock;
/*
 * l_wait_event() interrupt callback for a sleeping flock enqueue:
 * marks the lock CBPENDING (under the resource lock) so the client
 * will not park it on the LRU after the wait is aborted.
 * @data is the struct ldlm_flock_wait_data set up by the waiter.
 */
429 ldlm_flock_interrupted_wait(void *data)
431 struct ldlm_lock *lock;
433 lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
435 lock_res_and_lock(lock);
437 /* client side - set flag to prevent lock from being put on LRU list */
438 ldlm_set_cbpending(lock);
439 unlock_res_and_lock(lock);
443 * Flock completion callback function.
445 * \param lock [in,out]: A lock to be handled
446 * \param flags [in]: flags
447 * \param *data [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
449 * \retval 0 : success
450 * \retval <0 : failure
453 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
455 struct file_lock *getlk = lock->l_ast_data;
456 struct obd_device *obd;
457 struct obd_import *imp = NULL;
458 struct ldlm_flock_wait_data fwd;
459 struct l_wait_info lwi;
463 CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
466 /* Import invalidation. We need to actually release the lock
467 * references being held, so that it can go away. No point in
468 * holding the lock even if app still believes it has it, since
469 * server already dropped it anyway. Only for granted locks too.
471 if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
472 (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
473 if (lock->l_req_mode == lock->l_granted_mode &&
474 lock->l_granted_mode != LCK_NL && !data)
475 ldlm_lock_decref_internal(lock, lock->l_req_mode);
477 /* Need to wake up the waiter if we were evicted */
478 wake_up(&lock->l_waitq);
482 LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
/* No BLOCK flag set: the lock was granted without waiting. */
484 if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
485 LDLM_FL_BLOCK_CONV))) {
487 /* mds granted the lock in the reply */
489 /* CP AST RPC: lock get granted, wake it up */
490 wake_up(&lock->l_waitq);
494 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
496 obd = class_exp2obd(lock->l_conn_export);
498 /* if this is a local lock, there is no import */
500 imp = obd->u.cli.cl_import;
/* Snapshot the import generation so eviction during the wait is seen. */
503 spin_lock(&imp->imp_lock);
504 fwd.fwd_generation = imp->imp_generation;
505 spin_unlock(&imp->imp_lock);
/* Interruptible, untimed wait (flock locks never time out). */
508 lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
510 /* Go to sleep until the lock is granted. */
511 rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
514 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
520 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
522 if (ldlm_is_failed(lock)) {
523 LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
527 LDLM_DEBUG(lock, "client-side enqueue granted");
529 lock_res_and_lock(lock);
532 * Protect against race where lock could have been just destroyed
533 * due to overlap in ldlm_process_flock_lock().
535 if (ldlm_is_destroyed(lock)) {
536 unlock_res_and_lock(lock);
537 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
541 /* ldlm_lock_enqueue() has already placed lock on the granted list. */
542 list_del_init(&lock->l_res_link);
544 if (ldlm_is_flock_deadlock(lock)) {
545 LDLM_DEBUG(lock, "client-side enqueue deadlock received");
547 } else if (flags & LDLM_FL_TEST_LOCK) {
548 /* fcntl(F_GETLK) request */
549 /* The old mode was saved in getlk->fl_type so that if the mode
550 * in the lock changes we can decref the appropriate refcount.
552 ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
/* Translate the granted LDLM mode back to the fcntl lock type. */
553 switch (lock->l_granted_mode) {
555 getlk->fl_type = F_RDLCK;
558 getlk->fl_type = F_WRLCK;
561 getlk->fl_type = F_UNLCK;
563 getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
564 getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
565 getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
567 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
569 /* We need to reprocess the lock to do merges or splits
570 * with existing locks owned by this process.
572 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
574 unlock_res_and_lock(lock);
577 EXPORT_SYMBOL(ldlm_flock_completion_ast);
/*
 * Convert wire-format flock policy to local format for pre-2.1 ("18")
 * clients: start/end/pid copy straight across, but the owner field did
 * not exist on the wire, so it is synthesized from the pid (see the
 * compat note below).
 */
579 void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
580 ldlm_policy_data_t *lpolicy)
582 memset(lpolicy, 0, sizeof(*lpolicy));
583 lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
584 lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
585 lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
586 /* Compat code, old clients had no idea about owner field and
587 * relied solely on pid for ownership. Introduced in LU-104, 2.1,
590 lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
/*
 * Convert wire-format flock policy to local format for 2.1+ ("21")
 * clients, which do send the owner field: copy all four members
 * (start, end, pid, owner) verbatim.
 */
593 void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
594 ldlm_policy_data_t *lpolicy)
596 memset(lpolicy, 0, sizeof(*lpolicy));
597 lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
598 lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
599 lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
600 lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
/*
 * Convert local flock policy to wire format: zero the wire struct and
 * copy start, end, pid, and owner.  NOTE(review): the function's closing
 * brace lies past the end of this excerpt.
 */
603 void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
604 ldlm_wire_policy_data_t *wpolicy)
606 memset(wpolicy, 0, sizeof(*wpolicy));
607 wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
608 wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
609 wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
610 wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;