1 /* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
4 * Copyright (C) 2004, 2005 Oracle. All rights reserved.
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU General Public
17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA.
22 #include <linux/kernel.h>
23 #include <linux/sched.h>
24 #include <linux/jiffies.h>
25 #include <linux/module.h>
27 #include <linux/bio.h>
28 #include <linux/blkdev.h>
29 #include <linux/delay.h>
30 #include <linux/file.h>
31 #include <linux/kthread.h>
32 #include <linux/configfs.h>
33 #include <linux/random.h>
34 #include <linux/crc32.h>
35 #include <linux/time.h>
36 #include <linux/debugfs.h>
37 #include <linux/slab.h>
38 #include <linux/bitmap.h>
39 #include <linux/ktime.h>
40 #include "heartbeat.h"
42 #include "nodemanager.h"
49 * The first heartbeat pass had one global thread that would serialize all hb
50 * callback calls. This global serializing sem should only be removed once
51 * we've made sure that all callees can deal with being called concurrently
52 * from multiple hb region threads.
/*
 * File-scope state of the o2hb heartbeat code: the callback rwsem, the
 * live-node list/bitmap and the region bitmaps (all touched under
 * o2hb_live_lock elsewhere in this file), the debugfs type codes, buffers,
 * names and dentries, the list of all regions, the per-type callback lists,
 * the heartbeat-mode enum and name table, and the tunables
 * o2hb_dead_threshold, o2hb_heartbeat_mode and o2hb_dependent_users.
 *
 * NOTE(review): this listing prefixes every line with its source line number
 * and has lost lines (see the gaps in that numbering): comment openers and
 * closers, the members and closing brace of struct o2hb_debug_buf, the
 * closing brace of enum o2hb_heartbeat_modes and of the
 * o2hb_heartbeat_mode_desc initializer. Restore them from the pristine file
 * before building.
 */
54 static DECLARE_RWSEM(o2hb_callback_sem);
57 * multiple hb threads are watching multiple regions. A node is live
58 * whenever any of the threads sees activity from the node in its region.
60 static DEFINE_SPINLOCK(o2hb_live_lock);
61 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
62 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
63 static LIST_HEAD(o2hb_node_events);
64 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
67 * In global heartbeat, we maintain a series of region bitmaps.
68 * - o2hb_region_bitmap allows us to limit the region number to max region.
69 * - o2hb_live_region_bitmap tracks live regions (seen steady iterations).
70 * - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
72 * - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
74 static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
75 static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
76 static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
77 static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
79 #define O2HB_DB_TYPE_LIVENODES 0
80 #define O2HB_DB_TYPE_LIVEREGIONS 1
81 #define O2HB_DB_TYPE_QUORUMREGIONS 2
82 #define O2HB_DB_TYPE_FAILEDREGIONS 3
83 #define O2HB_DB_TYPE_REGION_LIVENODES 4
84 #define O2HB_DB_TYPE_REGION_NUMBER 5
85 #define O2HB_DB_TYPE_REGION_ELAPSED_TIME 6
86 #define O2HB_DB_TYPE_REGION_PINNED 7
87 struct o2hb_debug_buf {
94 static struct o2hb_debug_buf *o2hb_db_livenodes;
95 static struct o2hb_debug_buf *o2hb_db_liveregions;
96 static struct o2hb_debug_buf *o2hb_db_quorumregions;
97 static struct o2hb_debug_buf *o2hb_db_failedregions;
99 #define O2HB_DEBUG_DIR "o2hb"
100 #define O2HB_DEBUG_LIVENODES "livenodes"
101 #define O2HB_DEBUG_LIVEREGIONS "live_regions"
102 #define O2HB_DEBUG_QUORUMREGIONS "quorum_regions"
103 #define O2HB_DEBUG_FAILEDREGIONS "failed_regions"
104 #define O2HB_DEBUG_REGION_NUMBER "num"
105 #define O2HB_DEBUG_REGION_ELAPSED_TIME "elapsed_time_in_ms"
106 #define O2HB_DEBUG_REGION_PINNED "pinned"
108 static struct dentry *o2hb_debug_dir;
109 static struct dentry *o2hb_debug_livenodes;
110 static struct dentry *o2hb_debug_liveregions;
111 static struct dentry *o2hb_debug_quorumregions;
112 static struct dentry *o2hb_debug_failedregions;
114 static LIST_HEAD(o2hb_all_regions);
116 static struct o2hb_callback {
117 struct list_head list;
118 } o2hb_callbacks[O2HB_NUM_CB];
120 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
122 #define O2HB_DEFAULT_BLOCK_BITS 9
124 enum o2hb_heartbeat_modes {
125 O2HB_HEARTBEAT_LOCAL = 0,
126 O2HB_HEARTBEAT_GLOBAL,
127 O2HB_HEARTBEAT_NUM_MODES,
130 char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
131 "local", /* O2HB_HEARTBEAT_LOCAL */
132 "global", /* O2HB_HEARTBEAT_GLOBAL */
135 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
136 unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
139 * o2hb_dependent_users tracks the number of registered callbacks that depend
140 * on heartbeat. o2net and o2dlm are two entities that register this callback.
141 * However only o2dlm depends on the heartbeat. It does not want the heartbeat
142 * to stop while a dlm domain is still active.
144 unsigned int o2hb_dependent_users;
147 * In global heartbeat mode, all regions are pinned if there are one or more
148 * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
149 * regions are unpinned if the region count exceeds the cut off or the number
150 * of dependent users falls to zero.
152 #define O2HB_PIN_CUT_OFF 3
155 * In local heartbeat mode, we assume the dlm domain name to be the same as
156 * region uuid. This is true for domains created for the file system but not
157 * necessarily true for userdlm domains. This is a known limitation.
159 * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
160 * works for both file system and userdlm domains.
162 static int o2hb_region_pin(const char *region_uuid);
163 static void o2hb_region_unpin(const char *region_uuid);
165 /* Only sets a new threshold if there are no active regions.
167 * No locking or otherwise interesting code is required for reading
168 * o2hb_dead_threshold as it can't change once regions are active and
169 * it's not interesting to anyone until then anyway. */
170 static void o2hb_dead_threshold_set(unsigned int threshold)
172 if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
173 spin_lock(&o2hb_live_lock);
174 if (list_empty(&o2hb_all_regions))
175 o2hb_dead_threshold = threshold;
176 spin_unlock(&o2hb_live_lock);
/*
 * Select the heartbeat mode. The visible code accepts hb_mode only when it
 * is below O2HB_HEARTBEAT_NUM_MODES and stores it in o2hb_heartbeat_mode only
 * while o2hb_all_regions is empty, all under o2hb_live_lock.
 *
 * NOTE(review): lines were dropped from this listing (numbering jumps
 * 180->184, 187->190, 190->196): the braces, whatever sets the int return
 * value, and the return statement are not visible, so the return contract
 * cannot be stated from here. Restore from the pristine file.
 */
180 static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
184 if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
185 spin_lock(&o2hb_live_lock);
186 if (list_empty(&o2hb_all_regions)) {
187 o2hb_heartbeat_mode = hb_mode;
190 spin_unlock(&o2hb_live_lock);
/*
 * One queued node up/down notification: list linkage on o2hb_node_events,
 * the callback type, and the node it concerns.
 *
 * NOTE(review): the listing dropped the tail of this struct. Other code in
 * this file reads and writes an hn_node_num member that is not visible here,
 * and the closing brace is missing.
 */
196 struct o2hb_node_event {
197 struct list_head hn_item;
198 enum o2hb_callback_type hn_event_type;
199 struct o2nm_node *hn_node;
/*
 * Per-node heartbeat slot within a region: pointer to the raw on-disk block,
 * last generation seen, counters of unchanged/changed samples, and linkage
 * on the per-node o2hb_live_slots list.
 *
 * NOTE(review): lines are missing (numbering jumps 204->207 and ends at
 * 210). ds_last_time and ds_node_num are used throughout this file but are
 * not declared in the visible text; the closing brace is also gone.
 */
203 struct o2hb_disk_slot {
204 struct o2hb_disk_heartbeat_block *ds_raw_block;
207 u64 ds_last_generation;
208 u16 ds_equal_samples;
209 u16 ds_changed_samples;
210 struct list_head ds_live_item;
/*
 * Per-region heartbeat state (the members of struct o2hb_region): configfs
 * item, linkage on o2hb_all_regions, the heartbeat thread, disk geometry
 * (block count/bits/bytes, start block, slots per page, page array, block
 * device), the slot array, this region's live-node bitmap and region number,
 * debugfs handles, steady/unsteady iteration counters, device name, timeout,
 * the write-timeout and negotiate-timeout delayed works, the negotiate node
 * bitmap, a scratch block for o2hb_check_slot, and the o2net handler list.
 *
 * NOTE(review): the "struct o2hb_region {" line itself, the remaining
 * bitfield members after hr_unclean_stop (this file also uses
 * hr_aborted_start and hr_node_deleted), hr_generation, hr_key, some comment
 * closers and the closing brace were dropped from this listing.
 */
213 /* each thread owns a region.. when we're asked to tear down the region
214 * we ask the thread to stop, who cleans up the region */
216 struct config_item hr_item;
218 struct list_head hr_all_item;
219 unsigned hr_unclean_stop:1,
225 /* protected by the hr_callback_sem */
226 struct task_struct *hr_task;
228 unsigned int hr_blocks;
229 unsigned long long hr_start_block;
231 unsigned int hr_block_bits;
232 unsigned int hr_block_bytes;
234 unsigned int hr_slots_per_page;
235 unsigned int hr_num_pages;
237 struct page **hr_slot_data;
238 struct block_device *hr_bdev;
239 struct o2hb_disk_slot *hr_slots;
241 /* live node map of this region */
242 unsigned long hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
243 unsigned int hr_region_num;
245 struct dentry *hr_debug_dir;
246 struct dentry *hr_debug_livenodes;
247 struct dentry *hr_debug_regnum;
248 struct dentry *hr_debug_elapsed_time;
249 struct dentry *hr_debug_pinned;
250 struct o2hb_debug_buf *hr_db_livenodes;
251 struct o2hb_debug_buf *hr_db_regnum;
252 struct o2hb_debug_buf *hr_db_elapsed_time;
253 struct o2hb_debug_buf *hr_db_pinned;
255 /* let the person setting up hb wait for it to return until it
256 * has reached a 'steady' state. This will be fixed when we have
257 * a more complete api that doesn't lead to this sort of fragility. */
258 atomic_t hr_steady_iterations;
260 /* terminate o2hb thread if it does not reach steady state
261 * (hr_steady_iterations == 0) within hr_unsteady_iterations */
262 atomic_t hr_unsteady_iterations;
264 char hr_dev_name[BDEVNAME_SIZE];
266 unsigned int hr_timeout_ms;
268 /* randomized as the region goes up and down so that a node
269 * recognizes a node going up and down in one iteration */
272 struct delayed_work hr_write_timeout_work;
273 unsigned long hr_last_timeout_start;
275 /* negotiate timer, used to negotiate extending hb timeout. */
276 struct delayed_work hr_nego_timeout_work;
277 unsigned long hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
279 /* Used during o2hb_check_slot to hold a copy of the block
280 * being checked because we temporarily have to zero out the
282 struct o2hb_disk_heartbeat_block *hr_tmp_block;
284 /* Message key for negotiate timeout message. */
286 struct list_head hr_handler_list;
/*
 * Completion context shared by the bios of one heartbeat I/O batch: an
 * outstanding-request counter and the completion signalled when it reaches
 * zero.
 *
 * NOTE(review): the wc_error member that this file reads and writes, and the
 * closing brace, are missing from the listing.
 */
289 struct o2hb_bio_wait_ctxt {
290 atomic_t wc_num_reqs;
291 struct completion wc_io_complete;
/*
 * Message type and payload for the negotiate-timeout exchange.
 *
 * NOTE(review): only one enumerator line and the struct's opening line
 * survive in this listing. The enum header, the struct member(s) (this file
 * accesses node_num) and both closing braces are missing.
 */
296 O2HB_NEGO_TIMEOUT_MSG = 1,
299 struct o2hb_nego_msg {
303 static void o2hb_write_timeout(struct work_struct *work)
306 struct o2hb_region *reg =
307 container_of(work, struct o2hb_region,
308 hr_write_timeout_work.work);
310 mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
311 "milliseconds\n", reg->hr_dev_name,
312 jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
314 if (o2hb_global_heartbeat_active()) {
315 spin_lock(&o2hb_live_lock);
316 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
317 set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
318 failed = bitmap_weight(o2hb_failed_region_bitmap,
320 quorum = bitmap_weight(o2hb_quorum_region_bitmap,
322 spin_unlock(&o2hb_live_lock);
324 mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
328 * Fence if the number of failed regions >= half the number
331 if ((failed << 1) < quorum)
335 o2quo_disk_timeout();
338 static void o2hb_arm_timeout(struct o2hb_region *reg)
340 /* Arm writeout only after thread reaches steady state */
341 if (atomic_read(®->hr_steady_iterations) != 0)
344 mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
345 O2HB_MAX_WRITE_TIMEOUT_MS);
347 if (o2hb_global_heartbeat_active()) {
348 spin_lock(&o2hb_live_lock);
349 clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
350 spin_unlock(&o2hb_live_lock);
352 cancel_delayed_work(®->hr_write_timeout_work);
353 reg->hr_last_timeout_start = jiffies;
354 schedule_delayed_work(®->hr_write_timeout_work,
355 msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
357 cancel_delayed_work(®->hr_nego_timeout_work);
358 /* negotiate timeout must be less than write timeout. */
359 schedule_delayed_work(®->hr_nego_timeout_work,
360 msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS)/2);
361 memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap));
364 static void o2hb_disarm_timeout(struct o2hb_region *reg)
366 cancel_delayed_work_sync(®->hr_write_timeout_work);
367 cancel_delayed_work_sync(®->hr_nego_timeout_work);
/*
 * Send a negotiate message of the given type/key to node `target` through
 * o2net_send_message(); the payload carries this node's number
 * (o2nm_this_node()). The visible code singles out -EAGAIN and -ENOMEM
 * results for special handling.
 *
 * NOTE(review): the listing dropped lines here (370->372, 372->375,
 * 377->380, 380->388): the remaining locals, the rest of the
 * o2net_send_message() argument list, the body of the -EAGAIN/-ENOMEM
 * branch and the return statement are not visible. Presumably the branch
 * retries; verify against the pristine file.
 */
370 static int o2hb_send_nego_msg(int key, int type, u8 target)
372 struct o2hb_nego_msg msg;
375 msg.node_num = o2nm_this_node();
377 ret = o2net_send_message(type, key, &msg, sizeof(msg),
380 if (ret == -EAGAIN || ret == -ENOMEM) {
388 static void o2hb_nego_timeout(struct work_struct *work)
390 unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
392 struct o2hb_region *reg;
394 reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work);
395 o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
396 /* lowest node as master node to make negotiate decision. */
397 master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0);
399 if (master_node == o2nm_this_node()) {
400 set_bit(master_node, reg->hr_nego_node_bitmap);
401 if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap,
402 sizeof(reg->hr_nego_node_bitmap))) {
403 /* check negotiate bitmap every second to do timeout
406 schedule_delayed_work(®->hr_nego_timeout_work,
407 msecs_to_jiffies(1000));
412 /* approve negotiate timeout request. */
414 /* negotiate timeout with master node. */
415 o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG,
/*
 * o2net handler for O2HB_NEGO_TIMEOUT_MSG. `data` is the o2hb_region the
 * handler was registered for. The sender's node number is taken from the
 * message payload; when it is below O2NM_MAX_NODES its bit is set in
 * hr_nego_node_bitmap, and the visible error log covers the other case.
 *
 * NOTE(review): the listing dropped the last parameter line of the
 * signature, the "else" before the error log, and the return statement.
 */
420 static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data,
423 struct o2hb_region *reg = data;
424 struct o2hb_nego_msg *nego_msg;
426 nego_msg = (struct o2hb_nego_msg *)msg->buf;
427 if (nego_msg->node_num < O2NM_MAX_NODES)
428 set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap);
430 mlog(ML_ERROR, "got nego timeout message from bad node.\n");
/*
 * Prepare a wait context for a new I/O batch: the request counter starts at
 * 1 (the caller's own reference, dropped in o2hb_wait_on_io()) and the
 * completion is initialised.
 *
 * NOTE(review): the listing jumps from 438 to 442, so at least one statement
 * (presumably the reset of wc_error) and the braces are missing; verify.
 */
435 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
437 atomic_set(&wc->wc_num_reqs, 1);
438 init_completion(&wc->wc_io_complete);
/*
 * Drop references on a wait context; when wc_num_reqs reaches zero the
 * completion wc_io_complete is signalled. Callers in this file pass a count
 * of 1 as the second argument.
 *
 * NOTE(review): the second parameter line, the loop header around the
 * decrement and the closing braces were dropped from this listing.
 */
442 /* Used in error paths too */
443 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
446 /* sadly atomic_sub_and_test() isn't available on all platforms. The
447 * good news is that the fast path only completes one at a time */
449 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
451 complete(&wc->wc_io_complete);
456 static void o2hb_wait_on_io(struct o2hb_region *reg,
457 struct o2hb_bio_wait_ctxt *wc)
459 o2hb_bio_wait_dec(wc, 1);
460 wait_for_completion(&wc->wc_io_complete);
/*
 * bio completion callback: fetches the wait context from bi_private, logs
 * and records bio->bi_error in wc_error, then drops one reference on the
 * context.
 *
 * NOTE(review): the condition guarding the error log/store and any trailing
 * statements (numbering jumps 465->468 and 472->476) are missing from this
 * listing; restore from the pristine file.
 */
463 static void o2hb_bio_end_io(struct bio *bio)
465 struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
468 mlog(ML_ERROR, "IO Error %d\n", bio->bi_error);
469 wc->wc_error = bio->bi_error;
472 o2hb_bio_wait_dec(wc, 1);
/*
 * Build one bio covering heartbeat slots from *current_slot up to (not
 * including) max_slots, as far as bio_add_page() accepts them.
 *
 * Visible behaviour: allocates the bio with GFP_ATOMIC and 16 vecs, turns an
 * allocation failure into ERR_PTR(-ENOMEM), positions the bio at
 * (hr_start_block + slot) converted to 512-byte sectors, wires up
 * o2hb_bio_end_io/wc, then adds page fragments from hr_slot_data, advancing
 * the slot cursor by the number of slots each fragment holds and stopping
 * early when bio_add_page() takes less than requested.
 *
 * NOTE(review): the listing dropped lines (bio/page declarations, the NULL
 * check around the allocation error, the per-iteration vec_start update, the
 * write-back of the slot cursor and the return). Left byte-identical rather
 * than guessed.
 */
476 /* Setup a Bio to cover I/O against num_slots slots starting at
478 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
479 struct o2hb_bio_wait_ctxt *wc,
480 unsigned int *current_slot,
481 unsigned int max_slots)
483 int len, current_page;
484 unsigned int vec_len, vec_start;
485 unsigned int bits = reg->hr_block_bits;
486 unsigned int spp = reg->hr_slots_per_page;
487 unsigned int cs = *current_slot;
491 /* Testing has shown this allocation to take long enough under
492 * GFP_KERNEL that the local node can get fenced. It would be
493 * nicest if we could pre-allocate these bios and avoid this
495 bio = bio_alloc(GFP_ATOMIC, 16);
497 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
498 bio = ERR_PTR(-ENOMEM);
502 /* Must put everything in 512 byte sectors for the bio... */
503 bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
504 bio->bi_bdev = reg->hr_bdev;
505 bio->bi_private = wc;
506 bio->bi_end_io = o2hb_bio_end_io;
508 vec_start = (cs << bits) % PAGE_SIZE;
509 while(cs < max_slots) {
510 current_page = cs / spp;
511 page = reg->hr_slot_data[current_page];
513 vec_len = min(PAGE_SIZE - vec_start,
514 (max_slots-cs) * (PAGE_SIZE/spp) );
516 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
517 current_page, vec_len, vec_start);
519 len = bio_add_page(bio, page, vec_len, vec_start);
520 if (len != vec_len) break;
522 cs += vec_len / (PAGE_SIZE/spp);
/*
 * Read heartbeat slots 0..max_slots-1 from disk. Bios are built with
 * o2hb_setup_one_bio() until the slot cursor reaches max_slots; each one is
 * accounted on the local wait context and submitted as a READ. After
 * waiting for all I/O, a recorded wc_error becomes the status when no
 * earlier error was set.
 *
 * NOTE(review): "¤t_slot" below is "&current_slot" mangled by HTML-entity
 * decoding ("&curren") in the listing this text came from. Short lines were
 * also dropped (status/bio declarations, the IS_ERR check and its error
 * path, the success assignment, the label before the wait, and the return).
 */
531 static int o2hb_read_slots(struct o2hb_region *reg,
532 unsigned int max_slots)
534 unsigned int current_slot=0;
536 struct o2hb_bio_wait_ctxt wc;
539 o2hb_bio_wait_init(&wc);
541 while(current_slot < max_slots) {
542 bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots);
544 status = PTR_ERR(bio);
549 atomic_inc(&wc.wc_num_reqs);
550 submit_bio(READ, bio);
556 o2hb_wait_on_io(reg, &wc);
557 if (wc.wc_error && !status)
558 status = wc.wc_error;
/*
 * Start the write of this node's own heartbeat slot. Initialises the
 * caller's wait context, builds a bio for exactly one slot (slot ..
 * slot+1, where slot is o2nm_this_node()), accounts it on write_wc and
 * submits it with WRITE_SYNC. The function does not wait; callers in this
 * file follow up with o2hb_wait_on_io().
 *
 * NOTE(review): declarations, the IS_ERR check around PTR_ERR(), the
 * success status and the return were dropped from this listing.
 */
563 static int o2hb_issue_node_write(struct o2hb_region *reg,
564 struct o2hb_bio_wait_ctxt *write_wc)
570 o2hb_bio_wait_init(write_wc);
572 slot = o2nm_this_node();
574 bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
576 status = PTR_ERR(bio);
581 atomic_inc(&write_wc->wc_num_reqs);
582 submit_bio(WRITE_SYNC, bio);
589 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
590 struct o2hb_disk_heartbeat_block *hb_block)
595 /* We want to compute the block crc with a 0 value in the
596 * hb_cksum field. Save it off here and replace after the
598 old_cksum = hb_block->hb_cksum;
599 hb_block->hb_cksum = 0;
601 ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
603 hb_block->hb_cksum = old_cksum;
608 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
610 mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
611 "cksum = 0x%x, generation 0x%llx\n",
612 (long long)le64_to_cpu(hb_block->hb_seq),
613 hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
614 (long long)le64_to_cpu(hb_block->hb_generation));
617 static int o2hb_verify_crc(struct o2hb_region *reg,
618 struct o2hb_disk_heartbeat_block *hb_block)
622 read = le32_to_cpu(hb_block->hb_cksum);
623 computed = o2hb_compute_block_crc_le(reg, hb_block);
625 return read == computed;
/*
 * Verify that the block just read from our own slot is the one we wrote in
 * the previous iteration (same hb_seq, hb_generation and hb_node). On a
 * mismatch one of the three ERRSTR messages is chosen - node first, then
 * generation, then sequence - and logged together with the expected and
 * on-disk values. The check is bypassed while ds_last_time is still zero.
 *
 * NOTE(review): "®->hr_slots" below is "&reg->hr_slots" mangled by
 * HTML-entity decoding in the listing this text came from. Short lines were
 * also dropped: the errstr declaration and assignments, and every return
 * statement, so the value returned on the first-timestamp path cannot be
 * confirmed from this view.
 */
629 * Compare the slot data with what we wrote in the last iteration.
630 * If the match fails, print an appropriate error message. This is to
631 * detect errors like... another node hearting on the same slot,
632 * flaky device that is losing writes, etc.
633 * Returns 1 if check succeeds, 0 otherwise.
635 static int o2hb_check_own_slot(struct o2hb_region *reg)
637 struct o2hb_disk_slot *slot;
638 struct o2hb_disk_heartbeat_block *hb_block;
641 slot = ®->hr_slots[o2nm_this_node()];
642 /* Don't check on our 1st timestamp */
643 if (!slot->ds_last_time)
646 hb_block = slot->ds_raw_block;
647 if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
648 le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
649 hb_block->hb_node == slot->ds_node_num)
652 #define ERRSTR1 "Another node is heartbeating on device"
653 #define ERRSTR2 "Heartbeat generation mismatch on device"
654 #define ERRSTR3 "Heartbeat sequence mismatch on device"
656 if (hb_block->hb_node != slot->ds_node_num)
658 else if (le64_to_cpu(hb_block->hb_generation) !=
659 slot->ds_last_generation)
664 mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
665 "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
666 slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
667 (unsigned long long)slot->ds_last_time, hb_block->hb_node,
668 (unsigned long long)le64_to_cpu(hb_block->hb_generation),
669 (unsigned long long)le64_to_cpu(hb_block->hb_seq));
/*
 * Fill this node's raw slot block for the next heartbeat write: zero the
 * block, then store the current wall-clock seconds as hb_seq, our node
 * number, the given generation, and the dead time in milliseconds
 * (o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS), all in little-endian.
 * The checksum is computed last, over the finished block.
 *
 * NOTE(review): "®->hr_slots" below is "&reg->hr_slots" mangled by
 * HTML-entity decoding in the listing this text came from. The second
 * parameter line (generation), the node_num/cputime declarations, whatever
 * sits between reading the time and storing hb_seq (numbering jumps
 * 688->692), and the tail of the checksum call were dropped.
 */
674 static inline void o2hb_prepare_block(struct o2hb_region *reg,
679 struct o2hb_disk_slot *slot;
680 struct o2hb_disk_heartbeat_block *hb_block;
682 node_num = o2nm_this_node();
683 slot = ®->hr_slots[node_num];
685 hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
686 memset(hb_block, 0, reg->hr_block_bytes);
687 /* TODO: time stuff */
688 cputime = CURRENT_TIME.tv_sec;
692 hb_block->hb_seq = cpu_to_le64(cputime);
693 hb_block->hb_node = node_num;
694 hb_block->hb_generation = cpu_to_le64(generation);
695 hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
697 /* This step must always happen last! */
698 hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
701 mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
702 (long long)generation,
703 le32_to_cpu(hb_block->hb_cksum));
706 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
707 struct o2nm_node *node,
710 struct o2hb_callback_func *f;
712 list_for_each_entry(f, &hbcall->list, hc_item) {
713 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
714 (f->hc_func)(node, idx, f->hc_data);
718 /* Will run the list in order until we process the passed event */
719 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
721 struct o2hb_callback *hbcall;
722 struct o2hb_node_event *event;
724 /* Holding callback sem assures we don't alter the callback
725 * lists when doing this, and serializes ourselves with other
726 * processes wanting callbacks. */
727 down_write(&o2hb_callback_sem);
729 spin_lock(&o2hb_live_lock);
730 while (!list_empty(&o2hb_node_events)
731 && !list_empty(&queued_event->hn_item)) {
732 event = list_entry(o2hb_node_events.next,
733 struct o2hb_node_event,
735 list_del_init(&event->hn_item);
736 spin_unlock(&o2hb_live_lock);
738 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
739 event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
742 hbcall = hbcall_from_type(event->hn_event_type);
744 /* We should *never* have gotten on to the list with a
745 * bad type... This isn't something that we should try
746 * to recover from. */
747 BUG_ON(IS_ERR(hbcall));
749 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
751 spin_lock(&o2hb_live_lock);
753 spin_unlock(&o2hb_live_lock);
755 up_write(&o2hb_callback_sem);
758 static void o2hb_queue_node_event(struct o2hb_node_event *event,
759 enum o2hb_callback_type type,
760 struct o2nm_node *node,
763 assert_spin_locked(&o2hb_live_lock);
765 BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
767 event->hn_event_type = type;
768 event->hn_node = node;
769 event->hn_node_num = node_num;
771 mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
772 type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
774 list_add_tail(&event->hn_item, &o2hb_node_events);
/*
 * Take one slot out of service when its region shuts down. Under
 * o2hb_live_lock: if the slot is on a live list it is removed, and when
 * that leaves no live slot for the node, the node's bit in
 * o2hb_live_node_bitmap is cleared and a DOWN event is queued. The queued
 * event is then run through o2hb_run_event_list().
 *
 * NOTE(review): the listing dropped lines here: the handling of a failed
 * o2nm_get_node_by_num() lookup, any flag recording whether an event was
 * queued, the condition guarding o2hb_run_event_list(), trailing mlog /
 * queue arguments and whatever releases the node reference.
 */
777 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
779 struct o2hb_node_event event =
780 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
781 struct o2nm_node *node;
784 node = o2nm_get_node_by_num(slot->ds_node_num);
788 spin_lock(&o2hb_live_lock);
789 if (!list_empty(&slot->ds_live_item)) {
790 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
793 list_del_init(&slot->ds_live_item);
795 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
796 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
798 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
803 spin_unlock(&o2hb_live_lock);
811 static void o2hb_set_quorum_device(struct o2hb_region *reg)
813 if (!o2hb_global_heartbeat_active())
816 /* Prevent race with o2hb_heartbeat_group_drop_item() */
817 if (kthread_should_stop())
820 /* Tag region as quorum only after thread reaches steady state */
821 if (atomic_read(®->hr_steady_iterations) != 0)
824 spin_lock(&o2hb_live_lock);
826 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
830 * A region can be added to the quorum only when it sees all
831 * live nodes heartbeat on it. In other words, the region has been
832 * added to all nodes.
834 if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
835 sizeof(o2hb_live_node_bitmap)))
838 printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
839 config_item_name(®->hr_item), reg->hr_dev_name);
841 set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
844 * If global heartbeat active, unpin all regions if the
845 * region count > CUT_OFF
847 if (bitmap_weight(o2hb_quorum_region_bitmap,
848 O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
849 o2hb_region_unpin(NULL);
851 spin_unlock(&o2hb_live_lock);
/*
 * Evaluate one node's slot after a read pass and drive its live/dead state.
 *
 * Visible steps:
 *  - copy the raw block into reg->hr_tmp_block and look the node up; the
 *    global live bitmap is sampled under o2hb_live_lock for the case of a
 *    node that is no longer configured;
 *  - on a CRC failure: take o2hb_live_lock; for a slot that is on a live
 *    list, log, dump the block and count it as an unchanged sample;
 *  - otherwise compare hb_seq with ds_last_time to bump ds_changed_samples
 *    or ds_equal_samples, and treat a generation change as "went away and
 *    came back" (ds_equal_samples reset);
 *  - dead -> live once ds_changed_samples >= O2HB_LIVE_THRESHOLD: set the
 *    region bit, queue an UP event if this is the node's first live slot,
 *    add the slot to the node's live list, and warn when the peer's
 *    hb_dead_ms differs from ours;
 *  - live -> dead once ds_equal_samples >= o2hb_dead_threshold or the
 *    generation changed: clear the region bit, unlink the slot, queue a DOWN
 *    event if it was the node's last live slot;
 *  - finally drop the lock, run the queued event and put the node.
 * The caller ORs the return value into its membership_change flag.
 *
 * NOTE(review): the listing this text came from dropped many short lines
 * (declarations such as cputime/tmp, every goto/label/return, "else"
 * lines, assignments to changed/gen_changed, comment closers). The exact
 * control flow between the steps above therefore cannot be confirmed from
 * this view; the block is left byte-identical. The comment further down
 * also misspells "consecutive".
 */
854 static int o2hb_check_slot(struct o2hb_region *reg,
855 struct o2hb_disk_slot *slot)
857 int changed = 0, gen_changed = 0;
858 struct o2hb_node_event event =
859 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
860 struct o2nm_node *node;
861 struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
863 unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
864 unsigned int slot_dead_ms;
868 memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
871 * If a node is no longer configured but is still in the livemap, we
872 * may need to clear that bit from the livemap.
874 node = o2nm_get_node_by_num(slot->ds_node_num);
876 spin_lock(&o2hb_live_lock);
877 tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
878 spin_unlock(&o2hb_live_lock);
883 if (!o2hb_verify_crc(reg, hb_block)) {
884 /* all paths from here will drop o2hb_live_lock for
886 spin_lock(&o2hb_live_lock);
888 /* Don't print an error on the console in this case -
889 * a freshly formatted heartbeat area will not have a
891 if (list_empty(&slot->ds_live_item))
894 /* The node is live but pushed out a bad crc. We
895 * consider it a transient miss but don't populate any
896 * other values as they may be junk. */
897 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
898 slot->ds_node_num, reg->hr_dev_name);
899 o2hb_dump_slot(hb_block);
901 slot->ds_equal_samples++;
905 /* we don't care if these wrap.. the state transitions below
906 * clear at the right places */
907 cputime = le64_to_cpu(hb_block->hb_seq);
908 if (slot->ds_last_time != cputime)
909 slot->ds_changed_samples++;
911 slot->ds_equal_samples++;
912 slot->ds_last_time = cputime;
914 /* The node changed heartbeat generations. We assume this to
915 * mean it dropped off but came back before we timed out. We
916 * want to consider it down for the time being but don't want
917 * to lose any changed_samples state we might build up to
918 * considering it live again. */
919 if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
921 slot->ds_equal_samples = 0;
922 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
923 "to 0x%llx)\n", slot->ds_node_num,
924 (long long)slot->ds_last_generation,
925 (long long)le64_to_cpu(hb_block->hb_generation));
928 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
930 mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
931 "seq %llu last %llu changed %u equal %u\n",
932 slot->ds_node_num, (long long)slot->ds_last_generation,
933 le32_to_cpu(hb_block->hb_cksum),
934 (unsigned long long)le64_to_cpu(hb_block->hb_seq),
935 (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
936 slot->ds_equal_samples);
938 spin_lock(&o2hb_live_lock);
941 /* dead nodes only come to life after some number of
942 * changes at any time during their dead time */
943 if (list_empty(&slot->ds_live_item) &&
944 slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
945 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
946 slot->ds_node_num, (long long)slot->ds_last_generation);
948 set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
950 /* first on the list generates a callback */
951 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
952 mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
953 "bitmap\n", slot->ds_node_num);
954 set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
956 o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
963 list_add_tail(&slot->ds_live_item,
964 &o2hb_live_slots[slot->ds_node_num]);
966 slot->ds_equal_samples = 0;
968 /* We want to be sure that all nodes agree on the
969 * number of milliseconds before a node will be
970 * considered dead. The self-fencing timeout is
971 * computed from this value, and a discrepancy might
972 * result in heartbeat calling a node dead when it
973 * hasn't self-fenced yet. */
974 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
975 if (slot_dead_ms && slot_dead_ms != dead_ms) {
976 /* TODO: Perhaps we can fail the region here. */
977 mlog(ML_ERROR, "Node %d on device %s has a dead count "
978 "of %u ms, but our count is %u ms.\n"
979 "Please double check your configuration values "
980 "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
981 slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
987 /* if the list is dead, we're done.. */
988 if (list_empty(&slot->ds_live_item))
991 /* live nodes only go dead after enough consequtive missed
992 * samples.. reset the missed counter whenever we see
994 if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
995 mlog(ML_HEARTBEAT, "Node %d left my region\n",
998 clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
1000 /* last off the live_slot generates a callback */
1001 list_del_init(&slot->ds_live_item);
1002 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
1003 mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
1004 "nodes bitmap\n", slot->ds_node_num);
1005 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
1007 /* node can be null */
1008 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
1009 node, slot->ds_node_num);
1015 /* We don't clear this because the node is still
1016 * actually writing new blocks. */
1018 slot->ds_changed_samples = 0;
1021 if (slot->ds_changed_samples) {
1022 slot->ds_changed_samples = 0;
1023 slot->ds_equal_samples = 0;
1026 spin_unlock(&o2hb_live_lock);
1029 o2hb_run_event_list(&event);
1032 o2nm_node_put(node);
/*
 * Return the index of the highest bit set in @nodes.
 *
 * Thin wrapper around find_last_bit(); when no bit is set that helper
 * returns @numbits, which is how the caller in this file detects an empty
 * map (it tests the result against O2NM_MAX_NODES).
 *
 * @nodes:   bitmap to scan.
 * @numbits: number of valid bits in @nodes.
 */
static int o2hb_highest_node(unsigned long *nodes, int numbits)
{
	return find_last_bit(nodes, numbits);
}
/*
 * One heartbeat iteration for a region.
 *
 * Visible steps: fetch the configured node map and OR in the currently live
 * nodes (so that stale live nodes still get their slot read); read slots up
 * to the highest such node; check our own slot; prepare and issue the write
 * of our block; run o2hb_check_slot() over every configured node,
 * accumulating membership_change; wait for the write and log a write error;
 * then update quorum state and re-arm the timers. At the end, while the
 * region is not yet steady, hr_steady_iterations is decremented after a
 * clean, change-free iteration, and hr_unsteady_iterations counts down
 * towards aborting the start (hr_aborted_start = 1, waiters woken).
 *
 * NOTE(review): every "®->" below is "&reg->" mangled by HTML-entity
 * decoding in the listing this text came from. Short lines were dropped as
 * well: the error checks and jumps after each call, the initialisation of
 * `i` before both find_next_bit() loops, the error codes assigned on
 * failure, the condition guarding the quorum/arm calls (the surviving
 * comment suggests it depends on own_slot_ok), the exit label and the
 * return. The KERN_NOTICE message also misspells "heartbeat" as
 * "heartbeart"; it is a runtime string and is left untouched here.
 */
1041 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
1043 int i, ret, highest_node;
1044 int membership_change = 0, own_slot_ok = 0;
1045 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
1046 unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1047 struct o2hb_bio_wait_ctxt write_wc;
1049 ret = o2nm_configured_node_map(configured_nodes,
1050 sizeof(configured_nodes));
1057 * If a node is not configured but is in the livemap, we still need
1058 * to read the slot so as to be able to remove it from the livemap.
1060 o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
1062 while ((i = find_next_bit(live_node_bitmap,
1063 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1064 set_bit(i, configured_nodes);
1067 highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
1068 if (highest_node >= O2NM_MAX_NODES) {
1069 mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
1074 /* No sense in reading the slots of nodes that don't exist
1075 * yet. Of course, if the node definitions have holes in them
1076 * then we're reading an empty slot anyway... Consider this
1078 ret = o2hb_read_slots(reg, highest_node + 1);
1084 /* With an up to date view of the slots, we can check that no
1085 * other node has been improperly configured to heartbeat in
1087 own_slot_ok = o2hb_check_own_slot(reg);
1089 /* fill in the proper info for our next heartbeat */
1090 o2hb_prepare_block(reg, reg->hr_generation);
1092 ret = o2hb_issue_node_write(reg, &write_wc);
1099 while((i = find_next_bit(configured_nodes,
1100 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1101 membership_change |= o2hb_check_slot(reg, ®->hr_slots[i]);
1105 * We have to be sure we've advertised ourselves on disk
1106 * before we can go to steady state. This ensures that
1107 * people we find in our steady state have seen us.
1109 o2hb_wait_on_io(reg, &write_wc);
1110 if (write_wc.wc_error) {
1111 /* Do not re-arm the write timeout on I/O error - we
1112 * can't be sure that the new block ever made it to
1114 mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
1115 write_wc.wc_error, reg->hr_dev_name);
1116 ret = write_wc.wc_error;
1120 /* Skip disarming the timeout if own slot has stale/bad data */
1122 o2hb_set_quorum_device(reg);
1123 o2hb_arm_timeout(reg);
1127 /* let the person who launched us know when things are steady */
1128 if (atomic_read(®->hr_steady_iterations) != 0) {
1129 if (!ret && own_slot_ok && !membership_change) {
1130 if (atomic_dec_and_test(®->hr_steady_iterations))
1131 wake_up(&o2hb_steady_queue);
1135 if (atomic_read(®->hr_steady_iterations) != 0) {
1136 if (atomic_dec_and_test(®->hr_unsteady_iterations)) {
1137 printk(KERN_NOTICE "o2hb: Unable to stabilize "
1138 "heartbeart on region %s (%s)\n",
1139 config_item_name(®->hr_item),
1141 atomic_set(®->hr_steady_iterations, 0);
1142 reg->hr_aborted_start = 1;
1143 wake_up(&o2hb_steady_queue);
/*
 * Per-region heartbeat kthread.
 *
 * Visible flow: raise priority to MIN_NICE; pin this node with
 * o2nm_depend_this_node() (on failure the region is flagged
 * hr_node_deleted and waiters on o2hb_steady_queue are woken); then loop
 * until asked to stop, or until hr_unclean_stop / hr_aborted_start is set.
 * Each pass times o2hb_do_disk_heartbeat() and sleeps only for what is left
 * of hr_timeout_ms. On exit the timers are disarmed; unless the stop was
 * unclean every slot is shut down, and unless the stop was unclean or the
 * start was aborted a final block with generation 0 is written so peers see
 * this node leave without waiting for a timeout. The node dependency is
 * dropped last.
 *
 * NOTE(review): "®->hr_slots" below is "&reg->hr_slots" mangled by
 * HTML-entity decoding in the listing this text came from. Short lines were
 * also dropped (the `i`/`ret` declarations, the check of the
 * o2nm_depend_this_node() result and its exit path, the mlog opener before
 * the timing format string, the check around the final wait, braces,
 * labels and the return).
 */
1152 * we ride the region ref that the region dir holds. before the region
1153 * dir is removed and drops it ref it will wait to tear down this
1156 static int o2hb_thread(void *data)
1159 struct o2hb_region *reg = data;
1160 struct o2hb_bio_wait_ctxt write_wc;
1161 ktime_t before_hb, after_hb;
1162 unsigned int elapsed_msec;
1164 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
1166 set_user_nice(current, MIN_NICE);
1169 ret = o2nm_depend_this_node();
1171 mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret);
1172 reg->hr_node_deleted = 1;
1173 wake_up(&o2hb_steady_queue);
1177 while (!kthread_should_stop() &&
1178 !reg->hr_unclean_stop && !reg->hr_aborted_start) {
1179 /* We track the time spent inside
1180 * o2hb_do_disk_heartbeat so that we avoid more than
1181 * hr_timeout_ms between disk writes. On busy systems
1182 * this should result in a heartbeat which is less
1183 * likely to time itself out. */
1184 before_hb = ktime_get_real();
1186 ret = o2hb_do_disk_heartbeat(reg);
1188 after_hb = ktime_get_real();
1190 elapsed_msec = (unsigned int)
1191 ktime_ms_delta(after_hb, before_hb);
1194 "start = %lld, end = %lld, msec = %u, ret = %d\n",
1195 before_hb.tv64, after_hb.tv64, elapsed_msec, ret);
1197 if (!kthread_should_stop() &&
1198 elapsed_msec < reg->hr_timeout_ms) {
1199 /* the kthread api has blocked signals for us so no
1200 * need to record the return value. */
1201 msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
1205 o2hb_disarm_timeout(reg);
1207 /* unclean stop is only used in very bad situation */
1208 for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
1209 o2hb_shutdown_slot(®->hr_slots[i]);
1211 /* Explicit down notification - avoid forcing the other nodes
1212 * to timeout on this region when we could just as easily
1213 * write a clear generation - thus indicating to them that
1214 * this node has left this region.
1216 if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
1217 o2hb_prepare_block(reg, 0);
1218 ret = o2hb_issue_node_write(reg, &write_wc);
1220 o2hb_wait_on_io(reg, &write_wc);
1226 o2nm_undepend_this_node();
1228 mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
1233 #ifdef CONFIG_DEBUG_FS
/*
 * o2hb_debug_open() - build the text snapshot served by a debugfs file.
 * @inode: debugfs inode; i_private holds the struct o2hb_debug_buf that
 *         describes what to dump.
 * @file:  file being opened; private_data receives the text buffer.
 *
 * Allocates a PAGE_SIZE buffer and fills it according to db_type:
 *  - the four global bitmap types copy db_size bytes of db_data into a
 *    local map under o2hb_live_lock;
 *  - REGION_LIVENODES copies the region's hr_live_node_bitmap the same way;
 *  - REGION_NUMBER, REGION_ELAPSED_TIME and REGION_PINNED format a single
 *    value straight from the region.
 * For the bitmap types each set bit below db_len is printed as "%d ",
 * followed by a newline.  The inode size is set to the number of bytes
 * produced so that reads are bounded by it.
 *
 * NOTE(review): the allocation-failure check, the jumps out of the switch
 * and the return statements are not visible here.
 */
1234 static int o2hb_debug_open(struct inode *inode, struct file *file)
1236 struct o2hb_debug_buf *db = inode->i_private;
1237 struct o2hb_region *reg;
1238 unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1244 /* max_nodes should be the largest bitmap we pass here */
1245 BUG_ON(sizeof(map) < db->db_size);
1247 buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1251 switch (db->db_type) {
1252 case O2HB_DB_TYPE_LIVENODES:
1253 case O2HB_DB_TYPE_LIVEREGIONS:
1254 case O2HB_DB_TYPE_QUORUMREGIONS:
1255 case O2HB_DB_TYPE_FAILEDREGIONS:
1256 spin_lock(&o2hb_live_lock);
1257 memcpy(map, db->db_data, db->db_size);
1258 spin_unlock(&o2hb_live_lock);
1261 case O2HB_DB_TYPE_REGION_LIVENODES:
1262 spin_lock(&o2hb_live_lock);
1263 reg = (struct o2hb_region *)db->db_data;
1264 memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1265 spin_unlock(&o2hb_live_lock);
1268 case O2HB_DB_TYPE_REGION_NUMBER:
1269 reg = (struct o2hb_region *)db->db_data;
1270 out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
1271 reg->hr_region_num);
1274 case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1275 reg = (struct o2hb_region *)db->db_data;
1276 lts = reg->hr_last_timeout_start;
1277 /* If 0, it has never been set before */
1279 lts = jiffies_to_msecs(jiffies - lts);
1280 out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
1283 case O2HB_DB_TYPE_REGION_PINNED:
1284 reg = (struct o2hb_region *)db->db_data;
1285 out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1286 !!reg->hr_item_pinned);
1293 while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1294 out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1295 out += snprintf(buf + out, PAGE_SIZE - out, "\n");
1298 i_size_write(inode, out);
1300 file->private_data = buf;
/*
 * o2hb_debug_release() - free the text buffer that o2hb_debug_open()
 * stored in file->private_data.
 */
1307 static int o2hb_debug_release(struct inode *inode, struct file *file)
1309 kfree(file->private_data);
/*
 * o2hb_debug_read() - copy part of the prepared snapshot to user space.
 *
 * Reads from the buffer in file->private_data via
 * simple_read_from_buffer(), using the inode size (set at open time) as
 * the amount of valid data.
 */
1313 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1314 size_t nbytes, loff_t *ppos)
1316 return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1317 i_size_read(file->f_mapping->host));
/*
 * Second definition of o2hb_debug_open().
 * NOTE(review): presumably the stub used when CONFIG_DEBUG_FS is not set;
 * neither the preprocessor branch nor the body is visible here - confirm.
 */
1320 static int o2hb_debug_open(struct inode *inode, struct file *file)
/*
 * Second definition of o2hb_debug_release().
 * NOTE(review): presumably the stub used when CONFIG_DEBUG_FS is not set;
 * neither the preprocessor branch nor the body is visible here - confirm.
 */
1324 static int o2hb_debug_release(struct inode *inode, struct file *file)
/*
 * Second definition of o2hb_debug_read().
 * NOTE(review): presumably the stub used when CONFIG_DEBUG_FS is not set;
 * neither the preprocessor branch nor the body is visible here - confirm.
 */
1328 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1329 size_t nbytes, loff_t *ppos)
1333 #endif /* CONFIG_DEBUG_FS */
/*
 * File operations shared by the o2hb debugfs files: open builds the
 * snapshot, read serves it, release frees it, and seeking uses
 * generic_file_llseek.
 */
1335 static const struct file_operations o2hb_debug_fops = {
1336 .open = o2hb_debug_open,
1337 .release = o2hb_debug_release,
1338 .read = o2hb_debug_read,
1339 .llseek = generic_file_llseek,
/*
 * o2hb_exit() - tear down the global heartbeat debugfs state.
 *
 * Removes the four global debugfs files and then their directory, and
 * frees the four o2hb_debug_buf descriptors that backed them.
 */
1342 void o2hb_exit(void)
1344 debugfs_remove(o2hb_debug_failedregions);
1345 debugfs_remove(o2hb_debug_quorumregions);
1346 debugfs_remove(o2hb_debug_liveregions);
1347 debugfs_remove(o2hb_debug_livenodes);
1348 debugfs_remove(o2hb_debug_dir);
1349 kfree(o2hb_db_livenodes);
1350 kfree(o2hb_db_liveregions);
1351 kfree(o2hb_db_quorumregions);
1352 kfree(o2hb_db_failedregions);
/*
 * o2hb_debug_create() - allocate a debug descriptor and create its file.
 * @name:   debugfs file name.
 * @dir:    parent debugfs directory.
 * @db:     out: receives the newly allocated descriptor.
 * @db_len: number of bytes to allocate for the descriptor.
 * @type:   stored in db_type (selects the dump format at open time).
 * @size:   stored in db_size (bytes copied from the bitmap at open time).
 * @len:    stored in db_len (number of bits scanned at open time).
 * @data:   stored in db_data (bitmap or region the file reports on).
 *
 * Returns the dentry from debugfs_create_file(); the descriptor is passed
 * as the file's private data and the file is created S_IFREG|S_IRUSR.
 *
 * NOTE(review): the kmalloc() failure check and the last argument of
 * debugfs_create_file() are not visible here.
 */
1355 static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
1356 struct o2hb_debug_buf **db, int db_len,
1357 int type, int size, int len, void *data)
1359 *db = kmalloc(db_len, GFP_KERNEL);
1363 (*db)->db_type = type;
1364 (*db)->db_size = size;
1365 (*db)->db_len = len;
1366 (*db)->db_data = data;
1368 return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db,
/*
 * o2hb_debug_init() - create the global heartbeat debugfs hierarchy.
 *
 * Creates the O2HB_DEBUG_DIR directory at the debugfs root (parent NULL)
 * and, through o2hb_debug_create(), one file each for the live-node,
 * live-region, quorum-region and failed-region bitmaps.  Every creation
 * result is tested for NULL.
 *
 * NOTE(review): the bodies of the failure branches, several call
 * arguments and the return statements are not visible here.
 */
1372 static int o2hb_debug_init(void)
1376 o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
1377 if (!o2hb_debug_dir) {
1382 o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
1385 sizeof(*o2hb_db_livenodes),
1386 O2HB_DB_TYPE_LIVENODES,
1387 sizeof(o2hb_live_node_bitmap),
1389 o2hb_live_node_bitmap);
1390 if (!o2hb_debug_livenodes) {
1395 o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
1397 &o2hb_db_liveregions,
1398 sizeof(*o2hb_db_liveregions),
1399 O2HB_DB_TYPE_LIVEREGIONS,
1400 sizeof(o2hb_live_region_bitmap),
1402 o2hb_live_region_bitmap);
1403 if (!o2hb_debug_liveregions) {
1408 o2hb_debug_quorumregions =
1409 o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
1411 &o2hb_db_quorumregions,
1412 sizeof(*o2hb_db_quorumregions),
1413 O2HB_DB_TYPE_QUORUMREGIONS,
1414 sizeof(o2hb_quorum_region_bitmap),
1416 o2hb_quorum_region_bitmap);
1417 if (!o2hb_debug_quorumregions) {
1422 o2hb_debug_failedregions =
1423 o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
1425 &o2hb_db_failedregions,
1426 sizeof(*o2hb_db_failedregions),
1427 O2HB_DB_TYPE_FAILEDREGIONS,
1428 sizeof(o2hb_failed_region_bitmap),
1430 o2hb_failed_region_bitmap);
1431 if (!o2hb_debug_failedregions) {
/*
 * One-time initialisation of the global heartbeat state: empties the
 * callback lists, the per-node live-slot lists and the node-event list,
 * zeroes the node and region bitmaps, resets the dependent-user count and
 * finally sets up debugfs via o2hb_debug_init(), whose result is returned.
 *
 * NOTE(review): the enclosing function's signature is not visible here;
 * presumably this is the body of o2hb_init() - confirm.
 */
1448 for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1449 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1451 for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1452 INIT_LIST_HEAD(&o2hb_live_slots[i]);
1454 INIT_LIST_HEAD(&o2hb_node_events);
1456 memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1457 memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
1458 memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
1459 memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
1460 memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
1462 o2hb_dependent_users = 0;
1464 return o2hb_debug_init();
1467 /* if we're already in a callback then we're already serialized by the sem */
/*
 * o2hb_fill_node_map_from_callback() - copy the global live-node bitmap.
 * @map:   destination bitmap.
 * @bytes: size of @map in bytes (second parameter line is not visible
 *         here); must cover at least O2NM_MAX_NODES bits or BUG_ON()
 *         fires.
 *
 * Takes no lock itself; visible callers either hold o2hb_live_lock or
 * call it as-is from callback context.
 *
 * NOTE(review): only a lower bound on @bytes is enforced, and memcpy()
 * copies @bytes from o2hb_live_node_bitmap; a larger value would read past
 * the bitmap.  The callers visible in this file pass a buffer of exactly
 * the bitmap size.
 */
1468 static void o2hb_fill_node_map_from_callback(unsigned long *map,
1471 BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1473 memcpy(map, &o2hb_live_node_bitmap, bytes);
1477 * get a map of all nodes that are heartbeating in any regions
/*
 * o2hb_fill_node_map() - snapshot the nodes heartbeating in any region.
 * @map:   destination bitmap.
 * @bytes: size of @map in bytes.
 *
 * Holds o2hb_callback_sem for reading and o2hb_live_lock while copying, so
 * the snapshot is serialized against heartbeat callbacks.
 */
1479 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1481 /* callers want to serialize this map and callbacks so that they
1482 * can trust that they don't miss nodes coming to the party */
1483 down_read(&o2hb_callback_sem);
1484 spin_lock(&o2hb_live_lock);
1485 o2hb_fill_node_map_from_callback(map, bytes);
1486 spin_unlock(&o2hb_live_lock);
1487 up_read(&o2hb_callback_sem);
1489 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
1492 * heartbeat configfs bits. The heartbeat set is a default set under
1493 * the cluster set in nodemanager.c.
/*
 * to_o2hb_region() - map a config_item to the o2hb_region embedding it
 * (via its hr_item member); returns NULL when @item is NULL.
 */
1496 static struct o2hb_region *to_o2hb_region(struct config_item *item)
1498 return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1501 /* drop_item only drops its ref after killing the thread, nothing should
1502 * be using the region anymore. this has to clean up any state that
1503 * attributes might have built up. */
/*
 * o2hb_region_release() - configfs release callback for a region item.
 * @item: the region's config_item.
 *
 * Frees the temporary block, the slot-data page array and the slot array,
 * puts the block device, removes the region's debugfs files and directory,
 * frees their descriptors, unlinks the region from the global region list
 * under o2hb_live_lock and unregisters its o2net handlers.
 *
 * NOTE(review): the per-page release inside the loop, the guard around
 * blkdev_put() and the final free of the region itself are not visible
 * here.
 * NOTE(review): "®" below looks like a mis-encoded "&reg".
 */
1504 static void o2hb_region_release(struct config_item *item)
1508 struct o2hb_region *reg = to_o2hb_region(item);
1510 mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
1512 kfree(reg->hr_tmp_block);
1514 if (reg->hr_slot_data) {
1515 for (i = 0; i < reg->hr_num_pages; i++) {
1516 page = reg->hr_slot_data[i];
1520 kfree(reg->hr_slot_data);
1524 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1526 kfree(reg->hr_slots);
1528 debugfs_remove(reg->hr_debug_livenodes);
1529 debugfs_remove(reg->hr_debug_regnum);
1530 debugfs_remove(reg->hr_debug_elapsed_time);
1531 debugfs_remove(reg->hr_debug_pinned);
1532 debugfs_remove(reg->hr_debug_dir);
1533 kfree(reg->hr_db_livenodes);
1534 kfree(reg->hr_db_regnum);
1535 kfree(reg->hr_db_elapsed_time);
1536 kfree(reg->hr_db_pinned);
1538 spin_lock(&o2hb_live_lock);
1539 list_del(®->hr_all_item);
1540 spin_unlock(&o2hb_live_lock);
1542 o2net_unregister_handler_list(®->hr_handler_list);
/*
 * o2hb_read_block_input() - parse and validate a block size from text.
 * @reg:       region being configured.
 * @ret_bytes: out: block size in bytes.
 * @ret_bits:  out: log2 of the block size (ffs(bytes) - 1).
 *
 * The number is parsed with simple_strtoul() (base auto-detected) and may
 * be followed only by a newline.  It is checked against the 512..4096
 * range and against hweight16() == 1, i.e. it must be a power of two.
 *
 * NOTE(review): the text parameter line, the error returns and the store
 * to *ret_bytes are not visible here.
 */
1546 static int o2hb_read_block_input(struct o2hb_region *reg,
1548 unsigned long *ret_bytes,
1549 unsigned int *ret_bits)
1551 unsigned long bytes;
1552 char *p = (char *)page;
1554 bytes = simple_strtoul(p, &p, 0);
1555 if (!p || (*p && (*p != '\n')))
1558 /* Heartbeat and fs min / max block sizes are the same. */
1559 if (bytes > 4096 || bytes < 512)
1561 if (hweight16(bytes) != 1)
1567 *ret_bits = ffs(bytes) - 1;
/*
 * configfs show handler: prints the region's hr_block_bytes as "%u\n"
 * and returns the number of characters written.
 */
1572 static ssize_t o2hb_region_block_bytes_show(struct config_item *item,
1575 return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes);
/*
 * configfs store handler for the region block size.
 *
 * Validates the input with o2hb_read_block_input() and records both the
 * size in bytes (hr_block_bytes) and its log2 (hr_block_bits).
 *
 * NOTE(review): any precondition checks, the status test and the return
 * statements are not visible here.
 */
1578 static ssize_t o2hb_region_block_bytes_store(struct config_item *item,
1582 struct o2hb_region *reg = to_o2hb_region(item);
1584 unsigned long block_bytes;
1585 unsigned int block_bits;
1590 status = o2hb_read_block_input(reg, page, &block_bytes,
1595 reg->hr_block_bytes = (unsigned int)block_bytes;
1596 reg->hr_block_bits = block_bits;
/*
 * configfs show handler: prints the region's hr_start_block as "%llu\n"
 * and returns the number of characters written.
 */
1601 static ssize_t o2hb_region_start_block_show(struct config_item *item,
1604 return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block);
/*
 * configfs store handler for the region's first heartbeat block.
 *
 * Parses an unsigned 64-bit number with simple_strtoull() (base
 * auto-detected), allows only a trailing newline after it and stores the
 * value in hr_start_block.
 *
 * NOTE(review): any precondition checks and the return statements are not
 * visible here.
 */
1607 static ssize_t o2hb_region_start_block_store(struct config_item *item,
1611 struct o2hb_region *reg = to_o2hb_region(item);
1612 unsigned long long tmp;
1613 char *p = (char *)page;
1618 tmp = simple_strtoull(p, &p, 0);
1619 if (!p || (*p && (*p != '\n')))
1622 reg->hr_start_block = tmp;
/*
 * configfs show handler: prints the region's hr_blocks as "%d\n" and
 * returns the number of characters written.
 */
1627 static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page)
1629 return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks);
/*
 * configfs store handler for the number of heartbeat blocks (slots).
 *
 * Parses the number with simple_strtoul() (base auto-detected), allows
 * only a trailing newline, checks for zero and for values above
 * O2NM_MAX_NODES, and stores the result in hr_blocks.
 *
 * NOTE(review): any precondition checks and the return statements are not
 * visible here.
 */
1632 static ssize_t o2hb_region_blocks_store(struct config_item *item,
1636 struct o2hb_region *reg = to_o2hb_region(item);
1638 char *p = (char *)page;
1643 tmp = simple_strtoul(p, &p, 0);
1644 if (!p || (*p && (*p != '\n')))
1647 if (tmp > O2NM_MAX_NODES || tmp == 0)
1650 reg->hr_blocks = (unsigned int)tmp;
/*
 * configfs show handler for the region's device: when a block device is
 * attached (hr_bdev set) it prints hr_dev_name followed by a newline;
 * otherwise nothing is written and the length stays 0.
 */
1655 static ssize_t o2hb_region_dev_show(struct config_item *item, char *page)
1657 unsigned int ret = 0;
1659 if (to_o2hb_region(item)->hr_bdev)
1660 ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name);
/*
 * o2hb_init_region_params() - derive run-time parameters for a region.
 * @reg: region whose block geometry has already been configured.
 *
 * Computes how many heartbeat blocks fit in one page
 * (PAGE_SIZE >> hr_block_bits), sets the heartbeat period to
 * O2HB_REGION_TIMEOUT_MS and logs the resulting configuration.
 */
1665 static void o2hb_init_region_params(struct o2hb_region *reg)
1667 reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits;
1668 reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1670 mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1671 reg->hr_start_block, reg->hr_blocks);
1672 mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1673 reg->hr_block_bytes, reg->hr_block_bits);
1674 mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1675 mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
/*
 * o2hb_map_slot_data() - allocate the in-memory backing for all slots.
 * @reg: region being started.
 *
 * Allocates a one-block scratch buffer (hr_tmp_block), the zeroed slot
 * array (one o2hb_disk_slot per block, ds_node_num set to its index) and
 * enough pages to hold hr_blocks blocks at hr_slots_per_page per page,
 * rounding up.  Each slot's ds_raw_block is then pointed at its block
 * within those pages, advancing hr_block_bytes per slot.
 *
 * NOTE(review): the error returns after each allocation and the start of
 * the inner loop header are not visible here.
 * NOTE(review): "®" below looks like a mis-encoded "&reg".
 */
1678 static int o2hb_map_slot_data(struct o2hb_region *reg)
1681 unsigned int last_slot;
1682 unsigned int spp = reg->hr_slots_per_page;
1685 struct o2hb_disk_slot *slot;
1687 reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1688 if (reg->hr_tmp_block == NULL)
1691 reg->hr_slots = kcalloc(reg->hr_blocks,
1692 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1693 if (reg->hr_slots == NULL)
1696 for(i = 0; i < reg->hr_blocks; i++) {
1697 slot = ®->hr_slots[i];
1698 slot->ds_node_num = i;
1699 INIT_LIST_HEAD(&slot->ds_live_item);
1700 slot->ds_raw_block = NULL;
1703 reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1704 mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1705 "at %u blocks per page\n",
1706 reg->hr_num_pages, reg->hr_blocks, spp);
1708 reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1710 if (!reg->hr_slot_data)
1713 for(i = 0; i < reg->hr_num_pages; i++) {
1714 page = alloc_page(GFP_KERNEL);
1718 reg->hr_slot_data[i] = page;
1720 last_slot = i * spp;
1721 raw = page_address(page);
1723 (j < spp) && ((j + last_slot) < reg->hr_blocks);
1725 BUG_ON((j + last_slot) >= reg->hr_blocks);
1727 slot = ®->hr_slots[j + last_slot];
1728 slot->ds_raw_block =
1729 (struct o2hb_disk_heartbeat_block *) raw;
1731 raw += reg->hr_block_bytes;
1738 /* Read in all the slots available and populate the tracking
1739 * structures so that we can start with a baseline idea of what's
/*
 * o2hb_populate_slot_data() - seed slot tracking from what is on disk.
 * @reg: region being started.
 *
 * Reads all hr_blocks slots with o2hb_read_slots() and records, for each
 * slot, the on-disk sequence number and generation (converted from
 * little-endian) as ds_last_time and ds_last_generation.  No validation is
 * done here; that is left to o2hb_check_slot().
 *
 * NOTE(review): the check of the read result and the return statements
 * are not visible here.
 * NOTE(review): "®" below looks like a mis-encoded "&reg".
 */
1741 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1744 struct o2hb_disk_slot *slot;
1745 struct o2hb_disk_heartbeat_block *hb_block;
1747 ret = o2hb_read_slots(reg, reg->hr_blocks);
1751 /* We only want to get an idea of the values initially in each
1752 * slot, so we do no verification - o2hb_check_slot will
1753 * actually determine if each configured slot is valid and
1754 * whether any values have changed. */
1755 for(i = 0; i < reg->hr_blocks; i++) {
1756 slot = ®->hr_slots[i];
1757 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1759 /* Only fill the values that o2hb_check_slot uses to
1760 * determine changing slots */
1761 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1762 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1769 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
/*
 * o2hb_region_dev_store() - attach a block device and start heartbeating.
 * @item: the region's config_item.
 *
 * The written text is a file descriptor number.  Visible steps, in order:
 *  1. check that this node has a node number
 *     (o2nm_this_node() != O2NM_MAX_NODES);
 *  2. parse the fd with simple_strtol() and range-check it;
 *  3. check that hr_blocks, hr_start_block and hr_block_bytes are set;
 *  4. grab the inode behind the fd, check it is a block device and open
 *     it read/write with blkdev_get();
 *  5. record the device name and compare the device's logical block size
 *     with hr_block_bytes;
 *  6. initialise region parameters, pick a random non-zero generation,
 *     allocate and populate the slot data, set up the write-timeout and
 *     negotiate-timeout delayed work;
 *  7. set hr_steady_iterations to O2HB_LIVE_THRESHOLD (doubled in global
 *     mode when exactly one bit is set in o2hb_region_bitmap) and
 *     hr_unsteady_iterations to three times that;
 *  8. start the "o2hb-<name>" kthread, publish it in hr_task under
 *     o2hb_live_lock and wait until hr_steady_iterations reaches zero or
 *     hr_node_deleted is set;
 *  9. re-read hr_task under the lock (drop_item may have cleared it),
 *     mark the region live in global mode and log the start.
 * The tail puts the block device and clears hr_bdev.
 *
 * NOTE(review): most error branches, the fd lookup/release, the labels
 * and the return statements are not visible here; the exact conditions
 * under which the cleanup tail runs cannot be confirmed from this view.
 * NOTE(review): "®" below looks like a mis-encoded "&reg".
 */
1770 static ssize_t o2hb_region_dev_store(struct config_item *item,
1774 struct o2hb_region *reg = to_o2hb_region(item);
1775 struct task_struct *hb_task;
1778 char *p = (char *)page;
1780 struct inode *inode;
1781 ssize_t ret = -EINVAL;
1787 /* We can't heartbeat without having had our node number
1788 * configured yet. */
1789 if (o2nm_this_node() == O2NM_MAX_NODES)
1792 fd = simple_strtol(p, &p, 0);
1793 if (!p || (*p && (*p != '\n')))
1796 if (fd < 0 || fd >= INT_MAX)
1803 if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1804 reg->hr_block_bytes == 0)
1807 inode = igrab(f.file->f_mapping->host);
1811 if (!S_ISBLK(inode->i_mode))
1814 reg->hr_bdev = I_BDEV(f.file->f_mapping->host);
1815 ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
1817 reg->hr_bdev = NULL;
1822 bdevname(reg->hr_bdev, reg->hr_dev_name);
1824 sectsize = bdev_logical_block_size(reg->hr_bdev);
1825 if (sectsize != reg->hr_block_bytes) {
1827 "blocksize %u incorrect for device, expected %d",
1828 reg->hr_block_bytes, sectsize);
1833 o2hb_init_region_params(reg);
1835 /* Generation of zero is invalid */
1837 get_random_bytes(®->hr_generation,
1838 sizeof(reg->hr_generation));
1839 } while (reg->hr_generation == 0);
1841 ret = o2hb_map_slot_data(reg);
1847 ret = o2hb_populate_slot_data(reg);
1853 INIT_DELAYED_WORK(®->hr_write_timeout_work, o2hb_write_timeout);
1854 INIT_DELAYED_WORK(®->hr_nego_timeout_work, o2hb_nego_timeout);
1857 * A node is considered live after it has beat LIVE_THRESHOLD
1858 * times. We're not steady until we've given them a chance
1859 * _after_ our first read.
1860 * The default threshold is bare minimum so as to limit the delay
1861 * during mounts. For global heartbeat, the threshold doubled for the
1864 live_threshold = O2HB_LIVE_THRESHOLD;
1865 if (o2hb_global_heartbeat_active()) {
1866 spin_lock(&o2hb_live_lock);
1867 if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
1868 live_threshold <<= 1;
1869 spin_unlock(&o2hb_live_lock);
1872 atomic_set(®->hr_steady_iterations, live_threshold);
1873 /* unsteady_iterations is triple the steady_iterations */
1874 atomic_set(®->hr_unsteady_iterations, (live_threshold * 3));
1876 hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1877 reg->hr_item.ci_name);
1878 if (IS_ERR(hb_task)) {
1879 ret = PTR_ERR(hb_task);
1884 spin_lock(&o2hb_live_lock);
1885 reg->hr_task = hb_task;
1886 spin_unlock(&o2hb_live_lock);
1888 ret = wait_event_interruptible(o2hb_steady_queue,
1889 atomic_read(®->hr_steady_iterations) == 0 ||
1890 reg->hr_node_deleted);
1892 atomic_set(®->hr_steady_iterations, 0);
1893 reg->hr_aborted_start = 1;
1896 if (reg->hr_aborted_start) {
1901 if (reg->hr_node_deleted) {
1906 /* Ok, we were woken. Make sure it wasn't by drop_item() */
1907 spin_lock(&o2hb_live_lock);
1908 hb_task = reg->hr_task;
1909 if (o2hb_global_heartbeat_active())
1910 set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
1911 spin_unlock(&o2hb_live_lock);
1918 if (hb_task && o2hb_global_heartbeat_active())
1919 printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
1920 config_item_name(®->hr_item), reg->hr_dev_name);
1929 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1930 reg->hr_bdev = NULL;
/*
 * configfs show handler: prints the pid of the region's heartbeat thread
 * as "%u\n".  hr_task is read under o2hb_live_lock.
 *
 * NOTE(review): the check for a missing hr_task and the value returned in
 * that case are not visible here.
 */
1936 static ssize_t o2hb_region_pid_show(struct config_item *item, char *page)
1938 struct o2hb_region *reg = to_o2hb_region(item);
1941 spin_lock(&o2hb_live_lock);
1943 pid = task_pid_nr(reg->hr_task);
1944 spin_unlock(&o2hb_live_lock);
1949 return sprintf(page, "%u\n", pid);
/*
 * configfs attribute declarations for a region item: block_bytes,
 * start_block, blocks and dev use CONFIGFS_ATTR; pid uses
 * CONFIGFS_ATTR_RO.  Presumably these define the o2hb_region_attr_*
 * objects listed in o2hb_region_attrs[] - confirm against the macros.
 */
1952 CONFIGFS_ATTR(o2hb_region_, block_bytes);
1953 CONFIGFS_ATTR(o2hb_region_, start_block);
1954 CONFIGFS_ATTR(o2hb_region_, blocks);
1955 CONFIGFS_ATTR(o2hb_region_, dev);
1956 CONFIGFS_ATTR_RO(o2hb_region_, pid);
/*
 * Attribute table for a region item (referenced by o2hb_region_type):
 * block_bytes, start_block, blocks, dev and pid.
 */
1958 static struct configfs_attribute *o2hb_region_attrs[] = {
1959 &o2hb_region_attr_block_bytes,
1960 &o2hb_region_attr_start_block,
1961 &o2hb_region_attr_blocks,
1962 &o2hb_region_attr_dev,
1963 &o2hb_region_attr_pid,
/* Item operations for a region: only a release callback is set. */
1967 static struct configfs_item_operations o2hb_region_item_ops = {
1968 .release = o2hb_region_release,
/*
 * configfs item type for a heartbeat region: ties together the region's
 * item operations and attribute table, owned by this module.
 */
1971 static struct config_item_type o2hb_region_type = {
1972 .ct_item_ops = &o2hb_region_item_ops,
1973 .ct_attrs = o2hb_region_attrs,
1974 .ct_owner = THIS_MODULE,
/*
 * The heartbeat configfs group.  hs_group is the embedded config_group
 * that to_o2hb_heartbeat_group() maps back from.
 * NOTE(review): any further members are not visible here.
 */
1979 struct o2hb_heartbeat_group {
1980 struct config_group hs_group;
/*
 * to_o2hb_heartbeat_group() - map a config_group to the
 * o2hb_heartbeat_group embedding it via hs_group.
 * NOTE(review): the handling of a NULL @group is not visible here.
 */
1984 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1987 container_of(group, struct o2hb_heartbeat_group, hs_group)
/*
 * o2hb_debug_region_init() - create the per-region debugfs entries.
 * @reg: region to expose.
 * @dir: parent debugfs directory.
 *
 * Creates a directory named after the region's config item and, through
 * o2hb_debug_create(), four files in it: the region's live-node bitmap,
 * its region number, the elapsed time since the last timeout start, and
 * its pinned flag.  Each creation result is tested for NULL.
 *
 * NOTE(review): the failure-branch bodies, some call arguments and the
 * return statements are not visible here.
 * NOTE(review): "®" below looks like a mis-encoded "&reg".
 */
1991 static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
1996 debugfs_create_dir(config_item_name(®->hr_item), dir);
1997 if (!reg->hr_debug_dir) {
2002 reg->hr_debug_livenodes =
2003 o2hb_debug_create(O2HB_DEBUG_LIVENODES,
2005 &(reg->hr_db_livenodes),
2006 sizeof(*(reg->hr_db_livenodes)),
2007 O2HB_DB_TYPE_REGION_LIVENODES,
2008 sizeof(reg->hr_live_node_bitmap),
2009 O2NM_MAX_NODES, reg);
2010 if (!reg->hr_debug_livenodes) {
2015 reg->hr_debug_regnum =
2016 o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
2018 &(reg->hr_db_regnum),
2019 sizeof(*(reg->hr_db_regnum)),
2020 O2HB_DB_TYPE_REGION_NUMBER,
2021 0, O2NM_MAX_NODES, reg);
2022 if (!reg->hr_debug_regnum) {
2027 reg->hr_debug_elapsed_time =
2028 o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
2030 &(reg->hr_db_elapsed_time),
2031 sizeof(*(reg->hr_db_elapsed_time)),
2032 O2HB_DB_TYPE_REGION_ELAPSED_TIME,
2034 if (!reg->hr_debug_elapsed_time) {
2039 reg->hr_debug_pinned =
2040 o2hb_debug_create(O2HB_DEBUG_REGION_PINNED,
2042 &(reg->hr_db_pinned),
2043 sizeof(*(reg->hr_db_pinned)),
2044 O2HB_DB_TYPE_REGION_PINNED,
2046 if (!reg->hr_debug_pinned) {
/*
 * o2hb_heartbeat_group_make_item() - configfs make_item: create a region.
 * @group: the heartbeat group the region is created under.
 *
 * Allocates a zeroed o2hb_region and rejects names longer than
 * O2HB_MAX_REGION_NAME_LEN with -ENAMETOOLONG.  Under o2hb_live_lock it
 * assigns a region number (the first free bit of o2hb_region_bitmap in
 * global mode, otherwise 0) and links the region into o2hb_all_regions.
 * It then initialises the config item, derives hr_key as a crc32 of the
 * name seeded with region number + O2NM_MAX_REGIONS, registers the
 * O2HB_NEGO_TIMEOUT_MSG handler and creates the debugfs entries.
 *
 * Returns the new item, or ERR_PTR() on failure.
 *
 * NOTE(review): when o2hb_debug_region_init() fails, config_item_put() is
 * followed by a jump to unregister_handler, which touches
 * reg->hr_handler_list again.  The release callback visible in this file
 * already unregisters that list, so if the put drops the last reference
 * this path may act on the region after release - confirm.
 * NOTE(review): several error branches, labels and the name parameter
 * line are not visible here.
 * NOTE(review): "®" below looks like a mis-encoded "&reg".
 */
2056 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
2059 struct o2hb_region *reg = NULL;
2062 reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
2064 return ERR_PTR(-ENOMEM);
2066 if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
2067 ret = -ENAMETOOLONG;
2071 spin_lock(&o2hb_live_lock);
2072 reg->hr_region_num = 0;
2073 if (o2hb_global_heartbeat_active()) {
2074 reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
2076 if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
2077 spin_unlock(&o2hb_live_lock);
2081 set_bit(reg->hr_region_num, o2hb_region_bitmap);
2083 list_add_tail(®->hr_all_item, &o2hb_all_regions);
2084 spin_unlock(&o2hb_live_lock);
2086 config_item_init_type_name(®->hr_item, name, &o2hb_region_type);
2088 /* this is the same way to generate msg key as dlm, for local heartbeat,
2089 * name is also the same, so make initial crc value different to avoid
2090 * message key conflict.
2092 reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS,
2093 name, strlen(name));
2094 INIT_LIST_HEAD(®->hr_handler_list);
2095 ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key,
2096 sizeof(struct o2hb_nego_msg),
2097 o2hb_nego_timeout_handler,
2098 reg, NULL, ®->hr_handler_list);
2102 ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
2104 config_item_put(®->hr_item);
2105 goto unregister_handler;
2108 return ®->hr_item;
2111 o2net_unregister_handler_list(®->hr_handler_list);
2114 return ERR_PTR(ret);
/*
 * o2hb_heartbeat_group_drop_item() - configfs drop_item: remove a region.
 * @group: the heartbeat group.
 * @item:  the region's config_item.
 *
 * Under o2hb_live_lock it takes over hr_task, clears it and marks the
 * region dropped; the thread is then stopped with kthread_stop().  In
 * global mode the region's bits are cleared from the region, live-region
 * and quorum-region bitmaps and a notice is logged.  If a start is still
 * in progress (hr_steady_iterations != 0) it is aborted and waiters on
 * o2hb_steady_queue are woken.  The item reference is then dropped.
 * Finally, under o2hb_live_lock, all regions are pinned via
 * o2hb_region_pin(NULL) when the quorum-region count is at or below
 * O2HB_PIN_CUT_OFF.
 *
 * NOTE(review): the guard before kthread_stop(), the early-exit branches
 * and the labels are not visible here; per the visible comments the final
 * pinning applies only with global heartbeat, a dropped quorum region and
 * dependent users.
 * NOTE(review): "®" below looks like a mis-encoded "&reg".
 */
2117 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2118 struct config_item *item)
2120 struct task_struct *hb_task;
2121 struct o2hb_region *reg = to_o2hb_region(item);
2122 int quorum_region = 0;
2124 /* stop the thread when the user removes the region dir */
2125 spin_lock(&o2hb_live_lock);
2126 hb_task = reg->hr_task;
2127 reg->hr_task = NULL;
2128 reg->hr_item_dropped = 1;
2129 spin_unlock(&o2hb_live_lock);
2132 kthread_stop(hb_task);
2134 if (o2hb_global_heartbeat_active()) {
2135 spin_lock(&o2hb_live_lock);
2136 clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2137 clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2138 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2140 clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2141 spin_unlock(&o2hb_live_lock);
2142 printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
2143 ((atomic_read(®->hr_steady_iterations) == 0) ?
2144 "stopped" : "start aborted"), config_item_name(item),
2149 * If we're racing a dev_write(), we need to wake them. They will
2150 * check reg->hr_task
2152 if (atomic_read(®->hr_steady_iterations) != 0) {
2153 reg->hr_aborted_start = 1;
2154 atomic_set(®->hr_steady_iterations, 0);
2155 wake_up(&o2hb_steady_queue);
2158 config_item_put(item);
2160 if (!o2hb_global_heartbeat_active() || !quorum_region)
2164 * If global heartbeat active and there are dependent users,
2165 * pin all regions if quorum region count <= CUT_OFF
2167 spin_lock(&o2hb_live_lock);
2169 if (!o2hb_dependent_users)
2172 if (bitmap_weight(o2hb_quorum_region_bitmap,
2173 O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2174 o2hb_region_pin(NULL);
2177 spin_unlock(&o2hb_live_lock);
/*
 * configfs show handler: prints the global o2hb_dead_threshold as "%u\n"
 * and returns the number of characters written.
 */
2180 static ssize_t o2hb_heartbeat_group_threshold_show(struct config_item *item,
2183 return sprintf(page, "%u\n", o2hb_dead_threshold);
/*
 * configfs store handler for the dead threshold.
 *
 * Parses a decimal number with simple_strtoul(), allows only a trailing
 * newline and passes the value to o2hb_dead_threshold_set(), which does
 * the range validation.
 *
 * NOTE(review): the return statements are not visible here.
 */
2186 static ssize_t o2hb_heartbeat_group_threshold_store(struct config_item *item,
2187 const char *page, size_t count)
2190 char *p = (char *)page;
2192 tmp = simple_strtoul(p, &p, 10);
2193 if (!p || (*p && (*p != '\n')))
2196 /* this will validate ranges for us. */
2197 o2hb_dead_threshold_set((unsigned int) tmp);
/*
 * configfs show handler: prints the description string of the current
 * heartbeat mode followed by a newline.
 */
2202 static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item,
2205 return sprintf(page, "%s\n",
2206 o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
/*
 * configfs store handler for the heartbeat mode.
 *
 * Ignores one trailing newline, then compares the input
 * case-insensitively against each entry of o2hb_heartbeat_mode_desc[];
 * for a matching entry it calls o2hb_global_heartbeat_mode_set() with
 * that index and logs the new mode.
 *
 * NOTE(review): strncasecmp() compares only the first len characters, so
 * an input that is a proper prefix of a mode name appears to match it -
 * confirm whether that is intended.
 * NOTE(review): the empty-input check, the loop's skip and the return
 * statements are not visible here.
 */
2209 static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item,
2210 const char *page, size_t count)
2216 len = (page[count - 1] == '\n') ? count - 1 : count;
2220 for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2221 if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len))
2224 ret = o2hb_global_heartbeat_mode_set(i);
2226 printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2227 o2hb_heartbeat_mode_desc[i]);
/*
 * configfs attribute declarations for the heartbeat group: threshold and
 * mode, both declared with CONFIGFS_ATTR.  Presumably these define the
 * o2hb_heartbeat_group_attr_* objects listed in
 * o2hb_heartbeat_group_attrs[] - confirm against the macro.
 */
2235 CONFIGFS_ATTR(o2hb_heartbeat_group_, threshold);
2236 CONFIGFS_ATTR(o2hb_heartbeat_group_, mode);
/* Attribute table for the heartbeat group: threshold and mode. */
2238 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
2239 &o2hb_heartbeat_group_attr_threshold,
2240 &o2hb_heartbeat_group_attr_mode,
/*
 * Group operations for the heartbeat group: creating an item makes a
 * region, dropping an item stops and removes it.
 */
2244 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
2245 .make_item = o2hb_heartbeat_group_make_item,
2246 .drop_item = o2hb_heartbeat_group_drop_item,
/*
 * configfs item type for the heartbeat group: ties together the group
 * operations and the group's attribute table, owned by this module.
 */
2249 static struct config_item_type o2hb_heartbeat_group_type = {
2250 .ct_group_ops = &o2hb_heartbeat_group_group_ops,
2251 .ct_attrs = o2hb_heartbeat_group_attrs,
2252 .ct_owner = THIS_MODULE,
2255 /* this is just here to avoid touching group in heartbeat.h which the
2256 * entire damn world #includes */
/*
 * o2hb_alloc_hb_set() - allocate and initialise the "heartbeat" group.
 *
 * Allocates a zeroed o2hb_heartbeat_group, initialises its embedded
 * config_group under the name "heartbeat" with
 * o2hb_heartbeat_group_type, and hands back a pointer to that
 * config_group (ret starts out NULL).
 *
 * NOTE(review): the allocation-failure branch and the return statement
 * are not visible here.
 */
2257 struct config_group *o2hb_alloc_hb_set(void)
2259 struct o2hb_heartbeat_group *hs = NULL;
2260 struct config_group *ret = NULL;
2262 hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2266 config_group_init_type_name(&hs->hs_group, "heartbeat",
2267 &o2hb_heartbeat_group_type);
2269 ret = &hs->hs_group;
/*
 * o2hb_free_hb_set() - counterpart of o2hb_alloc_hb_set().
 * @group: the config_group returned by o2hb_alloc_hb_set().
 *
 * Resolves @group back to its containing o2hb_heartbeat_group.
 * NOTE(review): the statement that frees it is not visible here.
 */
2276 void o2hb_free_hb_set(struct config_group *group)
2278 struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
2282 /* hb callback registration and issuing */
/*
 * hbcall_from_type() - look up the callback list head for a type.
 * @type: callback type; O2HB_NUM_CB is rejected.
 *
 * Returns &o2hb_callbacks[type], or ERR_PTR(-EINVAL) for O2HB_NUM_CB.
 */
2284 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2286 if (type == O2HB_NUM_CB)
2287 return ERR_PTR(-EINVAL);
2289 return &o2hb_callbacks[type];
/*
 * o2hb_setup_callback() - initialise a callback descriptor before it is
 * registered.
 * @hc:   descriptor to initialise.
 * @type: callback type.
 *
 * Initialises the list linkage, records the priority and stamps the
 * descriptor with O2HB_CB_MAGIC, which the register/unregister paths
 * check with BUG_ON().
 *
 * NOTE(review): the remaining parameters and the other field assignments
 * are not visible here.
 */
2292 void o2hb_setup_callback(struct o2hb_callback_func *hc,
2293 enum o2hb_callback_type type,
2298 INIT_LIST_HEAD(&hc->hc_item);
2301 hc->hc_priority = priority;
2303 hc->hc_magic = O2HB_CB_MAGIC;
2305 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2308 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2309 * In global heartbeat mode, region_uuid passed is NULL.
2311 * In local, we only pin the matching region. In global we pin all the active
/*
 * o2hb_region_pin() - pin region config items so they cannot be removed.
 * @region_uuid: region name to match (local heartbeat), or NULL to
 *               consider every region (global heartbeat).
 *
 * Must be called with o2hb_live_lock held (asserted).  Walks
 * o2hb_all_regions and, for each region it acts on, calls
 * o2nm_depend_item() and sets hr_item_pinned; the branch bodies that skip
 * dropped, non-matching or already-pinned regions are not visible here.
 * Failures are logged.
 *
 * NOTE(review): o2nm_depend_item() is called while the spinlock is held;
 * if it can sleep this is unsafe - confirm.
 * NOTE(review): the result checks, skip statements and return value are
 * not visible here.
 * NOTE(review): "®" below looks like a mis-encoded "&reg".
 */
2314 static int o2hb_region_pin(const char *region_uuid)
2316 int ret = 0, found = 0;
2317 struct o2hb_region *reg;
2320 assert_spin_locked(&o2hb_live_lock);
2322 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2323 if (reg->hr_item_dropped)
2326 uuid = config_item_name(®->hr_item);
2328 /* local heartbeat */
2330 if (strcmp(region_uuid, uuid))
2335 if (reg->hr_item_pinned || reg->hr_item_dropped)
2338 /* Ignore ENOENT only for local hb (userdlm domain) */
2339 ret = o2nm_depend_item(®->hr_item);
2341 mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2342 reg->hr_item_pinned = 1;
2344 if (ret == -ENOENT && found)
2347 mlog(ML_ERROR, "Pin region %s fails with %d\n",
2361 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2362 * In global heartbeat mode, region_uuid passed is NULL.
2364 * In local, we only unpin the matching region. In global we unpin all the
/*
 * o2hb_region_unpin() - undo o2hb_region_pin().
 * @region_uuid: region name to match (local heartbeat), or NULL to
 *               consider every region (global heartbeat).
 *
 * Must be called with o2hb_live_lock held (asserted).  Walks
 * o2hb_all_regions and, for each pinned region it acts on, calls
 * o2nm_undepend_item() and clears hr_item_pinned; the branch bodies that
 * skip dropped or non-matching regions are not visible here.
 *
 * NOTE(review): "®" below looks like a mis-encoded "&reg".
 */
2367 static void o2hb_region_unpin(const char *region_uuid)
2369 struct o2hb_region *reg;
2373 assert_spin_locked(&o2hb_live_lock);
2375 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2376 if (reg->hr_item_dropped)
2379 uuid = config_item_name(®->hr_item);
2381 if (strcmp(region_uuid, uuid))
2386 if (reg->hr_item_pinned) {
2387 mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2388 o2nm_undepend_item(®->hr_item);
2389 reg->hr_item_pinned = 0;
/*
 * o2hb_region_inc_user() - account for a new heartbeat user.
 * @region_uuid: region the user depends on (used for local heartbeat).
 *
 * Under o2hb_live_lock: in local mode, pins the named region.  In global
 * mode, bumps o2hb_dependent_users and, per the visible comment, for the
 * first user pins all regions when the quorum-region count is at or below
 * O2HB_PIN_CUT_OFF.
 *
 * NOTE(review): the jumps to the unlock label and the return statement
 * are not visible here.
 */
2396 static int o2hb_region_inc_user(const char *region_uuid)
2400 spin_lock(&o2hb_live_lock);
2402 /* local heartbeat */
2403 if (!o2hb_global_heartbeat_active()) {
2404 ret = o2hb_region_pin(region_uuid);
2409 * if global heartbeat active and this is the first dependent user,
2410 * pin all regions if quorum region count <= CUT_OFF
2412 o2hb_dependent_users++;
2413 if (o2hb_dependent_users > 1)
2416 if (bitmap_weight(o2hb_quorum_region_bitmap,
2417 O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2418 ret = o2hb_region_pin(NULL);
2421 spin_unlock(&o2hb_live_lock);
/*
 * o2hb_region_dec_user() - account for a departing heartbeat user.
 * @region_uuid: region the user depended on (used for local heartbeat).
 *
 * Under o2hb_live_lock: in local mode, unpins the named region.  In
 * global mode, decrements o2hb_dependent_users and unpins all regions
 * once the count reaches zero.
 *
 * NOTE(review): the jump past the global-mode path is not visible here.
 */
2425 void o2hb_region_dec_user(const char *region_uuid)
2427 spin_lock(&o2hb_live_lock);
2429 /* local heartbeat */
2430 if (!o2hb_global_heartbeat_active()) {
2431 o2hb_region_unpin(region_uuid);
2436 * if global heartbeat active and there are no dependent users,
2437 * unpin all quorum regions
2439 o2hb_dependent_users--;
2440 if (!o2hb_dependent_users)
2441 o2hb_region_unpin(NULL);
2444 spin_unlock(&o2hb_live_lock);
/*
 * o2hb_register_callback() - add a callback to its type's list.
 * @region_uuid: region the caller depends on; passed to
 *               o2hb_region_inc_user().
 * @hc:          descriptor prepared by o2hb_setup_callback(); must carry
 *               O2HB_CB_MAGIC and must not already be on a list.
 *
 * Resolves the list for hc->hc_type, takes a user reference on the
 * region(s), then, holding o2hb_callback_sem for writing, inserts @hc
 * before the first entry with a higher hc_priority, or at the tail if
 * none is found.
 *
 * NOTE(review): the error branches, the loop's break and the return
 * statement are not visible here.
 */
2447 int o2hb_register_callback(const char *region_uuid,
2448 struct o2hb_callback_func *hc)
2450 struct o2hb_callback_func *f;
2451 struct o2hb_callback *hbcall;
2454 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2455 BUG_ON(!list_empty(&hc->hc_item));
2457 hbcall = hbcall_from_type(hc->hc_type);
2458 if (IS_ERR(hbcall)) {
2459 ret = PTR_ERR(hbcall);
2464 ret = o2hb_region_inc_user(region_uuid);
2471 down_write(&o2hb_callback_sem);
2473 list_for_each_entry(f, &hbcall->list, hc_item) {
2474 if (hc->hc_priority < f->hc_priority) {
2475 list_add_tail(&hc->hc_item, &f->hc_item);
2479 if (list_empty(&hc->hc_item))
2480 list_add_tail(&hc->hc_item, &hbcall->list);
2482 up_write(&o2hb_callback_sem);
2485 mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2486 ret, __builtin_return_address(0), hc);
2489 EXPORT_SYMBOL_GPL(o2hb_register_callback);
/*
 * o2hb_unregister_callback() - remove a previously registered callback.
 * @region_uuid: region passed at registration; handed to
 *               o2hb_region_dec_user().
 * @hc:          descriptor to remove; must carry O2HB_CB_MAGIC.
 *
 * Tests whether @hc is currently on a list, drops the user reference
 * taken at registration, then unlinks @hc under o2hb_callback_sem held
 * for writing.  list_del_init() leaves @hc reusable.
 *
 * NOTE(review): the branch taken when @hc is not on a list is not
 * visible here.
 */
2491 void o2hb_unregister_callback(const char *region_uuid,
2492 struct o2hb_callback_func *hc)
2494 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2496 mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2497 __builtin_return_address(0), hc);
2499 /* XXX Can this happen _with_ a region reference? */
2500 if (list_empty(&hc->hc_item))
2504 o2hb_region_dec_user(region_uuid);
2506 down_write(&o2hb_callback_sem);
2508 list_del_init(&hc->hc_item);
2510 up_write(&o2hb_callback_sem);
2512 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
/*
 * o2hb_check_node_heartbeating() - is @node_num heartbeating anywhere?
 *
 * Takes a snapshot with o2hb_fill_node_map() (which acquires
 * o2hb_callback_sem and o2hb_live_lock) and tests the node's bit; a
 * message is logged when the bit is clear.
 *
 * NOTE(review): the return values are not visible here.
 */
2514 int o2hb_check_node_heartbeating(u8 node_num)
2516 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2518 o2hb_fill_node_map(testing_map, sizeof(testing_map));
2519 if (!test_bit(node_num, testing_map)) {
2521 "node (%u) does not have heartbeating enabled.\n",
2528 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
/*
 * o2hb_check_node_heartbeating_no_sem() - like
 * o2hb_check_node_heartbeating(), but the snapshot is taken under
 * o2hb_live_lock only, without touching o2hb_callback_sem.
 *
 * NOTE(review): the return values are not visible here.
 */
2530 int o2hb_check_node_heartbeating_no_sem(u8 node_num)
2532 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2534 spin_lock(&o2hb_live_lock);
2535 o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2536 spin_unlock(&o2hb_live_lock);
2537 if (!test_bit(node_num, testing_map)) {
2539 "node (%u) does not have heartbeating enabled.\n",
2546 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem);
/*
 * o2hb_check_node_heartbeating_from_callback() - variant of
 * o2hb_check_node_heartbeating() that copies the live-node bitmap without
 * taking any lock itself; going by its name it is meant for heartbeat
 * callback context.
 *
 * NOTE(review): the return values are not visible here.
 */
2548 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2550 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2552 o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2553 if (!test_bit(node_num, testing_map)) {
2555 "node (%u) does not have heartbeating enabled.\n",
2562 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2564 /* Makes sure our local node is configured with a node number, and is
/*
 * o2hb_check_local_node_heartbeating() - is this node configured and
 * heartbeating?
 *
 * Looks up the local node number; O2NM_MAX_NODES means the node has not
 * been configured, which is logged.  Otherwise the answer comes from
 * o2hb_check_node_heartbeating().
 *
 * NOTE(review): the value returned for an unconfigured node is not
 * visible here.
 */
2566 int o2hb_check_local_node_heartbeating(void)
2570 /* if this node was set then we have networking */
2571 node_num = o2nm_this_node();
2572 if (node_num == O2NM_MAX_NODES) {
2573 mlog(ML_HEARTBEAT, "this node has not been configured.\n");
2577 return o2hb_check_node_heartbeating(node_num);
2579 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
2582 * this is just a hack until we get the plumbing which flips file systems
2583 * read only and drops the hb ref instead of killing the node dead.
/*
 * o2hb_stop_all_regions() - request an unclean stop of every region.
 *
 * Logs an error and, under o2hb_live_lock, sets hr_unclean_stop on each
 * region in o2hb_all_regions.  The heartbeat threads test that flag in
 * their main loop; this function does not wait for them.
 */
2585 void o2hb_stop_all_regions(void)
2587 struct o2hb_region *reg;
2589 mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2591 spin_lock(&o2hb_live_lock);
2593 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2594 reg->hr_unclean_stop = 1;
2596 spin_unlock(&o2hb_live_lock);
2598 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
/*
 * o2hb_get_all_regions() - copy out the names of the known regions.
 * @region_uuids: destination buffer, O2HB_MAX_REGION_NAME_LEN bytes per
 *                entry.
 * @max_regions:  number of entries @region_uuids can hold.
 *
 * Under o2hb_live_lock, walks o2hb_all_regions and copies each region's
 * name into the next fixed-width entry while fewer than @max_regions have
 * been copied.
 *
 * NOTE(review): memcpy() always copies O2HB_MAX_REGION_NAME_LEN bytes
 * from the item name; for a shorter name this reads past its terminator -
 * confirm names are always full length.
 * NOTE(review): the skip for dropped regions, the counter update and the
 * return value are not visible here.
 * NOTE(review): "®" below looks like a mis-encoded "&reg".
 */
2600 int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2602 struct o2hb_region *reg;
2606 spin_lock(&o2hb_live_lock);
2609 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2610 if (reg->hr_item_dropped)
2613 mlog(0, "Region: %s\n", config_item_name(®->hr_item));
2614 if (numregs < max_regions) {
2615 memcpy(p, config_item_name(®->hr_item),
2616 O2HB_MAX_REGION_NAME_LEN);
2617 p += O2HB_MAX_REGION_NAME_LEN;
2622 spin_unlock(&o2hb_live_lock);
2626 EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
/*
 * o2hb_global_heartbeat_active() - returns non-zero when the heartbeat
 * mode is O2HB_HEARTBEAT_GLOBAL, zero otherwise.
 */
2628 int o2hb_global_heartbeat_active(void)
2630 return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2632 EXPORT_SYMBOL(o2hb_global_heartbeat_active);