smp_call_function_many: handle concurrent clearing of mask
[cascardo/linux.git] / kernel / smp.c
index 9910744..9545489 100644 (file)
@@ -450,7 +450,7 @@ void smp_call_function_many(const struct cpumask *mask,
 {
        struct call_function_data *data;
        unsigned long flags;
-       int cpu, next_cpu, this_cpu = smp_processor_id();
+       int refs, cpu, next_cpu, this_cpu = smp_processor_id();
 
        /*
         * Can deadlock when called with interrupts disabled.
@@ -461,7 +461,7 @@ void smp_call_function_many(const struct cpumask *mask,
        WARN_ON_ONCE(cpu_online(this_cpu) && irqs_disabled()
                     && !oops_in_progress && !early_boot_irqs_disabled);
 
-       /* So, what's a CPU they want? Ignoring this one. */
+       /* Try to fastpath.  So, what's a CPU they want? Ignoring this one. */
        cpu = cpumask_first_and(mask, cpu_online_mask);
        if (cpu == this_cpu)
                cpu = cpumask_next_and(cpu, mask, cpu_online_mask);
@@ -483,22 +483,49 @@ void smp_call_function_many(const struct cpumask *mask,
 
        data = &__get_cpu_var(cfd_data);
        csd_lock(&data->csd);
+
+       /* This BUG_ON verifies our reuse assertions and can be removed */
        BUG_ON(atomic_read(&data->refs) || !cpumask_empty(data->cpumask));
 
+       /*
+        * The global call function queue list add and delete are protected
+        * by a lock, but the list is traversed without any lock, relying
+        * on the rcu list add and delete to allow safe concurrent traversal.
+        * We reuse the call function data without waiting for any grace
+        * period after some other cpu removes it from the global queue.
+        * This means a cpu might find our data block as it is being
+        * filled out.
+        *
+        * We hold off the interrupt handler on the other cpu by
+        * ordering our writes to the cpu mask vs our setting of the
+        * refs counter.  We assert only the cpu owning the data block
+        * will set a bit in cpumask, and each bit will only be cleared
+        * by the subject cpu.  Each cpu must first find its bit is
+        * set and then check that refs is set indicating the element is
+        * ready to be processed, otherwise it must skip the entry.
+        *
+        * On the previous iteration refs was set to 0 by another cpu.
+        * To avoid the use of transitivity, set the counter to 0 here
+        * so the wmb will pair with the rmb in the interrupt handler.
+        */
+       atomic_set(&data->refs, 0);     /* convert 3rd to 1st party write */
+
        data->csd.func = func;
        data->csd.info = info;
-       cpumask_and(data->cpumask, mask, cpu_online_mask);
-       cpumask_clear_cpu(this_cpu, data->cpumask);
 
-       /*
-        * To ensure the interrupt handler gets an complete view
-        * we order the cpumask and refs writes and order the read
-        * of them in the interrupt handler.  In addition we may
-        * only clear our own cpu bit from the mask.
-        */
+       /* Ensure 0 refs is visible before mask.  Also orders func and info */
        smp_wmb();
 
-       atomic_set(&data->refs, cpumask_weight(data->cpumask));
+       /* We rely on the "and" being processed before the store */
+       cpumask_and(data->cpumask, mask, cpu_online_mask);
+       cpumask_clear_cpu(this_cpu, data->cpumask);
+       refs = cpumask_weight(data->cpumask);
+
+       /* Some callers race with other cpus changing the passed mask */
+       if (unlikely(!refs)) {
+               csd_unlock(&data->csd);
+               return;
+       }
 
        raw_spin_lock_irqsave(&call_function.lock, flags);
        /*
@@ -507,6 +534,12 @@ void smp_call_function_many(const struct cpumask *mask,
         * will not miss any other list entries:
         */
        list_add_rcu(&data->csd.list, &call_function.queue);
+       /*
+        * We rely on the wmb() in list_add_rcu to complete our writes
+        * to the cpumask before this write to refs, which indicates
+        * data is on the list and is ready to be processed.
+        */
+       atomic_set(&data->refs, refs);
        raw_spin_unlock_irqrestore(&call_function.lock, flags);
 
        /*