
Threadsafe Interfaces, Delegates vs Interfaces, Stuff Like That

The mighty Skeet posted the other day about the idea of using a single instance of IEnumerable as a shared input for multiple threads (not that I realised at first that this was what he meant, but I got there eventually).

Clearly the interface is no good for that purpose, because the two operations (“check for the existence of a next item” and “read the next item”) are exposed as separate methods/properties, so the pair can’t be made atomic. Jared Parsons has blogged very readably about this several times.
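To make the problem concrete, here is the two-step consumer protocol in question. Even if each call were individually synchronized, nothing stops a second thread sharing the enumerator from calling MoveNext between our two steps (a small sketch for illustration):

```csharp
using System;
using System.Collections.Generic;

class Program
{
    public static void Main()
    {
        IEnumerator<int> e = new List<int> { 1, 2, 3 }.GetEnumerator();

        // Step 1: advance to the next item (if any).
        if (e.MoveNext())
        {
            // Step 2: read it. A second thread sharing this enumerator
            // could have called MoveNext in between, so we might read the
            // wrong item - or one that no thread ever sees at all.
            int item = e.Current;
            Console.WriteLine(item); // 1 (single-threaded, at least)
        }
    }
}
```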

This got me thinking, because I’ve noticed that I can often shrink an interface declaration down so it only has one method. And then it doesn’t need to be an interface; it can just be a delegate. That way you can implement it on the fly with a lambda expression. If you express it without any out/ref parameters, you don’t even have to declare a new delegate type. And if you have a Tuple class (as in .NET 4.0), you don’t need to declare any new types – just higher order methods.

Suppose you have a method that takes no parameters and returns something, i.e. any Func<T>. This might be a useful general facility:

public static IEnumerable<T> Repeat<T>(this Func<T> read)
{
    for (; ; ) yield return read();
}

So now with any such method you can turn it into an IEnumerable:

Func<string> readLine = Console.ReadLine;
var lines = readLine.Repeat();

If you are in the habit of handing out such functions to multiple threads, and you want them to synchronize their calls to it, this would presumably be useful:

public static Func<TResult> ToAtomic<TResult>(this Func<TResult> func)
{
    var padlock = new object();
    return () => { lock (padlock) { return func(); } };
}

So for any function, you can now make another function that takes out a lock before calling the original function. (At first glance this may appear pointless, as the padlock object looks like a local variable, but it’s not – it’s captured by the new function we’re returning, and so will be shared across any callers who have a reference to that returned function.) So for example:

var threadSafeReadLine = readLine.ToAtomic();

(Not a great example if readLine is Console.ReadLine, as I think that’s probably already atomic).

As atomic interfaces often involve single methods (because otherwise they are impossible to make atomic in a simple way), this might be a very general useful facility, and you could create overloads of ToAtomic for 1, 2 … 8 arguments.
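For example, the one-argument overload follows exactly the same shape (my sketch, not from the original post; the extension class name is just a suggestion):

```csharp
using System;
using System.Collections.Generic;

static class AtomicExtensions
{
    // One-argument overload of ToAtomic: the same trick as before - the
    // padlock is captured by the returned delegate, so it is shared by
    // every caller holding a reference to that delegate.
    public static Func<T, TResult> ToAtomic<T, TResult>(this Func<T, TResult> func)
    {
        var padlock = new object();
        return arg => { lock (padlock) { return func(arg); } };
    }
}
```

This is the overload used later in the post to wrap HashSet.Add, e.g. `var atomicAdd = new Func<int, bool>(results.Add).ToAtomic();`.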

IEnumerator doesn’t fit that pattern, but we can fix it very easily with another extension method:

public static Func<Tuple<bool, T>> ToFunc<T>(this IEnumerator<T> source)
{
    return () => source.MoveNext() ? Tuple.Create(true, source.Current)
                                   : Tuple.Create(false, default(T));
}

This makes us a new delegate that returns a Tuple, in which the bool will be false when we reach the end of the list. By converting to a Func in this way, we can now apply our ToAtomic extension to it. That makes it safe to hand out to multiple threads. Then each thread separately can turn it back into an IEnumerable by applying our Repeat extension to it.

For real convenience, though, a worker thread doesn’t really want an infinite sequence of tuples! It would surely prefer a sequence of plain values, which terminates when there are no more values. That’s easy enough to enable:

public static IEnumerable<T> ToEnumerable<T>(
                  this Func<Tuple<bool, T>> read)
{
    return read.Repeat().TakeWhile(item => item.Item1)
                        .Select(item => item.Item2);
}

So now we can write a simple test:

var results = new HashSet<int>();

var atomicResultsAdd = new Func<int, bool>(results.Add).ToAtomic();

const int count = 10000000;
var numbers = Enumerable.Range(0, count);

var atomicNumbers = numbers.GetEnumerator().ToFunc().ToAtomic();

Util.Multithread(4, () =>
    {
        foreach (var number in atomicNumbers.ToEnumerable())
        {
            if (!atomicResultsAdd(number))
                Console.WriteLine("Clash: " + number);
        }
    });

for (int n = 0; n < count; n++)
{
    if (!results.Contains(n))
        Console.WriteLine("Skipped: " + n);
}

The definition of Util.Multithread is left as an exercise for the reader. 🙂 Of course all it does is run the action passed to it on the requested number of threads, and waits for them all to finish.
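Based on that description, a minimal sketch might look like this (an assumption on my part; the original definition is deliberately not shown):

```csharp
using System;
using System.Linq;
using System.Threading;

static class Util
{
    // Run the action on the requested number of threads, then block
    // until every one of them has finished.
    public static void Multithread(int threadCount, Action action)
    {
        var threads = Enumerable.Range(0, threadCount)
                                .Select(_ => new Thread(() => action()))
                                .ToList();
        foreach (var thread in threads) thread.Start();
        foreach (var thread in threads) thread.Join();
    }
}
```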

We use a HashSet to check that each number in the range is consumed exactly once by a thread. (Another job for ToAtomic to do – this time, it’s the overload that takes one argument, which wraps the HashSet.Add method.)

You can see the difference if you change the expression for atomicNumbers (apologies for the chemistry pun):

var atomicNumbers = numbers.GetEnumerator().ToFunc() /*.ToAtomic()*/ ;

By commenting out that ToAtomic at the end, we cause a random number of messages about clashes and skips. Even so, they can be surprisingly rare, which is why threading bugs are so horrible. Here I’m testing with a range of ten million, and I still only get a handful of clashes or skips. This would presumably depend a little on the hardware.

I should say that the only problem with all this is that IEnumerator<T> is not actually something that can generally be collapsed into a single method, because of its pesky (but very important) Dispose method, which it inherits from IDisposable. If a consumer abandons an iteration without calling Dispose, and the iteration is defined by an iterator method (containing yield return statements), any outstanding finally blocks will not execute. So you can only use this approach safely for enumerators that don’t require clean-up when they are abandoned.
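A quick self-contained way to see this behaviour for yourself:

```csharp
using System;
using System.Collections.Generic;

class Program
{
    public static bool CleanedUp;

    public static IEnumerable<int> Numbers()
    {
        try
        {
            for (int i = 0; ; i++) yield return i;
        }
        finally
        {
            CleanedUp = true; // only runs if the consumer calls Dispose
        }
    }

    public static void Main()
    {
        var abandoned = Numbers().GetEnumerator();
        abandoned.MoveNext();
        // Abandoned without Dispose: the finally block never executes.
        Console.WriteLine(CleanedUp); // False

        var disposed = Numbers().GetEnumerator();
        disposed.MoveNext();
        disposed.Dispose(); // triggers the finally block
        Console.WriteLine(CleanedUp); // True
    }
}
```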

Categories: C#, functional, threads
  1. Dax
    October 28, 2009 at 10:58 pm

    This is brilliant! So is there a way to do something like an IEnumerable<T> extension method “OnDispose(Action action)” and then pass it an action that decrements a counter and then disposes the original enumerator when the count reaches zero? Would you have to change Func<Tuple<bool, T>> to Func<bool, Tuple<bool, T>>?

  2. earwicker
    October 29, 2009 at 10:05 am

    I’m not sure how that would work – why would it be an extension method on IEnumerable?

    The reason for using delegates instead of interfaces is to get the syntax sugar of lambdas. But we can still get that with interfaces if we follow a simple pattern – maybe I should write another post about that…

    • Dax
      October 29, 2009 at 11:37 am

      I figured out we can do it if we change Func<Tuple<bool, T>> to Func<int, Tuple<bool, T>>. Basically change it from:

      a function that takes void and returns a Tuple, to:

      a function that takes an int, updates a reference counter with that int, disposes the enumerator if necessary, and then returns the tuple.

      So, the ToEnumerable method will become:

      public static IEnumerable<T> ToEnumerable<T>(this Func<int, Tuple<bool, T>> enumeratorDelegate)
      {
          enumeratorDelegate(1);
          try
          {
              foreach (var v in enumeratorDelegate.Repeat(0)
                                                  .TakeWhile(item => item.Item1)
                                                  .Select(item => item.Item2))
              {
                  yield return v;
              }
          }
          finally
          {
              enumeratorDelegate(-1);
          }
      }

      The ToFunc method now needs to handle the reference count change, so it becomes:

      public static Func<int, Tuple<bool, T>> ToFunc<T>(this IEnumerator<T> source)
      {
          return referenceCountChange =>
          {
              source.HandleReferenceCountChange(referenceCountChange);
              return source.MoveNext() ? Tuple.Create(true, source.Current)
                                       : Tuple.Create(false, default(T));
          };
      }

      And then we just need a reference count helper for IDisposable. This makes use of an ugly static dictionary, but so it goes.

      static readonly Dictionary<IDisposable, int> References = new Dictionary<IDisposable, int>();

      public static void HandleReferenceCountChange(this IDisposable disposable, int i)
      {
          References[disposable] = References.ContainsKey(disposable) ? References[disposable] + i : i;
          if (References[disposable] == 0)
          {
              disposable.Dispose();
              References.Remove(disposable);
          }
      }

      With all this in place, we no longer have to worry about whether the underlying enumerator’s Dispose method gets called at the right time if someone changed your multithread code to something like:

      Util.Multithread(4, () =>
      {
          using (var iter = atomicNumbers.GetEnumerator())
          {
              for (var i = 0; i < 2000000; ++i)
              {
                  if (iter.MoveNext())
                  {
                      var number = iter.Current;
                      if (!atomicResultsAdd(number))
                      {
                          Console.WriteLine("Clash: " + number);
                      }
                  }
              }
          }
      });

      You can test this out by changing your original enumeration from a Range to something like:

      public static IEnumerable<int> Integers()
      {
          try
          {
              for (var i = 0; ; ++i)
              {
                  yield return i;
              }
          }
          finally
          {
              Console.WriteLine("Integers Enumerator Disposed");
          }
      }

      Also you can add a Console.WriteLine to the HandleReferenceCountChange method just to prove that the original enumerator isn’t getting disposed until the last reference is removed.

      Of course, the one bad part about reference counting here is that if the Multithread method executes too fast, then your first thread may end up disposing the enumerator before the other threads have a chance to add their references. Ultimately the app itself should probably be in charge of disposal, and should probably call it after the Multithread completes or use of the enumerator is otherwise known to be finished.

