Scheduling cross-threaded tasks using .NET’s BlockingCollection

Concurrency has always been a complicated aspect of computer science.

With modern hardware increasing not only in speed but also in parallelism, it has become necessary to use concurrency in applications to fully exploit available resources.

This is done using threads, which can be thought of as independently running sub-programs of a larger program, or process. Each thread has its own state, including its own stack, and executes its own code, but threads within the same process can generally share other resources, such as the process’s memory, at will.

With even the simplest applications being multi-threaded these days, games are no exception. As such, one can hardly find a modern game engine or framework that does not use parallelism in some form or another.

However, today I do not want to talk about how to move complex systems like physics simulation, AI, pathfinding, or even rendering to different threads – like many games do.

Instead I will look at a specific low-level problem:

Assuming we have a multi-threaded program, how can the different threads communicate? Specifically, how can we make sure that certain code is always run on certain threads?

The problem

It might not be obvious why this is necessary in the first place. What do we care where our code runs, as long as it executes when we need it to?

One example where this is not good enough is when using OpenGL. To use OpenGL, one needs to create an OpenGL context, which can be thought of as the box containing the entirety of the application’s graphical state.

This context is thread-specific: it can only be current on a single thread at a time, and OpenGL functions can only be used from the thread it is current on. We can change which thread the context is bound to, but that is not a practical solution if we need to make OpenGL calls from different threads.

For example, in Roche Fusion we used multi-threading to massively speed up our loading times. To achieve this (and I may write a post going into more detail on this topic in the future), most of the game’s assets are loaded by threads that do not have access to the OpenGL context.

However, we need to load shaders and textures, create vertex and index buffers, and more – all of which can only be done on the OpenGL thread.

What we want is a way to make our OpenGL thread do what we need it to do – from different threads.

Requirements for our solution

What we will create is a data structure that behaves effectively like a todo list. Any thread can add tasks to that list, and our main thread will execute those tasks as soon as it can.

Adding tasks

When adding tasks, there are three different ways in which we may want the code to behave.

In the simplest case, we add the task, forget about it, and continue running the current thread’s code, potentially adding further tasks. Alternatively, we might want to wait for the task to complete, since we may need its results to continue. We can further split this case by whether or not we want to receive a return value from the task.

To maximise the readability of our code, we will use three different methods for these three behaviours:

public void RunAndForget(Action action)
public void RunAndAwait(Action action)
public T RunAndReturn<T>(Func<T> action)

Executing tasks

Similar to adding new tasks, there are multiple ways in which we may want to execute our tasks.

This largely depends on how (if ever) we are done with our list of tasks, and whether the executing thread also has to do other work besides the tasks on the list. The latter may, for example, be the case if the same thread that is creating OpenGL resources also needs to draw an animation to show the user that loading is progressing properly.

Specifically, we may want to execute a single task, blocking the current thread until one is available and has been executed. Or we may want to execute a single task only if one is available, without blocking.

Even more complicated, we may want to try to execute one task, or as many tasks as possible, until a certain time interval has elapsed.

The method signatures for these different behaviours are as follows:

public void ExecuteOne()
public bool TryExecuteOne()
public bool TryExecuteOne(TimeSpan timeout)
public int ExecuteFor(TimeSpan timeout)

The return values of these methods indicate whether a task was executed, or how many were, which can be useful in a variety of circumstances.

Note that the two methods that time out after a given interval are not guaranteed to actually finish in that time, since they might start a task before the time-out, but only complete it and return afterwards.

There is no real solution to that problem given our constraints, but in practice it need not be an issue if our todo list type is used correctly. Mostly, this means that if the time-out behaviour matters, we should give it many small tasks rather than a few large ones.

Other considerations

Now that we have defined the interface of the system we are going to create, we should mention for completeness that our type must be thread-safe. That way we can add and execute tasks on different threads without having to worry about the horrible things that can happen when multiple threads work with the same thread-unsafe resources.

Further, I will add the requirement that all tasks are executed in the order they are added to the list. This is not strictly necessary, but it makes it much easier to reason about the expected results from the system, especially when adding multiple tasks that may depend on each other.

Lastly, we naturally want to avoid as much overhead as possible, and have our code be as simple as possible. The latter is especially true since concurrency bugs can be very difficult to track down due to their often unpredictable nature.

Underlying data structure

It is clear at this point that the underlying data structure for our system is some sort of queue – a first in, first out collection.

We could implement our own or use the one provided by the .NET framework. In either case we need to make sure that addition and removal are thread-safe, which effectively means spreading locks all over our code.

However, this could quickly become quite messy.

Fortunately for us, .NET (since version 4) already has a thread-safe queue (as well as other collections) in the System.Collections.Concurrent namespace. As one would expect, ConcurrentQueue<T> has the typical Enqueue method, as well as TryDequeue and TryPeek.
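A minimal sketch of these ConcurrentQueue<T> calls follows. The demo class and its log messages are hypothetical and only for illustration; the queue methods are the standard ones. Note that TryPeek and TryDequeue both return immediately instead of waiting for new items.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class ConcurrentQueueDemo
{
    // Enqueues two actions, peeks at the oldest one, then drains the queue.
    // Returns the messages logged, in the order they were logged.
    public static List<string> Run()
    {
        var queue = new ConcurrentQueue<Action>();
        var log = new List<string>();

        queue.Enqueue(() => log.Add("first"));
        queue.Enqueue(() => log.Add("second"));

        // TryPeek inspects the oldest item without removing it.
        Action head;
        if (queue.TryPeek(out head))
            log.Add("peeked");

        // TryDequeue never blocks: once the queue is empty it simply returns false,
        // so this loop cannot wait for tasks that other threads add later.
        Action action;
        while (queue.TryDequeue(out action))
            action();

        return log;
    }
}
```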

This would in principle cover most of our requirements. However, ConcurrentQueue<T> never blocks: TryDequeue returns false immediately if the queue is empty, so it gives us no way to wait for the next task, either indefinitely or for a certain time interval.

We could write that part of the system ourselves, but finding the best solution may not be easy, and we would again end up with quite complicated code.

Fortunately, .NET has us covered again. We can use the BlockingCollection<T> type as a wrapper around our concurrent queue to provide exactly the functionality we need.

The blocking collection’s methods we are interested in are:

void Add(T item)
T Take()
bool TryTake(out T item)
bool TryTake(out T item, TimeSpan timeout)
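The following minimal sketch exercises these methods on a BlockingCollection<Action> that wraps a ConcurrentQueue<Action>, so items come out in the order they were added. The demo class is hypothetical; the calls themselves are the standard System.Collections.Concurrent API. The last TryTake runs against an empty collection, so it waits out its 50 ms time-out and then returns false.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BlockingCollectionDemo
{
    // Returns a log of what happened, in order.
    public static List<string> Run()
    {
        var log = new List<string>();

        // The collection wraps the given ConcurrentQueue, so it is FIFO.
        // (The parameterless constructor would also use a ConcurrentQueue<T>.)
        using (var actions = new BlockingCollection<Action>(new ConcurrentQueue<Action>()))
        {
            actions.Add(() => log.Add("a"));
            actions.Add(() => log.Add("b"));

            // Take blocks until an item is available; here one already is.
            var first = actions.Take();
            first();

            // TryTake without a time-out returns immediately either way.
            Action second;
            if (actions.TryTake(out second))
                second();

            // The collection is empty now, so this waits 50 ms and returns false.
            Action third;
            var gotThird = actions.TryTake(out third, TimeSpan.FromMilliseconds(50));
            log.Add(gotThird ? "unexpected item" : "timed out");
        }

        return log;
    }
}
```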

Implementing our execution methods

These methods correspond almost one to one to our own method signatures above and behave just as expected. This means that the remaining work consists mostly of simple book-keeping, which is easy to write and less prone to bugs.

We thus implement the simplest of our execution methods as follows.

public void ExecuteOne()
{
    var action = this.actions.Take();
    action();
}

Note that actions, here and below, is a private field holding our BlockingCollection<Action>, which wraps a ConcurrentQueue<Action>.

Similarly, the next two methods are fairly straightforward as well.

public bool TryExecuteOne()
{
    Action action;
    if (this.actions.TryTake(out action))
    {
        action();
        return true;
    }
    return false;
}

public bool TryExecuteOne(TimeSpan timeout)
{
    Action action;
    if (this.actions.TryTake(out action, timeout))
    {
        action();
        return true;
    }
    return false;
}

Our last execution method, which executes as many actions as it can but stops once a given time runs out, is only marginally more complicated. We use a Stopwatch to track the total elapsed time, and call TryTake with decreasing time-out intervals until there is no time remaining.

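public int ExecuteFor(TimeSpan timeout)
{
    // Stopwatch lives in System.Diagnostics.
    var timer = Stopwatch.StartNew();
    var executed = 0;
    while (true)
    {
        // Remaining time budget for this call.
        var timeLeft = timeout - timer.Elapsed;
        if (timeLeft < TimeSpan.Zero)
            break;
        Action action;
        // Waits at most timeLeft for the next task; false means we timed out.
        if (!this.actions.TryTake(out action, timeLeft))
            break;
        action();
        executed++;
    }
    return executed;
}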
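To illustrate the loading-screen scenario, here is a sketch of a frame loop that gives ExecuteFor a small time budget per frame while a worker thread queues many small tasks. LoadingQueue is a hypothetical, trimmed-down class with only RunAndForget and ExecuteFor; the 1 ms tasks and 5 ms budget are arbitrary stand-ins for real work such as uploading textures.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

// Hypothetical trimmed-down queue: only the two methods this demo needs.
public sealed class LoadingQueue
{
    private readonly BlockingCollection<Action> actions =
        new BlockingCollection<Action>(new ConcurrentQueue<Action>());

    public void RunAndForget(Action action)
    {
        this.actions.Add(action);
    }

    // Executes queued actions until the budget runs out; if the queue is empty,
    // it waits for new tasks for the rest of the budget.
    public int ExecuteFor(TimeSpan timeout)
    {
        var timer = Stopwatch.StartNew();
        var executed = 0;
        while (true)
        {
            var timeLeft = timeout - timer.Elapsed;
            if (timeLeft < TimeSpan.Zero)
                break;
            Action action;
            if (!this.actions.TryTake(out action, timeLeft))
                break;
            action();
            executed++;
        }
        return executed;
    }
}

public static class FrameLoopDemo
{
    // A worker thread queues many small tasks; the calling ("main") thread
    // spends roughly 5 ms per frame executing them.
    public static List<int> Run(int taskCount)
    {
        var queue = new LoadingQueue();
        var done = new List<int>(); // only touched by the executing thread

        var worker = new Thread(() =>
        {
            for (var i = 0; i < taskCount; i++)
            {
                var index = i; // copy: a for loop shares one variable across closures
                queue.RunAndForget(() =>
                {
                    Thread.Sleep(1); // stand-in for a small piece of real work
                    done.Add(index);
                });
            }
        });
        worker.Start();

        while (done.Count < taskCount)
        {
            queue.ExecuteFor(TimeSpan.FromMilliseconds(5));
            // A real loop would draw the loading animation here.
        }

        worker.Join();
        return done;
    }
}
```

Because each task is small, a frame can only overrun its budget by roughly one task's duration, which is why many small tasks suit this method better than a few large ones.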

Implementing task adding methods

Thanks to the concurrent collection we are using, the methods we use to add tasks to our queue are also relatively uncomplicated.

The first one is in fact trivial:

public void RunAndForget(Action action)
{
    this.actions.Add(action);
}

The other two are slightly more complicated. For both of them, we have to pause the adding thread until the executing thread has executed the action or function we handed over as a parameter.

This is the only part in our code where we – sort of – have to deal with actual threading ourselves.

But instead of writing our own waiting loop or other complicated code, we again use a type .NET provides for us: ManualResetEvent.

This class does exactly what we need. It allows us to wait in one thread (our adding one) until another (our executing thread) notifies us that we can continue.
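A minimal sketch of the ManualResetEvent pattern on its own, independent of our queue: the event starts unsignalled, the waiting thread blocks in WaitOne, and the other thread releases it with Set. The demo class is hypothetical.

```csharp
using System;
using System.Threading;

public static class ManualResetEventDemo
{
    // The calling thread blocks on WaitOne until a second thread calls Set.
    public static string Run()
    {
        string result = null;
        using (var done = new ManualResetEvent(false)) // starts unsignalled
        {
            var worker = new Thread(() =>
            {
                result = "computed on worker";
                done.Set(); // signal: the waiting thread may continue
            });
            worker.Start();

            done.WaitOne(); // blocks until Set has been called
            worker.Join();
        }
        return result;
    }
}
```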

For this to work, we wrap our original action inside another one, which additionally sets the ManualResetEvent to indicate that our action has completed.

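public void RunAndAwait(Action action)
{
    // ManualResetEvent wraps an OS handle, so dispose of it once we are done waiting.
    using (var reset = new ManualResetEvent(false))
    {
        this.actions.Add(() =>
        {
            try
            {
                action();
            }
            finally
            {
                // Signal even if the action throws, so the adding thread never waits forever.
                reset.Set();
            }
        });

        reset.WaitOne();
    }
}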

With this in place, our last method becomes easy. We again wrap the function we received as an argument inside another action that stores its return value, and then pass that action to the previous method.

public T RunAndReturn<T>(Func<T> action)
{
    T ret = default(T);
    this.RunAndAwait(() => ret = action());
    return ret;
}

And that’s it!

Conclusion

Plugging together all the snippets above results in a simple class that solves our problem of letting specific threads do specific tasks in a multi-threaded environment in an orderly fashion.
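For reference, here is a minimal sketch of how these pieces might fit together in one class, plus a tiny demo of a worker thread handing a function to the executing thread. The class name ActionQueue and the demo are hypothetical and not the code from GitHub. In this sketch, RunAndAwait disposes its ManualResetEvent and signals it from a finally block, so that a throwing task cannot leave the caller waiting forever.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;

// Hypothetical name: the snippets from this post gathered into one class.
public sealed class ActionQueue
{
    // Thread-safe FIFO storage for pending tasks.
    private readonly BlockingCollection<Action> actions =
        new BlockingCollection<Action>(new ConcurrentQueue<Action>());

    // ---- Adding tasks (any thread) ----
    public void RunAndForget(Action action)
    {
        this.actions.Add(action);
    }

    public void RunAndAwait(Action action)
    {
        using (var reset = new ManualResetEvent(false))
        {
            this.actions.Add(() =>
            {
                try { action(); }
                finally { reset.Set(); } // release the caller even if action throws
            });
            reset.WaitOne();
        }
    }

    public T RunAndReturn<T>(Func<T> action)
    {
        T ret = default(T);
        this.RunAndAwait(() => ret = action());
        return ret;
    }

    // ---- Executing tasks (the thread that owns the resource) ----
    public void ExecuteOne()
    {
        var action = this.actions.Take();
        action();
    }

    // A zero time-out makes TryTake return immediately, like TryTake(out action).
    public bool TryExecuteOne()
    {
        return this.TryExecuteOne(TimeSpan.Zero);
    }

    public bool TryExecuteOne(TimeSpan timeout)
    {
        Action action;
        if (!this.actions.TryTake(out action, timeout))
            return false;
        action();
        return true;
    }

    public int ExecuteFor(TimeSpan timeout)
    {
        var timer = Stopwatch.StartNew();
        var executed = 0;
        while (true)
        {
            var timeLeft = timeout - timer.Elapsed;
            if (timeLeft < TimeSpan.Zero)
                break;
            Action action;
            if (!this.actions.TryTake(out action, timeLeft))
                break;
            action();
            executed++;
        }
        return executed;
    }
}

public static class ActionQueueDemo
{
    // A worker thread asks the calling ("main") thread to evaluate a function.
    // Returns true if the function really ran on the executing thread.
    public static bool RunsOnExecutingThread()
    {
        var queue = new ActionQueue();
        var mainThreadId = Thread.CurrentThread.ManagedThreadId;
        var ranOn = -1;
        var worker = new Thread(() =>
            ranOn = queue.RunAndReturn(() => Thread.CurrentThread.ManagedThreadId));
        worker.Start();
        queue.ExecuteOne(); // blocks until the worker's task arrives, then runs it
        worker.Join();
        return ranOn == mainThreadId;
    }
}
```

The demo relies on the adding and executing threads being different: calling RunAndAwait or RunAndReturn from the executing thread itself would wait for a task that only that same thread can run, and never return.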

The full class, which also adds a few more method overloads, can be found on GitHub as part of Bearded.Utilities, a general purpose C# utilities project I am involved with.

Note how our implementation makes heavy use of classes provided by the framework, which makes it much less likely that those parts of the code contain bugs, and should also give us good performance, assuming we use the framework properly of course.

I hope you have found this interesting or useful. Maybe you even had to solve this or a similar problem yourself and found an equivalent – or entirely different – solution.

In either case, feel free to let me know what you think and if you have any suggestions in the comments below.

Enjoy the pixels!


2 comments

  1. Matty says:

    I love how you don’t do any concurrency yourself (and therefore nothing you would probably get wrong in some horrible detail that only shows up every 200 runs), but instead only build an interface to existing code.

    • Paul Scharf says:

      Thanks, and I agree!
      That is one thing I really like as well about my solution.

      On the other hand, using the waiting Run methods, it is possible to create deadlocks if you have threads telling each other what to do.
      So… don’t do that. ;)