From a1261ab0069731857a65bb96b6fb111f77bbb448 Mon Sep 17 00:00:00 2001 From: Levi Behunin Date: Mon, 18 Oct 2021 02:33:37 -0600 Subject: [PATCH] Replace threadsafe queue from: https://github.com/rigtorp/MPMCQueue --- src/common/logging/backend.cpp | 14 +- src/common/logging/log_entry.h | 4 +- src/common/threadsafe_queue.h | 419 +++++++++++++--------- src/input_common/gcadapter/gc_adapter.cpp | 10 +- src/input_common/gcadapter/gc_adapter.h | 6 +- src/input_common/gcadapter/gc_poller.cpp | 4 +- src/input_common/mouse/mouse_input.cpp | 8 +- src/input_common/mouse/mouse_input.h | 6 +- src/input_common/mouse/mouse_poller.cpp | 8 +- src/input_common/sdl/sdl_impl.cpp | 9 +- src/input_common/sdl/sdl_impl.h | 2 +- src/input_common/udp/client.cpp | 12 +- src/input_common/udp/client.h | 8 +- src/input_common/udp/udp.cpp | 2 +- src/video_core/gpu_thread.cpp | 5 +- src/video_core/gpu_thread.h | 4 +- 16 files changed, 302 insertions(+), 219 deletions(-) diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index 0e85a9c1de..0ddcd93798 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -206,7 +206,7 @@ public: return; const Entry& entry = CreateEntry(log_class, log_level, filename, line_num, function, std::move(message)); - message_queue.Push(entry); + message_queue.push(entry); } private: @@ -218,7 +218,9 @@ private: ForEachBackend([&entry](Backend& backend) { backend.Write(entry); }); }; while (true) { - entry = message_queue.PopWait(); + if (!message_queue.try_pop(entry)) { + continue; + } if (entry.final_entry) { break; } @@ -227,7 +229,7 @@ private: // Drain the logging queue. Only writes out up to MAX_LOGS_TO_WRITE to prevent a // case where a system is repeatedly spamming logs even on close. int max_logs_to_write = filter.IsDebug() ? INT_MAX : 100; - while (max_logs_to_write-- && message_queue.Pop(entry)) { + while (max_logs_to_write-- && message_queue.try_pop(entry)) { write_logs(); } })} {} @@ -239,7 +241,7 @@ private: void StopBackendThread() { Entry stop_entry{}; stop_entry.final_entry = true; - message_queue.Push(stop_entry); + message_queue.push(stop_entry); backend_thread.join(); } @@ -256,7 +258,7 @@ private: .filename = filename, .line_num = line_nr, .function = function, - .message = std::move(message), + .message = message.c_str(), .final_entry = false, }; } @@ -279,7 +281,7 @@ private: FileBackend file_backend; std::thread backend_thread; - MPSCQueue message_queue{}; + MPMCQueue message_queue{100000}; std::chrono::steady_clock::time_point time_origin{std::chrono::steady_clock::now()}; }; } // namespace diff --git a/src/common/logging/log_entry.h b/src/common/logging/log_entry.h index dd6f448418..a05d1441af 100644 --- a/src/common/logging/log_entry.h +++ b/src/common/logging/log_entry.h @@ -20,8 +20,8 @@ struct Entry { Level log_level{}; const char* filename = nullptr; unsigned int line_num = 0; - std::string function; - std::string message; + const char* function; + const char* message; bool final_entry = false; }; diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index 2c8c2b90e9..b9f36c8ed9 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -1,195 +1,282 @@ -// Copyright 2010 Dolphin Emulator Project -// Licensed under GPLv2+ -// Refer to the license.txt file included. +/* +Copyright (c) 2020 Erik Rigtorp + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + */ #pragma once -// a simple lockless thread-safe, -// single reader, single writer queue - #include -#include -#include -#include -#include +#include +#include // offsetof +#include +#include +#include // std::hardware_destructive_interference_size +#include + +#ifndef __cpp_aligned_new +#ifdef _WIN32 +#include // _aligned_malloc +#else +#include // posix_memalign +#endif +#endif namespace Common { -template -class SPSCQueue { +namespace mpmc { +#ifdef __cpp_lib_hardware_interference_size +static constexpr size_t hardwareInterferenceSize = std::hardware_destructive_interference_size; +#else +static constexpr size_t hardwareInterferenceSize = 64; +#endif + +#if defined(__cpp_aligned_new) +template +using AlignedAllocator = std::allocator; +#else +template +struct AlignedAllocator { + using value_type = T; + + T* allocate(std::size_t n) { + if (n > std::numeric_limits::max() / sizeof(T)) { + throw std::bad_array_new_length(); + } +#ifdef _WIN32 + auto* p = static_cast(_aligned_malloc(sizeof(T) * n, alignof(T))); + if (p == nullptr) { + throw std::bad_alloc(); + } +#else + T* p; + if (posix_memalign(reinterpret_cast(&p), alignof(T), sizeof(T) * n) != 0) { + throw std::bad_alloc(); + } +#endif + return p; + } + + void deallocate(T* p, std::size_t) { +#ifdef _WIN32 + _aligned_free(p); +#else + free(p); +#endif + } +}; +#endif + +template +struct Slot { + ~Slot() noexcept { + if (turn & 1) { + destroy(); + } + } + + template + void construct(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible::value, + "T must be nothrow constructible with Args&&..."); + new (&storage) T(std::forward(args)...); + } + + void destroy() noexcept { + static_assert(std::is_nothrow_destructible::value, "T must be nothrow destructible"); + reinterpret_cast(&storage)->~T(); + } + + T&& move() noexcept { + return reinterpret_cast(storage); + } + + // Align to avoid false sharing between adjacent slots + alignas(hardwareInterferenceSize) std::atomic turn = {0}; + typename std::aligned_storage::type storage; +}; + +template >> +class Queue { +private: + static_assert(std::is_nothrow_copy_assignable::value || + std::is_nothrow_move_assignable::value, + "T must be nothrow copy or move assignable"); + + static_assert(std::is_nothrow_destructible::value, "T must be nothrow destructible"); + public: - SPSCQueue() { - write_ptr = read_ptr = new ElementPtr(); - } - ~SPSCQueue() { - // this will empty out the whole queue - delete read_ptr; + explicit Queue(const size_t capacity, const Allocator& allocator = Allocator()) + : capacity_(capacity), allocator_(allocator), head_(0), tail_(0) { + if (capacity_ < 1) { + throw std::invalid_argument("capacity < 1"); + } + // Allocate one extra slot to prevent false sharing on the last slot + slots_ = allocator_.allocate(capacity_ + 1); + // Allocators are not required to honor alignment for over-aligned types + // (see http://eel.is/c++draft/allocator.requirements#10) so we verify + // alignment here + if (reinterpret_cast(slots_) % alignof(Slot) != 0) { + allocator_.deallocate(slots_, capacity_ + 1); + throw std::bad_alloc(); + } + for (size_t i = 0; i < capacity_; ++i) { + new (&slots_[i]) Slot(); + } + static_assert(alignof(Slot) == hardwareInterferenceSize, + "Slot must be aligned to cache line boundary to prevent false sharing"); + static_assert(sizeof(Slot) % hardwareInterferenceSize == 0, + "Slot size must be a multiple of cache line size to prevent " + "false sharing between adjacent slots"); + static_assert(sizeof(Queue) % hardwareInterferenceSize == 0, + "Queue size must be a multiple of cache line size to " + "prevent false sharing between adjacent queues"); + static_assert(offsetof(Queue, tail_) - offsetof(Queue, head_) == + static_cast(hardwareInterferenceSize), + "head and tail must be a cache line apart to prevent false sharing"); } - [[nodiscard]] std::size_t Size() const { - return size.load(); + ~Queue() noexcept { + for (size_t i = 0; i < capacity_; ++i) { + slots_[i].~Slot(); + } + allocator_.deallocate(slots_, capacity_ + 1); } - [[nodiscard]] bool Empty() const { - return Size() == 0; + // non-copyable and non-movable + Queue(const Queue&) = delete; + Queue& operator=(const Queue&) = delete; + + template + void emplace(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible::value, + "T must be nothrow constructible with Args&&..."); + auto const head = head_.fetch_add(1); + auto& slot = slots_[idx(head)]; + while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire)) + ; + slot.construct(std::forward(args)...); + slot.turn.store(turn(head) * 2 + 1, std::memory_order_release); } - [[nodiscard]] T& Front() const { - return read_ptr->current; - } - - template - void Push(Arg&& t) { - // create the element, add it to the queue - write_ptr->current = std::forward(t); - // set the next pointer to a new element ptr - // then advance the write pointer - ElementPtr* new_ptr = new ElementPtr(); - write_ptr->next.store(new_ptr, std::memory_order_release); - write_ptr = new_ptr; - ++size; - - // cv_mutex must be held or else there will be a missed wakeup if the other thread is in the - // line before cv.wait - // TODO(bunnei): This can be replaced with C++20 waitable atomics when properly supported. - // See discussion on https://github.com/yuzu-emu/yuzu/pull/3173 for details. - std::lock_guard lock{cv_mutex}; - cv.notify_one(); - } - - void Pop() { - --size; - - ElementPtr* tmpptr = read_ptr; - // advance the read pointer - read_ptr = tmpptr->next.load(); - // set the next element to nullptr to stop the recursive deletion - tmpptr->next.store(nullptr); - delete tmpptr; // this also deletes the element - } - - bool Pop(T& t) { - if (Empty()) - return false; - - --size; - - ElementPtr* tmpptr = read_ptr; - read_ptr = tmpptr->next.load(std::memory_order_acquire); - t = std::move(tmpptr->current); - tmpptr->next.store(nullptr); - delete tmpptr; - return true; - } - - void Wait() { - if (Empty()) { - std::unique_lock lock{cv_mutex}; - cv.wait(lock, [this] { return !Empty(); }); + template + bool try_emplace(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible::value, + "T must be nothrow constructible with Args&&..."); + auto head = head_.load(std::memory_order_acquire); + for (;;) { + auto& slot = slots_[idx(head)]; + if (turn(head) * 2 == slot.turn.load(std::memory_order_acquire)) { + if (head_.compare_exchange_strong(head, head + 1)) { + slot.construct(std::forward(args)...); + slot.turn.store(turn(head) * 2 + 1, std::memory_order_release); + return true; + } + } else { + auto const prevHead = head; + head = head_.load(std::memory_order_acquire); + if (head == prevHead) { + return false; + } + } } } - T PopWait() { - Wait(); - T t; - Pop(t); - return t; + void push(const T& v) noexcept { + static_assert(std::is_nothrow_copy_constructible::value, + "T must be nothrow copy constructible"); + emplace(v); } - T PopWait(std::stop_token stop_token) { - if (Empty()) { - std::unique_lock lock{cv_mutex}; - cv.wait(lock, stop_token, [this] { return !Empty(); }); - } - if (stop_token.stop_requested()) { - return T{}; - } - T t; - Pop(t); - return t; + template ::value>::type> + void push(P&& v) noexcept { + emplace(std::forward

(v)); } - // not thread-safe - void Clear() { - size.store(0); - delete read_ptr; - write_ptr = read_ptr = new ElementPtr(); + bool try_push(const T& v) noexcept { + static_assert(std::is_nothrow_copy_constructible::value, + "T must be nothrow copy constructible"); + return try_emplace(v); + } + + template ::value>::type> + bool try_push(P&& v) noexcept { + return try_emplace(std::forward

(v)); + } + + void pop(T& v) noexcept { + auto const tail = tail_.fetch_add(1); + auto& slot = slots_[idx(tail)]; + while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire)) + ; + v = slot.move(); + slot.destroy(); + slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release); + } + + bool try_pop(T& v) noexcept { + auto tail = tail_.load(std::memory_order_acquire); + for (;;) { + auto& slot = slots_[idx(tail)]; + if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) { + if (tail_.compare_exchange_strong(tail, tail + 1)) { + v = slot.move(); + slot.destroy(); + slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release); + return true; + } + } else { + auto const prevTail = tail; + tail = tail_.load(std::memory_order_acquire); + if (tail == prevTail) { + return false; + } + } + } } private: - // stores a pointer to element - // and a pointer to the next ElementPtr - class ElementPtr { - public: - ElementPtr() {} - ~ElementPtr() { - ElementPtr* next_ptr = next.load(); - - if (next_ptr) - delete next_ptr; - } - - T current; - std::atomic next{nullptr}; - }; - - ElementPtr* write_ptr; - ElementPtr* read_ptr; - std::atomic_size_t size{0}; - std::mutex cv_mutex; - std::conditional_t cv; -}; - -// a simple thread-safe, -// single reader, multiple writer queue - -template -class MPSCQueue { -public: - [[nodiscard]] std::size_t Size() const { - return spsc_queue.Size(); + constexpr size_t idx(size_t i) const noexcept { + return i % capacity_; } - [[nodiscard]] bool Empty() const { - return spsc_queue.Empty(); - } - - [[nodiscard]] T& Front() const { - return spsc_queue.Front(); - } - - template - void Push(Arg&& t) { - std::lock_guard lock{write_lock}; - spsc_queue.Push(t); - } - - void Pop() { - return spsc_queue.Pop(); - } - - bool Pop(T& t) { - return spsc_queue.Pop(t); - } - - void Wait() { - spsc_queue.Wait(); - } - - T PopWait() { - return spsc_queue.PopWait(); - } - - T PopWait(std::stop_token stop_token) { - return spsc_queue.PopWait(stop_token); - } - - // not thread-safe - void Clear() { - spsc_queue.Clear(); + constexpr size_t turn(size_t i) const noexcept { + return i / capacity_; } private: - SPSCQueue spsc_queue; - std::mutex write_lock; + const size_t capacity_; + Slot* slots_; +#if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address) + Allocator allocator_ [[no_unique_address]]; +#else + Allocator allocator_; +#endif + + // Align to avoid false sharing between head_ and tail_ + alignas(hardwareInterferenceSize) std::atomic head_; + alignas(hardwareInterferenceSize) std::atomic tail_; }; -} // namespace Common +} // namespace mpmc + +template >> +using MPMCQueue = mpmc::Queue; + +} // namespace Common \ No newline at end of file diff --git a/src/input_common/gcadapter/gc_adapter.cpp b/src/input_common/gcadapter/gc_adapter.cpp index a2f1bb67c8..aed94a52c3 100644 --- a/src/input_common/gcadapter/gc_adapter.cpp +++ b/src/input_common/gcadapter/gc_adapter.cpp @@ -170,7 +170,7 @@ void Adapter::UpdateYuzuSettings(std::size_t port) { if (pads[port].buttons != 0) { pad_status.button = pads[port].last_button; - pad_queue.Push(pad_status); + pad_queue.push(pad_status); } // Accounting for a threshold here to ensure an intentional press @@ -181,7 +181,7 @@ void Adapter::UpdateYuzuSettings(std::size_t port) { pad_status.axis = static_cast(i); pad_status.axis_value = value; pad_status.axis_threshold = axis_threshold; - pad_queue.Push(pad_status); + pad_queue.push(pad_status); } } } @@ -478,20 +478,18 @@ bool Adapter::DeviceConnected(std::size_t port) const { } void Adapter::BeginConfiguration() { - pad_queue.Clear(); configuring = true; } void Adapter::EndConfiguration() { - pad_queue.Clear(); configuring = false; } -Common::SPSCQueue& Adapter::GetPadQueue() { +Common::MPMCQueue& Adapter::GetPadQueue() { return pad_queue; } -const Common::SPSCQueue& Adapter::GetPadQueue() const { +const Common::MPMCQueue& Adapter::GetPadQueue() const { return pad_queue; } diff --git a/src/input_common/gcadapter/gc_adapter.h b/src/input_common/gcadapter/gc_adapter.h index e5de5e94fc..519c66ed6a 100644 --- a/src/input_common/gcadapter/gc_adapter.h +++ b/src/input_common/gcadapter/gc_adapter.h @@ -85,8 +85,8 @@ public: void BeginConfiguration(); void EndConfiguration(); - Common::SPSCQueue& GetPadQueue(); - const Common::SPSCQueue& GetPadQueue() const; + Common::MPMCQueue& GetPadQueue(); + const Common::MPMCQueue& GetPadQueue() const; GCController& GetPadState(std::size_t port); const GCController& GetPadState(std::size_t port) const; @@ -145,7 +145,7 @@ private: libusb_device_handle* usb_adapter_handle = nullptr; std::array pads; - Common::SPSCQueue pad_queue; + Common::MPMCQueue pad_queue{1024}; std::thread adapter_input_thread; std::thread adapter_scan_thread; diff --git a/src/input_common/gcadapter/gc_poller.cpp b/src/input_common/gcadapter/gc_poller.cpp index 1b6ded8d60..12bd4c13a2 100644 --- a/src/input_common/gcadapter/gc_poller.cpp +++ b/src/input_common/gcadapter/gc_poller.cpp @@ -103,7 +103,7 @@ Common::ParamPackage GCButtonFactory::GetNextInput() const { Common::ParamPackage params; GCAdapter::GCPadStatus pad; auto& queue = adapter->GetPadQueue(); - while (queue.Pop(pad)) { + while (queue.try_pop(pad)) { // This while loop will break on the earliest detected button params.Set("engine", "gcpad"); params.Set("port", static_cast(pad.port)); @@ -263,7 +263,7 @@ Common::ParamPackage GCAnalogFactory::GetNextInput() { GCAdapter::GCPadStatus pad; Common::ParamPackage params; auto& queue = adapter->GetPadQueue(); - while (queue.Pop(pad)) { + while (queue.try_pop(pad)) { if (pad.button != GCAdapter::PadButton::Undefined) { params.Set("engine", "gcpad"); params.Set("port", static_cast(pad.port)); diff --git a/src/input_common/mouse/mouse_input.cpp b/src/input_common/mouse/mouse_input.cpp index 3b052ffb22..1c9f497eb3 100644 --- a/src/input_common/mouse/mouse_input.cpp +++ b/src/input_common/mouse/mouse_input.cpp @@ -52,7 +52,7 @@ void Mouse::UpdateYuzuSettings() { return; } - mouse_queue.Push(MouseStatus{ + mouse_queue.push(MouseStatus{ .button = last_button, }); } @@ -153,7 +153,6 @@ void Mouse::ReleaseAllButtons() { void Mouse::BeginConfiguration() { buttons = 0; last_button = MouseButton::Undefined; - mouse_queue.Clear(); configuring = true; } @@ -165,7 +164,6 @@ void Mouse::EndConfiguration() { info.data.axis = {0, 0}; } last_button = MouseButton::Undefined; - mouse_queue.Clear(); configuring = false; } @@ -205,11 +203,11 @@ bool Mouse::UnlockButton(std::size_t button_) { return button_state; } -Common::SPSCQueue& Mouse::GetMouseQueue() { +Common::MPMCQueue& Mouse::GetMouseQueue() { return mouse_queue; } -const Common::SPSCQueue& Mouse::GetMouseQueue() const { +const Common::MPMCQueue& Mouse::GetMouseQueue() const { return mouse_queue; } diff --git a/src/input_common/mouse/mouse_input.h b/src/input_common/mouse/mouse_input.h index c8bae99c14..dc47e9e0a9 100644 --- a/src/input_common/mouse/mouse_input.h +++ b/src/input_common/mouse/mouse_input.h @@ -79,8 +79,8 @@ public: [[nodiscard]] bool ToggleButton(std::size_t button_); [[nodiscard]] bool UnlockButton(std::size_t button_); - [[nodiscard]] Common::SPSCQueue& GetMouseQueue(); - [[nodiscard]] const Common::SPSCQueue& GetMouseQueue() const; + [[nodiscard]] Common::MPMCQueue& GetMouseQueue(); + [[nodiscard]] const Common::MPMCQueue& GetMouseQueue() const; [[nodiscard]] MouseData& GetMouseState(std::size_t button); [[nodiscard]] const MouseData& GetMouseState(std::size_t button) const; @@ -109,7 +109,7 @@ private: std::jthread update_thread; MouseButton last_button{MouseButton::Undefined}; std::array mouse_info; - Common::SPSCQueue mouse_queue; + Common::MPMCQueue mouse_queue{1024}; bool configuring{false}; int mouse_panning_timout{}; }; diff --git a/src/input_common/mouse/mouse_poller.cpp b/src/input_common/mouse/mouse_poller.cpp index 090b26972d..f8cf19d117 100644 --- a/src/input_common/mouse/mouse_poller.cpp +++ b/src/input_common/mouse/mouse_poller.cpp @@ -52,7 +52,7 @@ Common::ParamPackage MouseButtonFactory::GetNextInput() const { MouseInput::MouseStatus pad; Common::ParamPackage params; auto& queue = mouse_input->GetMouseQueue(); - while (queue.Pop(pad)) { + while (queue.try_pop(pad)) { // This while loop will break on the earliest detected button if (pad.button != MouseInput::MouseButton::Undefined) { params.Set("engine", "mouse"); @@ -184,7 +184,7 @@ Common::ParamPackage MouseAnalogFactory::GetNextInput() const { MouseInput::MouseStatus pad; Common::ParamPackage params; auto& queue = mouse_input->GetMouseQueue(); - while (queue.Pop(pad)) { + while (queue.try_pop(pad)) { // This while loop will break on the earliest detected button if (pad.button != MouseInput::MouseButton::Undefined) { params.Set("engine", "mouse"); @@ -227,7 +227,7 @@ Common::ParamPackage MouseMotionFactory::GetNextInput() const { MouseInput::MouseStatus pad; Common::ParamPackage params; auto& queue = mouse_input->GetMouseQueue(); - while (queue.Pop(pad)) { + while (queue.try_pop(pad)) { // This while loop will break on the earliest detected button if (pad.button != MouseInput::MouseButton::Undefined) { params.Set("engine", "mouse"); @@ -275,7 +275,7 @@ Common::ParamPackage MouseTouchFactory::GetNextInput() const { MouseInput::MouseStatus pad; Common::ParamPackage params; auto& queue = mouse_input->GetMouseQueue(); - while (queue.Pop(pad)) { + while (queue.try_pop(pad)) { // This while loop will break on the earliest detected button if (pad.button != MouseInput::MouseButton::Undefined) { params.Set("engine", "mouse"); diff --git a/src/input_common/sdl/sdl_impl.cpp b/src/input_common/sdl/sdl_impl.cpp index ecb00d4286..30556614a3 100644 --- a/src/input_common/sdl/sdl_impl.cpp +++ b/src/input_common/sdl/sdl_impl.cpp @@ -46,7 +46,7 @@ static int SDLEventWatcher(void* user_data, SDL_Event* event) { // Don't handle the event if we are configuring if (sdl_state->polling) { - sdl_state->event_queue.Push(*event); + sdl_state->event_queue.push(*event); } else { sdl_state->HandleGameControllerEvent(*event); } @@ -1460,7 +1460,6 @@ public: explicit SDLPoller(SDLState& state_) : state(state_) {} void Start([[maybe_unused]] const std::string& device_id) override { - state.event_queue.Clear(); state.polling = true; } @@ -1478,7 +1477,7 @@ public: Common::ParamPackage GetNextInput() override { SDL_Event event; - while (state.event_queue.Pop(event)) { + while (state.event_queue.try_pop(event)) { const auto package = FromEvent(event); if (package) { return *package; @@ -1550,7 +1549,7 @@ public: Common::ParamPackage GetNextInput() override { SDL_Event event; - while (state.event_queue.Pop(event)) { + while (state.event_queue.try_pop(event)) { const auto package = FromEvent(event); if (package) { return *package; @@ -1592,7 +1591,7 @@ public: Common::ParamPackage GetNextInput() override { SDL_Event event; - while (state.event_queue.Pop(event)) { + while (state.event_queue.try_pop(event)) { if (event.type != SDL_JOYAXISMOTION) { // Check for a button press auto button_press = button_poller.FromEvent(event); diff --git a/src/input_common/sdl/sdl_impl.h b/src/input_common/sdl/sdl_impl.h index 7a9ad63465..f808e072a2 100644 --- a/src/input_common/sdl/sdl_impl.h +++ b/src/input_common/sdl/sdl_impl.h @@ -59,7 +59,7 @@ public: /// Used by the Pollers during config std::atomic polling = false; - Common::SPSCQueue event_queue; + Common::MPMCQueue event_queue{1024}; std::vector GetInputDevices() override; diff --git a/src/input_common/udp/client.cpp b/src/input_common/udp/client.cpp index 9b0aec7970..d7b70332c0 100644 --- a/src/input_common/udp/client.cpp +++ b/src/input_common/udp/client.cpp @@ -338,7 +338,7 @@ void Client::UpdateYuzuSettings(std::size_t client, std::size_t pad_index, gyro[0], gyro[1], gyro[2], acc[0], acc[1], acc[2]); } UDPPadStatus pad{ - .host = clients[client].host, + .host = clients[client].host.c_str(), .port = clients[client].port, .pad_index = pad_index, }; @@ -346,12 +346,12 @@ void Client::UpdateYuzuSettings(std::size_t client, std::size_t pad_index, if (gyro[i] > 5.0f || gyro[i] < -5.0f) { pad.motion = static_cast(i); pad.motion_value = gyro[i]; - pad_queue.Push(pad); + pad_queue.push(pad); } if (acc[i] > 1.75f || acc[i] < -1.75f) { pad.motion = static_cast(i + 3); pad.motion_value = acc[i]; - pad_queue.Push(pad); + pad_queue.push(pad); } } } @@ -401,12 +401,10 @@ void Client::UpdateTouchInput(Response::TouchPad& touch_pad, std::size_t client, } void Client::BeginConfiguration() { - pad_queue.Clear(); configuring = true; } void Client::EndConfiguration() { - pad_queue.Clear(); configuring = false; } @@ -434,11 +432,11 @@ const Input::TouchStatus& Client::GetTouchState() const { return touch_status; } -Common::SPSCQueue& Client::GetPadQueue() { +Common::MPMCQueue& Client::GetPadQueue() { return pad_queue; } -const Common::SPSCQueue& Client::GetPadQueue() const { +const Common::MPMCQueue& Client::GetPadQueue() const { return pad_queue; } diff --git a/src/input_common/udp/client.h b/src/input_common/udp/client.h index 380f9bb76d..0814265e56 100644 --- a/src/input_common/udp/client.h +++ b/src/input_common/udp/client.h @@ -46,7 +46,7 @@ enum class PadTouch { }; struct UDPPadStatus { - std::string host{"127.0.0.1"}; + const char* host{"127.0.0.1"}; u16 port{26760}; std::size_t pad_index{}; PadMotion motion{PadMotion::Undefined}; @@ -85,8 +85,8 @@ public: bool DeviceConnected(std::size_t pad) const; void ReloadSockets(); - Common::SPSCQueue& GetPadQueue(); - const Common::SPSCQueue& GetPadQueue() const; + Common::MPMCQueue& GetPadQueue(); + const Common::MPMCQueue& GetPadQueue() const; DeviceStatus& GetPadState(const std::string& host, u16 port, std::size_t pad); const DeviceStatus& GetPadState(const std::string& host, u16 port, std::size_t pad) const; @@ -146,7 +146,7 @@ private: static constexpr std::size_t MAX_TOUCH_FINGERS = MAX_UDP_CLIENTS * 2; std::array pads{}; std::array clients{}; - Common::SPSCQueue pad_queue{}; + Common::MPMCQueue pad_queue{1024}; Input::TouchStatus touch_status{}; std::array finger_id{}; }; diff --git a/src/input_common/udp/udp.cpp b/src/input_common/udp/udp.cpp index 9829da6f00..3c3842c482 100644 --- a/src/input_common/udp/udp.cpp +++ b/src/input_common/udp/udp.cpp @@ -59,7 +59,7 @@ Common::ParamPackage UDPMotionFactory::GetNextInput() { Common::ParamPackage params; CemuhookUDP::UDPPadStatus pad; auto& queue = client->GetPadQueue(); - while (queue.Pop(pad)) { + while (queue.try_pop(pad)) { if (pad.motion == CemuhookUDP::PadMotion::Undefined || std::abs(pad.motion_value) < 1) { continue; } diff --git a/src/video_core/gpu_thread.cpp b/src/video_core/gpu_thread.cpp index 9547f277a0..3a7870260e 100644 --- a/src/video_core/gpu_thread.cpp +++ b/src/video_core/gpu_thread.cpp @@ -32,7 +32,8 @@ static void RunThread(std::stop_token stop_token, Core::System& system, VideoCore::RasterizerInterface* const rasterizer = renderer.ReadRasterizer(); while (!stop_token.stop_requested()) { - CommandDataContainer next = state.queue.PopWait(stop_token); + CommandDataContainer next; + state.queue.pop(next); if (stop_token.stop_requested()) { break; } @@ -119,7 +120,7 @@ u64 ThreadManager::PushCommand(CommandData&& command_data, bool block) { std::unique_lock lk(state.write_lock); const u64 fence{++state.last_fence}; - state.queue.Push(CommandDataContainer(std::move(command_data), fence, block)); + state.queue.push(CommandDataContainer(std::move(command_data), fence, block)); if (block) { state.cv.wait(lk, thread.get_stop_token(), [this, fence] { diff --git a/src/video_core/gpu_thread.h b/src/video_core/gpu_thread.h index 00984188ee..2ae3422012 100644 --- a/src/video_core/gpu_thread.h +++ b/src/video_core/gpu_thread.h @@ -97,9 +97,9 @@ struct CommandDataContainer { /// Struct used to synchronize the GPU thread struct SynchState final { - using CommandQueue = Common::SPSCQueue; + using CommandQueue = Common::MPMCQueue; std::mutex write_lock; - CommandQueue queue; + CommandQueue queue{100000}; u64 last_fence{}; std::atomic signaled_fence{}; std::condition_variable_any cv;