Merge commit '2c563880ea' into work.xattr
[cascardo/linux.git] / fs / ceph / dir.c
index 6ae6356..df4b3e6 100644 (file)
@@ -5,6 +5,7 @@
 #include <linux/namei.h>
 #include <linux/slab.h>
 #include <linux/sched.h>
+#include <linux/xattr.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -58,7 +59,7 @@ int ceph_init_dentry(struct dentry *dentry)
 
        di->dentry = dentry;
        di->lease_session = NULL;
-       dentry->d_time = jiffies;
+       di->time = jiffies;
        /* avoid reordering d_fsdata setup so that the check above is safe */
        smp_mb();
        dentry->d_fsdata = di;
@@ -69,16 +70,42 @@ out_unlock:
 }
 
 /*
- * for readdir, we encode the directory frag and offset within that
- * frag into f_pos.
+ * for f_pos for readdir:
+ * - hash order:
+ *     (0xff << 52) | ((24 bits hash) << 28) |
+ *     (the nth entry among those with a colliding hash);
+ * - frag+name order:
+ *     ((frag value) << 28) | (the nth entry in frag);
  */
+#define OFFSET_BITS    28
+#define OFFSET_MASK    ((1 << OFFSET_BITS) - 1)
+#define HASH_ORDER     (0xffull << (OFFSET_BITS + 24))
+loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
+{
+       loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
+       if (hash_order)
+               fpos |= HASH_ORDER;
+       return fpos;
+}
+
+static bool is_hash_order(loff_t p)
+{
+       return (p & HASH_ORDER) == HASH_ORDER;
+}
+
 static unsigned fpos_frag(loff_t p)
 {
-       return p >> 32;
+       return p >> OFFSET_BITS;
+}
+
+static unsigned fpos_hash(loff_t p)
+{
+       return ceph_frag_value(fpos_frag(p));
 }
+
 static unsigned fpos_off(loff_t p)
 {
-       return p & 0xffffffff;
+       return p & OFFSET_MASK;
 }
 
 static int fpos_cmp(loff_t l, loff_t r)
@@ -177,7 +204,7 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
        u64 idx = 0;
        int err = 0;
 
-       dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
+       dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
 
        /* search start position */
        if (ctx->pos > 2) {
@@ -234,7 +261,7 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                spin_unlock(&dentry->d_lock);
 
                if (emit_dentry) {
-                       dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
+                       dout(" %llx dentry %p %pd %p\n", di->offset,
                             dentry, dentry, d_inode(dentry));
                        ctx->pos = di->offset;
                        if (!dir_emit(ctx, dentry->d_name.name,
@@ -269,6 +296,16 @@ out:
        return err;
 }
 
+static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
+{
+       if (!fi->last_readdir)
+               return true;
+       if (is_hash_order(pos))
+               return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
+       else
+               return fi->frag != fpos_frag(pos);
+}
+
 static int ceph_readdir(struct file *file, struct dir_context *ctx)
 {
        struct ceph_file_info *fi = file->private_data;
@@ -276,7 +313,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_mds_client *mdsc = fsc->mdsc;
-       unsigned frag = fpos_frag(ctx->pos);
        int i;
        int err;
        u32 ftype;
@@ -317,7 +353,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                err = __dcache_readdir(file, ctx, shared_gen);
                if (err != -EAGAIN)
                        return err;
-               frag = fpos_frag(ctx->pos);
        } else {
                spin_unlock(&ci->i_ceph_lock);
        }
@@ -325,8 +360,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        /* proceed with a normal readdir */
 more:
        /* do we have the correct frag content buffered? */
-       if (fi->frag != frag || fi->last_readdir == NULL) {
+       if (need_send_readdir(fi, ctx->pos)) {
                struct ceph_mds_request *req;
+               unsigned frag;
                int op = ceph_snap(inode) == CEPH_SNAPDIR ?
                        CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 
@@ -336,6 +372,13 @@ more:
                        fi->last_readdir = NULL;
                }
 
+               if (is_hash_order(ctx->pos)) {
+                       frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
+                                               NULL, NULL);
+               } else {
+                       frag = fpos_frag(ctx->pos);
+               }
+
                dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
                     ceph_vinop(inode), frag, fi->last_name);
                req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
@@ -373,19 +416,23 @@ more:
                        ceph_mdsc_put_request(req);
                        return err;
                }
-               dout("readdir got and parsed readdir result=%d"
-                    " on frag %x, end=%d, complete=%d\n", err, frag,
+               dout("readdir got and parsed readdir result=%d on "
+                    "frag %x, end=%d, complete=%d, hash_order=%d\n",
+                    err, frag,
                     (int)req->r_reply_info.dir_end,
-                    (int)req->r_reply_info.dir_complete);
+                    (int)req->r_reply_info.dir_complete,
+                    (int)req->r_reply_info.hash_order);
 
-
-               /* note next offset and last dentry name */
                rinfo = &req->r_reply_info;
                if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                        frag = le32_to_cpu(rinfo->dir_dir->frag);
-                       fi->next_offset = req->r_readdir_offset;
-                       /* adjust ctx->pos to beginning of frag */
-                       ctx->pos = ceph_make_fpos(frag, fi->next_offset);
+                       if (!rinfo->hash_order) {
+                               fi->next_offset = req->r_readdir_offset;
+                               /* adjust ctx->pos to beginning of frag */
+                               ctx->pos = ceph_make_fpos(frag,
+                                                         fi->next_offset,
+                                                         false);
+                       }
                }
 
                fi->frag = frag;
@@ -411,23 +458,25 @@ more:
                        fi->dir_release_count = 0;
                }
 
-               if (req->r_reply_info.dir_end) {
-                       kfree(fi->last_name);
-                       fi->last_name = NULL;
-                       fi->next_offset = 2;
-               } else {
+               /* note next offset and last dentry name */
+               if (rinfo->dir_nr > 0) {
                        struct ceph_mds_reply_dir_entry *rde =
                                        rinfo->dir_entries + (rinfo->dir_nr-1);
+                       unsigned next_offset = req->r_reply_info.dir_end ?
+                                       2 : (fpos_off(rde->offset) + 1);
                        err = note_last_dentry(fi, rde->name, rde->name_len,
-                                              fpos_off(rde->offset) + 1);
+                                              next_offset);
                        if (err)
                                return err;
+               } else if (req->r_reply_info.dir_end) {
+                       fi->next_offset = 2;
+                       /* keep last name */
                }
        }
 
        rinfo = &fi->last_readdir->r_reply_info;
        dout("readdir frag %x num %d pos %llx chunk first %llx\n",
-            frag, rinfo->dir_nr, ctx->pos,
+            fi->frag, rinfo->dir_nr, ctx->pos,
             rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
 
        i = 0;
@@ -470,16 +519,26 @@ more:
                ctx->pos++;
        }
 
-       if (fi->last_name) {
+       if (fi->next_offset > 2) {
                ceph_mdsc_put_request(fi->last_readdir);
                fi->last_readdir = NULL;
                goto more;
        }
 
        /* more frags? */
-       if (!ceph_frag_is_rightmost(frag)) {
-               frag = ceph_frag_next(frag);
-               ctx->pos = ceph_make_fpos(frag, 2);
+       if (!ceph_frag_is_rightmost(fi->frag)) {
+               unsigned frag = ceph_frag_next(fi->frag);
+               if (is_hash_order(ctx->pos)) {
+                       loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
+                                                       fi->next_offset, true);
+                       if (new_pos > ctx->pos)
+                               ctx->pos = new_pos;
+                       /* keep last_name */
+               } else {
+                       ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
+                       kfree(fi->last_name);
+                       fi->last_name = NULL;
+               }
                dout("readdir next frag is %x\n", frag);
                goto more;
        }
@@ -532,21 +591,27 @@ static void reset_readdir(struct ceph_file_info *fi)
 static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
 {
        struct ceph_mds_reply_info_parsed *rinfo;
+       loff_t chunk_offset;
        if (new_pos == 0)
                return true;
-       if (fpos_frag(new_pos) != fi->frag)
+       if (is_hash_order(new_pos)) {
+               /* no need to reset last_name for a forward seek when
+                * dentries are sorted in hash order */
+       } else if (fi->frag != fpos_frag(new_pos)) {
                return true;
+       }
        rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
        if (!rinfo || !rinfo->dir_nr)
                return true;
-       return new_pos < rinfo->dir_entries[0].offset;;
+       chunk_offset = rinfo->dir_entries[0].offset;
+       return new_pos < chunk_offset ||
+              is_hash_order(new_pos) != is_hash_order(chunk_offset);
 }
 
 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 {
        struct ceph_file_info *fi = file->private_data;
        struct inode *inode = file->f_mapping->host;
-       loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
        loff_t retval;
 
        inode_lock(inode);
@@ -563,21 +628,22 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
        }
 
        if (offset >= 0) {
-               if (offset != file->f_pos) {
-                       file->f_pos = offset;
-                       file->f_version = 0;
-                       fi->flags &= ~CEPH_F_ATEND;
-               }
-               retval = offset;
-
                if (need_reset_readdir(fi, offset)) {
                        dout("dir_llseek dropping %p content\n", file);
                        reset_readdir(fi);
-               } else if (fpos_cmp(offset, old_offset) > 0) {
-                       /* reset dir_release_count if we did a forward seek */
+               } else if (is_hash_order(offset) && offset > file->f_pos) {
+                       /* for hash offset, we don't know if a forward seek
+                        * is within same frag */
                        fi->dir_release_count = 0;
                        fi->readdir_cache_idx = -1;
                }
+
+               if (offset != file->f_pos) {
+                       file->f_pos = offset;
+                       file->f_version = 0;
+                       fi->flags &= ~CEPH_F_ATEND;
+               }
+               retval = offset;
        }
 out:
        inode_unlock(inode);
@@ -645,7 +711,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
        return dentry;
 }
 
-static int is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
+static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
 {
        return ceph_ino(inode) == CEPH_INO_ROOT &&
                strncmp(dentry->d_name.name, ".ceph", 5) == 0;
@@ -1058,7 +1124,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 void ceph_invalidate_dentry_lease(struct dentry *dentry)
 {
        spin_lock(&dentry->d_lock);
-       dentry->d_time = jiffies;
+       ceph_dentry(dentry)->time = jiffies;
        ceph_dentry(dentry)->lease_shared_gen = 0;
        spin_unlock(&dentry->d_lock);
 }
@@ -1067,7 +1133,8 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry)
  * Check if dentry lease is valid.  If not, delete the lease.  Try to
  * renew if the least is more than half up.
  */
-static int dentry_lease_is_valid(struct dentry *dentry)
+static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
+                                struct inode *dir)
 {
        struct ceph_dentry_info *di;
        struct ceph_mds_session *s;
@@ -1075,12 +1142,11 @@ static int dentry_lease_is_valid(struct dentry *dentry)
        u32 gen;
        unsigned long ttl;
        struct ceph_mds_session *session = NULL;
-       struct inode *dir = NULL;
        u32 seq = 0;
 
        spin_lock(&dentry->d_lock);
        di = ceph_dentry(dentry);
-       if (di->lease_session) {
+       if (di && di->lease_session) {
                s = di->lease_session;
                spin_lock(&s->s_gen_ttl_lock);
                gen = s->s_cap_gen;
@@ -1088,17 +1154,24 @@ static int dentry_lease_is_valid(struct dentry *dentry)
                spin_unlock(&s->s_gen_ttl_lock);
 
                if (di->lease_gen == gen &&
-                   time_before(jiffies, dentry->d_time) &&
+                   time_before(jiffies, di->time) &&
                    time_before(jiffies, ttl)) {
                        valid = 1;
                        if (di->lease_renew_after &&
                            time_after(jiffies, di->lease_renew_after)) {
-                               /* we should renew */
-                               dir = d_inode(dentry->d_parent);
-                               session = ceph_get_mds_session(s);
-                               seq = di->lease_seq;
-                               di->lease_renew_after = 0;
-                               di->lease_renew_from = jiffies;
+                               /*
+                                * We should renew. If we're in RCU walk mode
+                                * though, we can't do that so just return
+                                * -ECHILD.
+                                */
+                               if (flags & LOOKUP_RCU) {
+                                       valid = -ECHILD;
+                               } else {
+                                       session = ceph_get_mds_session(s);
+                                       seq = di->lease_seq;
+                                       di->lease_renew_after = 0;
+                                       di->lease_renew_from = jiffies;
+                               }
                        }
                }
        }
@@ -1141,15 +1214,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
        struct dentry *parent;
        struct inode *dir;
 
-       if (flags & LOOKUP_RCU)
-               return -ECHILD;
+       if (flags & LOOKUP_RCU) {
+               parent = ACCESS_ONCE(dentry->d_parent);
+               dir = d_inode_rcu(parent);
+               if (!dir)
+                       return -ECHILD;
+       } else {
+               parent = dget_parent(dentry);
+               dir = d_inode(parent);
+       }
 
        dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
             dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
 
-       parent = dget_parent(dentry);
-       dir = d_inode(parent);
-
        /* always trust cached snapped dentries, snapdir dentry */
        if (ceph_snap(dir) != CEPH_NOSNAP) {
                dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
@@ -1158,12 +1235,16 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
        } else if (d_really_is_positive(dentry) &&
                   ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
                valid = 1;
-       } else if (dentry_lease_is_valid(dentry) ||
-                  dir_lease_is_valid(dir, dentry)) {
-               if (d_really_is_positive(dentry))
-                       valid = ceph_is_any_caps(d_inode(dentry));
-               else
-                       valid = 1;
+       } else {
+               valid = dentry_lease_is_valid(dentry, flags, dir);
+               if (valid == -ECHILD)
+                       return valid;
+               if (valid || dir_lease_is_valid(dir, dentry)) {
+                       if (d_really_is_positive(dentry))
+                               valid = ceph_is_any_caps(d_inode(dentry));
+                       else
+                               valid = 1;
+               }
        }
 
        if (!valid) {
@@ -1172,6 +1253,9 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
                struct ceph_mds_request *req;
                int op, mask, err;
 
+               if (flags & LOOKUP_RCU)
+                       return -ECHILD;
+
                op = ceph_snap(dir) == CEPH_SNAPDIR ?
                        CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
                req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
@@ -1207,7 +1291,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
                ceph_dir_clear_complete(dir);
        }
 
-       dput(parent);
+       if (!(flags & LOOKUP_RCU))
+               dput(parent);
        return valid;
 }
 
@@ -1220,10 +1305,14 @@ static void ceph_d_release(struct dentry *dentry)
 
        dout("d_release %p\n", dentry);
        ceph_dentry_lru_del(dentry);
+
+       spin_lock(&dentry->d_lock);
+       dentry->d_fsdata = NULL;
+       spin_unlock(&dentry->d_lock);
+
        if (di->lease_session)
                ceph_put_mds_session(di->lease_session);
        kmem_cache_free(ceph_dentry_cachep, di);
-       dentry->d_fsdata = NULL;
 }
 
 static int ceph_snapdir_d_revalidate(struct dentry *dentry,
@@ -1397,10 +1486,10 @@ const struct inode_operations ceph_dir_iops = {
        .permission = ceph_permission,
        .getattr = ceph_getattr,
        .setattr = ceph_setattr,
-       .setxattr = ceph_setxattr,
-       .getxattr = ceph_getxattr,
+       .setxattr = generic_setxattr,
+       .getxattr = generic_getxattr,
        .listxattr = ceph_listxattr,
-       .removexattr = ceph_removexattr,
+       .removexattr = generic_removexattr,
        .get_acl = ceph_get_acl,
        .set_acl = ceph_set_acl,
        .mknod = ceph_mknod,