Merge branch 'drm-next' of git://people.freedesktop.org/~dvdhrm/linux into drm-next
[cascardo/linux.git] / fs / ceph / file.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/module.h>
4 #include <linux/sched.h>
5 #include <linux/slab.h>
6 #include <linux/file.h>
7 #include <linux/mount.h>
8 #include <linux/namei.h>
9 #include <linux/writeback.h>
10 #include <linux/aio.h>
11 #include <linux/falloc.h>
12
13 #include "super.h"
14 #include "mds_client.h"
15 #include "cache.h"
16
17 /*
18  * Ceph file operations
19  *
20  * Implement basic open/close functionality, and implement
21  * read/write.
22  *
23  * We implement three modes of file I/O:
24  *  - buffered uses the generic_file_aio_{read,write} helpers
25  *
26  *  - synchronous is used when there is multi-client read/write
27  *    sharing, avoids the page cache, and synchronously waits for an
28  *    ack from the OSD.
29  *
30  *  - direct io takes the variant of the sync path that references
31  *    user pages directly.
32  *
33  * fsync() flushes and waits on dirty pages, but just queues metadata
34  * for writeback: since the MDS can recover size and mtime there is no
35  * need to wait for MDS acknowledgement.
36  */
37
38
39 /*
40  * Prepare an open request.  Preallocate ceph_cap to avoid an
41  * inopportune ENOMEM later.
42  */
43 static struct ceph_mds_request *
44 prepare_open_request(struct super_block *sb, int flags, int create_mode)
45 {
46         struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
47         struct ceph_mds_client *mdsc = fsc->mdsc;
48         struct ceph_mds_request *req;
49         int want_auth = USE_ANY_MDS;
50         int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN;
51
52         if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC))
53                 want_auth = USE_AUTH_MDS;
54
55         req = ceph_mdsc_create_request(mdsc, op, want_auth);
56         if (IS_ERR(req))
57                 goto out;
58         req->r_fmode = ceph_flags_to_mode(flags);
59         req->r_args.open.flags = cpu_to_le32(flags);
60         req->r_args.open.mode = cpu_to_le32(create_mode);
61 out:
62         return req;
63 }
64
65 /*
66  * initialize private struct file data.
67  * if we fail, clean up by dropping fmode reference on the ceph_inode
68  */
69 static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
70 {
71         struct ceph_file_info *cf;
72         int ret = 0;
73         struct ceph_inode_info *ci = ceph_inode(inode);
74         struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
75         struct ceph_mds_client *mdsc = fsc->mdsc;
76
77         switch (inode->i_mode & S_IFMT) {
78         case S_IFREG:
79                 /* First file open request creates the cookie, we want to keep
80                  * this cookie around for the filetime of the inode as not to
81                  * have to worry about fscache register / revoke / operation
82                  * races.
83                  *
84                  * Also, if we know the operation is going to invalidate data
85                  * (non readonly) just nuke the cache right away.
86                  */
87                 ceph_fscache_register_inode_cookie(mdsc->fsc, ci);
88                 if ((fmode & CEPH_FILE_MODE_WR))
89                         ceph_fscache_invalidate(inode);
90         case S_IFDIR:
91                 dout("init_file %p %p 0%o (regular)\n", inode, file,
92                      inode->i_mode);
93                 cf = kmem_cache_alloc(ceph_file_cachep, GFP_NOFS | __GFP_ZERO);
94                 if (cf == NULL) {
95                         ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
96                         return -ENOMEM;
97                 }
98                 cf->fmode = fmode;
99                 cf->next_offset = 2;
100                 file->private_data = cf;
101                 BUG_ON(inode->i_fop->release != ceph_release);
102                 break;
103
104         case S_IFLNK:
105                 dout("init_file %p %p 0%o (symlink)\n", inode, file,
106                      inode->i_mode);
107                 ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
108                 break;
109
110         default:
111                 dout("init_file %p %p 0%o (special)\n", inode, file,
112                      inode->i_mode);
113                 /*
114                  * we need to drop the open ref now, since we don't
115                  * have .release set to ceph_release.
116                  */
117                 ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
118                 BUG_ON(inode->i_fop->release == ceph_release);
119
120                 /* call the proper open fop */
121                 ret = inode->i_fop->open(inode, file);
122         }
123         return ret;
124 }
125
126 /*
127  * If we already have the requisite capabilities, we can satisfy
128  * the open request locally (no need to request new caps from the
129  * MDS).  We do, however, need to inform the MDS (asynchronously)
130  * if our wanted caps set expands.
131  */
132 int ceph_open(struct inode *inode, struct file *file)
133 {
134         struct ceph_inode_info *ci = ceph_inode(inode);
135         struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
136         struct ceph_mds_client *mdsc = fsc->mdsc;
137         struct ceph_mds_request *req;
138         struct ceph_file_info *cf = file->private_data;
139         struct inode *parent_inode = NULL;
140         int err;
141         int flags, fmode, wanted;
142
143         if (cf) {
144                 dout("open file %p is already opened\n", file);
145                 return 0;
146         }
147
148         /* filter out O_CREAT|O_EXCL; vfs did that already.  yuck. */
149         flags = file->f_flags & ~(O_CREAT|O_EXCL);
150         if (S_ISDIR(inode->i_mode))
151                 flags = O_DIRECTORY;  /* mds likes to know */
152
153         dout("open inode %p ino %llx.%llx file %p flags %d (%d)\n", inode,
154              ceph_vinop(inode), file, flags, file->f_flags);
155         fmode = ceph_flags_to_mode(flags);
156         wanted = ceph_caps_for_mode(fmode);
157
158         /* snapped files are read-only */
159         if (ceph_snap(inode) != CEPH_NOSNAP && (file->f_mode & FMODE_WRITE))
160                 return -EROFS;
161
162         /* trivially open snapdir */
163         if (ceph_snap(inode) == CEPH_SNAPDIR) {
164                 spin_lock(&ci->i_ceph_lock);
165                 __ceph_get_fmode(ci, fmode);
166                 spin_unlock(&ci->i_ceph_lock);
167                 return ceph_init_file(inode, file, fmode);
168         }
169
170         /*
171          * No need to block if we have caps on the auth MDS (for
172          * write) or any MDS (for read).  Update wanted set
173          * asynchronously.
174          */
175         spin_lock(&ci->i_ceph_lock);
176         if (__ceph_is_any_real_caps(ci) &&
177             (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) {
178                 int mds_wanted = __ceph_caps_mds_wanted(ci);
179                 int issued = __ceph_caps_issued(ci, NULL);
180
181                 dout("open %p fmode %d want %s issued %s using existing\n",
182                      inode, fmode, ceph_cap_string(wanted),
183                      ceph_cap_string(issued));
184                 __ceph_get_fmode(ci, fmode);
185                 spin_unlock(&ci->i_ceph_lock);
186
187                 /* adjust wanted? */
188                 if ((issued & wanted) != wanted &&
189                     (mds_wanted & wanted) != wanted &&
190                     ceph_snap(inode) != CEPH_SNAPDIR)
191                         ceph_check_caps(ci, 0, NULL);
192
193                 return ceph_init_file(inode, file, fmode);
194         } else if (ceph_snap(inode) != CEPH_NOSNAP &&
195                    (ci->i_snap_caps & wanted) == wanted) {
196                 __ceph_get_fmode(ci, fmode);
197                 spin_unlock(&ci->i_ceph_lock);
198                 return ceph_init_file(inode, file, fmode);
199         }
200
201         spin_unlock(&ci->i_ceph_lock);
202
203         dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
204         req = prepare_open_request(inode->i_sb, flags, 0);
205         if (IS_ERR(req)) {
206                 err = PTR_ERR(req);
207                 goto out;
208         }
209         req->r_inode = inode;
210         ihold(inode);
211
212         req->r_num_caps = 1;
213         if (flags & O_CREAT)
214                 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
215         err = ceph_mdsc_do_request(mdsc, parent_inode, req);
216         iput(parent_inode);
217         if (!err)
218                 err = ceph_init_file(inode, file, req->r_fmode);
219         ceph_mdsc_put_request(req);
220         dout("open result=%d on %llx.%llx\n", err, ceph_vinop(inode));
221 out:
222         return err;
223 }
224
225
226 /*
227  * Do a lookup + open with a single request.  If we get a non-existent
228  * file or symlink, return 1 so the VFS can retry.
229  */
230 int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
231                      struct file *file, unsigned flags, umode_t mode,
232                      int *opened)
233 {
234         struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
235         struct ceph_mds_client *mdsc = fsc->mdsc;
236         struct ceph_mds_request *req;
237         struct dentry *dn;
238         int err;
239
240         dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n",
241              dir, dentry, dentry->d_name.len, dentry->d_name.name,
242              d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);
243
244         if (dentry->d_name.len > NAME_MAX)
245                 return -ENAMETOOLONG;
246
247         err = ceph_init_dentry(dentry);
248         if (err < 0)
249                 return err;
250
251         /* do the open */
252         req = prepare_open_request(dir->i_sb, flags, mode);
253         if (IS_ERR(req))
254                 return PTR_ERR(req);
255         req->r_dentry = dget(dentry);
256         req->r_num_caps = 2;
257         if (flags & O_CREAT) {
258                 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
259                 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
260         }
261         req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
262         err = ceph_mdsc_do_request(mdsc,
263                                    (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
264                                    req);
265         if (err)
266                 goto out_err;
267
268         err = ceph_handle_snapdir(req, dentry, err);
269         if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
270                 err = ceph_handle_notrace_create(dir, dentry);
271
272         if (d_unhashed(dentry)) {
273                 dn = ceph_finish_lookup(req, dentry, err);
274                 if (IS_ERR(dn))
275                         err = PTR_ERR(dn);
276         } else {
277                 /* we were given a hashed negative dentry */
278                 dn = NULL;
279         }
280         if (err)
281                 goto out_err;
282         if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) {
283                 /* make vfs retry on splice, ENOENT, or symlink */
284                 dout("atomic_open finish_no_open on dn %p\n", dn);
285                 err = finish_no_open(file, dn);
286         } else {
287                 dout("atomic_open finish_open on dn %p\n", dn);
288                 if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
289                         ceph_init_acl(dentry, dentry->d_inode, dir);
290                         *opened |= FILE_CREATED;
291                 }
292                 err = finish_open(file, dentry, ceph_open, opened);
293         }
294 out_err:
295         if (!req->r_err && req->r_target_inode)
296                 ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
297         ceph_mdsc_put_request(req);
298         dout("atomic_open result=%d\n", err);
299         return err;
300 }
301
302 int ceph_release(struct inode *inode, struct file *file)
303 {
304         struct ceph_inode_info *ci = ceph_inode(inode);
305         struct ceph_file_info *cf = file->private_data;
306
307         dout("release inode %p file %p\n", inode, file);
308         ceph_put_fmode(ci, cf->fmode);
309         if (cf->last_readdir)
310                 ceph_mdsc_put_request(cf->last_readdir);
311         kfree(cf->last_name);
312         kfree(cf->dir_info);
313         dput(cf->dentry);
314         kmem_cache_free(ceph_file_cachep, cf);
315
316         /* wake up anyone waiting for caps on this inode */
317         wake_up_all(&ci->i_cap_wq);
318         return 0;
319 }
320
321 /*
322  * Read a range of bytes striped over one or more objects.  Iterate over
323  * objects we stripe over.  (That's not atomic, but good enough for now.)
324  *
325  * If we get a short result from the OSD, check against i_size; we need to
326  * only return a short read to the caller if we hit EOF.
327  */
328 static int striped_read(struct inode *inode,
329                         u64 off, u64 len,
330                         struct page **pages, int num_pages,
331                         int *checkeof, bool o_direct,
332                         unsigned long buf_align)
333 {
334         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
335         struct ceph_inode_info *ci = ceph_inode(inode);
336         u64 pos, this_len, left;
337         int io_align, page_align;
338         int pages_left;
339         int read;
340         struct page **page_pos;
341         int ret;
342         bool hit_stripe, was_short;
343
344         /*
345          * we may need to do multiple reads.  not atomic, unfortunately.
346          */
347         pos = off;
348         left = len;
349         page_pos = pages;
350         pages_left = num_pages;
351         read = 0;
352         io_align = off & ~PAGE_MASK;
353
354 more:
355         if (o_direct)
356                 page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
357         else
358                 page_align = pos & ~PAGE_MASK;
359         this_len = left;
360         ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
361                                   &ci->i_layout, pos, &this_len,
362                                   ci->i_truncate_seq,
363                                   ci->i_truncate_size,
364                                   page_pos, pages_left, page_align);
365         if (ret == -ENOENT)
366                 ret = 0;
367         hit_stripe = this_len < left;
368         was_short = ret >= 0 && ret < this_len;
369         dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
370              ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
371
372         if (ret >= 0) {
373                 int didpages;
374                 if (was_short && (pos + ret < inode->i_size)) {
375                         u64 tmp = min(this_len - ret,
376                                         inode->i_size - pos - ret);
377                         dout(" zero gap %llu to %llu\n",
378                                 pos + ret, pos + ret + tmp);
379                         ceph_zero_page_vector_range(page_align + read + ret,
380                                                         tmp, pages);
381                         ret += tmp;
382                 }
383
384                 didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
385                 pos += ret;
386                 read = pos - off;
387                 left -= ret;
388                 page_pos += didpages;
389                 pages_left -= didpages;
390
391                 /* hit stripe and need continue*/
392                 if (left && hit_stripe && pos < inode->i_size)
393                         goto more;
394         }
395
396         if (read > 0) {
397                 ret = read;
398                 /* did we bounce off eof? */
399                 if (pos + left > inode->i_size)
400                         *checkeof = 1;
401         }
402
403         dout("striped_read returns %d\n", ret);
404         return ret;
405 }
406
407 /*
408  * Completely synchronous read and write methods.  Direct from __user
409  * buffer to osd, or directly to user pages (if O_DIRECT).
410  *
411  * If the read spans object boundary, just do multiple reads.
412  */
413 static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
414                                 int *checkeof)
415 {
416         struct file *file = iocb->ki_filp;
417         struct inode *inode = file_inode(file);
418         struct page **pages;
419         u64 off = iocb->ki_pos;
420         int num_pages, ret;
421         size_t len = iov_iter_count(i);
422
423         dout("sync_read on file %p %llu~%u %s\n", file, off,
424              (unsigned)len,
425              (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
426         /*
427          * flush any page cache pages in this range.  this
428          * will make concurrent normal and sync io slow,
429          * but it will at least behave sensibly when they are
430          * in sequence.
431          */
432         ret = filemap_write_and_wait_range(inode->i_mapping, off,
433                                                 off + len);
434         if (ret < 0)
435                 return ret;
436
437         if (file->f_flags & O_DIRECT) {
438                 while (iov_iter_count(i)) {
439                         size_t start;
440                         ssize_t n;
441
442                         n = iov_iter_get_pages_alloc(i, &pages, INT_MAX, &start);
443                         if (n < 0)
444                                 return n;
445
446                         num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
447
448                         ret = striped_read(inode, off, n,
449                                            pages, num_pages, checkeof,
450                                            1, start);
451
452                         ceph_put_page_vector(pages, num_pages, true);
453
454                         if (ret <= 0)
455                                 break;
456                         off += ret;
457                         iov_iter_advance(i, ret);
458                         if (ret < n)
459                                 break;
460                 }
461         } else {
462                 num_pages = calc_pages_for(off, len);
463                 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
464                 if (IS_ERR(pages))
465                         return PTR_ERR(pages);
466                 ret = striped_read(inode, off, len, pages,
467                                         num_pages, checkeof, 0, 0);
468                 if (ret > 0) {
469                         int l, k = 0;
470                         size_t left = ret;
471
472                         while (left) {
473                                 int copy = min_t(size_t, PAGE_SIZE, left);
474                                 l = copy_page_to_iter(pages[k++], 0, copy, i);
475                                 off += l;
476                                 left -= l;
477                                 if (l < copy)
478                                         break;
479                         }
480                 }
481                 ceph_release_page_vector(pages, num_pages);
482         }
483
484         if (off > iocb->ki_pos) {
485                 ret = off - iocb->ki_pos;
486                 iocb->ki_pos = off;
487         }
488
489         dout("sync_read result %d\n", ret);
490         return ret;
491 }
492
493 /*
494  * Write commit request unsafe callback, called to tell us when a
495  * request is unsafe (that is, in flight--has been handed to the
496  * messenger to send to its target osd).  It is called again when
497  * we've received a response message indicating the request is
498  * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request
499  * is completed early (and unsuccessfully) due to a timeout or
500  * interrupt.
501  *
502  * This is used if we requested both an ACK and ONDISK commit reply
503  * from the OSD.
504  */
505 static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
506 {
507         struct ceph_inode_info *ci = ceph_inode(req->r_inode);
508
509         dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid,
510                 unsafe ? "un" : "");
511         if (unsafe) {
512                 ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
513                 spin_lock(&ci->i_unsafe_lock);
514                 list_add_tail(&req->r_unsafe_item,
515                               &ci->i_unsafe_writes);
516                 spin_unlock(&ci->i_unsafe_lock);
517         } else {
518                 spin_lock(&ci->i_unsafe_lock);
519                 list_del_init(&req->r_unsafe_item);
520                 spin_unlock(&ci->i_unsafe_lock);
521                 ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
522         }
523 }
524
525
526 /*
527  * Synchronous write, straight from __user pointer or user pages.
528  *
529  * If write spans object boundary, just do multiple writes.  (For a
530  * correct atomic write, we should e.g. take write locks on all
531  * objects, rollback on failure, etc.)
532  */
533 static ssize_t
534 ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from)
535 {
536         struct file *file = iocb->ki_filp;
537         struct inode *inode = file_inode(file);
538         struct ceph_inode_info *ci = ceph_inode(inode);
539         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
540         struct ceph_snap_context *snapc;
541         struct ceph_vino vino;
542         struct ceph_osd_request *req;
543         struct page **pages;
544         int num_pages;
545         int written = 0;
546         int flags;
547         int check_caps = 0;
548         int ret;
549         struct timespec mtime = CURRENT_TIME;
550         loff_t pos = iocb->ki_pos;
551         size_t count = iov_iter_count(from);
552
553         if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
554                 return -EROFS;
555
556         dout("sync_direct_write on file %p %lld~%u\n", file, pos,
557              (unsigned)count);
558
559         ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
560         if (ret < 0)
561                 return ret;
562
563         ret = invalidate_inode_pages2_range(inode->i_mapping,
564                                             pos >> PAGE_CACHE_SHIFT,
565                                             (pos + count) >> PAGE_CACHE_SHIFT);
566         if (ret < 0)
567                 dout("invalidate_inode_pages2_range returned %d\n", ret);
568
569         flags = CEPH_OSD_FLAG_ORDERSNAP |
570                 CEPH_OSD_FLAG_ONDISK |
571                 CEPH_OSD_FLAG_WRITE;
572
573         while (iov_iter_count(from) > 0) {
574                 u64 len = iov_iter_single_seg_count(from);
575                 size_t start;
576                 ssize_t n;
577
578                 snapc = ci->i_snap_realm->cached_context;
579                 vino = ceph_vino(inode);
580                 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
581                                             vino, pos, &len,
582                                             2,/*include a 'startsync' command*/
583                                             CEPH_OSD_OP_WRITE, flags, snapc,
584                                             ci->i_truncate_seq,
585                                             ci->i_truncate_size,
586                                             false);
587                 if (IS_ERR(req)) {
588                         ret = PTR_ERR(req);
589                         break;
590                 }
591
592                 n = iov_iter_get_pages_alloc(from, &pages, len, &start);
593                 if (unlikely(n < 0)) {
594                         ret = n;
595                         ceph_osdc_put_request(req);
596                         break;
597                 }
598
599                 num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
600                 /*
601                  * throw out any page cache pages in this range. this
602                  * may block.
603                  */
604                 truncate_inode_pages_range(inode->i_mapping, pos,
605                                    (pos+n) | (PAGE_CACHE_SIZE-1));
606                 osd_req_op_extent_osd_data_pages(req, 0, pages, n, start,
607                                                 false, false);
608
609                 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
610                 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
611
612                 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
613                 if (!ret)
614                         ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
615
616                 ceph_put_page_vector(pages, num_pages, false);
617
618                 ceph_osdc_put_request(req);
619                 if (ret)
620                         break;
621                 pos += n;
622                 written += n;
623                 iov_iter_advance(from, n);
624
625                 if (pos > i_size_read(inode)) {
626                         check_caps = ceph_inode_set_size(inode, pos);
627                         if (check_caps)
628                                 ceph_check_caps(ceph_inode(inode),
629                                                 CHECK_CAPS_AUTHONLY,
630                                                 NULL);
631                 }
632         }
633
634         if (ret != -EOLDSNAPC && written > 0) {
635                 iocb->ki_pos = pos;
636                 ret = written;
637         }
638         return ret;
639 }
640
641
642 /*
643  * Synchronous write, straight from __user pointer or user pages.
644  *
645  * If write spans object boundary, just do multiple writes.  (For a
646  * correct atomic write, we should e.g. take write locks on all
647  * objects, rollback on failure, etc.)
648  */
649 static ssize_t ceph_sync_write(struct kiocb *iocb, struct iov_iter *from)
650 {
651         struct file *file = iocb->ki_filp;
652         struct inode *inode = file_inode(file);
653         struct ceph_inode_info *ci = ceph_inode(inode);
654         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
655         struct ceph_snap_context *snapc;
656         struct ceph_vino vino;
657         struct ceph_osd_request *req;
658         struct page **pages;
659         u64 len;
660         int num_pages;
661         int written = 0;
662         int flags;
663         int check_caps = 0;
664         int ret;
665         struct timespec mtime = CURRENT_TIME;
666         loff_t pos = iocb->ki_pos;
667         size_t count = iov_iter_count(from);
668
669         if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
670                 return -EROFS;
671
672         dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
673
674         ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
675         if (ret < 0)
676                 return ret;
677
678         ret = invalidate_inode_pages2_range(inode->i_mapping,
679                                             pos >> PAGE_CACHE_SHIFT,
680                                             (pos + count) >> PAGE_CACHE_SHIFT);
681         if (ret < 0)
682                 dout("invalidate_inode_pages2_range returned %d\n", ret);
683
684         flags = CEPH_OSD_FLAG_ORDERSNAP |
685                 CEPH_OSD_FLAG_ONDISK |
686                 CEPH_OSD_FLAG_WRITE |
687                 CEPH_OSD_FLAG_ACK;
688
689         while ((len = iov_iter_count(from)) > 0) {
690                 size_t left;
691                 int n;
692
693                 snapc = ci->i_snap_realm->cached_context;
694                 vino = ceph_vino(inode);
695                 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
696                                             vino, pos, &len, 1,
697                                             CEPH_OSD_OP_WRITE, flags, snapc,
698                                             ci->i_truncate_seq,
699                                             ci->i_truncate_size,
700                                             false);
701                 if (IS_ERR(req)) {
702                         ret = PTR_ERR(req);
703                         break;
704                 }
705
706                 /*
707                  * write from beginning of first page,
708                  * regardless of io alignment
709                  */
710                 num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
711
712                 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
713                 if (IS_ERR(pages)) {
714                         ret = PTR_ERR(pages);
715                         goto out;
716                 }
717
718                 left = len;
719                 for (n = 0; n < num_pages; n++) {
720                         size_t plen = min_t(size_t, left, PAGE_SIZE);
721                         ret = copy_page_from_iter(pages[n], 0, plen, from);
722                         if (ret != plen) {
723                                 ret = -EFAULT;
724                                 break;
725                         }
726                         left -= ret;
727                 }
728
729                 if (ret < 0) {
730                         ceph_release_page_vector(pages, num_pages);
731                         goto out;
732                 }
733
734                 /* get a second commit callback */
735                 req->r_unsafe_callback = ceph_sync_write_unsafe;
736                 req->r_inode = inode;
737
738                 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
739                                                 false, true);
740
741                 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
742                 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
743
744                 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
745                 if (!ret)
746                         ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
747
748 out:
749                 ceph_osdc_put_request(req);
750                 if (ret == 0) {
751                         pos += len;
752                         written += len;
753
754                         if (pos > i_size_read(inode)) {
755                                 check_caps = ceph_inode_set_size(inode, pos);
756                                 if (check_caps)
757                                         ceph_check_caps(ceph_inode(inode),
758                                                         CHECK_CAPS_AUTHONLY,
759                                                         NULL);
760                         }
761                 } else
762                         break;
763         }
764
765         if (ret != -EOLDSNAPC && written > 0) {
766                 ret = written;
767                 iocb->ki_pos = pos;
768         }
769         return ret;
770 }
771
772 /*
773  * Wrap generic_file_aio_read with checks for cap bits on the inode.
774  * Atomically grab references, so that those bits are not released
775  * back to the MDS mid-read.
776  *
777  * Hmm, the sync read case isn't actually async... should it be?
778  */
779 static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
780 {
781         struct file *filp = iocb->ki_filp;
782         struct ceph_file_info *fi = filp->private_data;
783         size_t len = iocb->ki_nbytes;
784         struct inode *inode = file_inode(filp);
785         struct ceph_inode_info *ci = ceph_inode(inode);
786         ssize_t ret;
787         int want, got = 0;
788         int checkeof = 0, read = 0;
789
790 again:
791         dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
792              inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
793
794         if (fi->fmode & CEPH_FILE_MODE_LAZY)
795                 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
796         else
797                 want = CEPH_CAP_FILE_CACHE;
798         ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
799         if (ret < 0)
800                 return ret;
801
802         if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
803             (iocb->ki_filp->f_flags & O_DIRECT) ||
804             (fi->flags & CEPH_F_SYNC)) {
805
806                 dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n",
807                      inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
808                      ceph_cap_string(got));
809
810                 /* hmm, this isn't really async... */
811                 ret = ceph_sync_read(iocb, to, &checkeof);
812         } else {
813                 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
814                      inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
815                      ceph_cap_string(got));
816
817                 ret = generic_file_read_iter(iocb, to);
818         }
819         dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
820              inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
821         ceph_put_cap_refs(ci, got);
822
823         if (checkeof && ret >= 0) {
824                 int statret = ceph_do_getattr(inode,
825                                               CEPH_STAT_CAP_SIZE);
826
827                 /* hit EOF or hole? */
828                 if (statret == 0 && iocb->ki_pos < inode->i_size &&
829                         ret < len) {
830                         dout("sync_read hit hole, ppos %lld < size %lld"
831                              ", reading more\n", iocb->ki_pos,
832                              inode->i_size);
833
834                         iov_iter_advance(to, ret);
835                         read += ret;
836                         len -= ret;
837                         checkeof = 0;
838                         goto again;
839                 }
840         }
841
842         if (ret >= 0)
843                 ret += read;
844
845         return ret;
846 }
847
848 /*
849  * Take cap references to avoid releasing caps to MDS mid-write.
850  *
851  * If we are synchronous, and write with an old snap context, the OSD
852  * may return EOLDSNAPC.  In that case, retry the write.. _after_
853  * dropping our cap refs and allowing the pending snap to logically
854  * complete _before_ this write occurs.
855  *
856  * If we are near ENOSPC, write synchronously.
857  */
858 static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
859 {
860         struct file *file = iocb->ki_filp;
861         struct ceph_file_info *fi = file->private_data;
862         struct inode *inode = file_inode(file);
863         struct ceph_inode_info *ci = ceph_inode(inode);
864         struct ceph_osd_client *osdc =
865                 &ceph_sb_to_client(inode->i_sb)->client->osdc;
866         ssize_t count = iov_iter_count(from), written = 0;
867         int err, want, got;
868         loff_t pos = iocb->ki_pos;
869
870         if (ceph_snap(inode) != CEPH_NOSNAP)
871                 return -EROFS;
872
873         mutex_lock(&inode->i_mutex);
874
875         /* We can write back this queue in page reclaim */
876         current->backing_dev_info = file->f_mapping->backing_dev_info;
877
878         err = generic_write_checks(file, &pos, &count, S_ISBLK(inode->i_mode));
879         if (err)
880                 goto out;
881
882         if (count == 0)
883                 goto out;
884         iov_iter_truncate(from, count);
885
886         err = file_remove_suid(file);
887         if (err)
888                 goto out;
889
890         err = file_update_time(file);
891         if (err)
892                 goto out;
893
894 retry_snap:
895         if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
896                 err = -ENOSPC;
897                 goto out;
898         }
899
900         dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
901              inode, ceph_vinop(inode), pos, count, inode->i_size);
902         if (fi->fmode & CEPH_FILE_MODE_LAZY)
903                 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
904         else
905                 want = CEPH_CAP_FILE_BUFFER;
906         got = 0;
907         err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count);
908         if (err < 0)
909                 goto out;
910
911         dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
912              inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
913
914         if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
915             (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
916                 struct iov_iter data;
917                 mutex_unlock(&inode->i_mutex);
918                 /* we might need to revert back to that point */
919                 data = *from;
920                 if (file->f_flags & O_DIRECT)
921                         written = ceph_sync_direct_write(iocb, &data);
922                 else
923                         written = ceph_sync_write(iocb, &data);
924                 if (written == -EOLDSNAPC) {
925                         dout("aio_write %p %llx.%llx %llu~%u"
926                                 "got EOLDSNAPC, retrying\n",
927                                 inode, ceph_vinop(inode),
928                                 pos, (unsigned)count);
929                         mutex_lock(&inode->i_mutex);
930                         goto retry_snap;
931                 }
932                 if (written > 0)
933                         iov_iter_advance(from, written);
934         } else {
935                 loff_t old_size = inode->i_size;
936                 /*
937                  * No need to acquire the i_truncate_mutex. Because
938                  * the MDS revokes Fwb caps before sending truncate
939                  * message to us. We can't get Fwb cap while there
940                  * are pending vmtruncate. So write and vmtruncate
941                  * can not run at the same time
942                  */
943                 written = generic_perform_write(file, from, pos);
944                 if (likely(written >= 0))
945                         iocb->ki_pos = pos + written;
946                 if (inode->i_size > old_size)
947                         ceph_fscache_update_objectsize(inode);
948                 mutex_unlock(&inode->i_mutex);
949         }
950
951         if (written >= 0) {
952                 int dirty;
953                 spin_lock(&ci->i_ceph_lock);
954                 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
955                 spin_unlock(&ci->i_ceph_lock);
956                 if (dirty)
957                         __mark_inode_dirty(inode, dirty);
958         }
959
960         dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
961              inode, ceph_vinop(inode), pos, (unsigned)count,
962              ceph_cap_string(got));
963         ceph_put_cap_refs(ci, got);
964
965         if (written >= 0 &&
966             ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) ||
967              ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
968                 err = vfs_fsync_range(file, pos, pos + written - 1, 1);
969                 if (err < 0)
970                         written = err;
971         }
972
973         goto out_unlocked;
974
975 out:
976         mutex_unlock(&inode->i_mutex);
977 out_unlocked:
978         current->backing_dev_info = NULL;
979         return written ? written : err;
980 }
981
982 /*
983  * llseek.  be sure to verify file size on SEEK_END.
984  */
985 static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
986 {
987         struct inode *inode = file->f_mapping->host;
988         int ret;
989
990         mutex_lock(&inode->i_mutex);
991
992         if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
993                 ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
994                 if (ret < 0) {
995                         offset = ret;
996                         goto out;
997                 }
998         }
999
1000         switch (whence) {
1001         case SEEK_END:
1002                 offset += inode->i_size;
1003                 break;
1004         case SEEK_CUR:
1005                 /*
1006                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
1007                  * position-querying operation.  Avoid rewriting the "same"
1008                  * f_pos value back to the file because a concurrent read(),
1009                  * write() or lseek() might have altered it
1010                  */
1011                 if (offset == 0) {
1012                         offset = file->f_pos;
1013                         goto out;
1014                 }
1015                 offset += file->f_pos;
1016                 break;
1017         case SEEK_DATA:
1018                 if (offset >= inode->i_size) {
1019                         ret = -ENXIO;
1020                         goto out;
1021                 }
1022                 break;
1023         case SEEK_HOLE:
1024                 if (offset >= inode->i_size) {
1025                         ret = -ENXIO;
1026                         goto out;
1027                 }
1028                 offset = inode->i_size;
1029                 break;
1030         }
1031
1032         offset = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);
1033
1034 out:
1035         mutex_unlock(&inode->i_mutex);
1036         return offset;
1037 }
1038
1039 static inline void ceph_zero_partial_page(
1040         struct inode *inode, loff_t offset, unsigned size)
1041 {
1042         struct page *page;
1043         pgoff_t index = offset >> PAGE_CACHE_SHIFT;
1044
1045         page = find_lock_page(inode->i_mapping, index);
1046         if (page) {
1047                 wait_on_page_writeback(page);
1048                 zero_user(page, offset & (PAGE_CACHE_SIZE - 1), size);
1049                 unlock_page(page);
1050                 page_cache_release(page);
1051         }
1052 }
1053
1054 static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
1055                                       loff_t length)
1056 {
1057         loff_t nearly = round_up(offset, PAGE_CACHE_SIZE);
1058         if (offset < nearly) {
1059                 loff_t size = nearly - offset;
1060                 if (length < size)
1061                         size = length;
1062                 ceph_zero_partial_page(inode, offset, size);
1063                 offset += size;
1064                 length -= size;
1065         }
1066         if (length >= PAGE_CACHE_SIZE) {
1067                 loff_t size = round_down(length, PAGE_CACHE_SIZE);
1068                 truncate_pagecache_range(inode, offset, offset + size - 1);
1069                 offset += size;
1070                 length -= size;
1071         }
1072         if (length)
1073                 ceph_zero_partial_page(inode, offset, length);
1074 }
1075
1076 static int ceph_zero_partial_object(struct inode *inode,
1077                                     loff_t offset, loff_t *length)
1078 {
1079         struct ceph_inode_info *ci = ceph_inode(inode);
1080         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1081         struct ceph_osd_request *req;
1082         int ret = 0;
1083         loff_t zero = 0;
1084         int op;
1085
1086         if (!length) {
1087                 op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE;
1088                 length = &zero;
1089         } else {
1090                 op = CEPH_OSD_OP_ZERO;
1091         }
1092
1093         req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1094                                         ceph_vino(inode),
1095                                         offset, length,
1096                                         1, op,
1097                                         CEPH_OSD_FLAG_WRITE |
1098                                         CEPH_OSD_FLAG_ONDISK,
1099                                         NULL, 0, 0, false);
1100         if (IS_ERR(req)) {
1101                 ret = PTR_ERR(req);
1102                 goto out;
1103         }
1104
1105         ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap,
1106                                 &inode->i_mtime);
1107
1108         ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1109         if (!ret) {
1110                 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
1111                 if (ret == -ENOENT)
1112                         ret = 0;
1113         }
1114         ceph_osdc_put_request(req);
1115
1116 out:
1117         return ret;
1118 }
1119
1120 static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
1121 {
1122         int ret = 0;
1123         struct ceph_inode_info *ci = ceph_inode(inode);
1124         s32 stripe_unit = ceph_file_layout_su(ci->i_layout);
1125         s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
1126         s32 object_size = ceph_file_layout_object_size(ci->i_layout);
1127         u64 object_set_size = object_size * stripe_count;
1128         u64 nearly, t;
1129
1130         /* round offset up to next period boundary */
1131         nearly = offset + object_set_size - 1;
1132         t = nearly;
1133         nearly -= do_div(t, object_set_size);
1134
1135         while (length && offset < nearly) {
1136                 loff_t size = length;
1137                 ret = ceph_zero_partial_object(inode, offset, &size);
1138                 if (ret < 0)
1139                         return ret;
1140                 offset += size;
1141                 length -= size;
1142         }
1143         while (length >= object_set_size) {
1144                 int i;
1145                 loff_t pos = offset;
1146                 for (i = 0; i < stripe_count; ++i) {
1147                         ret = ceph_zero_partial_object(inode, pos, NULL);
1148                         if (ret < 0)
1149                                 return ret;
1150                         pos += stripe_unit;
1151                 }
1152                 offset += object_set_size;
1153                 length -= object_set_size;
1154         }
1155         while (length) {
1156                 loff_t size = length;
1157                 ret = ceph_zero_partial_object(inode, offset, &size);
1158                 if (ret < 0)
1159                         return ret;
1160                 offset += size;
1161                 length -= size;
1162         }
1163         return ret;
1164 }
1165
1166 static long ceph_fallocate(struct file *file, int mode,
1167                                 loff_t offset, loff_t length)
1168 {
1169         struct ceph_file_info *fi = file->private_data;
1170         struct inode *inode = file_inode(file);
1171         struct ceph_inode_info *ci = ceph_inode(inode);
1172         struct ceph_osd_client *osdc =
1173                 &ceph_inode_to_client(inode)->client->osdc;
1174         int want, got = 0;
1175         int dirty;
1176         int ret = 0;
1177         loff_t endoff = 0;
1178         loff_t size;
1179
1180         if (!S_ISREG(inode->i_mode))
1181                 return -EOPNOTSUPP;
1182
1183         mutex_lock(&inode->i_mutex);
1184
1185         if (ceph_snap(inode) != CEPH_NOSNAP) {
1186                 ret = -EROFS;
1187                 goto unlock;
1188         }
1189
1190         if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) &&
1191                 !(mode & FALLOC_FL_PUNCH_HOLE)) {
1192                 ret = -ENOSPC;
1193                 goto unlock;
1194         }
1195
1196         size = i_size_read(inode);
1197         if (!(mode & FALLOC_FL_KEEP_SIZE))
1198                 endoff = offset + length;
1199
1200         if (fi->fmode & CEPH_FILE_MODE_LAZY)
1201                 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1202         else
1203                 want = CEPH_CAP_FILE_BUFFER;
1204
1205         ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
1206         if (ret < 0)
1207                 goto unlock;
1208
1209         if (mode & FALLOC_FL_PUNCH_HOLE) {
1210                 if (offset < size)
1211                         ceph_zero_pagecache_range(inode, offset, length);
1212                 ret = ceph_zero_objects(inode, offset, length);
1213         } else if (endoff > size) {
1214                 truncate_pagecache_range(inode, size, -1);
1215                 if (ceph_inode_set_size(inode, endoff))
1216                         ceph_check_caps(ceph_inode(inode),
1217                                 CHECK_CAPS_AUTHONLY, NULL);
1218         }
1219
1220         if (!ret) {
1221                 spin_lock(&ci->i_ceph_lock);
1222                 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1223                 spin_unlock(&ci->i_ceph_lock);
1224                 if (dirty)
1225                         __mark_inode_dirty(inode, dirty);
1226         }
1227
1228         ceph_put_cap_refs(ci, got);
1229 unlock:
1230         mutex_unlock(&inode->i_mutex);
1231         return ret;
1232 }
1233
1234 const struct file_operations ceph_file_fops = {
1235         .open = ceph_open,
1236         .release = ceph_release,
1237         .llseek = ceph_llseek,
1238         .read = new_sync_read,
1239         .write = new_sync_write,
1240         .read_iter = ceph_read_iter,
1241         .write_iter = ceph_write_iter,
1242         .mmap = ceph_mmap,
1243         .fsync = ceph_fsync,
1244         .lock = ceph_lock,
1245         .flock = ceph_flock,
1246         .splice_read = generic_file_splice_read,
1247         .splice_write = iter_file_splice_write,
1248         .unlocked_ioctl = ceph_ioctl,
1249         .compat_ioctl   = ceph_ioctl,
1250         .fallocate      = ceph_fallocate,
1251 };
1252