提问者:小点点

此发布/检查单个写入程序读取器的更新类是否可以使用memory_order_relaxed或获取/释放以提高效率?


我有一个小类,它利用std::atomic进行无锁操作。由于这个类被大量调用,它影响了性能,我遇到了麻烦。

这个类类似于LIFO,但是一旦调用了pop()函数,它只返回它的环形缓冲区的最后一个写入的元素(只有在最后一个pop()之后有新元素时)。

一个线程正在调用 push(),另一个线程正在调用 pop()。

由于这占用了我太多的电脑时间,我决定进一步研究d::atomic类及其memory_order。我在StackOverflow和其他资源和书籍中阅读了很多memory_order帖子,但我无法清楚地了解不同的模式。特别是,我在获取和发布模式之间挣扎:我也不明白为什么它们与memory_order_seq_cst不同。

memory_order_relaxed:在同一线程中,原子操作是即时的,但其他线程可能无法立即看到最新的值,它们需要一些时间才能更新。代码可以由编译器或操作系统自由重新排序。

memory_order_acquire/release:由atomic::load使用。它可以防止在此之前的代码行被重新排序(编译器/OS可以根据需要在这行之后重新排序),并使用此线程或另一个线程中的memory_order_release或memory_orded_seq_cst读取存储在此原子上的最新值。memory_order_release还可以在代码重新排序后阻止该代码。因此,在获取/发布中,两者之间的所有代码都可以被操作系统打乱。我不确定这是在同一线程之间还是在不同线程之间。

memory_order_seq_cst:最容易使用,因为它就像我们与变量一起使用的自然写入一样,可以立即刷新其他线程加载函数的值。

template<typename T>
class LockFreeEx
{
public:
    void push(const T& element)
    {
        const int wPos = m_position.load(std::memory_order_seq_cst);
        const int nextPos = getNextPos(wPos);
        m_buffer[nextPos] = element;
        m_position.store(nextPos, std::memory_order_seq_cst);
    }

    const bool pop(T& returnedElement)
    {

        const int wPos = m_position.exchange(-1, std::memory_order_seq_cst);
        if (wPos != -1)
        {
            returnedElement = m_buffer[wPos]; 
            return true;
        }
        else
        {
            return false;
        }
    }

private:
    static constexpr int maxElements = 8;
    static constexpr int getNextPos(int pos) noexcept {return (++pos == maxElements)? 0 : pos;}
    std::array<T, maxElements> m_buffer;
    std::atomic<int> m_position {-1};
};

所以,我的第一个想法是在所有原子操作中使用memory_order_relaxed,因为pop()线程在循环中每10-15毫秒在pop函数中寻找可用的更新,然后允许它在第一个pop()函数中失败,然后意识到有一个新的更新。只有几毫秒。

另一种选择是使用发布/获取 - 但我不确定它们。在所有 store() 中使用 release 并在所有 load() 函数中使用 collection。

不幸的是,我描述的所有memory_order似乎都起作用了,我不确定它们什么时候会失败,如果它们应该失败的话。

请你告诉我,如果你在这里使用放松的记忆顺序发现了一些问题?或者我应该使用释放/获取(也许关于这些的进一步解释可以帮助我)?为什么?

我认为,在所有的store()或load()中,轻松是这个类的最佳选择。但我不确定!

感谢阅读。

由于我看到每个人都在要求“char”,所以我将其更改为int,问题解决了!但这不是我想解决的问题。

正如我之前所说的那样,这个类很可能是后进先出的,但如果有最后一个元素的话,那么只有最后一个被推送的元素才重要。

我有一个大结构T(可复制和可签名),必须以无锁的方式在两个线程之间共享。所以,我知道的唯一方法是使用一个循环缓冲区来写T的最后一个已知值,以及一个原子,它知道最后一个写入值的索引。如果没有,索引将为-1。

请注意,我的推送线程必须知道何时有“new T”可用,这就是pop()返回bool的原因。

再次感谢每个试图帮助我记忆订单的人!:)

template<typename T>
class LockFreeEx
{
public:
    LockFreeEx() {}
    LockFreeEx(const T& initValue): m_data(initValue) {}

    // WRITE THREAD - CAN BE SLOW, WILL BE CALLED EACH 500-800ms
    void publish(const T& element)
    {
        // I used acquire instead relaxed to makesure wPos is always the lastest w_writePos value, and nextPos calculates the right one
        const int wPos = m_writePos.load(std::memory_order_acquire);
        const int nextPos = (wPos + 1) % bufferMaxSize;
        m_buffer[nextPos] = element;
        m_writePos.store(nextPos, std::memory_order_release);
    }


    // READ THREAD - NEED TO BE VERY FAST - CALLED ONCE AT THE BEGGINING OF THE LOOP each 2ms
    inline void update() 
    {
        // should I change to relaxed? It doesn't matter I don't get the new value or the old one, since I will call this function again very soon, and again, and again...
        const int writeIndex = m_writePos.load(std::memory_order_acquire); 
        // Updating only in case there is something new... T may be a heavy struct
        if (m_readPos != writeIndex)
        {
            m_readPos = writeIndex;
            m_data = m_buffer[m_readPos];
        }
    }
    // NEED TO BE LIGHTNING FAST, CALLED MULTIPLE TIMES IN THE READ THREAD
    inline const T& get() const noexcept {return m_data;}

private:
    // Buffer
    static constexpr int bufferMaxSize = 4;
    std::array<T, bufferMaxSize> m_buffer;

    std::atomic<int> m_writePos {0};
    int m_readPos = 0;

    // Data
    T m_data;
};

共2个答案

匿名用户

内存顺序不是关于您何时看到对原子对象的某些特定更改,而是关于此更改可以保证周围代码的内容。松弛原子除了对原子对象本身的更改之外什么也不能保证:更改将是原子的。但是您不能在任何同步上下文中使用松弛原子。

你有一些需要同步的代码。您希望弹出已被推动的内容,而不是尝试弹出尚未被推动的内容。因此,如果您使用宽松的操作,则无法保证您的pop会看到此推送代码:

m_buffer[nextPos] = element;
m_position.store(nextPos, std::memory_relaxed);

就像写的那样。它也可以这样看:

m_position.store(nextPos, std::memory_relaxed);
m_buffer[nextPos] = element;

因此,您可能会尝试从缓冲区中获取尚未存在的元素。因此,您必须使用一些同步,至少使用获取/释放内存顺序。

以及您的实际代码。我认为顺序可以如下:

const char wPos = m_position.load(std::memory_order_relaxed);
...
m_position.store(nextPos, std::memory_order_release);
...
const char wPos = m_position.exchange(-1, memory_order_acquire);

匿名用户

您的编写器只需要发布,而不需要seq-cst,但是放松太弱了。在对相应的m_buffer[]条目进行非原子赋值之前,您不能发布m_position的值。您需要发布顺序来确保只有在所有早期内存操作之后,其他线程才能看到m_position存储。https://preshing.com/20120913/acquire-and-release-semantics/

这必须与读取器中的获取或seq_cst负载“同步”。或者至少mo_consume在阅读器中。

理论上,您还需要wpos=m_position至少是acquire(或在读取器中消耗),而不是放松,因为C11的内存模型对于值预测这样的事情足够弱,这可以让编译器在加载实际从相干缓存中获取值之前,推测性地使用wpos的值。

(在实际的CPU上,一个疯狂的编译器可以使用test/branch来引入控制依赖关系,从而允许分支预测推测执行来打破wPos可能值的数据依赖性。

但是普通的编译器不会这样做。在DEC Alpha以外的CPU上,源代码中的数据依赖关系< code>wPos = m_position然后使用< code>m_buffer[wPos]将在asm中创建一个数据依赖关系,就像< code>mo_consume应该利用的那样。真正的isa而不是Alpha保证了依赖加载的依赖排序。(甚至在Alpha上,使用宽松的原子交换可能足以关闭存在于少数真正的Alpha CPUs上的允许这种重新排序的小窗口。)

在为x86编译时,使用mo_acquire完全没有缺点;它不需要任何额外的障碍。在其他ISAs上也有可能,比如32位ARM,其中获取需要一个障碍,因此在轻松的负载下“作弊”可能是一个在实践中仍然安全的胜利。当前的编译器总是将mo_consume增强为mo_acquire,因此我们很遗憾无法利用它。

即使使用seq_cst,您也已经有了真正的单词竞争条件。

  • 初始状态:m_position = 0
  • 读者通过交换m_position = -1 来“声明”插槽 0,并读取 m_buffer[0] 的一部分;
  • 读者出于某种原因睡觉(例如计时器中断取消了它的时间表),或者只是与作家赛跑。
  • 编写器将 wPos = m_position 读取为 -1,并计算 nextPos = 0
    它覆盖了部分读取的m_buffer[0]
  • 读者醒来并完成阅读,得到一个撕裂的T

在读取之后添加第二次检查< code>m_position(像SeqLock一样)并不能在所有情况下检测到这一点,因为写入器直到写入缓冲区元素之后才更新< code>m_position。

即使您的真实用例在读取和写入之间有很长的间隔,这个缺陷也可以在几乎同时发生一次读取和写入的情况下折磨您。

我当然知道读取端不能等待,也不能停止(它是音频),每5-10毫秒弹出一次,写入端是用户输入,这更慢,更快的可以每500毫秒推送一次。

在现代CPU上,一毫秒已经过时了。线程间延迟通常约为60纳秒,因此是微秒的几分之一,例如从四核Intel x86。只要你不睡在互斥锁上,在放弃之前旋转重试一次或两次都不是问题。

代码审查:

该类类似于LIFO,但一旦调用pop()函数,它只返回其环形缓冲区的最后一个写入元素(仅当自last pop()以来有新元素时)。

这不是一个真正的队列或堆栈:push 和 pop 不是好名字。“发布”和“读取”或“获取”可能会更好,并使它的用途更加明显。

我会在代码中加入注释,以说明这对于单个作者和多个读者来说是安全的。(在< code>push中< code>m_position的非原子增量使得它对于多个编写器来说显然是不安全的。)

即便如此,即使同时运行 1 个作家 1 个阅读器,这也有点奇怪。如果在写入过程中开始读取,它将获得“旧”值,而不是等待几分之一微秒来获取新值。然后下次读取时,已经有一个新值在等待;上次错过的那个。因此,例如m_position可以按以下顺序更新:2,-1,3。

这可能是所希望的,也可能不是所希望的,这取决于“陈旧”数据是否有任何价值,以及如果写入程序在写入过程中Hibernate,读取器阻塞的可接受性。或者甚至在作者不睡觉的情况下,旋转等待。

对于具有多个只读读取器的很少写入的小数据,标准模式是SeqLock。例如,用于在无法原子读写128位值的CPU上发布128位当前时间戳。请参阅使用32位原子实现64位原子计数器

为了安全起见,我们可以让编写器自由运行,始终环绕其圆形缓冲区,并让读者跟踪它查看的最后一个元素。

如果只有一个读取器,这应该是一个简单的非原子变量。如果它是一个实例变量,至少把它放在m_buffer[]的另一边。

    // Possible failure mode: writer wraps around between reads, leaving same m_position
    // single-reader
    const bool read(T &elem)
    {
        // FIXME: big hack to get this in a separate cache line from the instance vars
        // maybe instead use alignas(64) int m_lastread as a class member, and/or on the other side of m_buffer from m_position.
        static int lastread = -1;

        int wPos = m_position.load(std::memory_order_acquire);    // or cheat with relaxed to get asm that's like "consume"
        if (lastread == wPos)
            return false;

        elem = m_buffer[wPos];
        lastread = wPos;
        return true;
    }

您希望< code>lastread位于与编写器写入的内容不同的缓存行中。否则,读取器对readPos的更新将会变慢,因为与写入器的写入共享错误,反之亦然。

这使得读卡器对写入器写入的缓存行真正是只读的。但是,在写入程序写入处于“修改”状态的行之后,仍然需要MESI流量来请求对其进行读取访问。但是,编写器仍然可以读取m_position而没有缓存丢失,因此它可以立即将其存储放入存储缓冲区。它只需要等待RFO获得缓存行的独占所有权,然后才能将元素和更新的m_position从其存储缓冲区提交到L1d缓存。

TODO:让< code>m_position递增而不进行手动换行,这样我们就有了一个需要很长时间才能换行的写序列号,从而避免了< code>lastread == wPos的早期误报。

使用wPos

如果写入器已经完成了整个过程,并且正在写入正在读取的元素,那么唯一的危险就是在一个很小的时间窗口内未被发现撕裂。对于频繁读取和不频繁写入,以及不太小的缓冲区,这种情况永远不会发生。读取后再次检查m_position(如SeqLock,类似于下面)会将竞争窗口缩小到仅仍在进行中的写入。

如果有多个读取器,另一个好的选择可能是在每个< code>m_buffer条目中添加一个< code>claimed标志。所以你会定义

template<typename T>
class WaitFreePublish
{

private:
    struct {
        alignas(32) T elem;           // at most 2 elements per cache line
        std::atomic<int8_t> claimed;  // writers sets this to 0, readers try to CAS it to 1
                                      // could be bool if we don't end up needing 3 states for anything.
                                      // set to "1" in the constructor?  or invert and call it "unclaimed"
    } m_buffer[maxElements];

    std::atomic<int> m_position {-1};
}

如果<code>T</code>在末尾有填充,很遗憾我们不能利用<code>声明的</code>标志:/

这避免了比较位置的可能的失败模式:如果写入者在读取之间绕回,我们得到的最坏结果是撕裂。我们可以通过让编写器在编写元素的其余部分之前先清除< code > claim 标志来检测这种撕裂。

由于没有其他线程写入m_position,我们绝对可以使用宽松的负载而不必担心。我们甚至可以在其他地方缓存写入位置,但希望读取器不会经常使包含m_position的缓存行无效。显然,在您的用例中,写入器性能/延迟可能不是什么大问题。

因此,编写器读取器可以是这样的,SeqLock式的撕裂检测使用声明的标志、元素和m_position的已知更新顺序。

/// claimed flag per array element supports concurrent readers

    // thread-safety: single-writer only
    // update claimed flag first, then element, then m_position.
    void publish(const T& elem)
    {
        const int wPos = m_position.load(std::memory_order_relaxed);
        const int nextPos = getNextPos(wPos);

        m_buffer[nextPos].claimed.store(0, std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_release);  // make sure that `0` is visible *before* the non-atomic element modification
        m_buffer[nextPos].elem = elem;

        m_position.store(nextPos, std::memory_order_release);
    }

    // thread-safety: multiple readers are ok.  First one to claim an entry gets it
    // check claimed flag before/after to detect overwrite, like a SeqLock
    const bool read(T &elem)
    {
        int rPos = m_position.load(std::memory_order_acquire);

        int8_t claimed = m_buffer[rPos].claimed.load(std::memory_order_relaxed);
        if (claimed != 0)
            return false;      // read-only early-out

        claimed = 0;
        if (!m_buffer[rPos].claimed.compare_exchange_strong(
                claimed, 1, std::memory_order_acquire, std::memory_order_relaxed))
            return false;  // strong CAS failed: another thread claimed it

        elem = m_buffer[rPos].elem;

        // final check that the writer didn't step on this buffer during read, like a SeqLock
        std::atomic_thread_fence(std::memory_order_acquire);    // LoadLoad barrier

        // We expect it to still be claimed=1 like we set with CAS
        // Otherwise we raced with a writer and elem may be torn.
        //  optionally retry once or twice in this case because we know there's a new value waiting to be read.
        return m_buffer[rPos].claimed.load(std::memory_order_relaxed) == 1;

        // Note that elem can be updated even if we return false, if there was tearing.  Use a temporary if that's not ok.
    }

使用 claim = m_buffer[rPos].exchange(1) 并检查 claimed==0 将是另一种选择,而不是 CAS 强。也许在 x86 上效率稍高一些。在 LL/SC 机器上,我想如果 CAS 发现与预期不匹配,它可能能够在不进行写入的情况下进行救助,在这种情况下,只读检查毫无意义。

我使用< code > . claimed . compare _ exchange _ strong(claimed,1) with success ordering = < code > acquire 来确保读取< code>claimed发生在读取< code >之前。elem。

“失败”内存排序可以是放松的:如果我们看到它已经被另一个线程声明,我们就放弃,不查看任何共享数据。

compare_exchange_strong的存储部分的内存排序可以是放松,所以我们只需要mo_acquire,而不是acq_rel。读者不会对共享数据进行任何其他存储,我认为存储的顺序也不重要。到负载。CAS是一个原子RMW。只有一个线程的CAS可以在给定的缓冲区元素上成功,因为它们都试图将其从0设置为1。这就是原子RMW的工作方式,不管是放松还是seq_cst或介于两者之间。

它不需要seq_cst:在这个线程读取. elem之前,我们不需要刷新存储缓冲区或其他任何东西来确保存储是可见的。仅仅是一个原子RMW就足以阻止多个线程实际认为它们成功了。发布只是确保它不能在放松的只读检查之前提前移动。这不是正确性问题。希望没有x86编译器会在编译时这样做。(在x86上的运行时,RMW原子操作总是seq_cst。)

我认为作为一个 RMW 使它不可能“踩到”作家的写作(在环绕之后)。但这可能是实际的CPU实现细节,而不是ISO C。在任何给定 .claimed 的全局修改顺序中,我认为 RMW 保持在一起,并且“获取”顺序确实使其领先于 .elem 的读取。但是,不属于 RMW 的发布存储将是一个潜在的问题:编写器可以环绕并将 claimed=0 放入新条目中,然后读者的存储最终可以提交并将其设置为 1,而实际上没有读者曾经阅读过该元素。

如果我们非常确定读取器不需要检测循环缓冲区的写入器回绕,那么在写入器和读取器中省略< code > STD::atomic _ thread _ fence 。(要求的和非原子元素存储仍将由发布存储订购到< code>m_position)。读取器可以简化为省略第二次检查,如果通过了CAS,则总是返回true。

注意,m_buffer[nextPos].reclaed.store(0,std::memory_order_release)不足以阻止后来的非原子存储出现在它之前:与发布Geofence不同,发布存储是一个单向障碍。释放Geofence就像双向StoreStore屏障。(在x86上免费,在其他ISA上便宜。)

不幸的是,这种SeqLock风格的撕裂检测在技术上并没有避免C抽象机器中的UB。在ISO C中没有好的/安全的方法来表达这种模式,并且已知在真实硬件上的asm中是安全的。实际上没有任何东西使用这个撕裂值(假设< code>read()的调用者忽略它的< code>elem值,如果它返回false)。

使<code>elem</code>成为<code>std::atomic

使用< code>volatile T elem会破坏< code>buffer[i]。elem = elem因为与C不同,C不允许将可变结构复制到常规结构或从常规结构复制。(volatile struct = struct不可能,为什么?).这对于SeqLock类型的模式来说非常烦人,在这种模式中,您希望编译器发出高效的代码来复制整个对象表示,可以选择使用SIMD向量。如果您编写一个采用< code>volatile的构造函数或赋值操作符,就不会得到这样的结果