staging/lustre/llite: define per open file cache for ll_cl_context
[cascardo/linux.git] / drivers / staging / lustre / lustre / llite / rw.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/rw.c
37  *
38  * Lustre Lite I/O page cache routines shared by different kernel revs
39  */
40
41 #include <linux/kernel.h>
42 #include <linux/mm.h>
43 #include <linux/string.h>
44 #include <linux/stat.h>
45 #include <linux/errno.h>
46 #include <linux/unistd.h>
47 #include <linux/writeback.h>
48 #include <linux/uaccess.h>
49
50 #include <linux/fs.h>
51 #include <linux/pagemap.h>
52 /* current_is_kswapd() */
53 #include <linux/swap.h>
54
55 #define DEBUG_SUBSYSTEM S_LLITE
56
57 #include "../include/lustre_lite.h"
58 #include "../include/obd_cksum.h"
59 #include "llite_internal.h"
60 #include "../include/linux/lustre_compat25.h"
61
62 static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
63
64 /**
65  * Get readahead pages from the filesystem readahead pool of the client for a
66  * thread.
67  *
68  * /param sbi superblock for filesystem readahead state ll_ra_info
69  * /param ria per-thread readahead state
70  * /param pages number of pages requested for readahead for the thread.
71  *
72  * WARNING: This algorithm is used to reduce contention on sbi->ll_lock.
73  * It should work well if the ra_max_pages is much greater than the single
74  * file's read-ahead window, and not too many threads contending for
75  * these readahead pages.
76  *
77  * TODO: There may be a 'global sync problem' if many threads are trying
78  * to get an ra budget that is larger than the remaining readahead pages
79  * and reach here at exactly the same time. They will compute /a ret to
80  * consume the remaining pages, but will fail at atomic_add_return() and
81  * get a zero ra window, although there is still ra space remaining. - Jay
82  */
83 static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
84                                      struct ra_io_arg *ria,
85                                      unsigned long pages, unsigned long min)
86 {
87         struct ll_ra_info *ra = &sbi->ll_ra_info;
88         long ret;
89
90         /* If read-ahead pages left are less than 1M, do not do read-ahead,
91          * otherwise it will form small read RPC(< 1M), which hurt server
92          * performance a lot.
93          */
94         ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), pages);
95         if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages)) {
96                 ret = 0;
97                 goto out;
98         }
99
100         /* If the non-strided (ria_pages == 0) readahead window
101          * (ria_start + ret) has grown across an RPC boundary, then trim
102          * readahead size by the amount beyond the RPC so it ends on an
103          * RPC boundary. If the readahead window is already ending on
104          * an RPC boundary (beyond_rpc == 0), or smaller than a full
105          * RPC (beyond_rpc < ret) the readahead size is unchanged.
106          * The (beyond_rpc != 0) check is skipped since the conditional
107          * branch is more expensive than subtracting zero from the result.
108          *
109          * Strided read is left unaligned to avoid small fragments beyond
110          * the RPC boundary from needing an extra read RPC.
111          */
112         if (ria->ria_pages == 0) {
113                 long beyond_rpc = (ria->ria_start + ret) % PTLRPC_MAX_BRW_PAGES;
114
115                 if (/* beyond_rpc != 0 && */ beyond_rpc < ret)
116                         ret -= beyond_rpc;
117         }
118
119         if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
120                 atomic_sub(ret, &ra->ra_cur_pages);
121                 ret = 0;
122         }
123
124 out:
125         if (ret < min) {
126                 /* override ra limit for maximum performance */
127                 atomic_add(min - ret, &ra->ra_cur_pages);
128                 ret = min;
129         }
130         return ret;
131 }
132
133 void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
134 {
135         struct ll_ra_info *ra = &sbi->ll_ra_info;
136
137         atomic_sub(len, &ra->ra_cur_pages);
138 }
139
140 static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
141 {
142         LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
143         lprocfs_counter_incr(sbi->ll_ra_stats, which);
144 }
145
146 void ll_ra_stats_inc(struct inode *inode, enum ra_stat which)
147 {
148         struct ll_sb_info *sbi = ll_i2sbi(inode);
149
150         ll_ra_stats_inc_sbi(sbi, which);
151 }
152
153 #define RAS_CDEBUG(ras) \
154         CDEBUG(D_READA,                                               \
155                "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu r %lu ri %lu"    \
156                "csr %lu sf %lu sp %lu sl %lu\n",                            \
157                ras->ras_last_readpage, ras->ras_consecutive_requests,   \
158                ras->ras_consecutive_pages, ras->ras_window_start,           \
159                ras->ras_window_len, ras->ras_next_readahead,             \
160                ras->ras_requests, ras->ras_request_index,                   \
161                ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
162                ras->ras_stride_pages, ras->ras_stride_length)
163
164 static int index_in_window(unsigned long index, unsigned long point,
165                            unsigned long before, unsigned long after)
166 {
167         unsigned long start = point - before, end = point + after;
168
169         if (start > point)
170                 start = 0;
171         if (end < point)
172                 end = ~0;
173
174         return start <= index && index <= end;
175 }
176
177 void ll_ras_enter(struct file *f)
178 {
179         struct ll_file_data *fd = LUSTRE_FPRIVATE(f);
180         struct ll_readahead_state *ras = &fd->fd_ras;
181
182         spin_lock(&ras->ras_lock);
183         ras->ras_requests++;
184         ras->ras_request_index = 0;
185         ras->ras_consecutive_requests++;
186         spin_unlock(&ras->ras_lock);
187 }
188
189 static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
190                               struct cl_page_list *queue, struct cl_page *page,
191                               struct cl_object *clob, pgoff_t *max_index)
192 {
193         struct page *vmpage = page->cp_vmpage;
194         struct vvp_page *vpg;
195         int           rc;
196
197         rc = 0;
198         cl_page_assume(env, io, page);
199         lu_ref_add(&page->cp_reference, "ra", current);
200         vpg = cl2vvp_page(cl_object_page_slice(clob, page));
201         if (!vpg->vpg_defer_uptodate && !PageUptodate(vmpage)) {
202                 CDEBUG(D_READA, "page index %lu, max_index: %lu\n",
203                        vvp_index(vpg), *max_index);
204                 if (*max_index == 0 || vvp_index(vpg) > *max_index)
205                         rc = cl_page_is_under_lock(env, io, page, max_index);
206                 if (rc == 0) {
207                         vpg->vpg_defer_uptodate = 1;
208                         vpg->vpg_ra_used = 0;
209                         cl_page_list_add(queue, page);
210                         rc = 1;
211                 } else {
212                         cl_page_discard(env, io, page);
213                         rc = -ENOLCK;
214                 }
215         } else {
216                 /* skip completed pages */
217                 cl_page_unassume(env, io, page);
218         }
219         lu_ref_del(&page->cp_reference, "ra", current);
220         cl_page_put(env, page);
221         return rc;
222 }
223
224 /**
225  * Initiates read-ahead of a page with given index.
226  *
227  * \retval     +ve: page was added to \a queue.
228  *
229  * \retval -ENOLCK: there is no extent lock for this part of a file, stop
230  *                read-ahead.
231  *
232  * \retval  -ve, 0: page wasn't added to \a queue for other reason.
233  */
234 static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
235                               struct cl_page_list *queue,
236                               pgoff_t index, pgoff_t *max_index)
237 {
238         struct cl_object *clob  = io->ci_obj;
239         struct inode     *inode = vvp_object_inode(clob);
240         struct page      *vmpage;
241         struct cl_page   *page;
242         enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
243         int            rc    = 0;
244         const char       *msg   = NULL;
245
246         vmpage = grab_cache_page_nowait(inode->i_mapping, index);
247         if (vmpage) {
248                 /* Check if vmpage was truncated or reclaimed */
249                 if (vmpage->mapping == inode->i_mapping) {
250                         page = cl_page_find(env, clob, vmpage->index,
251                                             vmpage, CPT_CACHEABLE);
252                         if (!IS_ERR(page)) {
253                                 rc = cl_read_ahead_page(env, io, queue,
254                                                         page, clob, max_index);
255                                 if (rc == -ENOLCK) {
256                                         which = RA_STAT_FAILED_MATCH;
257                                         msg   = "lock match failed";
258                                 }
259                         } else {
260                                 which = RA_STAT_FAILED_GRAB_PAGE;
261                                 msg   = "cl_page_find failed";
262                         }
263                 } else {
264                         which = RA_STAT_WRONG_GRAB_PAGE;
265                         msg   = "g_c_p_n returned invalid page";
266                 }
267                 if (rc != 1)
268                         unlock_page(vmpage);
269                 put_page(vmpage);
270         } else {
271                 which = RA_STAT_FAILED_GRAB_PAGE;
272                 msg   = "g_c_p_n failed";
273         }
274         if (msg) {
275                 ll_ra_stats_inc(inode, which);
276                 CDEBUG(D_READA, "%s\n", msg);
277         }
278         return rc;
279 }
280
281 #define RIA_DEBUG(ria)                                                 \
282         CDEBUG(D_READA, "rs %lu re %lu ro %lu rl %lu rp %lu\n",       \
283         ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
284         ria->ria_pages)
285
286 /* Limit this to the blocksize instead of PTLRPC_BRW_MAX_SIZE, since we don't
287  * know what the actual RPC size is.  If this needs to change, it makes more
288  * sense to tune the i_blkbits value for the file based on the OSTs it is
289  * striped over, rather than having a constant value for all files here.
290  */
291
292 /* RAS_INCREASE_STEP should be (1UL << (inode->i_blkbits - PAGE_SHIFT)).
293  * Temporarily set RAS_INCREASE_STEP to 1MB. After 4MB RPC is enabled
294  * by default, this should be adjusted corresponding with max_read_ahead_mb
295  * and max_read_ahead_per_file_mb otherwise the readahead budget can be used
296  * up quickly which will affect read performance significantly. See LU-2816
297  */
298 #define RAS_INCREASE_STEP(inode) (ONE_MB_BRW_SIZE >> PAGE_SHIFT)
299
300 static inline int stride_io_mode(struct ll_readahead_state *ras)
301 {
302         return ras->ras_consecutive_stride_requests > 1;
303 }
304
305 /* The function calculates how much pages will be read in
306  * [off, off + length], in such stride IO area,
307  * stride_offset = st_off, stride_length = st_len,
308  * stride_pages = st_pgs
309  *
310  *   |------------------|*****|------------------|*****|------------|*****|....
311  * st_off
312  *   |--- st_pgs     ---|
313  *   |-----     st_len   -----|
314  *
315  *            How many pages it should read in such pattern
316  *            |-------------------------------------------------------------|
317  *            off
318  *            |<------            length                      ------->|
319  *
320  *        =   |<----->|  +  |-------------------------------------| +   |---|
321  *           start_left          st_pgs * i                 end_left
322  */
323 static unsigned long
324 stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs,
325                 unsigned long off, unsigned long length)
326 {
327         __u64 start = off > st_off ? off - st_off : 0;
328         __u64 end = off + length > st_off ? off + length - st_off : 0;
329         unsigned long start_left = 0;
330         unsigned long end_left = 0;
331         unsigned long pg_count;
332
333         if (st_len == 0 || length == 0 || end == 0)
334                 return length;
335
336         start_left = do_div(start, st_len);
337         if (start_left < st_pgs)
338                 start_left = st_pgs - start_left;
339         else
340                 start_left = 0;
341
342         end_left = do_div(end, st_len);
343         if (end_left > st_pgs)
344                 end_left = st_pgs;
345
346         CDEBUG(D_READA, "start %llu, end %llu start_left %lu end_left %lu\n",
347                start, end, start_left, end_left);
348
349         if (start == end)
350                 pg_count = end_left - (st_pgs - start_left);
351         else
352                 pg_count = start_left + st_pgs * (end - start - 1) + end_left;
353
354         CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %lu pgcount %lu\n",
355                st_off, st_len, st_pgs, off, length, pg_count);
356
357         return pg_count;
358 }
359
360 static int ria_page_count(struct ra_io_arg *ria)
361 {
362         __u64 length = ria->ria_end >= ria->ria_start ?
363                        ria->ria_end - ria->ria_start + 1 : 0;
364
365         return stride_pg_count(ria->ria_stoff, ria->ria_length,
366                                ria->ria_pages, ria->ria_start,
367                                length);
368 }
369
370 /*Check whether the index is in the defined ra-window */
371 static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
372 {
373         /* If ria_length == ria_pages, it means non-stride I/O mode,
374          * idx should always inside read-ahead window in this case
375          * For stride I/O mode, just check whether the idx is inside
376          * the ria_pages.
377          */
378         return ria->ria_length == 0 || ria->ria_length == ria->ria_pages ||
379                (idx >= ria->ria_stoff && (idx - ria->ria_stoff) %
380                 ria->ria_length < ria->ria_pages);
381 }
382
383 static int ll_read_ahead_pages(const struct lu_env *env,
384                                struct cl_io *io, struct cl_page_list *queue,
385                                struct ra_io_arg *ria,
386                                unsigned long *reserved_pages,
387                                unsigned long *ra_end)
388 {
389         int rc, count = 0;
390         bool stride_ria;
391         pgoff_t page_idx;
392         pgoff_t max_index = 0;
393
394         LASSERT(ria);
395         RIA_DEBUG(ria);
396
397         stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
398         for (page_idx = ria->ria_start;
399              page_idx <= ria->ria_end && *reserved_pages > 0; page_idx++) {
400                 if (ras_inside_ra_window(page_idx, ria)) {
401                         /* If the page is inside the read-ahead window*/
402                         rc = ll_read_ahead_page(env, io, queue,
403                                                 page_idx, &max_index);
404                         if (rc == 1) {
405                                 (*reserved_pages)--;
406                                 count++;
407                         } else if (rc == -ENOLCK) {
408                                 break;
409                         }
410                 } else if (stride_ria) {
411                         /* If it is not in the read-ahead window, and it is
412                          * read-ahead mode, then check whether it should skip
413                          * the stride gap
414                          */
415                         pgoff_t offset;
416                         /* FIXME: This assertion only is valid when it is for
417                          * forward read-ahead, it will be fixed when backward
418                          * read-ahead is implemented
419                          */
420                         LASSERTF(page_idx > ria->ria_stoff, "Invalid page_idx %lu rs %lu re %lu ro %lu rl %lu rp %lu\n",
421                                  page_idx,
422                                  ria->ria_start, ria->ria_end, ria->ria_stoff,
423                                  ria->ria_length, ria->ria_pages);
424                         offset = page_idx - ria->ria_stoff;
425                         offset = offset % (ria->ria_length);
426                         if (offset > ria->ria_pages) {
427                                 page_idx += ria->ria_length - offset;
428                                 CDEBUG(D_READA, "i %lu skip %lu\n", page_idx,
429                                        ria->ria_length - offset);
430                                 continue;
431                         }
432                 }
433         }
434         *ra_end = page_idx;
435         return count;
436 }
437
438 int ll_readahead(const struct lu_env *env, struct cl_io *io,
439                  struct cl_page_list *queue, struct ll_readahead_state *ras,
440                  bool hit)
441 {
442         struct vvp_io *vio = vvp_env_io(env);
443         struct ll_thread_info *lti = ll_env_info(env);
444         struct cl_attr *attr = vvp_env_thread_attr(env);
445         unsigned long start = 0, end = 0, reserved;
446         unsigned long ra_end, len, mlen = 0;
447         struct inode *inode;
448         struct ra_io_arg *ria = &lti->lti_ria;
449         struct cl_object *clob;
450         int ret = 0;
451         __u64 kms;
452
453         clob = io->ci_obj;
454         inode = vvp_object_inode(clob);
455
456         memset(ria, 0, sizeof(*ria));
457
458         cl_object_attr_lock(clob);
459         ret = cl_object_attr_get(env, clob, attr);
460         cl_object_attr_unlock(clob);
461
462         if (ret != 0)
463                 return ret;
464         kms = attr->cat_kms;
465         if (kms == 0) {
466                 ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
467                 return 0;
468         }
469
470         spin_lock(&ras->ras_lock);
471
472         /* Enlarge the RA window to encompass the full read */
473         if (vio->vui_ra_valid &&
474             ras->ras_window_start + ras->ras_window_len <
475             vio->vui_ra_start + vio->vui_ra_count) {
476                 ras->ras_window_len = vio->vui_ra_start + vio->vui_ra_count -
477                                       ras->ras_window_start;
478         }
479
480         /* Reserve a part of the read-ahead window that we'll be issuing */
481         if (ras->ras_window_len) {
482                 start = ras->ras_next_readahead;
483                 end = ras->ras_window_start + ras->ras_window_len - 1;
484         }
485         if (end != 0) {
486                 unsigned long rpc_boundary;
487                 /*
488                  * Align RA window to an optimal boundary.
489                  *
490                  * XXX This would be better to align to cl_max_pages_per_rpc
491                  * instead of PTLRPC_MAX_BRW_PAGES, because the RPC size may
492                  * be aligned to the RAID stripe size in the future and that
493                  * is more important than the RPC size.
494                  */
495                 /* Note: we only trim the RPC, instead of extending the RPC
496                  * to the boundary, so to avoid reading too much pages during
497                  * random reading.
498                  */
499                 rpc_boundary = (end + 1) & (~(PTLRPC_MAX_BRW_PAGES - 1));
500                 if (rpc_boundary > 0)
501                         rpc_boundary--;
502
503                 if (rpc_boundary  > start)
504                         end = rpc_boundary;
505
506                 /* Truncate RA window to end of file */
507                 end = min(end, (unsigned long)((kms - 1) >> PAGE_SHIFT));
508
509                 ras->ras_next_readahead = max(end, end + 1);
510                 RAS_CDEBUG(ras);
511         }
512         ria->ria_start = start;
513         ria->ria_end = end;
514         /* If stride I/O mode is detected, get stride window*/
515         if (stride_io_mode(ras)) {
516                 ria->ria_stoff = ras->ras_stride_offset;
517                 ria->ria_length = ras->ras_stride_length;
518                 ria->ria_pages = ras->ras_stride_pages;
519         }
520         spin_unlock(&ras->ras_lock);
521
522         if (end == 0) {
523                 ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
524                 return 0;
525         }
526         len = ria_page_count(ria);
527         if (len == 0) {
528                 ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
529                 return 0;
530         }
531
532         CDEBUG(D_READA, DFID ": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
533                PFID(lu_object_fid(&clob->co_lu)),
534                ria->ria_start, ria->ria_end,
535                vio->vui_ra_valid ? vio->vui_ra_start : 0,
536                vio->vui_ra_valid ? vio->vui_ra_count : 0,
537                hit);
538
539         /* at least to extend the readahead window to cover current read */
540         if (!hit && vio->vui_ra_valid &&
541             vio->vui_ra_start + vio->vui_ra_count > ria->ria_start) {
542                 /* to the end of current read window. */
543                 mlen = vio->vui_ra_start + vio->vui_ra_count - ria->ria_start;
544                 /* trim to RPC boundary */
545                 start = ria->ria_start & (PTLRPC_MAX_BRW_PAGES - 1);
546                 mlen = min(mlen, PTLRPC_MAX_BRW_PAGES - start);
547         }
548
549         reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len, mlen);
550         if (reserved < len)
551                 ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
552
553         CDEBUG(D_READA, "reserved pages %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
554                reserved, len, mlen,
555                atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
556                ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
557
558         ret = ll_read_ahead_pages(env, io, queue, ria, &reserved, &ra_end);
559
560         if (reserved != 0)
561                 ll_ra_count_put(ll_i2sbi(inode), reserved);
562
563         if (ra_end == end + 1 && ra_end == (kms >> PAGE_SHIFT))
564                 ll_ra_stats_inc(inode, RA_STAT_EOF);
565
566         /* if we didn't get to the end of the region we reserved from
567          * the ras we need to go back and update the ras so that the
568          * next read-ahead tries from where we left off.  we only do so
569          * if the region we failed to issue read-ahead on is still ahead
570          * of the app and behind the next index to start read-ahead from
571          */
572         CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu\n",
573                ra_end, end, ria->ria_end);
574
575         if (ra_end != end + 1) {
576                 ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
577                 spin_lock(&ras->ras_lock);
578                 if (ra_end < ras->ras_next_readahead &&
579                     index_in_window(ra_end, ras->ras_window_start, 0,
580                                     ras->ras_window_len)) {
581                         ras->ras_next_readahead = ra_end;
582                         RAS_CDEBUG(ras);
583                 }
584                 spin_unlock(&ras->ras_lock);
585         }
586
587         return ret;
588 }
589
590 static void ras_set_start(struct inode *inode, struct ll_readahead_state *ras,
591                           unsigned long index)
592 {
593         ras->ras_window_start = index & (~(RAS_INCREASE_STEP(inode) - 1));
594 }
595
596 /* called with the ras_lock held or from places where it doesn't matter */
597 static void ras_reset(struct inode *inode, struct ll_readahead_state *ras,
598                       unsigned long index)
599 {
600         ras->ras_last_readpage = index;
601         ras->ras_consecutive_requests = 0;
602         ras->ras_consecutive_pages = 0;
603         ras->ras_window_len = 0;
604         ras_set_start(inode, ras, index);
605         ras->ras_next_readahead = max(ras->ras_window_start, index);
606
607         RAS_CDEBUG(ras);
608 }
609
610 /* called with the ras_lock held or from places where it doesn't matter */
611 static void ras_stride_reset(struct ll_readahead_state *ras)
612 {
613         ras->ras_consecutive_stride_requests = 0;
614         ras->ras_stride_length = 0;
615         ras->ras_stride_pages = 0;
616         RAS_CDEBUG(ras);
617 }
618
619 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
620 {
621         spin_lock_init(&ras->ras_lock);
622         ras_reset(inode, ras, 0);
623         ras->ras_requests = 0;
624 }
625
626 /*
627  * Check whether the read request is in the stride window.
628  * If it is in the stride window, return 1, otherwise return 0.
629  */
630 static int index_in_stride_window(struct ll_readahead_state *ras,
631                                   unsigned long index)
632 {
633         unsigned long stride_gap;
634
635         if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
636             ras->ras_stride_pages == ras->ras_stride_length)
637                 return 0;
638
639         stride_gap = index - ras->ras_last_readpage - 1;
640
641         /* If it is contiguous read */
642         if (stride_gap == 0)
643                 return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
644
645         /* Otherwise check the stride by itself */
646         return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
647                 ras->ras_consecutive_pages == ras->ras_stride_pages;
648 }
649
650 static void ras_update_stride_detector(struct ll_readahead_state *ras,
651                                        unsigned long index)
652 {
653         unsigned long stride_gap = index - ras->ras_last_readpage - 1;
654
655         if (!stride_io_mode(ras) && (stride_gap != 0 ||
656             ras->ras_consecutive_stride_requests == 0)) {
657                 ras->ras_stride_pages = ras->ras_consecutive_pages;
658                 ras->ras_stride_length = stride_gap+ras->ras_consecutive_pages;
659         }
660         LASSERT(ras->ras_request_index == 0);
661         LASSERT(ras->ras_consecutive_stride_requests == 0);
662
663         if (index <= ras->ras_last_readpage) {
664                 /*Reset stride window for forward read*/
665                 ras_stride_reset(ras);
666                 return;
667         }
668
669         ras->ras_stride_pages = ras->ras_consecutive_pages;
670         ras->ras_stride_length = stride_gap+ras->ras_consecutive_pages;
671
672         RAS_CDEBUG(ras);
673         return;
674 }
675
676 /* Stride Read-ahead window will be increased inc_len according to
677  * stride I/O pattern
678  */
679 static void ras_stride_increase_window(struct ll_readahead_state *ras,
680                                        struct ll_ra_info *ra,
681                                        unsigned long inc_len)
682 {
683         unsigned long left, step, window_len;
684         unsigned long stride_len;
685
686         LASSERT(ras->ras_stride_length > 0);
687         LASSERTF(ras->ras_window_start + ras->ras_window_len
688                  >= ras->ras_stride_offset, "window_start %lu, window_len %lu stride_offset %lu\n",
689                  ras->ras_window_start,
690                  ras->ras_window_len, ras->ras_stride_offset);
691
692         stride_len = ras->ras_window_start + ras->ras_window_len -
693                      ras->ras_stride_offset;
694
695         left = stride_len % ras->ras_stride_length;
696         window_len = ras->ras_window_len - left;
697
698         if (left < ras->ras_stride_pages)
699                 left += inc_len;
700         else
701                 left = ras->ras_stride_pages + inc_len;
702
703         LASSERT(ras->ras_stride_pages != 0);
704
705         step = left / ras->ras_stride_pages;
706         left %= ras->ras_stride_pages;
707
708         window_len += step * ras->ras_stride_length + left;
709
710         if (stride_pg_count(ras->ras_stride_offset, ras->ras_stride_length,
711                             ras->ras_stride_pages, ras->ras_stride_offset,
712                             window_len) <= ra->ra_max_pages_per_file)
713                 ras->ras_window_len = window_len;
714
715         RAS_CDEBUG(ras);
716 }
717
718 static void ras_increase_window(struct inode *inode,
719                                 struct ll_readahead_state *ras,
720                                 struct ll_ra_info *ra)
721 {
722         /* The stretch of ra-window should be aligned with max rpc_size
723          * but current clio architecture does not support retrieve such
724          * information from lower layer. FIXME later
725          */
726         if (stride_io_mode(ras))
727                 ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP(inode));
728         else
729                 ras->ras_window_len = min(ras->ras_window_len +
730                                           RAS_INCREASE_STEP(inode),
731                                           ra->ra_max_pages_per_file);
732 }
733
734 void ras_update(struct ll_sb_info *sbi, struct inode *inode,
735                 struct ll_readahead_state *ras, unsigned long index,
736                 unsigned hit)
737 {
738         struct ll_ra_info *ra = &sbi->ll_ra_info;
739         int zero = 0, stride_detect = 0, ra_miss = 0;
740
741         spin_lock(&ras->ras_lock);
742
743         ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
744
745         /* reset the read-ahead window in two cases.  First when the app seeks
746          * or reads to some other part of the file.  Secondly if we get a
747          * read-ahead miss that we think we've previously issued.  This can
748          * be a symptom of there being so many read-ahead pages that the VM is
749          * reclaiming it before we get to it.
750          */
751         if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) {
752                 zero = 1;
753                 ll_ra_stats_inc_sbi(sbi, RA_STAT_DISTANT_READPAGE);
754         } else if (!hit && ras->ras_window_len &&
755                    index < ras->ras_next_readahead &&
756                    index_in_window(index, ras->ras_window_start, 0,
757                                    ras->ras_window_len)) {
758                 ra_miss = 1;
759                 ll_ra_stats_inc_sbi(sbi, RA_STAT_MISS_IN_WINDOW);
760         }
761
762         /* On the second access to a file smaller than the tunable
763          * ra_max_read_ahead_whole_pages trigger RA on all pages in the
764          * file up to ra_max_pages_per_file.  This is simply a best effort
765          * and only occurs once per open file.  Normal RA behavior is reverted
766          * to for subsequent IO.  The mmap case does not increment
767          * ras_requests and thus can never trigger this behavior.
768          */
769         if (ras->ras_requests == 2 && !ras->ras_request_index) {
770                 __u64 kms_pages;
771
772                 kms_pages = (i_size_read(inode) + PAGE_SIZE - 1) >>
773                             PAGE_SHIFT;
774
775                 CDEBUG(D_READA, "kmsp %llu mwp %lu mp %lu\n", kms_pages,
776                        ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages_per_file);
777
778                 if (kms_pages &&
779                     kms_pages <= ra->ra_max_read_ahead_whole_pages) {
780                         ras->ras_window_start = 0;
781                         ras->ras_last_readpage = 0;
782                         ras->ras_next_readahead = 0;
783                         ras->ras_window_len = min(ra->ra_max_pages_per_file,
784                                 ra->ra_max_read_ahead_whole_pages);
785                         goto out_unlock;
786                 }
787         }
788         if (zero) {
789                 /* check whether it is in stride I/O mode*/
790                 if (!index_in_stride_window(ras, index)) {
791                         if (ras->ras_consecutive_stride_requests == 0 &&
792                             ras->ras_request_index == 0) {
793                                 ras_update_stride_detector(ras, index);
794                                 ras->ras_consecutive_stride_requests++;
795                         } else {
796                                 ras_stride_reset(ras);
797                         }
798                         ras_reset(inode, ras, index);
799                         ras->ras_consecutive_pages++;
800                         goto out_unlock;
801                 } else {
802                         ras->ras_consecutive_pages = 0;
803                         ras->ras_consecutive_requests = 0;
804                         if (++ras->ras_consecutive_stride_requests > 1)
805                                 stride_detect = 1;
806                         RAS_CDEBUG(ras);
807                 }
808         } else {
809                 if (ra_miss) {
810                         if (index_in_stride_window(ras, index) &&
811                             stride_io_mode(ras)) {
812                                 /*If stride-RA hit cache miss, the stride dector
813                                  *will not be reset to avoid the overhead of
814                                  *redetecting read-ahead mode
815                                  */
816                                 if (index != ras->ras_last_readpage + 1)
817                                         ras->ras_consecutive_pages = 0;
818                                 ras_reset(inode, ras, index);
819                                 RAS_CDEBUG(ras);
820                         } else {
821                                 /* Reset both stride window and normal RA
822                                  * window
823                                  */
824                                 ras_reset(inode, ras, index);
825                                 ras->ras_consecutive_pages++;
826                                 ras_stride_reset(ras);
827                                 goto out_unlock;
828                         }
829                 } else if (stride_io_mode(ras)) {
830                         /* If this is contiguous read but in stride I/O mode
831                          * currently, check whether stride step still is valid,
832                          * if invalid, it will reset the stride ra window
833                          */
834                         if (!index_in_stride_window(ras, index)) {
835                                 /* Shrink stride read-ahead window to be zero */
836                                 ras_stride_reset(ras);
837                                 ras->ras_window_len = 0;
838                                 ras->ras_next_readahead = index;
839                         }
840                 }
841         }
842         ras->ras_consecutive_pages++;
843         ras->ras_last_readpage = index;
844         ras_set_start(inode, ras, index);
845
846         if (stride_io_mode(ras)) {
847                 /* Since stride readahead is sensitive to the offset
848                  * of read-ahead, so we use original offset here,
849                  * instead of ras_window_start, which is RPC aligned
850                  */
851                 ras->ras_next_readahead = max(index, ras->ras_next_readahead);
852         } else {
853                 if (ras->ras_next_readahead < ras->ras_window_start)
854                         ras->ras_next_readahead = ras->ras_window_start;
855                 if (!hit)
856                         ras->ras_next_readahead = index + 1;
857         }
858         RAS_CDEBUG(ras);
859
860         /* Trigger RA in the mmap case where ras_consecutive_requests
861          * is not incremented and thus can't be used to trigger RA
862          */
863         if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
864                 ras->ras_window_len = RAS_INCREASE_STEP(inode);
865                 goto out_unlock;
866         }
867
868         /* Initially reset the stride window offset to next_readahead*/
869         if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
870                 /**
871                  * Once stride IO mode is detected, next_readahead should be
872                  * reset to make sure next_readahead > stride offset
873                  */
874                 ras->ras_next_readahead = max(index, ras->ras_next_readahead);
875                 ras->ras_stride_offset = index;
876                 ras->ras_window_len = RAS_INCREASE_STEP(inode);
877         }
878
879         /* The initial ras_window_len is set to the request size.  To avoid
880          * uselessly reading and discarding pages for random IO the window is
881          * only increased once per consecutive request received. */
882         if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
883             !ras->ras_request_index)
884                 ras_increase_window(inode, ras, ra);
885 out_unlock:
886         RAS_CDEBUG(ras);
887         ras->ras_request_index++;
888         spin_unlock(&ras->ras_lock);
889         return;
890 }
891
892 int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
893 {
894         struct inode           *inode = vmpage->mapping->host;
895         struct ll_inode_info   *lli   = ll_i2info(inode);
896         struct lu_env     *env;
897         struct cl_io       *io;
898         struct cl_page   *page;
899         struct cl_object       *clob;
900         struct cl_env_nest      nest;
901         bool redirtied = false;
902         bool unlocked = false;
903         int result;
904
905         LASSERT(PageLocked(vmpage));
906         LASSERT(!PageWriteback(vmpage));
907
908         LASSERT(ll_i2dtexp(inode));
909
910         env = cl_env_nested_get(&nest);
911         if (IS_ERR(env)) {
912                 result = PTR_ERR(env);
913                 goto out;
914         }
915
916         clob  = ll_i2info(inode)->lli_clob;
917         LASSERT(clob);
918
919         io = vvp_env_thread_io(env);
920         io->ci_obj = clob;
921         io->ci_ignore_layout = 1;
922         result = cl_io_init(env, io, CIT_MISC, clob);
923         if (result == 0) {
924                 page = cl_page_find(env, clob, vmpage->index,
925                                     vmpage, CPT_CACHEABLE);
926                 if (!IS_ERR(page)) {
927                         lu_ref_add(&page->cp_reference, "writepage",
928                                    current);
929                         cl_page_assume(env, io, page);
930                         result = cl_page_flush(env, io, page);
931                         if (result != 0) {
932                                 /*
933                                  * Re-dirty page on error so it retries write,
934                                  * but not in case when IO has actually
935                                  * occurred and completed with an error.
936                                  */
937                                 if (!PageError(vmpage)) {
938                                         redirty_page_for_writepage(wbc, vmpage);
939                                         result = 0;
940                                         redirtied = true;
941                                 }
942                         }
943                         cl_page_disown(env, io, page);
944                         unlocked = true;
945                         lu_ref_del(&page->cp_reference,
946                                    "writepage", current);
947                         cl_page_put(env, page);
948                 } else {
949                         result = PTR_ERR(page);
950                 }
951         }
952         cl_io_fini(env, io);
953
954         if (redirtied && wbc->sync_mode == WB_SYNC_ALL) {
955                 loff_t offset = cl_offset(clob, vmpage->index);
956
957                 /* Flush page failed because the extent is being written out.
958                  * Wait for the write of extent to be finished to avoid
959                  * breaking kernel which assumes ->writepage should mark
960                  * PageWriteback or clean the page.
961                  */
962                 result = cl_sync_file_range(inode, offset,
963                                             offset + PAGE_SIZE - 1,
964                                             CL_FSYNC_LOCAL, 1);
965                 if (result > 0) {
966                         /* actually we may have written more than one page.
967                          * decreasing this page because the caller will count
968                          * it.
969                          */
970                         wbc->nr_to_write -= result - 1;
971                         result = 0;
972                 }
973         }
974
975         cl_env_nested_put(&nest, env);
976         goto out;
977
978 out:
979         if (result < 0) {
980                 if (!lli->lli_async_rc)
981                         lli->lli_async_rc = result;
982                 SetPageError(vmpage);
983                 if (!unlocked)
984                         unlock_page(vmpage);
985         }
986         return result;
987 }
988
989 int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
990 {
991         struct inode *inode = mapping->host;
992         struct ll_sb_info *sbi = ll_i2sbi(inode);
993         loff_t start;
994         loff_t end;
995         enum cl_fsync_mode mode;
996         int range_whole = 0;
997         int result;
998         int ignore_layout = 0;
999
1000         if (wbc->range_cyclic) {
1001                 start = mapping->writeback_index << PAGE_SHIFT;
1002                 end = OBD_OBJECT_EOF;
1003         } else {
1004                 start = wbc->range_start;
1005                 end = wbc->range_end;
1006                 if (end == LLONG_MAX) {
1007                         end = OBD_OBJECT_EOF;
1008                         range_whole = start == 0;
1009                 }
1010         }
1011
1012         mode = CL_FSYNC_NONE;
1013         if (wbc->sync_mode == WB_SYNC_ALL)
1014                 mode = CL_FSYNC_LOCAL;
1015
1016         if (sbi->ll_umounting)
1017                 /* if the mountpoint is being umounted, all pages have to be
1018                  * evicted to avoid hitting LBUG when truncate_inode_pages()
1019                  * is called later on.
1020                  */
1021                 ignore_layout = 1;
1022         result = cl_sync_file_range(inode, start, end, mode, ignore_layout);
1023         if (result > 0) {
1024                 wbc->nr_to_write -= result;
1025                 result = 0;
1026          }
1027
1028         if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) {
1029                 if (end == OBD_OBJECT_EOF)
1030                         mapping->writeback_index = 0;
1031                 else
1032                         mapping->writeback_index = (end >> PAGE_SHIFT) + 1;
1033         }
1034         return result;
1035 }
1036
1037 struct ll_cl_context *ll_cl_find(struct file *file)
1038 {
1039         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1040         struct ll_cl_context *lcc;
1041         struct ll_cl_context *found = NULL;
1042
1043         read_lock(&fd->fd_lock);
1044         list_for_each_entry(lcc, &fd->fd_lccs, lcc_list) {
1045                 if (lcc->lcc_cookie == current) {
1046                         found = lcc;
1047                         break;
1048                 }
1049         }
1050         read_unlock(&fd->fd_lock);
1051
1052         return found;
1053 }
1054
1055 void ll_cl_add(struct file *file, const struct lu_env *env, struct cl_io *io)
1056 {
1057         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1058         struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
1059
1060         memset(lcc, 0, sizeof(*lcc));
1061         INIT_LIST_HEAD(&lcc->lcc_list);
1062         lcc->lcc_cookie = current;
1063         lcc->lcc_env = env;
1064         lcc->lcc_io = io;
1065
1066         write_lock(&fd->fd_lock);
1067         list_add(&lcc->lcc_list, &fd->fd_lccs);
1068         write_unlock(&fd->fd_lock);
1069 }
1070
1071 void ll_cl_remove(struct file *file, const struct lu_env *env)
1072 {
1073         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1074         struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
1075
1076         write_lock(&fd->fd_lock);
1077         list_del_init(&lcc->lcc_list);
1078         write_unlock(&fd->fd_lock);
1079 }
1080
1081 int ll_readpage(struct file *file, struct page *vmpage)
1082 {
1083         struct cl_object *clob = ll_i2info(file_inode(file))->lli_clob;
1084         struct ll_cl_context *lcc;
1085         const struct lu_env  *env;
1086         struct cl_io   *io;
1087         struct cl_page *page;
1088         int result;
1089
1090         lcc = ll_cl_find(file);
1091         if (!lcc) {
1092                 unlock_page(vmpage);
1093                 return -EIO;
1094         }
1095
1096         env = lcc->lcc_env;
1097         io = lcc->lcc_io;
1098         LASSERT(io->ci_state == CIS_IO_GOING);
1099         page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
1100         if (!IS_ERR(page)) {
1101                 LASSERT(page->cp_type == CPT_CACHEABLE);
1102                 if (likely(!PageUptodate(vmpage))) {
1103                         cl_page_assume(env, io, page);
1104                         result = cl_io_read_page(env, io, page);
1105                 } else {
1106                         /* Page from a non-object file. */
1107                         unlock_page(vmpage);
1108                         result = 0;
1109                 }
1110                 cl_page_put(env, page);
1111         } else {
1112                 unlock_page(vmpage);
1113                 result = PTR_ERR(page);
1114         }
1115         return result;
1116 }
1117
1118 int ll_page_sync_io(const struct lu_env *env, struct cl_io *io,
1119                     struct cl_page *page, enum cl_req_type crt)
1120 {
1121         struct cl_2queue  *queue;
1122         int result;
1123
1124         LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
1125
1126         queue = &io->ci_queue;
1127         cl_2queue_init_page(queue, page);
1128
1129         result = cl_io_submit_sync(env, io, crt, queue, 0);
1130         LASSERT(cl_page_is_owned(page, io));
1131
1132         if (crt == CRT_READ)
1133                 /*
1134                  * in CRT_WRITE case page is left locked even in case of
1135                  * error.
1136                  */
1137                 cl_page_list_disown(env, io, &queue->c2_qin);
1138         cl_2queue_fini(env, queue);
1139
1140         return result;
1141 }