One of the most common bugs in any multi-threaded or multi-process code is corruption of shared data due to poor (or absent) concurrency control. Concurrency is one term used to describe code interactions that are not sequential in nature (equivalent or companion terms include *parallel*, *multi-threaded* and *multi-process*). Within this context, concurrency control refers to the tactics used to ensure the integrity of shared data.

To demonstrate this problem, we’ll use a fairly simple example: counting even, odd and prime numbers in a large set. We’ll use different strategies over the same data set for serial and parallel processing. The serial implementation, SerialMapReduceWorker, is straightforward since it involves no concurrency issues.

```csharp
foreach (var kvp in input)
{
    if (0 == kvp.Value % 2)
    {
        ++mrr.Evens;
    }
    else
    {
        ++mrr.Odds;
    }
    if (true == AMT.Math.IsPrime.TrialDivisionMethod(kvp.Value))
    {
        ++mrr.Primes;
    }
}
```

SerialMapReduceWorker iterates over the input set; for each key-value pair, kvp, it determines whether the value is even, odd or prime and increments the appropriate counter in the MapReduceResult instance, mrr. (Although no map-reduce is involved, SerialMapReduceWorker is named for consistency with the concurrent workers.)
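The source of `AMT.Math.IsPrime.TrialDivisionMethod` isn’t shown here. A minimal trial-division sketch (a hypothetical stand-in, not the library’s actual implementation) looks like:

```csharp
using System;

public static class IsPrimeSketch
{
    // Trial division: n is prime if no integer d in [2, sqrt(n)] divides it.
    public static bool TrialDivisionMethod(int n)
    {
        if (n < 2) return false;
        for (int d = 2; (long)d * d <= n; ++d)
        {
            if (n % d == 0) return false;
        }
        return true;
    }

    public static void Main()
    {
        // e.g. 13 is prime, 9 is not
        Console.WriteLine(TrialDivisionMethod(13));
        Console.WriteLine(TrialDivisionMethod(9));
    }
}
```

Note that this helper is read-only with respect to shared state, so calling it from many threads at once is safe; the hazard in what follows lies entirely in the shared counters.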

.NET’s Task Parallel Library (TPL) makes it very easy (too easy?) to convert this code to run concurrently. All a developer has to do is change *foreach* to *Parallel.ForEach*, add a little lambda syntax, and voilà! The code magically runs much faster!

```csharp
Parallel.ForEach(input, kvp =>
{
    if (0 == kvp.Value % 2)
    {
        ++mrr.Evens;
    }
    else
    {
        ++mrr.Odds;
    }
    if (true == AMT.Math.IsPrime.TrialDivisionMethod(kvp.Value))
    {
        ++mrr.Primes;
    }
});
```

Just look at these results: the parallel version executed almost twice as fast!

9999999 of 9999999 input values are unique

[SerialMapReduceWorker] Evens: 5,000,533; Odds: 4,999,466; Primes: 244,703; Elapsed: 00:00:**51**.6998025

[ParallelMapReduceWorker_Unprotected] Evens: 4,996,020; Odds: 4,994,704; Primes: 244,662; Elapsed: 00:00:**30**.5742845

Unfortunately, this conversion is also a dangerously naive implementation of concurrent code. Did you notice the problems? The parallel code found a different number of even, odd and prime numbers within the same set of integers. How is that possible? Answer: data corruption due to a lack of concurrency control.

The implementation in ParallelMapReduceWorker_Unprotected does nothing to protect the MapReduceResult instance, mrr. Every thread involved increments mrr.Evens, mrr.Odds and mrr.Primes. The increment operator is not atomic: it reads the current value, adds one, and writes the result back. Two threads might both read mrr.Evens as 4 and both write back 5, when the expectation is that one increments it from 4 to 5 and the other from 5 to 6. As you can see in the results above, this data corruption causes ParallelMapReduceWorker_Unprotected’s count of even integers to be wrong by about 4,500.
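The lost update is easy to reproduce in isolation. The following sketch (hypothetical counter names, not the article’s worker) runs the same number of unprotected and lock-protected increments; the locked counter always lands on N, while the unprotected one typically falls short:

```csharp
using System;
using System.Threading.Tasks;

class LostUpdateDemo
{
    static void Main()
    {
        const int N = 1_000_000;

        // Unprotected: ++ is a read-modify-write, so two threads can read
        // the same value, both write value + 1, and lose one update.
        long unprotectedCount = 0;
        Parallel.For(0, N, _ => { ++unprotectedCount; });

        // Protected: the lock serializes each read-modify-write.
        long lockedCount = 0;
        object gate = new object();
        Parallel.For(0, N, _ => { lock (gate) { ++lockedCount; } });

        // lockedCount is always N; unprotectedCount is usually less.
        Console.WriteLine($"unprotected: {unprotectedCount:N0}; locked: {lockedCount:N0}");
    }
}
```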

In this case, correcting the error is fairly simple. The code just needs to protect access to MapReduceResult to ensure that only one thread can increment at a time. The corrected ParallelMapReduceWorker:

```csharp
Parallel.ForEach(input, kvp =>
{
    if (0 == kvp.Value % 2)
    {
        lock (mrr)
        {
            ++mrr.Evens;
        }
    }
    else
    {
        lock (mrr)
        {
            ++mrr.Odds;
        }
    }
    if (true == AMT.Math.IsPrime.TrialDivisionMethod(kvp.Value))
    {
        lock (mrr)
        {
            ++mrr.Primes;
        }
    }
});
```

Each time this code determines it needs to increment one of the counters, it uses concurrency control by:

- Locking the MapReduceResult instance
- Incrementing the appropriate counter
- Unlocking the MapReduceResult instance

Since only one thread at a time can hold the lock on *mrr*, other threads must wait until it is released to proceed. This locking now guarantees that, continuing our previous case, *mrr.Evens* is correctly incremented from 4 to 5 only once. ParallelMapReduceWorker calculates the same counts as SerialMapReduceWorker, and does so with almost the same performance as the unprotected version.

9999999 of 9999999 input values are unique

[SerialMapReduceWorker] Evens: **5,000,533**; Odds: 4,999,466; Primes: 244,703; Elapsed: 00:00:51.6998025

[ParallelMapReduceWorker_Unprotected] Evens: 4,996,020; Odds: 4,994,704; Primes: 244,662; Elapsed: 00:00:30.5742845

[ParallelMapReduceWorker] Evens: **5,000,533**; Odds: 4,999,466; Primes: 244,703; Elapsed: 00:00:30.6871152

NOTE: The count of primes appears to be incorrect. The IsPrime.TrialDivisionMethod implementation is intentionally slow to ensure multiple threads contend for access to the same data. Unfortunately, and unintentionally, it also appears to be incorrect. (cf. Count of Primes)
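Locking is only one concurrency-control tactic. For bare counter increments, .NET also offers `Interlocked.Increment` (in `System.Threading`), which performs the read-modify-write as a single atomic operation without taking a lock. A sketch of the same counting idea using it, with synthetic input and hypothetical names (counters are fields here because `Interlocked.Increment` takes the counter by `ref`):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class InterlockedSketch
{
    // Fields, because Interlocked.Increment needs a ref to the counter.
    static long evens, odds;

    static void Main()
    {
        int[] input = Enumerable.Range(1, 1_000_000).ToArray();

        Parallel.ForEach(input, value =>
        {
            if (0 == value % 2)
                Interlocked.Increment(ref evens);  // atomic increment, no lock
            else
                Interlocked.Increment(ref odds);
        });

        // With input 1..1,000,000: 500,000 evens and 500,000 odds.
        Console.WriteLine($"Evens: {evens:N0}; Odds: {odds:N0}");
    }
}
```

Interlocked avoids the overhead of acquiring and releasing a lock on every increment, although in this article’s workload the primality test dominates the running time, so the measured difference would likely be small.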