From 19dcd363d13e6d067b92b79da8f3ed2a361f438d Mon Sep 17 00:00:00 2001 From: ReinUsesLisp Date: Thu, 21 Jan 2021 00:06:39 -0300 Subject: [PATCH] threadsafe_queue: Add std::stop_token overload to PopWait --- src/common/threadsafe_queue.h | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index ad04df8cab..53298421c4 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -14,7 +14,7 @@ #include namespace Common { -template +template class SPSCQueue { public: SPSCQueue() { @@ -70,9 +70,9 @@ public: } bool Pop(T& t) { - if (Empty()) + if (Empty()) { return false; - + } --size; ElementPtr* tmpptr = read_ptr; @@ -86,7 +86,7 @@ public: void Wait() { if (Empty()) { std::unique_lock lock{cv_mutex}; - cv.wait(lock, [this]() { return !Empty(); }); + cv.wait(lock, [this] { return !Empty(); }); } } @@ -97,6 +97,19 @@ public: return t; } + T PopWait(std::stop_token stop_token) { + if (Empty()) { + std::unique_lock lock{cv_mutex}; + cv.wait(lock, stop_token, [this] { return !Empty(); }); + } + if (stop_token.stop_requested()) { + return T{}; + } + T t; + Pop(t); + return t; + } + // not thread-safe void Clear() { size.store(0); @@ -125,13 +138,13 @@ private: ElementPtr* read_ptr; std::atomic_size_t size{0}; std::mutex cv_mutex; - std::condition_variable cv; + std::conditional_t cv; }; // a simple thread-safe, // single reader, multiple writer queue -template +template class MPSCQueue { public: [[nodiscard]] std::size_t Size() const { @@ -168,13 +181,17 @@ public: return spsc_queue.PopWait(); } + T PopWait(std::stop_token stop_token) { + return spsc_queue.PopWait(stop_token); + } + // not thread-safe void Clear() { spsc_queue.Clear(); } private: - SPSCQueue spsc_queue; + SPSCQueue spsc_queue; std::mutex write_lock; }; } // namespace Common