/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}
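
/* Illustrative sketch only (not part of this file): rpcrdma_alloc_wq()
 * and rpcrdma_destroy_wq() are expected to be paired by the transport
 * module's init and exit paths. The function names below are
 * assumptions for the sake of the example; only the pairing and the
 * error handling matter here.
 *
 *	int __init xprt_rdma_init(void)
 *	{
 *		int rc = rpcrdma_alloc_wq();
 *
 *		if (rc)
 *			return rc;
 *		... register the transport ...
 *		return 0;
 *	}
 *
 *	void __exit xprt_rdma_cleanup(void)
 *	{
 *		... unregister the transport ...
 *		rpcrdma_destroy_wq();
 *	}
 */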

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC:       %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

static void
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);

	rpcrdma_reply_handler(rep);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}
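
/* Worked example of the clamping above: if the peer advertises
 * rm_credit == 0, the grant becomes 1 so the client can still send
 * one RPC and make forward progress; if the peer advertises 512
 * credits but rb_max_requests is 128, the grant is capped at 128.
 * Replies shorter than RPCRDMA_HDRLEN_ERR cannot carry a usable
 * credit field and therefore leave the current grant untouched.
 */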

/**
 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}
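
/* Receive completion handling is split in two: the minimal work above
 * runs in the CQ polling context (IB_POLL_SOFTIRQ, see
 * rpcrdma_ep_create()), while the bulk of reply processing is
 * deferred to rpcrdma_receive_worker() on rpcrdma_receive_wq.
 * Flushed or failed receives are scheduled as well, with rr_len set
 * to RPCRDMA_BAD_LEN so the reply handler can recognize and discard
 * them.
 */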

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC:       %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC:       %s: %sconnected\n",
			__func__, connstate > 0 ? "" : "dis");
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}

static void rpcrdma_destroy_id(struct rdma_cm_id *id)
{
	if (id) {
		module_put(id->device->owner);
		rdma_destroy_id(id);
	}
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);

	/* FIXME:
	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
	 * be pinned while there are active NFS/RDMA mounts to prevent
	 * hangs and crashes at umount time.
	 */
	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
		dprintk("RPC:       %s: Failed to get device module\n",
			__func__);
		ia->ri_async_rc = -ENODEV;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto put;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto put;

	return id;
put:
	module_put(id->device->owner);
out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}
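
/* Note on the flow above: rdma_resolve_addr() and
 * rdma_resolve_route() are asynchronous; their outcomes arrive via
 * rpcrdma_conn_upcall(), which records a status in ri_async_rc and
 * completes ri_done. Seeding ri_async_rc with -ETIMEDOUT before each
 * step means a wait that expires without any CM event is reported as
 * a timeout failure.
 */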

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_dma_mr = NULL;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
			__func__, rc);
		goto out2;
	}

	if (memreg == RPCRDMA_FRMR) {
		if (!(ia->ri_device->attrs.device_cap_flags &
				IB_DEVICE_MEM_MGT_EXTENSIONS) ||
		    (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
			dprintk("RPC:       %s: FRMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_MTHCAFMR;
		}
	}
	if (memreg == RPCRDMA_MTHCAFMR) {
		if (!ia->ri_device->alloc_fmr) {
			dprintk("RPC:       %s: MTHCAFMR registration "
				"not supported by HCA\n", __func__);
			rc = -EINVAL;
			goto out3;
		}
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
		break;
	case RPCRDMA_ALLPHYSICAL:
		ia->ri_ops = &rpcrdma_physical_memreg_ops;
		break;
	case RPCRDMA_MTHCAFMR:
		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
		break;
	default:
		printk(KERN_ERR "RPC: Unsupported memory "
				"registration mode: %d\n", memreg);
		rc = -ENOMEM;
		goto out3;
	}
	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
		__func__, ia->ri_ops->ro_displayname);

	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}
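
/* Memory registration selection, in brief: a caller asking for
 * RPCRDMA_FRMR on an HCA without IB_DEVICE_MEM_MGT_EXTENSIONS is
 * quietly downgraded to RPCRDMA_MTHCAFMR; if the HCA lacks an
 * alloc_fmr verb as well, rpcrdma_ia_open() fails rather than fall
 * back to RPCRDMA_ALLPHYSICAL, which must be requested explicitly.
 * A hypothetical caller (names illustrative, not quoted from
 * transport.c) might do:
 *
 *	rc = rpcrdma_ia_open(new_xprt, sap, xprt_rdma_memreg_strategy);
 *	if (rc)
 *		goto out1;
 */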

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC:       %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
				struct rpcrdma_create_data_internal *cdata)
{
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_qp_wr;
	int rc;

	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
		dprintk("RPC:       %s: insufficient sge's available\n",
			__func__);
		return -ENOMEM;
	}

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC:       %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC:       %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	if (ia->ri_dma_mr)
		ib_dereg_mr(ia->ri_dma_mr);
	return rc;
}
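
/* Sizing sketch for the endpoint above, assuming for illustration
 * that cdata->max_requests works out to 32: max_send_wr and
 * max_recv_wr each become 32 + RPCRDMA_BACKWARD_WRS + 1, where the
 * final +1 reserves a slot for the drain cqe, and each completion
 * queue is allocated one entry larger than its work request limit.
 * The send-completion trigger rep_cqinit is then max_send_wr/2 - 1,
 * so roughly one in every max_send_wr/2 SENDs is signaled (see
 * rpcrdma_ep_post() below).
 */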

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC:       %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);

	if (ia->ri_dma_mr) {
		rc = ib_dereg_mr(ia->ri_dma_mr);
		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
			__func__, rc);
	}
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC:       %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC:       %s: can't reconnect on "
				"different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		old = ia->ri_id;
		ia->ri_id = id;

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
				__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition that has already
	 * undergone best-effort recovery.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		struct rpcrdma_xprt *r_xprt;
		unsigned int extras;

		dprintk("RPC:       %s: connected\n", __func__);

		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

		if (extras) {
			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
			if (rc) {
				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
					__func__, rc);
				rc = 0;
			}
		}
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}
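
/* Reconnection behavior above, summarized: a rejected connect
 * (-ECONNREFUSED) is retried up to RDMA_CONNECT_RETRY_MAX times on
 * the assumption that the server may simply not be listening yet.
 * Other failures get one further attempt that forces
 * initiator_depth equal to a nonzero responder_resources, working
 * around peer CMs that accept only symmetric ORD/IRD values.
 */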

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}

	ib_drain_qp(ia->ri_id->qp);
}

struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	INIT_LIST_HEAD(&req->rl_free);
	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	req->rl_cqe.done = rpcrdma_wc_send;
	req->rl_buffer = &r_xprt->rx_buf;
	return req;
}

struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
					       GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_device = ia->ri_device;
	rep->rr_cqe.done = rpcrdma_receive_wc;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_lock);
	atomic_set(&buf->rb_credits, 1);

	rc = ia->ri_ops->ro_init(r_xprt);
	if (rc)
		goto out;

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + 2; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}
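
/* Pool shape created above: rb_max_requests rpcrdma_req structures on
 * rb_send_bufs, and rb_max_requests + 2 rpcrdma_rep structures on
 * rb_recv_bufs, i.e. two spare receive buffers beyond the credit
 * limit. On any allocation failure the partially built pool is torn
 * down again via rpcrdma_buffer_destroy() before returning.
 */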

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(ia, rep);
	}

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(ia, req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);

	ia->ri_ops->ro_destroy(buf);
}

struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
	}
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		pr_err("RPC:       %s: no MWs available\n", __func__);
	return mw;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	list_add_tail(&mw->mw_list, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	req = rpcrdma_buffer_get_req_locked(buffers);
	if (list_empty(&buffers->rb_recv_bufs))
		goto out_repbuf;
	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC:       %s: out of request buffers\n", __func__);
	return NULL;
out_repbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
	req->rl_reply = NULL;
	return req;
}
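
/* Typical pairing, sketched for illustration (not quoted from the
 * transport code): callers take a req/rep pair before marshaling a
 * call and return both when the RPC is retired.
 *
 *	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 *	if (req == NULL)
 *		return NULL;	(out of request buffers)
 *	... use req, and req->rl_reply if one was attached ...
 *	rpcrdma_buffer_put(req);
 */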

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_niovs = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep)
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	if (!list_empty(&buffers->rb_recv_bufs))
		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

void
rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
{
	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
		seg->mr_offset,
		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
}

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	struct ib_sge *iov;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	iov = &rb->rg_iov;
	iov->addr = ib_dma_map_single(ia->ri_device,
				      (void *)rb->rg_base, size,
				      DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
		goto out_free;

	iov->length = size;
	iov->lkey = ia->ri_pd->local_dma_lkey;
	rb->rg_size = size;
	rb->rg_owner = NULL;
	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(-ENOMEM);
}
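
/* Minimal usage sketch, for illustration only: a regbuf is
 * DMA-mapped at allocation time and stays mapped until
 * rpcrdma_free_regbuf() is called, so callers treat the pair like
 * alloc/free.
 *
 *	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
 *	if (IS_ERR(rb))
 *		return PTR_ERR(rb);
 *	... post rb->rg_iov as a SEND or RECV SGE ...
 *	rpcrdma_free_regbuf(ia, rb);
 */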

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_sge *iov;

	if (!rb)
		return;

	iov = &rb->rg_iov;
	ib_dma_unmap_single(ia->ri_device,
			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	struct ib_sge *iov = req->rl_send_iov;
	int i, rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_cqe = &req->rl_cqe;
	send_wr.sg_list = iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;

	for (i = 0; i < send_wr.num_sge; i++)
		ib_dma_sync_single_for_device(device, iov[i].addr,
					      iov[i].length, DMA_TO_DEVICE);
	dprintk("RPC:       %s: posting %d s/g entries\n",
		__func__, send_wr.num_sge);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
			rc);
out:
	return rc;
}
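
/* Send-completion cadence: because most SENDs are posted unsignaled,
 * rpcrdma_wc_send() runs only for the roughly one-in-(max_send_wr/2)
 * requests that carry IB_SEND_SIGNALED, plus any sends that complete
 * in error. That keeps completion overhead low while still letting
 * the provider reap its send queue periodically.
 */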

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_cqe = &rep->rr_cqe;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

	if (rc)
		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
			rc);
	return rc;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}