Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net
[cascardo/linux.git] / net / rxrpc / recvmsg.c
1 /* RxRPC recvmsg() implementation
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
13
14 #include <linux/net.h>
15 #include <linux/skbuff.h>
16 #include <linux/export.h>
17 #include <net/sock.h>
18 #include <net/af_rxrpc.h>
19 #include "ar-internal.h"
20
21 /*
22  * Post a call for attention by the socket or kernel service.  Further
23  * notifications are suppressed by putting recvmsg_link on a dummy queue.
24  */
25 void rxrpc_notify_socket(struct rxrpc_call *call)
26 {
27         struct rxrpc_sock *rx;
28         struct sock *sk;
29
30         _enter("%d", call->debug_id);
31
32         if (!list_empty(&call->recvmsg_link))
33                 return;
34
35         rcu_read_lock();
36
37         rx = rcu_dereference(call->socket);
38         sk = &rx->sk;
39         if (rx && sk->sk_state < RXRPC_CLOSE) {
40                 if (call->notify_rx) {
41                         call->notify_rx(sk, call, call->user_call_ID);
42                 } else {
43                         write_lock_bh(&rx->recvmsg_lock);
44                         if (list_empty(&call->recvmsg_link)) {
45                                 rxrpc_get_call(call, rxrpc_call_got);
46                                 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
47                         }
48                         write_unlock_bh(&rx->recvmsg_lock);
49
50                         if (!sock_flag(sk, SOCK_DEAD)) {
51                                 _debug("call %ps", sk->sk_data_ready);
52                                 sk->sk_data_ready(sk);
53                         }
54                 }
55         }
56
57         rcu_read_unlock();
58         _leave("");
59 }
60
61 /*
62  * Pass a call terminating message to userspace.
63  */
64 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
65 {
66         u32 tmp = 0;
67         int ret;
68
69         switch (call->completion) {
70         case RXRPC_CALL_SUCCEEDED:
71                 ret = 0;
72                 if (rxrpc_is_service_call(call))
73                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
74                 break;
75         case RXRPC_CALL_REMOTELY_ABORTED:
76                 tmp = call->abort_code;
77                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
78                 break;
79         case RXRPC_CALL_LOCALLY_ABORTED:
80                 tmp = call->abort_code;
81                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
82                 break;
83         case RXRPC_CALL_NETWORK_ERROR:
84                 tmp = call->error;
85                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
86                 break;
87         case RXRPC_CALL_LOCAL_ERROR:
88                 tmp = call->error;
89                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
90                 break;
91         default:
92                 pr_err("Invalid terminal call state %u\n", call->state);
93                 BUG();
94                 break;
95         }
96
97         return ret;
98 }
99
100 /*
101  * Pass back notification of a new call.  The call is added to the
102  * to-be-accepted list.  This means that the next call to be accepted might not
103  * be the last call seen awaiting acceptance, but unless we leave this on the
104  * front of the queue and block all other messages until someone gives us a
105  * user_ID for it, there's not a lot we can do.
106  */
107 static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
108                                   struct rxrpc_call *call,
109                                   struct msghdr *msg, int flags)
110 {
111         int tmp = 0, ret;
112
113         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &tmp);
114
115         if (ret == 0 && !(flags & MSG_PEEK)) {
116                 _debug("to be accepted");
117                 write_lock_bh(&rx->recvmsg_lock);
118                 list_del_init(&call->recvmsg_link);
119                 write_unlock_bh(&rx->recvmsg_lock);
120
121                 write_lock(&rx->call_lock);
122                 list_add_tail(&call->accept_link, &rx->to_be_accepted);
123                 write_unlock(&rx->call_lock);
124         }
125
126         return ret;
127 }
128
129 /*
130  * End the packet reception phase.
131  */
132 static void rxrpc_end_rx_phase(struct rxrpc_call *call)
133 {
134         _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
135
136         if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
137                 rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, 0, true, false);
138                 rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK);
139         } else {
140                 rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, 0, false, false);
141         }
142
143         write_lock_bh(&call->state_lock);
144
145         switch (call->state) {
146         case RXRPC_CALL_CLIENT_RECV_REPLY:
147                 __rxrpc_call_completed(call);
148                 break;
149
150         case RXRPC_CALL_SERVER_RECV_REQUEST:
151                 call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
152                 break;
153         default:
154                 break;
155         }
156
157         write_unlock_bh(&call->state_lock);
158 }
159
160 /*
161  * Discard a packet we've used up and advance the Rx window by one.
162  */
163 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
164 {
165         struct sk_buff *skb;
166         rxrpc_seq_t hard_ack, top;
167         int ix;
168
169         _enter("%d", call->debug_id);
170
171         hard_ack = call->rx_hard_ack;
172         top = smp_load_acquire(&call->rx_top);
173         ASSERT(before(hard_ack, top));
174
175         hard_ack++;
176         ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
177         skb = call->rxtx_buffer[ix];
178         rxrpc_see_skb(skb);
179         call->rxtx_buffer[ix] = NULL;
180         call->rxtx_annotations[ix] = 0;
181         /* Barrier against rxrpc_input_data(). */
182         smp_store_release(&call->rx_hard_ack, hard_ack);
183
184         rxrpc_free_skb(skb);
185
186         _debug("%u,%u,%lx", hard_ack, top, call->flags);
187         if (hard_ack == top && test_bit(RXRPC_CALL_RX_LAST, &call->flags))
188                 rxrpc_end_rx_phase(call);
189 }
190
191 /*
192  * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
193  * padding, but if this is the case, the packet length will be resident in the
194  * socket buffer.  Note that we can't modify the master skb info as the skb may
195  * be the home to multiple subpackets.
196  */
197 static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
198                                u8 annotation,
199                                unsigned int offset, unsigned int len)
200 {
201         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
202         rxrpc_seq_t seq = sp->hdr.seq;
203         u16 cksum = sp->hdr.cksum;
204
205         _enter("");
206
207         /* For all but the head jumbo subpacket, the security checksum is in a
208          * jumbo header immediately prior to the data.
209          */
210         if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) {
211                 __be16 tmp;
212                 if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
213                         BUG();
214                 cksum = ntohs(tmp);
215                 seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1;
216         }
217
218         return call->conn->security->verify_packet(call, skb, offset, len,
219                                                    seq, cksum);
220 }
221
222 /*
223  * Locate the data within a packet.  This is complicated by:
224  *
225  * (1) An skb may contain a jumbo packet - so we have to find the appropriate
226  *     subpacket.
227  *
228  * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
229  *     contains an extra header which includes the true length of the data,
230  *     excluding any encrypted padding.
231  */
232 static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
233                              u8 *_annotation,
234                              unsigned int *_offset, unsigned int *_len)
235 {
236         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
237         unsigned int offset = *_offset;
238         unsigned int len = *_len;
239         int ret;
240         u8 annotation = *_annotation;
241
242         if (offset > 0)
243                 return 0;
244
245         /* Locate the subpacket */
246         offset = sp->offset;
247         len = skb->len - sp->offset;
248         if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) {
249                 offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) *
250                            RXRPC_JUMBO_SUBPKTLEN);
251                 len = (annotation & RXRPC_RX_ANNO_JLAST) ?
252                         skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
253         }
254
255         if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
256                 ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
257                 if (ret < 0)
258                         return ret;
259                 *_annotation |= RXRPC_RX_ANNO_VERIFIED;
260         }
261
262         *_offset = offset;
263         *_len = len;
264         call->conn->security->locate_data(call, skb, _offset, _len);
265         return 0;
266 }
267
268 /*
269  * Deliver messages to a call.  This keeps processing packets until the buffer
270  * is filled and we find either more DATA (returns 0) or the end of the DATA
271  * (returns 1).  If more packets are required, it returns -EAGAIN.
272  */
273 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
274                               struct msghdr *msg, struct iov_iter *iter,
275                               size_t len, int flags, size_t *_offset)
276 {
277         struct rxrpc_skb_priv *sp;
278         struct sk_buff *skb;
279         rxrpc_seq_t hard_ack, top, seq;
280         size_t remain;
281         bool last;
282         unsigned int rx_pkt_offset, rx_pkt_len;
283         int ix, copy, ret = 0;
284
285         _enter("");
286
287         rx_pkt_offset = call->rx_pkt_offset;
288         rx_pkt_len = call->rx_pkt_len;
289
290         /* Barriers against rxrpc_input_data(). */
291         hard_ack = call->rx_hard_ack;
292         top = smp_load_acquire(&call->rx_top);
293         for (seq = hard_ack + 1; before_eq(seq, top); seq++) {
294                 ix = seq & RXRPC_RXTX_BUFF_MASK;
295                 skb = call->rxtx_buffer[ix];
296                 if (!skb)
297                         break;
298                 smp_rmb();
299                 rxrpc_see_skb(skb);
300                 sp = rxrpc_skb(skb);
301
302                 if (msg)
303                         sock_recv_timestamp(msg, sock->sk, skb);
304
305                 ret = rxrpc_locate_data(call, skb, &call->rxtx_annotations[ix],
306                                         &rx_pkt_offset, &rx_pkt_len);
307                 _debug("recvmsg %x DATA #%u { %d, %d }",
308                        sp->hdr.callNumber, seq, rx_pkt_offset, rx_pkt_len);
309
310                 /* We have to handle short, empty and used-up DATA packets. */
311                 remain = len - *_offset;
312                 copy = rx_pkt_len;
313                 if (copy > remain)
314                         copy = remain;
315                 if (copy > 0) {
316                         ret = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
317                                                      copy);
318                         if (ret < 0)
319                                 goto out;
320
321                         /* handle piecemeal consumption of data packets */
322                         _debug("copied %d @%zu", copy, *_offset);
323
324                         rx_pkt_offset += copy;
325                         rx_pkt_len -= copy;
326                         *_offset += copy;
327                 }
328
329                 if (rx_pkt_len > 0) {
330                         _debug("buffer full");
331                         ASSERTCMP(*_offset, ==, len);
332                         break;
333                 }
334
335                 /* The whole packet has been transferred. */
336                 last = sp->hdr.flags & RXRPC_LAST_PACKET;
337                 if (!(flags & MSG_PEEK))
338                         rxrpc_rotate_rx_window(call);
339                 rx_pkt_offset = 0;
340                 rx_pkt_len = 0;
341
342                 ASSERTIFCMP(last, seq, ==, top);
343         }
344
345         if (after(seq, top)) {
346                 ret = -EAGAIN;
347                 if (test_bit(RXRPC_CALL_RX_LAST, &call->flags))
348                         ret = 1;
349         }
350 out:
351         if (!(flags & MSG_PEEK)) {
352                 call->rx_pkt_offset = rx_pkt_offset;
353                 call->rx_pkt_len = rx_pkt_len;
354         }
355         _leave(" = %d [%u/%u]", ret, seq, top);
356         return ret;
357 }
358
359 /*
360  * Receive a message from an RxRPC socket
361  * - we need to be careful about two or more threads calling recvmsg
362  *   simultaneously
363  */
364 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
365                   int flags)
366 {
367         struct rxrpc_call *call;
368         struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
369         struct list_head *l;
370         size_t copied = 0;
371         long timeo;
372         int ret;
373
374         DEFINE_WAIT(wait);
375
376         _enter(",,,%zu,%d", len, flags);
377
378         if (flags & (MSG_OOB | MSG_TRUNC))
379                 return -EOPNOTSUPP;
380
381         timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
382
383 try_again:
384         lock_sock(&rx->sk);
385
386         /* Return immediately if a client socket has no outstanding calls */
387         if (RB_EMPTY_ROOT(&rx->calls) &&
388             list_empty(&rx->recvmsg_q) &&
389             rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
390                 release_sock(&rx->sk);
391                 return -ENODATA;
392         }
393
394         if (list_empty(&rx->recvmsg_q)) {
395                 ret = -EWOULDBLOCK;
396                 if (timeo == 0)
397                         goto error_no_call;
398
399                 release_sock(&rx->sk);
400
401                 /* Wait for something to happen */
402                 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
403                                           TASK_INTERRUPTIBLE);
404                 ret = sock_error(&rx->sk);
405                 if (ret)
406                         goto wait_error;
407
408                 if (list_empty(&rx->recvmsg_q)) {
409                         if (signal_pending(current))
410                                 goto wait_interrupted;
411                         timeo = schedule_timeout(timeo);
412                 }
413                 finish_wait(sk_sleep(&rx->sk), &wait);
414                 goto try_again;
415         }
416
417         /* Find the next call and dequeue it if we're not just peeking.  If we
418          * do dequeue it, that comes with a ref that we will need to release.
419          */
420         write_lock_bh(&rx->recvmsg_lock);
421         l = rx->recvmsg_q.next;
422         call = list_entry(l, struct rxrpc_call, recvmsg_link);
423         if (!(flags & MSG_PEEK))
424                 list_del_init(&call->recvmsg_link);
425         else
426                 rxrpc_get_call(call, rxrpc_call_got);
427         write_unlock_bh(&rx->recvmsg_lock);
428
429         _debug("recvmsg call %p", call);
430
431         if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
432                 BUG();
433
434         if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
435                 if (flags & MSG_CMSG_COMPAT) {
436                         unsigned int id32 = call->user_call_ID;
437
438                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
439                                        sizeof(unsigned int), &id32);
440                 } else {
441                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
442                                        sizeof(unsigned long),
443                                        &call->user_call_ID);
444                 }
445                 if (ret < 0)
446                         goto error;
447         }
448
449         if (msg->msg_name) {
450                 size_t len = sizeof(call->conn->params.peer->srx);
451                 memcpy(msg->msg_name, &call->conn->params.peer->srx, len);
452                 msg->msg_namelen = len;
453         }
454
455         switch (call->state) {
456         case RXRPC_CALL_SERVER_ACCEPTING:
457                 ret = rxrpc_recvmsg_new_call(rx, call, msg, flags);
458                 break;
459         case RXRPC_CALL_CLIENT_RECV_REPLY:
460         case RXRPC_CALL_SERVER_RECV_REQUEST:
461         case RXRPC_CALL_SERVER_ACK_REQUEST:
462                 ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
463                                          flags, &copied);
464                 if (ret == -EAGAIN)
465                         ret = 0;
466                 break;
467         default:
468                 ret = 0;
469                 break;
470         }
471
472         if (ret < 0)
473                 goto error;
474
475         if (call->state == RXRPC_CALL_COMPLETE) {
476                 ret = rxrpc_recvmsg_term(call, msg);
477                 if (ret < 0)
478                         goto error;
479                 if (!(flags & MSG_PEEK))
480                         rxrpc_release_call(rx, call);
481                 msg->msg_flags |= MSG_EOR;
482                 ret = 1;
483         }
484
485         if (ret == 0)
486                 msg->msg_flags |= MSG_MORE;
487         else
488                 msg->msg_flags &= ~MSG_MORE;
489         ret = copied;
490
491 error:
492         rxrpc_put_call(call, rxrpc_call_put);
493 error_no_call:
494         release_sock(&rx->sk);
495         _leave(" = %d", ret);
496         return ret;
497
498 wait_interrupted:
499         ret = sock_intr_errno(timeo);
500 wait_error:
501         finish_wait(sk_sleep(&rx->sk), &wait);
502         release_sock(&rx->sk);
503         _leave(" = %d [wait]", ret);
504         return ret;
505 }
506
507 /**
508  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
509  * @sock: The socket that the call exists on
510  * @call: The call to send data through
511  * @buf: The buffer to receive into
512  * @size: The size of the buffer, including data already read
513  * @_offset: The running offset into the buffer.
514  * @want_more: True if more data is expected to be read
515  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
516  *
517  * Allow a kernel service to receive data and pick up information about the
518  * state of a call.  Returns 0 if got what was asked for and there's more
519  * available, 1 if we got what was asked for and we're at the end of the data
520  * and -EAGAIN if we need more data.
521  *
522  * Note that we may return -EAGAIN to drain empty packets at the end of the
523  * data, even if we've already copied over the requested data.
524  *
525  * This function adds the amount it transfers to *_offset, so this should be
526  * precleared as appropriate.  Note that the amount remaining in the buffer is
527  * taken to be size - *_offset.
528  *
529  * *_abort should also be initialised to 0.
530  */
531 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
532                            void *buf, size_t size, size_t *_offset,
533                            bool want_more, u32 *_abort)
534 {
535         struct iov_iter iter;
536         struct kvec iov;
537         int ret;
538
539         _enter("{%d,%s},%zu/%zu,%d",
540                call->debug_id, rxrpc_call_states[call->state],
541                *_offset, size, want_more);
542
543         ASSERTCMP(*_offset, <=, size);
544         ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
545
546         iov.iov_base = buf + *_offset;
547         iov.iov_len = size - *_offset;
548         iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
549
550         lock_sock(sock->sk);
551
552         switch (call->state) {
553         case RXRPC_CALL_CLIENT_RECV_REPLY:
554         case RXRPC_CALL_SERVER_RECV_REQUEST:
555         case RXRPC_CALL_SERVER_ACK_REQUEST:
556                 ret = rxrpc_recvmsg_data(sock, call, NULL, &iter, size, 0,
557                                          _offset);
558                 if (ret < 0)
559                         goto out;
560
561                 /* We can only reach here with a partially full buffer if we
562                  * have reached the end of the data.  We must otherwise have a
563                  * full buffer or have been given -EAGAIN.
564                  */
565                 if (ret == 1) {
566                         if (*_offset < size)
567                                 goto short_data;
568                         if (!want_more)
569                                 goto read_phase_complete;
570                         ret = 0;
571                         goto out;
572                 }
573
574                 if (!want_more)
575                         goto excess_data;
576                 goto out;
577
578         case RXRPC_CALL_COMPLETE:
579                 goto call_complete;
580
581         default:
582                 ret = -EINPROGRESS;
583                 goto out;
584         }
585
586 read_phase_complete:
587         ret = 1;
588 out:
589         release_sock(sock->sk);
590         _leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
591         return ret;
592
593 short_data:
594         ret = -EBADMSG;
595         goto out;
596 excess_data:
597         ret = -EMSGSIZE;
598         goto out;
599 call_complete:
600         *_abort = call->abort_code;
601         ret = call->error;
602         if (call->completion == RXRPC_CALL_SUCCEEDED) {
603                 ret = 1;
604                 if (size > 0)
605                         ret = -ECONNRESET;
606         }
607         goto out;
608 }
609 EXPORT_SYMBOL(rxrpc_kernel_recv_data);