Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
[cascardo/linux.git] / fs / ceph / file.c
index 9f8e357..ce74b39 100644 (file)
@@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file)
        return 0;
 }
 
+enum {
+       CHECK_EOF = 1,          /* read bounced off EOF: re-check i_size */
+       READ_INLINE = 2,        /* inode has inline data: read it via getattr */
+};
+
 /*
  * Read a range of bytes striped over one or more objects.  Iterate over
  * objects we stripe over.  (That's not atomic, but good enough for now.)
@@ -412,7 +417,7 @@ more:
                ret = read;
                /* did we bounce off eof? */
                if (pos + left > inode->i_size)
-                       *checkeof = 1;
+                       *checkeof = CHECK_EOF;
        }
 
        dout("striped_read returns %d\n", ret);
@@ -598,7 +603,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
                snapc = ci->i_snap_realm->cached_context;
                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                                           vino, pos, &len,
+                                           vino, pos, &len, 0,
                                            2,/*include a 'startsync' command*/
                                            CEPH_OSD_OP_WRITE, flags, snapc,
                                            ci->i_truncate_seq,
@@ -609,6 +614,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
                        break;
                }
 
+               osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+
                n = iov_iter_get_pages_alloc(from, &pages, len, &start);
                if (unlikely(n < 0)) {
                        ret = n;
@@ -713,7 +720,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
                snapc = ci->i_snap_realm->cached_context;
                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                                           vino, pos, &len, 1,
+                                           vino, pos, &len, 0, 1,
                                            CEPH_OSD_OP_WRITE, flags, snapc,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
@@ -803,9 +810,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
        size_t len = iocb->ki_nbytes;
        struct inode *inode = file_inode(filp);
        struct ceph_inode_info *ci = ceph_inode(inode);
+       struct page *pinned_page = NULL;
        ssize_t ret;
        int want, got = 0;
-       int checkeof = 0, read = 0;
+       int retry_op = 0, read = 0;
 
 again:
        dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
@@ -815,7 +823,7 @@ again:
                want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
        else
                want = CEPH_CAP_FILE_CACHE;
-       ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
+       ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
        if (ret < 0)
                return ret;
 
@@ -827,8 +835,12 @@ again:
                     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
                     ceph_cap_string(got));
 
-               /* hmm, this isn't really async... */
-               ret = ceph_sync_read(iocb, to, &checkeof);
+               if (ci->i_inline_version == CEPH_INLINE_NONE) {
+                       /* hmm, this isn't really async... */
+                       ret = ceph_sync_read(iocb, to, &retry_op);
+               } else {
+                       retry_op = READ_INLINE;
+               }
        } else {
                dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
                     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
@@ -838,13 +850,58 @@ again:
        }
        dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
             inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
+       if (pinned_page) {
+               page_cache_release(pinned_page);
+               pinned_page = NULL;
+       }
        ceph_put_cap_refs(ci, got);
+       /* re-check size (CHECK_EOF) or fetch inline data (READ_INLINE) */
+       if (retry_op && ret >= 0) {
+               int statret;
+               struct page *page = NULL;
+               loff_t i_size;
+               if (retry_op == READ_INLINE) {
+                       page = __page_cache_alloc(GFP_NOFS);
+                       if (!page)
+                               return -ENOMEM;
+               }
 
-       if (checkeof && ret >= 0) {
-               int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
+               statret = __ceph_do_getattr(inode, page,
+                                           CEPH_STAT_CAP_INLINE_DATA, !!page);
+               if (statret < 0) {
+                       /* page is only allocated for READ_INLINE */
+                       if (page)
+                               __free_page(page);
+                       if (statret == -ENODATA) {
+                               BUG_ON(retry_op != READ_INLINE);
+                               goto again;
+                       }
+                       return statret;
+               }
+
+               i_size = i_size_read(inode);
+               if (retry_op == READ_INLINE) {
+                       /* does not support inline data > PAGE_SIZE */
+                       if (i_size > PAGE_CACHE_SIZE) {
+                               ret = -EIO;
+                       } else if (iocb->ki_pos < i_size) {
+                               loff_t end = min_t(loff_t, i_size,
+                                                  iocb->ki_pos + len);
+                               if (statret < end)
+                                       zero_user_segment(page, statret, end);
+                               ret = copy_page_to_iter(page,
+                                               iocb->ki_pos & ~PAGE_MASK,
+                                               end - iocb->ki_pos, to);
+                               iocb->ki_pos += ret;
+                       } else {
+                               ret = 0;
+                       }
+                       __free_pages(page, 0);
+                       return ret;
+               }
 
                /* hit EOF or hole? */
-               if (statret == 0 && iocb->ki_pos < inode->i_size &&
+               if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
                        ret < len) {
                        dout("sync_read hit hole, ppos %lld < size %lld"
                             ", reading more\n", iocb->ki_pos,
@@ -852,7 +906,7 @@ again:
 
                        read += ret;
                        len -= ret;
-                       checkeof = 0;
+                       retry_op = 0;
                        goto again;
                }
        }
@@ -909,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
        if (err)
                goto out;
 
+       if (ci->i_inline_version != CEPH_INLINE_NONE) {
+               err = ceph_uninline_data(file, NULL);
+               if (err < 0)
+                       goto out;
+       }
+
 retry_snap:
        if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
                err = -ENOSPC;
@@ -922,7 +982,8 @@ retry_snap:
        else
                want = CEPH_CAP_FILE_BUFFER;
        got = 0;
-       err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count);
+       err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
+                           &got, NULL);
        if (err < 0)
                goto out;
 
@@ -969,6 +1030,7 @@ retry_snap:
        if (written >= 0) {
                int dirty;
                spin_lock(&ci->i_ceph_lock);
+               ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
@@ -1111,7 +1173,7 @@ static int ceph_zero_partial_object(struct inode *inode,
        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                        ceph_vino(inode),
                                        offset, length,
-                                       1, op,
+                                       0, 1, op,
                                        CEPH_OSD_FLAG_WRITE |
                                        CEPH_OSD_FLAG_ONDISK,
                                        NULL, 0, 0, false);
@@ -1214,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
                goto unlock;
        }
 
+       if (ci->i_inline_version != CEPH_INLINE_NONE) {
+               ret = ceph_uninline_data(file, NULL);
+               if (ret < 0)
+                       goto unlock;
+       }
+
        size = i_size_read(inode);
        if (!(mode & FALLOC_FL_KEEP_SIZE))
                endoff = offset + length;
@@ -1223,7 +1291,7 @@ static long ceph_fallocate(struct file *file, int mode,
        else
                want = CEPH_CAP_FILE_BUFFER;
 
-       ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
+       ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
        if (ret < 0)
                goto unlock;
 
@@ -1240,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,
 
        if (!ret) {
                spin_lock(&ci->i_ceph_lock);
+               ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)