From 6bb7c5b99177a764c4f50957c35f7cd1427594d2 Mon Sep 17 00:00:00 2001 From: yzct12345 <87620833+yzct12345@users.noreply.github.com> Date: Sun, 15 Aug 2021 12:10:36 +0000 Subject: [PATCH] threadsafe_queue: Attempt to fix crash --- src/common/threadsafe_queue.h | 45 +++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index 3522406c73..5851a9fed4 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -14,12 +15,17 @@ namespace Common { /// a more foolproof multiple reader, multiple writer queue template class MPMCQueue { +#define ABORT() \ + do { \ + std::cerr << __FILE__ " ERR " << __LINE__ << std::endl; \ + abort(); \ + } while (0) public: ~MPMCQueue() { Clear(); if (waiting || head || tail) { - // Remove all the abort() after 1 month merged without problems - abort(); + // Remove all the ABORT() after 1 month merged without problems + ABORT(); } } @@ -27,7 +33,7 @@ public: void Push(Arg&& t) { Node* const node = new Node(std::forward(t)); if (!node || node == PLACEHOLDER) { - abort(); + ABORT(); } while (true) { if (Node* const previous = tail.load(ACQUIRE)) { @@ -35,9 +41,8 @@ public: !previous->next.compare_exchange_weak(exchange, node, ACQ_REL)) { continue; } - if (Node* exchange = previous; - !tail.compare_exchange_strong(exchange, node, ACQ_REL)) { - abort(); + if (tail.exchange(node, ACQ_REL) != previous) { + ABORT(); } } else { if (Node* exchange = nullptr; @@ -63,7 +68,7 @@ public: T PopWait() { T t; if (!PopImpl(t)) { - abort(); + ABORT(); } return t; } @@ -81,7 +86,7 @@ public: condition.wait(lock); } if (!waiting.fetch_sub(1, ACQ_REL)) { - abort(); + ABORT(); } } @@ -95,12 +100,12 @@ public: !last->next.compare_exchange_weak(exchange, PLACEHOLDER, ACQ_REL)) { continue; } - if (Node* exchange = last; !tail.compare_exchange_strong(exchange, nullptr, ACQ_REL)) { - abort(); + if (tail.exchange(nullptr, ACQ_REL) != last) { + ABORT(); } Node* node = head.exchange(nullptr, ACQ_REL); while (node && node != PLACEHOLDER) { - Node* next = node->next; + Node* next = node->next.load(ACQUIRE); delete node; node = next; } @@ -126,7 +131,7 @@ private: condition.wait(*lock); continue; } - Node* const next = node->next; + Node* const next = node->next.load(ACQUIRE); if (next) { if (next == PLACEHOLDER) { continue; @@ -139,20 +144,18 @@ private: !node->next.compare_exchange_weak(exchange, PLACEHOLDER, ACQ_REL)) { continue; } - if (Node* exchange = node; - !tail.compare_exchange_strong(exchange, nullptr, ACQ_REL)) { - abort(); + if (tail.exchange(nullptr, ACQ_REL) != node) { + ABORT(); } - if (Node* exchange = node; - !head.compare_exchange_strong(exchange, nullptr, ACQ_REL)) { - abort(); + if (head.exchange(nullptr, ACQ_REL) != node) { + ABORT(); } } t = std::move(node->value); delete node; if (lock) { - if (!waiting.fetch_sub(1, RELEASE)) { - abort(); + if (!waiting.fetch_sub(1, ACQ_REL)) { + ABORT(); } } return true; @@ -176,7 +179,6 @@ private: // We only need to avoid SEQ_CST on X86 // We can add RELAXED later if we port to ARM and it's too slow static constexpr auto ACQUIRE = std::memory_order_acquire; - static constexpr auto RELEASE = std::memory_order_release; static constexpr auto ACQ_REL = std::memory_order_acq_rel; static inline const auto PLACEHOLDER = reinterpret_cast(1); @@ -186,6 +188,7 @@ private: std::atomic_size_t waiting{0}; std::condition_variable condition{}; std::mutex mutex{}; +#undef ABORT }; /// a simple lockless thread-safe,