staging: ramster: disable build in anticipation of renaming
[cascardo/linux.git] / drivers / staging / ramster / ramster / nodemanager.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2004, 2005, 2012 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 #include <linux/slab.h>
23 #include <linux/kernel.h>
24 #include <linux/module.h>
25 #include <linux/configfs.h>
26
27 #include "tcp.h"
28 #include "nodemanager.h"
29 #include "heartbeat.h"
30 #include "masklog.h"
31
/*
 * The single active cluster.  For now the code assumes that at most one
 * cluster exists at a time; lifting that restriction means passing a
 * cluster reference to every place that looks up nodes.
 */
struct r2nm_cluster *r2nm_single_cluster;
36
37 char *r2nm_fence_method_desc[R2NM_FENCE_METHODS] = {
38                 "reset",        /* R2NM_FENCE_RESET */
39                 "panic",        /* R2NM_FENCE_PANIC */
40 };
41
42 struct r2nm_node *r2nm_get_node_by_num(u8 node_num)
43 {
44         struct r2nm_node *node = NULL;
45
46         if (node_num >= R2NM_MAX_NODES || r2nm_single_cluster == NULL)
47                 goto out;
48
49         read_lock(&r2nm_single_cluster->cl_nodes_lock);
50         node = r2nm_single_cluster->cl_nodes[node_num];
51         if (node)
52                 config_item_get(&node->nd_item);
53         read_unlock(&r2nm_single_cluster->cl_nodes_lock);
54 out:
55         return node;
56 }
57 EXPORT_SYMBOL_GPL(r2nm_get_node_by_num);
58
59 int r2nm_configured_node_map(unsigned long *map, unsigned bytes)
60 {
61         struct r2nm_cluster *cluster = r2nm_single_cluster;
62
63         BUG_ON(bytes < (sizeof(cluster->cl_nodes_bitmap)));
64
65         if (cluster == NULL)
66                 return -EINVAL;
67
68         read_lock(&cluster->cl_nodes_lock);
69         memcpy(map, cluster->cl_nodes_bitmap, sizeof(cluster->cl_nodes_bitmap));
70         read_unlock(&cluster->cl_nodes_lock);
71
72         return 0;
73 }
74 EXPORT_SYMBOL_GPL(r2nm_configured_node_map);
75
76 static struct r2nm_node *r2nm_node_ip_tree_lookup(struct r2nm_cluster *cluster,
77                                                   __be32 ip_needle,
78                                                   struct rb_node ***ret_p,
79                                                   struct rb_node **ret_parent)
80 {
81         struct rb_node **p = &cluster->cl_node_ip_tree.rb_node;
82         struct rb_node *parent = NULL;
83         struct r2nm_node *node, *ret = NULL;
84
85         while (*p) {
86                 int cmp;
87
88                 parent = *p;
89                 node = rb_entry(parent, struct r2nm_node, nd_ip_node);
90
91                 cmp = memcmp(&ip_needle, &node->nd_ipv4_address,
92                                 sizeof(ip_needle));
93                 if (cmp < 0)
94                         p = &(*p)->rb_left;
95                 else if (cmp > 0)
96                         p = &(*p)->rb_right;
97                 else {
98                         ret = node;
99                         break;
100                 }
101         }
102
103         if (ret_p != NULL)
104                 *ret_p = p;
105         if (ret_parent != NULL)
106                 *ret_parent = parent;
107
108         return ret;
109 }
110
111 struct r2nm_node *r2nm_get_node_by_ip(__be32 addr)
112 {
113         struct r2nm_node *node = NULL;
114         struct r2nm_cluster *cluster = r2nm_single_cluster;
115
116         if (cluster == NULL)
117                 goto out;
118
119         read_lock(&cluster->cl_nodes_lock);
120         node = r2nm_node_ip_tree_lookup(cluster, addr, NULL, NULL);
121         if (node)
122                 config_item_get(&node->nd_item);
123         read_unlock(&cluster->cl_nodes_lock);
124
125 out:
126         return node;
127 }
128 EXPORT_SYMBOL_GPL(r2nm_get_node_by_ip);
129
130 void r2nm_node_put(struct r2nm_node *node)
131 {
132         config_item_put(&node->nd_item);
133 }
134 EXPORT_SYMBOL_GPL(r2nm_node_put);
135
136 void r2nm_node_get(struct r2nm_node *node)
137 {
138         config_item_get(&node->nd_item);
139 }
140 EXPORT_SYMBOL_GPL(r2nm_node_get);
141
142 u8 r2nm_this_node(void)
143 {
144         u8 node_num = R2NM_MAX_NODES;
145
146         if (r2nm_single_cluster && r2nm_single_cluster->cl_has_local)
147                 node_num = r2nm_single_cluster->cl_local_node;
148
149         return node_num;
150 }
151 EXPORT_SYMBOL_GPL(r2nm_this_node);
152
153 /* node configfs bits */
154
155 static struct r2nm_cluster *to_r2nm_cluster(struct config_item *item)
156 {
157         return item ?
158                 container_of(to_config_group(item), struct r2nm_cluster,
159                              cl_group)
160                 : NULL;
161 }
162
163 static struct r2nm_node *to_r2nm_node(struct config_item *item)
164 {
165         return item ? container_of(item, struct r2nm_node, nd_item) : NULL;
166 }
167
/* configfs ->release for a node item: free the containing r2nm_node. */
static void r2nm_node_release(struct config_item *item)
{
        kfree(to_r2nm_node(item));
}
173
174 static ssize_t r2nm_node_num_read(struct r2nm_node *node, char *page)
175 {
176         return sprintf(page, "%d\n", node->nd_num);
177 }
178
179 static struct r2nm_cluster *to_r2nm_cluster_from_node(struct r2nm_node *node)
180 {
181         /* through the first node_set .parent
182          * mycluster/nodes/mynode == r2nm_cluster->r2nm_node_group->r2nm_node */
183         return to_r2nm_cluster(node->nd_item.ci_parent->ci_parent);
184 }
185
/*
 * Indices of the node attributes.  They are used both as positions in
 * r2nm_node_attrs[] and as bit numbers in r2nm_node.nd_set_attributes.
 */
enum {
        R2NM_NODE_ATTR_NUM = 0,         /* "num" */
        R2NM_NODE_ATTR_PORT,            /* "ipv4_port" */
        R2NM_NODE_ATTR_ADDRESS,         /* "ipv4_address" */
        R2NM_NODE_ATTR_LOCAL,           /* "local" */
};
192
193 static ssize_t r2nm_node_num_write(struct r2nm_node *node, const char *page,
194                                    size_t count)
195 {
196         struct r2nm_cluster *cluster = to_r2nm_cluster_from_node(node);
197         unsigned long tmp;
198         char *p = (char *)page;
199         int err;
200
201         err = kstrtoul(p, 10, &tmp);
202         if (err)
203                 return err;
204
205         if (tmp >= R2NM_MAX_NODES)
206                 return -ERANGE;
207
208         /* once we're in the cl_nodes tree networking can look us up by
209          * node number and try to use our address and port attributes
210          * to connect to this node.. make sure that they've been set
211          * before writing the node attribute? */
212         if (!test_bit(R2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
213             !test_bit(R2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
214                 return -EINVAL; /* XXX */
215
216         write_lock(&cluster->cl_nodes_lock);
217         if (cluster->cl_nodes[tmp])
218                 p = NULL;
219         else  {
220                 cluster->cl_nodes[tmp] = node;
221                 node->nd_num = tmp;
222                 set_bit(tmp, cluster->cl_nodes_bitmap);
223         }
224         write_unlock(&cluster->cl_nodes_lock);
225         if (p == NULL)
226                 return -EEXIST;
227
228         return count;
229 }
230 static ssize_t r2nm_node_ipv4_port_read(struct r2nm_node *node, char *page)
231 {
232         return sprintf(page, "%u\n", ntohs(node->nd_ipv4_port));
233 }
234
235 static ssize_t r2nm_node_ipv4_port_write(struct r2nm_node *node,
236                                          const char *page, size_t count)
237 {
238         unsigned long tmp;
239         char *p = (char *)page;
240         int err;
241
242         err = kstrtoul(p, 10, &tmp);
243         if (err)
244                 return err;
245
246         if (tmp == 0)
247                 return -EINVAL;
248         if (tmp >= (u16)-1)
249                 return -ERANGE;
250
251         node->nd_ipv4_port = htons(tmp);
252
253         return count;
254 }
255
256 static ssize_t r2nm_node_ipv4_address_read(struct r2nm_node *node, char *page)
257 {
258         return sprintf(page, "%pI4\n", &node->nd_ipv4_address);
259 }
260
261 static ssize_t r2nm_node_ipv4_address_write(struct r2nm_node *node,
262                                             const char *page,
263                                             size_t count)
264 {
265         struct r2nm_cluster *cluster = to_r2nm_cluster_from_node(node);
266         int ret, i;
267         struct rb_node **p, *parent;
268         unsigned int octets[4];
269         __be32 ipv4_addr = 0;
270
271         ret = sscanf(page, "%3u.%3u.%3u.%3u", &octets[3], &octets[2],
272                      &octets[1], &octets[0]);
273         if (ret != 4)
274                 return -EINVAL;
275
276         for (i = 0; i < ARRAY_SIZE(octets); i++) {
277                 if (octets[i] > 255)
278                         return -ERANGE;
279                 be32_add_cpu(&ipv4_addr, octets[i] << (i * 8));
280         }
281
282         ret = 0;
283         write_lock(&cluster->cl_nodes_lock);
284         if (r2nm_node_ip_tree_lookup(cluster, ipv4_addr, &p, &parent))
285                 ret = -EEXIST;
286         else {
287                 rb_link_node(&node->nd_ip_node, parent, p);
288                 rb_insert_color(&node->nd_ip_node, &cluster->cl_node_ip_tree);
289         }
290         write_unlock(&cluster->cl_nodes_lock);
291         if (ret)
292                 return ret;
293
294         memcpy(&node->nd_ipv4_address, &ipv4_addr, sizeof(ipv4_addr));
295
296         return count;
297 }
298
299 static ssize_t r2nm_node_local_read(struct r2nm_node *node, char *page)
300 {
301         return sprintf(page, "%d\n", node->nd_local);
302 }
303
304 static ssize_t r2nm_node_local_write(struct r2nm_node *node, const char *page,
305                                      size_t count)
306 {
307         struct r2nm_cluster *cluster = to_r2nm_cluster_from_node(node);
308         unsigned long tmp;
309         char *p = (char *)page;
310         ssize_t ret;
311         int err;
312
313         err = kstrtoul(p, 10, &tmp);
314         if (err)
315                 return err;
316
317         tmp = !!tmp; /* boolean of whether this node wants to be local */
318
319         /* setting local turns on networking rx for now so we require having
320          * set everything else first */
321         if (!test_bit(R2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
322             !test_bit(R2NM_NODE_ATTR_NUM, &node->nd_set_attributes) ||
323             !test_bit(R2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
324                 return -EINVAL; /* XXX */
325
326         /* the only failure case is trying to set a new local node
327          * when a different one is already set */
328         if (tmp && tmp == cluster->cl_has_local &&
329             cluster->cl_local_node != node->nd_num)
330                 return -EBUSY;
331
332         /* bring up the rx thread if we're setting the new local node. */
333         if (tmp && !cluster->cl_has_local) {
334                 ret = r2net_start_listening(node);
335                 if (ret)
336                         return ret;
337         }
338
339         if (!tmp && cluster->cl_has_local &&
340             cluster->cl_local_node == node->nd_num) {
341                 r2net_stop_listening(node);
342                 cluster->cl_local_node = R2NM_INVALID_NODE_NUM;
343         }
344
345         node->nd_local = tmp;
346         if (node->nd_local) {
347                 cluster->cl_has_local = tmp;
348                 cluster->cl_local_node = node->nd_num;
349         }
350
351         return count;
352 }
353
354 struct r2nm_node_attribute {
355         struct configfs_attribute attr;
356         ssize_t (*show)(struct r2nm_node *, char *);
357         ssize_t (*store)(struct r2nm_node *, const char *, size_t);
358 };
359
360 static struct r2nm_node_attribute r2nm_node_attr_num = {
361         .attr   = { .ca_owner = THIS_MODULE,
362                     .ca_name = "num",
363                     .ca_mode = S_IRUGO | S_IWUSR },
364         .show   = r2nm_node_num_read,
365         .store  = r2nm_node_num_write,
366 };
367
368 static struct r2nm_node_attribute r2nm_node_attr_ipv4_port = {
369         .attr   = { .ca_owner = THIS_MODULE,
370                     .ca_name = "ipv4_port",
371                     .ca_mode = S_IRUGO | S_IWUSR },
372         .show   = r2nm_node_ipv4_port_read,
373         .store  = r2nm_node_ipv4_port_write,
374 };
375
376 static struct r2nm_node_attribute r2nm_node_attr_ipv4_address = {
377         .attr   = { .ca_owner = THIS_MODULE,
378                     .ca_name = "ipv4_address",
379                     .ca_mode = S_IRUGO | S_IWUSR },
380         .show   = r2nm_node_ipv4_address_read,
381         .store  = r2nm_node_ipv4_address_write,
382 };
383
384 static struct r2nm_node_attribute r2nm_node_attr_local = {
385         .attr   = { .ca_owner = THIS_MODULE,
386                     .ca_name = "local",
387                     .ca_mode = S_IRUGO | S_IWUSR },
388         .show   = r2nm_node_local_read,
389         .store  = r2nm_node_local_write,
390 };
391
392 static struct configfs_attribute *r2nm_node_attrs[] = {
393         [R2NM_NODE_ATTR_NUM] = &r2nm_node_attr_num.attr,
394         [R2NM_NODE_ATTR_PORT] = &r2nm_node_attr_ipv4_port.attr,
395         [R2NM_NODE_ATTR_ADDRESS] = &r2nm_node_attr_ipv4_address.attr,
396         [R2NM_NODE_ATTR_LOCAL] = &r2nm_node_attr_local.attr,
397         NULL,
398 };
399
400 static int r2nm_attr_index(struct configfs_attribute *attr)
401 {
402         int i;
403         for (i = 0; i < ARRAY_SIZE(r2nm_node_attrs); i++) {
404                 if (attr == r2nm_node_attrs[i])
405                         return i;
406         }
407         BUG();
408         return 0;
409 }
410
411 static ssize_t r2nm_node_show(struct config_item *item,
412                               struct configfs_attribute *attr,
413                               char *page)
414 {
415         struct r2nm_node *node = to_r2nm_node(item);
416         struct r2nm_node_attribute *r2nm_node_attr =
417                 container_of(attr, struct r2nm_node_attribute, attr);
418         ssize_t ret = 0;
419
420         if (r2nm_node_attr->show)
421                 ret = r2nm_node_attr->show(node, page);
422         return ret;
423 }
424
425 static ssize_t r2nm_node_store(struct config_item *item,
426                                struct configfs_attribute *attr,
427                                const char *page, size_t count)
428 {
429         struct r2nm_node *node = to_r2nm_node(item);
430         struct r2nm_node_attribute *r2nm_node_attr =
431                 container_of(attr, struct r2nm_node_attribute, attr);
432         ssize_t ret;
433         int attr_index = r2nm_attr_index(attr);
434
435         if (r2nm_node_attr->store == NULL) {
436                 ret = -EINVAL;
437                 goto out;
438         }
439
440         if (test_bit(attr_index, &node->nd_set_attributes))
441                 return -EBUSY;
442
443         ret = r2nm_node_attr->store(node, page, count);
444         if (ret < count)
445                 goto out;
446
447         set_bit(attr_index, &node->nd_set_attributes);
448 out:
449         return ret;
450 }
451
452 static struct configfs_item_operations r2nm_node_item_ops = {
453         .release                = r2nm_node_release,
454         .show_attribute         = r2nm_node_show,
455         .store_attribute        = r2nm_node_store,
456 };
457
458 static struct config_item_type r2nm_node_type = {
459         .ct_item_ops    = &r2nm_node_item_ops,
460         .ct_attrs       = r2nm_node_attrs,
461         .ct_owner       = THIS_MODULE,
462 };
463
464 /* node set */
465
466 struct r2nm_node_group {
467         struct config_group ns_group;
468         /* some stuff? */
469 };
470
#if 0
/* Currently compiled out: map a config_group to its r2nm_node_group. */
static struct r2nm_node_group *to_r2nm_node_group(struct config_group *group)
{
        if (!group)
                return NULL;

        return container_of(group, struct r2nm_node_group, ns_group);
}
#endif
479
480 struct r2nm_cluster_attribute {
481         struct configfs_attribute attr;
482         ssize_t (*show)(struct r2nm_cluster *, char *);
483         ssize_t (*store)(struct r2nm_cluster *, const char *, size_t);
484 };
485
486 static ssize_t r2nm_cluster_attr_write(const char *page, ssize_t count,
487                                         unsigned int *val)
488 {
489         unsigned long tmp;
490         char *p = (char *)page;
491         int err;
492
493         err = kstrtoul(p, 10, &tmp);
494         if (err)
495                 return err;
496
497         if (tmp == 0)
498                 return -EINVAL;
499         if (tmp >= (u32)-1)
500                 return -ERANGE;
501
502         *val = tmp;
503
504         return count;
505 }
506
507 static ssize_t r2nm_cluster_attr_idle_timeout_ms_read(
508         struct r2nm_cluster *cluster, char *page)
509 {
510         return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms);
511 }
512
513 static ssize_t r2nm_cluster_attr_idle_timeout_ms_write(
514         struct r2nm_cluster *cluster, const char *page, size_t count)
515 {
516         ssize_t ret;
517         unsigned int val = 0;
518
519         ret =  r2nm_cluster_attr_write(page, count, &val);
520
521         if (ret > 0) {
522                 if (cluster->cl_idle_timeout_ms != val
523                         && r2net_num_connected_peers()) {
524                         mlog(ML_NOTICE,
525                              "r2net: cannot change idle timeout after "
526                              "the first peer has agreed to it."
527                              "  %d connected peers\n",
528                              r2net_num_connected_peers());
529                         ret = -EINVAL;
530                 } else if (val <= cluster->cl_keepalive_delay_ms) {
531                         mlog(ML_NOTICE,
532                              "r2net: idle timeout must be larger "
533                              "than keepalive delay\n");
534                         ret = -EINVAL;
535                 } else {
536                         cluster->cl_idle_timeout_ms = val;
537                 }
538         }
539
540         return ret;
541 }
542
543 static ssize_t r2nm_cluster_attr_keepalive_delay_ms_read(
544         struct r2nm_cluster *cluster, char *page)
545 {
546         return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms);
547 }
548
549 static ssize_t r2nm_cluster_attr_keepalive_delay_ms_write(
550         struct r2nm_cluster *cluster, const char *page, size_t count)
551 {
552         ssize_t ret;
553         unsigned int val = 0;
554
555         ret =  r2nm_cluster_attr_write(page, count, &val);
556
557         if (ret > 0) {
558                 if (cluster->cl_keepalive_delay_ms != val
559                     && r2net_num_connected_peers()) {
560                         mlog(ML_NOTICE,
561                              "r2net: cannot change keepalive delay after"
562                              " the first peer has agreed to it."
563                              "  %d connected peers\n",
564                              r2net_num_connected_peers());
565                         ret = -EINVAL;
566                 } else if (val >= cluster->cl_idle_timeout_ms) {
567                         mlog(ML_NOTICE,
568                              "r2net: keepalive delay must be "
569                              "smaller than idle timeout\n");
570                         ret = -EINVAL;
571                 } else {
572                         cluster->cl_keepalive_delay_ms = val;
573                 }
574         }
575
576         return ret;
577 }
578
579 static ssize_t r2nm_cluster_attr_reconnect_delay_ms_read(
580         struct r2nm_cluster *cluster, char *page)
581 {
582         return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms);
583 }
584
585 static ssize_t r2nm_cluster_attr_reconnect_delay_ms_write(
586         struct r2nm_cluster *cluster, const char *page, size_t count)
587 {
588         return r2nm_cluster_attr_write(page, count,
589                                         &cluster->cl_reconnect_delay_ms);
590 }
591
592 static ssize_t r2nm_cluster_attr_fence_method_read(
593         struct r2nm_cluster *cluster, char *page)
594 {
595         ssize_t ret = 0;
596
597         if (cluster)
598                 ret = sprintf(page, "%s\n",
599                               r2nm_fence_method_desc[cluster->cl_fence_method]);
600         return ret;
601 }
602
603 static ssize_t r2nm_cluster_attr_fence_method_write(
604         struct r2nm_cluster *cluster, const char *page, size_t count)
605 {
606         unsigned int i;
607
608         if (page[count - 1] != '\n')
609                 goto bail;
610
611         for (i = 0; i < R2NM_FENCE_METHODS; ++i) {
612                 if (count != strlen(r2nm_fence_method_desc[i]) + 1)
613                         continue;
614                 if (strncasecmp(page, r2nm_fence_method_desc[i], count - 1))
615                         continue;
616                 if (cluster->cl_fence_method != i) {
617                         pr_info("ramster: Changing fence method to %s\n",
618                                r2nm_fence_method_desc[i]);
619                         cluster->cl_fence_method = i;
620                 }
621                 return count;
622         }
623
624 bail:
625         return -EINVAL;
626 }
627
628 static struct r2nm_cluster_attribute r2nm_cluster_attr_idle_timeout_ms = {
629         .attr   = { .ca_owner = THIS_MODULE,
630                     .ca_name = "idle_timeout_ms",
631                     .ca_mode = S_IRUGO | S_IWUSR },
632         .show   = r2nm_cluster_attr_idle_timeout_ms_read,
633         .store  = r2nm_cluster_attr_idle_timeout_ms_write,
634 };
635
636 static struct r2nm_cluster_attribute r2nm_cluster_attr_keepalive_delay_ms = {
637         .attr   = { .ca_owner = THIS_MODULE,
638                     .ca_name = "keepalive_delay_ms",
639                     .ca_mode = S_IRUGO | S_IWUSR },
640         .show   = r2nm_cluster_attr_keepalive_delay_ms_read,
641         .store  = r2nm_cluster_attr_keepalive_delay_ms_write,
642 };
643
644 static struct r2nm_cluster_attribute r2nm_cluster_attr_reconnect_delay_ms = {
645         .attr   = { .ca_owner = THIS_MODULE,
646                     .ca_name = "reconnect_delay_ms",
647                     .ca_mode = S_IRUGO | S_IWUSR },
648         .show   = r2nm_cluster_attr_reconnect_delay_ms_read,
649         .store  = r2nm_cluster_attr_reconnect_delay_ms_write,
650 };
651
652 static struct r2nm_cluster_attribute r2nm_cluster_attr_fence_method = {
653         .attr   = { .ca_owner = THIS_MODULE,
654                     .ca_name = "fence_method",
655                     .ca_mode = S_IRUGO | S_IWUSR },
656         .show   = r2nm_cluster_attr_fence_method_read,
657         .store  = r2nm_cluster_attr_fence_method_write,
658 };
659
660 static struct configfs_attribute *r2nm_cluster_attrs[] = {
661         &r2nm_cluster_attr_idle_timeout_ms.attr,
662         &r2nm_cluster_attr_keepalive_delay_ms.attr,
663         &r2nm_cluster_attr_reconnect_delay_ms.attr,
664         &r2nm_cluster_attr_fence_method.attr,
665         NULL,
666 };
667 static ssize_t r2nm_cluster_show(struct config_item *item,
668                                         struct configfs_attribute *attr,
669                                         char *page)
670 {
671         struct r2nm_cluster *cluster = to_r2nm_cluster(item);
672         struct r2nm_cluster_attribute *r2nm_cluster_attr =
673                 container_of(attr, struct r2nm_cluster_attribute, attr);
674         ssize_t ret = 0;
675
676         if (r2nm_cluster_attr->show)
677                 ret = r2nm_cluster_attr->show(cluster, page);
678         return ret;
679 }
680
681 static ssize_t r2nm_cluster_store(struct config_item *item,
682                                         struct configfs_attribute *attr,
683                                         const char *page, size_t count)
684 {
685         struct r2nm_cluster *cluster = to_r2nm_cluster(item);
686         struct r2nm_cluster_attribute *r2nm_cluster_attr =
687                 container_of(attr, struct r2nm_cluster_attribute, attr);
688         ssize_t ret;
689
690         if (r2nm_cluster_attr->store == NULL) {
691                 ret = -EINVAL;
692                 goto out;
693         }
694
695         ret = r2nm_cluster_attr->store(cluster, page, count);
696         if (ret < count)
697                 goto out;
698 out:
699         return ret;
700 }
701
702 static struct config_item *r2nm_node_group_make_item(struct config_group *group,
703                                                      const char *name)
704 {
705         struct r2nm_node *node = NULL;
706
707         if (strlen(name) > R2NM_MAX_NAME_LEN)
708                 return ERR_PTR(-ENAMETOOLONG);
709
710         node = kzalloc(sizeof(struct r2nm_node), GFP_KERNEL);
711         if (node == NULL)
712                 return ERR_PTR(-ENOMEM);
713
714         strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
715         config_item_init_type_name(&node->nd_item, name, &r2nm_node_type);
716         spin_lock_init(&node->nd_lock);
717
718         mlog(ML_CLUSTER, "r2nm: Registering node %s\n", name);
719
720         return &node->nd_item;
721 }
722
723 static void r2nm_node_group_drop_item(struct config_group *group,
724                                       struct config_item *item)
725 {
726         struct r2nm_node *node = to_r2nm_node(item);
727         struct r2nm_cluster *cluster =
728                                 to_r2nm_cluster(group->cg_item.ci_parent);
729
730         r2net_disconnect_node(node);
731
732         if (cluster->cl_has_local &&
733             (cluster->cl_local_node == node->nd_num)) {
734                 cluster->cl_has_local = 0;
735                 cluster->cl_local_node = R2NM_INVALID_NODE_NUM;
736                 r2net_stop_listening(node);
737         }
738
739         /* XXX call into net to stop this node from trading messages */
740
741         write_lock(&cluster->cl_nodes_lock);
742
743         /* XXX sloppy */
744         if (node->nd_ipv4_address)
745                 rb_erase(&node->nd_ip_node, &cluster->cl_node_ip_tree);
746
747         /* nd_num might be 0 if the node number hasn't been set.. */
748         if (cluster->cl_nodes[node->nd_num] == node) {
749                 cluster->cl_nodes[node->nd_num] = NULL;
750                 clear_bit(node->nd_num, cluster->cl_nodes_bitmap);
751         }
752         write_unlock(&cluster->cl_nodes_lock);
753
754         mlog(ML_CLUSTER, "r2nm: Unregistered node %s\n",
755              config_item_name(&node->nd_item));
756
757         config_item_put(item);
758 }
759
760 static struct configfs_group_operations r2nm_node_group_group_ops = {
761         .make_item      = r2nm_node_group_make_item,
762         .drop_item      = r2nm_node_group_drop_item,
763 };
764
765 static struct config_item_type r2nm_node_group_type = {
766         .ct_group_ops   = &r2nm_node_group_group_ops,
767         .ct_owner       = THIS_MODULE,
768 };
769
770 /* cluster */
771
772 static void r2nm_cluster_release(struct config_item *item)
773 {
774         struct r2nm_cluster *cluster = to_r2nm_cluster(item);
775
776         kfree(cluster->cl_group.default_groups);
777         kfree(cluster);
778 }
779
780 static struct configfs_item_operations r2nm_cluster_item_ops = {
781         .release        = r2nm_cluster_release,
782         .show_attribute         = r2nm_cluster_show,
783         .store_attribute        = r2nm_cluster_store,
784 };
785
786 static struct config_item_type r2nm_cluster_type = {
787         .ct_item_ops    = &r2nm_cluster_item_ops,
788         .ct_attrs       = r2nm_cluster_attrs,
789         .ct_owner       = THIS_MODULE,
790 };
791
792 /* cluster set */
793
794 struct r2nm_cluster_group {
795         struct configfs_subsystem cs_subsys;
796         /* some stuff? */
797 };
798
#if 0
/* Currently compiled out: map a config_group to its r2nm_cluster_group. */
static struct r2nm_cluster_group *
to_r2nm_cluster_group(struct config_group *group)
{
        if (!group)
                return NULL;

        return container_of(to_configfs_subsystem(group),
                            struct r2nm_cluster_group, cs_subsys);
}
#endif
809
810 static struct config_group *
811 r2nm_cluster_group_make_group(struct config_group *group,
812                                                           const char *name)
813 {
814         struct r2nm_cluster *cluster = NULL;
815         struct r2nm_node_group *ns = NULL;
816         struct config_group *r2hb_group = NULL, *ret = NULL;
817         void *defs = NULL;
818
819         /* this runs under the parent dir's i_mutex; there can be only
820          * one caller in here at a time */
821         if (r2nm_single_cluster)
822                 return ERR_PTR(-ENOSPC);
823
824         cluster = kzalloc(sizeof(struct r2nm_cluster), GFP_KERNEL);
825         ns = kzalloc(sizeof(struct r2nm_node_group), GFP_KERNEL);
826         defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL);
827         r2hb_group = r2hb_alloc_hb_set();
828         if (cluster == NULL || ns == NULL || r2hb_group == NULL || defs == NULL)
829                 goto out;
830
831         config_group_init_type_name(&cluster->cl_group, name,
832                                     &r2nm_cluster_type);
833         config_group_init_type_name(&ns->ns_group, "node",
834                                     &r2nm_node_group_type);
835
836         cluster->cl_group.default_groups = defs;
837         cluster->cl_group.default_groups[0] = &ns->ns_group;
838         cluster->cl_group.default_groups[1] = r2hb_group;
839         cluster->cl_group.default_groups[2] = NULL;
840         rwlock_init(&cluster->cl_nodes_lock);
841         cluster->cl_node_ip_tree = RB_ROOT;
842         cluster->cl_reconnect_delay_ms = R2NET_RECONNECT_DELAY_MS_DEFAULT;
843         cluster->cl_idle_timeout_ms    = R2NET_IDLE_TIMEOUT_MS_DEFAULT;
844         cluster->cl_keepalive_delay_ms = R2NET_KEEPALIVE_DELAY_MS_DEFAULT;
845         cluster->cl_fence_method       = R2NM_FENCE_RESET;
846
847         ret = &cluster->cl_group;
848         r2nm_single_cluster = cluster;
849
850 out:
851         if (ret == NULL) {
852                 kfree(cluster);
853                 kfree(ns);
854                 r2hb_free_hb_set(r2hb_group);
855                 kfree(defs);
856                 ret = ERR_PTR(-ENOMEM);
857         }
858
859         return ret;
860 }
861
862 static void r2nm_cluster_group_drop_item(struct config_group *group,
863                                                 struct config_item *item)
864 {
865         struct r2nm_cluster *cluster = to_r2nm_cluster(item);
866         int i;
867         struct config_item *killme;
868
869         BUG_ON(r2nm_single_cluster != cluster);
870         r2nm_single_cluster = NULL;
871
872         for (i = 0; cluster->cl_group.default_groups[i]; i++) {
873                 killme = &cluster->cl_group.default_groups[i]->cg_item;
874                 cluster->cl_group.default_groups[i] = NULL;
875                 config_item_put(killme);
876         }
877
878         config_item_put(item);
879 }
880
881 static struct configfs_group_operations r2nm_cluster_group_group_ops = {
882         .make_group     = r2nm_cluster_group_make_group,
883         .drop_item      = r2nm_cluster_group_drop_item,
884 };
885
886 static struct config_item_type r2nm_cluster_group_type = {
887         .ct_group_ops   = &r2nm_cluster_group_group_ops,
888         .ct_owner       = THIS_MODULE,
889 };
890
891 static struct r2nm_cluster_group r2nm_cluster_group = {
892         .cs_subsys = {
893                 .su_group = {
894                         .cg_item = {
895                                 .ci_namebuf = "cluster",
896                                 .ci_type = &r2nm_cluster_group_type,
897                         },
898                 },
899         },
900 };
901
902 int r2nm_depend_item(struct config_item *item)
903 {
904         return configfs_depend_item(&r2nm_cluster_group.cs_subsys, item);
905 }
906
907 void r2nm_undepend_item(struct config_item *item)
908 {
909         configfs_undepend_item(&r2nm_cluster_group.cs_subsys, item);
910 }
911
912 int r2nm_depend_this_node(void)
913 {
914         int ret = 0;
915         struct r2nm_node *local_node;
916
917         local_node = r2nm_get_node_by_num(r2nm_this_node());
918         if (!local_node) {
919                 ret = -EINVAL;
920                 goto out;
921         }
922
923         ret = r2nm_depend_item(&local_node->nd_item);
924         r2nm_node_put(local_node);
925
926 out:
927         return ret;
928 }
929
930 void r2nm_undepend_this_node(void)
931 {
932         struct r2nm_node *local_node;
933
934         local_node = r2nm_get_node_by_num(r2nm_this_node());
935         BUG_ON(!local_node);
936
937         r2nm_undepend_item(&local_node->nd_item);
938         r2nm_node_put(local_node);
939 }
940
941
942 static void __exit exit_r2nm(void)
943 {
944         /* XXX sync with hb callbacks and shut down hb? */
945         r2net_unregister_hb_callbacks();
946         configfs_unregister_subsystem(&r2nm_cluster_group.cs_subsys);
947
948         r2net_exit();
949         r2hb_exit();
950 }
951
952 static int __init init_r2nm(void)
953 {
954         int ret = -1;
955
956         ret = r2hb_init();
957         if (ret)
958                 goto out;
959
960         ret = r2net_init();
961         if (ret)
962                 goto out_r2hb;
963
964         ret = r2net_register_hb_callbacks();
965         if (ret)
966                 goto out_r2net;
967
968         config_group_init(&r2nm_cluster_group.cs_subsys.su_group);
969         mutex_init(&r2nm_cluster_group.cs_subsys.su_mutex);
970         ret = configfs_register_subsystem(&r2nm_cluster_group.cs_subsys);
971         if (ret) {
972                 pr_err("nodemanager: Registration returned %d\n", ret);
973                 goto out_callbacks;
974         }
975
976         if (!ret)
977                 goto out;
978
979         configfs_unregister_subsystem(&r2nm_cluster_group.cs_subsys);
980 out_callbacks:
981         r2net_unregister_hb_callbacks();
982 out_r2net:
983         r2net_exit();
984 out_r2hb:
985         r2hb_exit();
986 out:
987         return ret;
988 }
989
990 MODULE_AUTHOR("Oracle");
991 MODULE_LICENSE("GPL");
992
993 /* module_init(init_r2nm) */
994 late_initcall(init_r2nm);
995 /* module_exit(exit_r2nm) */