Author

I am Joannes Vermorel, founder at Lokad. I am also an engineer from the Corps des Mines who initially graduated from the ENS.

I have been passionate about computer science, software matters and data mining for almost two decades.


Monday, March 23, 2009

## High-perf SelectInParallel in 120 lines of C#

A few months ago at Lokad, we started working on 8-core machines. Multi-core machines need an adequate algorithmic design to leverage their processing power, and such a design can be more or less complicated depending on the algorithm you are trying to parallelize.

In our case, there were many situations where the parallelization was quite straightforward: large loops, with all iterations independent from each other. At the time, PLinq, the parallelization library from Microsoft, wasn't yet available as a final product (it will ship with Visual Studio 2010). Thus, since we were in a hurry, we decided to code our own SelectInParallel method (code provided below). Basically, it's just Select, but with a parallel execution for each item being selected.

Despite its surprising simplicity, we found that, at least at Lokad, SelectInParallel alone covered virtually 99% of our multi-core parallelization needs.

Yet, when we started trying to speed up algorithms with our first SelectInParallel implementation, we ended up stuck with poor speed-up ratios of 3x or even 2x, where I was expecting a near 8x speed-up.

At first, I thought it was an illustration of Amdahl's law. But a more detailed performance investigation showed I was just plain wrong. The harsh reality was: threads, when not (very) carefully managed, involve a (very) significant overhead.

Our latest SelectInParallel implementation is now 120 lines long with a quasi-negligible overhead, i.e. it brings a near-linear speed-up with the number of CPU cores on your machine. Yet, this performance wasn't easy to achieve. Let's review two key aspects of the implementation.

Keep your main thread working: in the first implementation, we followed the naive pattern: start N threads (N being the number of CPUs), wait for them to finish, collect the results and proceed. Bad idea: if the amount of work happens to be small, then simply waiting for your threads to start is going to be a performance killer. Instead, you should start N-1 threads and get your calling thread working right away.
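The pattern can be sketched as follows. The Work helper and its squaring workload are hypothetical stand-ins for the real per-item function; only the thread-management pattern is the point:

```csharp
using System;
using System.Threading;

class KeepMainThreadWorking
{
    // Hypothetical workload: process every 'step'-th item starting at 'offset'.
    static void Work(int[] data, int offset, int step)
    {
        for (int i = offset; i < data.Length; i += step)
        {
            data[i] = data[i] * data[i]; // example workload: squaring
        }
    }

    static void Main()
    {
        var data = new int[] { 1, 2, 3, 4, 5, 6, 7, 8 };
        int threadCount = Environment.ProcessorCount;

        // Start only N-1 background threads...
        var threads = new Thread[threadCount - 1];
        for (int t = 1; t < threadCount; t++)
        {
            int offset = t; // capture a copy for the closure
            threads[t - 1] = new Thread(() => Work(data, offset, threadCount));
            threads[t - 1].Start();
        }

        // ...and put the calling thread to work immediately,
        // instead of idling while the other threads spin up.
        Work(data, 0, threadCount);

        foreach (var thread in threads) thread.Join();

        Console.WriteLine(string.Join(",", data)); // prints 1,4,9,16,25,36,49,64
    }
}
```

Since each worker owns a disjoint set of indices, the result is the same no matter how many cores the machine has.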

Avoid synchronization altogether: at first, we were using a producer-consumer threading pattern. Bad idea again: it produces a lot of locking contention, the work queue becoming the main bottleneck of the process. Instead, an arithmetic trick can be used to let the workers tackle disjoint worksets right from the start, without any synchronization.
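Here is a minimal, single-threaded illustration of that arithmetic trick (the threadCount, threshold and length values are arbitrary): worker k owns the indices k, k + N, k + 2N, ..., which are disjoint by construction, and a shared Interlocked counter handles only the small tail that absorbs uneven worker speeds:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

class DisjointWorkSets
{
    static void Main()
    {
        int length = 10, threadCount = 3;
        int threshold = 6; // hypothetical point where the sync-free phase ends

        // Phase 1: worker k owns indices k, k + N, k + 2N, ... below the
        // threshold. These sets are disjoint by construction: no locking needed.
        for (int k = 0; k < threadCount; k++)
        {
            var indices = new List<int>();
            for (int i = k; i < threshold; i += threadCount) indices.Add(i);
            Console.WriteLine("worker " + k + ": " + string.Join(",", indices));
        }

        // Phase 2 (tail): a shared counter advanced with Interlocked.Increment
        // lets faster workers pick up the last few items without a full lock.
        int workingIndex = threshold - 1;
        int j;
        while ((j = Interlocked.Increment(ref workingIndex)) < length)
        {
            Console.WriteLine("tail item " + j);
        }
    }
}
```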

So far, we have been quite satisfied with our 120-line ersatz for PLinq. I hope this piece of code can help a few other people get the most out of their many-core machines. If you have ideas to further improve the performance of this SelectInParallel implementation, just let me know.

```csharp
using System;
using System.Threading;

namespace Lokad.Threading
{
    ///<summary>
    /// Quick alternative to PLinq.
    ///</summary>
    public static class ParallelExtensions
    {
        static int _threadCount = Environment.ProcessorCount;

        /// <summary>Gets or sets the number of threads to be used in
        /// the parallel extensions.</summary>
        public static int ThreadCount
        {
            get { return _threadCount; }
            set { _threadCount = value; }
        }

        /// <summary>Fast parallelization of a function over an array.</summary>
        /// <param name="input">Input array to be processed in parallel.</param>
        /// <param name="func">The action to perform (parameters and all the members should be immutable!!!).</param>
        /// <remarks>Threads are recycled. Synchronization overhead is minimal.</remarks>
        public static TResult[] SelectInParallel<TItem, TResult>(this TItem[] input, Func<TItem, TResult> func)
        {
            var results = new TResult[input.Length];

            if (_threadCount == 1 || input.Length == 1)
            {
                for (int i = 0; i < input.Length; i++)
                {
                    results[i] = func(input[i]);
                }

                return results;
            }

            // perf: no more threads than items in the collection
            int threadCount = Math.Min(_threadCount, input.Length);

            // perf: start with a sync-less phase, then finish with a light index-based sync
            // to absorb the varying execution times of the various threads.
            int threshold = Math.Max(0, input.Length - (int) Math.Sqrt(input.Length) - 2 * threadCount);
            int workingIndex = threshold - 1;

            var sync = new object();
            Exception exception = null;

            int completedCount = 0;
            WaitCallback worker = index =>
            {
                try
                {
                    // no need for lock - disjoint processing
                    for (var i = (int) index; i < threshold; i += threadCount)
                    {
                        results[i] = func(input[i]);
                    }

                    // joint processing
                    int j;
                    while ((j = Interlocked.Increment(ref workingIndex)) < input.Length)
                    {
                        results[j] = func(input[j]);
                    }

                    var r = Interlocked.Increment(ref completedCount);

                    // perf: only the terminating thread actually acquires a lock.
                    if (r == threadCount && (int) index != 0)
                    {
                        lock (sync) Monitor.Pulse(sync);
                    }
                }
                catch (Exception ex)
                {
                    exception = ex;
                    lock (sync) Monitor.Pulse(sync);
                }
            };

            for (int i = 1; i < threadCount; i++)
            {
                ThreadPool.QueueUserWorkItem(worker, i);
            }

            worker((object) 0); // perf: recycle the current thread

            // waiting until completion or failure
            while (completedCount < threadCount && exception == null)
            {
                // CAUTION: a bound on the wait time is needed because if threads
                // have terminated
                // - AFTER the test of the 'while' loop, and
                // - BEFORE the inner 'lock',
                // then there is no one left to call 'Pulse'.
                lock (sync) Monitor.Wait(sync, 10.Milliseconds());
            }

            if (exception != null)
            {
                throw exception;
            }

            return results;
        }
    }
}
```
Monday, November 19, 2007

## Delete-proof data paging

In order to retrieve a large amount of data from a SQL table, you need to resort to a data paging scheme. Conceptually, a typical paged SQL query looks like the following (the syntax is approximate and vaguely inspired by MS SQL Server 2005):
```sql
SELECT Foo.Bar
FROM Foo
WHERE RowNumber() BETWEEN @Index AND @Index + @PageSize
ORDER BY Foo.Id;
```
The queries are made iteratively until no more rows get returned. Yet, this approach fails if rows are added or deleted in the table Foo during the iteration.

If rows are inserted, then the RowNumber() values will be impacted: some rows will see their number incremented. As a result:

• The newly inserted rows may be missed.

• Certain rows are going to be retrieved twice.

Overall, the situation isn't too bad because, after all, every row that was present when the retrieval iteration started will be retrieved.

On the other hand, if rows are deleted, then some rows will get their RowNumber() decremented. As a consequence, certain rows will never be retrieved. This issue is quite troublesome, because you would not expect deleted rows to prevent the retrieval of other (valid) rows.

One workaround would be to add an IsDeleted flag to the table (instead of actually deleting rows, the flag would be changed from false to true), and to purge the database only once in a while. Yet, this solution has many drawbacks: the table schema must be changed, and all existing queries that target this table must add the extra condition IsDeleted = false.

A more subtle approach to the problem consists in changing the definition of the iterating index. Instead of using an index that corresponds to a generated RowNumber(), let us directly use @IndexId, the greatest identifier retrieved so far. Thus, the paged query becomes:
```sql
SELECT TOP @PageSize Foo.Bar
FROM Foo
WHERE Foo.Id > @IndexId
ORDER BY Foo.Id;
```
With this new approach, we are now certain that no rows will be skipped during the retrieval process. Plus, we haven't made any change to the table schema.
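The retrieval loop can be sketched as follows. FetchPage is a hypothetical in-memory stand-in for the SQL query above (a real application would hit the database); the demo deletes an already-retrieved row mid-iteration to show that the Id-based cursor is unaffected:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class KeysetPaging
{
    // Stand-in for the SQL query: up to 'pageSize' rows with Id > lastId,
    // ordered by Id. (Hypothetical helper, for illustration only.)
    static List<int> FetchPage(List<int> table, int lastId, int pageSize)
    {
        return table.Where(id => id > lastId).OrderBy(id => id).Take(pageSize).ToList();
    }

    static void Main()
    {
        var table = new List<int> { 1, 2, 3, 5, 8, 13, 21 };
        var retrieved = new List<int>();

        int lastId = 0; // lower than any Id in the table
        while (true)
        {
            var page = FetchPage(table, lastId, 3);
            if (page.Count == 0) break;

            retrieved.AddRange(page);
            lastId = page[page.Count - 1]; // greatest Id retrieved so far

            // Deleting an already-retrieved row no longer skips anything,
            // since the cursor is an Id value, not a row position.
            table.Remove(2);
        }

        Console.WriteLine(string.Join(",", retrieved)); // prints 1,2,3,5,8,13,21
    }
}
```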

Saturday, March 17, 2007

## Iridium r8 released and new website for Math.NET

Math.NET is an open-source project delivering mathematics / statistics libraries written in C# (mostly targeting .NET, although Mono compatibility should not be an issue). I have been personally contributing to the numerics package of Math.NET, code-named Iridium.

Several years ago, I set up a MediaWiki-based website for Math.NET, but it's now completely obsolete. I would like to mention that Christoph Ruegg has released a brand new website for Math.NET. Update your (social) bookmarks with http://mathnet.opensourcedotnet.info/

Also check out the 8th release (March 2007) of the Iridium package, available in the download section.
