diff --git a/net/rds/send.c b/net/rds/send.c
index b1962f8..896626b 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -62,14 +62,14 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status);
  * Reset the send state.  Callers must ensure that this doesn't race with
  * rds_send_xmit().
  */
-void rds_send_reset(struct rds_connection *conn)
+void rds_send_path_reset(struct rds_conn_path *cp)
 {
        struct rds_message *rm, *tmp;
        unsigned long flags;
 
-       if (conn->c_xmit_rm) {
-               rm = conn->c_xmit_rm;
-               conn->c_xmit_rm = NULL;
+       if (cp->cp_xmit_rm) {
+               rm = cp->cp_xmit_rm;
+               cp->cp_xmit_rm = NULL;
                /* Tell the user the RDMA op is no longer mapped by the
                 * transport. This isn't entirely true (it's flushed out
                 * independently) but as the connection is down, there's
@@ -78,37 +78,37 @@ void rds_send_reset(struct rds_connection *conn)
                rds_message_put(rm);
        }
 
-       conn->c_xmit_sg = 0;
-       conn->c_xmit_hdr_off = 0;
-       conn->c_xmit_data_off = 0;
-       conn->c_xmit_atomic_sent = 0;
-       conn->c_xmit_rdma_sent = 0;
-       conn->c_xmit_data_sent = 0;
+       cp->cp_xmit_sg = 0;
+       cp->cp_xmit_hdr_off = 0;
+       cp->cp_xmit_data_off = 0;
+       cp->cp_xmit_atomic_sent = 0;
+       cp->cp_xmit_rdma_sent = 0;
+       cp->cp_xmit_data_sent = 0;
 
-       conn->c_map_queued = 0;
+       cp->cp_conn->c_map_queued = 0;
 
-       conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
-       conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
+       cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
+       cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;
 
-       /* Mark messages as retransmissions, and move them to the send q */
+       /* Mark messages as retransmissions, and move them to the send queue */
-       spin_lock_irqsave(&conn->c_lock, flags);
-       list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
+       spin_lock_irqsave(&cp->cp_lock, flags);
+       list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
                set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
                set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
        }
-       list_splice_init(&conn->c_retrans, &conn->c_send_queue);
-       spin_unlock_irqrestore(&conn->c_lock, flags);
+       list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
+       spin_unlock_irqrestore(&cp->cp_lock, flags);
 }
-EXPORT_SYMBOL_GPL(rds_send_reset);
+EXPORT_SYMBOL_GPL(rds_send_path_reset);
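/*
 * A hypothetical non-MP wrapper (illustration only, not in this patch):
 * transports that are not t_mp_capable operate purely on path 0, in the
 * same style as the rds_send_drop_acked() wrapper further down.
 */
static inline void rds_send_reset(struct rds_connection *conn)
{
        WARN_ON(conn->c_trans->t_mp_capable);
        rds_send_path_reset(&conn->c_path[0]);
}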
 
-static int acquire_in_xmit(struct rds_connection *conn)
+static int acquire_in_xmit(struct rds_conn_path *cp)
 {
-       return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+       return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
 }
 
-static void release_in_xmit(struct rds_connection *conn)
+static void release_in_xmit(struct rds_conn_path *cp)
 {
-       clear_bit(RDS_IN_XMIT, &conn->c_flags);
+       clear_bit(RDS_IN_XMIT, &cp->cp_flags);
        smp_mb__after_atomic();
        /*
         * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
@@ -116,8 +116,8 @@ static void release_in_xmit(struct rds_connection *conn)
         * the system-wide hashed waitqueue buckets in the fast path only to
         * almost never find waiters.
         */
-       if (waitqueue_active(&conn->c_waitq))
-               wake_up_all(&conn->c_waitq);
+       if (waitqueue_active(&cp->cp_waitq))
+               wake_up_all(&cp->cp_waitq);
 }
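/*
 * Sketch of the waiter side that pairs with the open-coded wake above
 * (assumed shape, modeled on the connection-shutdown path; the helper
 * name is hypothetical): sleep on cp_waitq until RDS_IN_XMIT clears.
 */
static void rds_xmit_wait_idle(struct rds_conn_path *cp)
{
        /* hypothetical helper, for illustration only */
        wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags));
}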
 
 /*
@@ -134,8 +134,9 @@ static void release_in_xmit(struct rds_connection *conn)
  *      - small message latency is higher behind queued large messages
  *      - large message latency isn't starved by intervening small sends
  */
-int rds_send_xmit(struct rds_connection *conn)
+int rds_send_xmit(struct rds_conn_path *cp)
 {
+       struct rds_connection *conn = cp->cp_conn;
        struct rds_message *rm;
        unsigned long flags;
        unsigned int tmp;
@@ -155,7 +156,7 @@ restart:
         * avoids blocking the caller and trading per-connection data between
         * caches per message.
         */
-       if (!acquire_in_xmit(conn)) {
+       if (!acquire_in_xmit(cp)) {
                rds_stats_inc(s_send_lock_contention);
                ret = -ENOMEM;
                goto out;
@@ -169,21 +170,21 @@ restart:
         * The acquire_in_xmit() check above ensures that only one
-        * caller can increment c_send_gen at any time.
+        * caller can increment cp_send_gen at any time.
         */
-       conn->c_send_gen++;
-       send_gen = conn->c_send_gen;
+       cp->cp_send_gen++;
+       send_gen = cp->cp_send_gen;
 
        /*
         * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
         * we do the opposite to avoid races.
         */
-       if (!rds_conn_up(conn)) {
-               release_in_xmit(conn);
+       if (!rds_conn_path_up(cp)) {
+               release_in_xmit(cp);
                ret = 0;
                goto out;
        }
 
-       if (conn->c_trans->xmit_prepare)
-               conn->c_trans->xmit_prepare(conn);
+       if (conn->c_trans->xmit_path_prepare)
+               conn->c_trans->xmit_path_prepare(cp);
 
        /*
         * spin trying to push headers and data down the connection until
@@ -191,7 +192,7 @@ restart:
         */
        while (1) {
 
-               rm = conn->c_xmit_rm;
+               rm = cp->cp_xmit_rm;
 
                /*
                 * If between sending messages, we can send a pending congestion
@@ -204,14 +205,16 @@ restart:
                                break;
                        }
                        rm->data.op_active = 1;
+                       rm->m_inc.i_conn_path = cp;
+                       rm->m_inc.i_conn = cp->cp_conn;
 
-                       conn->c_xmit_rm = rm;
+                       cp->cp_xmit_rm = rm;
                }
 
                /*
                 * If not already working on one, grab the next message.
                 *
-                * c_xmit_rm holds a ref while we're sending this message down
+                * cp_xmit_rm holds a ref while we're sending this message down
-                * the connction.  We can use this ref while holding the
-                * send_sem.. rds_send_reset() is serialized with it.
+                * the connection.  We can use this ref while holding the
+                * send_sem.  rds_send_path_reset() is serialized with it.
                 */
@@ -228,10 +231,10 @@ restart:
                        if (batch_count >= send_batch_count)
                                goto over_batch;
 
-                       spin_lock_irqsave(&conn->c_lock, flags);
+                       spin_lock_irqsave(&cp->cp_lock, flags);
 
-                       if (!list_empty(&conn->c_send_queue)) {
-                               rm = list_entry(conn->c_send_queue.next,
+                       if (!list_empty(&cp->cp_send_queue)) {
+                               rm = list_entry(cp->cp_send_queue.next,
                                                struct rds_message,
                                                m_conn_item);
                                rds_message_addref(rm);
@@ -240,10 +243,11 @@ restart:
                                 * Move the message from the send queue to the retransmit
                                 * list right away.
                                 */
-                               list_move_tail(&rm->m_conn_item, &conn->c_retrans);
+                               list_move_tail(&rm->m_conn_item,
+                                              &cp->cp_retrans);
                        }
 
-                       spin_unlock_irqrestore(&conn->c_lock, flags);
+                       spin_unlock_irqrestore(&cp->cp_lock, flags);
 
                        if (!rm)
                                break;
@@ -257,32 +261,34 @@ restart:
                         */
                        if (rm->rdma.op_active &&
                            test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-                               spin_lock_irqsave(&conn->c_lock, flags);
+                               spin_lock_irqsave(&cp->cp_lock, flags);
                                if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
                                        list_move(&rm->m_conn_item, &to_be_dropped);
-                               spin_unlock_irqrestore(&conn->c_lock, flags);
+                               spin_unlock_irqrestore(&cp->cp_lock, flags);
                                continue;
                        }
 
                        /* Require an ACK every once in a while */
                        len = ntohl(rm->m_inc.i_hdr.h_len);
-                       if (conn->c_unacked_packets == 0 ||
-                           conn->c_unacked_bytes < len) {
+                       if (cp->cp_unacked_packets == 0 ||
+                           cp->cp_unacked_bytes < len) {
                                __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 
-                               conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
-                               conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
+                               cp->cp_unacked_packets =
+                                       rds_sysctl_max_unacked_packets;
+                               cp->cp_unacked_bytes =
+                                       rds_sysctl_max_unacked_bytes;
                                rds_stats_inc(s_send_ack_required);
                        } else {
-                               conn->c_unacked_bytes -= len;
-                               conn->c_unacked_packets--;
+                               cp->cp_unacked_bytes -= len;
+                               cp->cp_unacked_packets--;
                        }
 
-                       conn->c_xmit_rm = rm;
+                       cp->cp_xmit_rm = rm;
                }
 
                /* The transport either sends the whole rdma or none of it */
-               if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+               if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
                        rm->m_final_op = &rm->rdma;
                        /* The transport owns the mapped memory for now.
                         * You can't unmap it while it's on the send queue
@@ -294,11 +300,11 @@ restart:
                                wake_up_interruptible(&rm->m_flush_wait);
                                break;
                        }
-                       conn->c_xmit_rdma_sent = 1;
+                       cp->cp_xmit_rdma_sent = 1;
 
                }
 
-               if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+               if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
                        rm->m_final_op = &rm->atomic;
                        /* The transport owns the mapped memory for now.
                         * You can't unmap it while it's on the send queue
@@ -310,7 +316,7 @@ restart:
                                wake_up_interruptible(&rm->m_flush_wait);
                                break;
                        }
-                       conn->c_xmit_atomic_sent = 1;
+                       cp->cp_xmit_atomic_sent = 1;
 
                }
 
@@ -336,41 +342,42 @@ restart:
                                rm->data.op_active = 0;
                }
 
-               if (rm->data.op_active && !conn->c_xmit_data_sent) {
+               if (rm->data.op_active && !cp->cp_xmit_data_sent) {
                        rm->m_final_op = &rm->data;
+
                        ret = conn->c_trans->xmit(conn, rm,
-                                                 conn->c_xmit_hdr_off,
-                                                 conn->c_xmit_sg,
-                                                 conn->c_xmit_data_off);
+                                                 cp->cp_xmit_hdr_off,
+                                                 cp->cp_xmit_sg,
+                                                 cp->cp_xmit_data_off);
                        if (ret <= 0)
                                break;
 
-                       if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
+                       if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
                                tmp = min_t(int, ret,
                                            sizeof(struct rds_header) -
-                                           conn->c_xmit_hdr_off);
-                               conn->c_xmit_hdr_off += tmp;
+                                           cp->cp_xmit_hdr_off);
+                               cp->cp_xmit_hdr_off += tmp;
                                ret -= tmp;
                        }
 
-                       sg = &rm->data.op_sg[conn->c_xmit_sg];
+                       sg = &rm->data.op_sg[cp->cp_xmit_sg];
                        while (ret) {
                                tmp = min_t(int, ret, sg->length -
-                                                     conn->c_xmit_data_off);
-                               conn->c_xmit_data_off += tmp;
+                                                     cp->cp_xmit_data_off);
+                               cp->cp_xmit_data_off += tmp;
                                ret -= tmp;
-                               if (conn->c_xmit_data_off == sg->length) {
-                                       conn->c_xmit_data_off = 0;
+                               if (cp->cp_xmit_data_off == sg->length) {
+                                       cp->cp_xmit_data_off = 0;
                                        sg++;
-                                       conn->c_xmit_sg++;
-                                       BUG_ON(ret != 0 &&
-                                              conn->c_xmit_sg == rm->data.op_nents);
+                                       cp->cp_xmit_sg++;
+                                       BUG_ON(ret != 0 && cp->cp_xmit_sg ==
+                                              rm->data.op_nents);
                                }
                        }
 
-                       if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
-                           (conn->c_xmit_sg == rm->data.op_nents))
-                               conn->c_xmit_data_sent = 1;
+                       if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
+                           (cp->cp_xmit_sg == rm->data.op_nents))
+                               cp->cp_xmit_data_sent = 1;
                }
 
                /*
@@ -378,23 +385,23 @@ restart:
                 * if there is a data op. Thus, if the data is sent (or there was
                 * none), then we're done with the rm.
                 */
-               if (!rm->data.op_active || conn->c_xmit_data_sent) {
-                       conn->c_xmit_rm = NULL;
-                       conn->c_xmit_sg = 0;
-                       conn->c_xmit_hdr_off = 0;
-                       conn->c_xmit_data_off = 0;
-                       conn->c_xmit_rdma_sent = 0;
-                       conn->c_xmit_atomic_sent = 0;
-                       conn->c_xmit_data_sent = 0;
+               if (!rm->data.op_active || cp->cp_xmit_data_sent) {
+                       cp->cp_xmit_rm = NULL;
+                       cp->cp_xmit_sg = 0;
+                       cp->cp_xmit_hdr_off = 0;
+                       cp->cp_xmit_data_off = 0;
+                       cp->cp_xmit_rdma_sent = 0;
+                       cp->cp_xmit_atomic_sent = 0;
+                       cp->cp_xmit_data_sent = 0;
 
                        rds_message_put(rm);
                }
        }
 
 over_batch:
-       if (conn->c_trans->xmit_complete)
-               conn->c_trans->xmit_complete(conn);
-       release_in_xmit(conn);
+       if (conn->c_trans->xmit_path_complete)
+               conn->c_trans->xmit_path_complete(cp);
+       release_in_xmit(cp);
 
        /* Nuke any messages we decided not to retransmit. */
        if (!list_empty(&to_be_dropped)) {
@@ -422,12 +429,12 @@ over_batch:
        if (ret == 0) {
                smp_mb();
                if ((test_bit(0, &conn->c_map_queued) ||
-                    !list_empty(&conn->c_send_queue)) &&
-                   send_gen == conn->c_send_gen) {
+                    !list_empty(&cp->cp_send_queue)) &&
+                   send_gen == cp->cp_send_gen) {
                        rds_stats_inc(s_send_lock_queue_raced);
                        if (batch_count < send_batch_count)
                                goto restart;
-                       queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+                       queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
                }
        }
 out:
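/*
 * Illustrative interleaving for the smp_mb()/cp_send_gen recheck above:
 *
 *   CPU0 (rds_send_xmit)                CPU1 (new sender)
 *   dequeues the last rm, sends it
 *                                       queues rm on cp_send_queue
 *                                       acquire_in_xmit() fails
 *   release_in_xmit()
 *   sees cp_send_queue non-empty and
 *   cp_send_gen unchanged -> restart
 *
 * Without the recheck, CPU1's message would sit on the queue until an
 * unrelated event kicked the send worker.
 */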
@@ -559,42 +566,6 @@ __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
        /* No need to wake the app - caller does this */
 }
 
-/*
- * This is called from the IB send completion when we detect
- * a RDMA operation that failed with remote access error.
- * So speed is not an issue here.
- */
-struct rds_message *rds_send_get_message(struct rds_connection *conn,
-                                        struct rm_rdma_op *op)
-{
-       struct rds_message *rm, *tmp, *found = NULL;
-       unsigned long flags;
-
-       spin_lock_irqsave(&conn->c_lock, flags);
-
-       list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
-               if (&rm->rdma == op) {
-                       atomic_inc(&rm->m_refcount);
-                       found = rm;
-                       goto out;
-               }
-       }
-
-       list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
-               if (&rm->rdma == op) {
-                       atomic_inc(&rm->m_refcount);
-                       found = rm;
-                       break;
-               }
-       }
-
-out:
-       spin_unlock_irqrestore(&conn->c_lock, flags);
-
-       return found;
-}
-EXPORT_SYMBOL_GPL(rds_send_get_message);
-
 /*
  * This removes messages from the socket's list if they're on it.  The list
  * argument must be private to the caller, we must be able to modify it
@@ -685,16 +656,16 @@ unlock_and_drop:
  * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
  * checks the RDS_MSG_HAS_ACK_SEQ bit.
  */
-void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
-                        is_acked_func is_acked)
+void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
+                             is_acked_func is_acked)
 {
        struct rds_message *rm, *tmp;
        unsigned long flags;
        LIST_HEAD(list);
 
-       spin_lock_irqsave(&conn->c_lock, flags);
+       spin_lock_irqsave(&cp->cp_lock, flags);
 
-       list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
+       list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
                if (!rds_send_is_acked(rm, ack, is_acked))
                        break;
 
@@ -706,17 +677,26 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
        if (!list_empty(&list))
                smp_mb__after_atomic();
 
-       spin_unlock_irqrestore(&conn->c_lock, flags);
+       spin_unlock_irqrestore(&cp->cp_lock, flags);
 
        /* now remove the messages from the sock list as needed */
        rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
 }
+EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
+
+void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
+                        is_acked_func is_acked)
+{
+       WARN_ON(conn->c_trans->t_mp_capable);
+       rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
+}
 EXPORT_SYMBOL_GPL(rds_send_drop_acked);
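/*
 * The wrapper above keeps the legacy per-connection entry point for
 * transports that are not t_mp_capable; those only ever use c_path[0].
 * An mp-capable transport must call rds_send_path_drop_acked() with the
 * specific path the ack arrived on, hence the WARN_ON.
 */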
 
 void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
 {
        struct rds_message *rm, *tmp;
        struct rds_connection *conn;
+       struct rds_conn_path *cp;
        unsigned long flags;
        LIST_HEAD(list);
 
@@ -745,22 +725,26 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
        list_for_each_entry(rm, &list, m_sock_item) {
 
                conn = rm->m_inc.i_conn;
+               if (conn->c_trans->t_mp_capable)
+                       cp = rm->m_inc.i_conn_path;
+               else
+                       cp = &conn->c_path[0];
 
-               spin_lock_irqsave(&conn->c_lock, flags);
+               spin_lock_irqsave(&cp->cp_lock, flags);
                /*
                 * Maybe someone else beat us to removing rm from the conn.
                 * If we race with their flag update we'll get the lock and
                 * then really see that the flag has been cleared.
                 */
                if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
-                       spin_unlock_irqrestore(&conn->c_lock, flags);
+                       spin_unlock_irqrestore(&cp->cp_lock, flags);
                        spin_lock_irqsave(&rm->m_rs_lock, flags);
                        rm->m_rs = NULL;
                        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
                        continue;
                }
                list_del_init(&rm->m_conn_item);
-               spin_unlock_irqrestore(&conn->c_lock, flags);
+               spin_unlock_irqrestore(&cp->cp_lock, flags);
 
                /*
                 * Couldn't grab m_rs_lock in top loop (lock ordering),
@@ -809,6 +793,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
  * message from the flow with RDS_CANCEL_SENT_TO.
  */
 static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
+                            struct rds_conn_path *cp,
                             struct rds_message *rm, __be16 sport,
                             __be16 dport, int *queued)
 {
@@ -852,13 +837,14 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
-                  trying to minimize the time we hold c_lock */
+                  trying to minimize the time we hold cp_lock */
                rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
                rm->m_inc.i_conn = conn;
+               rm->m_inc.i_conn_path = cp;
                rds_message_addref(rm);
 
-               spin_lock(&conn->c_lock);
-               rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++);
-               list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
+               spin_lock(&cp->cp_lock);
+               rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
+               list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
                set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
-               spin_unlock(&conn->c_lock);
+               spin_unlock(&cp->cp_lock);
 
                rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
                         rm, len, rs, rs->rs_snd_bytes,
@@ -977,6 +963,29 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
        return ret;
 }
 
+static void rds_send_ping(struct rds_connection *conn);
+
+static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
+{
+       int hash;
+
+       if (conn->c_npaths == 0)
+               hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
+       else
+               hash = RDS_MPATH_HASH(rs, conn->c_npaths);
+       if (conn->c_npaths == 0 && hash != 0) {
+               rds_send_ping(conn);
+
+               if (conn->c_npaths == 0) {
+                       wait_event_interruptible(conn->c_hs_waitq,
+                                                (conn->c_npaths != 0));
+               }
+               if (conn->c_npaths == 1)
+                       hash = 0;
+       }
+       return hash;
+}
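/*
 * rds_send_mprds_hash() pins all traffic from one socket to a single
 * path, preserving per-socket ordering across the multipath fan-out.
 * For reference, a sketch of the hash it relies on (assumed to match
 * RDS_MPATH_HASH in rds.h: jhash of the socket's bound port, modulo
 * the number of usable paths):
 *
 *   #define RDS_MPATH_HASH(rs, n) (jhash_1word((rs)->rs_bound_port, 0) % (n))
 */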
+
 int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 {
        struct sock *sk = sock->sk;
@@ -990,6 +999,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
        int queued = 0, allocated_mr = 0;
        int nonblock = msg->msg_flags & MSG_DONTWAIT;
        long timeo = sock_sndtimeo(sk, nonblock);
+       struct rds_conn_path *cpath;
 
-       /* Mirror Linux UDP mirror of BSD error message compatibility */
+       /* Mirror Linux UDP, which mirrors BSD error message compatibility */
        /* XXX: Perhaps MSG_MORE someday */
@@ -1088,15 +1098,19 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
                goto out;
        }
 
-       rds_conn_connect_if_down(conn);
+       if (conn->c_trans->t_mp_capable)
+               cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)];
+       else
+               cpath = &conn->c_path[0];
+
+       rds_conn_path_connect_if_down(cpath);
 
        ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
        if (ret) {
                rs->rs_seen_congestion = 1;
                goto out;
        }
-
-       while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
+       while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
                                  dport, &queued)) {
                rds_stats_inc(s_send_queue_full);
 
@@ -1106,7 +1120,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
                }
 
                timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
-                                       rds_send_queue_rm(rs, conn, rm,
+                                       rds_send_queue_rm(rs, conn, cpath, rm,
                                                          rs->rs_bound_port,
                                                          dport,
                                                          &queued),
@@ -1127,9 +1141,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
         */
        rds_stats_inc(s_send_queued);
 
-       ret = rds_send_xmit(conn);
+       ret = rds_send_xmit(cpath);
        if (ret == -ENOMEM || ret == -EAGAIN)
-               queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+               queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
 
        rds_message_put(rm);
        return payload_len;
@@ -1147,10 +1161,16 @@ out:
 }
 
 /*
- * Reply to a ping packet.
+ * Send out a probe. Shared by rds_send_ping, rds_send_pong and
+ * rds_send_hb. rds_send_hb should set h_flags to
+ *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
+ * or
+ *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
  */
 int
-rds_send_pong(struct rds_connection *conn, __be16 dport)
+rds_send_probe(struct rds_conn_path *cp, __be16 sport,
+              __be16 dport, u8 h_flags)
 {
        struct rds_message *rm;
        unsigned long flags;
@@ -1162,31 +1182,41 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
                goto out;
        }
 
-       rm->m_daddr = conn->c_faddr;
+       rm->m_daddr = cp->cp_conn->c_faddr;
        rm->data.op_active = 1;
 
-       rds_conn_connect_if_down(conn);
+       rds_conn_path_connect_if_down(cp);
 
-       ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL);
+       ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
        if (ret)
                goto out;
 
-       spin_lock_irqsave(&conn->c_lock, flags);
-       list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
+       spin_lock_irqsave(&cp->cp_lock, flags);
+       list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
        set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
        rds_message_addref(rm);
-       rm->m_inc.i_conn = conn;
+       rm->m_inc.i_conn = cp->cp_conn;
+       rm->m_inc.i_conn_path = cp;
+
+       rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
+                                   cp->cp_next_tx_seq);
+       rm->m_inc.i_hdr.h_flags |= h_flags;
+       cp->cp_next_tx_seq++;
+
+       if (RDS_HS_PROBE(sport, dport) && cp->cp_conn->c_trans->t_mp_capable) {
+               u16 npaths = RDS_MPATH_WORKERS;
 
-       rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
-                                   conn->c_next_tx_seq);
-       conn->c_next_tx_seq++;
-       spin_unlock_irqrestore(&conn->c_lock, flags);
+               rds_message_add_extension(&rm->m_inc.i_hdr,
+                                         RDS_EXTHDR_NPATHS, &npaths,
+                                         sizeof(npaths));
+       }
+       spin_unlock_irqrestore(&cp->cp_lock, flags);
 
        rds_stats_inc(s_send_queued);
        rds_stats_inc(s_send_pong);
 
        /* schedule the send work on rds_wq */
-       queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+       queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
 
        rds_message_put(rm);
        return 0;
@@ -1196,3 +1226,25 @@ out:
                rds_message_put(rm);
        return ret;
 }
+
+int
+rds_send_pong(struct rds_conn_path *cp, __be16 dport)
+{
+       return rds_send_probe(cp, 0, dport, 0);
+}
+
+static void
+rds_send_ping(struct rds_connection *conn)
+{
+       unsigned long flags;
+       struct rds_conn_path *cp = &conn->c_path[0];
+
+       spin_lock_irqsave(&cp->cp_lock, flags);
+       if (conn->c_ping_triggered) {
+               spin_unlock_irqrestore(&cp->cp_lock, flags);
+               return;
+       }
+       conn->c_ping_triggered = 1;
+       spin_unlock_irqrestore(&cp->cp_lock, flags);
+       rds_send_probe(cp, RDS_FLAG_PROBE_PORT, 0, 0);
+}
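/*
 * Usage sketch for the probe helpers (caller assumed to live in the
 * receive path, not in this file): a packet with dport 0 is a ping,
 * and the pong echoes the ping's source port back as its dport.
 *
 *   if (inc->i_hdr.h_dport == 0)
 *           rds_send_pong(cp, inc->i_hdr.h_sport);
 */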