libceph: grab snapc in ceph_osdc_alloc_request()
net/ceph/osd_client.c

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN        4096
#define OSD_OPREPLY_FRONT_LEN   512

static struct kmem_cache        *ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req);
static void __unregister_request(struct ceph_osd_client *osdc,
                                 struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req);
static void __enqueue_request(struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
                        u64 *objnum, u64 *objoff, u64 *objlen)
{
        u64 orig_len = *plen;
        int r;

        /* object extent? */
        r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
                                          objoff, objlen);
        if (r < 0)
                return r;
        if (*objlen < orig_len) {
                *plen = *objlen;
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
        }

        dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);

        return 0;
}
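
/*
 * Editor's illustrative sketch (not part of the original source): how
 * calc_layout() clips an extent that crosses an object boundary.  The
 * helper and the 4 MB object size are hypothetical, shown only for the
 * calling convention.
 */
static inline void __maybe_unused calc_layout_example(
                        struct ceph_file_layout *layout)
{
        u64 off = (4ULL << 20) - 4096;  /* 4 KB before a 4 MB boundary */
        u64 len = 8192;                 /* spills into the next object */
        u64 objnum, objoff, objlen;

        if (calc_layout(layout, off, &len, &objnum, &objoff, &objlen) < 0)
                return;
        /* with 4 MB objects, len has been clipped to 4096 */
}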

static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
        memset(osd_data, 0, sizeof (*osd_data));
        osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
                        struct page **pages, u64 length, u32 alignment,
                        bool pages_from_pool, bool own_pages)
{
        osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
        osd_data->pages = pages;
        osd_data->length = length;
        osd_data->alignment = alignment;
        osd_data->pages_from_pool = pages_from_pool;
        osd_data->own_pages = own_pages;
}

static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
                        struct ceph_pagelist *pagelist)
{
        osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
        osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
                        struct bio *bio, size_t bio_length)
{
        osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
        osd_data->bio = bio;
        osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

#define osd_req_op_data(oreq, whch, typ, fld)                           \
({                                                                      \
        struct ceph_osd_request *__oreq = (oreq);                       \
        unsigned int __whch = (whch);                                   \
        BUG_ON(__whch >= __oreq->r_num_ops);                            \
        &__oreq->r_ops[__whch].typ.fld;                                 \
})

static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
        BUG_ON(which >= osd_req->r_num_ops);

        return &osd_req->r_ops[which].raw_data_in;
}

struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
                        unsigned int which)
{
        return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

struct ceph_osd_data *
osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
                        unsigned int which)
{
        return osd_req_op_data(osd_req, which, cls, response_data);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data);    /* ??? */

void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages,
                        u64 length, u32 alignment,
                        bool pages_from_pool, bool own_pages)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_raw_data_in(osd_req, which);
        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
                                pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages,
                        u64 length, u32 alignment,
                        bool pages_from_pool, bool own_pages)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
                                pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
                        unsigned int which, struct ceph_pagelist *pagelist)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
        ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
                        unsigned int which, struct bio *bio, size_t bio_length)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
        ceph_osd_data_bio_init(osd_data, bio, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

static void osd_req_op_cls_request_info_pagelist(
                        struct ceph_osd_request *osd_req,
                        unsigned int which, struct ceph_pagelist *pagelist)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, cls, request_info);
        ceph_osd_data_pagelist_init(osd_data, pagelist);
}

void osd_req_op_cls_request_data_pagelist(
                        struct ceph_osd_request *osd_req,
                        unsigned int which, struct ceph_pagelist *pagelist)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
        ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages, u64 length,
                        u32 alignment, bool pages_from_pool, bool own_pages)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
                                pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages, u64 length,
                        u32 alignment, bool pages_from_pool, bool own_pages)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, cls, response_data);
        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
                                pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
        switch (osd_data->type) {
        case CEPH_OSD_DATA_TYPE_NONE:
                return 0;
        case CEPH_OSD_DATA_TYPE_PAGES:
                return osd_data->length;
        case CEPH_OSD_DATA_TYPE_PAGELIST:
                return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
        case CEPH_OSD_DATA_TYPE_BIO:
                return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
        default:
                WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
                return 0;
        }
}

static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
                int num_pages;

                num_pages = calc_pages_for((u64)osd_data->alignment,
                                                (u64)osd_data->length);
                ceph_release_page_vector(osd_data->pages, num_pages);
        }
        ceph_osd_data_init(osd_data);
}

static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
                        unsigned int which)
{
        struct ceph_osd_req_op *op;

        BUG_ON(which >= osd_req->r_num_ops);
        op = &osd_req->r_ops[which];

        switch (op->op) {
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
        case CEPH_OSD_OP_WRITEFULL:
                ceph_osd_data_release(&op->extent.osd_data);
                break;
        case CEPH_OSD_OP_CALL:
                ceph_osd_data_release(&op->cls.request_info);
                ceph_osd_data_release(&op->cls.request_data);
                ceph_osd_data_release(&op->cls.response_data);
                break;
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
                ceph_osd_data_release(&op->xattr.osd_data);
                break;
        case CEPH_OSD_OP_STAT:
                ceph_osd_data_release(&op->raw_data_in);
                break;
        default:
                break;
        }
}

/*
 * requests
 */
static void ceph_osdc_release_request(struct kref *kref)
{
        struct ceph_osd_request *req = container_of(kref,
                                            struct ceph_osd_request, r_kref);
        unsigned int which;

        dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
             req->r_request, req->r_reply);
        WARN_ON(!RB_EMPTY_NODE(&req->r_node));
        WARN_ON(!list_empty(&req->r_req_lru_item));
        WARN_ON(!list_empty(&req->r_osd_item));
        WARN_ON(!list_empty(&req->r_linger_item));
        WARN_ON(!list_empty(&req->r_linger_osd_item));
        WARN_ON(req->r_osd);

        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply) {
                ceph_msg_revoke_incoming(req->r_reply);
                ceph_msg_put(req->r_reply);
        }

        for (which = 0; which < req->r_num_ops; which++)
                osd_req_op_data_release(req, which);

        ceph_put_snap_context(req->r_snapc);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
                kmem_cache_free(ceph_osd_request_cache, req);
        else
                kfree(req);
}

void ceph_osdc_get_request(struct ceph_osd_request *req)
{
        dout("%s %p (was %d)\n", __func__, req,
             atomic_read(&req->r_kref.refcount));
        kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

void ceph_osdc_put_request(struct ceph_osd_request *req)
{
        if (req) {
                dout("%s %p (was %d)\n", __func__, req,
                     atomic_read(&req->r_kref.refcount));
                kref_put(&req->r_kref, ceph_osdc_release_request);
        }
}
EXPORT_SYMBOL(ceph_osdc_put_request);
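
/*
 * Editor's sketch of the refcounting contract: a request handed to
 * another context is pinned with a get first; the final put frees it
 * via ceph_osdc_release_request().  Hypothetical helper.
 */
static inline void __maybe_unused req_refcount_example(
                        struct ceph_osd_request *req)
{
        ceph_osdc_get_request(req);     /* pin while in use elsewhere */
        /* ... hand req to a workqueue, timer, etc. ... */
        ceph_osdc_put_request(req);     /* drop our reference */
}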

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                                               struct ceph_snap_context *snapc,
                                               unsigned int num_ops,
                                               bool use_mempool,
                                               gfp_t gfp_flags)
{
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
        size_t msg_size;

        if (use_mempool) {
                BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
                req = mempool_alloc(osdc->req_mempool, gfp_flags);
        } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
                req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
        } else {
                BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
                req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
                              gfp_flags);
        }
        if (unlikely(!req))
                return NULL;

        /* req only, each op is zeroed in _osd_req_op_init() */
        memset(req, 0, sizeof(*req));

        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
        req->r_num_ops = num_ops;
        req->r_snapid = CEPH_NOSNAP;
        req->r_snapc = ceph_get_snap_context(snapc);

        kref_init(&req->r_kref);
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        RB_CLEAR_NODE(&req->r_node);
        INIT_LIST_HEAD(&req->r_unsafe_item);
        INIT_LIST_HEAD(&req->r_linger_item);
        INIT_LIST_HEAD(&req->r_linger_osd_item);
        INIT_LIST_HEAD(&req->r_req_lru_item);
        INIT_LIST_HEAD(&req->r_osd_item);

        req->r_base_oloc.pool = -1;
        req->r_target_oloc.pool = -1;

        msg_size = OSD_OPREPLY_FRONT_LEN;
        if (num_ops > CEPH_OSD_SLAB_OPS) {
                /* ceph_osd_op and rval */
                msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
                            (sizeof(struct ceph_osd_op) + 4);
        }

        /* create reply message */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
                                   gfp_flags, true);
        if (!msg) {
                ceph_osdc_put_request(req);
                return NULL;
        }
        req->r_reply = msg;

        msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
        msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
        msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
        msg_size += 1 + 8 + 4 + 4; /* pgid */
        msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
        msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
        msg_size += 8; /* snapid */
        msg_size += 8; /* snap_seq */
        msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
        msg_size += 4; /* retry_attempt */

        /* create request message; allow space for oid */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
        if (!msg) {
                ceph_osdc_put_request(req);
                return NULL;
        }

        memset(msg->front.iov_base, 0, msg->front.iov_len);

        req->r_request = msg;

        return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);
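
/*
 * Editor's sketch of the allocation path this patch changes: the snap
 * context reference is now taken inside ceph_osdc_alloc_request(), so
 * callers no longer assign req->r_snapc themselves.  Hypothetical
 * helper; values are illustrative.
 */
static inline struct ceph_osd_request *__maybe_unused
alloc_request_example(struct ceph_osd_client *osdc,
                      struct ceph_snap_context *snapc)
{
        struct ceph_osd_request *req;

        req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOFS);
        if (!req)
                return NULL;
        /* req->r_snapc already holds its own reference to snapc */
        return req;
}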

static bool osd_req_opcode_valid(u16 opcode)
{
        switch (opcode) {
#define GENERATE_CASE(op, opcode, str)  case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
        default:
                return false;
        }
}

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 */
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
                 u16 opcode, u32 flags)
{
        struct ceph_osd_req_op *op;

        BUG_ON(which >= osd_req->r_num_ops);
        BUG_ON(!osd_req_opcode_valid(opcode));

        op = &osd_req->r_ops[which];
        memset(op, 0, sizeof (*op));
        op->op = opcode;
        op->flags = flags;

        return op;
}

void osd_req_op_init(struct ceph_osd_request *osd_req,
                     unsigned int which, u16 opcode, u32 flags)
{
        (void)_osd_req_op_init(osd_req, which, opcode, flags);
}
EXPORT_SYMBOL(osd_req_op_init);

void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
                                unsigned int which, u16 opcode,
                                u64 offset, u64 length,
                                u64 truncate_size, u32 truncate_seq)
{
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
                                                      opcode, 0);
        size_t payload_len = 0;

        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
               opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
               opcode != CEPH_OSD_OP_TRUNCATE);

        op->extent.offset = offset;
        op->extent.length = length;
        op->extent.truncate_size = truncate_size;
        op->extent.truncate_seq = truncate_seq;
        if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
                payload_len += length;

        op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);
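
/*
 * Editor's sketch: a one-op read with a page vector attached for the
 * reply data.  Offsets and lengths are hypothetical.
 */
static inline void __maybe_unused extent_read_example(
                        struct ceph_osd_request *req, struct page **pages)
{
        osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, PAGE_SIZE,
                               0, 0);
        osd_req_op_extent_osd_data_pages(req, 0, pages, PAGE_SIZE, 0,
                                         false, false);
}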

void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
                                unsigned int which, u64 length)
{
        struct ceph_osd_req_op *op;
        u64 previous;

        BUG_ON(which >= osd_req->r_num_ops);
        op = &osd_req->r_ops[which];
        previous = op->extent.length;

        if (length == previous)
                return;         /* Nothing to do */
        BUG_ON(length > previous);

        op->extent.length = length;
        op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);

void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
                                unsigned int which, u64 offset_inc)
{
        struct ceph_osd_req_op *op, *prev_op;

        BUG_ON(which + 1 >= osd_req->r_num_ops);

        prev_op = &osd_req->r_ops[which];
        op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
        /* dup previous one */
        op->indata_len = prev_op->indata_len;
        op->outdata_len = prev_op->outdata_len;
        op->extent = prev_op->extent;
        /* adjust offset */
        op->extent.offset += offset_inc;
        op->extent.length -= offset_inc;

        if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
                op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);

void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
                        u16 opcode, const char *class, const char *method)
{
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
                                                      opcode, 0);
        struct ceph_pagelist *pagelist;
        size_t payload_len = 0;
        size_t size;

        BUG_ON(opcode != CEPH_OSD_OP_CALL);

        pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
        BUG_ON(!pagelist);
        ceph_pagelist_init(pagelist);

        op->cls.class_name = class;
        size = strlen(class);
        BUG_ON(size > (size_t) U8_MAX);
        op->cls.class_len = size;
        ceph_pagelist_append(pagelist, class, size);
        payload_len += size;

        op->cls.method_name = method;
        size = strlen(method);
        BUG_ON(size > (size_t) U8_MAX);
        op->cls.method_len = size;
        ceph_pagelist_append(pagelist, method, size);
        payload_len += size;

        osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);

        op->cls.argc = 0;       /* currently unused */

        op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
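
/*
 * Editor's sketch of a CEPH_OSD_OP_CALL setup: invoke a hypothetical
 * "lock"/"lock_op" class method and attach input data via a pagelist.
 * The class and method names are assumptions, not taken from this file.
 */
static inline void __maybe_unused cls_call_example(
                        struct ceph_osd_request *req,
                        struct ceph_pagelist *in_data)
{
        osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, "lock", "lock_op");
        osd_req_op_cls_request_data_pagelist(req, 0, in_data);
}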

int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
                          u16 opcode, const char *name, const void *value,
                          size_t size, u8 cmp_op, u8 cmp_mode)
{
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
                                                      opcode, 0);
        struct ceph_pagelist *pagelist;
        size_t payload_len;

        BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
        if (!pagelist)
                return -ENOMEM;

        ceph_pagelist_init(pagelist);

        payload_len = strlen(name);
        op->xattr.name_len = payload_len;
        ceph_pagelist_append(pagelist, name, payload_len);

        op->xattr.value_len = size;
        ceph_pagelist_append(pagelist, value, size);
        payload_len += size;

        op->xattr.cmp_op = cmp_op;
        op->xattr.cmp_mode = cmp_mode;

        ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
        op->indata_len = payload_len;
        return 0;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);

void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
                                unsigned int which, u16 opcode,
                                u64 cookie, u64 version, int flag)
{
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
                                                      opcode, 0);

        BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);

        op->watch.cookie = cookie;
        op->watch.ver = version;
        if (opcode == CEPH_OSD_OP_WATCH && flag)
                op->watch.flag = (u8)1;
}
EXPORT_SYMBOL(osd_req_op_watch_init);

void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
                                unsigned int which,
                                u64 expected_object_size,
                                u64 expected_write_size)
{
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
                                                      CEPH_OSD_OP_SETALLOCHINT,
                                                      0);

        op->alloc_hint.expected_object_size = expected_object_size;
        op->alloc_hint.expected_write_size = expected_write_size;

        /*
         * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
         * not worth a feature bit.  Set FAILOK per-op flag to make
         * sure older osds don't trip over an unsupported opcode.
         */
        op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);

static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
                                struct ceph_osd_data *osd_data)
{
        u64 length = ceph_osd_data_length(osd_data);

        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
                BUG_ON(length > (u64) SIZE_MAX);
                if (length)
                        ceph_msg_data_add_pages(msg, osd_data->pages,
                                        length, osd_data->alignment);
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
                BUG_ON(!length);
                ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
                ceph_msg_data_add_bio(msg, osd_data->bio, length);
#endif
        } else {
                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
        }
}

static u64 osd_req_encode_op(struct ceph_osd_request *req,
                              struct ceph_osd_op *dst, unsigned int which)
{
        struct ceph_osd_req_op *src;
        struct ceph_osd_data *osd_data;
        u64 request_data_len = 0;
        u64 data_length;

        BUG_ON(which >= req->r_num_ops);
        src = &req->r_ops[which];
        if (WARN_ON(!osd_req_opcode_valid(src->op))) {
                pr_err("unrecognized osd opcode %d\n", src->op);

                return 0;
        }

        switch (src->op) {
        case CEPH_OSD_OP_STAT:
                osd_data = &src->raw_data_in;
                ceph_osdc_msg_data_add(req->r_reply, osd_data);
                break;
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
        case CEPH_OSD_OP_WRITEFULL:
        case CEPH_OSD_OP_ZERO:
        case CEPH_OSD_OP_TRUNCATE:
                if (src->op == CEPH_OSD_OP_WRITE ||
                    src->op == CEPH_OSD_OP_WRITEFULL)
                        request_data_len = src->extent.length;
                dst->extent.offset = cpu_to_le64(src->extent.offset);
                dst->extent.length = cpu_to_le64(src->extent.length);
                dst->extent.truncate_size =
                        cpu_to_le64(src->extent.truncate_size);
                dst->extent.truncate_seq =
                        cpu_to_le32(src->extent.truncate_seq);
                osd_data = &src->extent.osd_data;
                if (src->op == CEPH_OSD_OP_WRITE ||
                    src->op == CEPH_OSD_OP_WRITEFULL)
                        ceph_osdc_msg_data_add(req->r_request, osd_data);
                else
                        ceph_osdc_msg_data_add(req->r_reply, osd_data);
                break;
        case CEPH_OSD_OP_CALL:
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
                osd_data = &src->cls.request_info;
                ceph_osdc_msg_data_add(req->r_request, osd_data);
                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
                request_data_len = osd_data->pagelist->length;

                osd_data = &src->cls.request_data;
                data_length = ceph_osd_data_length(osd_data);
                if (data_length) {
                        BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
                        dst->cls.indata_len = cpu_to_le32(data_length);
                        ceph_osdc_msg_data_add(req->r_request, osd_data);
                        src->indata_len += data_length;
                        request_data_len += data_length;
                }
                osd_data = &src->cls.response_data;
                ceph_osdc_msg_data_add(req->r_reply, osd_data);
                break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                dst->watch.cookie = cpu_to_le64(src->watch.cookie);
                dst->watch.ver = cpu_to_le64(src->watch.ver);
                dst->watch.flag = src->watch.flag;
                break;
        case CEPH_OSD_OP_SETALLOCHINT:
                dst->alloc_hint.expected_object_size =
                    cpu_to_le64(src->alloc_hint.expected_object_size);
                dst->alloc_hint.expected_write_size =
                    cpu_to_le64(src->alloc_hint.expected_write_size);
                break;
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
                dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
                dst->xattr.cmp_op = src->xattr.cmp_op;
                dst->xattr.cmp_mode = src->xattr.cmp_mode;
                osd_data = &src->xattr.osd_data;
                ceph_osdc_msg_data_add(req->r_request, osd_data);
                request_data_len = osd_data->pagelist->length;
                break;
        case CEPH_OSD_OP_CREATE:
        case CEPH_OSD_OP_DELETE:
                break;
        default:
                pr_err("unsupported osd opcode %s\n",
                        ceph_osd_op_name(src->op));
                WARN_ON(1);

                return 0;
        }

        dst->op = cpu_to_le16(src->op);
        dst->flags = cpu_to_le32(src->flags);
        dst->payload_len = cpu_to_le32(src->indata_len);

        return request_data_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
                                               u64 off, u64 *plen,
                                               unsigned int which, int num_ops,
                                               int opcode, int flags,
                                               struct ceph_snap_context *snapc,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               bool use_mempool)
{
        struct ceph_osd_request *req;
        u64 objnum = 0;
        u64 objoff = 0;
        u64 objlen = 0;
        int r;

        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
               opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
               opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);

        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
                                        GFP_NOFS);
        if (!req)
                return ERR_PTR(-ENOMEM);

        req->r_flags = flags;

        /* calculate max write size */
        r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
        if (r < 0) {
                ceph_osdc_put_request(req);
                return ERR_PTR(r);
        }

        if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
                osd_req_op_init(req, which, opcode, 0);
        } else {
                u32 object_size = le32_to_cpu(layout->fl_object_size);
                u32 object_base = off - objoff;
                if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
                        if (truncate_size <= object_base) {
                                truncate_size = 0;
                        } else {
                                truncate_size -= object_base;
                                if (truncate_size > object_size)
                                        truncate_size = object_size;
                        }
                }
                osd_req_op_extent_init(req, which, opcode, objoff, objlen,
                                       truncate_size, truncate_seq);
        }

        req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);

        snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
                 "%llx.%08llx", vino.ino, objnum);
        req->r_base_oid.name_len = strlen(req->r_base_oid.name);

        return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);
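
/*
 * Editor's end-to-end sketch: building a single-op write for a file
 * extent.  vino, snapc and the zero truncate parameters are
 * placeholders; note that *plen (here len) may come back shortened to
 * the object boundary.
 */
static inline struct ceph_osd_request *__maybe_unused
new_request_example(struct ceph_osd_client *osdc,
                    struct ceph_file_layout *layout, struct ceph_vino vino,
                    struct ceph_snap_context *snapc, u64 off, u64 len)
{
        struct ceph_osd_request *req;

        req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
                                    CEPH_OSD_OP_WRITE,
                                    CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                                    snapc, 0, 0, false);
        return IS_ERR(req) ? NULL : req;
}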

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
                             struct ceph_osd_request *new)
{
        struct rb_node **p = &osdc->requests.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_osd_request *req = NULL;

        while (*p) {
                parent = *p;
                req = rb_entry(parent, struct ceph_osd_request, r_node);
                if (new->r_tid < req->r_tid)
                        p = &(*p)->rb_left;
                else if (new->r_tid > req->r_tid)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->r_node, parent, p);
        rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
                                                 u64 tid)
{
        struct ceph_osd_request *req;
        struct rb_node *n = osdc->requests.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_osd_request, r_node);
                if (tid < req->r_tid)
                        n = n->rb_left;
                else if (tid > req->r_tid)
                        n = n->rb_right;
                else
                        return req;
        }
        return NULL;
}

static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
                    u64 tid)
{
        struct ceph_osd_request *req;
        struct rb_node *n = osdc->requests.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_osd_request, r_node);
                if (tid < req->r_tid) {
                        if (!n->rb_left)
                                return req;
                        n = n->rb_left;
                } else if (tid > req->r_tid) {
                        n = n->rb_right;
                } else {
                        return req;
                }
        }
        return NULL;
}
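
/*
 * Editor's sketch of the lookup pattern: tid lookups must happen under
 * request_mutex, since register/unregister rebalance the tree.
 * Hypothetical helper showing the locking only.
 */
static inline struct ceph_osd_request *__maybe_unused
lookup_request_example(struct ceph_osd_client *osdc, u64 tid)
{
        struct ceph_osd_request *req;

        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (req)
                ceph_osdc_get_request(req);     /* pin before unlocking */
        mutex_unlock(&osdc->request_mutex);
        return req;
}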

static void __kick_linger_request(struct ceph_osd_request *req)
{
        struct ceph_osd_client *osdc = req->r_osdc;
        struct ceph_osd *osd = req->r_osd;

        /*
         * Linger requests need to be resent with a new tid to avoid
         * the dup op detection logic on the OSDs.  Achieve this with
         * a re-register dance instead of open-coding.
         */
        ceph_osdc_get_request(req);
        if (!list_empty(&req->r_linger_item))
                __unregister_linger_request(osdc, req);
        else
                __unregister_request(osdc, req);
        __register_request(osdc, req);
        ceph_osdc_put_request(req);

        /*
         * Unless request has been registered as both normal and
         * lingering, __unregister{,_linger}_request clears r_osd.
         * However, here we need to preserve r_osd to make sure we
         * requeue on the same OSD.
         */
        WARN_ON(req->r_osd || !osd);
        req->r_osd = osd;

        dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
        __enqueue_request(req);
}

/*
 * Resubmit requests pending on the given osd.
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
                                struct ceph_osd *osd)
{
        struct ceph_osd_request *req, *nreq;
        LIST_HEAD(resend);
        LIST_HEAD(resend_linger);
        int err;

        dout("%s osd%d\n", __func__, osd->o_osd);
        err = __reset_osd(osdc, osd);
        if (err)
                return;

        /*
         * Build up a list of requests to resend by traversing the
         * osd's list of requests.  Requests for a given object are
         * sent in tid order, and that is also the order they're
         * kept on this list.  Therefore all requests that are in
         * flight will be found first, followed by all requests that
         * have not yet been sent.  And to resend requests while
         * preserving this order we will want to put any sent
         * requests back on the front of the osd client's unsent
         * list.
         *
         * So we build a separate ordered list of already-sent
         * requests for the affected osd and splice it onto the
         * front of the osd client's unsent list.  Once we've seen a
         * request that has not yet been sent we're done.  Those
         * requests are already sitting right where they belong.
         */
        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
                if (!req->r_sent)
                        break;

                if (!req->r_linger) {
                        dout("%s requeueing %p tid %llu\n", __func__, req,
                             req->r_tid);
                        list_move_tail(&req->r_req_lru_item, &resend);
                        req->r_flags |= CEPH_OSD_FLAG_RETRY;
                } else {
                        list_move_tail(&req->r_req_lru_item, &resend_linger);
                }
        }
        list_splice(&resend, &osdc->req_unsent);

        /*
         * Both registered and not yet registered linger requests are
         * enqueued with a new tid on the same OSD.  We add/move them
         * to req_unsent/o_requests at the end to keep things in tid
         * order.
         */
        list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
                                 r_linger_osd_item) {
                WARN_ON(!list_empty(&req->r_req_lru_item));
                __kick_linger_request(req);
        }

        list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
                __kick_linger_request(req);
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc;

        if (!osd)
                return;
        dout("osd_reset osd%d\n", osd->o_osd);
        osdc = osd->o_osdc;
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
        __kick_osd_requests(osdc, osd);
        __send_queued(osdc);
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
        struct ceph_osd *osd;

        osd = kzalloc(sizeof(*osd), GFP_NOFS);
        if (!osd)
                return NULL;

        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
        osd->o_osd = onum;
        RB_CLEAR_NODE(&osd->o_node);
        INIT_LIST_HEAD(&osd->o_requests);
        INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;

        ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

        INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
        if (atomic_inc_not_zero(&osd->o_ref)) {
                dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
                     atomic_read(&osd->o_ref));
                return osd;
        } else {
                dout("get_osd %p FAIL\n", osd);
                return NULL;
        }
}

static void put_osd(struct ceph_osd *osd)
{
        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
             atomic_read(&osd->o_ref) - 1);
        if (atomic_dec_and_test(&osd->o_ref)) {
                if (osd->o_auth.authorizer)
                        ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
                kfree(osd);
        }
}
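
/*
 * Editor's sketch of the osd refcount pattern: get_osd() uses
 * atomic_inc_not_zero() and so can fail if the last reference is
 * already being dropped; callers must check the return value.
 */
static inline void __maybe_unused osd_refcount_example(struct ceph_osd *osd)
{
        if (!get_osd(osd))
                return;         /* lost the race: osd is being freed */
        /* ... safe to use osd here ... */
        put_osd(osd);
}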

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
        dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
        WARN_ON(!list_empty(&osd->o_requests));
        WARN_ON(!list_empty(&osd->o_linger_requests));

        list_del_init(&osd->o_osd_lru);
        rb_erase(&osd->o_node, &osdc->osds);
        RB_CLEAR_NODE(&osd->o_node);
}

static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
        dout("%s %p osd%d\n", __func__, osd, osd->o_osd);

        if (!RB_EMPTY_NODE(&osd->o_node)) {
                ceph_con_close(&osd->o_con);
                __remove_osd(osdc, osd);
                put_osd(osd);
        }
}

static void remove_all_osds(struct ceph_osd_client *osdc)
{
        dout("%s %p\n", __func__, osdc);
        mutex_lock(&osdc->request_mutex);
        while (!RB_EMPTY_ROOT(&osdc->osds)) {
                struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
                                                struct ceph_osd, o_node);
                remove_osd(osdc, osd);
        }
        mutex_unlock(&osdc->request_mutex);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
                              struct ceph_osd *osd)
{
        dout("%s %p\n", __func__, osd);
        BUG_ON(!list_empty(&osd->o_osd_lru));

        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
        osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
                                  struct ceph_osd *osd)
{
        dout("%s %p\n", __func__, osd);

        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests))
                __move_osd_to_lru(osdc, osd);
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
        dout("__remove_osd_from_lru %p\n", osd);
        if (!list_empty(&osd->o_osd_lru))
                list_del_init(&osd->o_osd_lru);
}
static void remove_old_osds(struct ceph_osd_client *osdc)
{
        struct ceph_osd *osd, *nosd;

        dout("%s %p\n", __func__, osdc);
        mutex_lock(&osdc->request_mutex);
        list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
                if (time_before(jiffies, osd->lru_ttl))
                        break;
                remove_osd(osdc, osd);
        }
        mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
        struct ceph_entity_addr *peer_addr;

        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                remove_osd(osdc, osd);
                return -ENODEV;
        }

        peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
        if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
                        !ceph_con_opened(&osd->o_con)) {
                struct ceph_osd_request *req;

                dout("osd addr hasn't changed and connection never opened, "
                     "letting msgr retry\n");
                /* touch each r_stamp for handle_timeout()'s benefit */
1200                 list_for_each_entry(req, &osd->o_requests, r_osd_item)
1201                         req->r_stamp = jiffies;
1202
1203                 return -EAGAIN;
1204         }
1205
1206         ceph_con_close(&osd->o_con);
1207         ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1208         osd->o_incarnation++;
1209
1210         return 0;
1211 }
1212
1213 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
1214 {
1215         struct rb_node **p = &osdc->osds.rb_node;
1216         struct rb_node *parent = NULL;
1217         struct ceph_osd *osd = NULL;
1218
1219         dout("__insert_osd %p osd%d\n", new, new->o_osd);
1220         while (*p) {
1221                 parent = *p;
1222                 osd = rb_entry(parent, struct ceph_osd, o_node);
1223                 if (new->o_osd < osd->o_osd)
1224                         p = &(*p)->rb_left;
1225                 else if (new->o_osd > osd->o_osd)
1226                         p = &(*p)->rb_right;
1227                 else
1228                         BUG();
1229         }
1230
1231         rb_link_node(&new->o_node, parent, p);
1232         rb_insert_color(&new->o_node, &osdc->osds);
1233 }
1234
1235 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
1236 {
1237         struct ceph_osd *osd;
1238         struct rb_node *n = osdc->osds.rb_node;
1239
1240         while (n) {
1241                 osd = rb_entry(n, struct ceph_osd, o_node);
1242                 if (o < osd->o_osd)
1243                         n = n->rb_left;
1244                 else if (o > osd->o_osd)
1245                         n = n->rb_right;
1246                 else
1247                         return osd;
1248         }
1249         return NULL;
1250 }
1251
1252 static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
1253 {
1254         schedule_delayed_work(&osdc->timeout_work,
1255                               osdc->client->options->osd_keepalive_timeout);
1256 }
1257
1258 static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
1259 {
1260         cancel_delayed_work(&osdc->timeout_work);
1261 }
1262
1263 /*
1264  * Register request, assign tid.  If this is the first request, set up
1265  * the timeout event.
1266  */
1267 static void __register_request(struct ceph_osd_client *osdc,
1268                                struct ceph_osd_request *req)
1269 {
1270         req->r_tid = ++osdc->last_tid;
1271         req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
1272         dout("__register_request %p tid %lld\n", req, req->r_tid);
1273         __insert_request(osdc, req);
1274         ceph_osdc_get_request(req);
1275         osdc->num_requests++;
1276         if (osdc->num_requests == 1) {
1277                 dout(" first request, scheduling timeout\n");
1278                 __schedule_osd_timeout(osdc);
1279         }
1280 }
1281
1282 /*
1283  * called under osdc->request_mutex
1284  */
1285 static void __unregister_request(struct ceph_osd_client *osdc,
1286                                  struct ceph_osd_request *req)
1287 {
1288         if (RB_EMPTY_NODE(&req->r_node)) {
1289                 dout("__unregister_request %p tid %lld not registered\n",
1290                         req, req->r_tid);
1291                 return;
1292         }
1293
1294         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
1295         rb_erase(&req->r_node, &osdc->requests);
1296         RB_CLEAR_NODE(&req->r_node);
1297         osdc->num_requests--;
1298
1299         if (req->r_osd) {
1300                 /* make sure the original request isn't in flight. */
1301                 ceph_msg_revoke(req->r_request);
1302
1303                 list_del_init(&req->r_osd_item);
1304                 maybe_move_osd_to_lru(osdc, req->r_osd);
1305                 if (list_empty(&req->r_linger_osd_item))
1306                         req->r_osd = NULL;
1307         }
1308
1309         list_del_init(&req->r_req_lru_item);
1310         ceph_osdc_put_request(req);
1311
1312         if (osdc->num_requests == 0) {
1313                 dout(" no requests, canceling timeout\n");
1314                 __cancel_osd_timeout(osdc);
1315         }
1316 }
1317
1318 /*
1319  * Cancel a previously queued request message
1320  */
1321 static void __cancel_request(struct ceph_osd_request *req)
1322 {
1323         if (req->r_sent && req->r_osd) {
1324                 ceph_msg_revoke(req->r_request);
1325                 req->r_sent = 0;
1326         }
1327 }
1328
1329 static void __register_linger_request(struct ceph_osd_client *osdc,
1330                                     struct ceph_osd_request *req)
1331 {
1332         dout("%s %p tid %llu\n", __func__, req, req->r_tid);
1333         WARN_ON(!req->r_linger);
1334
1335         ceph_osdc_get_request(req);
1336         list_add_tail(&req->r_linger_item, &osdc->req_linger);
1337         if (req->r_osd)
1338                 list_add_tail(&req->r_linger_osd_item,
1339                               &req->r_osd->o_linger_requests);
1340 }
1341
1342 static void __unregister_linger_request(struct ceph_osd_client *osdc,
1343                                         struct ceph_osd_request *req)
1344 {
1345         WARN_ON(!req->r_linger);
1346
1347         if (list_empty(&req->r_linger_item)) {
1348                 dout("%s %p tid %llu not registered\n", __func__, req,
1349                      req->r_tid);
1350                 return;
1351         }
1352
1353         dout("%s %p tid %llu\n", __func__, req, req->r_tid);
1354         list_del_init(&req->r_linger_item);
1355
1356         if (req->r_osd) {
1357                 list_del_init(&req->r_linger_osd_item);
1358                 maybe_move_osd_to_lru(osdc, req->r_osd);
1359                 if (list_empty(&req->r_osd_item))
1360                         req->r_osd = NULL;
1361         }
1362         ceph_osdc_put_request(req);
1363 }
1364
1365 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
1366                                   struct ceph_osd_request *req)
1367 {
1368         if (!req->r_linger) {
1369                 dout("set_request_linger %p\n", req);
1370                 req->r_linger = 1;
1371         }
1372 }
1373 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
1374
1375 /*
1376  * Returns whether a request should be blocked from being sent
1377  * based on the current osdmap and osd_client settings.
1378  *
1379  * Caller should hold map_sem for read.
1380  */
1381 static bool __req_should_be_paused(struct ceph_osd_client *osdc,
1382                                    struct ceph_osd_request *req)
1383 {
1384         bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
1385         bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
1386                 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
1387         return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
1388                 (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
1389 }
1390
1391 /*
1392  * Calculate mapping of a request to a PG.  Takes tiering into account.
1393  */
1394 static int __calc_request_pg(struct ceph_osdmap *osdmap,
1395                              struct ceph_osd_request *req,
1396                              struct ceph_pg *pg_out)
1397 {
1398         bool need_check_tiering;
1399
1400         need_check_tiering = false;
1401         if (req->r_target_oloc.pool == -1) {
1402                 req->r_target_oloc = req->r_base_oloc; /* struct */
1403                 need_check_tiering = true;
1404         }
1405         if (req->r_target_oid.name_len == 0) {
1406                 ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
1407                 need_check_tiering = true;
1408         }
1409
1410         if (need_check_tiering &&
1411             (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1412                 struct ceph_pg_pool_info *pi;
1413
1414                 pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
1415                 if (pi) {
1416                         if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
1417                             pi->read_tier >= 0)
1418                                 req->r_target_oloc.pool = pi->read_tier;
1419                         if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
1420                             pi->write_tier >= 0)
1421                                 req->r_target_oloc.pool = pi->write_tier;
1422                 }
1423                 /* !pi is caught in ceph_oloc_oid_to_pg() */
1424         }
1425
1426         return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
1427                                    &req->r_target_oid, pg_out);
1428 }
1429
1430 static void __enqueue_request(struct ceph_osd_request *req)
1431 {
1432         struct ceph_osd_client *osdc = req->r_osdc;
1433
1434         dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
1435              req->r_osd ? req->r_osd->o_osd : -1);
1436
1437         if (req->r_osd) {
1438                 __remove_osd_from_lru(req->r_osd);
1439                 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
1440                 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
1441         } else {
1442                 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
1443         }
1444 }
1445
1446 /*
1447  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
1448  * (as needed), and set the request r_osd appropriately.  If there is
1449  * no up osd, set r_osd to NULL.  Move the request to the appropriate list
1450  * (unsent, homeless) or leave it on the in-flight lru.
1451  *
1452  * Return 0 if unchanged, 1 if changed, or negative on error.
1453  *
1454  * Caller should hold map_sem for read and request_mutex.
1455  */
1456 static int __map_request(struct ceph_osd_client *osdc,
1457                          struct ceph_osd_request *req, int force_resend)
1458 {
1459         struct ceph_pg pgid;
1460         int acting[CEPH_PG_MAX_SIZE];
1461         int num, o;
1462         int err;
1463         bool was_paused;
1464
1465         dout("map_request %p tid %lld\n", req, req->r_tid);
1466
1467         err = __calc_request_pg(osdc->osdmap, req, &pgid);
1468         if (err) {
1469                 list_move(&req->r_req_lru_item, &osdc->req_notarget);
1470                 return err;
1471         }
1472         req->r_pgid = pgid;
1473
1474         num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o);
1475         if (num < 0)
1476                 num = 0;
1477
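             /* force a resend if the request has just become unpaused */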
1478         was_paused = req->r_paused;
1479         req->r_paused = __req_should_be_paused(osdc, req);
1480         if (was_paused && !req->r_paused)
1481                 force_resend = 1;
1482
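
             /*
              * No change needed: the target osd, pg acting set and osd
              * incarnation are all unchanged (and no resend was forced),
              * or the request is still homeless, or it is paused.
              */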
1483         if ((!force_resend &&
1484              req->r_osd && req->r_osd->o_osd == o &&
1485              req->r_sent >= req->r_osd->o_incarnation &&
1486              req->r_num_pg_osds == num &&
1487              memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
1488             (req->r_osd == NULL && o == -1) ||
1489             req->r_paused)
1490                 return 0;  /* no change */
1491
1492         dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
1493              req->r_tid, pgid.pool, pgid.seed, o,
1494              req->r_osd ? req->r_osd->o_osd : -1);
1495
1496         /* record full pg acting set */
1497         memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
1498         req->r_num_pg_osds = num;
1499
1500         if (req->r_osd) {
1501                 __cancel_request(req);
1502                 list_del_init(&req->r_osd_item);
1503                 list_del_init(&req->r_linger_osd_item);
1504                 req->r_osd = NULL;
1505         }
1506
1507         req->r_osd = __lookup_osd(osdc, o);
1508         if (!req->r_osd && o >= 0) {
1509                 err = -ENOMEM;
1510                 req->r_osd = create_osd(osdc, o);
1511                 if (!req->r_osd) {
1512                         list_move(&req->r_req_lru_item, &osdc->req_notarget);
1513                         goto out;
1514                 }
1515
1516                 dout("map_request osd %p is osd%d\n", req->r_osd, o);
1517                 __insert_osd(osdc, req->r_osd);
1518
1519                 ceph_con_open(&req->r_osd->o_con,
1520                               CEPH_ENTITY_TYPE_OSD, o,
1521                               &osdc->osdmap->osd_addr[o]);
1522         }
1523
1524         __enqueue_request(req);
1525         err = 1;   /* osd or pg changed */
1526
1527 out:
1528         return err;
1529 }
1530
1531 /*
1532  * caller should hold map_sem (for read) and request_mutex
1533  */
1534 static void __send_request(struct ceph_osd_client *osdc,
1535                            struct ceph_osd_request *req)
1536 {
1537         void *p;
1538
1539         dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1540              req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1541              (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1542
1543         /* fill in message content that changes each time we send it */
1544         put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1545         put_unaligned_le32(req->r_flags, req->r_request_flags);
1546         put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
1547         p = req->r_request_pgid;
1548         ceph_encode_64(&p, req->r_pgid.pool);
1549         ceph_encode_32(&p, req->r_pgid.seed);
1550         put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
1551         memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1552                sizeof(req->r_reassert_version));
1553
1554         req->r_stamp = jiffies;
1555         list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1556
1557         ceph_msg_get(req->r_request); /* send consumes a ref */
1558
1559         req->r_sent = req->r_osd->o_incarnation;
1560
1561         ceph_con_send(&req->r_osd->o_con, req->r_request);
1562 }
1563
1564 /*
1565  * Send any requests in the queue (req_unsent).
1566  */
1567 static void __send_queued(struct ceph_osd_client *osdc)
1568 {
1569         struct ceph_osd_request *req, *tmp;
1570
1571         dout("__send_queued\n");
1572         list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
1573                 __send_request(osdc, req);
1574 }
1575
1576 /*
1577  * Caller should hold map_sem for read and request_mutex.
1578  */
1579 static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
1580                                      struct ceph_osd_request *req,
1581                                      bool nofail)
1582 {
1583         int rc;
1584
1585         __register_request(osdc, req);
1586         req->r_sent = 0;
1587         req->r_got_reply = 0;
1588         rc = __map_request(osdc, req, 0);
1589         if (rc < 0) {
1590                 if (nofail) {
1591                         dout("osdc_start_request failed map, "
1592                                 "will retry %lld\n", req->r_tid);
1593                         rc = 0;
1594                 } else {
1595                         __unregister_request(osdc, req);
1596                 }
1597                 return rc;
1598         }
1599
1600         if (req->r_osd == NULL) {
1601                 dout("send_request %p no up osds in pg\n", req);
1602                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1603         } else {
1604                 __send_queued(osdc);
1605         }
1606
1607         return 0;
1608 }
1609
1610 /*
1611  * Timeout callback, called every N seconds when one or more osd
1612  * requests have been active for more than N seconds.  When this
1613  * happens, we ping all OSDs whose requests have timed out to
1614  * ensure any communications channel reset is detected.  Reset the
1615  * request timeouts another N seconds in the future as we go.
1616  * Reschedule the timeout event another N seconds in the future
1617  * (unless there are no open requests).
1618  */
1619 static void handle_timeout(struct work_struct *work)
1620 {
1621         struct ceph_osd_client *osdc =
1622                 container_of(work, struct ceph_osd_client, timeout_work.work);
1623         struct ceph_options *opts = osdc->client->options;
1624         struct ceph_osd_request *req;
1625         struct ceph_osd *osd;
1626         struct list_head slow_osds;
1627         dout("timeout\n");
1628         down_read(&osdc->map_sem);
1629
1630         ceph_monc_request_next_osdmap(&osdc->client->monc);
1631
1632         mutex_lock(&osdc->request_mutex);
1633
1634         /*
1635          * ping osds that are a bit slow.  this ensures that if there
1636          * is a break in the TCP connection we will notice, and reopen
1637          * a connection with that osd (from the fault callback).
1638          */
1639         INIT_LIST_HEAD(&slow_osds);
1640         list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
1641                 if (time_before(jiffies,
1642                                 req->r_stamp + opts->osd_keepalive_timeout))
1643                         break;
1644
1645                 osd = req->r_osd;
1646                 BUG_ON(!osd);
1647                 dout(" tid %llu is slow, will send keepalive on osd%d\n",
1648                      req->r_tid, osd->o_osd);
1649                 list_move_tail(&osd->o_keepalive_item, &slow_osds);
1650         }
1651         while (!list_empty(&slow_osds)) {
1652                 osd = list_entry(slow_osds.next, struct ceph_osd,
1653                                  o_keepalive_item);
1654                 list_del_init(&osd->o_keepalive_item);
1655                 ceph_con_keepalive(&osd->o_con);
1656         }
1657
1658         __schedule_osd_timeout(osdc);
1659         __send_queued(osdc);
1660         mutex_unlock(&osdc->request_mutex);
1661         up_read(&osdc->map_sem);
1662 }
1663
1664 static void handle_osds_timeout(struct work_struct *work)
1665 {
1666         struct ceph_osd_client *osdc =
1667                 container_of(work, struct ceph_osd_client,
1668                              osds_timeout_work.work);
1669         unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
1670
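             /* prune osds idle longer than osd_idle_ttl; recheck 4x per ttl */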
1671         dout("osds timeout\n");
1672         down_read(&osdc->map_sem);
1673         remove_old_osds(osdc);
1674         up_read(&osdc->map_sem);
1675
1676         schedule_delayed_work(&osdc->osds_timeout_work,
1677                               round_jiffies_relative(delay));
1678 }
1679
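     /*
      * Decode an object_locator.  Only a bare pool id is supported: a
      * locator with a key, namespace or explicit hash set is rejected
      * with -EINVAL.
      */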
1680 static int ceph_oloc_decode(void **p, void *end,
1681                             struct ceph_object_locator *oloc)
1682 {
1683         u8 struct_v, struct_cv;
1684         u32 len;
1685         void *struct_end;
1686         int ret = 0;
1687
1688         ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1689         struct_v = ceph_decode_8(p);
1690         struct_cv = ceph_decode_8(p);
1691         if (struct_v < 3) {
1692                 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
1693                         struct_v, struct_cv);
1694                 goto e_inval;
1695         }
1696         if (struct_cv > 6) {
1697                 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
1698                         struct_v, struct_cv);
1699                 goto e_inval;
1700         }
1701         len = ceph_decode_32(p);
1702         ceph_decode_need(p, end, len, e_inval);
1703         struct_end = *p + len;
1704
1705         oloc->pool = ceph_decode_64(p);
1706         *p += 4; /* skip preferred */
1707
1708         len = ceph_decode_32(p);
1709         if (len > 0) {
1710                 pr_warn("ceph_object_locator::key is set\n");
1711                 goto e_inval;
1712         }
1713
1714         if (struct_v >= 5) {
1715                 len = ceph_decode_32(p);
1716                 if (len > 0) {
1717                         pr_warn("ceph_object_locator::nspace is set\n");
1718                         goto e_inval;
1719                 }
1720         }
1721
1722         if (struct_v >= 6) {
1723                 s64 hash = ceph_decode_64(p);
1724                 if (hash != -1) {
1725                         pr_warn("ceph_object_locator::hash is set\n");
1726                         goto e_inval;
1727                 }
1728         }
1729
1730         /* skip the rest */
1731         *p = struct_end;
1732 out:
1733         return ret;
1734
1735 e_inval:
1736         ret = -EINVAL;
1737         goto out;
1738 }
1739
1740 static int ceph_redirect_decode(void **p, void *end,
1741                                 struct ceph_request_redirect *redir)
1742 {
1743         u8 struct_v, struct_cv;
1744         u32 len;
1745         void *struct_end;
1746         int ret;
1747
1748         ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1749         struct_v = ceph_decode_8(p);
1750         struct_cv = ceph_decode_8(p);
1751         if (struct_cv > 1) {
1752                 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
1753                         struct_v, struct_cv);
1754                 goto e_inval;
1755         }
1756         len = ceph_decode_32(p);
1757         ceph_decode_need(p, end, len, e_inval);
1758         struct_end = *p + len;
1759
1760         ret = ceph_oloc_decode(p, end, &redir->oloc);
1761         if (ret)
1762                 goto out;
1763
1764         len = ceph_decode_32(p);
1765         if (len > 0) {
1766                 pr_warn("ceph_request_redirect::object_name is set\n");
1767                 goto e_inval;
1768         }
1769
1770         len = ceph_decode_32(p);
1771         *p += len; /* skip osd_instructions */
1772
1773         /* skip the rest */
1774         *p = struct_end;
1775 out:
1776         return ret;
1777
1778 e_inval:
1779         ret = -EINVAL;
1780         goto out;
1781 }
1782
1783 static void complete_request(struct ceph_osd_request *req)
1784 {
1785         complete_all(&req->r_safe_completion);  /* fsync waiter */
1786 }
1787
1788 /*
1789  * handle osd op reply.  either call the callback if it is specified,
1790  * or do the completion to wake up the waiting thread.
1791  */
1792 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1793 {
1794         void *p, *end;
1795         struct ceph_osd_request *req;
1796         struct ceph_request_redirect redir;
1797         u64 tid;
1798         int object_len;
1799         unsigned int numops;
1800         int payload_len, flags;
1801         s32 result;
1802         s32 retry_attempt;
1803         struct ceph_pg pg;
1804         int err;
1805         u32 reassert_epoch;
1806         u64 reassert_version;
1807         u32 osdmap_epoch;
1808         int already_completed;
1809         u32 bytes;
1810         u8 decode_redir;
1811         unsigned int i;
1812
1813         tid = le64_to_cpu(msg->hdr.tid);
1814         dout("handle_reply %p tid %llu\n", msg, tid);
1815
1816         p = msg->front.iov_base;
1817         end = p + msg->front.iov_len;
1818
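             /* skip the object name - the request is found by tid below */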
1819         ceph_decode_need(&p, end, 4, bad);
1820         object_len = ceph_decode_32(&p);
1821         ceph_decode_need(&p, end, object_len, bad);
1822         p += object_len;
1823
1824         err = ceph_decode_pgid(&p, end, &pg);
1825         if (err)
1826                 goto bad;
1827
1828         ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1829         flags = ceph_decode_64(&p);
1830         result = ceph_decode_32(&p);
1831         reassert_epoch = ceph_decode_32(&p);
1832         reassert_version = ceph_decode_64(&p);
1833         osdmap_epoch = ceph_decode_32(&p);
1834
1835         /* lookup */
1836         down_read(&osdc->map_sem);
1837         mutex_lock(&osdc->request_mutex);
1838         req = __lookup_request(osdc, tid);
1839         if (req == NULL) {
1840                 dout("handle_reply tid %llu dne\n", tid);
1841                 goto bad_mutex;
1842         }
1843         ceph_osdc_get_request(req);
1844
1845         dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1846              req, result);
1847
1848         ceph_decode_need(&p, end, 4, bad_put);
1849         numops = ceph_decode_32(&p);
1850         if (numops > CEPH_OSD_MAX_OPS)
1851                 goto bad_put;
1852         if (numops != req->r_num_ops)
1853                 goto bad_put;
1854         payload_len = 0;
1855         ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
1856         for (i = 0; i < numops; i++) {
1857                 struct ceph_osd_op *op = p;
1858                 int len;
1859
1860                 len = le32_to_cpu(op->payload_len);
1861                 req->r_ops[i].outdata_len = len;
1862                 dout(" op %d has %d bytes\n", i, len);
1863                 payload_len += len;
1864                 p += sizeof(*op);
1865         }
1866         bytes = le32_to_cpu(msg->hdr.data_len);
1867         if (payload_len != bytes) {
1868                 pr_warn("sum of op payload lens %d != data_len %d\n",
1869                         payload_len, bytes);
1870                 goto bad_put;
1871         }
1872
1873         ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
1874         retry_attempt = ceph_decode_32(&p);
1875         for (i = 0; i < numops; i++)
1876                 req->r_ops[i].rval = ceph_decode_32(&p);
1877
1878         if (le16_to_cpu(msg->hdr.version) >= 6) {
1879                 p += 8 + 4; /* skip replay_version */
1880                 p += 8; /* skip user_version */
1881
1882                 if (le16_to_cpu(msg->hdr.version) >= 7)
1883                         ceph_decode_8_safe(&p, end, decode_redir, bad_put);
1884                 else
1885                         decode_redir = 1;
1886         } else {
1887                 decode_redir = 0;
1888         }
1889
1890         if (decode_redir) {
1891                 err = ceph_redirect_decode(&p, end, &redir);
1892                 if (err)
1893                         goto bad_put;
1894         } else {
1895                 redir.oloc.pool = -1;
1896         }
1897
1898         if (redir.oloc.pool != -1) {
1899                 dout("redirect pool %lld\n", redir.oloc.pool);
1900
1901                 __unregister_request(osdc, req);
1902
1903                 req->r_target_oloc = redir.oloc; /* struct */
1904
1905                 /*
1906                  * Start redirect requests with nofail=true.  If
1907                  * mapping fails, request will end up on the notarget
1908                  * list, waiting for the new osdmap (which can take
1909                  * a while), even though the original request mapped
1910                  * successfully.  In the future we might want to follow
1911                  * original request's nofail setting here.
1912                  */
1913                 err = __ceph_osdc_start_request(osdc, req, true);
1914                 BUG_ON(err);
1915
1916                 goto out_unlock;
1917         }
1918
1919         already_completed = req->r_got_reply;
1920         if (!req->r_got_reply) {
1921                 req->r_result = result;
1922                 dout("handle_reply result %d bytes %d\n", req->r_result,
1923                      bytes);
1924                 if (req->r_result == 0)
1925                         req->r_result = bytes;
1926
1927                 /* save reassert version in case this is a write we need to replay */
1928                 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1929                 req->r_reassert_version.version = cpu_to_le64(reassert_version);
1930
1931                 req->r_got_reply = 1;
1932         } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1933                 dout("handle_reply tid %llu dup ack\n", tid);
1934                 goto out_unlock;
1935         }
1936
1937         dout("handle_reply tid %llu flags %d\n", tid, flags);
1938
1939         if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1940                 __register_linger_request(osdc, req);
1941
1942         /* either this is a read, or we got the safe response */
1943         if (result < 0 ||
1944             (flags & CEPH_OSD_FLAG_ONDISK) ||
1945             ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1946                 __unregister_request(osdc, req);
1947
1948         mutex_unlock(&osdc->request_mutex);
1949         up_read(&osdc->map_sem);
1950
1951         if (!already_completed) {
1952                 if (req->r_unsafe_callback &&
1953                     result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
1954                         req->r_unsafe_callback(req, true);
1955                 if (req->r_callback)
1956                         req->r_callback(req, msg);
1957                 else
1958                         complete_all(&req->r_completion);
1959         }
1960
1961         if (flags & CEPH_OSD_FLAG_ONDISK) {
1962                 if (req->r_unsafe_callback && already_completed)
1963                         req->r_unsafe_callback(req, false);
1964                 complete_request(req);
1965         }
1966
1967 out:
1968         dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1969         ceph_osdc_put_request(req);
1970         return;
1971 out_unlock:
1972         mutex_unlock(&osdc->request_mutex);
1973         up_read(&osdc->map_sem);
1974         goto out;
1975
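     /* error unwind: fail the request with -EIO, then report the corrupt reply */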
1976 bad_put:
1977         req->r_result = -EIO;
1978         __unregister_request(osdc, req);
1979         if (req->r_callback)
1980                 req->r_callback(req, msg);
1981         else
1982                 complete_all(&req->r_completion);
1983         complete_request(req);
1984         ceph_osdc_put_request(req);
1985 bad_mutex:
1986         mutex_unlock(&osdc->request_mutex);
1987         up_read(&osdc->map_sem);
1988 bad:
1989         pr_err("corrupt osd_op_reply got %d %d\n",
1990                (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1991         ceph_msg_dump(msg);
1992 }
1993
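     /*
      * Reset the connection to any osd that is no longer up or whose
      * address has changed in the new osdmap.
      */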
1994 static void reset_changed_osds(struct ceph_osd_client *osdc)
1995 {
1996         struct rb_node *p, *n;
1997
1998         dout("%s %p\n", __func__, osdc);
1999         for (p = rb_first(&osdc->osds); p; p = n) {
2000                 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
2001
2002                 n = rb_next(p);
2003                 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
2004                     memcmp(&osd->o_con.peer_addr,
2005                            ceph_osd_addr(osdc->osdmap,
2006                                          osd->o_osd),
2007                            sizeof(struct ceph_entity_addr)) != 0)
2008                         __reset_osd(osdc, osd);
2009         }
2010 }
2011
2012 /*
2013  * Requeue requests whose mapping to an OSD has changed.  If requests map to
2014  * no osd, request a new map.
2015  *
2016  * Caller should hold map_sem for read.
2017  */
2018 static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
2019                           bool force_resend_writes)
2020 {
2021         struct ceph_osd_request *req, *nreq;
2022         struct rb_node *p;
2023         int needmap = 0;
2024         int err;
2025         bool force_resend_req;
2026
2027         dout("kick_requests%s%s\n", force_resend ? " (force resend)" : "",
2028                 force_resend_writes ? " (force resend writes)" : "");
2029         mutex_lock(&osdc->request_mutex);
2030         for (p = rb_first(&osdc->requests); p; ) {
2031                 req = rb_entry(p, struct ceph_osd_request, r_node);
2032                 p = rb_next(p);
2033
2034                 /*
2035                  * For linger requests that have not yet been
2036                  * registered, move them to the linger list; they'll
2037                  * be sent to the osd in the loop below.  Unregister
2038                  * the request before re-registering it as a linger
2039                  * request to ensure the __map_request() below
2040                  * will decide it needs to be sent.
2041                  */
2042                 if (req->r_linger && list_empty(&req->r_linger_item)) {
2043                         dout("%p tid %llu restart on osd%d\n",
2044                              req, req->r_tid,
2045                              req->r_osd ? req->r_osd->o_osd : -1);
2046                         ceph_osdc_get_request(req);
2047                         __unregister_request(osdc, req);
2048                         __register_linger_request(osdc, req);
2049                         ceph_osdc_put_request(req);
2050                         continue;
2051                 }
2052
2053                 force_resend_req = force_resend ||
2054                         (force_resend_writes &&
2055                                 req->r_flags & CEPH_OSD_FLAG_WRITE);
2056                 err = __map_request(osdc, req, force_resend_req);
2057                 if (err < 0)
2058                         continue;  /* error */
2059                 if (req->r_osd == NULL) {
2060                         dout("%p tid %llu maps to no osd\n", req, req->r_tid);
2061                         needmap++;  /* request a newer map */
2062                 } else if (err > 0) {
2063                         if (!req->r_linger) {
2064                                 dout("%p tid %llu requeued on osd%d\n", req,
2065                                      req->r_tid,
2066                                      req->r_osd ? req->r_osd->o_osd : -1);
2067                                 req->r_flags |= CEPH_OSD_FLAG_RETRY;
2068                         }
2069                 }
2070         }
2071
2072         list_for_each_entry_safe(req, nreq, &osdc->req_linger,
2073                                  r_linger_item) {
2074                 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
2075
2076                 err = __map_request(osdc, req,
2077                                     force_resend || force_resend_writes);
2078                 dout("__map_request returned %d\n", err);
2079                 if (err < 0)
2080                         continue;  /* hrm! */
2081                 if (req->r_osd == NULL || err > 0) {
2082                         if (req->r_osd == NULL) {
2083                                 dout("lingering %p tid %llu maps to no osd\n",
2084                                      req, req->r_tid);
2085                                 /*
2086                                  * A homeless lingering request makes
2087                                  * no sense, as its job is to keep
2088                                  * a particular OSD connection open.
2089                                  * Request a newer map and kick the
2090                                  * request, knowing that it won't be
2091                                  * resent until we actually get a map
2092                                  * that can tell us where to send it.
2093                                  */
2094                                 needmap++;
2095                         }
2096
2097                         dout("kicking lingering %p tid %llu osd%d\n", req,
2098                              req->r_tid, req->r_osd ? req->r_osd->o_osd : -1);
2099                         __register_request(osdc, req);
2100                         __unregister_linger_request(osdc, req);
2101                 }
2102         }
2103         reset_changed_osds(osdc);
2104         mutex_unlock(&osdc->request_mutex);
2105
2106         if (needmap) {
2107                 dout("%d requests for down osds, need new map\n", needmap);
2108                 ceph_monc_request_next_osdmap(&osdc->client->monc);
2109         }
2110 }
2111
2112
2113 /*
2114  * Process updated osd map.
2115  *
2116  * The message contains any number of incremental and full maps, normally
2117  * indicating some sort of topology change in the cluster.  Kick requests
2118  * off to different OSDs as needed.
2119  */
2120 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
2121 {
2122         void *p, *end, *next;
2123         u32 nr_maps, maplen;
2124         u32 epoch;
2125         struct ceph_osdmap *newmap = NULL, *oldmap;
2126         int err;
2127         struct ceph_fsid fsid;
2128         bool was_full;
2129
2130         dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
2131         p = msg->front.iov_base;
2132         end = p + msg->front.iov_len;
2133
2134         /* verify fsid */
2135         ceph_decode_need(&p, end, sizeof(fsid), bad);
2136         ceph_decode_copy(&p, &fsid, sizeof(fsid));
2137         if (ceph_check_fsid(osdc->client, &fsid) < 0)
2138                 return;
2139
2140         down_write(&osdc->map_sem);
2141
2142         was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
2143
2144         /* incremental maps */
2145         ceph_decode_32_safe(&p, end, nr_maps, bad);
2146         dout(" %d inc maps\n", nr_maps);
2147         while (nr_maps > 0) {
2148                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2149                 epoch = ceph_decode_32(&p);
2150                 maplen = ceph_decode_32(&p);
2151                 ceph_decode_need(&p, end, maplen, bad);
2152                 next = p + maplen;
2153                 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
2154                         dout("applying incremental map %u len %d\n",
2155                              epoch, maplen);
2156                         newmap = osdmap_apply_incremental(&p, next,
2157                                                           osdc->osdmap,
2158                                                           &osdc->client->msgr);
2159                         if (IS_ERR(newmap)) {
2160                                 err = PTR_ERR(newmap);
2161                                 goto bad;
2162                         }
2163                         BUG_ON(!newmap);
2164                         if (newmap != osdc->osdmap) {
2165                                 ceph_osdmap_destroy(osdc->osdmap);
2166                                 osdc->osdmap = newmap;
2167                         }
2168                         was_full = was_full ||
2169                                 ceph_osdmap_flag(osdc->osdmap,
2170                                                  CEPH_OSDMAP_FULL);
2171                         kick_requests(osdc, 0, was_full);
2172                 } else {
2173                         dout("ignoring incremental map %u len %d\n",
2174                              epoch, maplen);
2175                 }
2176                 p = next;
2177                 nr_maps--;
2178         }
2179         if (newmap)
2180                 goto done;
2181
2182         /* full maps */
2183         ceph_decode_32_safe(&p, end, nr_maps, bad);
2184         dout(" %d full maps\n", nr_maps);
2185         while (nr_maps) {
2186                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2187                 epoch = ceph_decode_32(&p);
2188                 maplen = ceph_decode_32(&p);
2189                 ceph_decode_need(&p, end, maplen, bad);
2190                 if (nr_maps > 1) {
2191                         dout("skipping non-latest full map %u len %d\n",
2192                              epoch, maplen);
2193                 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
2194                         dout("skipping full map %u len %d, "
2195                              "older than our %u\n", epoch, maplen,
2196                              osdc->osdmap->epoch);
2197                 } else {
2198                         int skipped_map = 0;
2199
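                             /*
                              * If this map is more than one epoch ahead
                              * of the one it replaces, we skipped maps
                              * and may have missed pg interval changes -
                              * force a resend of everything below.
                              */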
2200                         dout("taking full map %u len %d\n", epoch, maplen);
2201                         newmap = ceph_osdmap_decode(&p, p+maplen);
2202                         if (IS_ERR(newmap)) {
2203                                 err = PTR_ERR(newmap);
2204                                 goto bad;
2205                         }
2206                         BUG_ON(!newmap);
2207                         oldmap = osdc->osdmap;
2208                         osdc->osdmap = newmap;
2209                         if (oldmap) {
2210                                 if (oldmap->epoch + 1 < newmap->epoch)
2211                                         skipped_map = 1;
2212                                 ceph_osdmap_destroy(oldmap);
2213                         }
2214                         was_full = was_full ||
2215                                 ceph_osdmap_flag(osdc->osdmap,
2216                                                  CEPH_OSDMAP_FULL);
2217                         kick_requests(osdc, skipped_map, was_full);
2218                 }
2219                 p += maplen;
2220                 nr_maps--;
2221         }
2222
2223         if (!osdc->osdmap)
2224                 goto bad;
2225 done:
2226         downgrade_write(&osdc->map_sem);
2227         ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2228                           osdc->osdmap->epoch);
2229
2230         /*
2231          * subscribe to subsequent osdmap updates if full or paused,
2232          * to ensure we find out when we are no longer full (and stop
2233          * returning ENOSPC) or no longer paused (and resume sending).
2234          */
2235         if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
2236                 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
2237                 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
2238                 ceph_monc_request_next_osdmap(&osdc->client->monc);
2239
2240         mutex_lock(&osdc->request_mutex);
2241         __send_queued(osdc);
2242         mutex_unlock(&osdc->request_mutex);
2243         up_read(&osdc->map_sem);
2244         wake_up_all(&osdc->client->auth_wq);
2245         return;
2246
2247 bad:
2248         pr_err("osdc handle_map corrupt msg\n");
2249         ceph_msg_dump(msg);
2250         up_write(&osdc->map_sem);
2251 }
2252
2253 /*
2254  * watch/notify callback event infrastructure
2255  *
2256  * These callbacks are used both for watch and notify operations.
2257  */
2258 static void __release_event(struct kref *kref)
2259 {
2260         struct ceph_osd_event *event =
2261                 container_of(kref, struct ceph_osd_event, kref);
2262
2263         dout("__release_event %p\n", event);
2264         kfree(event);
2265 }
2266
2267 static void get_event(struct ceph_osd_event *event)
2268 {
2269         kref_get(&event->kref);
2270 }
2271
2272 void ceph_osdc_put_event(struct ceph_osd_event *event)
2273 {
2274         kref_put(&event->kref, __release_event);
2275 }
2276 EXPORT_SYMBOL(ceph_osdc_put_event);
2277
2278 static void __insert_event(struct ceph_osd_client *osdc,
2279                              struct ceph_osd_event *new)
2280 {
2281         struct rb_node **p = &osdc->event_tree.rb_node;
2282         struct rb_node *parent = NULL;
2283         struct ceph_osd_event *event = NULL;
2284
2285         while (*p) {
2286                 parent = *p;
2287                 event = rb_entry(parent, struct ceph_osd_event, node);
2288                 if (new->cookie < event->cookie)
2289                         p = &(*p)->rb_left;
2290                 else if (new->cookie > event->cookie)
2291                         p = &(*p)->rb_right;
2292                 else
2293                         BUG();
2294         }
2295
2296         rb_link_node(&new->node, parent, p);
2297         rb_insert_color(&new->node, &osdc->event_tree);
2298 }
2299
2300 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
2301                                                 u64 cookie)
2302 {
2303         struct rb_node **p = &osdc->event_tree.rb_node;
2304         struct rb_node *parent = NULL;
2305         struct ceph_osd_event *event = NULL;
2306
2307         while (*p) {
2308                 parent = *p;
2309                 event = rb_entry(parent, struct ceph_osd_event, node);
2310                 if (cookie < event->cookie)
2311                         p = &(*p)->rb_left;
2312                 else if (cookie > event->cookie)
2313                         p = &(*p)->rb_right;
2314                 else
2315                         return event;
2316         }
2317         return NULL;
2318 }
2319
2320 static void __remove_event(struct ceph_osd_event *event)
2321 {
2322         struct ceph_osd_client *osdc = event->osdc;
2323
2324         if (!RB_EMPTY_NODE(&event->node)) {
2325                 dout("__remove_event removed %p\n", event);
2326                 rb_erase(&event->node, &osdc->event_tree);
2327                 ceph_osdc_put_event(event);
2328         } else {
2329                 dout("__remove_event didn't remove %p\n", event);
2330         }
2331 }
2332
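     /*
      * Create a watch/notify event.  The caller gets its own reference
      * and must drop it with ceph_osdc_cancel_event() when done.  Rough
      * usage sketch (my_cb and my_data are caller-provided):
      *
      *    struct ceph_osd_event *event;
      *
      *    ret = ceph_osdc_create_event(osdc, my_cb, my_data, &event);
      *    ...
      *    ceph_osdc_cancel_event(event);
      */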
2333 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
2334                            void (*event_cb)(u64, u64, u8, void *),
2335                            void *data, struct ceph_osd_event **pevent)
2336 {
2337         struct ceph_osd_event *event;
2338
2339         event = kmalloc(sizeof(*event), GFP_NOIO);
2340         if (!event)
2341                 return -ENOMEM;
2342
2343         dout("create_event %p\n", event);
2344         event->cb = event_cb;
2345         event->one_shot = 0;
2346         event->data = data;
2347         event->osdc = osdc;
2348         INIT_LIST_HEAD(&event->osd_node);
2349         RB_CLEAR_NODE(&event->node);
2350         kref_init(&event->kref);   /* one ref for us */
2351         kref_get(&event->kref);    /* one ref for the caller */
2352
2353         spin_lock(&osdc->event_lock);
2354         event->cookie = ++osdc->event_count;
2355         __insert_event(osdc, event);
2356         spin_unlock(&osdc->event_lock);
2357
2358         *pevent = event;
2359         return 0;
2360 }
2361 EXPORT_SYMBOL(ceph_osdc_create_event);
2362
2363 void ceph_osdc_cancel_event(struct ceph_osd_event *event)
2364 {
2365         struct ceph_osd_client *osdc = event->osdc;
2366
2367         dout("cancel_event %p\n", event);
2368         spin_lock(&osdc->event_lock);
2369         __remove_event(event);
2370         spin_unlock(&osdc->event_lock);
2371         ceph_osdc_put_event(event); /* caller's */
2372 }
2373 EXPORT_SYMBOL(ceph_osdc_cancel_event);
2374
2375
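     /*
      * Deliver one queued watch/notify callback from the notify
      * workqueue.
      */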
2376 static void do_event_work(struct work_struct *work)
2377 {
2378         struct ceph_osd_event_work *event_work =
2379                 container_of(work, struct ceph_osd_event_work, work);
2380         struct ceph_osd_event *event = event_work->event;
2381         u64 ver = event_work->ver;
2382         u64 notify_id = event_work->notify_id;
2383         u8 opcode = event_work->opcode;
2384
2385         dout("do_event_work completing %p\n", event);
2386         event->cb(ver, notify_id, opcode, event->data);
2387         dout("do_event_work completed %p\n", event);
2388         ceph_osdc_put_event(event);
2389         kfree(event_work);
2390 }
2391
2392
2393 /*
2394  * Process osd watch notifications
2395  */
2396 static void handle_watch_notify(struct ceph_osd_client *osdc,
2397                                 struct ceph_msg *msg)
2398 {
2399         void *p, *end;
2400         u8 proto_ver;
2401         u64 cookie, ver, notify_id;
2402         u8 opcode;
2403         struct ceph_osd_event *event;
2404         struct ceph_osd_event_work *event_work;
2405
2406         p = msg->front.iov_base;
2407         end = p + msg->front.iov_len;
2408
2409         ceph_decode_8_safe(&p, end, proto_ver, bad);
2410         ceph_decode_8_safe(&p, end, opcode, bad);
2411         ceph_decode_64_safe(&p, end, cookie, bad);
2412         ceph_decode_64_safe(&p, end, ver, bad);
2413         ceph_decode_64_safe(&p, end, notify_id, bad);
2414
2415         spin_lock(&osdc->event_lock);
2416         event = __find_event(osdc, cookie);
2417         if (event) {
2418                 BUG_ON(event->one_shot);
2419                 get_event(event);
2420         }
2421         spin_unlock(&osdc->event_lock);
2422         dout("handle_watch_notify cookie %lld ver %lld event %p\n",
2423              cookie, ver, event);
2424         if (event) {
2425                 event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
2426                 if (!event_work) {
2427                         pr_err("couldn't allocate event_work\n");
2428                         ceph_osdc_put_event(event);
2429                         return;
2430                 }
2431                 INIT_WORK(&event_work->work, do_event_work);
2432                 event_work->event = event;
2433                 event_work->ver = ver;
2434                 event_work->notify_id = notify_id;
2435                 event_work->opcode = opcode;
2436
2437                 queue_work(osdc->notify_wq, &event_work->work);
2438         }
2439
2440         return;
2441
2442 bad:
2443         pr_err("osdc handle_watch_notify corrupt msg\n");
2444 }
2445
2446 /*
2447  * Build a new request AND message.
2448  */
2450 void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
2451                                 struct ceph_snap_context *snapc, u64 snap_id,
2452                                 struct timespec *mtime)
2453 {
2454         struct ceph_msg *msg = req->r_request;
2455         void *p;
2456         size_t msg_size;
2457         int flags = req->r_flags;
2458         u64 data_len;
2459         unsigned int i;
2460
2461         req->r_snapid = snap_id;
2462         WARN_ON(snapc != req->r_snapc);
2463
2464         /* encode request */
2465         msg->hdr.version = cpu_to_le16(4);
2466
2467         p = msg->front.iov_base;
2468         ceph_encode_32(&p, 1);   /* client_inc is always 1 */
2469         req->r_request_osdmap_epoch = p;
2470         p += 4;
2471         req->r_request_flags = p;
2472         p += 4;
2473         if (req->r_flags & CEPH_OSD_FLAG_WRITE)
2474                 ceph_encode_timespec(p, mtime);
2475         p += sizeof(struct ceph_timespec);
2476         req->r_request_reassert_version = p;
2477         p += sizeof(struct ceph_eversion); /* will get filled in */
2478
2479         /* oloc */
2480         ceph_encode_8(&p, 4);    /* oloc struct_v */
2481         ceph_encode_8(&p, 4);    /* oloc struct_compat */
2482         ceph_encode_32(&p, 8 + 4 + 4);
2483         req->r_request_pool = p;
2484         p += 8;
2485         ceph_encode_32(&p, -1);  /* preferred */
2486         ceph_encode_32(&p, 0);   /* key len */
2487
2488         ceph_encode_8(&p, 1);    /* pgid encoding version */
2489         req->r_request_pgid = p;
2490         p += 8 + 4;
2491         ceph_encode_32(&p, -1);  /* preferred */
2492
2493         /* oid */
2494         ceph_encode_32(&p, req->r_base_oid.name_len);
2495         memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
2496         dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
2497              req->r_base_oid.name, req->r_base_oid.name_len);
2498         p += req->r_base_oid.name_len;
2499
2500         /* ops--can imply data */
2501         ceph_encode_16(&p, (u16)req->r_num_ops);
2502         data_len = 0;
2503         for (i = 0; i < req->r_num_ops; i++) {
2504                 data_len += osd_req_encode_op(req, p, i);
2505                 p += sizeof(struct ceph_osd_op);
2506         }
2507
2508         /* snaps */
2509         ceph_encode_64(&p, req->r_snapid);
2510         ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
2511         ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
2512         if (req->r_snapc) {
2513                 for (i = 0; i < req->r_snapc->num_snaps; i++) {
2514                         ceph_encode_64(&p, req->r_snapc->snaps[i]);
2515                 }
2516         }
2517
2518         req->r_request_attempts = p;
2519         p += 4;
2520
2521         /* data */
2522         if (flags & CEPH_OSD_FLAG_WRITE) {
2523                 u16 data_off;
2524
2525                 /*
2526                  * The header "data_off" is a hint to the receiver
2527                  * allowing it to align received data into its
2528                  * buffers such that there's no need to re-copy
2529                  * it before writing it to disk (direct I/O).
2530                  */
2531                 data_off = (u16) (off & 0xffff);
2532                 req->r_request->hdr.data_off = cpu_to_le16(data_off);
2533         }
2534         req->r_request->hdr.data_len = cpu_to_le32(data_len);
2535
2536         BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
2537         msg_size = p - msg->front.iov_base;
2538         msg->front.iov_len = msg_size;
2539         msg->hdr.front_len = cpu_to_le32(msg_size);
2540
2541         dout("build_request msg_size was %d\n", (int)msg_size);
2542 }
2543 EXPORT_SYMBOL(ceph_osdc_build_request);
2544
2545 /*
2546  * Register request, send initial attempt.
2547  */
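     /*
      * Typical synchronous call pattern (ceph_osdc_readpages() below is
      * a complete example):
      *
      *    req = ceph_osdc_new_request(osdc, ...);
      *    ceph_osdc_build_request(req, off, snapc, snapid, mtime);
      *    ret = ceph_osdc_start_request(osdc, req, false);
      *    if (!ret)
      *            ret = ceph_osdc_wait_request(osdc, req);
      *    ceph_osdc_put_request(req);
      */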
2548 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
2549                             struct ceph_osd_request *req,
2550                             bool nofail)
2551 {
2552         int rc;
2553
2554         down_read(&osdc->map_sem);
2555         mutex_lock(&osdc->request_mutex);
2556
2557         rc = __ceph_osdc_start_request(osdc, req, nofail);
2558
2559         mutex_unlock(&osdc->request_mutex);
2560         up_read(&osdc->map_sem);
2561
2562         return rc;
2563 }
2564 EXPORT_SYMBOL(ceph_osdc_start_request);
2565
2566 /*
2567  * Unregister a registered request.  The request is not completed (i.e.
2568  * no callbacks or wakeups) - higher layers are supposed to know what
2569  * they are canceling.
2570  */
2571 void ceph_osdc_cancel_request(struct ceph_osd_request *req)
2572 {
2573         struct ceph_osd_client *osdc = req->r_osdc;
2574
2575         mutex_lock(&osdc->request_mutex);
2576         if (req->r_linger)
2577                 __unregister_linger_request(osdc, req);
2578         __unregister_request(osdc, req);
2579         mutex_unlock(&osdc->request_mutex);
2580
2581         dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
2582 }
2583 EXPORT_SYMBOL(ceph_osdc_cancel_request);
2584
2585 /*
2586  * wait for a request to complete
2587  */
2588 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
2589                            struct ceph_osd_request *req)
2590 {
2591         int rc;
2592
2593         dout("%s %p tid %llu\n", __func__, req, req->r_tid);
2594
2595         rc = wait_for_completion_interruptible(&req->r_completion);
2596         if (rc < 0) {
2597                 dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
2598                 ceph_osdc_cancel_request(req);
2599                 complete_request(req);
2600                 return rc;
2601         }
2602
2603         dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid,
2604              req->r_result);
2605         return req->r_result;
2606 }
2607 EXPORT_SYMBOL(ceph_osdc_wait_request);
2608
2609 /*
2610  * sync - wait for all in-flight write requests to flush.  avoid starvation.
2611  */
2612 void ceph_osdc_sync(struct ceph_osd_client *osdc)
2613 {
2614         struct ceph_osd_request *req;
2615         u64 last_tid, next_tid = 0;
2616
2617         mutex_lock(&osdc->request_mutex);
2618         last_tid = osdc->last_tid;
2619         while (1) {
2620                 req = __lookup_request_ge(osdc, next_tid);
2621                 if (!req)
2622                         break;
2623                 if (req->r_tid > last_tid)
2624                         break;
2625
2626                 next_tid = req->r_tid + 1;
2627                 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
2628                         continue;
2629
2630                 ceph_osdc_get_request(req);
2631                 mutex_unlock(&osdc->request_mutex);
2632                 dout("sync waiting on tid %llu (last is %llu)\n",
2633                      req->r_tid, last_tid);
2634                 wait_for_completion(&req->r_safe_completion);
2635                 mutex_lock(&osdc->request_mutex);
2636                 ceph_osdc_put_request(req);
2637         }
2638         mutex_unlock(&osdc->request_mutex);
2639         dout("sync done (thru tid %llu)\n", last_tid);
2640 }
2641 EXPORT_SYMBOL(ceph_osdc_sync);
2642
2643 /*
2644  * Call all pending notify callbacks - for use after a watch is
2645  * unregistered, to make sure no more callbacks for it will be invoked
2646  */
2647 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
2648 {
2649         flush_workqueue(osdc->notify_wq);
2650 }
2651 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
2652
2653
2654 /*
2655  * init, shutdown
2656  */
2657 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
2658 {
2659         int err;
2660
2661         dout("init\n");
2662         osdc->client = client;
2663         osdc->osdmap = NULL;
2664         init_rwsem(&osdc->map_sem);
2665         init_completion(&osdc->map_waiters);
2666         osdc->last_requested_map = 0;
2667         mutex_init(&osdc->request_mutex);
2668         osdc->last_tid = 0;
2669         osdc->osds = RB_ROOT;
2670         INIT_LIST_HEAD(&osdc->osd_lru);
2671         osdc->requests = RB_ROOT;
2672         INIT_LIST_HEAD(&osdc->req_lru);
2673         INIT_LIST_HEAD(&osdc->req_unsent);
2674         INIT_LIST_HEAD(&osdc->req_notarget);
2675         INIT_LIST_HEAD(&osdc->req_linger);
2676         osdc->num_requests = 0;
2677         INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
2678         INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
2679         spin_lock_init(&osdc->event_lock);
2680         osdc->event_tree = RB_ROOT;
2681         osdc->event_count = 0;
2682
2683         schedule_delayed_work(&osdc->osds_timeout_work,
2684             round_jiffies_relative(osdc->client->options->osd_idle_ttl));
2685
2686         err = -ENOMEM;
2687         osdc->req_mempool = mempool_create_slab_pool(10,
2688                                                      ceph_osd_request_cache);
2689         if (!osdc->req_mempool)
2690                 goto out;
2691
2692         err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
2693                                 OSD_OP_FRONT_LEN, 10, true,
2694                                 "osd_op");
2695         if (err < 0)
2696                 goto out_mempool;
2697         err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
2698                                 OSD_OPREPLY_FRONT_LEN, 10, true,
2699                                 "osd_op_reply");
2700         if (err < 0)
2701                 goto out_msgpool;
2702
2703         err = -ENOMEM;
2704         osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
2705         if (!osdc->notify_wq)
2706                 goto out_msgpool_reply;
2707
2708         return 0;
2709
2710 out_msgpool_reply:
2711         ceph_msgpool_destroy(&osdc->msgpool_op_reply);
2712 out_msgpool:
2713         ceph_msgpool_destroy(&osdc->msgpool_op);
2714 out_mempool:
2715         mempool_destroy(osdc->req_mempool);
2716 out:
2717         return err;
2718 }
2719
2720 void ceph_osdc_stop(struct ceph_osd_client *osdc)
2721 {
2722         flush_workqueue(osdc->notify_wq);
2723         destroy_workqueue(osdc->notify_wq);
2724         cancel_delayed_work_sync(&osdc->timeout_work);
2725         cancel_delayed_work_sync(&osdc->osds_timeout_work);
2726         if (osdc->osdmap) {
2727                 ceph_osdmap_destroy(osdc->osdmap);
2728                 osdc->osdmap = NULL;
2729         }
2730         remove_all_osds(osdc);
2731         mempool_destroy(osdc->req_mempool);
2732         ceph_msgpool_destroy(&osdc->msgpool_op);
2733         ceph_msgpool_destroy(&osdc->msgpool_op_reply);
2734 }
2735
2736 /*
2737  * Read some contiguous pages.  If we cross a stripe boundary, shorten
2738  * *plen.  Return number of bytes read, or error.
2739  */
2740 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
2741                         struct ceph_vino vino, struct ceph_file_layout *layout,
2742                         u64 off, u64 *plen,
2743                         u32 truncate_seq, u64 truncate_size,
2744                         struct page **pages, int num_pages, int page_align)
2745 {
2746         struct ceph_osd_request *req;
2747         int rc = 0;
2748
2749         dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
2750              vino.snap, off, *plen);
2751         req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
2752                                     CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
2753                                     NULL, truncate_seq, truncate_size,
2754                                     false);
2755         if (IS_ERR(req))
2756                 return PTR_ERR(req);
2757
2758         /* it may be a short read due to an object boundary */
2759
2760         osd_req_op_extent_osd_data_pages(req, 0,
2761                                 pages, *plen, page_align, false, false);
2762
2763         dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
2764              off, *plen, *plen, page_align);
2765
2766         ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
2767
2768         rc = ceph_osdc_start_request(osdc, req, false);
2769         if (!rc)
2770                 rc = ceph_osdc_wait_request(osdc, req);
2771
2772         ceph_osdc_put_request(req);
2773         dout("readpages result %d\n", rc);
2774         return rc;
2775 }
2776 EXPORT_SYMBOL(ceph_osdc_readpages);
2777
2778 /*
2779  * do a synchronous write on N pages
2780  */
2781 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
2782                          struct ceph_file_layout *layout,
2783                          struct ceph_snap_context *snapc,
2784                          u64 off, u64 len,
2785                          u32 truncate_seq, u64 truncate_size,
2786                          struct timespec *mtime,
2787                          struct page **pages, int num_pages)
2788 {
2789         struct ceph_osd_request *req;
2790         int rc = 0;
2791         int page_align = off & ~PAGE_MASK;
2792
2793         BUG_ON(vino.snap != CEPH_NOSNAP);       /* snapshots aren't writeable */
2794         req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
2795                                     CEPH_OSD_OP_WRITE,
2796                                     CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
2797                                     snapc, truncate_seq, truncate_size,
2798                                     true);
2799         if (IS_ERR(req))
2800                 return PTR_ERR(req);
2801
2802         /* it may be a short write due to an object boundary */
2803         osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
2804                                 false, false);
2805         dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
2806
2807         ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
2808
2809         rc = ceph_osdc_start_request(osdc, req, true);
2810         if (!rc)
2811                 rc = ceph_osdc_wait_request(osdc, req);
2812
2813         ceph_osdc_put_request(req);
2814         if (rc == 0)
2815                 rc = len;
2816         dout("writepages result %d\n", rc);
2817         return rc;
2818 }
2819 EXPORT_SYMBOL(ceph_osdc_writepages);
2820
2821 int ceph_osdc_setup(void)
2822 {
2823         size_t size = sizeof(struct ceph_osd_request) +
2824             CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
2825
2826         BUG_ON(ceph_osd_request_cache);
2827         ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
2828                                                    0, 0, NULL);
2829
2830         return ceph_osd_request_cache ? 0 : -ENOMEM;
2831 }
2832 EXPORT_SYMBOL(ceph_osdc_setup);
2833
2834 void ceph_osdc_cleanup(void)
2835 {
2836         BUG_ON(!ceph_osd_request_cache);
2837         kmem_cache_destroy(ceph_osd_request_cache);
2838         ceph_osd_request_cache = NULL;
2839 }
2840 EXPORT_SYMBOL(ceph_osdc_cleanup);
2841
2842 /*
2843  * handle incoming message
2844  */
2845 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2846 {
2847         struct ceph_osd *osd = con->private;
2848         struct ceph_osd_client *osdc;
2849         int type = le16_to_cpu(msg->hdr.type);
2850
2851         if (!osd)
2852                 goto out;
2853         osdc = osd->o_osdc;
2854
2855         switch (type) {
2856         case CEPH_MSG_OSD_MAP:
2857                 ceph_osdc_handle_map(osdc, msg);
2858                 break;
2859         case CEPH_MSG_OSD_OPREPLY:
2860                 handle_reply(osdc, msg);
2861                 break;
2862         case CEPH_MSG_WATCH_NOTIFY:
2863                 handle_watch_notify(osdc, msg);
2864                 break;
2865
2866         default:
2867                 pr_err("received unknown message type %d %s\n", type,
2868                        ceph_msg_type_name(type));
2869         }
2870 out:
2871         ceph_msg_put(msg);
2872 }
2873
2874 /*
2875  * Look up and return the message for an incoming reply.  Don't try
2876  * to do anything about a larger-than-preallocated data portion of
2877  * the message at the moment - for now, just skip the message.
2878  */
2879 static struct ceph_msg *get_reply(struct ceph_connection *con,
2880                                   struct ceph_msg_header *hdr,
2881                                   int *skip)
2882 {
2883         struct ceph_osd *osd = con->private;
2884         struct ceph_osd_client *osdc = osd->o_osdc;
2885         struct ceph_msg *m;
2886         struct ceph_osd_request *req;
2887         int front_len = le32_to_cpu(hdr->front_len);
2888         int data_len = le32_to_cpu(hdr->data_len);
2889         u64 tid;
2890
2891         tid = le64_to_cpu(hdr->tid);
2892         mutex_lock(&osdc->request_mutex);
2893         req = __lookup_request(osdc, tid);
2894         if (!req) {
2895                 dout("%s osd%d tid %llu unknown, skipping\n", __func__,
2896                      osd->o_osd, tid);
2897                 m = NULL;
2898                 *skip = 1;
2899                 goto out;
2900         }
2901
2902         ceph_msg_revoke_incoming(req->r_reply);
2903
2904         if (front_len > req->r_reply->front_alloc_len) {
2905                 pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
2906                         __func__, osd->o_osd, req->r_tid, front_len,
2907                         req->r_reply->front_alloc_len);
2908                 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
2909                                  false);
2910                 if (!m)
2911                         goto out;
2912                 ceph_msg_put(req->r_reply);
2913                 req->r_reply = m;
2914         }
2915
2916         if (data_len > req->r_reply->data_length) {
2917                 pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
2918                         __func__, osd->o_osd, req->r_tid, data_len,
2919                         req->r_reply->data_length);
2920                 m = NULL;
2921                 *skip = 1;
2922                 goto out;
2923         }
2924
2925         m = ceph_msg_get(req->r_reply);
2926         dout("get_reply tid %lld %p\n", tid, m);
2927
2928 out:
2929         mutex_unlock(&osdc->request_mutex);
2930         return m;
2931 }

/*
 * Allocate (or look up) a message to receive an incoming frame into,
 * or set *skip to discard it.
 */
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
                                  int *skip)
{
        struct ceph_osd *osd = con->private;
        int type = le16_to_cpu(hdr->type);
        int front = le32_to_cpu(hdr->front_len);

        *skip = 0;
        switch (type) {
        case CEPH_MSG_OSD_MAP:
        case CEPH_MSG_WATCH_NOTIFY:
                return ceph_msg_new(type, front, GFP_NOFS, false);
        case CEPH_MSG_OSD_OPREPLY:
                return get_reply(con, hdr, skip);
        default:
                pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
                        osd->o_osd);
                *skip = 1;
                return NULL;
        }
}
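
/*
 * Note (my reading of the messenger contract, not stated in this file):
 * when *skip is set, the messenger consumes and discards the incoming
 * frame; returning NULL with *skip clear is instead treated as an
 * allocation failure.
 */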

/*
 * Wrappers that take/drop a reference on the ceph_osd that owns the
 * connection.
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
        struct ceph_osd *osd = con->private;

        if (get_osd(osd))
                return con;
        return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
        struct ceph_osd *osd = con->private;

        put_osd(osd);
}

/*
 * authentication
 */

/*
 * Note: the returned pointer is the address of a structure that's
 * managed separately.  The caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
                                                  int *proto, int force_new)
{
        struct ceph_osd *o = con->private;
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;
        struct ceph_auth_handshake *auth = &o->o_auth;

        if (force_new && auth->authorizer) {
                ceph_auth_destroy_authorizer(auth->authorizer);
                auth->authorizer = NULL;
        }
        if (!auth->authorizer) {
                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
                                                      auth);
                if (ret)
                        return ERR_PTR(ret);
        } else {
                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
                                                      auth);
                if (ret)
                        return ERR_PTR(ret);
        }
        *proto = ac->protocol;

        return auth;
}

static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
        struct ceph_osd *o = con->private;
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;

        return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
        struct ceph_osd *o = con->private;
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;

        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
        return ceph_monc_validate_auth(&osdc->client->monc);
}

static int osd_sign_message(struct ceph_msg *msg)
{
        struct ceph_osd *o = msg->con->private;
        struct ceph_auth_handshake *auth = &o->o_auth;

        return ceph_auth_sign_message(auth, msg);
}

static int osd_check_message_signature(struct ceph_msg *msg)
{
        struct ceph_osd *o = msg->con->private;
        struct ceph_auth_handshake *auth = &o->o_auth;

        return ceph_auth_check_message_signature(auth, msg);
}

static const struct ceph_connection_operations osd_con_ops = {
        .get = get_osd_con,
        .put = put_osd_con,
        .dispatch = dispatch,
        .get_authorizer = get_authorizer,
        .verify_authorizer_reply = verify_authorizer_reply,
        .invalidate_authorizer = invalidate_authorizer,
        .alloc_msg = alloc_msg,
        .sign_message = osd_sign_message,
        .check_message_signature = osd_check_message_signature,
        .fault = osd_reset,
};
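
/*
 * For reference: each OSD session's connection is bound to this ops
 * table when the ceph_osd is created, and the messenger then calls
 * back through these hooks, e.g.:
 *
 *      ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
 */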