我有一个小类,它利用std::atomic进行无锁操作。由于这个类被大量调用,它影响了性能,我遇到了麻烦。
这个类类似于LIFO,但是一旦调用了pop()函数,它只返回它的环形缓冲区的最后一个写入的元素(只有在最后一个pop()之后有新元素时)。
一个线程正在调用 push(),另一个线程正在调用 pop()。
由于这占用了我太多的电脑时间,我决定进一步研究d::atomic类及其memory_order。我在StackOverflow和其他资源和书籍中阅读了很多memory_order帖子,但我无法清楚地了解不同的模式。特别是,我在获取和发布模式之间挣扎:我也不明白为什么它们与memory_order_seq_cst不同。
memory_order_relaxed:在同一线程中,原子操作是即时的,但其他线程可能无法立即看到最新的值,它们需要一些时间才能更新。代码可以由编译器或操作系统自由重新排序。
memory_order_acquire/release:由atomic::load使用。它可以防止在此之前的代码行被重新排序(编译器/OS可以根据需要在这行之后重新排序),并使用此线程或另一个线程中的memory_order_release或memory_orded_seq_cst读取存储在此原子上的最新值。memory_order_release还可以在代码重新排序后阻止该代码。因此,在获取/发布中,两者之间的所有代码都可以被操作系统打乱。我不确定这是在同一线程之间还是在不同线程之间。
memory_order_seq_cst:最容易使用,因为它就像我们与变量一起使用的自然写入一样,可以立即刷新其他线程加载函数的值。
template<typename T>
class LockFreeEx
{
public:
void push(const T& element)
{
const int wPos = m_position.load(std::memory_order_seq_cst);
const int nextPos = getNextPos(wPos);
m_buffer[nextPos] = element;
m_position.store(nextPos, std::memory_order_seq_cst);
}
const bool pop(T& returnedElement)
{
const int wPos = m_position.exchange(-1, std::memory_order_seq_cst);
if (wPos != -1)
{
returnedElement = m_buffer[wPos];
return true;
}
else
{
return false;
}
}
private:
static constexpr int maxElements = 8;
static constexpr int getNextPos(int pos) noexcept {return (++pos == maxElements)? 0 : pos;}
std::array<T, maxElements> m_buffer;
std::atomic<int> m_position {-1};
};
所以,我的第一个想法是在所有原子操作中使用memory_order_relaxed,因为pop()线程在循环中每10-15毫秒在pop函数中寻找可用的更新,然后允许它在第一个pop()函数中失败,然后意识到有一个新的更新。只有几毫秒。
另一种选择是使用发布/获取 - 但我不确定它们。在所有 store() 中使用 release 并在所有 load() 函数中使用 collection。
不幸的是,我描述的所有memory_order似乎都起作用了,我不确定它们什么时候会失败,如果它们应该失败的话。
请你告诉我,如果你在这里使用放松的记忆顺序发现了一些问题?或者我应该使用释放/获取(也许关于这些的进一步解释可以帮助我)?为什么?
我认为,在所有的store()或load()中,轻松是这个类的最佳选择。但我不确定!
感谢阅读。
由于我看到每个人都在要求“char”,所以我将其更改为int,问题解决了!但这不是我想解决的问题。
正如我之前所说的那样,这个类很可能是后进先出的,但如果有最后一个元素的话,那么只有最后一个被推送的元素才重要。
我有一个大结构T(可复制和可签名),必须以无锁的方式在两个线程之间共享。所以,我知道的唯一方法是使用一个循环缓冲区来写T的最后一个已知值,以及一个原子,它知道最后一个写入值的索引。如果没有,索引将为-1。
请注意,我的推送线程必须知道何时有“new T”可用,这就是pop()返回bool的原因。
再次感谢每个试图帮助我记忆订单的人!:)
template<typename T>
class LockFreeEx
{
public:
LockFreeEx() {}
LockFreeEx(const T& initValue): m_data(initValue) {}
// WRITE THREAD - CAN BE SLOW, WILL BE CALLED EACH 500-800ms
void publish(const T& element)
{
// I used acquire instead relaxed to makesure wPos is always the lastest w_writePos value, and nextPos calculates the right one
const int wPos = m_writePos.load(std::memory_order_acquire);
const int nextPos = (wPos + 1) % bufferMaxSize;
m_buffer[nextPos] = element;
m_writePos.store(nextPos, std::memory_order_release);
}
// READ THREAD - NEED TO BE VERY FAST - CALLED ONCE AT THE BEGGINING OF THE LOOP each 2ms
inline void update()
{
// should I change to relaxed? It doesn't matter I don't get the new value or the old one, since I will call this function again very soon, and again, and again...
const int writeIndex = m_writePos.load(std::memory_order_acquire);
// Updating only in case there is something new... T may be a heavy struct
if (m_readPos != writeIndex)
{
m_readPos = writeIndex;
m_data = m_buffer[m_readPos];
}
}
// NEED TO BE LIGHTNING FAST, CALLED MULTIPLE TIMES IN THE READ THREAD
inline const T& get() const noexcept {return m_data;}
private:
// Buffer
static constexpr int bufferMaxSize = 4;
std::array<T, bufferMaxSize> m_buffer;
std::atomic<int> m_writePos {0};
int m_readPos = 0;
// Data
T m_data;
};
内存顺序不是关于您何时看到对原子对象的某些特定更改,而是关于此更改可以保证周围代码的内容。松弛原子除了对原子对象本身的更改之外什么也不能保证:更改将是原子的。但是您不能在任何同步上下文中使用松弛原子。
你有一些需要同步的代码。您希望弹出已被推动的内容,而不是尝试弹出尚未被推动的内容。因此,如果您使用宽松的操作,则无法保证您的pop会看到此推送代码:
m_buffer[nextPos] = element;
m_position.store(nextPos, std::memory_relaxed);
就像写的那样。它也可以这样看:
m_position.store(nextPos, std::memory_relaxed);
m_buffer[nextPos] = element;
因此,您可能会尝试从缓冲区中获取尚未存在的元素。因此,您必须使用一些同步,至少使用获取/释放内存顺序。
以及您的实际代码。我认为顺序可以如下:
const char wPos = m_position.load(std::memory_order_relaxed);
...
m_position.store(nextPos, std::memory_order_release);
...
const char wPos = m_position.exchange(-1, memory_order_acquire);
您的编写器只需要发布
,而不需要seq-cst,但是放松
太弱了。在对相应的m_buffer[]
条目进行非原子赋值之前,您不能发布m_position
的值。您需要发布顺序来确保只有在所有早期内存操作之后,其他线程才能看到m_position
存储。https://preshing.com/20120913/acquire-and-release-semantics/
这必须与读取器中的获取或seq_cst负载“同步”。或者至少mo_consume
在阅读器中。
理论上,您还需要wpos=m_position
至少是acquire
(或在读取器中消耗
),而不是放松,因为C11的内存模型对于值预测这样的事情足够弱,这可以让编译器在加载实际从相干缓存中获取值之前,推测性地使用wpos
的值。
(在实际的CPU上,一个疯狂的编译器可以使用test/branch来引入控制依赖关系,从而允许分支预测推测执行来打破wPos
可能值的数据依赖性。
但是普通的编译器不会这样做。在DEC Alpha以外的CPU上,源代码中的数据依赖关系< code>wPos = m_position然后使用< code>m_buffer[wPos]将在asm中创建一个数据依赖关系,就像< code>mo_consume应该利用的那样。真正的isa而不是Alpha保证了依赖加载的依赖排序。(甚至在Alpha上,使用宽松的原子交换可能足以关闭存在于少数真正的Alpha CPUs上的允许这种重新排序的小窗口。)
在为x86编译时,使用mo_acquire
完全没有缺点;它不需要任何额外的障碍。在其他ISAs上也有可能,比如32位ARM,其中获取
需要一个障碍,因此在轻松的负载下“作弊”可能是一个在实践中仍然安全的胜利。当前的编译器总是将mo_consume
增强为mo_acquire
,因此我们很遗憾无法利用它。
即使使用seq_cst
,您也已经有了真正的单词竞争条件。
m_position = 0
m_position = -1
来“声明”插槽 0,并读取 m_buffer[0] 的一部分;
wPos = m_position
读取为 -1
,并计算 nextPos = 0
。m_buffer[0]
的T
在读取之后添加第二次检查< code>m_position(像SeqLock一样)并不能在所有情况下检测到这一点,因为写入器直到写入缓冲区元素之后才更新< code>m_position。
即使您的真实用例在读取和写入之间有很长的间隔,这个缺陷也可以在几乎同时发生一次读取和写入的情况下折磨您。
我当然知道读取端不能等待,也不能停止(它是音频),每5-10毫秒弹出一次,写入端是用户输入,这更慢,更快的可以每500毫秒推送一次。
在现代CPU上,一毫秒已经过时了。线程间延迟通常约为60纳秒,因此是微秒的几分之一,例如从四核Intel x86。只要你不睡在互斥锁上,在放弃之前旋转重试一次或两次都不是问题。
代码审查:
该类类似于LIFO,但一旦调用pop()函数,它只返回其环形缓冲区的最后一个写入元素(仅当自last pop()以来有新元素时)。
这不是一个真正的队列或堆栈:push 和 pop 不是好名字。“发布”和“读取”或“获取”可能会更好,并使它的用途更加明显。
我会在代码中加入注释,以说明这对于单个作者和多个读者来说是安全的。(在< code>push中< code>m_position的非原子增量使得它对于多个编写器来说显然是不安全的。)
即便如此,即使同时运行 1 个作家 1 个阅读器,这也有点奇怪。如果在写入过程中开始读取,它将获得“旧”值,而不是等待几分之一微秒来获取新值。然后下次读取时,已经有一个新值在等待;上次错过的那个。因此,例如m_position
可以按以下顺序更新:2,-1,3。
这可能是所希望的,也可能不是所希望的,这取决于“陈旧”数据是否有任何价值,以及如果写入程序在写入过程中Hibernate,读取器阻塞的可接受性。或者甚至在作者不睡觉的情况下,旋转等待。
对于具有多个只读读取器的很少写入的小数据,标准模式是SeqLock。例如,用于在无法原子读写128位值的CPU上发布128位当前时间戳。请参阅使用32位原子实现64位原子计数器
为了安全起见,我们可以让编写器自由运行,始终环绕其圆形缓冲区,并让读者跟踪它查看的最后一个元素。
如果只有一个读取器,这应该是一个简单的非原子变量。如果它是一个实例变量,至少把它放在m_buffer[]
的另一边。
// Possible failure mode: writer wraps around between reads, leaving same m_position
// single-reader
const bool read(T &elem)
{
// FIXME: big hack to get this in a separate cache line from the instance vars
// maybe instead use alignas(64) int m_lastread as a class member, and/or on the other side of m_buffer from m_position.
static int lastread = -1;
int wPos = m_position.load(std::memory_order_acquire); // or cheat with relaxed to get asm that's like "consume"
if (lastread == wPos)
return false;
elem = m_buffer[wPos];
lastread = wPos;
return true;
}
您希望< code>lastread位于与编写器写入的内容不同的缓存行中。否则,读取器对readPos的更新将会变慢,因为与写入器的写入共享错误,反之亦然。
这使得读卡器对写入器写入的缓存行真正是只读的。但是,在写入程序写入处于“修改”状态的行之后,仍然需要MESI流量来请求对其进行读取访问。但是,编写器仍然可以读取m_position
而没有缓存丢失,因此它可以立即将其存储放入存储缓冲区。它只需要等待RFO获得缓存行的独占所有权,然后才能将元素和更新的m_position
从其存储缓冲区提交到L1d缓存。
TODO:让< code>m_position递增而不进行手动换行,这样我们就有了一个需要很长时间才能换行的写序列号,从而避免了< code>lastread == wPos的早期误报。
使用wPos
如果写入器已经完成了整个过程,并且正在写入正在读取的元素,那么唯一的危险就是在一个很小的时间窗口内未被发现撕裂。对于频繁读取和不频繁写入,以及不太小的缓冲区,这种情况永远不会发生。读取后再次检查m_position
(如SeqLock,类似于下面)会将竞争窗口缩小到仅仍在进行中的写入。
如果有多个读取器,另一个好的选择可能是在每个< code>m_buffer条目中添加一个< code>claimed标志。所以你会定义
template<typename T>
class WaitFreePublish
{
private:
struct {
alignas(32) T elem; // at most 2 elements per cache line
std::atomic<int8_t> claimed; // writers sets this to 0, readers try to CAS it to 1
// could be bool if we don't end up needing 3 states for anything.
// set to "1" in the constructor? or invert and call it "unclaimed"
} m_buffer[maxElements];
std::atomic<int> m_position {-1};
}
如果<code>T</code>在末尾有填充,很遗憾我们不能利用<code>声明的</code>标志:/
这避免了比较位置的可能的失败模式:如果写入者在读取之间绕回,我们得到的最坏结果是撕裂。我们可以通过让编写器在编写元素的其余部分之前先清除< code > claim 标志来检测这种撕裂。
由于没有其他线程写入m_position
,我们绝对可以使用宽松的负载而不必担心。我们甚至可以在其他地方缓存写入位置,但希望读取器不会经常使包含m_position
的缓存行无效。显然,在您的用例中,写入器性能/延迟可能不是什么大问题。
因此,编写器读取器可以是这样的,SeqLock式的撕裂检测使用声明的标志、元素和m_position的已知更新顺序。
/// claimed flag per array element supports concurrent readers
// thread-safety: single-writer only
// update claimed flag first, then element, then m_position.
void publish(const T& elem)
{
const int wPos = m_position.load(std::memory_order_relaxed);
const int nextPos = getNextPos(wPos);
m_buffer[nextPos].claimed.store(0, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release); // make sure that `0` is visible *before* the non-atomic element modification
m_buffer[nextPos].elem = elem;
m_position.store(nextPos, std::memory_order_release);
}
// thread-safety: multiple readers are ok. First one to claim an entry gets it
// check claimed flag before/after to detect overwrite, like a SeqLock
const bool read(T &elem)
{
int rPos = m_position.load(std::memory_order_acquire);
int8_t claimed = m_buffer[rPos].claimed.load(std::memory_order_relaxed);
if (claimed != 0)
return false; // read-only early-out
claimed = 0;
if (!m_buffer[rPos].claimed.compare_exchange_strong(
claimed, 1, std::memory_order_acquire, std::memory_order_relaxed))
return false; // strong CAS failed: another thread claimed it
elem = m_buffer[rPos].elem;
// final check that the writer didn't step on this buffer during read, like a SeqLock
std::atomic_thread_fence(std::memory_order_acquire); // LoadLoad barrier
// We expect it to still be claimed=1 like we set with CAS
// Otherwise we raced with a writer and elem may be torn.
// optionally retry once or twice in this case because we know there's a new value waiting to be read.
return m_buffer[rPos].claimed.load(std::memory_order_relaxed) == 1;
// Note that elem can be updated even if we return false, if there was tearing. Use a temporary if that's not ok.
}
使用 claim = m_buffer[rPos].exchange(1)
并检查 claimed==0
将是另一种选择,而不是 CAS 强。也许在 x86 上效率稍高一些。在 LL/SC 机器上,我想如果 CAS 发现与预期
不匹配,它可能能够在不进行写入的情况下进行救助,在这种情况下,只读检查毫无意义。
我使用< code > . claimed . compare _ exchange _ strong(claimed,1) with success ordering = < code > acquire 来确保读取< code>claimed发生在读取< code >之前。elem。
“失败”内存排序可以是放松的
:如果我们看到它已经被另一个线程声明,我们就放弃,不查看任何共享数据。
compare_exchange_strong
的存储部分的内存排序可以是放松
,所以我们只需要mo_acquire
,而不是acq_rel
。读者不会对共享数据进行任何其他存储,我认为存储的顺序也不重要。到负载。CAS是一个原子RMW。只有一个线程的CAS可以在给定的缓冲区元素上成功,因为它们都试图将其从0设置为1。这就是原子RMW的工作方式,不管是放松还是seq_cst或介于两者之间。
它不需要seq_cst:在这个线程读取. elem
之前,我们不需要刷新存储缓冲区或其他任何东西来确保存储是可见的。仅仅是一个原子RMW就足以阻止多个线程实际认为它们成功了。发布只是确保它不能在放松的只读检查之前提前移动。这不是正确性问题。希望没有x86编译器会在编译时这样做。(在x86上的运行时,RMW原子操作总是seq_cst。)
我认为作为一个 RMW 使它不可能“踩到”作家的写作(在环绕之后)。但这可能是实际的CPU实现细节,而不是ISO C。在任何给定 .claimed
的全局修改顺序中,我认为 RMW 保持在一起,并且“获取”顺序确实使其领先于 .elem
的读取。但是,不属于 RMW 的发布
存储将是一个潜在的问题:编写器可以环绕并将 claimed=0
放入新条目中,然后读者的存储最终可以提交并将其设置为 1,而实际上没有读者曾经阅读过该元素。
如果我们非常确定读取器不需要检测循环缓冲区的写入器回绕,那么在写入器和读取器中省略< code > STD::atomic _ thread _ fence 。(要求的和非原子元素存储仍将由发布存储订购到< code>m_position)。读取器可以简化为省略第二次检查,如果通过了CAS,则总是返回true。
注意,m_buffer[nextPos].reclaed.store(0,std::memory_order_release)
不足以阻止后来的非原子存储出现在它之前:与发布Geofence不同,发布存储是一个单向障碍。释放Geofence就像双向StoreStore屏障。(在x86上免费,在其他ISA上便宜。)
不幸的是,这种SeqLock风格的撕裂检测在技术上并没有避免C抽象机器中的UB。在ISO C中没有好的/安全的方法来表达这种模式,并且已知在真实硬件上的asm中是安全的。实际上没有任何东西使用这个撕裂值(假设< code>read()的调用者忽略它的< code>elem值,如果它返回false)。
使<code>elem</code>成为<code>std::atomic
使用< code>volatile T elem会破坏< code>buffer[i]。elem = elem因为与C不同,C不允许将可变结构复制到常规结构或从常规结构复制。(volatile struct = struct不可能,为什么?).这对于SeqLock类型的模式来说非常烦人,在这种模式中,您希望编译器发出高效的代码来复制整个对象表示,可以选择使用SIMD向量。如果您编写一个采用< code>volatile的构造函数或赋值操作符,就不会得到这样的结果