Compare commits

...

4 Commits

Author SHA1 Message Date
yzct12345
6bb7c5b991 threadsafe_queue: Attempt to fix crash 2021-08-15 12:10:36 +00:00
yzct12345
0ec8b2c96d threadsafe_queue: Address lioncash's review 2021-08-15 11:14:17 +00:00
yzct12345
436c674c96 threadsafe_queue: Address lioncash's review 2021-08-15 08:37:59 +00:00
yzct12345
22bb4b86d1 threadsafe_queue: Rewrite
This rewrites the queue. It replaces it with a new MPMC queue.

https://github.com/yuzu-emu/yuzu/pull/6868#issuecomment-898693448

> Tbh due to bad use the SPSCQueue is no longer SPSC (Single Producer Single Consumer) nor lockfree.

The new queue tolerates non-SPSC use. It is also lockfree on push if no thread is waiting and on pop if there are available items or waiting is not requested.

> best is to replace its instances for std::deque or srd::queue.

I had to write my own queue because of thread safety concerns in the standard library.

I have not noticed any crashes or performance decreases. I ***hope*** it stays that way. If anything breaks, please 100% blame @FernandoS27 and @bunnei for making me work on this hard task.
2021-08-15 07:50:36 +00:00

View File

@@ -4,175 +4,242 @@
#pragma once
// a simple lockless thread-safe,
// single reader, single writer queue
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <utility>
#include <optional>
namespace Common {
/// a more foolproof multiple reader, multiple writer queue
template <typename T>
class SPSCQueue {
class MPMCQueue {
#define ABORT() \
do { \
std::cerr << __FILE__ " ERR " << __LINE__ << std::endl; \
abort(); \
} while (0)
public:
SPSCQueue() {
write_ptr = read_ptr = new ElementPtr();
}
~SPSCQueue() {
// this will empty out the whole queue
delete read_ptr;
}
[[nodiscard]] std::size_t Size() const {
return size.load();
}
[[nodiscard]] bool Empty() const {
return Size() == 0;
}
[[nodiscard]] T& Front() const {
return read_ptr->current;
~MPMCQueue() {
Clear();
if (waiting || head || tail) {
// Remove all the ABORT() after 1 month merged without problems
ABORT();
}
}
template <typename Arg>
void Push(Arg&& t) {
// create the element, add it to the queue
write_ptr->current = std::forward<Arg>(t);
// set the next pointer to a new element ptr
// then advance the write pointer
ElementPtr* new_ptr = new ElementPtr();
write_ptr->next.store(new_ptr, std::memory_order_release);
write_ptr = new_ptr;
++size;
// cv_mutex must be held or else there will be a missed wakeup if the other thread is in the
// line before cv.wait
// TODO(bunnei): This can be replaced with C++20 waitable atomics when properly supported.
// See discussion on https://github.com/yuzu-emu/yuzu/pull/3173 for details.
std::lock_guard lock{cv_mutex};
cv.notify_one();
}
void Pop() {
--size;
ElementPtr* tmpptr = read_ptr;
// advance the read pointer
read_ptr = tmpptr->next.load();
// set the next element to nullptr to stop the recursive deletion
tmpptr->next.store(nullptr);
delete tmpptr; // this also deletes the element
}
bool Pop(T& t) {
if (Empty())
return false;
--size;
ElementPtr* tmpptr = read_ptr;
read_ptr = tmpptr->next.load(std::memory_order_acquire);
t = std::move(tmpptr->current);
tmpptr->next.store(nullptr);
delete tmpptr;
return true;
}
void Wait() {
if (Empty()) {
std::unique_lock lock{cv_mutex};
cv.wait(lock, [this]() { return !Empty(); });
Node* const node = new Node(std::forward<Arg>(t));
if (!node || node == PLACEHOLDER) {
ABORT();
}
while (true) {
if (Node* const previous = tail.load(ACQUIRE)) {
if (Node* exchange = nullptr;
!previous->next.compare_exchange_weak(exchange, node, ACQ_REL)) {
continue;
}
if (tail.exchange(node, ACQ_REL) != previous) {
ABORT();
}
} else {
if (Node* exchange = nullptr;
!tail.compare_exchange_weak(exchange, node, ACQ_REL)) {
continue;
}
for (Node* exchange = nullptr;
!head.compare_exchange_weak(exchange, node, ACQ_REL);)
;
}
break;
}
if (waiting.load(ACQUIRE)) {
std::lock_guard lock{mutex};
condition.notify_one();
}
}
bool Pop(T& t) {
return PopImpl<false>(t);
}
T PopWait() {
Wait();
T t;
Pop(t);
if (!PopImpl<true>(t)) {
ABORT();
}
return t;
}
// not thread-safe
void Wait() {
if (head.load(ACQUIRE)) {
return;
}
static_cast<void>(waiting.fetch_add(1, ACQ_REL));
std::unique_lock lock{mutex};
while (true) {
if (head.load(ACQUIRE)) {
break;
}
condition.wait(lock);
}
if (!waiting.fetch_sub(1, ACQ_REL)) {
ABORT();
}
}
void Clear() {
size.store(0);
delete read_ptr;
write_ptr = read_ptr = new ElementPtr();
while (true) {
Node* const last = tail.load(ACQUIRE);
if (!last) {
return;
}
if (Node* exchange = nullptr;
!last->next.compare_exchange_weak(exchange, PLACEHOLDER, ACQ_REL)) {
continue;
}
if (tail.exchange(nullptr, ACQ_REL) != last) {
ABORT();
}
Node* node = head.exchange(nullptr, ACQ_REL);
while (node && node != PLACEHOLDER) {
Node* next = node->next.load(ACQUIRE);
delete node;
node = next;
}
return;
}
}
private:
// stores a pointer to element
// and a pointer to the next ElementPtr
class ElementPtr {
public:
ElementPtr() {}
~ElementPtr() {
ElementPtr* next_ptr = next.load();
if (next_ptr)
delete next_ptr;
template <bool WAIT>
bool PopImpl(T& t) {
std::optional<std::unique_lock<std::mutex>> lock{std::nullopt};
while (true) {
Node* const node = head.load(ACQUIRE);
if (!node) {
if constexpr (!WAIT) {
return false;
}
if (!lock) {
static_cast<void>(waiting.fetch_add(1, ACQ_REL));
lock = std::unique_lock{mutex};
continue;
}
condition.wait(*lock);
continue;
}
Node* const next = node->next.load(ACQUIRE);
if (next) {
if (next == PLACEHOLDER) {
continue;
}
if (Node* exchange = node; !head.compare_exchange_weak(exchange, next, ACQ_REL)) {
continue;
}
} else {
if (Node* exchange = nullptr;
!node->next.compare_exchange_weak(exchange, PLACEHOLDER, ACQ_REL)) {
continue;
}
if (tail.exchange(nullptr, ACQ_REL) != node) {
ABORT();
}
if (head.exchange(nullptr, ACQ_REL) != node) {
ABORT();
}
}
t = std::move(node->value);
delete node;
if (lock) {
if (!waiting.fetch_sub(1, ACQ_REL)) {
ABORT();
}
}
return true;
}
}
T current;
std::atomic<ElementPtr*> next{nullptr};
struct Node {
template <typename Arg>
explicit Node(Arg&& t) : value{std::forward<Arg>(t)} {}
Node(const Node&) = delete;
Node& operator=(const Node&) = delete;
Node(Node&&) = delete;
Node& operator=(Node&&) = delete;
const T value;
std::atomic<Node*> next{nullptr};
};
ElementPtr* write_ptr;
ElementPtr* read_ptr;
std::atomic_size_t size{0};
std::mutex cv_mutex;
std::condition_variable cv;
// We only need to avoid SEQ_CST on X86
// We can add RELAXED later if we port to ARM and it's too slow
static constexpr auto ACQUIRE = std::memory_order_acquire;
static constexpr auto ACQ_REL = std::memory_order_acq_rel;
static inline const auto PLACEHOLDER = reinterpret_cast<Node*>(1);
std::atomic<Node*> head{nullptr};
std::atomic<Node*> tail{nullptr};
std::atomic_size_t waiting{0};
std::condition_variable condition{};
std::mutex mutex{};
#undef ABORT
};
// a simple thread-safe,
// single reader, multiple writer queue
/// a simple lockless thread-safe,
/// single reader, single writer queue
template <typename T>
class MPSCQueue {
class /*[[deprecated("Transition to MPMCQueue")]]*/ SPSCQueue {
public:
[[nodiscard]] std::size_t Size() const {
return spsc_queue.Size();
}
[[nodiscard]] bool Empty() const {
return spsc_queue.Empty();
}
[[nodiscard]] T& Front() const {
return spsc_queue.Front();
}
template <typename Arg>
void Push(Arg&& t) {
std::lock_guard lock{write_lock};
spsc_queue.Push(t);
}
void Pop() {
return spsc_queue.Pop();
queue.Push(std::forward<Arg>(t));
}
bool Pop(T& t) {
return spsc_queue.Pop(t);
return queue.Pop(t);
}
void Wait() {
spsc_queue.Wait();
queue.Wait();
}
T PopWait() {
return spsc_queue.PopWait();
return queue.PopWait();
}
// not thread-safe
void Clear() {
spsc_queue.Clear();
queue.Clear();
}
private:
SPSCQueue<T> spsc_queue;
std::mutex write_lock;
MPMCQueue<T> queue{};
};
/// a simple thread-safe,
/// single reader, multiple writer queue
template <typename T>
class /*[[deprecated("Transition to MPMCQueue")]]*/ MPSCQueue {
public:
template <typename Arg>
void Push(Arg&& t) {
queue.Push(std::forward<Arg>(t));
}
bool Pop(T& t) {
return queue.Pop(t);
}
T PopWait() {
return queue.PopWait();
}
private:
MPMCQueue<T> queue{};
};
} // namespace Common