When the function tipc_link_xmit() is given a buffer list for
transmission, it currently consumes the list both when transmission
is successful and when it fails, except for the special case when
it encounters link congestion.
This behavior is inconsistent, and needs to be corrected if we want
to avoid problems in later commits in this series.
In this commit, we change this to let the function consume the list
only when transmission is successful, and leave the list with the
sender in all other cases. We also modifiy the socket code so that
it adapts to this change, i.e., purges the list when a non-congestion
error code is returned.
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
/* Prepare clone of message for local node */
skb = tipc_msg_reassemble(list);
/* Prepare clone of message for local node */
skb = tipc_msg_reassemble(list);
- if (unlikely(!skb)) {
- __skb_queue_purge(list);
/* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);
/* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);
* @link: congested link
* @list: message that was attempted sent
* Create pseudo msg to send back to user when congestion abates
* @link: congested link
* @list: message that was attempted sent
* Create pseudo msg to send back to user when congestion abates
- * Only consumes message if there is an error
+ * Does not consume buffer list
*/
static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
{
*/
static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
{
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
tipc_link_reset(link);
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
tipc_link_reset(link);
}
/* Non-blocking sender: */
if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
}
/* Non-blocking sender: */
if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
addr, addr, oport, 0, 0);
if (!skb)
skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
addr, addr, oport, 0, 0);
if (!skb)
TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
TIPC_SKB_CB(skb)->chain_imp = imp;
skb_queue_tail(&link->wakeupq, skb);
link->stats.link_congs++;
return -ELINKCONG;
TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
TIPC_SKB_CB(skb)->chain_imp = imp;
skb_queue_tail(&link->wakeupq, skb);
link->stats.link_congs++;
return -ELINKCONG;
-err:
- __skb_queue_purge(list);
- return -ENOBUFS;
* @link: link to use
* @list: chain of buffers containing message
*
* @link: link to use
* @list: chain of buffers containing message
*
- * Consumes the buffer chain, except when returning -ELINKCONG,
- * since the caller then may want to make more send attempts.
+ * Consumes the buffer chain, except when returning an error code,
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/
if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
return link_schedule_user(link, list);
}
if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
return link_schedule_user(link, list);
}
- if (unlikely(msg_size(msg) > mtu)) {
- __skb_queue_purge(list);
+ if (unlikely(msg_size(msg) > mtu))
/* Prepare each packet for sending, and add to relevant queue: */
while (skb_queue_len(list)) {
skb = skb_peek(list);
/* Prepare each packet for sending, and add to relevant queue: */
while (skb_queue_len(list)) {
skb = skb_peek(list);
/* tipc_link_xmit_skb(): send single buffer to destination
* Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
/* tipc_link_xmit_skb(): send single buffer to destination
* Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not be rejected
+ * messages, which will not cause link congestion
* The only exception is datagram messages rerouted after secondary
* lookup, which are rare and safe to dispose of anyway.
* TODO: Return real return value, and let callers use
* The only exception is datagram messages rerouted after secondary
* lookup, which are rare and safe to dispose of anyway.
* TODO: Return real return value, and let callers use
skb2list(skb, &head);
rc = tipc_link_xmit(net, &head, dnode, selector);
skb2list(skb, &head);
rc = tipc_link_xmit(net, &head, dnode, selector);
kfree_skb(skb);
return 0;
}
kfree_skb(skb);
return 0;
}
* @dsz: amount of user data to be sent
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
* @dsz: amount of user data to be sent
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning -ELINKCONG
+ * Consumes the buffer chain, except when returning error
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
do {
rc = tipc_bclink_xmit(net, pktchain);
do {
rc = tipc_bclink_xmit(net, pktchain);
- if (likely(rc >= 0)) {
- rc = dsz;
- break;
+ if (likely(!rc))
+ return dsz;
+
+ if (rc == -ELINKCONG) {
+ tsk->link_cong = 1;
+ rc = tipc_wait_for_sndmsg(sock, &timeo);
+ if (!rc)
+ continue;
+ __skb_queue_purge(pktchain);
if (rc == -EMSGSIZE) {
msg->msg_iter = save;
goto new_mtu;
}
if (rc == -EMSGSIZE) {
msg->msg_iter = save;
goto new_mtu;
}
- if (rc != -ELINKCONG)
- break;
- tipc_sk(sk)->link_cong = 1;
- rc = tipc_wait_for_sndmsg(sock, &timeo);
- if (rc)
- __skb_queue_purge(pktchain);
- } while (!rc);
skb = skb_peek(pktchain);
TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
skb = skb_peek(pktchain);
TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
if (sock->state != SS_READY)
sock->state = SS_CONNECTING;
if (sock->state != SS_READY)
sock->state = SS_CONNECTING;
+ if (rc == -ELINKCONG) {
+ tsk->link_cong = 1;
+ rc = tipc_wait_for_sndmsg(sock, &timeo);
+ if (!rc)
+ continue;
+ }
+ __skb_queue_purge(pktchain);
if (rc == -EMSGSIZE) {
m->msg_iter = save;
goto new_mtu;
}
if (rc == -EMSGSIZE) {
m->msg_iter = save;
goto new_mtu;
}
- if (rc != -ELINKCONG)
- break;
- tsk->link_cong = 1;
- rc = tipc_wait_for_sndmsg(sock, &timeo);
- if (rc)
- __skb_queue_purge(pktchain);
- } while (!rc);
tsk->sent_unacked++;
sent += send;
if (sent == dsz)
tsk->sent_unacked++;
sent += send;
if (sent == dsz)
goto next;
}
if (rc == -EMSGSIZE) {
goto next;
}
if (rc == -EMSGSIZE) {
+ __skb_queue_purge(pktchain);
tsk->max_pkt = tipc_node_get_mtu(net, dnode,
portid);
m->msg_iter = save;
tsk->max_pkt = tipc_node_get_mtu(net, dnode,
portid);
m->msg_iter = save;
}
if (rc != -ELINKCONG)
break;
}
if (rc != -ELINKCONG)
break;
tsk->link_cong = 1;
}
rc = tipc_wait_for_sndpkt(sock, &timeo);
tsk->link_cong = 1;
}
rc = tipc_wait_for_sndpkt(sock, &timeo);
- if (rc)
- __skb_queue_purge(pktchain);
+ __skb_queue_purge(pktchain);
return sent ? sent : rc;
}
return sent ? sent : rc;
}