[cascardo/linux.git] / drivers / staging / lustre / lnet / klnds / socklnd / socklnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/socklnd/socklnd.c
37  *
38  * Author: Zach Brown <zab@zabbo.net>
39  * Author: Peter J. Braam <braam@clusterfs.com>
40  * Author: Phil Schwan <phil@clusterfs.com>
41  * Author: Eric Barton <eric@bartonsoftware.com>
42  */
43
44 #include "socklnd.h"
45
46 static lnd_t the_ksocklnd;
47 ksock_nal_data_t ksocknal_data;
48
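/*
 * Find the local interface entry for @ip in this NI's interface table,
 * or NULL if no configured interface matches.
 */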
49 static ksock_interface_t *
50 ksocknal_ip2iface(lnet_ni_t *ni, __u32 ip)
51 {
52         ksock_net_t *net = ni->ni_data;
53         int i;
54         ksock_interface_t *iface;
55
56         for (i = 0; i < net->ksnn_ninterfaces; i++) {
57                 LASSERT(i < LNET_MAX_INTERFACES);
58                 iface = &net->ksnn_interfaces[i];
59
60                 if (iface->ksni_ipaddr == ip)
61                         return iface;
62         }
63
64         return NULL;
65 }
66
67 static ksock_route_t *
68 ksocknal_create_route(__u32 ipaddr, int port)
69 {
70         ksock_route_t *route;
71
72         LIBCFS_ALLOC(route, sizeof(*route));
73         if (!route)
74                 return NULL;
75
76         atomic_set(&route->ksnr_refcount, 1);
77         route->ksnr_peer = NULL;
78         route->ksnr_retry_interval = 0;  /* OK to connect at any time */
79         route->ksnr_ipaddr = ipaddr;
80         route->ksnr_port = port;
81         route->ksnr_scheduled = 0;
82         route->ksnr_connecting = 0;
83         route->ksnr_connected = 0;
84         route->ksnr_deleted = 0;
85         route->ksnr_conn_count = 0;
86         route->ksnr_share_count = 0;
87
88         return route;
89 }
90
91 void
92 ksocknal_destroy_route(ksock_route_t *route)
93 {
94         LASSERT(!atomic_read(&route->ksnr_refcount));
95
96         if (route->ksnr_peer)
97                 ksocknal_peer_decref(route->ksnr_peer);
98
99         LIBCFS_FREE(route, sizeof(*route));
100 }
101
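/*
 * Allocate and initialise a peer on the CPT derived from its NID.  The
 * new peer carries one reference for the caller.  Creation fails with
 * -ESHUTDOWN once the network has begun shutting down; otherwise the
 * per-net peer count (ksnn_npeers) is bumped under ksnn_lock.
 */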
102 static int
103 ksocknal_create_peer(ksock_peer_t **peerp, lnet_ni_t *ni, lnet_process_id_t id)
104 {
105         int cpt = lnet_cpt_of_nid(id.nid);
106         ksock_net_t *net = ni->ni_data;
107         ksock_peer_t *peer;
108
109         LASSERT(id.nid != LNET_NID_ANY);
110         LASSERT(id.pid != LNET_PID_ANY);
111         LASSERT(!in_interrupt());
112
113         LIBCFS_CPT_ALLOC(peer, lnet_cpt_table(), cpt, sizeof(*peer));
114         if (!peer)
115                 return -ENOMEM;
116
117         peer->ksnp_ni = ni;
118         peer->ksnp_id = id;
119         atomic_set(&peer->ksnp_refcount, 1);   /* 1 ref for caller */
120         peer->ksnp_closing = 0;
121         peer->ksnp_accepting = 0;
122         peer->ksnp_proto = NULL;
123         peer->ksnp_last_alive = 0;
124         peer->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
125
126         INIT_LIST_HEAD(&peer->ksnp_conns);
127         INIT_LIST_HEAD(&peer->ksnp_routes);
128         INIT_LIST_HEAD(&peer->ksnp_tx_queue);
129         INIT_LIST_HEAD(&peer->ksnp_zc_req_list);
130         spin_lock_init(&peer->ksnp_lock);
131
132         spin_lock_bh(&net->ksnn_lock);
133
134         if (net->ksnn_shutdown) {
135                 spin_unlock_bh(&net->ksnn_lock);
136
137                 LIBCFS_FREE(peer, sizeof(*peer));
138                 CERROR("Can't create peer: network shutdown\n");
139                 return -ESHUTDOWN;
140         }
141
142         net->ksnn_npeers++;
143
144         spin_unlock_bh(&net->ksnn_lock);
145
146         *peerp = peer;
147         return 0;
148 }
149
150 void
151 ksocknal_destroy_peer(ksock_peer_t *peer)
152 {
153         ksock_net_t *net = peer->ksnp_ni->ni_data;
154
155         CDEBUG(D_NET, "peer %s %p deleted\n",
156                libcfs_id2str(peer->ksnp_id), peer);
157
158         LASSERT(!atomic_read(&peer->ksnp_refcount));
159         LASSERT(!peer->ksnp_accepting);
160         LASSERT(list_empty(&peer->ksnp_conns));
161         LASSERT(list_empty(&peer->ksnp_routes));
162         LASSERT(list_empty(&peer->ksnp_tx_queue));
163         LASSERT(list_empty(&peer->ksnp_zc_req_list));
164
165         LIBCFS_FREE(peer, sizeof(*peer));
166
167         /*
168          * NB a peer's connections and routes keep a reference on their peer
169          * until they are destroyed, so we can be assured that _all_ state to
170          * do with this peer has been cleaned up when its refcount drops to
171          * zero.
172          */
173         spin_lock_bh(&net->ksnn_lock);
174         net->ksnn_npeers--;
175         spin_unlock_bh(&net->ksnn_lock);
176 }
177
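/*
 * Look up a peer by process id on @ni.  Caller must hold
 * ksnd_global_lock; no reference is taken (ksocknal_find_peer() below
 * adds one for its caller).
 */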
178 ksock_peer_t *
179 ksocknal_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
180 {
181         struct list_head *peer_list = ksocknal_nid2peerlist(id.nid);
182         struct list_head *tmp;
183         ksock_peer_t *peer;
184
185         list_for_each(tmp, peer_list) {
186                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
187
188                 LASSERT(!peer->ksnp_closing);
189
190                 if (peer->ksnp_ni != ni)
191                         continue;
192
193                 if (peer->ksnp_id.nid != id.nid ||
194                     peer->ksnp_id.pid != id.pid)
195                         continue;
196
197                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
198                        peer, libcfs_id2str(id),
199                        atomic_read(&peer->ksnp_refcount));
200                 return peer;
201         }
202         return NULL;
203 }
204
205 ksock_peer_t *
206 ksocknal_find_peer(lnet_ni_t *ni, lnet_process_id_t id)
207 {
208         ksock_peer_t *peer;
209
210         read_lock(&ksocknal_data.ksnd_global_lock);
211         peer = ksocknal_find_peer_locked(ni, id);
212         if (peer)                       /* +1 ref for caller? */
213                 ksocknal_peer_addref(peer);
214         read_unlock(&ksocknal_data.ksnd_global_lock);
215
216         return peer;
217 }
218
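/*
 * Remove a peer from the global peer hash: release the peer's claims on
 * the interfaces backing its passive IPs, mark it closing and drop the
 * peer list's reference.
 */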
219 static void
220 ksocknal_unlink_peer_locked(ksock_peer_t *peer)
221 {
222         int i;
223         __u32 ip;
224         ksock_interface_t *iface;
225
226         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
227                 LASSERT(i < LNET_MAX_INTERFACES);
228                 ip = peer->ksnp_passive_ips[i];
229
230                 iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
231                 /*
232                  * All IPs in peer->ksnp_passive_ips[] come from the
233                  * interface list, therefore the call must succeed.
234                  */
235                 LASSERT(iface);
236
237                 CDEBUG(D_NET, "peer=%p iface=%p ksni_nroutes=%d\n",
238                        peer, iface, iface->ksni_nroutes);
239                 iface->ksni_npeers--;
240         }
241
242         LASSERT(list_empty(&peer->ksnp_conns));
243         LASSERT(list_empty(&peer->ksnp_routes));
244         LASSERT(!peer->ksnp_closing);
245         peer->ksnp_closing = 1;
246         list_del(&peer->ksnp_list);
247         /* lose peerlist's ref */
248         ksocknal_peer_decref(peer);
249 }
250
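/*
 * Enumeration helper: return the @index'th item of peer information
 * (bare peer, passive IP or route).  @index counts down as entries are
 * skipped, so successive calls with increasing indices walk everything
 * exactly once.
 */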
251 static int
252 ksocknal_get_peer_info(lnet_ni_t *ni, int index,
253                        lnet_process_id_t *id, __u32 *myip, __u32 *peer_ip,
254                        int *port, int *conn_count, int *share_count)
255 {
256         ksock_peer_t *peer;
257         struct list_head *ptmp;
258         ksock_route_t *route;
259         struct list_head *rtmp;
260         int i;
261         int j;
262         int rc = -ENOENT;
263
264         read_lock(&ksocknal_data.ksnd_global_lock);
265
266         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
267                 list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
268                         peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
269
270                         if (peer->ksnp_ni != ni)
271                                 continue;
272
273                         if (!peer->ksnp_n_passive_ips &&
274                             list_empty(&peer->ksnp_routes)) {
275                                 if (index-- > 0)
276                                         continue;
277
278                                 *id = peer->ksnp_id;
279                                 *myip = 0;
280                                 *peer_ip = 0;
281                                 *port = 0;
282                                 *conn_count = 0;
283                                 *share_count = 0;
284                                 rc = 0;
285                                 goto out;
286                         }
287
288                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
289                                 if (index-- > 0)
290                                         continue;
291
292                                 *id = peer->ksnp_id;
293                                 *myip = peer->ksnp_passive_ips[j];
294                                 *peer_ip = 0;
295                                 *port = 0;
296                                 *conn_count = 0;
297                                 *share_count = 0;
298                                 rc = 0;
299                                 goto out;
300                         }
301
302                         list_for_each(rtmp, &peer->ksnp_routes) {
303                                 if (index-- > 0)
304                                         continue;
305
306                                 route = list_entry(rtmp, ksock_route_t,
307                                                    ksnr_list);
308
309                                 *id = peer->ksnp_id;
310                                 *myip = route->ksnr_myipaddr;
311                                 *peer_ip = route->ksnr_ipaddr;
312                                 *port = route->ksnr_port;
313                                 *conn_count = route->ksnr_conn_count;
314                                 *share_count = route->ksnr_share_count;
315                                 rc = 0;
316                                 goto out;
317                         }
318                 }
319         }
320  out:
321         read_unlock(&ksocknal_data.ksnd_global_lock);
322         return rc;
323 }
324
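/*
 * Tie a connection to the route it corresponds to.  If the connection
 * is bound to a different local IP than the route was using, rebind the
 * route and fix up the per-interface route counts; a successful
 * connection also resets the route's retry interval.
 */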
325 static void
326 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
327 {
328         ksock_peer_t *peer = route->ksnr_peer;
329         int type = conn->ksnc_type;
330         ksock_interface_t *iface;
331
332         conn->ksnc_route = route;
333         ksocknal_route_addref(route);
334
335         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
336                 if (!route->ksnr_myipaddr) {
337                         /* route wasn't bound locally yet (the initial route) */
338                         CDEBUG(D_NET, "Binding %s %pI4h to %pI4h\n",
339                                libcfs_id2str(peer->ksnp_id),
340                                &route->ksnr_ipaddr,
341                                &conn->ksnc_myipaddr);
342                 } else {
343                         CDEBUG(D_NET, "Rebinding %s %pI4h from %pI4h to %pI4h\n",
344                                libcfs_id2str(peer->ksnp_id),
345                                &route->ksnr_ipaddr,
346                                &route->ksnr_myipaddr,
347                                &conn->ksnc_myipaddr);
348
349                         iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
350                                                   route->ksnr_myipaddr);
351                         if (iface)
352                                 iface->ksni_nroutes--;
353                 }
354                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
355                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
356                                           route->ksnr_myipaddr);
357                 if (iface)
358                         iface->ksni_nroutes++;
359         }
360
361         route->ksnr_connected |= (1 << type);
362         route->ksnr_conn_count++;
363
364         /*
365          * Successful connection => further attempts can
366          * proceed immediately
367          */
368         route->ksnr_retry_interval = 0;
369 }
370
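/*
 * Attach a newly created route to a peer (the peer's route list takes
 * over the caller's reference on the route) and associate it with any
 * existing connections to the same address.
 */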
371 static void
372 ksocknal_add_route_locked(ksock_peer_t *peer, ksock_route_t *route)
373 {
374         struct list_head *tmp;
375         ksock_conn_t *conn;
376         ksock_route_t *route2;
377
378         LASSERT(!peer->ksnp_closing);
379         LASSERT(!route->ksnr_peer);
380         LASSERT(!route->ksnr_scheduled);
381         LASSERT(!route->ksnr_connecting);
382         LASSERT(!route->ksnr_connected);
383
384         /* LASSERT(unique) */
385         list_for_each(tmp, &peer->ksnp_routes) {
386                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
387
388                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
389                         CERROR("Duplicate route %s %pI4h\n",
390                                libcfs_id2str(peer->ksnp_id),
391                                &route->ksnr_ipaddr);
392                         LBUG();
393                 }
394         }
395
396         route->ksnr_peer = peer;
397         ksocknal_peer_addref(peer);
398         /* peer's routelist takes over my ref on 'route' */
399         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
400
401         list_for_each(tmp, &peer->ksnp_conns) {
402                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
403
404                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
405                         continue;
406
407                 ksocknal_associate_route_conn_locked(route, conn);
408                 /* keep going (typed routes) */
409         }
410 }
411
412 static void
413 ksocknal_del_route_locked(ksock_route_t *route)
414 {
415         ksock_peer_t *peer = route->ksnr_peer;
416         ksock_interface_t *iface;
417         ksock_conn_t *conn;
418         struct list_head *ctmp;
419         struct list_head *cnxt;
420
421         LASSERT(!route->ksnr_deleted);
422
423         /* Close associated conns */
424         list_for_each_safe(ctmp, cnxt, &peer->ksnp_conns) {
425                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
426
427                 if (conn->ksnc_route != route)
428                         continue;
429
430                 ksocknal_close_conn_locked(conn, 0);
431         }
432
433         if (route->ksnr_myipaddr) {
434                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
435                                           route->ksnr_myipaddr);
436                 if (iface)
437                         iface->ksni_nroutes--;
438         }
439
440         route->ksnr_deleted = 1;
441         list_del(&route->ksnr_list);
442         ksocknal_route_decref(route);        /* drop peer's ref */
443
444         if (list_empty(&peer->ksnp_routes) &&
445             list_empty(&peer->ksnp_conns)) {
446                 /*
447                  * I've just removed the last route to a peer with no active
448                  * connections
449                  */
450                 ksocknal_unlink_peer_locked(peer);
451         }
452 }
453
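/*
 * Add (or find) a peer and give it a route to @ipaddr:@port.  A fresh
 * peer and route are allocated before taking the global lock; the
 * spares are dropped again if matching entries already exist.
 * ksnr_share_count records how many times a route was added explicitly.
 */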
454 int
455 ksocknal_add_peer(lnet_ni_t *ni, lnet_process_id_t id, __u32 ipaddr, int port)
456 {
457         struct list_head *tmp;
458         ksock_peer_t *peer;
459         ksock_peer_t *peer2;
460         ksock_route_t *route;
461         ksock_route_t *route2;
462         int rc;
463
464         if (id.nid == LNET_NID_ANY ||
465             id.pid == LNET_PID_ANY)
466                 return -EINVAL;
467
468         /* Have a brand new peer ready... */
469         rc = ksocknal_create_peer(&peer, ni, id);
470         if (rc)
471                 return rc;
472
473         route = ksocknal_create_route(ipaddr, port);
474         if (!route) {
475                 ksocknal_peer_decref(peer);
476                 return -ENOMEM;
477         }
478
479         write_lock_bh(&ksocknal_data.ksnd_global_lock);
480
481         /* always called with a ref on ni, so shutdown can't have started */
482         LASSERT(!((ksock_net_t *) ni->ni_data)->ksnn_shutdown);
483
484         peer2 = ksocknal_find_peer_locked(ni, id);
485         if (peer2) {
486                 ksocknal_peer_decref(peer);
487                 peer = peer2;
488         } else {
489                 /* peer table takes my ref on peer */
490                 list_add_tail(&peer->ksnp_list,
491                               ksocknal_nid2peerlist(id.nid));
492         }
493
494         route2 = NULL;
495         list_for_each(tmp, &peer->ksnp_routes) {
496                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
497
498                 if (route2->ksnr_ipaddr == ipaddr)
499                         break;
500
501                 route2 = NULL;
502         }
503         if (!route2) {
504                 ksocknal_add_route_locked(peer, route);
505                 route->ksnr_share_count++;
506         } else {
507                 ksocknal_route_decref(route);
508                 route2->ksnr_share_count++;
509         }
510
511         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
512
513         return 0;
514 }
515
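/*
 * Delete the peer's route to @ip (all routes if @ip is zero); deleting
 * a route also closes its connections.  If no explicitly shared routes
 * remain afterwards, the remaining auto-created routes and connections
 * are torn down as well.
 */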
516 static void
517 ksocknal_del_peer_locked(ksock_peer_t *peer, __u32 ip)
518 {
519         ksock_conn_t *conn;
520         ksock_route_t *route;
521         struct list_head *tmp;
522         struct list_head *nxt;
523         int nshared;
524
525         LASSERT(!peer->ksnp_closing);
526
527         /* Extra ref prevents peer disappearing until I'm done with it */
528         ksocknal_peer_addref(peer);
529
530         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
531                 route = list_entry(tmp, ksock_route_t, ksnr_list);
532
533                 /* no match */
534                 if (!(!ip || route->ksnr_ipaddr == ip))
535                         continue;
536
537                 route->ksnr_share_count = 0;
538                 /* This deletes associated conns too */
539                 ksocknal_del_route_locked(route);
540         }
541
542         nshared = 0;
543         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
544                 route = list_entry(tmp, ksock_route_t, ksnr_list);
545                 nshared += route->ksnr_share_count;
546         }
547
548         if (!nshared) {
549                 /*
550                  * remove everything else if there are no explicit entries
551                  * left
552                  */
553                 list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
554                         route = list_entry(tmp, ksock_route_t, ksnr_list);
555
556                         /* we should only be removing auto-entries */
557                         LASSERT(!route->ksnr_share_count);
558                         ksocknal_del_route_locked(route);
559                 }
560
561                 list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
562                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
563
564                         ksocknal_close_conn_locked(conn, 0);
565                 }
566         }
567
568         ksocknal_peer_decref(peer);
569         /* NB peer unlinks itself when last conn/route is removed */
570 }
571
572 static int
573 ksocknal_del_peer(lnet_ni_t *ni, lnet_process_id_t id, __u32 ip)
574 {
575         LIST_HEAD(zombies);
576         struct list_head *ptmp;
577         struct list_head *pnxt;
578         ksock_peer_t *peer;
579         int lo;
580         int hi;
581         int i;
582         int rc = -ENOENT;
583
584         write_lock_bh(&ksocknal_data.ksnd_global_lock);
585
586         if (id.nid != LNET_NID_ANY) {
587                 lo = (int)(ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers);
588                 hi = (int)(ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers);
589         } else {
590                 lo = 0;
591                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
592         }
593
594         for (i = lo; i <= hi; i++) {
595                 list_for_each_safe(ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
596                         peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
597
598                         if (peer->ksnp_ni != ni)
599                                 continue;
600
601                         if (!((id.nid == LNET_NID_ANY || peer->ksnp_id.nid == id.nid) &&
602                               (id.pid == LNET_PID_ANY || peer->ksnp_id.pid == id.pid)))
603                                 continue;
604
605                         ksocknal_peer_addref(peer);     /* a ref for me... */
606
607                         ksocknal_del_peer_locked(peer, ip);
608
609                         if (peer->ksnp_closing &&
610                             !list_empty(&peer->ksnp_tx_queue)) {
611                                 LASSERT(list_empty(&peer->ksnp_conns));
612                                 LASSERT(list_empty(&peer->ksnp_routes));
613
614                                 list_splice_init(&peer->ksnp_tx_queue,
615                                                  &zombies);
616                         }
617
618                         ksocknal_peer_decref(peer);     /* ...till here */
619
620                         rc = 0;          /* matched! */
621                 }
622         }
623
624         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
625
626         ksocknal_txlist_done(ni, &zombies, 1);
627
628         return rc;
629 }
630
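/*
 * Return the @index'th connection on @ni with a reference held, or NULL
 * if the index is out of range.
 */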
631 static ksock_conn_t *
632 ksocknal_get_conn_by_idx(lnet_ni_t *ni, int index)
633 {
634         ksock_peer_t *peer;
635         struct list_head *ptmp;
636         ksock_conn_t *conn;
637         struct list_head *ctmp;
638         int i;
639
640         read_lock(&ksocknal_data.ksnd_global_lock);
641
642         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
643                 list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
644                         peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
645
646                         LASSERT(!peer->ksnp_closing);
647
648                         if (peer->ksnp_ni != ni)
649                                 continue;
650
651                         list_for_each(ctmp, &peer->ksnp_conns) {
652                                 if (index-- > 0)
653                                         continue;
654
655                                 conn = list_entry(ctmp, ksock_conn_t,
656                                                   ksnc_list);
657                                 ksocknal_conn_addref(conn);
658                                 read_unlock(&ksocknal_data.ksnd_global_lock);
659                                 return conn;
660                         }
661                 }
662         }
663
664         read_unlock(&ksocknal_data.ksnd_global_lock);
665         return NULL;
666 }
667
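/*
 * Pick the least loaded scheduler (fewest connections) within the given
 * CPT partition.
 */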
668 static ksock_sched_t *
669 ksocknal_choose_scheduler_locked(unsigned int cpt)
670 {
671         struct ksock_sched_info *info = ksocknal_data.ksnd_sched_info[cpt];
672         ksock_sched_t *sched;
673         int i;
674
675         LASSERT(info->ksi_nthreads > 0);
676
677         sched = &info->ksi_scheds[0];
678         /*
679          * NB: it's safe so far, but info->ksi_nthreads could be changed
680          * at runtime when we have dynamic LNet configuration, then we
681          * need to take care of this.
682          */
683         for (i = 1; i < info->ksi_nthreads; i++) {
684                 if (sched->kss_nconns > info->ksi_scheds[i].kss_nconns)
685                         sched = &info->ksi_scheds[i];
686         }
687
688         return sched;
689 }
690
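/*
 * Copy this NI's interface addresses into @ipaddrs for advertising in a
 * HELLO.  Returns 0 unless more than one interface is configured, since
 * additional connections are only offered in that case.
 */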
691 static int
692 ksocknal_local_ipvec(lnet_ni_t *ni, __u32 *ipaddrs)
693 {
694         ksock_net_t *net = ni->ni_data;
695         int i;
696         int nip;
697
698         read_lock(&ksocknal_data.ksnd_global_lock);
699
700         nip = net->ksnn_ninterfaces;
701         LASSERT(nip <= LNET_MAX_INTERFACES);
702
703         /*
704          * Only offer interfaces for additional connections if I have
705          * more than one.
706          */
707         if (nip < 2) {
708                 read_unlock(&ksocknal_data.ksnd_global_lock);
709                 return 0;
710         }
711
712         for (i = 0; i < nip; i++) {
713                 ipaddrs[i] = net->ksnn_interfaces[i].ksni_ipaddr;
714                 LASSERT(ipaddrs[i]);
715         }
716
717         read_unlock(&ksocknal_data.ksnd_global_lock);
718         return nip;
719 }
720
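/*
 * Choose the peer IP that best matches @iface: prefer an address on the
 * same subnet, and among those the one with the smallest XOR distance
 * from the interface address.
 */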
721 static int
722 ksocknal_match_peerip(ksock_interface_t *iface, __u32 *ips, int nips)
723 {
724         int best_netmatch = 0;
725         int best_xor      = 0;
726         int best          = -1;
727         int this_xor;
728         int this_netmatch;
729         int i;
730
731         for (i = 0; i < nips; i++) {
732                 if (!ips[i])
733                         continue;
734
735                 this_xor = ips[i] ^ iface->ksni_ipaddr;
736                 this_netmatch = !(this_xor & iface->ksni_netmask) ? 1 : 0;
737
738                 if (!(best < 0 ||
739                       best_netmatch < this_netmatch ||
740                       (best_netmatch == this_netmatch &&
741                        best_xor > this_xor)))
742                         continue;
743
744                 best = i;
745                 best_netmatch = this_netmatch;
746                 best_xor = this_xor;
747         }
748
749         LASSERT(best >= 0);
750         return best;
751 }
752
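/*
 * Passive side of the interface exchange: decide which local interfaces
 * this peer should connect to, balancing peers across interfaces and
 * preferring subnet matches, then overwrite @peerips with the choices.
 */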
753 static int
754 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
755 {
756         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
757         ksock_net_t *net = peer->ksnp_ni->ni_data;
758         ksock_interface_t *iface;
759         ksock_interface_t *best_iface;
760         int n_ips;
761         int i;
762         int j;
763         int k;
764         __u32 ip;
765         __u32 xor;
766         int this_netmatch;
767         int best_netmatch;
768         int best_npeers;
769
770         /*
771          * CAVEAT EMPTOR: We do all our interface matching with an
772          * exclusive hold of global lock at IRQ priority.  We're only
773          * expecting to be dealing with small numbers of interfaces, so the
774          * O(n**3)-ness shouldn't matter
775          */
776         /*
777          * Also note that I'm not going to return more than n_peerips
778          * interfaces, even if I have more myself
779          */
780         write_lock_bh(global_lock);
781
782         LASSERT(n_peerips <= LNET_MAX_INTERFACES);
783         LASSERT(net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
784
785         /*
786          * Only match interfaces for additional connections
787          * if I have > 1 interface
788          */
789         n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
790                 min(n_peerips, net->ksnn_ninterfaces);
791
792         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
793                 /*            ^ yes really... */
794
795                 /*
796                  * If we have any new interfaces, first tick off all the
797                  * peer IPs that match old interfaces, then choose new
798                  * interfaces to match the remaining peer IPS.
799                  * We don't forget interfaces we've stopped using; we might
800                  * start using them again...
801                  */
802                 if (i < peer->ksnp_n_passive_ips) {
803                         /* Old interface. */
804                         ip = peer->ksnp_passive_ips[i];
805                         best_iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
806
807                         /* peer passive ips are kept up to date */
808                         LASSERT(best_iface);
809                 } else {
810                         /* choose a new interface */
811                         LASSERT(i == peer->ksnp_n_passive_ips);
812
813                         best_iface = NULL;
814                         best_netmatch = 0;
815                         best_npeers = 0;
816
817                         for (j = 0; j < net->ksnn_ninterfaces; j++) {
818                                 iface = &net->ksnn_interfaces[j];
819                                 ip = iface->ksni_ipaddr;
820
821                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
822                                         if (peer->ksnp_passive_ips[k] == ip)
823                                                 break;
824
825                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
826                                         continue;
827
828                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
829                                 xor = ip ^ peerips[k];
830                                 this_netmatch = !(xor & iface->ksni_netmask) ? 1 : 0;
831
832                                 if (!(!best_iface ||
833                                       best_netmatch < this_netmatch ||
834                                       (best_netmatch == this_netmatch &&
835                                        best_npeers > iface->ksni_npeers)))
836                                         continue;
837
838                                 best_iface = iface;
839                                 best_netmatch = this_netmatch;
840                                 best_npeers = iface->ksni_npeers;
841                         }
842
843                         LASSERT(best_iface);
844
845                         best_iface->ksni_npeers++;
846                         ip = best_iface->ksni_ipaddr;
847                         peer->ksnp_passive_ips[i] = ip;
848                         peer->ksnp_n_passive_ips = i + 1;
849                 }
850
851                 /* mark the best matching peer IP used */
852                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
853                 peerips[j] = 0;
854         }
855
856         /* Overwrite input peer IP addresses */
857         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
858
859         write_unlock_bh(global_lock);
860
861         return n_ips;
862 }
863
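/*
 * Active side of the interface exchange: create a route for each
 * advertised peer IP we don't already have one for, connecting from the
 * local interface with the best subnet match and fewest routes.
 */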
864 static void
865 ksocknal_create_routes(ksock_peer_t *peer, int port,
866                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
867 {
868         ksock_route_t *newroute = NULL;
869         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
870         lnet_ni_t *ni = peer->ksnp_ni;
871         ksock_net_t *net = ni->ni_data;
872         struct list_head *rtmp;
873         ksock_route_t *route;
874         ksock_interface_t *iface;
875         ksock_interface_t *best_iface;
876         int best_netmatch;
877         int this_netmatch;
878         int best_nroutes;
879         int i;
880         int j;
881
882         /*
883          * CAVEAT EMPTOR: We do all our interface matching with an
884          * exclusive hold of global lock at IRQ priority.  We're only
885          * expecting to be dealing with small numbers of interfaces, so the
886          * O(n**3)-ness here shouldn't matter
887          */
888         write_lock_bh(global_lock);
889
890         if (net->ksnn_ninterfaces < 2) {
891                 /*
892                  * Only create additional connections
893                  * if I have > 1 interface
894                  */
895                 write_unlock_bh(global_lock);
896                 return;
897         }
898
899         LASSERT(npeer_ipaddrs <= LNET_MAX_INTERFACES);
900
901         for (i = 0; i < npeer_ipaddrs; i++) {
902                 if (newroute) {
903                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
904                 } else {
905                         write_unlock_bh(global_lock);
906
907                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
908                         if (!newroute)
909                                 return;
910
911                         write_lock_bh(global_lock);
912                 }
913
914                 if (peer->ksnp_closing) {
915                         /* peer got closed under me */
916                         break;
917                 }
918
919                 /* Already got a route? */
920                 route = NULL;
921                 list_for_each(rtmp, &peer->ksnp_routes) {
922                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
923
924                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
925                                 break;
926
927                         route = NULL;
928                 }
929                 if (route)
930                         continue;
931
932                 best_iface = NULL;
933                 best_nroutes = 0;
934                 best_netmatch = 0;
935
936                 LASSERT(net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
937
938                 /* Select interface to connect from */
939                 for (j = 0; j < net->ksnn_ninterfaces; j++) {
940                         iface = &net->ksnn_interfaces[j];
941
942                         /* Using this interface already? */
943                         list_for_each(rtmp, &peer->ksnp_routes) {
944                                 route = list_entry(rtmp, ksock_route_t,
945                                                    ksnr_list);
946
947                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
948                                         break;
949
950                                 route = NULL;
951                         }
952                         if (route)
953                                 continue;
954
955                         this_netmatch = (!((iface->ksni_ipaddr ^
956                                            newroute->ksnr_ipaddr) &
957                                            iface->ksni_netmask)) ? 1 : 0;
958
959                         if (!(!best_iface ||
960                               best_netmatch < this_netmatch ||
961                               (best_netmatch == this_netmatch &&
962                                best_nroutes > iface->ksni_nroutes)))
963                                 continue;
964
965                         best_iface = iface;
966                         best_netmatch = this_netmatch;
967                         best_nroutes = iface->ksni_nroutes;
968                 }
969
970                 if (!best_iface)
971                         continue;
972
973                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
974                 best_iface->ksni_nroutes++;
975
976                 ksocknal_add_route_locked(peer, newroute);
977                 newroute = NULL;
978         }
979
980         write_unlock_bh(global_lock);
981         if (newroute)
982                 ksocknal_route_decref(newroute);
983 }
984
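/*
 * Queue an incoming connection request for the connection daemon; the
 * HELLO handshake and conn creation happen later in connd context.
 */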
985 int
986 ksocknal_accept(lnet_ni_t *ni, struct socket *sock)
987 {
988         ksock_connreq_t *cr;
989         int rc;
990         __u32 peer_ip;
991         int peer_port;
992
993         rc = lnet_sock_getaddr(sock, 1, &peer_ip, &peer_port);
994         LASSERT(!rc);                 /* we succeeded before */
995
996         LIBCFS_ALLOC(cr, sizeof(*cr));
997         if (!cr) {
998                 LCONSOLE_ERROR_MSG(0x12f, "Dropping connection request from %pI4h: memory exhausted\n",
999                                    &peer_ip);
1000                 return -ENOMEM;
1001         }
1002
1003         lnet_ni_addref(ni);
1004         cr->ksncr_ni   = ni;
1005         cr->ksncr_sock = sock;
1006
1007         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
1008
1009         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
1010         wake_up(&ksocknal_data.ksnd_connd_waitq);
1011
1012         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
1013         return 0;
1014 }
1015
1016 static int
1017 ksocknal_connecting(ksock_peer_t *peer, __u32 ipaddr)
1018 {
1019         ksock_route_t *route;
1020
1021         list_for_each_entry(route, &peer->ksnp_routes, ksnr_list) {
1022                 if (route->ksnr_ipaddr == ipaddr)
1023                         return route->ksnr_connecting;
1024         }
1025         return 0;
1026 }
1027
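/*
 * Establish a connection: active if @route is non-NULL, passive (from
 * the acceptor) otherwise.  Exchange HELLOs to agree protocol version
 * and peer identity, resolve connection races and duplicates, attach
 * the conn to its peer, route and scheduler, then enable the socket
 * callbacks.  Positive rc values (EALREADY/EPROTO/ESTALE) flag races
 * and retries rather than hard failures.
 */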
1028 int
1029 ksocknal_create_conn(lnet_ni_t *ni, ksock_route_t *route,
1030                      struct socket *sock, int type)
1031 {
1032         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
1033         LIST_HEAD(zombies);
1034         lnet_process_id_t peerid;
1035         struct list_head *tmp;
1036         __u64 incarnation;
1037         ksock_conn_t *conn;
1038         ksock_conn_t *conn2;
1039         ksock_peer_t *peer = NULL;
1040         ksock_peer_t *peer2;
1041         ksock_sched_t *sched;
1042         ksock_hello_msg_t *hello;
1043         int cpt;
1044         ksock_tx_t *tx;
1045         ksock_tx_t *txtmp;
1046         int rc;
1047         int active;
1048         char *warn = NULL;
1049
1050         active = !!route;
1051
1052         LASSERT(active == (type != SOCKLND_CONN_NONE));
1053
1054         LIBCFS_ALLOC(conn, sizeof(*conn));
1055         if (!conn) {
1056                 rc = -ENOMEM;
1057                 goto failed_0;
1058         }
1059
1060         conn->ksnc_peer = NULL;
1061         conn->ksnc_route = NULL;
1062         conn->ksnc_sock = sock;
1063         /*
1064          * Two refs: one for the conn itself, plus an extra ref that prevents
1065          * the socket being closed before the connection is established
1066          */
1067         atomic_set(&conn->ksnc_sock_refcount, 2);
1068         conn->ksnc_type = type;
1069         ksocknal_lib_save_callback(sock, conn);
1070         atomic_set(&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
1071
1072         conn->ksnc_rx_ready = 0;
1073         conn->ksnc_rx_scheduled = 0;
1074
1075         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
1076         conn->ksnc_tx_ready = 0;
1077         conn->ksnc_tx_scheduled = 0;
1078         conn->ksnc_tx_carrier = NULL;
1079         atomic_set(&conn->ksnc_tx_nob, 0);
1080
1081         LIBCFS_ALLOC(hello, offsetof(ksock_hello_msg_t,
1082                                      kshm_ips[LNET_MAX_INTERFACES]));
1083         if (!hello) {
1084                 rc = -ENOMEM;
1085                 goto failed_1;
1086         }
1087
1088         /* stash conn's local and remote addrs */
1089         rc = ksocknal_lib_get_conn_addrs(conn);
1090         if (rc)
1091                 goto failed_1;
1092
1093         /*
1094          * Find out/confirm peer's NID and connection type and get the
1095          * vector of interfaces she's willing to let me connect to.
1096          * Passive connections use the listener timeout since the peer sends
1097          * eagerly
1098          */
1099         if (active) {
1100                 peer = route->ksnr_peer;
1101                 LASSERT(ni == peer->ksnp_ni);
1102
1103                 /* Active connection sends HELLO eagerly */
1104                 hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
1105                 peerid = peer->ksnp_id;
1106
1107                 write_lock_bh(global_lock);
1108                 conn->ksnc_proto = peer->ksnp_proto;
1109                 write_unlock_bh(global_lock);
1110
1111                 if (!conn->ksnc_proto) {
1112                          conn->ksnc_proto = &ksocknal_protocol_v3x;
1113 #if SOCKNAL_VERSION_DEBUG
1114                          if (*ksocknal_tunables.ksnd_protocol == 2)
1115                                  conn->ksnc_proto = &ksocknal_protocol_v2x;
1116                          else if (*ksocknal_tunables.ksnd_protocol == 1)
1117                                  conn->ksnc_proto = &ksocknal_protocol_v1x;
1118 #endif
1119                 }
1120
1121                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1122                 if (rc)
1123                         goto failed_1;
1124         } else {
1125                 peerid.nid = LNET_NID_ANY;
1126                 peerid.pid = LNET_PID_ANY;
1127
1128                 /* Passive, get protocol from peer */
1129                 conn->ksnc_proto = NULL;
1130         }
1131
1132         rc = ksocknal_recv_hello(ni, conn, hello, &peerid, &incarnation);
1133         if (rc < 0)
1134                 goto failed_1;
1135
1136         LASSERT(!rc || active);
1137         LASSERT(conn->ksnc_proto);
1138         LASSERT(peerid.nid != LNET_NID_ANY);
1139
1140         cpt = lnet_cpt_of_nid(peerid.nid);
1141
1142         if (active) {
1143                 ksocknal_peer_addref(peer);
1144                 write_lock_bh(global_lock);
1145         } else {
1146                 rc = ksocknal_create_peer(&peer, ni, peerid);
1147                 if (rc)
1148                         goto failed_1;
1149
1150                 write_lock_bh(global_lock);
1151
1152                 /* called with a ref on ni, so shutdown can't have started */
1153                 LASSERT(!((ksock_net_t *) ni->ni_data)->ksnn_shutdown);
1154
1155                 peer2 = ksocknal_find_peer_locked(ni, peerid);
1156                 if (!peer2) {
1157                         /*
1158                          * NB this puts an "empty" peer in the peer
1159                          * table (which takes my ref)
1160                          */
1161                         list_add_tail(&peer->ksnp_list,
1162                                       ksocknal_nid2peerlist(peerid.nid));
1163                 } else {
1164                         ksocknal_peer_decref(peer);
1165                         peer = peer2;
1166                 }
1167
1168                 /* +1 ref for me */
1169                 ksocknal_peer_addref(peer);
1170                 peer->ksnp_accepting++;
1171
1172                 /*
1173                  * Am I already connecting to this guy?  Resolve in
1174                  * favour of higher NID...
1175                  */
1176                 if (peerid.nid < ni->ni_nid &&
1177                     ksocknal_connecting(peer, conn->ksnc_ipaddr)) {
1178                         rc = EALREADY;
1179                         warn = "connection race resolution";
1180                         goto failed_2;
1181                 }
1182         }
1183
1184         if (peer->ksnp_closing ||
1185             (active && route->ksnr_deleted)) {
1186                 /* peer/route got closed under me */
1187                 rc = -ESTALE;
1188                 warn = "peer/route removed";
1189                 goto failed_2;
1190         }
1191
1192         if (!peer->ksnp_proto) {
1193                 /*
1194                  * Never connected before.
1195                  * NB recv_hello may have returned EPROTO to signal my peer
1196                  * wants a different protocol than the one I asked for.
1197                  */
1198                 LASSERT(list_empty(&peer->ksnp_conns));
1199
1200                 peer->ksnp_proto = conn->ksnc_proto;
1201                 peer->ksnp_incarnation = incarnation;
1202         }
1203
1204         if (peer->ksnp_proto != conn->ksnc_proto ||
1205             peer->ksnp_incarnation != incarnation) {
1206                 /* Peer rebooted or I've got the wrong protocol version */
1207                 ksocknal_close_peer_conns_locked(peer, 0, 0);
1208
1209                 peer->ksnp_proto = NULL;
1210                 rc = ESTALE;
1211                 warn = peer->ksnp_incarnation != incarnation ?
1212                        "peer rebooted" :
1213                        "wrong proto version";
1214                 goto failed_2;
1215         }
1216
1217         switch (rc) {
1218         default:
1219                 LBUG();
1220         case 0:
1221                 break;
1222         case EALREADY:
1223                 warn = "lost conn race";
1224                 goto failed_2;
1225         case EPROTO:
1226                 warn = "retry with different protocol version";
1227                 goto failed_2;
1228         }
1229
1230         /*
1231          * Refuse to duplicate an existing connection, unless this is a
1232          * loopback connection
1233          */
1234         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1235                 list_for_each(tmp, &peer->ksnp_conns) {
1236                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1237
1238                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1239                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1240                             conn2->ksnc_type != conn->ksnc_type)
1241                                 continue;
1242
1243                         /*
1244                          * Reply on a passive connection attempt so the peer
1245                          * realises we're connected.
1246                          */
1247                         LASSERT(!rc);
1248                         if (!active)
1249                                 rc = EALREADY;
1250
1251                         warn = "duplicate";
1252                         goto failed_2;
1253                 }
1254         }
1255
1256         /*
1257          * If the connection created by this route didn't bind to the IP
1258          * address the route connected to, the connection/route matching
1259          * code below probably isn't going to work.
1260          */
1261         if (active &&
1262             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1263                 CERROR("Route %s %pI4h connected to %pI4h\n",
1264                        libcfs_id2str(peer->ksnp_id),
1265                        &route->ksnr_ipaddr,
1266                        &conn->ksnc_ipaddr);
1267         }
1268
1269         /*
1270          * Search for a route corresponding to the new connection and
1271          * create an association.  This allows incoming connections created
1272          * by routes in my peer to match my own route entries so I don't
1273          * continually create duplicate routes.
1274          */
1275         list_for_each(tmp, &peer->ksnp_routes) {
1276                 route = list_entry(tmp, ksock_route_t, ksnr_list);
1277
1278                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1279                         continue;
1280
1281                 ksocknal_associate_route_conn_locked(route, conn);
1282                 break;
1283         }
1284
1285         conn->ksnc_peer = peer;          /* conn takes my ref on peer */
1286         peer->ksnp_last_alive = cfs_time_current();
1287         peer->ksnp_send_keepalive = 0;
1288         peer->ksnp_error = 0;
1289
1290         sched = ksocknal_choose_scheduler_locked(cpt);
1291         sched->kss_nconns++;
1292         conn->ksnc_scheduler = sched;
1293
1294         conn->ksnc_tx_last_post = cfs_time_current();
1295         /* Set the deadline for the outgoing HELLO to drain */
1296         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1297         conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
1298         mb();   /* order with adding to peer's conn list */
1299
1300         list_add(&conn->ksnc_list, &peer->ksnp_conns);
1301         ksocknal_conn_addref(conn);
1302
1303         ksocknal_new_packet(conn, 0);
1304
1305         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1306
1307         /* Take packets blocking for this connection. */
1308         list_for_each_entry_safe(tx, txtmp, &peer->ksnp_tx_queue, tx_list) {
1309                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) == SOCKNAL_MATCH_NO)
1310                         continue;
1311
1312                 list_del(&tx->tx_list);
1313                 ksocknal_queue_tx_locked(tx, conn);
1314         }
1315
1316         write_unlock_bh(global_lock);
1317
1318         /*
1319          * We've now got a new connection.  Any errors from here on are just
1320          * like "normal" comms errors and we close the connection normally.
1321          * NB (a) we still have to send the reply HELLO for passive
1322          *      connections,
1323          *    (b) normal I/O on the conn is blocked until I setup and call the
1324          *      socket callbacks.
1325          */
1326         CDEBUG(D_NET, "New conn %s p %d.x %pI4h -> %pI4h/%d incarnation:%lld sched[%d:%d]\n",
1327                libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
1328                &conn->ksnc_myipaddr, &conn->ksnc_ipaddr,
1329                conn->ksnc_port, incarnation, cpt,
1330                (int)(sched - &sched->kss_info->ksi_scheds[0]));
1331
1332         if (active) {
1333                 /* additional routes after interface exchange? */
1334                 ksocknal_create_routes(peer, conn->ksnc_port,
1335                                        hello->kshm_ips, hello->kshm_nips);
1336         } else {
1337                 hello->kshm_nips = ksocknal_select_ips(peer, hello->kshm_ips,
1338                                                        hello->kshm_nips);
1339                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1340         }
1341
1342         LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1343                                     kshm_ips[LNET_MAX_INTERFACES]));
1344
1345         /*
1346          * setup the socket AFTER I've received hello (it disables
1347          * SO_LINGER).  I might call back to the acceptor who may want
1348          * to send a protocol version response and then close the
1349          * socket; this ensures the socket only tears down after the
1350          * response has been sent.
1351          */
1352         if (!rc)
1353                 rc = ksocknal_lib_setup_sock(sock);
1354
1355         write_lock_bh(global_lock);
1356
1357         /* NB my callbacks block while I hold ksnd_global_lock */
1358         ksocknal_lib_set_callback(sock, conn);
1359
1360         if (!active)
1361                 peer->ksnp_accepting--;
1362
1363         write_unlock_bh(global_lock);
1364
1365         if (rc) {
1366                 write_lock_bh(global_lock);
1367                 if (!conn->ksnc_closing) {
1368                         /* could be closed by another thread */
1369                         ksocknal_close_conn_locked(conn, rc);
1370                 }
1371                 write_unlock_bh(global_lock);
1372         } else if (!ksocknal_connsock_addref(conn)) {
1373                 /* Allow I/O to proceed. */
1374                 ksocknal_read_callback(conn);
1375                 ksocknal_write_callback(conn);
1376                 ksocknal_connsock_decref(conn);
1377         }
1378
1379         ksocknal_connsock_decref(conn);
1380         ksocknal_conn_decref(conn);
1381         return rc;
1382
1383  failed_2:
1384         if (!peer->ksnp_closing &&
1385             list_empty(&peer->ksnp_conns) &&
1386             list_empty(&peer->ksnp_routes)) {
1387                 list_add(&zombies, &peer->ksnp_tx_queue);
1388                 list_del_init(&peer->ksnp_tx_queue);
1389                 ksocknal_unlink_peer_locked(peer);
1390         }
1391
1392         write_unlock_bh(global_lock);
1393
1394         if (warn) {
1395                 if (rc < 0)
1396                         CERROR("Not creating conn %s type %d: %s\n",
1397                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1398                 else
1399                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1400                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1401         }
1402
1403         if (!active) {
1404                 if (rc > 0) {
1405                         /*
1406                          * Request retry by replying with CONN_NONE
1407                          * ksnc_proto has been set already
1408                          */
1409                         conn->ksnc_type = SOCKLND_CONN_NONE;
1410                         hello->kshm_nips = 0;
1411                         ksocknal_send_hello(ni, conn, peerid.nid, hello);
1412                 }
1413
1414                 write_lock_bh(global_lock);
1415                 peer->ksnp_accepting--;
1416                 write_unlock_bh(global_lock);
1417         }
1418
1419         ksocknal_txlist_done(ni, &zombies, 1);
1420         ksocknal_peer_decref(peer);
1421
1422 failed_1:
1423         if (hello)
1424                 LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1425                                             kshm_ips[LNET_MAX_INTERFACES]));
1426
1427         LIBCFS_FREE(conn, sizeof(*conn));
1428
1429 failed_0:
1430         sock_release(sock);
1431         return rc;
1432 }
1433
1434 void
1435 ksocknal_close_conn_locked(ksock_conn_t *conn, int error)
1436 {
1437         /*
1438          * This just does the immediate housekeeping, and queues the
1439          * connection for the reaper to terminate.
1440          * Caller holds ksnd_global_lock exclusively in irq context
1441          */
1442         ksock_peer_t *peer = conn->ksnc_peer;
1443         ksock_route_t *route;
1444         ksock_conn_t *conn2;
1445         struct list_head *tmp;
1446
1447         LASSERT(!peer->ksnp_error);
1448         LASSERT(!conn->ksnc_closing);
1449         conn->ksnc_closing = 1;
1450
1451         /* ksnd_deathrow_conns takes over peer's ref */
1452         list_del(&conn->ksnc_list);
1453
1454         route = conn->ksnc_route;
1455         if (route) {
1456                 /* dissociate conn from route... */
1457                 LASSERT(!route->ksnr_deleted);
1458                 LASSERT(route->ksnr_connected & (1 << conn->ksnc_type));
1459
1460                 conn2 = NULL;
1461                 list_for_each(tmp, &peer->ksnp_conns) {
1462                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1463
1464                         if (conn2->ksnc_route == route &&
1465                             conn2->ksnc_type == conn->ksnc_type)
1466                                 break;
1467
1468                         conn2 = NULL;
1469                 }
1470                 if (!conn2)
1471                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1472
1473                 conn->ksnc_route = NULL;
1474
1475 #if 0      /* irrelevant with only eager routes */
1476                 /* make route least favourite */
1477                 list_del(&route->ksnr_list);
1478                 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
1479 #endif
1480                 ksocknal_route_decref(route);     /* drop conn's ref on route */
1481         }
1482
1483         if (list_empty(&peer->ksnp_conns)) {
1484                 /* No more connections to this peer */
1485
1486                 if (!list_empty(&peer->ksnp_tx_queue)) {
1487                         ksock_tx_t *tx;
1488
1489                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
1490
1491                         /*
1492                          * throw them to the last connection...,
1493                          * these TXs will be sent to /dev/null by the scheduler
1494                          */
1495                         list_for_each_entry(tx, &peer->ksnp_tx_queue,
1496                                             tx_list)
1497                                 ksocknal_tx_prep(conn, tx);
1498
1499                         spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1500                         list_splice_init(&peer->ksnp_tx_queue,
1501                                          &conn->ksnc_tx_queue);
1502                         spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1503                 }
1504
1505                 peer->ksnp_proto = NULL;        /* renegotiate protocol version */
1506                 peer->ksnp_error = error;       /* stash last conn close reason */
1507
1508                 if (list_empty(&peer->ksnp_routes)) {
1509                         /*
1510                          * I've just closed last conn belonging to a
1511                          * peer with no routes to it
1512                          */
1513                         ksocknal_unlink_peer_locked(peer);
1514                 }
1515         }
1516
1517         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1518
1519         list_add_tail(&conn->ksnc_list,
1520                       &ksocknal_data.ksnd_deathrow_conns);
1521         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1522
1523         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1524 }
1525
1526 void
1527 ksocknal_peer_failed(ksock_peer_t *peer)
1528 {
1529         int notify = 0;
1530         unsigned long last_alive = 0;
1531
1532         /*
1533          * There has been a connection failure or comms error, but I'll only
1534          * tell LNET I think the peer is dead if it's to another kernel and
1535          * there are no connections or connection attempts in existence.
1536          */
1537         read_lock(&ksocknal_data.ksnd_global_lock);
1538
1539         if (!(peer->ksnp_id.pid & LNET_PID_USERFLAG) &&
1540             list_empty(&peer->ksnp_conns) &&
1541             !peer->ksnp_accepting &&
1542             !ksocknal_find_connecting_route_locked(peer)) {
1543                 notify = 1;
1544                 last_alive = peer->ksnp_last_alive;
1545         }
1546
1547         read_unlock(&ksocknal_data.ksnd_global_lock);
1548
1549         if (notify)
1550                 lnet_notify(peer->ksnp_ni, peer->ksnp_id.nid, 0,
1551                             last_alive);
1552 }
1553
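/*
 * Abort any zero-copy requests still outstanding on this conn: detach them
 * from the peer's ZC request list under ksnp_lock, clear their cookies and
 * mark them aborted (not ACKed), then drop the ZC reference on each one
 * outside the lock.
 */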
1554 void
1555 ksocknal_finalize_zcreq(ksock_conn_t *conn)
1556 {
1557         ksock_peer_t *peer = conn->ksnc_peer;
1558         ksock_tx_t *tx;
1559         ksock_tx_t *temp;
1560         ksock_tx_t *tmp;
1561         LIST_HEAD(zlist);
1562
1563         /*
1564          * NB safe to finalize TXs because closing of socket will
1565          * abort all buffered data
1566          */
1567         LASSERT(!conn->ksnc_sock);
1568
1569         spin_lock(&peer->ksnp_lock);
1570
1571         list_for_each_entry_safe(tx, tmp, &peer->ksnp_zc_req_list, tx_zc_list) {
1572                 if (tx->tx_conn != conn)
1573                         continue;
1574
1575                 LASSERT(tx->tx_msg.ksm_zc_cookies[0]);
1576
1577                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1578                 tx->tx_zc_aborted = 1; /* mark it as not-acked */
1579                 list_del(&tx->tx_zc_list);
1580                 list_add(&tx->tx_zc_list, &zlist);
1581         }
1582
1583         spin_unlock(&peer->ksnp_lock);
1584
1585         list_for_each_entry_safe(tx, temp, &zlist, tx_zc_list) {
1586                 list_del(&tx->tx_zc_list);
1587                 ksocknal_tx_decref(tx);
1588         }
1589 }
1590
1591 void
1592 ksocknal_terminate_conn(ksock_conn_t *conn)
1593 {
1594         /*
1595          * This gets called by the reaper (guaranteed thread context) to
1596          * disengage the socket from its callbacks and close it.
1597          * ksnc_refcount will eventually hit zero, and then the reaper will
1598          * destroy it.
1599          */
1600         ksock_peer_t *peer = conn->ksnc_peer;
1601         ksock_sched_t *sched = conn->ksnc_scheduler;
1602         int failed = 0;
1603
1604         LASSERT(conn->ksnc_closing);
1605
1606         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1607         spin_lock_bh(&sched->kss_lock);
1608
1609         /* a closing conn is always ready to tx */
1610         conn->ksnc_tx_ready = 1;
1611
1612         if (!conn->ksnc_tx_scheduled &&
1613             !list_empty(&conn->ksnc_tx_queue)) {
1614                 list_add_tail(&conn->ksnc_tx_list,
1615                               &sched->kss_tx_conns);
1616                 conn->ksnc_tx_scheduled = 1;
1617                 /* extra ref for scheduler */
1618                 ksocknal_conn_addref(conn);
1619
1620                 wake_up(&sched->kss_waitq);
1621         }
1622
1623         spin_unlock_bh(&sched->kss_lock);
1624
1625         /* serialise with callbacks */
1626         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1627
1628         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1629
1630         /*
1631          * OK, so this conn may not be completely disengaged from its
1632          * scheduler yet, but it _has_ committed to terminate...
1633          */
1634         conn->ksnc_scheduler->kss_nconns--;
1635
1636         if (peer->ksnp_error) {
1637                 /* peer's last conn closed in error */
1638                 LASSERT(list_empty(&peer->ksnp_conns));
1639                 failed = 1;
1640                 peer->ksnp_error = 0;     /* avoid multiple notifications */
1641         }
1642
1643         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1644
1645         if (failed)
1646                 ksocknal_peer_failed(peer);
1647
1648         /*
1649          * The socket is closed on the final put; either here, or in
1650          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1651          * when the connection was established, this will close the socket
1652          * immediately, aborting anything buffered in it. Any hung
1653          * zero-copy transmits will therefore complete in finite time.
1654          */
1655         ksocknal_connsock_decref(conn);
1656 }
1657
1658 void
1659 ksocknal_queue_zombie_conn(ksock_conn_t *conn)
1660 {
1661         /* Queue the conn for the reaper to destroy */
1662
1663         LASSERT(!atomic_read(&conn->ksnc_conn_refcount));
1664         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1665
1666         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1667         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1668
1669         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1670 }
1671
1672 void
1673 ksocknal_destroy_conn(ksock_conn_t *conn)
1674 {
1675         unsigned long last_rcv;
1676
1677         /* Final coup-de-grace of the reaper */
1678         CDEBUG(D_NET, "connection %p\n", conn);
1679
1680         LASSERT(!atomic_read(&conn->ksnc_conn_refcount));
1681         LASSERT(!atomic_read(&conn->ksnc_sock_refcount));
1682         LASSERT(!conn->ksnc_sock);
1683         LASSERT(!conn->ksnc_route);
1684         LASSERT(!conn->ksnc_tx_scheduled);
1685         LASSERT(!conn->ksnc_rx_scheduled);
1686         LASSERT(list_empty(&conn->ksnc_tx_queue));
1687
1688         /* complete current receive if any */
1689         switch (conn->ksnc_rx_state) {
1690         case SOCKNAL_RX_LNET_PAYLOAD:
1691                 last_rcv = conn->ksnc_rx_deadline -
1692                            cfs_time_seconds(*ksocknal_tunables.ksnd_timeout);
1693                 CERROR("Completing partial receive from %s[%d], ip %pI4h:%d, with error, wanted: %d, left: %d, last alive is %ld secs ago\n",
1694                        libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_type,
1695                        &conn->ksnc_ipaddr, conn->ksnc_port,
1696                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1697                        cfs_duration_sec(cfs_time_sub(cfs_time_current(),
1698                                                      last_rcv)));
1699                 lnet_finalize(conn->ksnc_peer->ksnp_ni,
1700                               conn->ksnc_cookie, -EIO);
1701                 break;
1702         case SOCKNAL_RX_LNET_HEADER:
1703                 if (conn->ksnc_rx_started)
1704                         CERROR("Incomplete receive of lnet header from %s, ip %pI4h:%d, with error, protocol: %d.x.\n",
1705                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1706                                &conn->ksnc_ipaddr, conn->ksnc_port,
1707                                conn->ksnc_proto->pro_version);
1708                 break;
1709         case SOCKNAL_RX_KSM_HEADER:
1710                 if (conn->ksnc_rx_started)
1711                         CERROR("Incomplete receive of ksock message from %s, ip %pI4h:%d, with error, protocol: %d.x.\n",
1712                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1713                                &conn->ksnc_ipaddr, conn->ksnc_port,
1714                                conn->ksnc_proto->pro_version);
1715                 break;
1716         case SOCKNAL_RX_SLOP:
1717                 if (conn->ksnc_rx_started)
1718                         CERROR("Incomplete receive of slops from %s, ip %pI4h:%d, with error\n",
1719                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1720                                &conn->ksnc_ipaddr, conn->ksnc_port);
1721                 break;
1722         default:
1723                 LBUG();
1724                 break;
1725         }
1726
1727         ksocknal_peer_decref(conn->ksnc_peer);
1728
1729         LIBCFS_FREE(conn, sizeof(*conn));
1730 }
1731
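/*
 * Close all of the peer's connections whose remote address matches ipaddr
 * (or every connection if ipaddr is zero).  Caller must hold
 * ksnd_global_lock exclusively.  Returns the number of connections
 * scheduled for closure.
 */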
1732 int
1733 ksocknal_close_peer_conns_locked(ksock_peer_t *peer, __u32 ipaddr, int why)
1734 {
1735         ksock_conn_t *conn;
1736         struct list_head *ctmp;
1737         struct list_head *cnxt;
1738         int count = 0;
1739
1740         list_for_each_safe(ctmp, cnxt, &peer->ksnp_conns) {
1741                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
1742
1743                 if (!ipaddr || conn->ksnc_ipaddr == ipaddr) {
1744                         count++;
1745                         ksocknal_close_conn_locked(conn, why);
1746                 }
1747         }
1748
1749         return count;
1750 }
1751
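/*
 * Close this conn together with any sibling connections to the same peer
 * that share its remote IP address.  Takes ksnd_global_lock itself and
 * returns the number of connections closed.
 */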
1752 int
1753 ksocknal_close_conn_and_siblings(ksock_conn_t *conn, int why)
1754 {
1755         ksock_peer_t *peer = conn->ksnc_peer;
1756         __u32 ipaddr = conn->ksnc_ipaddr;
1757         int count;
1758
1759         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1760
1761         count = ksocknal_close_peer_conns_locked(peer, ipaddr, why);
1762
1763         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1764
1765         return count;
1766 }
1767
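/*
 * Close every connection matching id and ipaddr; LNET_NID_ANY,
 * LNET_PID_ANY and a zero ipaddr act as wildcards.  Wildcard requests
 * always succeed; an exact match returns -ENOENT if nothing was closed.
 */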
1768 int
1769 ksocknal_close_matching_conns(lnet_process_id_t id, __u32 ipaddr)
1770 {
1771         ksock_peer_t *peer;
1772         struct list_head *ptmp;
1773         struct list_head *pnxt;
1774         int lo;
1775         int hi;
1776         int i;
1777         int count = 0;
1778
1779         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1780
1781         if (id.nid != LNET_NID_ANY) {
1782                 lo = (int)(ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers);
1783                 hi = (int)(ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers);
1784         } else {
1785                 lo = 0;
1786                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1787         }
1788
1789         for (i = lo; i <= hi; i++) {
1790                 list_for_each_safe(ptmp, pnxt,
1791                                    &ksocknal_data.ksnd_peers[i]) {
1792                         peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1793
1794                         if (!((id.nid == LNET_NID_ANY || id.nid == peer->ksnp_id.nid) &&
1795                               (id.pid == LNET_PID_ANY || id.pid == peer->ksnp_id.pid)))
1796                                 continue;
1797
1798                         count += ksocknal_close_peer_conns_locked(peer, ipaddr, 0);
1799                 }
1800         }
1801
1802         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1803
1804         /* wildcards always succeed */
1805         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || !ipaddr)
1806                 return 0;
1807
1808         if (!count)
1809                 return -ENOENT;
1810         else
1811                 return 0;
1812 }
1813
1814 void
1815 ksocknal_notify(lnet_ni_t *ni, lnet_nid_t gw_nid, int alive)
1816 {
1817         /*
1818          * The router is telling me she's been notified of a change in
1819          * gateway state....
1820          */
1821         lnet_process_id_t id = {0};
1822
1823         id.nid = gw_nid;
1824         id.pid = LNET_PID_ANY;
1825
1826         CDEBUG(D_NET, "gw %s %s\n", libcfs_nid2str(gw_nid),
1827                alive ? "up" : "down");
1828
1829         if (!alive) {
1830                 /* If the gateway crashed, close all open connections... */
1831                 ksocknal_close_matching_conns(id, 0);
1832                 return;
1833         }
1834
1835         /*
1836          * ...otherwise do nothing.  We can only establish new connections
1837          * if we have autoroutes, and these connect on demand.
1838          */
1839 }
1840
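/*
 * lnd_query handler: report via *when the last time this nid was known to
 * be alive (if known), refreshing ksnp_last_alive first if data has been
 * ACKed on any of its connections.  If the peer is unknown or still has a
 * connectable route, (re-)add it and launch connection attempts.
 */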
1841 void
1842 ksocknal_query(lnet_ni_t *ni, lnet_nid_t nid, unsigned long *when)
1843 {
1844         int connect = 1;
1845         unsigned long last_alive = 0;
1846         unsigned long now = cfs_time_current();
1847         ksock_peer_t *peer = NULL;
1848         rwlock_t *glock = &ksocknal_data.ksnd_global_lock;
1849         lnet_process_id_t id = {
1850                 .nid = nid,
1851                 .pid = LNET_PID_LUSTRE,
1852         };
1853
1854         read_lock(glock);
1855
1856         peer = ksocknal_find_peer_locked(ni, id);
1857         if (peer) {
1858                 struct list_head *tmp;
1859                 ksock_conn_t *conn;
1860                 int bufnob;
1861
1862                 list_for_each(tmp, &peer->ksnp_conns) {
1863                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1864                         bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
1865
1866                         if (bufnob < conn->ksnc_tx_bufnob) {
1867                                 /* something got ACKed */
1868                                 conn->ksnc_tx_deadline =
1869                                         cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
1870                                 peer->ksnp_last_alive = now;
1871                                 conn->ksnc_tx_bufnob = bufnob;
1872                         }
1873                 }
1874
1875                 last_alive = peer->ksnp_last_alive;
1876                 if (!ksocknal_find_connectable_route_locked(peer))
1877                         connect = 0;
1878         }
1879
1880         read_unlock(glock);
1881
1882         if (last_alive)
1883                 *when = last_alive;
1884
1885         CDEBUG(D_NET, "Peer %s %p, alive %ld secs ago, connect %d\n",
1886                libcfs_nid2str(nid), peer,
1887                last_alive ? cfs_duration_sec(now - last_alive) : -1,
1888                connect);
1889
1890         if (!connect)
1891                 return;
1892
1893         ksocknal_add_peer(ni, id, LNET_NIDADDR(nid), lnet_acceptor_port());
1894
1895         write_lock_bh(glock);
1896
1897         peer = ksocknal_find_peer_locked(ni, id);
1898         if (peer)
1899                 ksocknal_launch_all_connections_locked(peer);
1900
1901         write_unlock_bh(glock);
1902 }
1903
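/*
 * Push each of the peer's connections in turn via ksocknal_lib_push_conn(),
 * taking a conn ref so it stays valid while ksnd_global_lock is dropped.
 */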
1904 static void
1905 ksocknal_push_peer(ksock_peer_t *peer)
1906 {
1907         int index;
1908         int i;
1909         struct list_head *tmp;
1910         ksock_conn_t *conn;
1911
1912         for (index = 0; ; index++) {
1913                 read_lock(&ksocknal_data.ksnd_global_lock);
1914
1915                 i = 0;
1916                 conn = NULL;
1917
1918                 list_for_each(tmp, &peer->ksnp_conns) {
1919                         if (i++ == index) {
1920                                 conn = list_entry(tmp, ksock_conn_t,
1921                                                   ksnc_list);
1922                                 ksocknal_conn_addref(conn);
1923                                 break;
1924                         }
1925                 }
1926
1927                 read_unlock(&ksocknal_data.ksnd_global_lock);
1928
1929                 if (!conn)
1930                         break;
1931
1932                 ksocknal_lib_push_conn(conn);
1933                 ksocknal_conn_decref(conn);
1934         }
1935 }
1936
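/*
 * IOC_LIBCFS_PUSH_CONNECTION handler: push every peer matching id
 * (LNET_NID_ANY / LNET_PID_ANY act as wildcards).  Returns 0 if at least
 * one peer matched, -ENOENT otherwise.
 */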
1937 static int ksocknal_push(lnet_ni_t *ni, lnet_process_id_t id)
1938 {
1939         struct list_head *start;
1940         struct list_head *end;
1941         struct list_head *tmp;
1942         int rc = -ENOENT;
1943         unsigned int hsize = ksocknal_data.ksnd_peer_hash_size;
1944
1945         if (id.nid == LNET_NID_ANY) {
1946                 start = &ksocknal_data.ksnd_peers[0];
1947                 end = &ksocknal_data.ksnd_peers[hsize - 1];
1948         } else {
1949                 start = ksocknal_nid2peerlist(id.nid);
1950                 end = ksocknal_nid2peerlist(id.nid);
1951         }
1952
1953         for (tmp = start; tmp <= end; tmp++) {
1954                 int peer_off; /* search offset in peer hash table */
1955
1956                 for (peer_off = 0; ; peer_off++) {
1957                         ksock_peer_t *peer;
1958                         int i = 0;
1959
1960                         read_lock(&ksocknal_data.ksnd_global_lock);
1961                         list_for_each_entry(peer, tmp, ksnp_list) {
1962                                 if (!((id.nid == LNET_NID_ANY ||
1963                                        id.nid == peer->ksnp_id.nid) &&
1964                                       (id.pid == LNET_PID_ANY ||
1965                                        id.pid == peer->ksnp_id.pid)))
1966                                         continue;
1967
1968                                 if (i++ == peer_off) {
1969                                         ksocknal_peer_addref(peer);
1970                                         break;
1971                                 }
1972                         }
1973                         read_unlock(&ksocknal_data.ksnd_global_lock);
1974
1975                         if (!i) /* no match */
1976                                 break;
1977
1978                         rc = 0;
1979                         ksocknal_push_peer(peer);
1980                         ksocknal_peer_decref(peer);
1981                 }
1982         }
1983         return rc;
1984 }
1985
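/*
 * Register a local interface (IP address + netmask) with this NI.
 * Duplicates are silently ignored; -ENOSPC is returned once
 * LNET_MAX_INTERFACES is reached.  Existing peer and route usage counts
 * for the address are tallied, but only new connections will notice the
 * new interface.
 */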
1986 static int
1987 ksocknal_add_interface(lnet_ni_t *ni, __u32 ipaddress, __u32 netmask)
1988 {
1989         ksock_net_t *net = ni->ni_data;
1990         ksock_interface_t *iface;
1991         int rc;
1992         int i;
1993         int j;
1994         struct list_head *ptmp;
1995         ksock_peer_t *peer;
1996         struct list_head *rtmp;
1997         ksock_route_t *route;
1998
1999         if (!ipaddress || !netmask)
2000                 return -EINVAL;
2001
2002         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2003
2004         iface = ksocknal_ip2iface(ni, ipaddress);
2005         if (iface) {
2006                 /* silently ignore dups */
2007                 rc = 0;
2008         } else if (net->ksnn_ninterfaces == LNET_MAX_INTERFACES) {
2009                 rc = -ENOSPC;
2010         } else {
2011                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
2012
2013                 iface->ksni_ipaddr = ipaddress;
2014                 iface->ksni_netmask = netmask;
2015                 iface->ksni_nroutes = 0;
2016                 iface->ksni_npeers = 0;
2017
2018                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2019                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
2020                                 peer = list_entry(ptmp, ksock_peer_t,
2021                                                   ksnp_list);
2022
2023                                 for (j = 0; j < peer->ksnp_n_passive_ips; j++)
2024                                         if (peer->ksnp_passive_ips[j] == ipaddress)
2025                                                 iface->ksni_npeers++;
2026
2027                                 list_for_each(rtmp, &peer->ksnp_routes) {
2028                                         route = list_entry(rtmp, ksock_route_t,
2029                                                            ksnr_list);
2030
2031                                         if (route->ksnr_myipaddr == ipaddress)
2032                                                 iface->ksni_nroutes++;
2033                                 }
2034                         }
2035                 }
2036
2037                 rc = 0;
2038                 /* NB only new connections will pay attention to the new interface! */
2039         }
2040
2041         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2042
2043         return rc;
2044 }
2045
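/*
 * Strip ipaddr from the peer: drop it from the passive IP list, unbind
 * routes that were created manually and delete the rest, and close any
 * connections using it as the local address.  Caller holds
 * ksnd_global_lock exclusively.
 */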
2046 static void
2047 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
2048 {
2049         struct list_head *tmp;
2050         struct list_head *nxt;
2051         ksock_route_t *route;
2052         ksock_conn_t *conn;
2053         int i;
2054         int j;
2055
2056         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
2057                 if (peer->ksnp_passive_ips[i] == ipaddr) {
2058                         for (j = i + 1; j < peer->ksnp_n_passive_ips; j++)
2059                                 peer->ksnp_passive_ips[j - 1] =
2060                                         peer->ksnp_passive_ips[j];
2061                         peer->ksnp_n_passive_ips--;
2062                         break;
2063                 }
2064
2065         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
2066                 route = list_entry(tmp, ksock_route_t, ksnr_list);
2067
2068                 if (route->ksnr_myipaddr != ipaddr)
2069                         continue;
2070
2071                 if (route->ksnr_share_count) {
2072                         /* Manually created; keep, but unbind */
2073                         route->ksnr_myipaddr = 0;
2074                 } else {
2075                         ksocknal_del_route_locked(route);
2076                 }
2077         }
2078
2079         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
2080                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
2081
2082                 if (conn->ksnc_myipaddr == ipaddr)
2083                         ksocknal_close_conn_locked(conn, 0);
2084         }
2085 }
2086
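/*
 * Remove the local interface ipaddress from this NI (all interfaces if it
 * is zero), dropping it from every peer's passive IP list and closing or
 * unbinding routes and connections bound to it.  Returns -ENOENT if no
 * interface matched.
 */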
2087 static int
2088 ksocknal_del_interface(lnet_ni_t *ni, __u32 ipaddress)
2089 {
2090         ksock_net_t *net = ni->ni_data;
2091         int rc = -ENOENT;
2092         struct list_head *tmp;
2093         struct list_head *nxt;
2094         ksock_peer_t *peer;
2095         __u32 this_ip;
2096         int i;
2097         int j;
2098
2099         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2100
2101         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2102                 this_ip = net->ksnn_interfaces[i].ksni_ipaddr;
2103
2104                 if (!(!ipaddress || ipaddress == this_ip))
2105                         continue;
2106
2107                 rc = 0;
2108
2109                 for (j = i + 1; j < net->ksnn_ninterfaces; j++)
2110                         net->ksnn_interfaces[j - 1] =
2111                                 net->ksnn_interfaces[j];
2112
2113                 net->ksnn_ninterfaces--;
2114
2115                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
2116                         list_for_each_safe(tmp, nxt,
2117                                            &ksocknal_data.ksnd_peers[j]) {
2118                                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
2119
2120                                 if (peer->ksnp_ni != ni)
2121                                         continue;
2122
2123                                 ksocknal_peer_del_interface_locked(peer, this_ip);
2124                         }
2125                 }
2126         }
2127
2128         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2129
2130         return rc;
2131 }
2132
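/*
 * lnd_ctl handler: dispatch libcfs ioctls (get/add/delete interfaces and
 * peers, inspect and close connections, push connections) to the
 * corresponding socklnd operations.
 */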
2133 int
2134 ksocknal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
2135 {
2136         lnet_process_id_t id = {0};
2137         struct libcfs_ioctl_data *data = arg;
2138         int rc;
2139
2140         switch (cmd) {
2141         case IOC_LIBCFS_GET_INTERFACE: {
2142                 ksock_net_t       *net = ni->ni_data;
2143                 ksock_interface_t *iface;
2144
2145                 read_lock(&ksocknal_data.ksnd_global_lock);
2146
2147                 if (data->ioc_count >= (__u32)net->ksnn_ninterfaces) {
2148                         rc = -ENOENT;
2149                 } else {
2150                         rc = 0;
2151                         iface = &net->ksnn_interfaces[data->ioc_count];
2152
2153                         data->ioc_u32[0] = iface->ksni_ipaddr;
2154                         data->ioc_u32[1] = iface->ksni_netmask;
2155                         data->ioc_u32[2] = iface->ksni_npeers;
2156                         data->ioc_u32[3] = iface->ksni_nroutes;
2157                 }
2158
2159                 read_unlock(&ksocknal_data.ksnd_global_lock);
2160                 return rc;
2161         }
2162
2163         case IOC_LIBCFS_ADD_INTERFACE:
2164                 return ksocknal_add_interface(ni,
2165                                               data->ioc_u32[0], /* IP address */
2166                                               data->ioc_u32[1]); /* net mask */
2167
2168         case IOC_LIBCFS_DEL_INTERFACE:
2169                 return ksocknal_del_interface(ni,
2170                                               data->ioc_u32[0]); /* IP address */
2171
2172         case IOC_LIBCFS_GET_PEER: {
2173                 __u32 myip = 0;
2174                 __u32 ip = 0;
2175                 int port = 0;
2176                 int conn_count = 0;
2177                 int share_count = 0;
2178
2179                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
2180                                             &id, &myip, &ip, &port,
2181                                             &conn_count,  &share_count);
2182                 if (rc)
2183                         return rc;
2184
2185                 data->ioc_nid    = id.nid;
2186                 data->ioc_count  = share_count;
2187                 data->ioc_u32[0] = ip;
2188                 data->ioc_u32[1] = port;
2189                 data->ioc_u32[2] = myip;
2190                 data->ioc_u32[3] = conn_count;
2191                 data->ioc_u32[4] = id.pid;
2192                 return 0;
2193         }
2194
2195         case IOC_LIBCFS_ADD_PEER:
2196                 id.nid = data->ioc_nid;
2197                 id.pid = LNET_PID_LUSTRE;
2198                 return ksocknal_add_peer(ni, id,
2199                                           data->ioc_u32[0], /* IP */
2200                                           data->ioc_u32[1]); /* port */
2201
2202         case IOC_LIBCFS_DEL_PEER:
2203                 id.nid = data->ioc_nid;
2204                 id.pid = LNET_PID_ANY;
2205                 return ksocknal_del_peer(ni, id,
2206                                           data->ioc_u32[0]); /* IP */
2207
2208         case IOC_LIBCFS_GET_CONN: {
2209                 int txmem;
2210                 int rxmem;
2211                 int nagle;
2212                 ksock_conn_t *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);
2213
2214                 if (!conn)
2215                         return -ENOENT;
2216
2217                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
2218
2219                 data->ioc_count  = txmem;
2220                 data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
2221                 data->ioc_flags  = nagle;
2222                 data->ioc_u32[0] = conn->ksnc_ipaddr;
2223                 data->ioc_u32[1] = conn->ksnc_port;
2224                 data->ioc_u32[2] = conn->ksnc_myipaddr;
2225                 data->ioc_u32[3] = conn->ksnc_type;
2226                 data->ioc_u32[4] = conn->ksnc_scheduler->kss_info->ksi_cpt;
2227                 data->ioc_u32[5] = rxmem;
2228                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
2229                 ksocknal_conn_decref(conn);
2230                 return 0;
2231         }
2232
2233         case IOC_LIBCFS_CLOSE_CONNECTION:
2234                 id.nid = data->ioc_nid;
2235                 id.pid = LNET_PID_ANY;
2236                 return ksocknal_close_matching_conns(id,
2237                                                       data->ioc_u32[0]);
2238
2239         case IOC_LIBCFS_REGISTER_MYNID:
2240                 /* Ignore if this is a noop */
2241                 if (data->ioc_nid == ni->ni_nid)
2242                         return 0;
2243
2244                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
2245                        libcfs_nid2str(data->ioc_nid),
2246                        libcfs_nid2str(ni->ni_nid));
2247                 return -EINVAL;
2248
2249         case IOC_LIBCFS_PUSH_CONNECTION:
2250                 id.nid = data->ioc_nid;
2251                 id.pid = LNET_PID_ANY;
2252                 return ksocknal_push(ni, id);
2253
2254         default:
2255                 return -EINVAL;
2256         }
2257         /* not reached */
2258 }
2259
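/*
 * Free the global allocations made at startup: the per-CPT scheduler
 * arrays, the peer hash table and any idle noop TXs still cached.
 */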
2260 static void
2261 ksocknal_free_buffers(void)
2262 {
2263         LASSERT(!atomic_read(&ksocknal_data.ksnd_nactive_txs));
2264
2265         if (ksocknal_data.ksnd_sched_info) {
2266                 struct ksock_sched_info *info;
2267                 int i;
2268
2269                 cfs_percpt_for_each(info, i, ksocknal_data.ksnd_sched_info) {
2270                         if (info->ksi_scheds) {
2271                                 LIBCFS_FREE(info->ksi_scheds,
2272                                             info->ksi_nthreads_max *
2273                                             sizeof(info->ksi_scheds[0]));
2274                         }
2275                 }
2276                 cfs_percpt_free(ksocknal_data.ksnd_sched_info);
2277         }
2278
2279         LIBCFS_FREE(ksocknal_data.ksnd_peers,
2280                     sizeof(struct list_head) *
2281                     ksocknal_data.ksnd_peer_hash_size);
2282
2283         spin_lock(&ksocknal_data.ksnd_tx_lock);
2284
2285         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2286                 struct list_head zlist;
2287                 ksock_tx_t *tx;
2288                 ksock_tx_t *temp;
2289
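                /*
                 * Transplant the whole idle-noop TX list onto the local
                 * zlist head so the TXs can be freed outside ksnd_tx_lock.
                 */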
2290                 list_add(&zlist, &ksocknal_data.ksnd_idle_noop_txs);
2291                 list_del_init(&ksocknal_data.ksnd_idle_noop_txs);
2292                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2293
2294                 list_for_each_entry_safe(tx, temp, &zlist, tx_list) {
2295                         list_del(&tx->tx_list);
2296                         LIBCFS_FREE(tx, tx->tx_desc_size);
2297                 }
2298         } else {
2299                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2300         }
2301 }
2302
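/*
 * Tear down the global socklnd state once the last net is gone: verify
 * every peer list, conn list and scheduler queue is empty, flag all
 * threads to terminate, wait for them to exit, then free the remaining
 * buffers and drop the module reference.
 */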
2303 static void
2304 ksocknal_base_shutdown(void)
2305 {
2306         struct ksock_sched_info *info;
2307         ksock_sched_t *sched;
2308         int i;
2309         int j;
2310
2311         LASSERT(!ksocknal_data.ksnd_nnets);
2312
2313         switch (ksocknal_data.ksnd_init) {
2314         default:
2315                 LASSERT(0);
2316
2317         case SOCKNAL_INIT_ALL:
2318         case SOCKNAL_INIT_DATA:
2319                 LASSERT(ksocknal_data.ksnd_peers);
2320                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2321                         LASSERT(list_empty(&ksocknal_data.ksnd_peers[i]));
2322
2323                 LASSERT(list_empty(&ksocknal_data.ksnd_nets));
2324                 LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
2325                 LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
2326                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
2327                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));
2328
2329                 if (ksocknal_data.ksnd_sched_info) {
2330                         cfs_percpt_for_each(info, i,
2331                                             ksocknal_data.ksnd_sched_info) {
2332                                 if (!info->ksi_scheds)
2333                                         continue;
2334
2335                                 for (j = 0; j < info->ksi_nthreads_max; j++) {
2336                                         sched = &info->ksi_scheds[j];
2337                                         LASSERT(list_empty(
2338                                                 &sched->kss_tx_conns));
2339                                         LASSERT(list_empty(
2340                                                 &sched->kss_rx_conns));
2341                                         LASSERT(list_empty(
2342                                                 &sched->kss_zombie_noop_txs));
2343                                         LASSERT(!sched->kss_nconns);
2344                                 }
2345                         }
2346                 }
2347
2348                 /* flag threads to terminate; wake and wait for them to die */
2349                 ksocknal_data.ksnd_shuttingdown = 1;
2350                 wake_up_all(&ksocknal_data.ksnd_connd_waitq);
2351                 wake_up_all(&ksocknal_data.ksnd_reaper_waitq);
2352
2353                 if (ksocknal_data.ksnd_sched_info) {
2354                         cfs_percpt_for_each(info, i,
2355                                             ksocknal_data.ksnd_sched_info) {
2356                                 if (!info->ksi_scheds)
2357                                         continue;
2358
2359                                 for (j = 0; j < info->ksi_nthreads_max; j++) {
2360                                         sched = &info->ksi_scheds[j];
2361                                         wake_up_all(&sched->kss_waitq);
2362                                 }
2363                         }
2364                 }
2365
2366                 i = 4;
2367                 read_lock(&ksocknal_data.ksnd_global_lock);
2368                 while (ksocknal_data.ksnd_nthreads) {
2369                         i++;
2370                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2371                                "waiting for %d threads to terminate\n",
2372                                 ksocknal_data.ksnd_nthreads);
2373                         read_unlock(&ksocknal_data.ksnd_global_lock);
2374                         set_current_state(TASK_UNINTERRUPTIBLE);
2375                         schedule_timeout(cfs_time_seconds(1));
2376                         read_lock(&ksocknal_data.ksnd_global_lock);
2377                 }
2378                 read_unlock(&ksocknal_data.ksnd_global_lock);
2379
2380                 ksocknal_free_buffers();
2381
2382                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2383                 break;
2384         }
2385
2386         module_put(THIS_MODULE);
2387 }
2388
2389 static __u64
2390 ksocknal_new_incarnation(void)
2391 {
2392         /* The incarnation number is the time this module was loaded; it
2393          * identifies this particular instance of the socknal.
2394          */
2395         return ktime_get_ns();
2396 }
2397
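/*
 * Initialise the global socklnd state on first use: allocate the peer
 * hash table, set up locks, queues and per-CPT scheduler info, then
 * start the connection daemons and the reaper.  On any failure the
 * partially initialised state is torn down again.
 */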
2398 static int
2399 ksocknal_base_startup(void)
2400 {
2401         struct ksock_sched_info *info;
2402         int rc;
2403         int i;
2404
2405         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2406         LASSERT(!ksocknal_data.ksnd_nnets);
2407
2408         memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */
2409
2410         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2411         LIBCFS_ALLOC(ksocknal_data.ksnd_peers,
2412                      sizeof(struct list_head) *
2413                      ksocknal_data.ksnd_peer_hash_size);
2414         if (!ksocknal_data.ksnd_peers)
2415                 return -ENOMEM;
2416
2417         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2418                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2419
2420         rwlock_init(&ksocknal_data.ksnd_global_lock);
2421         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2422
2423         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2424         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2425         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2426         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2427         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2428
2429         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2430         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2431         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2432         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2433
2434         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2435         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2436
2437         /* NB memset above zeros whole of ksocknal_data */
2438
2439         /* flag lists/ptrs/locks initialised */
2440         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2441         try_module_get(THIS_MODULE);
2442
2443         ksocknal_data.ksnd_sched_info = cfs_percpt_alloc(lnet_cpt_table(),
2444                                                          sizeof(*info));
2445         if (!ksocknal_data.ksnd_sched_info)
2446                 goto failed;
2447
2448         cfs_percpt_for_each(info, i, ksocknal_data.ksnd_sched_info) {
2449                 ksock_sched_t *sched;
2450                 int nthrs;
2451
2452                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2453                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2454                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2455                 } else {
2456                         /*
2457                          * cap at half of the CPUs; assume the other half should
2458                          * be reserved for upper-layer modules
2459                          */
2460                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2461                 }
2462
2463                 info->ksi_nthreads_max = nthrs;
2464                 info->ksi_cpt = i;
2465
2466                 LIBCFS_CPT_ALLOC(info->ksi_scheds, lnet_cpt_table(), i,
2467                                  info->ksi_nthreads_max * sizeof(*sched));
2468                 if (!info->ksi_scheds)
2469                         goto failed;
2470
2471                 for (; nthrs > 0; nthrs--) {
2472                         sched = &info->ksi_scheds[nthrs - 1];
2473
2474                         sched->kss_info = info;
2475                         spin_lock_init(&sched->kss_lock);
2476                         INIT_LIST_HEAD(&sched->kss_rx_conns);
2477                         INIT_LIST_HEAD(&sched->kss_tx_conns);
2478                         INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2479                         init_waitqueue_head(&sched->kss_waitq);
2480                 }
2481         }
2482
2483         ksocknal_data.ksnd_connd_starting       = 0;
2484         ksocknal_data.ksnd_connd_failed_stamp   = 0;
2485         ksocknal_data.ksnd_connd_starting_stamp = ktime_get_real_seconds();
2486         /*
2487          * must have at least 2 connds to remain responsive to accepts while
2488          * connecting
2489          */
2490         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2491                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2492
2493         if (*ksocknal_tunables.ksnd_nconnds_max <
2494             *ksocknal_tunables.ksnd_nconnds) {
2495                 ksocknal_tunables.ksnd_nconnds_max =
2496                         ksocknal_tunables.ksnd_nconnds;
2497         }
2498
2499         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2500                 char name[16];
2501
2502                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2503                 ksocknal_data.ksnd_connd_starting++;
2504                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2505
2506                 snprintf(name, sizeof(name), "socknal_cd%02d", i);
2507                 rc = ksocknal_thread_start(ksocknal_connd,
2508                                            (void *)((ulong_ptr_t)i), name);
2509                 if (rc) {
2510                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2511                         ksocknal_data.ksnd_connd_starting--;
2512                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2513                         CERROR("Can't spawn socknal connd: %d\n", rc);
2514                         goto failed;
2515                 }
2516         }
2517
2518         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2519         if (rc) {
2520                 CERROR("Can't spawn socknal reaper: %d\n", rc);
2521                 goto failed;
2522         }
2523
2524         /* flag everything initialised */
2525         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2526
2527         return 0;
2528
2529  failed:
2530         ksocknal_base_shutdown();
2531         return -ENETDOWN;
2532 }
2533
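/*
 * If a peer is still attached to this NI at shutdown, dump its refcounts,
 * flags, routes and connections to help diagnose why the peer table is
 * not draining.
 */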
2534 static void
2535 ksocknal_debug_peerhash(lnet_ni_t *ni)
2536 {
2537         ksock_peer_t *peer = NULL;
2538         struct list_head *tmp;
2539         int i;
2540
2541         read_lock(&ksocknal_data.ksnd_global_lock);
2542
2543         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2544                 list_for_each(tmp, &ksocknal_data.ksnd_peers[i]) {
2545                         peer = list_entry(tmp, ksock_peer_t, ksnp_list);
2546
2547                         if (peer->ksnp_ni == ni)
2548                                 break;
2549
2550                         peer = NULL;
2551                 }
2552         }
2553
2554         if (peer) {
2555                 ksock_route_t *route;
2556                 ksock_conn_t  *conn;
2557
2558                 CWARN("Active peer on shutdown: %s, ref %d, scnt %d, closing %d, accepting %d, err %d, zcookie %llu, txq %d, zc_req %d\n",
2559                       libcfs_id2str(peer->ksnp_id),
2560                       atomic_read(&peer->ksnp_refcount),
2561                       peer->ksnp_sharecount, peer->ksnp_closing,
2562                       peer->ksnp_accepting, peer->ksnp_error,
2563                       peer->ksnp_zc_next_cookie,
2564                       !list_empty(&peer->ksnp_tx_queue),
2565                       !list_empty(&peer->ksnp_zc_req_list));
2566
2567                 list_for_each(tmp, &peer->ksnp_routes) {
2568                         route = list_entry(tmp, ksock_route_t, ksnr_list);
2569                         CWARN("Route: ref %d, schd %d, conn %d, cnted %d, del %d\n",
2570                               atomic_read(&route->ksnr_refcount),
2571                               route->ksnr_scheduled, route->ksnr_connecting,
2572                               route->ksnr_connected, route->ksnr_deleted);
2573                 }
2574
2575                 list_for_each(tmp, &peer->ksnp_conns) {
2576                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
2577                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2578                               atomic_read(&conn->ksnc_conn_refcount),
2579                               atomic_read(&conn->ksnc_sock_refcount),
2580                               conn->ksnc_type, conn->ksnc_closing);
2581                 }
2582         }
2583
2584         read_unlock(&ksocknal_data.ksnd_global_lock);
2585 }
2586
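/*
 * lnd_shutdown handler: stop accepting new peers on this NI, delete all
 * existing peers and wait for them to disconnect, then release the net
 * and shut down the global state if this was the last net.
 */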
2587 void
2588 ksocknal_shutdown(lnet_ni_t *ni)
2589 {
2590         ksock_net_t *net = ni->ni_data;
2591         int i;
2592         lnet_process_id_t anyid = {0};
2593
2594         anyid.nid = LNET_NID_ANY;
2595         anyid.pid = LNET_PID_ANY;
2596
2597         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2598         LASSERT(ksocknal_data.ksnd_nnets > 0);
2599
2600         spin_lock_bh(&net->ksnn_lock);
2601         net->ksnn_shutdown = 1;          /* prevent new peers */
2602         spin_unlock_bh(&net->ksnn_lock);
2603
2604         /* Delete all peers */
2605         ksocknal_del_peer(ni, anyid, 0);
2606
2607         /* Wait for all peer state to be cleaned up */
2608         i = 2;
2609         spin_lock_bh(&net->ksnn_lock);
2610         while (net->ksnn_npeers) {
2611                 spin_unlock_bh(&net->ksnn_lock);
2612
2613                 i++;
2614                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2615                        "waiting for %d peers to disconnect\n",
2616                        net->ksnn_npeers);
2617                 set_current_state(TASK_UNINTERRUPTIBLE);
2618                 schedule_timeout(cfs_time_seconds(1));
2619
2620                 ksocknal_debug_peerhash(ni);
2621
2622                 spin_lock_bh(&net->ksnn_lock);
2623         }
2624         spin_unlock_bh(&net->ksnn_lock);
2625
2626         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2627                 LASSERT(!net->ksnn_interfaces[i].ksni_npeers);
2628                 LASSERT(!net->ksnn_interfaces[i].ksni_nroutes);
2629         }
2630
2631         list_del(&net->ksnn_list);
2632         LIBCFS_FREE(net, sizeof(*net));
2633
2634         ksocknal_data.ksnd_nnets--;
2635         if (!ksocknal_data.ksnd_nnets)
2636                 ksocknal_base_shutdown();
2637 }
2638
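/*
 * Autoconfigure the net's interface table from the node's IP interfaces,
 * skipping loopback and downed interfaces and capping the count at
 * LNET_MAX_INTERFACES.  Returns the number of usable interfaces found.
 */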
2639 static int
2640 ksocknal_enumerate_interfaces(ksock_net_t *net)
2641 {
2642         char **names;
2643         int i;
2644         int j;
2645         int rc;
2646         int n;
2647
2648         n = lnet_ipif_enumerate(&names);
2649         if (n <= 0) {
2650                 CERROR("Can't enumerate interfaces: %d\n", n);
2651                 return n;
2652         }
2653
2654         for (i = j = 0; i < n; i++) {
2655                 int up;
2656                 __u32 ip;
2657                 __u32 mask;
2658
2659                 if (!strcmp(names[i], "lo")) /* skip the loopback IF */
2660                         continue;
2661
2662                 rc = lnet_ipif_query(names[i], &up, &ip, &mask);
2663                 if (rc) {
2664                         CWARN("Can't get interface %s info: %d\n",
2665                               names[i], rc);
2666                         continue;
2667                 }
2668
2669                 if (!up) {
2670                         CWARN("Ignoring interface %s (down)\n",
2671                               names[i]);
2672                         continue;
2673                 }
2674
2675                 if (j == LNET_MAX_INTERFACES) {
2676                         CWARN("Ignoring interface %s (too many interfaces)\n",
2677                               names[i]);
2678                         continue;
2679                 }
2680
2681                 net->ksnn_interfaces[j].ksni_ipaddr = ip;
2682                 net->ksnn_interfaces[j].ksni_netmask = mask;
2683                 strlcpy(net->ksnn_interfaces[j].ksni_name,
2684                         names[i], sizeof(net->ksnn_interfaces[j].ksni_name));
2685                 j++;
2686         }
2687
2688         lnet_ipif_free_enumeration(names, n);
2689
2690         if (!j)
2691                 CERROR("Can't find any usable interfaces\n");
2692
2693         return j;
2694 }
2695
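/*
 * Count how many of the net's interfaces are not yet used by any other
 * net already registered in ksnd_nets, comparing base device names with
 * any ":alias" suffix ignored.
 */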
2696 static int
2697 ksocknal_search_new_ipif(ksock_net_t *net)
2698 {
2699         int new_ipif = 0;
2700         int i;
2701
2702         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2703                 char *ifnam = &net->ksnn_interfaces[i].ksni_name[0];
2704                 char *colon = strchr(ifnam, ':');
2705                 int found  = 0;
2706                 ksock_net_t *tmp;
2707                 int j;
2708
2709                 if (colon) /* ignore alias device */
2710                         *colon = 0;
2711
2712                 list_for_each_entry(tmp, &ksocknal_data.ksnd_nets, ksnn_list) {
2713                         for (j = 0; !found && j < tmp->ksnn_ninterfaces; j++) {
2714                                 char *ifnam2 =
2715                                         &tmp->ksnn_interfaces[j].ksni_name[0];
2716                                 char *colon2 = strchr(ifnam2, ':');
2717
2718                                 if (colon2)
2719                                         *colon2 = 0;
2720
2721                                 found = !strcmp(ifnam, ifnam2);
2722                                 if (colon2)
2723                                         *colon2 = ':';
2724                         }
2725                         if (found)
2726                                 break;
2727                 }
2728
2729                 new_ipif += !found;
2730                 if (colon)
2731                         *colon = ':';
2732         }
2733
2734         return new_ipif;
2735 }
2736
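/*
 * Start scheduler threads for one CPT: on first use spawn up to the
 * configured or derived maximum for the partition; on later calls add at
 * most two more threads (e.g. when a new interface appears).  Returns 0
 * on success or the error from the first thread that failed to start.
 */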
2737 static int
2738 ksocknal_start_schedulers(struct ksock_sched_info *info)
2739 {
2740         int nthrs;
2741         int rc = 0;
2742         int i;
2743
2744         if (!info->ksi_nthreads) {
2745                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2746                         nthrs = info->ksi_nthreads_max;
2747                 } else {
2748                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2749                                                info->ksi_cpt);
2750                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2751                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2752                 }
2753                 nthrs = min(nthrs, info->ksi_nthreads_max);
2754         } else {
2755                 LASSERT(info->ksi_nthreads <= info->ksi_nthreads_max);
2756                 /* add at most two more threads if there is a new interface */
2757                 nthrs = min(2, info->ksi_nthreads_max - info->ksi_nthreads);
2758         }
2759
2760         for (i = 0; i < nthrs; i++) {
2761                 long id;
2762                 char name[20];
2763                 ksock_sched_t *sched;
2764
2765                 id = KSOCK_THREAD_ID(info->ksi_cpt, info->ksi_nthreads + i);
2766                 sched = &info->ksi_scheds[KSOCK_THREAD_SID(id)];
2767                 snprintf(name, sizeof(name), "socknal_sd%02d_%02d",
2768                          info->ksi_cpt, (int)(sched - &info->ksi_scheds[0]));
2769
2770                 rc = ksocknal_thread_start(ksocknal_scheduler,
2771                                            (void *)id, name);
2772                 if (!rc)
2773                         continue;
2774
2775                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2776                        info->ksi_cpt, info->ksi_nthreads + i, rc);
2777                 break;
2778         }
2779
2780         info->ksi_nthreads += i;
2781         return rc;
2782 }
2783
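/*
 * Ensure scheduler threads are running for every CPT this net will use,
 * starting or topping them up where needed; CPTs that already have
 * threads are skipped unless the net brings a new interface.
 */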
2784 static int
2785 ksocknal_net_start_threads(ksock_net_t *net, __u32 *cpts, int ncpts)
2786 {
2787         int newif = ksocknal_search_new_ipif(net);
2788         int rc;
2789         int i;
2790
2791         LASSERT(ncpts > 0 && ncpts <= cfs_cpt_number(lnet_cpt_table()));
2792
2793         for (i = 0; i < ncpts; i++) {
2794                 struct ksock_sched_info *info;
2795                 int cpt = !cpts ? i : cpts[i];
2796
2797                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2798                 info = ksocknal_data.ksnd_sched_info[cpt];
2799
2800                 if (!newif && info->ksi_nthreads > 0)
2801                         continue;
2802
2803                 rc = ksocknal_start_schedulers(info);
2804                 if (rc)
2805                         return rc;
2806         }
2807         return 0;
2808 }
2809
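/*
 * lnd_startup handler: bring up the global state if needed, allocate and
 * configure a ksock_net_t for this NI (either from the explicit interface
 * list or by enumerating local interfaces), start scheduler threads and
 * derive the NID from the first interface's address.
 */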
2810 int
2811 ksocknal_startup(lnet_ni_t *ni)
2812 {
2813         ksock_net_t *net;
2814         int rc;
2815         int i;
2816
2817         LASSERT(ni->ni_lnd == &the_ksocklnd);
2818
2819         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2820                 rc = ksocknal_base_startup();
2821                 if (rc)
2822                         return rc;
2823         }
2824
2825         LIBCFS_ALLOC(net, sizeof(*net));
2826         if (!net)
2827                 goto fail_0;
2828
2829         spin_lock_init(&net->ksnn_lock);
2830         net->ksnn_incarnation = ksocknal_new_incarnation();
2831         ni->ni_data = net;
2832         ni->ni_peertimeout    = *ksocknal_tunables.ksnd_peertimeout;
2833         ni->ni_maxtxcredits   = *ksocknal_tunables.ksnd_credits;
2834         ni->ni_peertxcredits  = *ksocknal_tunables.ksnd_peertxcredits;
2835         ni->ni_peerrtrcredits = *ksocknal_tunables.ksnd_peerrtrcredits;
2836
2837         if (!ni->ni_interfaces[0]) {
2838                 rc = ksocknal_enumerate_interfaces(net);
2839                 if (rc <= 0)
2840                         goto fail_1;
2841
2842                 net->ksnn_ninterfaces = 1;
2843         } else {
2844                 for (i = 0; i < LNET_MAX_INTERFACES; i++) {
2845                         int up;
2846
2847                         if (!ni->ni_interfaces[i])
2848                                 break;
2849
2850                         rc = lnet_ipif_query(ni->ni_interfaces[i], &up,
2851                                              &net->ksnn_interfaces[i].ksni_ipaddr,
2852                                              &net->ksnn_interfaces[i].ksni_netmask);
2853
2854                         if (rc) {
2855                                 CERROR("Can't get interface %s info: %d\n",
2856                                        ni->ni_interfaces[i], rc);
2857                                 goto fail_1;
2858                         }
2859
2860                         if (!up) {
2861                                 CERROR("Interface %s is down\n",
2862                                        ni->ni_interfaces[i]);
2863                                 goto fail_1;
2864                         }
2865
2866                         strlcpy(net->ksnn_interfaces[i].ksni_name,
2867                                 ni->ni_interfaces[i],
2868                                 sizeof(net->ksnn_interfaces[i].ksni_name));
2869                 }
2870                 net->ksnn_ninterfaces = i;
2871         }
2872
2873         /* call it before adding net to ksocknal_data.ksnd_nets */
2874         rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
2875         if (rc)
2876                 goto fail_1;
2877
2878         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
2879                                 net->ksnn_interfaces[0].ksni_ipaddr);
2880         list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);
2881
2882         ksocknal_data.ksnd_nnets++;
2883
2884         return 0;
2885
2886  fail_1:
2887         LIBCFS_FREE(net, sizeof(*net));
2888  fail_0:
2889         if (!ksocknal_data.ksnd_nnets)
2890                 ksocknal_base_shutdown();
2891
2892         return -ENETDOWN;
2893 }
2894
2895 static void __exit ksocklnd_exit(void)
2896 {
2897         lnet_unregister_lnd(&the_ksocklnd);
2898 }
2899
2900 static int __init ksocklnd_init(void)
2901 {
2902         int rc;
2903
2904         /* check that the ksnr_connected/connecting fields are large enough */
2905         CLASSERT(SOCKLND_CONN_NTYPES <= 4);
2906         CLASSERT(SOCKLND_CONN_ACK == SOCKLND_CONN_BULK_IN);
2907
2908         /* initialize the_ksocklnd */
2909         the_ksocklnd.lnd_type     = SOCKLND;
2910         the_ksocklnd.lnd_startup  = ksocknal_startup;
2911         the_ksocklnd.lnd_shutdown = ksocknal_shutdown;
2912         the_ksocklnd.lnd_ctl      = ksocknal_ctl;
2913         the_ksocklnd.lnd_send     = ksocknal_send;
2914         the_ksocklnd.lnd_recv     = ksocknal_recv;
2915         the_ksocklnd.lnd_notify   = ksocknal_notify;
2916         the_ksocklnd.lnd_query    = ksocknal_query;
2917         the_ksocklnd.lnd_accept   = ksocknal_accept;
2918
2919         rc = ksocknal_tunables_init();
2920         if (rc)
2921                 return rc;
2922
2923         lnet_register_lnd(&the_ksocklnd);
2924
2925         return 0;
2926 }
2927
2928 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2929 MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
2930 MODULE_VERSION("2.7.0");
2931 MODULE_LICENSE("GPL");
2932
2933 module_init(ksocklnd_init);
2934 module_exit(ksocklnd_exit);