dlm: fixes for nodir mode
authorDavid Teigland <teigland@redhat.com>
Thu, 26 Apr 2012 20:54:29 +0000 (15:54 -0500)
committerDavid Teigland <teigland@redhat.com>
Wed, 2 May 2012 19:15:27 +0000 (14:15 -0500)
The "nodir" mode (statically assign master nodes instead
of using the resource directory) has always been highly
experimental, and never seriously used.  This commit
fixes a number of problems, making nodir much more usable.

- Major change to recovery: recover all locks and restart
  all in-progress operations after recovery.  In some
  cases it's not possible to know which in-progess locks
  to recover, so recover all.  (Most require recovery
  in nodir mode anyway since rehashing changes most
  master nodes.)

- Change the way nodir mode is enabled, from a command
  line mount arg passed through gfs2, into a sysfs
  file managed by dlm_controld, consistent with the
  other config settings.

- Allow recovering MSTCPY locks on an rsb that has not
  yet been turned into a master copy.

- Ignore RCOM_LOCK and RCOM_LOCK_REPLY recovery messages
  from a previous, aborted recovery cycle.  Base this
  on the local recovery status not being in the state
  where any nodes should be sending LOCK messages for the
  current recovery cycle.

- Hold rsb lock around dlm_purge_mstcpy_locks() because it
  may run concurrently with dlm_recover_master_copy().

- Maintain highbast on process-copy lkb's (in addition to
  the master as is usual), because the lkb can switch
  back and forth between being a master and being a
  process copy as the master node changes in recovery.

- When recovering MSTCPY locks, flag rsb's that have
  non-empty convert or waiting queues for granting
  at the end of recovery.  (Rename flag from LOCKS_PURGED
  to RECOVER_GRANT and similar for the recovery function,
  because it's not only resources with purged locks
  that need grant a grant attempt.)

- Replace a couple of unnecessary assertion panics with
  error messages.

Signed-off-by: David Teigland <teigland@redhat.com>
13 files changed:
fs/dlm/ast.c
fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/lock.h
fs/dlm/lockspace.c
fs/dlm/rcom.c
fs/dlm/recover.c
fs/dlm/recoverd.c
fs/dlm/requestqueue.c
fs/gfs2/incore.h
fs/gfs2/lock_dlm.c
fs/gfs2/ops_fstype.c
include/linux/dlm.h

index 90e5997..63dc19c 100644 (file)
@@ -310,6 +310,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
        }
        mutex_unlock(&ls->ls_cb_mutex);
 
-       log_debug(ls, "dlm_callback_resume %d", count);
+       if (count)
+               log_debug(ls, "dlm_callback_resume %d", count);
 }
 
index 0e74832..bc342f7 100644 (file)
@@ -271,6 +271,8 @@ struct dlm_lkb {
        ktime_t                 lkb_last_cast_time;     /* for debugging */
        ktime_t                 lkb_last_bast_time;     /* for debugging */
 
+       uint64_t                lkb_recover_seq; /* from ls_recover_seq */
+
        char                    *lkb_lvbptr;
        struct dlm_lksb         *lkb_lksb;      /* caller's status block */
        void                    (*lkb_astfn) (void *astparam);
@@ -325,7 +327,7 @@ enum rsb_flags {
        RSB_NEW_MASTER,
        RSB_NEW_MASTER2,
        RSB_RECOVER_CONVERT,
-       RSB_LOCKS_PURGED,
+       RSB_RECOVER_GRANT,
 };
 
 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
@@ -571,6 +573,7 @@ struct dlm_ls {
        struct mutex            ls_requestqueue_mutex;
        struct dlm_rcom         *ls_recover_buf;
        int                     ls_recover_nodeid; /* for debugging */
+       unsigned int            ls_recover_locks_in; /* for log info */
        uint64_t                ls_rcom_seq;
        spinlock_t              ls_rcom_spin;
        struct list_head        ls_recover_list;
@@ -597,6 +600,7 @@ struct dlm_ls {
 #define LSFL_UEVENT_WAIT       5
 #define LSFL_TIMEWARN          6
 #define LSFL_CB_DELAY          7
+#define LSFL_NODIR             8
 
 /* much of this is just saving user space pointers associated with the
    lock that we pass back to the user lib with an ast */
@@ -644,7 +648,7 @@ static inline int dlm_recovery_stopped(struct dlm_ls *ls)
 
 static inline int dlm_no_directory(struct dlm_ls *ls)
 {
-       return (ls->ls_exflags & DLM_LSFL_NODIR) ? 1 : 0;
+       return test_bit(LSFL_NODIR, &ls->ls_flags);
 }
 
 int dlm_netlink_init(void);
index f3ba703..bdafb65 100644 (file)
@@ -161,10 +161,11 @@ static const int __quecvt_compat_matrix[8][8] = {
 void dlm_print_lkb(struct dlm_lkb *lkb)
 {
        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
-              "sts %d rq %d gr %d wait_type %d wait_nodeid %d\n",
+              "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
-              lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
+              lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
+              (unsigned long long)lkb->lkb_recover_seq);
 }
 
 static void dlm_print_rsb(struct dlm_rsb *r)
@@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb)
 
 static inline int is_master_copy(struct dlm_lkb *lkb)
 {
-       if (lkb->lkb_flags & DLM_IFL_MSTCPY)
-               DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
 }
 
@@ -1519,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
        }
 
        lkb->lkb_rqmode = DLM_LOCK_IV;
+       lkb->lkb_highbast = 0;
 }
 
 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        set_lvb_lock(r, lkb);
        _grant_lock(r, lkb);
-       lkb->lkb_highbast = 0;
 }
 
 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1887,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
 /* Returns the highest requested mode of all blocked conversions; sets
    cw if there's a blocked conversion to DLM_LOCK_CW. */
 
-static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
+                                unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
        int hi, demoted, quit, grant_restart, demote_restart;
@@ -1906,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
                if (can_be_granted(r, lkb, 0, &deadlk)) {
                        grant_lock_pending(r, lkb);
                        grant_restart = 1;
+                       if (count)
+                               (*count)++;
                        continue;
                }
 
@@ -1939,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
        return max_t(int, high, hi);
 }
 
-static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
+                             unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
 
        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
-               if (can_be_granted(r, lkb, 0, NULL))
+               if (can_be_granted(r, lkb, 0, NULL)) {
                        grant_lock_pending(r, lkb);
-                else {
+                       if (count)
+                               (*count)++;
+               } else {
                        high = max_t(int, lkb->lkb_rqmode, high);
                        if (lkb->lkb_rqmode == DLM_LOCK_CW)
                                *cw = 1;
@@ -1975,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
        return 0;
 }
 
-static void grant_pending_locks(struct dlm_rsb *r)
+static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
        int high = DLM_LOCK_IV;
        int cw = 0;
 
-       DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
+       if (!is_master(r)) {
+               log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
+               dlm_dump_rsb(r);
+               return;
+       }
 
-       high = grant_pending_convert(r, high, &cw);
-       high = grant_pending_wait(r, high, &cw);
+       high = grant_pending_convert(r, high, &cw, count);
+       high = grant_pending_wait(r, high, &cw, count);
 
        if (high == DLM_LOCK_IV)
                return;
@@ -2520,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
           before we try again to grant this one. */
 
        if (is_demoted(lkb)) {
-               grant_pending_convert(r, DLM_LOCK_IV, NULL);
+               grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
                if (_can_be_granted(r, lkb, 1)) {
                        grant_lock(r, lkb);
                        queue_cast(r, lkb, 0);
@@ -2548,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
 {
        switch (error) {
        case 0:
-               grant_pending_locks(r);
+               grant_pending_locks(r, NULL);
                /* grant_pending_locks also sends basts */
                break;
        case -EAGAIN:
@@ -2571,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
                              int error)
 {
-       grant_pending_locks(r);
+       grant_pending_locks(r, NULL);
 }
 
 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
@@ -2592,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
                              int error)
 {
        if (error)
-               grant_pending_locks(r);
+               grant_pending_locks(r, NULL);
 }
 
 /*
@@ -3452,8 +3461,9 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
                goto fail;
 
        if (lkb->lkb_remid != ms->m_lkid) {
-               log_error(ls, "receive_convert %x remid %x remote %d %x",
-                         lkb->lkb_id, lkb->lkb_remid,
+               log_error(ls, "receive_convert %x remid %x recover_seq %llu "
+                         "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
+                         (unsigned long long)lkb->lkb_recover_seq,
                          ms->m_header.h_nodeid, ms->m_lkid);
                error = -ENOENT;
                goto fail;
@@ -3631,6 +3641,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
                goto out;
 
        queue_bast(r, lkb, ms->m_bastmode);
+       lkb->lkb_highbast = ms->m_bastmode;
  out:
        unlock_rsb(r);
        put_rsb(r);
@@ -3710,8 +3721,13 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
        mstype = lkb->lkb_wait_type;
        error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
-       if (error)
+       if (error) {
+               log_error(ls, "receive_request_reply %x remote %d %x result %d",
+                         lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
+                         ms->m_result);
+               dlm_dump_rsb(r);
                goto out;
+       }
 
        /* Optimization: the dir node was also the master, so it took our
           lookup as a request and sent request reply instead of lookup reply */
@@ -4122,21 +4138,28 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
         * happen in normal usage for the async messages and cancel, so
         * only use log_debug for them.
         *
-        * Other errors are expected and normal.
+        * Some errors are expected and normal.
         */
 
        if (error == -ENOENT && noent) {
-               log_debug(ls, "receive %d no %x remote %d %x seq %u",
+               log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
                          ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
                          ms->m_lkid, saved_seq);
        } else if (error == -ENOENT) {
-               log_error(ls, "receive %d no %x remote %d %x seq %u",
+               log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
                          ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
                          ms->m_lkid, saved_seq);
 
                if (ms->m_type == DLM_MSG_CONVERT)
                        dlm_dump_rsb_hash(ls, ms->m_hash);
        }
+
+       if (error == -EINVAL) {
+               log_error(ls, "receive %d inval from %d lkid %x remid %x "
+                         "saved_seq %u",
+                         ms->m_type, ms->m_header.h_nodeid,
+                         ms->m_lkid, ms->m_remid, saved_seq);
+       }
 }
 
 /* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4200,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
 
        ls = dlm_find_lockspace_global(hd->h_lockspace);
        if (!ls) {
-               if (dlm_config.ci_log_debug)
-                       log_print("invalid lockspace %x from %d cmd %d type %d",
-                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
+               if (dlm_config.ci_log_debug) {
+                       printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
+                               "%u from %d cmd %d type %d\n",
+                               hd->h_lockspace, nodeid, hd->h_cmd, type);
+               }
 
                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
                        dlm_send_ls_not_ready(nodeid, &p->rcom);
@@ -4253,16 +4278,10 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                 int dir_nodeid)
 {
-       if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
-               return 1;
-
-       if (!dlm_no_directory(ls))
-               return 0;
-
-       if (dir_nodeid == dlm_our_nodeid())
+       if (dlm_no_directory(ls))
                return 1;
 
-       if (dir_nodeid != lkb->lkb_wait_nodeid)
+       if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
                return 1;
 
        return 0;
@@ -4519,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
        return error;
 }
 
-static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
-                       int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
+static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
+                             struct list_head *list)
 {
-       struct dlm_ls *ls = r->res_ls;
        struct dlm_lkb *lkb, *safe;
 
-       list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
-               if (test(ls, lkb)) {
-                       rsb_set_flag(r, RSB_LOCKS_PURGED);
-                       del_lkb(r, lkb);
-                       /* this put should free the lkb */
-                       if (!dlm_put_lkb(lkb))
-                               log_error(ls, "purged lkb not released");
-               }
+       list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+               if (!is_master_copy(lkb))
+                       continue;
+
+               /* don't purge lkbs we've added in recover_master_copy for
+                  the current recovery seq */
+
+               if (lkb->lkb_recover_seq == ls->ls_recover_seq)
+                       continue;
+
+               del_lkb(r, lkb);
+
+               /* this put should free the lkb */
+               if (!dlm_put_lkb(lkb))
+                       log_error(ls, "purged mstcpy lkb not released");
        }
 }
 
-static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
+void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
 {
-       return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
-}
+       struct dlm_ls *ls = r->res_ls;
 
-static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
-{
-       return is_master_copy(lkb);
+       purge_mstcpy_list(ls, r, &r->res_grantqueue);
+       purge_mstcpy_list(ls, r, &r->res_convertqueue);
+       purge_mstcpy_list(ls, r, &r->res_waitqueue);
 }
 
-static void purge_dead_locks(struct dlm_rsb *r)
+static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
+                           struct list_head *list,
+                           int nodeid_gone, unsigned int *count)
 {
-       purge_queue(r, &r->res_grantqueue, &purge_dead_test);
-       purge_queue(r, &r->res_convertqueue, &purge_dead_test);
-       purge_queue(r, &r->res_waitqueue, &purge_dead_test);
-}
+       struct dlm_lkb *lkb, *safe;
 
-void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
-{
-       purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
-       purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
-       purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
+       list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+               if (!is_master_copy(lkb))
+                       continue;
+
+               if ((lkb->lkb_nodeid == nodeid_gone) ||
+                   dlm_is_removed(ls, lkb->lkb_nodeid)) {
+
+                       del_lkb(r, lkb);
+
+                       /* this put should free the lkb */
+                       if (!dlm_put_lkb(lkb))
+                               log_error(ls, "purged dead lkb not released");
+
+                       rsb_set_flag(r, RSB_RECOVER_GRANT);
+
+                       (*count)++;
+               }
+       }
 }
 
 /* Get rid of locks held by nodes that are gone. */
 
-int dlm_purge_locks(struct dlm_ls *ls)
+void dlm_recover_purge(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
+       struct dlm_member *memb;
+       int nodes_count = 0;
+       int nodeid_gone = 0;
+       unsigned int lkb_count = 0;
+
+       /* cache one removed nodeid to optimize the common
+          case of a single node removed */
+
+       list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
+               nodes_count++;
+               nodeid_gone = memb->nodeid;
+       }
 
-       log_debug(ls, "dlm_purge_locks");
+       if (!nodes_count)
+               return;
 
        down_write(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                hold_rsb(r);
                lock_rsb(r);
-               if (is_master(r))
-                       purge_dead_locks(r);
+               if (is_master(r)) {
+                       purge_dead_list(ls, r, &r->res_grantqueue,
+                                       nodeid_gone, &lkb_count);
+                       purge_dead_list(ls, r, &r->res_convertqueue,
+                                       nodeid_gone, &lkb_count);
+                       purge_dead_list(ls, r, &r->res_waitqueue,
+                                       nodeid_gone, &lkb_count);
+               }
                unlock_rsb(r);
                unhold_rsb(r);
-
-               schedule();
+               cond_resched();
        }
        up_write(&ls->ls_root_sem);
 
-       return 0;
+       if (lkb_count)
+               log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
+                         lkb_count, nodes_count);
 }
 
-static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 {
        struct rb_node *n;
-       struct dlm_rsb *r, *r_ret = NULL;
+       struct dlm_rsb *r;
 
        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
                r = rb_entry(n, struct dlm_rsb, res_hashnode);
-               if (!rsb_flag(r, RSB_LOCKS_PURGED))
+
+               if (!rsb_flag(r, RSB_RECOVER_GRANT))
+                       continue;
+               rsb_clear_flag(r, RSB_RECOVER_GRANT);
+               if (!is_master(r))
                        continue;
                hold_rsb(r);
-               rsb_clear_flag(r, RSB_LOCKS_PURGED);
-               r_ret = r;
-               break;
+               spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+               return r;
        }
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
-       return r_ret;
+       return NULL;
 }
 
-void dlm_grant_after_purge(struct dlm_ls *ls)
+/*
+ * Attempt to grant locks on resources that we are the master of.
+ * Locks may have become grantable during recovery because locks
+ * from departed nodes have been purged (or not rebuilt), allowing
+ * previously blocked locks to now be granted.  The subset of rsb's
+ * we are interested in are those with lkb's on either the convert or
+ * waiting queues.
+ *
+ * Simplest would be to go through each master rsb and check for non-empty
+ * convert or waiting queues, and attempt to grant on those rsbs.
+ * Checking the queues requires lock_rsb, though, for which we'd need
+ * to release the rsbtbl lock.  This would make iterating through all
+ * rsb's very inefficient.  So, we rely on earlier recovery routines
+ * to set RECOVER_GRANT on any rsb's that we should attempt to grant
+ * locks for.
+ */
+
+void dlm_recover_grant(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
        int bucket = 0;
+       unsigned int count = 0;
+       unsigned int rsb_count = 0;
+       unsigned int lkb_count = 0;
 
        while (1) {
-               r = find_purged_rsb(ls, bucket);
+               r = find_grant_rsb(ls, bucket);
                if (!r) {
                        if (bucket == ls->ls_rsbtbl_size - 1)
                                break;
                        bucket++;
                        continue;
                }
+               rsb_count++;
+               count = 0;
                lock_rsb(r);
-               if (is_master(r)) {
-                       grant_pending_locks(r);
-                       confirm_master(r, 0);
-               }
+               grant_pending_locks(r, &count);
+               lkb_count += count;
+               confirm_master(r, 0);
                unlock_rsb(r);
                put_rsb(r);
-               schedule();
+               cond_resched();
        }
+
+       if (lkb_count)
+               log_debug(ls, "dlm_recover_grant %u locks on %u resources",
+                         lkb_count, rsb_count);
 }
 
 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
@@ -4723,11 +4807,26 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
 
        remid = le32_to_cpu(rl->rl_lkid);
 
-       error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
-                        R_MASTER, &r);
+       /* In general we expect the rsb returned to be R_MASTER, but we don't
+          have to require it.  Recovery of masters on one node can overlap
+          recovery of locks on another node, so one node can send us MSTCPY
+          locks before we've made ourselves master of this rsb.  We can still
+          add new MSTCPY locks that we receive here without any harm; when
+          we make ourselves master, dlm_recover_masters() won't touch the
+          MSTCPY locks we've received early. */
+
+       error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
        if (error)
                goto out;
 
+       if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
+               log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
+                         rc->rc_header.h_nodeid, remid);
+               error = -EBADR;
+               put_rsb(r);
+               goto out;
+       }
+
        lock_rsb(r);
 
        lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
@@ -4749,12 +4848,18 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        attach_lkb(r, lkb);
        add_lkb(r, lkb, rl->rl_status);
        error = 0;
+       ls->ls_recover_locks_in++;
+
+       if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
+               rsb_set_flag(r, RSB_RECOVER_GRANT);
 
  out_remid:
        /* this is the new value returned to the lock holder for
           saving in its process-copy lkb */
        rl->rl_remid = cpu_to_le32(lkb->lkb_id);
 
+       lkb->lkb_recover_seq = ls->ls_recover_seq;
+
  out_unlock:
        unlock_rsb(r);
        put_rsb(r);
@@ -4786,17 +4891,20 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
                return error;
        }
 
+       r = lkb->lkb_resource;
+       hold_rsb(r);
+       lock_rsb(r);
+
        if (!is_process_copy(lkb)) {
                log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
                          lkid, rc->rc_header.h_nodeid, remid, result);
-               dlm_print_lkb(lkb);
+               dlm_dump_rsb(r);
+               unlock_rsb(r);
+               put_rsb(r);
+               dlm_put_lkb(lkb);
                return -EINVAL;
        }
 
-       r = lkb->lkb_resource;
-       hold_rsb(r);
-       lock_rsb(r);
-
        switch (result) {
        case -EBADR:
                /* There's a chance the new master received our lock before
index 56e2bc6..c8b226c 100644 (file)
@@ -32,9 +32,9 @@ void dlm_adjust_timeouts(struct dlm_ls *ls);
 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
                        unsigned int flags, struct dlm_rsb **r_ret);
 
-int dlm_purge_locks(struct dlm_ls *ls);
+void dlm_recover_purge(struct dlm_ls *ls);
 void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
-void dlm_grant_after_purge(struct dlm_ls *ls);
+void dlm_recover_grant(struct dlm_ls *ls);
 int dlm_recover_waiters_post(struct dlm_ls *ls);
 void dlm_recover_waiters_pre(struct dlm_ls *ls);
 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
index a1ea25f..ca506ab 100644 (file)
@@ -74,6 +74,19 @@ static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
        return len;
 }
 
+static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
+{
+       return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
+}
+
+static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
+{
+       int val = simple_strtoul(buf, NULL, 0);
+       if (val == 1)
+               set_bit(LSFL_NODIR, &ls->ls_flags);
+       return len;
+}
+
 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 {
        uint32_t status = dlm_recover_status(ls);
@@ -107,6 +120,12 @@ static struct dlm_attr dlm_attr_id = {
        .store = dlm_id_store
 };
 
+static struct dlm_attr dlm_attr_nodir = {
+       .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
+       .show  = dlm_nodir_show,
+       .store = dlm_nodir_store
+};
+
 static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
@@ -121,6 +140,7 @@ static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
+       &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
index 6565fd5..64d3e2b 100644 (file)
@@ -492,30 +492,41 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 {
        int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
-       int stop, reply = 0;
+       int stop, reply = 0, lock = 0;
+       uint32_t status;
        uint64_t seq;
 
        switch (rc->rc_type) {
+       case DLM_RCOM_LOCK:
+               lock = 1;
+               break;
+       case DLM_RCOM_LOCK_REPLY:
+               lock = 1;
+               reply = 1;
+               break;
        case DLM_RCOM_STATUS_REPLY:
        case DLM_RCOM_NAMES_REPLY:
        case DLM_RCOM_LOOKUP_REPLY:
-       case DLM_RCOM_LOCK_REPLY:
                reply = 1;
        };
 
        spin_lock(&ls->ls_recover_lock);
+       status = ls->ls_recover_status;
        stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
        seq = ls->ls_recover_seq;
        spin_unlock(&ls->ls_recover_lock);
 
        if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
-           (reply && (rc->rc_seq_reply != seq))) {
+           (reply && (rc->rc_seq_reply != seq)) ||
+           (lock && !(status & DLM_RS_DIR))) {
                log_limit(ls, "dlm_receive_rcom ignore msg %d "
-                         "from %d %llu %llu seq %llu",
-                          rc->rc_type, nodeid,
+                         "from %d %llu %llu recover seq %llu sts %x gen %u",
+                          rc->rc_type,
+                          nodeid,
                           (unsigned long long)rc->rc_seq,
                           (unsigned long long)rc->rc_seq_reply,
-                          (unsigned long long)seq);
+                          (unsigned long long)seq,
+                          status, ls->ls_generation);
                goto out;
        }
 
index 34d5adf..7554e4d 100644 (file)
@@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid)
 {
        struct dlm_lkb *lkb;
 
-       list_for_each_entry(lkb, queue, lkb_statequeue)
-               if (!(lkb->lkb_flags & DLM_IFL_MSTCPY))
+       list_for_each_entry(lkb, queue, lkb_statequeue) {
+               if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
                        lkb->lkb_nodeid = nodeid;
+                       lkb->lkb_remid = 0;
+               }
+       }
 }
 
 static void set_master_lkbs(struct dlm_rsb *r)
@@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r)
 /*
  * Propagate the new master nodeid to locks
  * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
- * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
+ * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
  * rsb's to consider.
  */
 
 static void set_new_master(struct dlm_rsb *r, int nodeid)
 {
-       lock_rsb(r);
        r->res_nodeid = nodeid;
        set_master_lkbs(r);
        rsb_set_flag(r, RSB_NEW_MASTER);
        rsb_set_flag(r, RSB_NEW_MASTER2);
-       unlock_rsb(r);
 }
 
 /*
@@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
 static int recover_master(struct dlm_rsb *r)
 {
        struct dlm_ls *ls = r->res_ls;
-       int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
-
-       dir_nodeid = dlm_dir_nodeid(r);
+       int error, ret_nodeid;
+       int our_nodeid = dlm_our_nodeid();
+       int dir_nodeid = dlm_dir_nodeid(r);
 
        if (dir_nodeid == our_nodeid) {
                error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
@@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r)
 
                if (ret_nodeid == our_nodeid)
                        ret_nodeid = 0;
+               lock_rsb(r);
                set_new_master(r, ret_nodeid);
+               unlock_rsb(r);
        } else {
                recover_list_add(r);
                error = dlm_send_rcom_lookup(r, dir_nodeid);
@@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r)
 }
 
 /*
- * When not using a directory, most resource names will hash to a new static
- * master nodeid and the resource will need to be remastered.
+ * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
+ * This is necessary because recovery can be started, aborted and restarted,
+ * causing the master nodeid to briefly change during the aborted recovery, and
+ * change back to the original value in the second recovery.  The MSTCPY locks
+ * may or may not have been purged during the aborted recovery.  Another node
+ * with an outstanding request in waiters list and a request reply saved in the
+ * requestqueue, cannot know whether it should ignore the reply and resend the
+ * request, or accept the reply and complete the request.  It must do the
+ * former if the remote node purged MSTCPY locks, and it must do the later if
+ * the remote node did not.  This is solved by always purging MSTCPY locks, in
+ * which case, the request reply would always be ignored and the request
+ * resent.
  */
 
 static int recover_master_static(struct dlm_rsb *r)
 {
-       int master = dlm_dir_nodeid(r);
+       int dir_nodeid = dlm_dir_nodeid(r);
+       int new_master = dir_nodeid;
 
-       if (master == dlm_our_nodeid())
-               master = 0;
+       if (dir_nodeid == dlm_our_nodeid())
+               new_master = 0;
 
-       if (r->res_nodeid != master) {
-               if (is_master(r))
-                       dlm_purge_mstcpy_locks(r);
-               set_new_master(r, master);
-               return 1;
-       }
-       return 0;
+       lock_rsb(r);
+       dlm_purge_mstcpy_locks(r);
+       set_new_master(r, new_master);
+       unlock_rsb(r);
+       return 1;
 }
 
 /*
@@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
        if (nodeid == dlm_our_nodeid())
                nodeid = 0;
 
+       lock_rsb(r);
        set_new_master(r, nodeid);
+       unlock_rsb(r);
        recover_list_del(r);
 
        if (recover_list_empty(ls))
@@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls)
        struct dlm_rsb *r;
        int error, count = 0;
 
-       log_debug(ls, "dlm_recover_locks");
-
        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                if (is_master(r)) {
@@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
        }
        up_read(&ls->ls_root_sem);
 
-       log_debug(ls, "dlm_recover_locks %d locks", count);
+       log_debug(ls, "dlm_recover_locks %d out", count);
 
        error = dlm_wait_function(ls, &recover_list_empty);
  out:
@@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r)
 }
 
 /* We've become the new master for this rsb and waiting/converting locks may
-   need to be granted in dlm_grant_after_purge() due to locks that may have
+   need to be granted in dlm_recover_grant() due to locks that may have
    existed from a removed node. */
 
-static void set_locks_purged(struct dlm_rsb *r)
+static void recover_grant(struct dlm_rsb *r)
 {
        if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
-               rsb_set_flag(r, RSB_LOCKS_PURGED);
+               rsb_set_flag(r, RSB_RECOVER_GRANT);
 }
 
 void dlm_recover_rsbs(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
-       int count = 0;
-
-       log_debug(ls, "dlm_recover_rsbs");
+       unsigned int count = 0;
 
        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
@@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
                        if (rsb_flag(r, RSB_RECOVER_CONVERT))
                                recover_conversion(r);
                        if (rsb_flag(r, RSB_NEW_MASTER2))
-                               set_locks_purged(r);
+                               recover_grant(r);
                        recover_lvb(r);
                        count++;
                }
@@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
        }
        up_read(&ls->ls_root_sem);
 
-       log_debug(ls, "dlm_recover_rsbs %d rsbs", count);
+       if (count)
+               log_debug(ls, "dlm_recover_rsbs %d done", count);
 }
 
 /* Create a single list of all root rsb's to be used during recovery */
index 11351b5..f1a9073 100644 (file)
@@ -84,6 +84,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
                goto fail;
        }
 
+       ls->ls_recover_locks_in = 0;
+
        dlm_set_recover_status(ls, DLM_RS_NODES);
 
        error = dlm_recover_members_wait(ls);
@@ -130,7 +132,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
                 * Clear lkb's for departed nodes.
                 */
 
-               dlm_purge_locks(ls);
+               dlm_recover_purge(ls);
 
                /*
                 * Get new master nodeid's for rsb's that were mastered on
@@ -161,6 +163,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
                        goto fail;
                }
 
+               log_debug(ls, "dlm_recover_locks %u in",
+                         ls->ls_recover_locks_in);
+
                /*
                 * Finalize state in master rsb's now that all locks can be
                 * checked.  This includes conversion resolution and lvb
@@ -225,7 +230,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
                goto fail;
        }
 
-       dlm_grant_after_purge(ls);
+       dlm_recover_grant(ls);
 
        log_debug(ls, "dlm_recover %llu generation %u done: %u ms",
                  (unsigned long long)rv->seq, ls->ls_generation,
index d3191bf..1695f1b 100644 (file)
@@ -65,6 +65,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
 int dlm_process_requestqueue(struct dlm_ls *ls)
 {
        struct rq_entry *e;
+       struct dlm_message *ms;
        int error = 0;
 
        mutex_lock(&ls->ls_requestqueue_mutex);
@@ -78,6 +79,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
                e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
                mutex_unlock(&ls->ls_requestqueue_mutex);
 
+               ms = &e->request;
+
+               log_limit(ls, "dlm_process_requestqueue msg %d from %d "
+                         "lkid %x remid %x result %d seq %u",
+                         ms->m_type, ms->m_header.h_nodeid,
+                         ms->m_lkid, ms->m_remid, ms->m_result,
+                         e->recover_seq);
+
                dlm_receive_message_saved(ls, &e->request, e->recover_seq);
 
                mutex_lock(&ls->ls_requestqueue_mutex);
@@ -140,35 +149,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
        if (!dlm_no_directory(ls))
                return 0;
 
-       /* with no directory, the master is likely to change as a part of
-          recovery; requests to/from the defunct master need to be purged */
-
-       switch (type) {
-       case DLM_MSG_REQUEST:
-       case DLM_MSG_CONVERT:
-       case DLM_MSG_UNLOCK:
-       case DLM_MSG_CANCEL:
-               /* we're no longer the master of this resource, the sender
-                  will resend to the new master (see waiter_needs_recovery) */
-
-               if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
-                       return 1;
-               break;
-
-       case DLM_MSG_REQUEST_REPLY:
-       case DLM_MSG_CONVERT_REPLY:
-       case DLM_MSG_UNLOCK_REPLY:
-       case DLM_MSG_CANCEL_REPLY:
-       case DLM_MSG_GRANT:
-               /* this reply is from the former master of the resource,
-                  we'll resend to the new master if needed */
-
-               if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
-                       return 1;
-               break;
-       }
-
-       return 0;
+       return 1;
 }
 
 void dlm_purge_requestqueue(struct dlm_ls *ls)
index 47d0bda..c7975bf 100644 (file)
@@ -556,7 +556,6 @@ struct gfs2_sb_host {
 struct lm_lockstruct {
        int ls_jid;
        unsigned int ls_first;
-       unsigned int ls_nodir;
        const struct lm_lockops *ls_ops;
        dlm_lockspace_t *ls_dlm;
 
index 5f5e70e..4a38db7 100644 (file)
@@ -1209,8 +1209,6 @@ static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
        fsname++;
 
        flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;
-       if (ls->ls_nodir)
-               flags |= DLM_LSFL_NODIR;
 
        /*
         * create/join lockspace
index 6f3a18f..018b4c1 100644 (file)
@@ -994,6 +994,7 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
                                ls->ls_jid = option;
                        break;
                case Opt_id:
+               case Opt_nodir:
                        /* Obsolete, but left for backward compat purposes */
                        break;
                case Opt_first:
@@ -1002,12 +1003,6 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
                                goto hostdata_error;
                        ls->ls_first = option;
                        break;
-               case Opt_nodir:
-                       ret = match_int(&tmp[0], &option);
-                       if (ret || (option != 0 && option != 1))
-                               goto hostdata_error;
-                       ls->ls_nodir = option;
-                       break;
                case Opt_err:
                default:
 hostdata_error:
index 6c7f6e9..5201524 100644 (file)
@@ -67,7 +67,6 @@ struct dlm_lksb {
 
 /* dlm_new_lockspace() flags */
 
-#define DLM_LSFL_NODIR         0x00000001
 #define DLM_LSFL_TIMEWARN      0x00000002
 #define DLM_LSFL_FS            0x00000004
 #define DLM_LSFL_NEWEXCL       0x00000008