rxrpc: Improve the call tracking tracepoint
[cascardo/linux.git] / net / rxrpc / recvmsg.c
1 /* RxRPC recvmsg() implementation
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
13
14 #include <linux/net.h>
15 #include <linux/skbuff.h>
16 #include <linux/export.h>
17 #include <net/sock.h>
18 #include <net/af_rxrpc.h>
19 #include "ar-internal.h"
20
21 /*
22  * removal a call's user ID from the socket tree to make the user ID available
23  * again and so that it won't be seen again in association with that call
24  */
25 void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
26 {
27         _debug("RELEASE CALL %d", call->debug_id);
28
29         if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
30                 write_lock_bh(&rx->call_lock);
31                 rb_erase(&call->sock_node, &call->socket->calls);
32                 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
33                 write_unlock_bh(&rx->call_lock);
34         }
35
36         read_lock_bh(&call->state_lock);
37         if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
38             !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
39                 rxrpc_queue_call(call);
40         read_unlock_bh(&call->state_lock);
41 }
42
43 /*
44  * receive a message from an RxRPC socket
45  * - we need to be careful about two or more threads calling recvmsg
46  *   simultaneously
47  */
48 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
49                   int flags)
50 {
51         struct rxrpc_skb_priv *sp;
52         struct rxrpc_call *call = NULL, *continue_call = NULL;
53         struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
54         struct sk_buff *skb;
55         long timeo;
56         int copy, ret, ullen, offset, copied = 0;
57         u32 abort_code;
58
59         DEFINE_WAIT(wait);
60
61         _enter(",,,%zu,%d", len, flags);
62
63         if (flags & (MSG_OOB | MSG_TRUNC))
64                 return -EOPNOTSUPP;
65
66         ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long);
67
68         timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
69         msg->msg_flags |= MSG_MORE;
70
71         lock_sock(&rx->sk);
72
73         for (;;) {
74                 /* return immediately if a client socket has no outstanding
75                  * calls */
76                 if (RB_EMPTY_ROOT(&rx->calls)) {
77                         if (copied)
78                                 goto out;
79                         if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
80                                 release_sock(&rx->sk);
81                                 if (continue_call)
82                                         rxrpc_put_call(continue_call,
83                                                        rxrpc_call_put);
84                                 return -ENODATA;
85                         }
86                 }
87
88                 /* get the next message on the Rx queue */
89                 skb = skb_peek(&rx->sk.sk_receive_queue);
90                 if (!skb) {
91                         /* nothing remains on the queue */
92                         if (copied &&
93                             (flags & MSG_PEEK || timeo == 0))
94                                 goto out;
95
96                         /* wait for a message to turn up */
97                         release_sock(&rx->sk);
98                         prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
99                                                   TASK_INTERRUPTIBLE);
100                         ret = sock_error(&rx->sk);
101                         if (ret)
102                                 goto wait_error;
103
104                         if (skb_queue_empty(&rx->sk.sk_receive_queue)) {
105                                 if (signal_pending(current))
106                                         goto wait_interrupted;
107                                 timeo = schedule_timeout(timeo);
108                         }
109                         finish_wait(sk_sleep(&rx->sk), &wait);
110                         lock_sock(&rx->sk);
111                         continue;
112                 }
113
114         peek_next_packet:
115                 rxrpc_see_skb(skb);
116                 sp = rxrpc_skb(skb);
117                 call = sp->call;
118                 ASSERT(call != NULL);
119                 rxrpc_see_call(call);
120
121                 _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
122
123                 /* make sure we wait for the state to be updated in this call */
124                 spin_lock_bh(&call->lock);
125                 spin_unlock_bh(&call->lock);
126
127                 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
128                         _debug("packet from released call");
129                         if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
130                                 BUG();
131                         rxrpc_free_skb(skb);
132                         continue;
133                 }
134
135                 /* determine whether to continue last data receive */
136                 if (continue_call) {
137                         _debug("maybe cont");
138                         if (call != continue_call ||
139                             skb->mark != RXRPC_SKB_MARK_DATA) {
140                                 release_sock(&rx->sk);
141                                 rxrpc_put_call(continue_call, rxrpc_call_put);
142                                 _leave(" = %d [noncont]", copied);
143                                 return copied;
144                         }
145                 }
146
147                 rxrpc_get_call(call, rxrpc_call_got);
148
149                 /* copy the peer address and timestamp */
150                 if (!continue_call) {
151                         if (msg->msg_name) {
152                                 size_t len =
153                                         sizeof(call->conn->params.peer->srx);
154                                 memcpy(msg->msg_name,
155                                        &call->conn->params.peer->srx, len);
156                                 msg->msg_namelen = len;
157                         }
158                         sock_recv_timestamp(msg, &rx->sk, skb);
159                 }
160
161                 /* receive the message */
162                 if (skb->mark != RXRPC_SKB_MARK_DATA)
163                         goto receive_non_data_message;
164
165                 _debug("recvmsg DATA #%u { %d, %d }",
166                        sp->hdr.seq, skb->len, sp->offset);
167
168                 if (!continue_call) {
169                         /* only set the control data once per recvmsg() */
170                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
171                                        ullen, &call->user_call_ID);
172                         if (ret < 0)
173                                 goto copy_error;
174                         ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
175                 }
176
177                 ASSERTCMP(sp->hdr.seq, >=, call->rx_data_recv);
178                 ASSERTCMP(sp->hdr.seq, <=, call->rx_data_recv + 1);
179                 call->rx_data_recv = sp->hdr.seq;
180
181                 ASSERTCMP(sp->hdr.seq, >, call->rx_data_eaten);
182
183                 offset = sp->offset;
184                 copy = skb->len - offset;
185                 if (copy > len - copied)
186                         copy = len - copied;
187
188                 ret = skb_copy_datagram_msg(skb, offset, msg, copy);
189
190                 if (ret < 0)
191                         goto copy_error;
192
193                 /* handle piecemeal consumption of data packets */
194                 _debug("copied %d+%d", copy, copied);
195
196                 offset += copy;
197                 copied += copy;
198
199                 if (!(flags & MSG_PEEK))
200                         sp->offset = offset;
201
202                 if (sp->offset < skb->len) {
203                         _debug("buffer full");
204                         ASSERTCMP(copied, ==, len);
205                         break;
206                 }
207
208                 /* we transferred the whole data packet */
209                 if (!(flags & MSG_PEEK))
210                         rxrpc_kernel_data_consumed(call, skb);
211
212                 if (sp->hdr.flags & RXRPC_LAST_PACKET) {
213                         _debug("last");
214                         if (rxrpc_conn_is_client(call->conn)) {
215                                  /* last byte of reply received */
216                                 ret = copied;
217                                 goto terminal_message;
218                         }
219
220                         /* last bit of request received */
221                         if (!(flags & MSG_PEEK)) {
222                                 _debug("eat packet");
223                                 if (skb_dequeue(&rx->sk.sk_receive_queue) !=
224                                     skb)
225                                         BUG();
226                                 rxrpc_free_skb(skb);
227                         }
228                         msg->msg_flags &= ~MSG_MORE;
229                         break;
230                 }
231
232                 /* move on to the next data message */
233                 _debug("next");
234                 if (!continue_call)
235                         continue_call = sp->call;
236                 else
237                         rxrpc_put_call(call, rxrpc_call_put);
238                 call = NULL;
239
240                 if (flags & MSG_PEEK) {
241                         _debug("peek next");
242                         skb = skb->next;
243                         if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue)
244                                 break;
245                         goto peek_next_packet;
246                 }
247
248                 _debug("eat packet");
249                 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
250                         BUG();
251                 rxrpc_free_skb(skb);
252         }
253
254         /* end of non-terminal data packet reception for the moment */
255         _debug("end rcv data");
256 out:
257         release_sock(&rx->sk);
258         if (call)
259                 rxrpc_put_call(call, rxrpc_call_put);
260         if (continue_call)
261                 rxrpc_put_call(continue_call, rxrpc_call_put);
262         _leave(" = %d [data]", copied);
263         return copied;
264
265         /* handle non-DATA messages such as aborts, incoming connections and
266          * final ACKs */
267 receive_non_data_message:
268         _debug("non-data");
269
270         if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) {
271                 _debug("RECV NEW CALL");
272                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code);
273                 if (ret < 0)
274                         goto copy_error;
275                 if (!(flags & MSG_PEEK)) {
276                         if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
277                                 BUG();
278                         rxrpc_free_skb(skb);
279                 }
280                 goto out;
281         }
282
283         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
284                        ullen, &call->user_call_ID);
285         if (ret < 0)
286                 goto copy_error;
287         ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
288
289         switch (skb->mark) {
290         case RXRPC_SKB_MARK_DATA:
291                 BUG();
292         case RXRPC_SKB_MARK_FINAL_ACK:
293                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code);
294                 break;
295         case RXRPC_SKB_MARK_BUSY:
296                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
297                 break;
298         case RXRPC_SKB_MARK_REMOTE_ABORT:
299                 abort_code = call->abort_code;
300                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
301                 break;
302         case RXRPC_SKB_MARK_LOCAL_ABORT:
303                 abort_code = call->abort_code;
304                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
305                 if (call->error) {
306                         abort_code = call->error;
307                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
308                                        &abort_code);
309                 }
310                 break;
311         case RXRPC_SKB_MARK_NET_ERROR:
312                 _debug("RECV NET ERROR %d", sp->error);
313                 abort_code = sp->error;
314                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code);
315                 break;
316         case RXRPC_SKB_MARK_LOCAL_ERROR:
317                 _debug("RECV LOCAL ERROR %d", sp->error);
318                 abort_code = sp->error;
319                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
320                                &abort_code);
321                 break;
322         default:
323                 pr_err("Unknown packet mark %u\n", skb->mark);
324                 BUG();
325                 break;
326         }
327
328         if (ret < 0)
329                 goto copy_error;
330
331 terminal_message:
332         _debug("terminal");
333         msg->msg_flags &= ~MSG_MORE;
334         msg->msg_flags |= MSG_EOR;
335
336         if (!(flags & MSG_PEEK)) {
337                 _net("free terminal skb %p", skb);
338                 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
339                         BUG();
340                 rxrpc_free_skb(skb);
341                 rxrpc_remove_user_ID(rx, call);
342         }
343
344         release_sock(&rx->sk);
345         rxrpc_put_call(call, rxrpc_call_put);
346         if (continue_call)
347                 rxrpc_put_call(continue_call, rxrpc_call_put);
348         _leave(" = %d", ret);
349         return ret;
350
351 copy_error:
352         _debug("copy error");
353         release_sock(&rx->sk);
354         rxrpc_put_call(call, rxrpc_call_put);
355         if (continue_call)
356                 rxrpc_put_call(continue_call, rxrpc_call_put);
357         _leave(" = %d", ret);
358         return ret;
359
360 wait_interrupted:
361         ret = sock_intr_errno(timeo);
362 wait_error:
363         finish_wait(sk_sleep(&rx->sk), &wait);
364         if (continue_call)
365                 rxrpc_put_call(continue_call, rxrpc_call_put);
366         if (copied)
367                 copied = ret;
368         _leave(" = %d [waitfail %d]", copied, ret);
369         return copied;
370
371 }
372
373 /*
374  * Deliver messages to a call.  This keeps processing packets until the buffer
375  * is filled and we find either more DATA (returns 0) or the end of the DATA
376  * (returns 1).  If more packets are required, it returns -EAGAIN.
377  *
378  * TODO: Note that this is hacked in at the moment and will be replaced.
379  */
380 static int temp_deliver_data(struct socket *sock, struct rxrpc_call *call,
381                              struct iov_iter *iter, size_t size,
382                              size_t *_offset)
383 {
384         struct rxrpc_skb_priv *sp;
385         struct sk_buff *skb;
386         size_t remain;
387         int ret, copy;
388
389         _enter("%d", call->debug_id);
390
391 next:
392         local_bh_disable();
393         skb = skb_dequeue(&call->knlrecv_queue);
394         local_bh_enable();
395         if (!skb) {
396                 if (test_bit(RXRPC_CALL_RX_NO_MORE, &call->flags))
397                         return 1;
398                 _leave(" = -EAGAIN [empty]");
399                 return -EAGAIN;
400         }
401
402         sp = rxrpc_skb(skb);
403         _debug("dequeued %p %u/%zu", skb, sp->offset, size);
404
405         switch (skb->mark) {
406         case RXRPC_SKB_MARK_DATA:
407                 remain = size - *_offset;
408                 if (remain > 0) {
409                         copy = skb->len - sp->offset;
410                         if (copy > remain)
411                                 copy = remain;
412                         ret = skb_copy_datagram_iter(skb, sp->offset, iter,
413                                                      copy);
414                         if (ret < 0)
415                                 goto requeue_and_leave;
416
417                         /* handle piecemeal consumption of data packets */
418                         sp->offset += copy;
419                         *_offset += copy;
420                 }
421
422                 if (sp->offset < skb->len)
423                         goto partially_used_skb;
424
425                 /* We consumed the whole packet */
426                 ASSERTCMP(sp->offset, ==, skb->len);
427                 if (sp->hdr.flags & RXRPC_LAST_PACKET)
428                         set_bit(RXRPC_CALL_RX_NO_MORE, &call->flags);
429                 rxrpc_kernel_data_consumed(call, skb);
430                 rxrpc_free_skb(skb);
431                 goto next;
432
433         default:
434                 rxrpc_free_skb(skb);
435                 goto next;
436         }
437
438 partially_used_skb:
439         ASSERTCMP(*_offset, ==, size);
440         ret = 0;
441 requeue_and_leave:
442         skb_queue_head(&call->knlrecv_queue, skb);
443         return ret;
444 }
445
446 /**
447  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
448  * @sock: The socket that the call exists on
449  * @call: The call to send data through
450  * @buf: The buffer to receive into
451  * @size: The size of the buffer, including data already read
452  * @_offset: The running offset into the buffer.
453  * @want_more: True if more data is expected to be read
454  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
455  *
456  * Allow a kernel service to receive data and pick up information about the
457  * state of a call.  Returns 0 if got what was asked for and there's more
458  * available, 1 if we got what was asked for and we're at the end of the data
459  * and -EAGAIN if we need more data.
460  *
461  * Note that we may return -EAGAIN to drain empty packets at the end of the
462  * data, even if we've already copied over the requested data.
463  *
464  * This function adds the amount it transfers to *_offset, so this should be
465  * precleared as appropriate.  Note that the amount remaining in the buffer is
466  * taken to be size - *_offset.
467  *
468  * *_abort should also be initialised to 0.
469  */
470 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
471                            void *buf, size_t size, size_t *_offset,
472                            bool want_more, u32 *_abort)
473 {
474         struct iov_iter iter;
475         struct kvec iov;
476         int ret;
477
478         _enter("{%d,%s},%zu,%d",
479                call->debug_id, rxrpc_call_states[call->state], size, want_more);
480
481         ASSERTCMP(*_offset, <=, size);
482         ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
483
484         iov.iov_base = buf + *_offset;
485         iov.iov_len = size - *_offset;
486         iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
487
488         lock_sock(sock->sk);
489
490         switch (call->state) {
491         case RXRPC_CALL_CLIENT_RECV_REPLY:
492         case RXRPC_CALL_SERVER_RECV_REQUEST:
493         case RXRPC_CALL_SERVER_ACK_REQUEST:
494                 ret = temp_deliver_data(sock, call, &iter, size, _offset);
495                 if (ret < 0)
496                         goto out;
497
498                 /* We can only reach here with a partially full buffer if we
499                  * have reached the end of the data.  We must otherwise have a
500                  * full buffer or have been given -EAGAIN.
501                  */
502                 if (ret == 1) {
503                         if (*_offset < size)
504                                 goto short_data;
505                         if (!want_more)
506                                 goto read_phase_complete;
507                         ret = 0;
508                         goto out;
509                 }
510
511                 if (!want_more)
512                         goto excess_data;
513                 goto out;
514
515         case RXRPC_CALL_COMPLETE:
516                 goto call_complete;
517
518         default:
519                 *_offset = 0;
520                 ret = -EINPROGRESS;
521                 goto out;
522         }
523
524 read_phase_complete:
525         ret = 1;
526 out:
527         release_sock(sock->sk);
528         _leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
529         return ret;
530
531 short_data:
532         ret = -EBADMSG;
533         goto out;
534 excess_data:
535         ret = -EMSGSIZE;
536         goto out;
537 call_complete:
538         *_abort = call->abort_code;
539         ret = call->error;
540         if (call->completion == RXRPC_CALL_SUCCEEDED) {
541                 ret = 1;
542                 if (size > 0)
543                         ret = -ECONNRESET;
544         }
545         goto out;
546 }
547 EXPORT_SYMBOL(rxrpc_kernel_recv_data);