Linux 4.7-rc6
[cascardo/linux.git] / drivers / staging / lustre / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LOV
38
39 #include "../../include/linux/libcfs/libcfs.h"
40
41 #include "../include/obd_class.h"
42 #include "../include/lustre/lustre_idl.h"
43 #include "lov_internal.h"
44
45 static void lov_init_set(struct lov_request_set *set)
46 {
47         set->set_count = 0;
48         atomic_set(&set->set_completes, 0);
49         atomic_set(&set->set_success, 0);
50         atomic_set(&set->set_finish_checked, 0);
51         set->set_cookies = NULL;
52         INIT_LIST_HEAD(&set->set_list);
53         atomic_set(&set->set_refcount, 1);
54         init_waitqueue_head(&set->set_waitq);
55 }
56
57 void lov_finish_set(struct lov_request_set *set)
58 {
59         struct list_head *pos, *n;
60
61         LASSERT(set);
62         list_for_each_safe(pos, n, &set->set_list) {
63                 struct lov_request *req = list_entry(pos,
64                                                          struct lov_request,
65                                                          rq_link);
66                 list_del_init(&req->rq_link);
67
68                 if (req->rq_oi.oi_oa)
69                         kmem_cache_free(obdo_cachep, req->rq_oi.oi_oa);
70                 kfree(req->rq_oi.oi_osfs);
71                 kfree(req);
72         }
73         kfree(set);
74 }
75
76 static int lov_set_finished(struct lov_request_set *set, int idempotent)
77 {
78         int completes = atomic_read(&set->set_completes);
79
80         CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
81
82         if (completes == set->set_count) {
83                 if (idempotent)
84                         return 1;
85                 if (atomic_inc_return(&set->set_finish_checked) == 1)
86                         return 1;
87         }
88         return 0;
89 }
90
91 static void lov_update_set(struct lov_request_set *set,
92                            struct lov_request *req, int rc)
93 {
94         req->rq_complete = 1;
95         req->rq_rc = rc;
96
97         atomic_inc(&set->set_completes);
98         if (rc == 0)
99                 atomic_inc(&set->set_success);
100
101         wake_up(&set->set_waitq);
102 }
103
104 int lov_update_common_set(struct lov_request_set *set,
105                           struct lov_request *req, int rc)
106 {
107         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
108
109         lov_update_set(set, req, rc);
110
111         /* grace error on inactive ost */
112         if (rc && !(lov->lov_tgts[req->rq_idx] &&
113                     lov->lov_tgts[req->rq_idx]->ltd_active))
114                 rc = 0;
115
116         /* FIXME in raid1 regime, should return 0 */
117         return rc;
118 }
119
120 static void lov_set_add_req(struct lov_request *req,
121                             struct lov_request_set *set)
122 {
123         list_add_tail(&req->rq_link, &set->set_list);
124         set->set_count++;
125         req->rq_rqset = set;
126 }
127
128 static int lov_check_set(struct lov_obd *lov, int idx)
129 {
130         int rc;
131         struct lov_tgt_desc *tgt;
132
133         mutex_lock(&lov->lov_lock);
134         tgt = lov->lov_tgts[idx];
135         rc = !tgt || tgt->ltd_active ||
136                 (tgt->ltd_exp &&
137                  class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried);
138         mutex_unlock(&lov->lov_lock);
139
140         return rc;
141 }
142
143 /* Check if the OSC connection exists and is active.
144  * If the OSC has not yet had a chance to connect to the OST the first time,
145  * wait once for it to connect instead of returning an error.
146  */
147 static int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
148 {
149         wait_queue_head_t waitq;
150         struct l_wait_info lwi;
151         struct lov_tgt_desc *tgt;
152         int rc = 0;
153
154         mutex_lock(&lov->lov_lock);
155
156         tgt = lov->lov_tgts[ost_idx];
157
158         if (unlikely(!tgt)) {
159                 rc = 0;
160                 goto out;
161         }
162
163         if (likely(tgt->ltd_active)) {
164                 rc = 1;
165                 goto out;
166         }
167
168         if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried) {
169                 rc = 0;
170                 goto out;
171         }
172
173         mutex_unlock(&lov->lov_lock);
174
175         init_waitqueue_head(&waitq);
176         lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
177                                    cfs_time_seconds(1), NULL, NULL);
178
179         rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
180         if (tgt->ltd_active)
181                 return 1;
182
183         return 0;
184
185 out:
186         mutex_unlock(&lov->lov_lock);
187         return rc;
188 }
189
190 static int common_attr_done(struct lov_request_set *set)
191 {
192         struct lov_request *req;
193         struct obdo *tmp_oa;
194         int rc = 0, attrset = 0;
195
196         if (!set->set_oi->oi_oa)
197                 return 0;
198
199         if (!atomic_read(&set->set_success))
200                 return -EIO;
201
202         tmp_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
203         if (!tmp_oa) {
204                 rc = -ENOMEM;
205                 goto out;
206         }
207
208         list_for_each_entry(req, &set->set_list, rq_link) {
209                 if (!req->rq_complete || req->rq_rc)
210                         continue;
211                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
212                         continue;
213                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
214                                 req->rq_oi.oi_oa->o_valid,
215                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
216         }
217         if (!attrset) {
218                 CERROR("No stripes had valid attrs\n");
219                 rc = -EIO;
220         }
221         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
222             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
223                 /* When we take attributes of some epoch, we require all the
224                  * ost to be active.
225                  */
226                 CERROR("Not all the stripes had valid attrs\n");
227                 rc = -EIO;
228                 goto out;
229         }
230
231         tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
232         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
233 out:
234         if (tmp_oa)
235                 kmem_cache_free(obdo_cachep, tmp_oa);
236         return rc;
237 }
238
239 int lov_fini_getattr_set(struct lov_request_set *set)
240 {
241         int rc = 0;
242
243         if (!set)
244                 return 0;
245         LASSERT(set->set_exp);
246         if (atomic_read(&set->set_completes))
247                 rc = common_attr_done(set);
248
249         lov_put_reqset(set);
250
251         return rc;
252 }
253
254 /* The callback for osc_getattr_async that finalizes a request info when a
255  * response is received.
256  */
257 static int cb_getattr_update(void *cookie, int rc)
258 {
259         struct obd_info *oinfo = cookie;
260         struct lov_request *lovreq;
261
262         lovreq = container_of(oinfo, struct lov_request, rq_oi);
263         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
264 }
265
266 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
267                          struct lov_request_set **reqset)
268 {
269         struct lov_request_set *set;
270         struct lov_obd *lov = &exp->exp_obd->u.lov;
271         int rc = 0, i;
272
273         set = kzalloc(sizeof(*set), GFP_NOFS);
274         if (!set)
275                 return -ENOMEM;
276         lov_init_set(set);
277
278         set->set_exp = exp;
279         set->set_oi = oinfo;
280
281         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
282                 struct lov_oinfo *loi;
283                 struct lov_request *req;
284
285                 loi = oinfo->oi_md->lsm_oinfo[i];
286                 if (lov_oinfo_is_dummy(loi))
287                         continue;
288
289                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
290                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
291                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) {
292                                 /* SOM requires all the OSTs to be active. */
293                                 rc = -EIO;
294                                 goto out_set;
295                         }
296                         continue;
297                 }
298
299                 req = kzalloc(sizeof(*req), GFP_NOFS);
300                 if (!req) {
301                         rc = -ENOMEM;
302                         goto out_set;
303                 }
304
305                 req->rq_stripe = i;
306                 req->rq_idx = loi->loi_ost_idx;
307
308                 req->rq_oi.oi_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
309                 if (!req->rq_oi.oi_oa) {
310                         kfree(req);
311                         rc = -ENOMEM;
312                         goto out_set;
313                 }
314                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
315                        sizeof(*req->rq_oi.oi_oa));
316                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
317                 req->rq_oi.oi_cb_up = cb_getattr_update;
318
319                 lov_set_add_req(req, set);
320         }
321         if (!set->set_count) {
322                 rc = -EIO;
323                 goto out_set;
324         }
325         *reqset = set;
326         return rc;
327 out_set:
328         lov_fini_getattr_set(set);
329         return rc;
330 }
331
332 int lov_fini_destroy_set(struct lov_request_set *set)
333 {
334         if (!set)
335                 return 0;
336         LASSERT(set->set_exp);
337         if (atomic_read(&set->set_completes)) {
338                 /* FIXME update qos data here */
339         }
340
341         lov_put_reqset(set);
342
343         return 0;
344 }
345
346 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
347                          struct obdo *src_oa, struct lov_stripe_md *lsm,
348                          struct obd_trans_info *oti,
349                          struct lov_request_set **reqset)
350 {
351         struct lov_request_set *set;
352         struct lov_obd *lov = &exp->exp_obd->u.lov;
353         int rc = 0, i;
354
355         set = kzalloc(sizeof(*set), GFP_NOFS);
356         if (!set)
357                 return -ENOMEM;
358         lov_init_set(set);
359
360         set->set_exp = exp;
361         set->set_oi = oinfo;
362         set->set_oi->oi_md = lsm;
363         set->set_oi->oi_oa = src_oa;
364         if (oti && src_oa->o_valid & OBD_MD_FLCOOKIE)
365                 set->set_cookies = oti->oti_logcookies;
366
367         for (i = 0; i < lsm->lsm_stripe_count; i++) {
368                 struct lov_oinfo *loi;
369                 struct lov_request *req;
370
371                 loi = lsm->lsm_oinfo[i];
372                 if (lov_oinfo_is_dummy(loi))
373                         continue;
374
375                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
376                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
377                         continue;
378                 }
379
380                 req = kzalloc(sizeof(*req), GFP_NOFS);
381                 if (!req) {
382                         rc = -ENOMEM;
383                         goto out_set;
384                 }
385
386                 req->rq_stripe = i;
387                 req->rq_idx = loi->loi_ost_idx;
388
389                 req->rq_oi.oi_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
390                 if (!req->rq_oi.oi_oa) {
391                         kfree(req);
392                         rc = -ENOMEM;
393                         goto out_set;
394                 }
395                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
396                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
397                 lov_set_add_req(req, set);
398         }
399         if (!set->set_count) {
400                 rc = -EIO;
401                 goto out_set;
402         }
403         *reqset = set;
404         return rc;
405 out_set:
406         lov_fini_destroy_set(set);
407         return rc;
408 }
409
410 int lov_fini_setattr_set(struct lov_request_set *set)
411 {
412         int rc = 0;
413
414         if (!set)
415                 return 0;
416         LASSERT(set->set_exp);
417         if (atomic_read(&set->set_completes)) {
418                 rc = common_attr_done(set);
419                 /* FIXME update qos data here */
420         }
421
422         lov_put_reqset(set);
423         return rc;
424 }
425
426 int lov_update_setattr_set(struct lov_request_set *set,
427                            struct lov_request *req, int rc)
428 {
429         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
430         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
431
432         lov_update_set(set, req, rc);
433
434         /* grace error on inactive ost */
435         if (rc && !(lov->lov_tgts[req->rq_idx] &&
436                     lov->lov_tgts[req->rq_idx]->ltd_active))
437                 rc = 0;
438
439         if (rc == 0) {
440                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
441                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
442                                 req->rq_oi.oi_oa->o_ctime;
443                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
444                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
445                                 req->rq_oi.oi_oa->o_mtime;
446                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
447                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
448                                 req->rq_oi.oi_oa->o_atime;
449         }
450
451         return rc;
452 }
453
454 /* The callback for osc_setattr_async that finalizes a request info when a
455  * response is received.
456  */
457 static int cb_setattr_update(void *cookie, int rc)
458 {
459         struct obd_info *oinfo = cookie;
460         struct lov_request *lovreq;
461
462         lovreq = container_of(oinfo, struct lov_request, rq_oi);
463         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
464 }
465
466 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
467                          struct obd_trans_info *oti,
468                          struct lov_request_set **reqset)
469 {
470         struct lov_request_set *set;
471         struct lov_obd *lov = &exp->exp_obd->u.lov;
472         int rc = 0, i;
473
474         set = kzalloc(sizeof(*set), GFP_NOFS);
475         if (!set)
476                 return -ENOMEM;
477         lov_init_set(set);
478
479         set->set_exp = exp;
480         set->set_oi = oinfo;
481         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
482                 set->set_cookies = oti->oti_logcookies;
483
484         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
485                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
486                 struct lov_request *req;
487
488                 if (lov_oinfo_is_dummy(loi))
489                         continue;
490
491                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
492                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
493                         continue;
494                 }
495
496                 req = kzalloc(sizeof(*req), GFP_NOFS);
497                 if (!req) {
498                         rc = -ENOMEM;
499                         goto out_set;
500                 }
501                 req->rq_stripe = i;
502                 req->rq_idx = loi->loi_ost_idx;
503
504                 req->rq_oi.oi_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
505                 if (!req->rq_oi.oi_oa) {
506                         kfree(req);
507                         rc = -ENOMEM;
508                         goto out_set;
509                 }
510                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
511                        sizeof(*req->rq_oi.oi_oa));
512                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
513                 req->rq_oi.oi_oa->o_stripe_idx = i;
514                 req->rq_oi.oi_cb_up = cb_setattr_update;
515
516                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
517                         int off = lov_stripe_offset(oinfo->oi_md,
518                                                     oinfo->oi_oa->o_size, i,
519                                                     &req->rq_oi.oi_oa->o_size);
520
521                         if (off < 0 && req->rq_oi.oi_oa->o_size)
522                                 req->rq_oi.oi_oa->o_size--;
523
524                         CDEBUG(D_INODE, "stripe %d has size %llu/%llu\n",
525                                i, req->rq_oi.oi_oa->o_size,
526                                oinfo->oi_oa->o_size);
527                 }
528                 lov_set_add_req(req, set);
529         }
530         if (!set->set_count) {
531                 rc = -EIO;
532                 goto out_set;
533         }
534         *reqset = set;
535         return rc;
536 out_set:
537         lov_fini_setattr_set(set);
538         return rc;
539 }
540
/* All-ones value of a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Add @add to @tot in place; if the sum wraps around (becomes smaller than
 * @tot), store LOV_U64_MAX instead.  Both arguments are evaluated more
 * than once, so they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)						\
	do {								\
		if ((tot) + (add) >= (tot))				\
			(tot) += (add);					\
		else							\
			(tot) = LOV_U64_MAX;				\
	} while (0)
549
550 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
551                     int success)
552 {
553         if (success) {
554                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
555                                                            LOV_MAGIC, 0);
556                 if (osfs->os_files != LOV_U64_MAX)
557                         lov_do_div64(osfs->os_files, expected_stripes);
558                 if (osfs->os_ffree != LOV_U64_MAX)
559                         lov_do_div64(osfs->os_ffree, expected_stripes);
560
561                 spin_lock(&obd->obd_osfs_lock);
562                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
563                 obd->obd_osfs_age = cfs_time_current_64();
564                 spin_unlock(&obd->obd_osfs_lock);
565                 return 0;
566         }
567
568         return -EIO;
569 }
570
571 int lov_fini_statfs_set(struct lov_request_set *set)
572 {
573         int rc = 0;
574
575         if (!set)
576                 return 0;
577
578         if (atomic_read(&set->set_completes)) {
579                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
580                                      atomic_read(&set->set_success));
581         }
582         lov_put_reqset(set);
583         return rc;
584 }
585
586 static void lov_update_statfs(struct obd_statfs *osfs,
587                               struct obd_statfs *lov_sfs,
588                               int success)
589 {
590         int shift = 0, quit = 0;
591         __u64 tmp;
592
593         if (success == 0) {
594                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
595         } else {
596                 if (osfs->os_bsize != lov_sfs->os_bsize) {
597                         /* assume all block sizes are always powers of 2 */
598                         /* get the bits difference */
599                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
600                         for (shift = 0; shift <= 64; ++shift) {
601                                 if (tmp & 1) {
602                                         if (quit)
603                                                 break;
604                                         quit = 1;
605                                         shift = 0;
606                                 }
607                                 tmp >>= 1;
608                         }
609                 }
610
611                 if (osfs->os_bsize < lov_sfs->os_bsize) {
612                         osfs->os_bsize = lov_sfs->os_bsize;
613
614                         osfs->os_bfree  >>= shift;
615                         osfs->os_bavail >>= shift;
616                         osfs->os_blocks >>= shift;
617                 } else if (shift != 0) {
618                         lov_sfs->os_bfree  >>= shift;
619                         lov_sfs->os_bavail >>= shift;
620                         lov_sfs->os_blocks >>= shift;
621                 }
622                 osfs->os_bfree += lov_sfs->os_bfree;
623                 osfs->os_bavail += lov_sfs->os_bavail;
624                 osfs->os_blocks += lov_sfs->os_blocks;
625                 /* XXX not sure about this one - depends on policy.
626                  *   - could be minimum if we always stripe on all OBDs
627                  *     (but that would be wrong for any other policy,
628                  *     if one of the OBDs has no more objects left)
629                  *   - could be sum if we stripe whole objects
630                  *   - could be average, just to give a nice number
631                  *
632                  * To give a "reasonable" (if not wholly accurate)
633                  * number, we divide the total number of free objects
634                  * by expected stripe count (watch out for overflow).
635                  */
636                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
637                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
638         }
639 }
640
641 /* The callback for osc_statfs_async that finalizes a request info when a
642  * response is received.
643  */
644 static int cb_statfs_update(void *cookie, int rc)
645 {
646         struct obd_info *oinfo = cookie;
647         struct lov_request *lovreq;
648         struct lov_request_set *set;
649         struct obd_statfs *osfs, *lov_sfs;
650         struct lov_obd *lov;
651         struct lov_tgt_desc *tgt;
652         struct obd_device *lovobd, *tgtobd;
653         int success;
654
655         lovreq = container_of(oinfo, struct lov_request, rq_oi);
656         set = lovreq->rq_rqset;
657         lovobd = set->set_obd;
658         lov = &lovobd->u.lov;
659         osfs = set->set_oi->oi_osfs;
660         lov_sfs = oinfo->oi_osfs;
661         success = atomic_read(&set->set_success);
662         /* XXX: the same is done in lov_update_common_set, however
663          * lovset->set_exp is not initialized.
664          */
665         lov_update_set(set, lovreq, rc);
666         if (rc)
667                 goto out;
668
669         obd_getref(lovobd);
670         tgt = lov->lov_tgts[lovreq->rq_idx];
671         if (!tgt || !tgt->ltd_active)
672                 goto out_update;
673
674         tgtobd = class_exp2obd(tgt->ltd_exp);
675         spin_lock(&tgtobd->obd_osfs_lock);
676         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
677         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
678                 tgtobd->obd_osfs_age = cfs_time_current_64();
679         spin_unlock(&tgtobd->obd_osfs_lock);
680
681 out_update:
682         lov_update_statfs(osfs, lov_sfs, success);
683         obd_putref(lovobd);
684
685 out:
686         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
687             lov_set_finished(set, 0)) {
688                 lov_statfs_interpret(NULL, set, set->set_count !=
689                                      atomic_read(&set->set_success));
690         }
691
692         return 0;
693 }
694
695 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
696                         struct lov_request_set **reqset)
697 {
698         struct lov_request_set *set;
699         struct lov_obd *lov = &obd->u.lov;
700         int rc = 0, i;
701
702         set = kzalloc(sizeof(*set), GFP_NOFS);
703         if (!set)
704                 return -ENOMEM;
705         lov_init_set(set);
706
707         set->set_obd = obd;
708         set->set_oi = oinfo;
709
710         /* We only get block data from the OBD */
711         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
712                 struct lov_request *req;
713
714                 if (!lov->lov_tgts[i] ||
715                     (oinfo->oi_flags & OBD_STATFS_NODELAY &&
716                      !lov->lov_tgts[i]->ltd_active)) {
717                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
718                         continue;
719                 }
720
721                 if (!lov->lov_tgts[i]->ltd_active)
722                         lov_check_and_wait_active(lov, i);
723
724                 /* skip targets that have been explicitly disabled by the
725                  * administrator
726                  */
727                 if (!lov->lov_tgts[i]->ltd_exp) {
728                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
729                         continue;
730                 }
731
732                 req = kzalloc(sizeof(*req), GFP_NOFS);
733                 if (!req) {
734                         rc = -ENOMEM;
735                         goto out_set;
736                 }
737
738                 req->rq_oi.oi_osfs = kzalloc(sizeof(*req->rq_oi.oi_osfs),
739                                              GFP_NOFS);
740                 if (!req->rq_oi.oi_osfs) {
741                         kfree(req);
742                         rc = -ENOMEM;
743                         goto out_set;
744                 }
745
746                 req->rq_idx = i;
747                 req->rq_oi.oi_cb_up = cb_statfs_update;
748                 req->rq_oi.oi_flags = oinfo->oi_flags;
749
750                 lov_set_add_req(req, set);
751         }
752         if (!set->set_count) {
753                 rc = -EIO;
754                 goto out_set;
755         }
756         *reqset = set;
757         return rc;
758 out_set:
759         lov_fini_statfs_set(set);
760         return rc;
761 }