简易无锁队列

发布于 2023-10-08  131 次阅读


无锁队列

有啥用

无锁队列(Lock-Free Queue)是一种并发数据结构,它允许多个线程在无锁的情况下同时进行入队(Enqueue)和出队(Dequeue)操作。无锁队列的设计目标是提供高效的并发操作,以避免锁竞争带来的性能瓶颈。

无锁队列的主要用途是在并发场景中实现高效的数据共享和通信。以下是无锁队列的几个常见应用场景:

  1. 多线程数据传递:在多线程编程中,线程之间需要传递数据。无锁队列提供了一种高效的方式,允许一个线程将数据放入队列,而另一个线程可以从队列中获取数据,实现线程之间的数据交换。

  2. 事件驱动系统:无锁队列可以用于事件驱动系统,其中多个事件源可以将事件放入队列,而事件处理器可以从队列中获取事件并进行处理。这样可以实现高效的事件处理和调度。

  3. 生产者-消费者模型:无锁队列适用于生产者-消费者模型,其中多个生产者可以将数据放入队列,而多个消费者可以从队列中获取数据进行处理。无锁队列的高效性能可以提高生产者和消费者之间的并发性能。

  4. 并发任务调度:在并发任务调度中,任务可以通过无锁队列进行排队和调度。任务调度器可以将任务放入队列,而工作线程可以从队列中获取任务并执行。无锁队列的高效性能有助于提高任务调度的并发性能。

数据

我们假设每个Node包含一个function用来让多线程pop并且执行,那么这个Node的数据结构是

struct InterNodeSt
{
    std::function<void()> func;
    InterNodeSt* next = nullptr;
};

对于队列而言,我们需要让每个线程去读取内存的值并原子的修改它,这就需要volatile关键字了

struct xQueue
{
private:
    volatile InterNodeSt* m_Head;
    volatile InterNodeSt* m_Tail;
};

此外我们需要一个辅助的结点来避免恶心的边界值问题,那么我们把构造函数修改一下

struct xQueue
{
    xQueue()
    {
        m_Tail = new InterNodeSt;
        m_Head = m_Tail;
    }
private:
    volatile InterNodeSt* m_Head;
    volatile InterNodeSt* m_Tail;
};

Exchange and CAS

下面介绍一下Exchange和CAS操作

Exchange大概是这样的

long mAtomicExchange(volatile long* target, long value);

target就是你需要修改的地方,value就是你想要修改的值,返回值是你修改之前的值。

比如

volatile long a = 1;
long b = mAtomicExchange(&a, 2); // b = 1, a = 2

CAS比Exchange操作多了一步,就是多了一个比对的操作,如果期望值等于旧值才会被修改。此外其返回值依然为旧值。

long mAtomicCAS(volatile long* target, long expect, long value);

比如

volatile long a = 1;
long b = mAtomicCAS(&a, 2, 2); // a(1) != expect(2), b = 1, a = 1
b = mAtomicCAS(&a, b, 2); // ok, a(1) == expect(2), b = 1, a = 2

为了方便使用Windows自带的函数,我们把它包装一下

#define MemExchange(target, value)           \
_InterlockedExchange64((volatile long long*)(target), (long long)(value))
#define MemCAS(target, expect, value)        \
_InterlockedCompareExchange64((volatile long long*)(target), (long long)(value), (long long)(expect))

Push

我们只需要应用一下Exchange就好了,因为Push操作永远是成功的,我们只需要保证原子性即可。

void Push(InterNodeSt* node)
{
    node->next = nullptr;
    auto pOld = (InterNodeSt*)MemExchange(&this->m_Tail, node);
    pOld->next = node;
}

Pop

我们考虑到当队列为空时,此时队列是有一个节点的,这被称为辅助节点。

那么Pop就不可以简单地让m_Head向后移动,因为你Pop出的节点其实是上一个!

比如
[Head|Tail]
[0001|0002]

此时队列中只有一个0001,但是出队的却是0002!所以我们附一次值就好,即让0002 = 0001;

InterNodeSt* pop()
{
    InterNodeSt* expect = nullptr, *old = (InterNodeSt*)m_Head;
    std::function<void()> func;
    do
    {
        expect = old;
        auto next = old->next;
        if (!next)
        {
            return nullptr;
        }
        func = next->func;
        old = (InterNodeSt*)MemCAS(&m_Head, expect, next);
    } while (old != expect);
    old->func = func;
    return old;
}

ABA

coming soon
最后更新于 2023-10-08