High-perf SelectInParallel in 120 lines of C#

A few months ago at Lokad, we started working on 8-core machines. Multi-core machines need adequate algorithmic design to leverage their processing power; and such a design can be more or less complicated depending of the algorithm that you are trying to parallelize.

In our case, there were many situations where the parallelization was quite straightforward: large loops, all iterations being independents. At that time, PLinq, the parallelization library from Microsoft wasn’t still available as a final product (it will be shipped with Visual Studio 2010). Thus, since we were quite in a hurry, we decided to code our own SelectInParallel method (code being provided below). Basically, it’s just Select but with a parallel execution for each item being selected.

Although, being surprisingly simple, we found out that, at least for Lokad, SelectInParallel alone was fitting virtually 99% of our multi-core parallelization needs.

Yet, when we did start to try to speed-up algorithms with our first SelectInParallel implementation, we did end-up stuck with poor speed-up ratio at 3x or even 2x where I was expecting near 8x speed-up.

At first, I thought it was an illustration of the Amdahl’s law. But a more detailed performance investigation did show I was just plain wrong. The harsh reality was: threads, when not (very) carefully managed, involve a (very) significant overhead.

Our last SelectInParallel implementation is now 120 lines long with a quasi-negligible overhead, i.e. bringing a near linear speed-up with the number of CPU cores on your machine. Yet, this performance wasn’t easy to achieve. Let’s review two key aspects of the implementation.

Keep your main thread working: In the first implementation, we did follow the naive pattern: start N-threads (N being the number of CPUs), wait for them to finish, collect the results and proceed. Bad idea, if the amount of work happens to be small, then, simply waiting for your threads to start is going to be a performance killer. Instead, you should start N-1 threads, and get your calling thread working right away.

Avoid synchronization altogether: At first, we were using a Producer - Consumer threading pattern. Bad idea again: it produces a lot of locking contention, the work queue becoming the main bottleneck of the process. Instead, an arithmetic trick can be used to let the workers tackle disjoint workset right from the beginning and without any synchronization.

So far, we have been quite satisfied by our 120-lines ersatz to PLinq. Hope this piece of code can help a few other people to get the most of their many-core machines. If you have ideas to improve further the performance of this SelectInParallel implementation, just let me know.

using System;  
using System.Threading;  
  
namespace Lokad.Threading  
{  
    ///<summary>
    /// Quick alternative to PLinq.
    /// </summary>

    public static class ParallelExtensions
    {
        static int _threadCount = Environment.ProcessorCount;
 
  
        /// <summary>Get or sets the number of threads to be used in  
        /// the parallel extensions. </summary>  
        public static int ThreadCount  
        {  
            get { return _threadCount; }  
            set  
            {  
                _threadCount = value;  
            }  
        }  
  
        /// <summary>Fast parallelization of a function over an array.</summary>  
        /// <param name="input">Input array to processed in parallel.</param>  
        /// <param name="func">The action to perform (parameters and all the members should be immutable!!!).</param>  
        /// <remarks>Threads are recycled. Synchronization overhead is minimal.</remarks>  
        public static TResult[] SelectInParallel<TItem, TResult>(this TItem[] input, Func<TItem,TResult> func)  
        {  
            var results = new TResult[input.Length];  
  
            if (_threadCount == 1 || input.Length == 1)  
            {  
                for(int i = 0; i < input.Length; i++)  
                {  
                    results[i] = func(input[i]);  
                }  
  
                return results;  
            }  
  
            // perf: no more thread than items in collection  
            int threadCount = Math.Min(_threadCount, input.Length);  
  
            // perf: start by syncless process, then finish with light index-based sync  
            // to adjust varying execution time of the various threads.  
            int threshold = Math.Max(0, input.Length - (int) Math.Sqrt(input.Length) - 2*threadCount);  
            int workingIndex = threshold - 1;  
  
            var sync = new object();  
  
            Exception exception = null;  
  
            int completedCount = 0;  
            WaitCallback worker = index =>  
            {  
                try  
                {  
                    // no need for lock - disjoint processing  
                    for(var i = (int) index; i < threshold; i += threadCount)  
                    {  
                        results[i] = func(input[i]);  
                    }  
  
                    // joint processing  
                    int j;  
                    while((j = Interlocked.Increment(ref workingIndex)) < input.Length)  
                    {  
                        results[j] = func(input[j]);  
                    }  
  
                    var r = Interlocked.Increment(ref completedCount);  
  
                    // perf: only the terminating thread actually acquires a lock.  
                    if (r == threadCount && (int)index != 0)  
                    {  
                        lock (sync) Monitor.Pulse(sync);  
                    }  
                }  
                catch (Exception ex)  
                {  
                    exception = ex;  
                    lock (sync) Monitor.Pulse(sync);  
                }  
            };  
  
            for (int i = 1; i < threadCount; i++)  
            {  
                ThreadPool.QueueUserWorkItem(worker, i);  
            }  
            worker((object) 0); // perf: recycle current thread  
  
            // waiting until completion or failure  
            while(completedCount < threadCount && exception == null)  
            {  
                // CAUTION: limit on wait time is needed because if threads  
                // have terminated   
                // - AFTER the test of the 'while' loop, and  
                // - BEFORE the inner 'lock'   
                // then, there is no one left to call for 'Pulse'.  
                lock (sync) Monitor.Wait(sync, 10.Milliseconds());  
            }  
  
            if(exception != null)  
            {  
                throw exception;  
            }  
  
            return results;   
        }  
    }  
}

Reader Comments (2)

What about moving it to Lokad.Shared? March 23, 2009 | Rinat Abdullin


Posted by Lawrence: I have a need to process billions of records and run 8 methods each time. I have an 8 core machine so I worked out how to do this by initiating 8 worker threads and the signalling each one when a new record was ready to be processed. My problem is that running the 8 methods sequentially only takes 0.015 millisecs(very small amount of time - but over billions of iterations…). BUT each time I set a signal via AutoResetEvent.Set(), each single call to .Set() takes 0.025 millisecs…. MSDN forums are unable to help. Does your method above solve this problem? I would appreciate any advice, Lawrence August 5, 2009 | joannes


Hi Lawrence, The “trick” behind my SelectInParallel is that, precisely, there is virtually no need for synchronization calls. In your case, AutoResetEvent acts a synchronization call. If you are able to able to retrieve your record in big batches (typically 10k records at a time), then I think that the SelectInParallel would definitively be handy. Hope it helps. August 5, 2009 | joannes


So Monitor.Pulse and Monitor.Wait are not synchronization calls? Are they very lite with little time overhead? August 5, 2009 | Lawrence


Monitor.Pulse and Monitor.Wait are synchronization calls but the trick is to do very very few of them, that’s what SelectInParallel is doing. August 5, 2009 | joannes


OK. I can see I am doing 16 more calls to AutoResetEvent per record than you call Monitor.Pulse. That’s a saving I can make… Do you have any idea if Monitor.Pulse is lighter (takes less time) than AutoResetEvent? i.e. Is it worth me changing my code to use it, or just use AutoResetEvent less as in your code? Many thanks for this guidance August 5, 2009 | Lawrence


My guess would be that the two (Pulse vs. AutoResetEvent) should be pretty similar because the underlying mechanism is probably similar. That being said, only a real benchmark will be able to address properly this question (sorry not being able to be more helpful here). August 7, 2009 | joannes


joannes, I am not sure, but i guess there are still two more potentials for improvement in your code: 1) if the chunks are split by the main thread and the limits overgiven to the worker thread, the repeated Interlock calls can be saved (saving I think two memory barriers each). 2) “r == threadCount” where do we know, the ‘threadCount’ thread will be the terminating one? And lastly: 3) Monitor.Pulse can be circumvented completely, by using a downcounting _workerActive variable in the main thread scope, decrementing it at the end of the worker() scope and let the main thread spin wait for it. For ILNumerics this gave the best results. Best regards, Haymo March 2, 2012 | haymo


Regarding the costs of Monitor and AutorResetEvent. One relys on the other: www.albahari.com/threading/part4.aspx#_Signaling_with_Wait_and_Pul March 2, 2012 | haymo