打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Building a custom thread pool (series, part 2): a work stealing queue
Building a custom thread pool (series, part 2): a work stealing queue

The primary reason a traditional thread pool doesn’t scale is thatthere’s a single work queue protected by a global lock.  For obviousreasons, this can easily become a bottleneck.  Two primary thingscontribute heavily to whether the global lock becomes a limiting factorfor a particular workload’s throughput:

  1. As the size of work items become smaller, the frequency at which the pool’s threads must acquire the global lock increases.  Moving forward, we expect the granularity of latent parallelism to become smaller such that programs can scale as more processors are added.
  2. As more processors are added, the arrival rate at the lock will increase when compared to the same workload run with fewer processors.  This inherently limits the ability to “get more work through” that single straw that is the global queue.

For coarse-grained work items, and for small numbers of processors,these problems simply aren’t too great.  That has been the CLRThreadPool’s forte for quite some time; most work items range in the1,000s to 10,000s (or more) of CPU cycles, and 8-processors wasconsidered pushing the limits.  Clearly the direction the whole industryis headed in exposes these fundamental flaws very quickly.  We’d liketo enable work items with 100s and 1,000s of cycles and must scale wellbeyond 4, 8, 16, 32, 64, ... processors.

Decentralized scheduling techniques can be used to combat thisproblem.  In other words, if we give different components their own workqueues, we can eliminate the central bottleneck.  This approach worksto a degree but becomes complicated very quickly because clearly wedon’t want each such queue to have its own pool of dedicated threads. So we’d need some way of multiplexing a very dynamic and comparativelylarge number of work pools onto a mostly-fixed and comparativelysmall number of OS threads.

Introducing work stealing

Another technique – and the main subject of this blog post – is touse a so-called work stealing queue (WSQ).  A WSQ is a special kind ofqueue in that it has two ends, and allows lock-free pushes and pops fromone end (“private”), but requires synchronization from the other end(“public”).  When the queue is sufficiently small that private andpublic operations could conflict, synchronization is necessary.  It isarray-based and can grow dynamically.  This data structure was madefamous in the 90’s when much work on dynamic work scheduling was done inthe research community.

In the context of a thread pool, the WSQ can augment the traditionalglobal queue to enable more efficient private queuing and dequeuing.  Itworks roughly as follows:

  • We still have a global queue protected by a global lock.
  • (We can of course consider the ability to have separate pools to reduce pressure on this.)
  • Each thread in the pool has its own private WSQ.
  • When work is queued from a pool thread, the work goes into the WSQ, avoiding all locking.
  • When work is queued from a non-pool thread, it goes into the global queue.
  • When threads are looking for work, they can have a preferred search order:
    • Check the local WSQ.  Work here can be dequeued without locks.
    • Check the global queue.  Work here must be dequeued using locks.
    • Check other threads’ WSQs.  This is called “stealing”, and requires locks.

If you haven’t guessed, this is by-and-large how the Task Parallel Library (TPL) schedules work.

For workloads that recursively queue a lot of work, the use of aper-thread WSQ substantially reduces the synchronization necessary tocomplete the work, leading to far better throughput.  There are alsofewer cache effects due to sharing of the global queue information. “Stealing” is our last course of action in the abovementioned searchlogic, because it has the secondary effect of causing another thread tohave to visit the global queue (or steal) sooner.  In some sense, it isdouble the cost of merely getting an item from the global queue.

Another (subtle) aspect of WSQs is that they are LIFO for privateoperations and FIFO for steals.  This is inherent in how the WSQ’ssynchronization works (and is key to enabling lock-freedom), but hasadditional rationale:

  1. By executing the work most recently pushed into the queue in LIFO order, chances are that memory associated with it will still be hot in the cache.
  2. By stealing in FIFO order, chances are that a larger “chunk” of work will be stolen (possibly reducing the chance of needing additional steals).  The reason for this is that many work stealing workloads are divide-and-conquer in nature; in such cases, the recursion forms a tree, and the oldest items in the queue lie closer to the root; hence, stealing one of those implicitly also steals a (potentially) large subtree of computations that will unfold once that piece of work is stolen and run.

This decision clearly changes the regular order of execution when compared to a mostly-FIFO system, and is the reason we’re contemplating exposing options to control this behavior from TPL.

A simple WorkStealingQueue<T> type

With all that background behind us, let’s jump straight into a reallysimple implementation of a work stealing queue written in C#.

public class WorkStealingQueue<T>

{

The queue is array-based, and we keep two indexes—a head and atail.  The tail represents the private end and the head represents thepublic end.  We also maintain a mask that is always equal to the size ofthe list minus one, helping with some of the bounds-checking arithmeticand handling automatic wraparound for indexing into the array.  Becauseof the way we use the mask (we will assume all legal bits for indexinginto the list are on), the count must always be a power of two.  Wearbitrarily select the number 32 as the queues initial (power of two)size.

 

    private const int INITIAL_SIZE = 32;

    private T[] m_array = new T[INITIAL_SIZE];

    private int m_mask = INITIAL_SIZE - 1;

    private volatile int m_headIndex = 0;

    private volatile int m_tailIndex = 0;
 
We also need a lock to protect the operations that require synchronization.

 

    private object m_foreignLock = new object();

Although they aren’t exercised very much in the code, we have somehelper properties.  The queue is empty when the head is equal to orgreater than the tail, and the count can be computed by subtracting thehead from the tail.  Because these fields never wrap (because we use themask), this is correct.

    public bool IsEmpty

    {

        get { return m_headIndex >= m_tailIndex; }

    }

 

    public int Count

    {

        get { return m_tailIndex - m_headIndex; }

    }
 
OK,let’s get into the meat of the implementation.  Pushing is the obviousplace to start, and, for obvious reasons, we only support privatepushes.  Public pushes are useless given the protocol explained above,i.e., the only public operation we will support is stealing.  Keep inmind when reading this code that m_tailIndex and m_headIndex are bothvolatile variables.

 

    public void LocalPush(T obj)

    {

        int tail = m_tailIndex;


Firstwe must check whether there is room in the queue.  To do so, we justsee if m_tailIndex is less than the sum of m_mask (the size of the listminus one) and m_headIndex.  False negatives are OK, and are certainlypossible because a concurrent steal may come along and take an element,making room, immediately after the check.  We will handle this bysynchronizing in a moment.

 

        if (tail < m_headIndex + m_mask)

        {

If there is indeed room, we can merely stick the object into thearray (masking m_tailIndex with m_mask to ensure we’re within the legalrange) and then increment m_tailIndex by one.  This may look unsafe, butit is in fact safe: writes retire in order in .NET’s memory model, andwe know no other thread is changing m_tailIndex (only private operationswrite to it) and that no thread will try to access the current arrayslot into which we’re storing the element.

 

            m_array[tail & m_mask] = obj;

            m_tailIndex = tail + 1;

        }

Otherwise, we need to head down the slow path which involves resizing.

 

        else

        {

We will take the lock and check that we still need to make room.  

 

            lock (m_foreignLock)

            {

                int head = m_headIndex;

                int count = m_tailIndex - m_headIndex;

 

                if (count >= m_mask)

                {

Assuming we need to make more room, we will just double the sizeof the array, copy elements, fix up the fields, and move on.  Rememberthat the array length is always a power of two, so we can get the nextpower of two by simply bitshifting to the left by one.  We do that forthe mask too, but need to remember to “turn on” the least significantbit by oring one into the mask.

 

                    T[] newArray = new T[m_array.Length << 1];

                    for (int i = 0; i < m_array.Length; i++)

                        newArray[i] = m_array[(i + head) & m_mask];

 

                    m_array = newArray;

                    m_headIndex = 0;

                    m_tailIndex = tail = count;

                    m_mask = (m_mask << 1) | 1;


Afterwe’re done resizing, the m_headIndex is reset to 0, and the m_tailIndexis the previous size of the queue.  We can then store into the queue insame way we would have earlier.

                }

                m_array[tail & m_mask] = obj;

                m_tailIndex = tail + 1;

            }

        }

    }

And that’s that: we’ve added an item into the queue with a localpush.  Now let’s look at the reverse: removing an element with a localpop.  Remember, it’s impossible for a local push and pop to interleavewith one another because they must be executed by the same threadserially.

 

    public bool LocalPop(ref T obj)

    {

First we read the current value of m_tailIndex.  If the queue iscurrently empty, i.e., m_headIndex >= m_tailIndex, then we justreturn false right away.  This is how “emptiness” is conveyed tocallers.

 

        int tail = m_tailIndex;

        if (m_headIndex >= tail)

            return false;
 
Next we disable an annoying C# compiler warning.

 

#pragma warning disable 0420
 
Nowwe have determined there is at least one element in the queue (or wasduring our previous check).  We will now subtract one from the tail,which effectively removes the element.  There is still a chance that wewill “lose” in a race with another thread doing a steal, so we’ll needto be very careful.  In fact, there is a subtle .NET memory model gotchato be aware of: we must guarantee our write to take the element doesnot get trapped in the write buffer beyond a subsequent read of them_headIndex.  If that could happen, we might mistakenly think we tookthe element, while at the same time a stealing thread thought it tookthe same element!  The result would be that the same item will bedequeued by two threads which could lead to disaster.  In a thread pool,it’d amount to the same work item being run twice.  To ensure thisreordering can’t happen, we must use a XCHG to perform the write tom_tailIndex.

 

        tail -= 1;

        Interlocked.Exchange(ref m_tailIndex, tail);
 
Wedetect whether we lost the race by checking to see if our dequeuing ofthe element has made the queue empty.  If it hasn’t, we can just readthe array element in the new m_tailIndex position and return it.

 

        if (m_headIndex <= tail)

        {

            obj = m_array[tail & m_mask];

            return true;

        }

        else

        {

Otherwise, we take the lock and see what to do.  This blocks outall steals.  Either we will find that there indeed is an elementremaining, and we can just return it as we would have done above, or wemust “put the element back” by just incrementing the m_tailIndex.  If wehave to back out our modification, we just return false to indicatethat the queue has become empty.  We know we aren’t racing with itbecoming non-empty because only private pushes are supported.

 

            lock (m_foreignLock)

            {

                if (m_headIndex <= tail)

                {

                    // Element still available. Take it.

                    obj = m_array[tail & m_mask];

                    return true;

                }

                else

                {

                    // We lost the race, element was stolen, restore the tail.

                    m_tailIndex = tail + 1;

                    return false;

                }

            }

        }

    }

Lastly, let’s take a look at the public pop capability.  We allow atimeout to be supplied, because it’s often useful during the stealinglogic to use a 0-timeout on the first pass through all the WSQs.  Thiscan help to eliminate lock wait times and more evenly distributecontention across the list of WSQs.
 

    private bool TrySteal(ref T obj, int millisecondsTimeout)

    {

First we acquire the WSQ’s lock, ensuring mutual exclusion amongall other concurrent steals, resize operations, and local pops that maymake the queue empty.

 

        bool taken = false;

        try

        {

            taken = Monitor.TryEnter(m_foreignLock, millisecondsTimeout);

            if (taken)

            {

Once inside the lock, we must increment m_headIndex by one.  Thismoves the head towards the tail, and has the effect of taking anelement.  Now this part gets quite tricky.  We must ensure that we don’tremove the last element when racing with a local pop that went down itsfast path (i.e., it didn’t acquire the lock).  Given two threads racingto take an element—a steal and a local pop—we must ensure precisely oneof them “wins”.  Having both succeed will lead to the same elementbeing popped twice, and having neither succeed could lead to reportingback an empty queue when in fact an element exists.

To do that, we will write to the m_headIndex variable to tentativelytake the element, and must then read the m_tailIndex right afterward toensure that the queue is still non-empty.  As with the pop logicearlier, we need to use an XCHG operation to write the m_headIndexfield, otherwise we will potentially suffer from a similar legal memoryreordering bug.

 

                int head = m_headIndex;

                Interlocked.Exchange(ref m_headIndex, head + 1);
 
Ifthe queue is non-empty, we just read the element as we usually do: byindexing into the array with the new m_headIndex value using the propermasking.  We then return true to indicate an element was found.

 

                if (head < m_tailIndex)

                {

                    obj = m_array[head & m_mask];

                    return true;

                }

Otherwise, the queue is empty and we must return.  Clearly thisis racy and by the time we return the queue may be non-empty.  If thepool will subsequently wait for work to arrive, this must be taken intoconsideration so as not to incur lost wake-ups.

 

                else

                {

                    m_headIndex = head;

                    return false;

                }

            }

        }

We of course need to release the lock at the end of it all.

 

        finally

        {

            if (taken)

                Monitor.Exit(m_foreignLock);

        }

 

        return false;

    }

}

And that’s it!  As with most lock-free algorithms, the core idea issurprisingly simple but deceptively subtle and intricate.  After seeingit written out and explained in detail, I hope that you’ll have that “Ahhah!” moment that always happens after staring at this kind of code fora little while.  In future posts, we’ll take a closer look at theperformance differences between this and a traditional globallysynchronized queue, and discuss what it takes to merge the two ideasimplementation-wise.

Appendix

For reference, here’s the full code without all the explanation intertwined:

using System;

using System.Threading;

 

public class WorkStealingQueue<T>

{

    private const int INITIAL_SIZE = 32;

    private T[] m_array = new T[INITIAL_SIZE];

    private int m_mask = INITIAL_SIZE - 1;

    private volatile int m_headIndex = 0;

    private volatile int m_tailIndex = 0;

    private object m_foreignLock = new object();

 

    public bool IsEmpty

    {

        get { return m_headIndex >= m_tailIndex; }

    }

 

    public int Count

    {

        get { return m_tailIndex - m_headIndex; }

    }

 

    public void LocalPush(T obj)

    {

        int tail = m_tailIndex;

        if (tail < m_headIndex + m_mask)

        {

            m_array[tail & m_mask] = obj;

            m_tailIndex = tail + 1;

        }

        else

        {

            lock (m_foreignLock)

            {

                int head = m_headIndex;

                int count = m_tailIndex - m_headIndex;

 

                if (count >= m_mask)

                {

                    T[] newArray = new T[m_array.Length << 1];

                    for (int i = 0; i < m_array.Length; i++)

                        newArray[i] = m_array[(i + head) & m_mask];

 

                    // Reset the field values, incl. the mask.

                    m_array = newArray;

                    m_headIndex = 0;

                    m_tailIndex = tail = count;

                    m_mask = (m_mask << 1) | 1;

                }

                m_array[tail & m_mask] = obj;

                m_tailIndex = tail + 1;

            }

        }

    }

 

    public bool LocalPop(ref T obj)

    {

        int tail = m_tailIndex;

        if (m_headIndex >= tail)

            return false;

 

#pragma warning disable 0420

 

        tail -= 1;

        Interlocked.Exchange(ref m_tailIndex, tail);

 

        if (m_headIndex <= tail)

        {

            obj = m_array[tail & m_mask];

            return true;

        }

        else

        {

            lock (m_foreignLock)

            {

                if (m_headIndex <= tail)

                {

                    // Element still available. Take it.

                    obj = m_array[tail & m_mask];

                    return true;

                }

                else

                {

                    // We lost the race, element was stolen, restore the tail.

                    m_tailIndex = tail + 1;

                    return false;

                }

            }

        }

    }

 

    private bool TrySteal(ref T obj, int millisecondsTimeout)

    {

        bool taken = false;

        try

        {

            taken = Monitor.TryEnter(m_foreignLock, millisecondsTimeout);

            if (taken)

            {

                int head = m_headIndex;

                Interlocked.Exchange(ref m_headIndex, head + 1);

                if (head < m_tailIndex)

                {

                    obj = m_array[head & m_mask];

                    return true;

                }

                else

                {

                    m_headIndex = head;

                    return false;

                }

            }

        }

        finally

        {

            if (taken)

                Monitor.Exit(m_foreignLock);

        }

 

        return false;

    }

}

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
环形队列到底是怎么回事?
TBB: concurrent_queue 高性能的奥秘 – 英特尔? 软件网络博客 - 中文
浅谈算法和数据结构(1):栈和队列
.NET源码Stack<t>和Queue<t>的实现 </t...
[算法总结] 6 道题搞定 BAT 面试——堆栈和队列
用LLVM来开发自己的编译器(三)
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服