ACPI / EC: Work around method reentrancy limit in ACPICA for _Qxx
[cascardo/linux.git] / drivers / staging / lustre / lnet / lnet / net_fault.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  * Lustre is a trademark of Seagate, Inc.
28  *
29  * lnet/lnet/net_fault.c
30  *
31  * Lustre network fault simulation
32  *
33  * Author: liang.zhen@intel.com
34  */
35
36 #define DEBUG_SUBSYSTEM S_LNET
37
38 #include "../../include/linux/lnet/lib-lnet.h"
39 #include "../../include/linux/lnet/lnetctl.h"
40
41 #define LNET_MSG_MASK           (LNET_PUT_BIT | LNET_ACK_BIT | \
42                                  LNET_GET_BIT | LNET_REPLY_BIT)
43
struct lnet_drop_rule {
        /** link chain on the_lnet.ln_drop_rules */
        struct list_head        dr_link;
        /** attributes of this rule */
        struct lnet_fault_attr  dr_attr;
        /** lock to protect \a dr_drop_at and \a dr_stat */
        spinlock_t              dr_lock;
        /**
         * the message sequence to drop, which means message is dropped when
         * dr_stat.drs_count == dr_drop_at
         */
        unsigned long           dr_drop_at;
        /**
         * seconds to drop the next message, it's exclusive with dr_drop_at
         */
        unsigned long           dr_drop_time;
        /** baseline to calculate dr_drop_time */
        unsigned long           dr_time_base;
        /** statistic of dropped messages */
        struct lnet_fault_stat  dr_stat;
};
65
66 static bool
67 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
68 {
69         if (nid == msg_nid || nid == LNET_NID_ANY)
70                 return true;
71
72         if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
73                 return false;
74
75         /* 255.255.255.255@net is wildcard for all addresses in a network */
76         return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
77 }
78
79 static bool
80 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
81                       lnet_nid_t dst, unsigned int type, unsigned int portal)
82 {
83         if (!lnet_fault_nid_match(attr->fa_src, src) ||
84             !lnet_fault_nid_match(attr->fa_dst, dst))
85                 return false;
86
87         if (!(attr->fa_msg_mask & (1 << type)))
88                 return false;
89
90         /**
91          * NB: ACK and REPLY have no portal, but they should have been
92          * rejected by message mask
93          */
94         if (attr->fa_ptl_mask && /* has portal filter */
95             !(attr->fa_ptl_mask & (1ULL << portal)))
96                 return false;
97
98         return true;
99 }
100
101 static int
102 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
103 {
104         if (!attr->fa_msg_mask)
105                 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
106
107         if (!attr->fa_ptl_mask) /* no portal filter */
108                 return 0;
109
110         /* NB: only PUT and GET can be filtered if portal filter has been set */
111         attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
112         if (!attr->fa_msg_mask) {
113                 CDEBUG(D_NET, "can't find valid message type bits %x\n",
114                        attr->fa_msg_mask);
115                 return -EINVAL;
116         }
117         return 0;
118 }
119
120 static void
121 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
122 {
123         /* NB: fs_counter is NOT updated by this function */
124         switch (type) {
125         case LNET_MSG_PUT:
126                 stat->fs_put++;
127                 return;
128         case LNET_MSG_ACK:
129                 stat->fs_ack++;
130                 return;
131         case LNET_MSG_GET:
132                 stat->fs_get++;
133                 return;
134         case LNET_MSG_REPLY:
135                 stat->fs_reply++;
136                 return;
137         }
138 }
139
140 /**
141  * LNet message drop simulation
142  */
143
144 /**
145  * Add a new drop rule to LNet
146  * There is no check for duplicated drop rule, all rules will be checked for
147  * incoming message.
148  */
149 static int
150 lnet_drop_rule_add(struct lnet_fault_attr *attr)
151 {
152         struct lnet_drop_rule *rule;
153
154         if (attr->u.drop.da_rate & attr->u.drop.da_interval) {
155                 CDEBUG(D_NET, "please provide either drop rate or drop interval, but not both at the same time %d/%d\n",
156                        attr->u.drop.da_rate, attr->u.drop.da_interval);
157                 return -EINVAL;
158         }
159
160         if (lnet_fault_attr_validate(attr))
161                 return -EINVAL;
162
163         CFS_ALLOC_PTR(rule);
164         if (!rule)
165                 return -ENOMEM;
166
167         spin_lock_init(&rule->dr_lock);
168
169         rule->dr_attr = *attr;
170         if (attr->u.drop.da_interval) {
171                 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
172                 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
173                                                     attr->u.drop.da_interval);
174         } else {
175                 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
176         }
177
178         lnet_net_lock(LNET_LOCK_EX);
179         list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
180         lnet_net_unlock(LNET_LOCK_EX);
181
182         CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
183                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
184                attr->u.drop.da_rate, attr->u.drop.da_interval);
185         return 0;
186 }
187
188 /**
189  * Remove matched drop rules from lnet, all rules that can match \a src and
190  * \a dst will be removed.
191  * If \a src is zero, then all rules have \a dst as destination will be remove
192  * If \a dst is zero, then all rules have \a src as source will be removed
193  * If both of them are zero, all rules will be removed
194  */
195 static int
196 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
197 {
198         struct lnet_drop_rule *rule;
199         struct lnet_drop_rule *tmp;
200         struct list_head zombies;
201         int n = 0;
202
203         INIT_LIST_HEAD(&zombies);
204
205         lnet_net_lock(LNET_LOCK_EX);
206         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
207                 if (rule->dr_attr.fa_src != src && src)
208                         continue;
209
210                 if (rule->dr_attr.fa_dst != dst && dst)
211                         continue;
212
213                 list_move(&rule->dr_link, &zombies);
214         }
215         lnet_net_unlock(LNET_LOCK_EX);
216
217         list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
218                 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
219                        libcfs_nid2str(rule->dr_attr.fa_src),
220                        libcfs_nid2str(rule->dr_attr.fa_dst),
221                        rule->dr_attr.u.drop.da_rate,
222                        rule->dr_attr.u.drop.da_interval);
223
224                 list_del(&rule->dr_link);
225                 CFS_FREE_PTR(rule);
226                 n++;
227         }
228
229         return n;
230 }
231
232 /**
233  * List drop rule at position of \a pos
234  */
235 static int
236 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
237                     struct lnet_fault_stat *stat)
238 {
239         struct lnet_drop_rule *rule;
240         int cpt;
241         int i = 0;
242         int rc = -ENOENT;
243
244         cpt = lnet_net_lock_current();
245         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
246                 if (i++ < pos)
247                         continue;
248
249                 spin_lock(&rule->dr_lock);
250                 *attr = rule->dr_attr;
251                 *stat = rule->dr_stat;
252                 spin_unlock(&rule->dr_lock);
253                 rc = 0;
254                 break;
255         }
256
257         lnet_net_unlock(cpt);
258         return rc;
259 }
260
261 /**
262  * reset counters for all drop rules
263  */
264 static void
265 lnet_drop_rule_reset(void)
266 {
267         struct lnet_drop_rule *rule;
268         int cpt;
269
270         cpt = lnet_net_lock_current();
271
272         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
273                 struct lnet_fault_attr *attr = &rule->dr_attr;
274
275                 spin_lock(&rule->dr_lock);
276
277                 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
278                 if (attr->u.drop.da_rate) {
279                         rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
280                 } else {
281                         rule->dr_drop_time = cfs_time_shift(cfs_rand() %
282                                                 attr->u.drop.da_interval);
283                         rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
284                 }
285                 spin_unlock(&rule->dr_lock);
286         }
287
288         lnet_net_unlock(cpt);
289 }
290
291 /**
292  * check source/destination NID, portal, message type and drop rate,
293  * decide whether should drop this message or not
294  */
295 static bool
296 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
297                 lnet_nid_t dst, unsigned int type, unsigned int portal)
298 {
299         struct lnet_fault_attr *attr = &rule->dr_attr;
300         bool drop;
301
302         if (!lnet_fault_attr_match(attr, src, dst, type, portal))
303                 return false;
304
305         /* match this rule, check drop rate now */
306         spin_lock(&rule->dr_lock);
307         if (rule->dr_drop_time) { /* time based drop */
308                 unsigned long now = cfs_time_current();
309
310                 rule->dr_stat.fs_count++;
311                 drop = cfs_time_aftereq(now, rule->dr_drop_time);
312                 if (drop) {
313                         if (cfs_time_after(now, rule->dr_time_base))
314                                 rule->dr_time_base = now;
315
316                         rule->dr_drop_time = rule->dr_time_base +
317                                              cfs_time_seconds(cfs_rand() %
318                                                 attr->u.drop.da_interval);
319                         rule->dr_time_base += cfs_time_seconds(attr->u.drop.da_interval);
320
321                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lu\n",
322                                libcfs_nid2str(attr->fa_src),
323                                libcfs_nid2str(attr->fa_dst),
324                                rule->dr_drop_time);
325                 }
326
327         } else { /* rate based drop */
328                 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
329
330                 if (!do_div(rule->dr_stat.fs_count, attr->u.drop.da_rate)) {
331                         rule->dr_drop_at = rule->dr_stat.fs_count +
332                                            cfs_rand() % attr->u.drop.da_rate;
333                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
334                                libcfs_nid2str(attr->fa_src),
335                                libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
336                 }
337         }
338
339         if (drop) { /* drop this message, update counters */
340                 lnet_fault_stat_inc(&rule->dr_stat, type);
341                 rule->dr_stat.u.drop.ds_dropped++;
342         }
343
344         spin_unlock(&rule->dr_lock);
345         return drop;
346 }
347
348 /**
349  * Check if message from \a src to \a dst can match any existed drop rule
350  */
351 bool
352 lnet_drop_rule_match(lnet_hdr_t *hdr)
353 {
354         struct lnet_drop_rule *rule;
355         lnet_nid_t src = le64_to_cpu(hdr->src_nid);
356         lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
357         unsigned int typ = le32_to_cpu(hdr->type);
358         unsigned int ptl = -1;
359         bool drop = false;
360         int cpt;
361
362         /**
363          * NB: if Portal is specified, then only PUT and GET will be
364          * filtered by drop rule
365          */
366         if (typ == LNET_MSG_PUT)
367                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
368         else if (typ == LNET_MSG_GET)
369                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
370
371         cpt = lnet_net_lock_current();
372         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
373                 drop = drop_rule_match(rule, src, dst, typ, ptl);
374                 if (drop)
375                         break;
376         }
377
378         lnet_net_unlock(cpt);
379         return drop;
380 }
381
382 /**
383  * LNet Delay Simulation
384  */
385 /** timestamp (second) to send delayed message */
386 #define msg_delay_send           msg_ev.hdr_data
387
struct lnet_delay_rule {
        /** link chain on the_lnet.ln_delay_rules */
        struct list_head        dl_link;
        /** link chain on delay_dd.dd_sched_rules */
        struct list_head        dl_sched_link;
        /** attributes of this rule */
        struct lnet_fault_attr  dl_attr;
        /** lock to protect \a below members */
        spinlock_t              dl_lock;
        /** refcount of delay rule */
        atomic_t                dl_refcount;
        /**
         * the message sequence to delay, which means message is delayed when
         * dl_stat.fs_count == dl_delay_at
         */
        unsigned long           dl_delay_at;
        /**
         * seconds to delay the next message, it's exclusive with dl_delay_at
         */
        unsigned long           dl_delay_time;
        /** baseline to calculate dl_delay_time */
        unsigned long           dl_time_base;
        /** jiffies to send the next delayed message */
        unsigned long           dl_msg_send;
        /** delayed message list */
        struct list_head        dl_msg_list;
        /** statistic of delayed messages */
        struct lnet_fault_stat  dl_stat;
        /** timer to wakeup delay_daemon */
        struct timer_list       dl_timer;
};
419
struct delay_daemon_data {
        /** serialise rule add/remove */
        struct mutex            dd_mutex;
        /** protect rules on \a dd_sched_rules */
        spinlock_t              dd_lock;
        /** scheduled delay rules (by timer) */
        struct list_head        dd_sched_rules;
        /** daemon thread sleeps here waiting for scheduled rules */
        wait_queue_head_t       dd_waitq;
        /** controller (lctl command) waits here for daemon start/stop */
        wait_queue_head_t       dd_ctl_waitq;
        /** daemon is running */
        unsigned int            dd_running;
        /** daemon stopped */
        unsigned int            dd_stopped;
};
436
437 static struct delay_daemon_data delay_dd;
438
/** round a jiffies deadline up to the next whole second (in jiffies) */
static unsigned long
round_timeout(unsigned long timeout)
{
        unsigned int sec;

        sec = (unsigned int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1;
        return cfs_time_seconds(sec);
}
445
446 static void
447 delay_rule_decref(struct lnet_delay_rule *rule)
448 {
449         if (atomic_dec_and_test(&rule->dl_refcount)) {
450                 LASSERT(list_empty(&rule->dl_sched_link));
451                 LASSERT(list_empty(&rule->dl_msg_list));
452                 LASSERT(list_empty(&rule->dl_link));
453
454                 CFS_FREE_PTR(rule);
455         }
456 }
457
458 /**
459  * check source/destination NID, portal, message type and delay rate,
460  * decide whether should delay this message or not
461  */
462 static bool
463 delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
464                  lnet_nid_t dst, unsigned int type, unsigned int portal,
465                  struct lnet_msg *msg)
466 {
467         struct lnet_fault_attr *attr = &rule->dl_attr;
468         bool delay;
469
470         if (!lnet_fault_attr_match(attr, src, dst, type, portal))
471                 return false;
472
473         /* match this rule, check delay rate now */
474         spin_lock(&rule->dl_lock);
475         if (rule->dl_delay_time) { /* time based delay */
476                 unsigned long now = cfs_time_current();
477
478                 rule->dl_stat.fs_count++;
479                 delay = cfs_time_aftereq(now, rule->dl_delay_time);
480                 if (delay) {
481                         if (cfs_time_after(now, rule->dl_time_base))
482                                 rule->dl_time_base = now;
483
484                         rule->dl_delay_time = rule->dl_time_base +
485                                              cfs_time_seconds(cfs_rand() %
486                                                 attr->u.delay.la_interval);
487                         rule->dl_time_base += cfs_time_seconds(attr->u.delay.la_interval);
488
489                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lu\n",
490                                libcfs_nid2str(attr->fa_src),
491                                libcfs_nid2str(attr->fa_dst),
492                                rule->dl_delay_time);
493                 }
494
495         } else { /* rate based delay */
496                 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
497                 /* generate the next random rate sequence */
498                 if (!do_div(rule->dl_stat.fs_count, attr->u.delay.la_rate)) {
499                         rule->dl_delay_at = rule->dl_stat.fs_count +
500                                             cfs_rand() % attr->u.delay.la_rate;
501                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
502                                libcfs_nid2str(attr->fa_src),
503                                libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
504                 }
505         }
506
507         if (!delay) {
508                 spin_unlock(&rule->dl_lock);
509                 return false;
510         }
511
512         /* delay this message, update counters */
513         lnet_fault_stat_inc(&rule->dl_stat, type);
514         rule->dl_stat.u.delay.ls_delayed++;
515
516         list_add_tail(&msg->msg_list, &rule->dl_msg_list);
517         msg->msg_delay_send = round_timeout(
518                         cfs_time_shift(attr->u.delay.la_latency));
519         if (rule->dl_msg_send == -1) {
520                 rule->dl_msg_send = msg->msg_delay_send;
521                 mod_timer(&rule->dl_timer, rule->dl_msg_send);
522         }
523
524         spin_unlock(&rule->dl_lock);
525         return true;
526 }
527
528 /**
529  * check if \a msg can match any Delay Rule, receiving of this message
530  * will be delayed if there is a match.
531  */
532 bool
533 lnet_delay_rule_match_locked(lnet_hdr_t *hdr, struct lnet_msg *msg)
534 {
535         struct lnet_delay_rule *rule;
536         lnet_nid_t src = le64_to_cpu(hdr->src_nid);
537         lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
538         unsigned int typ = le32_to_cpu(hdr->type);
539         unsigned int ptl = -1;
540
541         /* NB: called with hold of lnet_net_lock */
542
543         /**
544          * NB: if Portal is specified, then only PUT and GET will be
545          * filtered by delay rule
546          */
547         if (typ == LNET_MSG_PUT)
548                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
549         else if (typ == LNET_MSG_GET)
550                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
551
552         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
553                 if (delay_rule_match(rule, src, dst, typ, ptl, msg))
554                         return true;
555         }
556
557         return false;
558 }
559
/**
 * Check out delayed messages for send: move every message whose deadline
 * has expired (or all of them when \a all is true) from the rule onto
 * \a msg_list, and re-arm or disarm the rule timer accordingly.
 */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
                  struct list_head *msg_list)
{
        struct lnet_msg *msg;
        struct lnet_msg *tmp;
        unsigned long now = cfs_time_current();

        /* lockless precheck: dl_msg_send holds the earliest deadline, so
         * if it is still in the future nothing can be due yet */
        if (!all && rule->dl_msg_send > now)
                return;

        spin_lock(&rule->dl_lock);
        /* dl_msg_list is kept in send-time order, so stop at the first
         * message whose deadline has not expired (unless draining all) */
        list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
                if (!all && msg->msg_delay_send > now)
                        break;

                msg->msg_delay_send = 0;
                list_move_tail(&msg->msg_list, msg_list);
        }

        if (list_empty(&rule->dl_msg_list)) {
                /* nothing pending: disarm timer, -1 marks "no deadline" */
                del_timer(&rule->dl_timer);
                rule->dl_msg_send = -1;

        } else if (!list_empty(msg_list)) {
                /*
                 * dequeued some timedout messages, update timer for the
                 * next delayed message on rule
                 */
                msg = list_entry(rule->dl_msg_list.next,
                                 struct lnet_msg, msg_list);
                rule->dl_msg_send = msg->msg_delay_send;
                mod_timer(&rule->dl_timer, rule->dl_msg_send);
        }
        spin_unlock(&rule->dl_lock);
}
597
/**
 * Deliver (or, when \a drop is true, cancel) a list of previously
 * delayed messages. Consumes \a msg_list.
 */
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
        struct lnet_msg *msg;

        while (!list_empty(msg_list)) {
                struct lnet_ni *ni;
                int cpt;
                int rc;

                msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
                LASSERT(msg->msg_rxpeer);

                ni = msg->msg_rxpeer->lp_ni;
                cpt = msg->msg_rx_cpt;

                list_del_init(&msg->msg_list);
                if (drop) {
                        /* rule is being removed on shutdown: cancel */
                        rc = -ECANCELED;

                } else if (!msg->msg_routing) {
                        /* locally terminated message */
                        rc = lnet_parse_local(ni, msg);
                        if (!rc)
                                continue;

                } else {
                        /* routed message: try to forward it */
                        lnet_net_lock(cpt);
                        rc = lnet_parse_forward_locked(ni, msg);
                        lnet_net_unlock(cpt);

                        switch (rc) {
                        case LNET_CREDIT_OK:
                                lnet_ni_recv(ni, msg->msg_private, msg, 0,
                                             0, msg->msg_len, msg->msg_len);
                                /* fallthrough: message handed off */
                        case LNET_CREDIT_WAIT:
                                continue;
                        default: /* failures */
                                break;
                        }
                }

                /* canceled or failed: account the drop and finalize */
                lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
                lnet_finalize(ni, msg, rc);
        }
}
643
/**
 * Process delayed messages for scheduled rules
 * This function can either be called by delay_rule_daemon, or by lnet_finalise
 */
void
lnet_delay_rule_check(void)
{
        struct lnet_delay_rule *rule;
        struct list_head msgs;

        INIT_LIST_HEAD(&msgs);
        while (1) {
                /* cheap lockless peek first; the result is re-checked
                 * under dd_lock before anything is dequeued */
                if (list_empty(&delay_dd.dd_sched_rules))
                        break;

                spin_lock_bh(&delay_dd.dd_lock);
                if (list_empty(&delay_dd.dd_sched_rules)) {
                        spin_unlock_bh(&delay_dd.dd_lock);
                        break;
                }

                /* take the first scheduled rule off the list; the timer
                 * callback's reference is released after processing */
                rule = list_entry(delay_dd.dd_sched_rules.next,
                                  struct lnet_delay_rule, dl_sched_link);
                list_del_init(&rule->dl_sched_link);
                spin_unlock_bh(&delay_dd.dd_lock);

                delayed_msg_check(rule, false, &msgs);
                delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
        }

        /* deliver everything that timed out, outside of all locks */
        if (!list_empty(&msgs))
                delayed_msg_process(&msgs, false);
}
677
678 /** daemon thread to handle delayed messages */
679 static int
680 lnet_delay_rule_daemon(void *arg)
681 {
682         delay_dd.dd_running = 1;
683         wake_up(&delay_dd.dd_ctl_waitq);
684
685         while (delay_dd.dd_running) {
686                 wait_event_interruptible(delay_dd.dd_waitq,
687                                          !delay_dd.dd_running ||
688                                          !list_empty(&delay_dd.dd_sched_rules));
689                 lnet_delay_rule_check();
690         }
691
692         /* in case more rules have been enqueued after my last check */
693         lnet_delay_rule_check();
694         delay_dd.dd_stopped = 1;
695         wake_up(&delay_dd.dd_ctl_waitq);
696
697         return 0;
698 }
699
700 static void
701 delay_timer_cb(unsigned long arg)
702 {
703         struct lnet_delay_rule *rule = (struct lnet_delay_rule *)arg;
704
705         spin_lock_bh(&delay_dd.dd_lock);
706         if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
707                 atomic_inc(&rule->dl_refcount);
708                 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
709                 wake_up(&delay_dd.dd_waitq);
710         }
711         spin_unlock_bh(&delay_dd.dd_lock);
712 }
713
714 /**
715  * Add a new delay rule to LNet
716  * There is no check for duplicated delay rule, all rules will be checked for
717  * incoming message.
718  */
719 int
720 lnet_delay_rule_add(struct lnet_fault_attr *attr)
721 {
722         struct lnet_delay_rule *rule;
723         int rc = 0;
724
725         if (attr->u.delay.la_rate & attr->u.delay.la_interval) {
726                 CDEBUG(D_NET, "please provide either delay rate or delay interval, but not both at the same time %d/%d\n",
727                        attr->u.delay.la_rate, attr->u.delay.la_interval);
728                 return -EINVAL;
729         }
730
731         if (!attr->u.delay.la_latency) {
732                 CDEBUG(D_NET, "delay latency cannot be zero\n");
733                 return -EINVAL;
734         }
735
736         if (lnet_fault_attr_validate(attr))
737                 return -EINVAL;
738
739         CFS_ALLOC_PTR(rule);
740         if (!rule)
741                 return -ENOMEM;
742
743         mutex_lock(&delay_dd.dd_mutex);
744         if (!delay_dd.dd_running) {
745                 struct task_struct *task;
746
747                 /**
748                  *  NB: although LND threads will process delayed message
749                  * in lnet_finalize, but there is no guarantee that LND
750                  * threads will be waken up if no other message needs to
751                  * be handled.
752                  * Only one daemon thread, performance is not the concern
753                  * of this simualation module.
754                  */
755                 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
756                 if (IS_ERR(task)) {
757                         rc = PTR_ERR(task);
758                         goto failed;
759                 }
760                 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
761         }
762
763         init_timer(&rule->dl_timer);
764         rule->dl_timer.function = delay_timer_cb;
765         rule->dl_timer.data = (unsigned long)rule;
766
767         spin_lock_init(&rule->dl_lock);
768         INIT_LIST_HEAD(&rule->dl_msg_list);
769         INIT_LIST_HEAD(&rule->dl_sched_link);
770
771         rule->dl_attr = *attr;
772         if (attr->u.delay.la_interval) {
773                 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
774                 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
775                                                      attr->u.delay.la_interval);
776         } else {
777                 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
778         }
779
780         rule->dl_msg_send = -1;
781
782         lnet_net_lock(LNET_LOCK_EX);
783         atomic_set(&rule->dl_refcount, 1);
784         list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
785         lnet_net_unlock(LNET_LOCK_EX);
786
787         CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
788                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
789                attr->u.delay.la_rate);
790
791         mutex_unlock(&delay_dd.dd_mutex);
792         return 0;
793 failed:
794         mutex_unlock(&delay_dd.dd_mutex);
795         CFS_FREE_PTR(rule);
796         return rc;
797 }
798
/**
 * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
 * and \a dst are zero, all rules will be removed, otherwise only matched rules
 * will be removed.
 * If \a src is zero, then all rules have \a dst as destination will be remove
 * If \a dst is zero, then all rules have \a src as source will be removed
 *
 * When a delay rule is removed, all delayed messages of this rule will be
 * processed immediately.
 *
 * \return the number of rules removed
 */
int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
        struct lnet_delay_rule *rule;
        struct lnet_delay_rule *tmp;
        struct list_head rule_list;
        struct list_head msg_list;
        int n = 0;
        bool cleanup;

        INIT_LIST_HEAD(&rule_list);
        INIT_LIST_HEAD(&msg_list);

        if (shutdown) {
                /* shutdown removes every rule regardless of src/dst */
                src = 0;
                dst = 0;
        }

        /* dd_mutex serialises rule removal against rule add and against
         * daemon start/stop; lnet_net_lock protects the rule list */
        mutex_lock(&delay_dd.dd_mutex);
        lnet_net_lock(LNET_LOCK_EX);

        list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
                if (rule->dl_attr.fa_src != src && src)
                        continue;

                if (rule->dl_attr.fa_dst != dst && dst)
                        continue;

                CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
                       libcfs_nid2str(rule->dl_attr.fa_src),
                       libcfs_nid2str(rule->dl_attr.fa_dst),
                       rule->dl_attr.u.delay.la_rate,
                       rule->dl_attr.u.delay.la_interval);
                /* refcount is taken over by rule_list */
                list_move(&rule->dl_link, &rule_list);
        }

        /* check if we need to shutdown delay_daemon */
        cleanup = list_empty(&the_lnet.ln_delay_rules) &&
                  !list_empty(&rule_list);
        lnet_net_unlock(LNET_LOCK_EX);

        list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
                list_del_init(&rule->dl_link);

                /* ensure the rule timer callback cannot run concurrently */
                del_timer_sync(&rule->dl_timer);
                /* drain ALL delayed messages of this rule onto msg_list */
                delayed_msg_check(rule, true, &msg_list);
                delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
                n++;
        }

        if (cleanup) { /* no more delay rule, shutdown delay_daemon */
                LASSERT(delay_dd.dd_running);
                delay_dd.dd_running = 0;
                wake_up(&delay_dd.dd_waitq);

                /* wait for the daemon to acknowledge and exit */
                while (!delay_dd.dd_stopped)
                        wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
        }
        mutex_unlock(&delay_dd.dd_mutex);

        /* deliver (or cancel, on shutdown) drained messages outside locks */
        if (!list_empty(&msg_list))
                delayed_msg_process(&msg_list, shutdown);

        return n;
}
875
876 /**
877  * List Delay Rule at position of \a pos
878  */
879 int
880 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
881                      struct lnet_fault_stat *stat)
882 {
883         struct lnet_delay_rule *rule;
884         int cpt;
885         int i = 0;
886         int rc = -ENOENT;
887
888         cpt = lnet_net_lock_current();
889         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
890                 if (i++ < pos)
891                         continue;
892
893                 spin_lock(&rule->dl_lock);
894                 *attr = rule->dl_attr;
895                 *stat = rule->dl_stat;
896                 spin_unlock(&rule->dl_lock);
897                 rc = 0;
898                 break;
899         }
900
901         lnet_net_unlock(cpt);
902         return rc;
903 }
904
905 /**
906  * reset counters for all Delay Rules
907  */
908 void
909 lnet_delay_rule_reset(void)
910 {
911         struct lnet_delay_rule *rule;
912         int cpt;
913
914         cpt = lnet_net_lock_current();
915
916         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
917                 struct lnet_fault_attr *attr = &rule->dl_attr;
918
919                 spin_lock(&rule->dl_lock);
920
921                 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
922                 if (attr->u.delay.la_rate) {
923                         rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
924                 } else {
925                         rule->dl_delay_time = cfs_time_shift(cfs_rand() %
926                                                 attr->u.delay.la_interval);
927                         rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
928                 }
929                 spin_unlock(&rule->dl_lock);
930         }
931
932         lnet_net_unlock(cpt);
933 }
934
935 int
936 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
937 {
938         struct lnet_fault_attr *attr;
939         struct lnet_fault_stat *stat;
940
941         attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
942
943         switch (opc) {
944         default:
945                 return -EINVAL;
946
947         case LNET_CTL_DROP_ADD:
948                 if (!attr)
949                         return -EINVAL;
950
951                 return lnet_drop_rule_add(attr);
952
953         case LNET_CTL_DROP_DEL:
954                 if (!attr)
955                         return -EINVAL;
956
957                 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
958                                                      attr->fa_dst);
959                 return 0;
960
961         case LNET_CTL_DROP_RESET:
962                 lnet_drop_rule_reset();
963                 return 0;
964
965         case LNET_CTL_DROP_LIST:
966                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
967                 if (!attr || !stat)
968                         return -EINVAL;
969
970                 return lnet_drop_rule_list(data->ioc_count, attr, stat);
971
972         case LNET_CTL_DELAY_ADD:
973                 if (!attr)
974                         return -EINVAL;
975
976                 return lnet_delay_rule_add(attr);
977
978         case LNET_CTL_DELAY_DEL:
979                 if (!attr)
980                         return -EINVAL;
981
982                 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
983                                                       attr->fa_dst, false);
984                 return 0;
985
986         case LNET_CTL_DELAY_RESET:
987                 lnet_delay_rule_reset();
988                 return 0;
989
990         case LNET_CTL_DELAY_LIST:
991                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
992                 if (!attr || !stat)
993                         return -EINVAL;
994
995                 return lnet_delay_rule_list(data->ioc_count, attr, stat);
996         }
997 }
998
999 int
1000 lnet_fault_init(void)
1001 {
1002         CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
1003         CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
1004         CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
1005         CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);
1006
1007         mutex_init(&delay_dd.dd_mutex);
1008         spin_lock_init(&delay_dd.dd_lock);
1009         init_waitqueue_head(&delay_dd.dd_waitq);
1010         init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1011         INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1012
1013         return 0;
1014 }
1015
1016 void
1017 lnet_fault_fini(void)
1018 {
1019         lnet_drop_rule_del(0, 0);
1020         lnet_delay_rule_del(0, 0, true);
1021
1022         LASSERT(list_empty(&the_lnet.ln_drop_rules));
1023         LASSERT(list_empty(&the_lnet.ln_delay_rules));
1024         LASSERT(list_empty(&delay_dd.dd_sched_rules));
1025 }