threadsafe_queue: Attempt to fix crash

This commit is contained in:
yzct12345
2021-08-15 12:10:36 +00:00
committed by GitHub
parent 0ec8b2c96d
commit 6bb7c5b991

View File

@@ -6,6 +6,7 @@
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
@@ -14,12 +15,17 @@ namespace Common {
/// a more foolproof multiple reader, multiple writer queue
template <typename T>
class MPMCQueue {
#define ABORT() \
do { \
std::cerr << __FILE__ " ERR " << __LINE__ << std::endl; \
abort(); \
} while (0)
public:
~MPMCQueue() {
Clear();
if (waiting || head || tail) {
// Remove all the abort() after 1 month merged without problems
abort();
// Remove all the ABORT() after 1 month merged without problems
ABORT();
}
}
@@ -27,7 +33,7 @@ public:
void Push(Arg&& t) {
Node* const node = new Node(std::forward<Arg>(t));
if (!node || node == PLACEHOLDER) {
abort();
ABORT();
}
while (true) {
if (Node* const previous = tail.load(ACQUIRE)) {
@@ -35,9 +41,8 @@ public:
!previous->next.compare_exchange_weak(exchange, node, ACQ_REL)) {
continue;
}
if (Node* exchange = previous;
!tail.compare_exchange_strong(exchange, node, ACQ_REL)) {
abort();
if (tail.exchange(node, ACQ_REL) != previous) {
ABORT();
}
} else {
if (Node* exchange = nullptr;
@@ -63,7 +68,7 @@ public:
T PopWait() {
T t;
if (!PopImpl<true>(t)) {
abort();
ABORT();
}
return t;
}
@@ -81,7 +86,7 @@ public:
condition.wait(lock);
}
if (!waiting.fetch_sub(1, ACQ_REL)) {
abort();
ABORT();
}
}
@@ -95,12 +100,12 @@ public:
!last->next.compare_exchange_weak(exchange, PLACEHOLDER, ACQ_REL)) {
continue;
}
if (Node* exchange = last; !tail.compare_exchange_strong(exchange, nullptr, ACQ_REL)) {
abort();
if (tail.exchange(nullptr, ACQ_REL) != last) {
ABORT();
}
Node* node = head.exchange(nullptr, ACQ_REL);
while (node && node != PLACEHOLDER) {
Node* next = node->next;
Node* next = node->next.load(ACQUIRE);
delete node;
node = next;
}
@@ -126,7 +131,7 @@ private:
condition.wait(*lock);
continue;
}
Node* const next = node->next;
Node* const next = node->next.load(ACQUIRE);
if (next) {
if (next == PLACEHOLDER) {
continue;
@@ -139,20 +144,18 @@ private:
!node->next.compare_exchange_weak(exchange, PLACEHOLDER, ACQ_REL)) {
continue;
}
if (Node* exchange = node;
!tail.compare_exchange_strong(exchange, nullptr, ACQ_REL)) {
abort();
if (tail.exchange(nullptr, ACQ_REL) != node) {
ABORT();
}
if (Node* exchange = node;
!head.compare_exchange_strong(exchange, nullptr, ACQ_REL)) {
abort();
if (head.exchange(nullptr, ACQ_REL) != node) {
ABORT();
}
}
t = std::move(node->value);
delete node;
if (lock) {
if (!waiting.fetch_sub(1, RELEASE)) {
abort();
if (!waiting.fetch_sub(1, ACQ_REL)) {
ABORT();
}
}
return true;
@@ -176,7 +179,6 @@ private:
// We only need to avoid SEQ_CST on X86
// We can add RELAXED later if we port to ARM and it's too slow
static constexpr auto ACQUIRE = std::memory_order_acquire;
static constexpr auto RELEASE = std::memory_order_release;
static constexpr auto ACQ_REL = std::memory_order_acq_rel;
static inline const auto PLACEHOLDER = reinterpret_cast<Node*>(1);
@@ -186,6 +188,7 @@ private:
std::atomic_size_t waiting{0};
std::condition_variable condition{};
std::mutex mutex{};
#undef ABORT
};
/// a simple lockless thread-safe,