diff --git a/src/core/address_space.cpp b/src/core/address_space.cpp index 6ba442255..68d3b45fc 100644 --- a/src/core/address_space.cpp +++ b/src/core/address_space.cpp @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: Copyright 2024 shadPS4 Emulator Project // SPDX-License-Identifier: GPL-2.0-or-later +#include #include #include "common/alignment.h" #include "common/arch.h" diff --git a/src/core/libraries/kernel/threads/condvar.cpp b/src/core/libraries/kernel/threads/condvar.cpp index 161d17038..67c1ed12d 100644 --- a/src/core/libraries/kernel/threads/condvar.cpp +++ b/src/core/libraries/kernel/threads/condvar.cpp @@ -2,6 +2,8 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include +#include +#include "common/assert.h" #include "core/libraries/error_codes.h" #include "core/libraries/kernel/kernel.h" #include "core/libraries/kernel/threads/pthread.h" @@ -19,6 +21,59 @@ static constexpr PthreadCondAttr PhreadCondattrDefault = { .c_clockid = ClockId::Realtime, }; +static std::mutex sc_lock; +static std::unordered_map sc_table; + +void SleepqAdd(void* wchan, Pthread* td) { + auto [it, is_new] = sc_table.try_emplace(wchan, td->sleepqueue); + if (!is_new) { + it->second->sq_freeq.push_front(td->sleepqueue); + } + td->sleepqueue = nullptr; + td->wchan = wchan; + it->second->sq_blocked.push_back(td); +} + +int SleepqRemove(SleepQueue* sq, Pthread* td) { + std::erase(sq->sq_blocked, td); + if (sq->sq_blocked.empty()) { + td->sleepqueue = sq; + sc_table.erase(td->wchan); + td->wchan = nullptr; + return 0; + } else { + td->sleepqueue = sq->sq_freeq.front(); + sq->sq_freeq.pop_front(); + td->wchan = nullptr; + return 1; + } +} + +void SleepqDrop(SleepQueue* sq, void (*callback)(Pthread*, void*), void* arg) { + if (sq->sq_blocked.empty()) { + return; + } + + sc_table.erase(sq); + Pthread* td = sq->sq_blocked.front(); + sq->sq_blocked.pop_front(); + + callback(td, arg); + + td->sleepqueue = sq; + td->wchan = nullptr; + + auto sq2 = sq->sq_freeq.begin(); + for (Pthread* td : sq->sq_blocked) { + callback(td, arg); + td->sleepqueue = *sq2; + td->wchan = nullptr; + ++sq2; + } + sq->sq_blocked.clear(); + sq->sq_freeq.clear(); +} + static int CondInit(PthreadCondT* cond, const PthreadCondAttrT* cond_attr, const char* name) { auto* cvp = new PthreadCond{}; if (cvp == nullptr) { @@ -90,35 +145,61 @@ int PS4_SYSV_ABI posix_pthread_cond_destroy(PthreadCondT* cond) { return 0; } -int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime) { +int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime, u64 usec) { PthreadMutex* mp = *mutex; if (int error = mp->IsOwned(g_curthread); error != 0) { return error; } - //_thr_testcancel(curthread); - //_thr_cancel_enter2(curthread, 0); - if (abstime) { - const auto status = cond.wait_until(*mp, abstime->TimePoint()); - return status == std::cv_status::timeout ? POSIX_ETIMEDOUT : 0; - } else { - cond.wait(*mp); - return 0; - } - //_thr_cancel_leave(curthread, 0); -} + Pthread* curthread = g_curthread; + ASSERT_MSG(curthread->wchan == nullptr, "Thread was already on queue."); + // _thr_testcancel(curthread); + sc_lock.lock(); -int PthreadCond::Wait(PthreadMutexT* mutex, u64 usec) { - PthreadMutex* mp = *mutex; - if (int error = mp->IsOwned(g_curthread); error != 0) { - return error; - } + /* + * set __has_user_waiters before unlocking mutex, this allows + * us to check it without locking in pthread_cond_signal(). + */ + has_user_waiters = 1; + curthread->will_sleep = 1; - //_thr_testcancel(curthread); - //_thr_cancel_enter2(curthread, 0); - const auto status = cond.wait_for(*mp, std::chrono::microseconds(usec)); - return status == std::cv_status::timeout ? POSIX_ETIMEDOUT : 0; - //_thr_cancel_leave(curthread, 0); + int recurse; + mp->CvUnlock(&recurse); + + curthread->mutex_obj = mp; + SleepqAdd(this, curthread); + + int error = 0; + for (;;) { + curthread->wake_sema.try_acquire(); + sc_lock.unlock(); + + //_thr_cancel_enter2(curthread, 0); + int error = curthread->Sleep(abstime, usec) ? 0 : POSIX_ETIMEDOUT; + //_thr_cancel_leave(curthread, 0); + + sc_lock.lock(); + if (curthread->wchan == nullptr) { + error = 0; + break; + } else if (curthread->ShouldCancel()) { + SleepQueue* sq = sc_table[this]; + has_user_waiters = SleepqRemove(sq, curthread); + sc_lock.unlock(); + curthread->mutex_obj = nullptr; + mp->CvLock(recurse); + return 0; + } else if (error == POSIX_ETIMEDOUT) { + SleepQueue* sq = sc_table[this]; + has_user_waiters = SleepqRemove(sq, curthread); + break; + } + UNREACHABLE(); + } + sc_lock.unlock(); + curthread->mutex_obj = nullptr; + mp->CvLock(recurse); + return error; } int PS4_SYSV_ABI posix_pthread_cond_wait(PthreadCondT* cond, PthreadMutexT* mutex) { @@ -143,20 +224,102 @@ int PS4_SYSV_ABI posix_pthread_cond_reltimedwait_np(PthreadCondT* cond, PthreadM u64 usec) { PthreadCond* cvp{}; CHECK_AND_INIT_COND - return cvp->Wait(mutex, usec); + return cvp->Wait(mutex, THR_RELTIME, usec); +} + +int PthreadCond::Signal() { + Pthread* curthread = g_curthread; + + sc_lock.lock(); + auto it = sc_table.find(this); + if (it == sc_table.end()) { + sc_lock.unlock(); + return 0; + } + + SleepQueue* sq = it->second; + Pthread* td = sq->sq_blocked.front(); + PthreadMutex* mp = td->mutex_obj; + has_user_waiters = SleepqRemove(sq, td); + + std::binary_semaphore* waddr = nullptr; + if (mp->m_owner == curthread) { + if (curthread->nwaiter_defer >= Pthread::MaxDeferWaiters) { + curthread->WakeAll(); + } + curthread->defer_waiters[curthread->nwaiter_defer++] = &td->wake_sema; + mp->m_flags |= PthreadMutexFlags::Defered; + } else { + waddr = &td->wake_sema; + } + + sc_lock.unlock(); + if (waddr != nullptr) { + waddr->release(); + } + return 0; +} + +struct BroadcastArg { + Pthread* curthread; + std::binary_semaphore* waddrs[Pthread::MaxDeferWaiters]; + int count; +}; + +int PthreadCond::Broadcast() { + BroadcastArg ba; + ba.curthread = g_curthread; + ba.count = 0; + + const auto drop_cb = [](Pthread* td, void* arg) { + BroadcastArg* ba = reinterpret_cast(arg); + Pthread* curthread = ba->curthread; + PthreadMutex* mp = td->mutex_obj; + + if (mp->m_owner == curthread) { + if (curthread->nwaiter_defer >= Pthread::MaxDeferWaiters) { + curthread->WakeAll(); + } + curthread->defer_waiters[curthread->nwaiter_defer++] = &td->wake_sema; + mp->m_flags |= PthreadMutexFlags::Defered; + } else { + if (ba->count >= Pthread::MaxDeferWaiters) { + for (int i = 0; i < ba->count; i++) { + ba->waddrs[i]->release(); + } + ba->count = 0; + } + ba->waddrs[ba->count++] = &td->wake_sema; + } + }; + + sc_lock.lock(); + auto it = sc_table.find(this); + if (it == sc_table.end()) { + sc_lock.unlock(); + return 0; + } + + SleepqDrop(it->second, drop_cb, &ba); + has_user_waiters = 0; + sc_lock.unlock(); + + for (int i = 0; i < ba.count; i++) { + ba.waddrs[i]->release(); + } + return 0; } int PS4_SYSV_ABI posix_pthread_cond_signal(PthreadCondT* cond) { PthreadCond* cvp{}; CHECK_AND_INIT_COND - cvp->cond.notify_one(); - return 0; + return cvp->Signal(); } int PS4_SYSV_ABI posix_pthread_cond_broadcast(PthreadCondT* cond) { PthreadCond* cvp{}; CHECK_AND_INIT_COND - cvp->cond.notify_all(); + cvp->Broadcast(); return 0; } diff --git a/src/core/libraries/kernel/threads/mutex.cpp b/src/core/libraries/kernel/threads/mutex.cpp index b6caad297..a414dadc1 100644 --- a/src/core/libraries/kernel/threads/mutex.cpp +++ b/src/core/libraries/kernel/threads/mutex.cpp @@ -18,7 +18,6 @@ static std::mutex MutxStaticLock; #define THR_MUTEX_INITIALIZER ((PthreadMutex*)NULL) #define THR_ADAPTIVE_MUTEX_INITIALIZER ((PthreadMutex*)1) #define THR_MUTEX_DESTROYED ((PthreadMutex*)2) -#define THR_MUTEX_RELTIME (const OrbisKernelTimespec*)-1 #define CPU_SPINWAIT __asm__ volatile("pause") @@ -138,7 +137,7 @@ int PthreadMutex::SelfTryLock() { int PthreadMutex::SelfLock(const OrbisKernelTimespec* abstime, u64 usec) { const auto DoSleep = [&] { - if (abstime == THR_MUTEX_RELTIME) { + if (abstime == THR_RELTIME) { std::this_thread::sleep_for(std::chrono::microseconds(usec)); return POSIX_ETIMEDOUT; } else { @@ -225,11 +224,11 @@ int PthreadMutex::Lock(const OrbisKernelTimespec* abstime, u64 usec) { if (abstime == nullptr) { m_lock.lock(); - } else if (abstime != THR_MUTEX_RELTIME && - (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)) [[unlikely]] { + } else if (abstime != THR_RELTIME && (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)) + [[unlikely]] { ret = POSIX_EINVAL; } else { - if (THR_MUTEX_RELTIME) { + if (THR_RELTIME) { ret = m_lock.try_lock_for(std::chrono::microseconds(usec)) ? 0 : POSIX_ETIMEDOUT; } else { ret = m_lock.try_lock_until(abstime->TimePoint()) ? 0 : POSIX_ETIMEDOUT; @@ -269,7 +268,7 @@ int PS4_SYSV_ABI posix_pthread_mutex_timedlock(PthreadMutexT* mutex, int PS4_SYSV_ABI posix_pthread_mutex_reltimedlock_np(PthreadMutexT* mutex, u64 usec) { CHECK_AND_INIT_MUTEX - return (*mutex)->Lock(THR_MUTEX_RELTIME, usec); + return (*mutex)->Lock(THR_RELTIME, usec); } int PthreadMutex::Unlock() { @@ -284,8 +283,15 @@ int PthreadMutex::Unlock() { if (Type() == PthreadMutexType::Recursive && m_count > 0) [[unlikely]] { m_count--; } else { + int defered = True(m_flags & PthreadMutexFlags::Defered); + m_flags &= ~PthreadMutexFlags::Defered; + curthread->Dequeue(this); m_lock.unlock(); + + if (curthread->will_sleep == 0 && defered) { + curthread->WakeAll(); + } } return 0; } diff --git a/src/core/libraries/kernel/threads/pthread.h b/src/core/libraries/kernel/threads/pthread.h index ff738a6be..b4364eb6e 100644 --- a/src/core/libraries/kernel/threads/pthread.h +++ b/src/core/libraries/kernel/threads/pthread.h @@ -3,7 +3,6 @@ #pragma once -#include #include #include #include @@ -61,22 +60,28 @@ struct PthreadMutex : public ListBaseHook { return static_cast(m_flags & PthreadMutexFlags::TypeMask); } - void lock() { - Lock(nullptr); - } - - void unlock() { - Unlock(); - } - int SelfTryLock(); int SelfLock(const OrbisKernelTimespec* abstime, u64 usec); int TryLock(); int Lock(const OrbisKernelTimespec* abstime, u64 usec = 0); + int CvLock(int recurse) { + const int error = Lock(nullptr); + if (error == 0) { + m_count = recurse; + } + return error; + } + int Unlock(); + int CvUnlock(int* recurse) { + *recurse = m_count; + m_count = 0; + return Unlock(); + } + bool IsOwned(Pthread* curthread) const; }; using PthreadMutexT = PthreadMutex*; @@ -111,15 +116,16 @@ enum class ClockId : u32 { }; struct PthreadCond { - std::condition_variable_any cond; u32 has_user_waiters; u32 has_kern_waiters; u32 flags; ClockId clock_id; std::string name; - int Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime); - int Wait(PthreadMutexT* mutex, u64 usec); + int Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime, u64 usec = 0); + + int Signal(); + int Broadcast(); }; using PthreadCondT = PthreadCond*; @@ -247,8 +253,11 @@ struct SchedParam { int sched_priority; }; +#define THR_RELTIME (const OrbisKernelTimespec*)-1 + struct Pthread { static constexpr u32 ThrMagic = 0xd09ba115U; + static constexpr u32 MaxDeferWaiters = 50; std::atomic tid; std::mutex lock; @@ -296,6 +305,8 @@ struct Pthread { PthreadMutex* mutex_obj; bool will_sleep; bool has_user_waiters; + int nwaiter_defer; + std::binary_semaphore* defer_waiters[MaxDeferWaiters]; bool InCritical() const noexcept { return locklevel > 0 || critical_count > 0; @@ -309,6 +320,28 @@ struct Pthread { return cancel_pending && cancel_enable && no_cancel == 0; } + void WakeAll() { + for (int i = 0; i < nwaiter_defer; i++) { + defer_waiters[i]->release(); + } + nwaiter_defer = 0; + } + + bool Sleep(const OrbisKernelTimespec* abstime, u64 usec) { + will_sleep = 0; + if (nwaiter_defer > 0) { + WakeAll(); + } + if (abstime == THR_RELTIME) { + return wake_sema.try_acquire_for(std::chrono::microseconds(usec)); + } else if (abstime != nullptr) { + return wake_sema.try_acquire_until(abstime->TimePoint()); + } else { + wake_sema.acquire(); + return true; + } + } + void Enqueue(PthreadMutex* mutex) { mutex->m_owner = this; // mutexq.push_back(*mutex);