Initial community commit

This commit is contained in:
Jef 2024-09-24 14:54:57 +02:00
parent 537bcbc862
commit fc06254474
16440 changed files with 4239995 additions and 2 deletions

View file

@ -0,0 +1,79 @@
#include "ThreadFunctions.h"
#include "threadpool_types.h"
ThreadFunctions::ThreadFunctions(int create_function_list)
{
if (create_function_list)
{
functions_semaphore = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0);
InitializeCriticalSectionAndSpinCount(&functions_guard, 200);
}
else
functions_semaphore = 0;
}
ThreadFunctions::~ThreadFunctions()
{
if (functions_semaphore)
{
CloseHandle(functions_semaphore);
DeleteCriticalSection(&functions_guard);
}
}
void ThreadFunctions::Add(HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id)
{
Nullsoft::Utility::AutoLock l(guard);
Data *new_data = (Data *)calloc(1, sizeof(Data));
new_data->func = func;
new_data->user_data = user_data;
new_data->id = id;
data[handle] = new_data;
}
bool ThreadFunctions::Get(HANDLE handle, api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id)
{
Nullsoft::Utility::AutoLock l(guard);
DataMap::iterator found = data.find(handle);
if (found == data.end())
return false;
const Data *d = found->second;
*func = d->func;
*user_data = d->user_data;
*id = d->id;
return true;
}
void ThreadFunctions::QueueFunction(api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id)
{
Data *new_data = (Data *)calloc(1, sizeof(Data));
new_data->func = func;
new_data->user_data = user_data;
new_data->id = id;
EnterCriticalSection(&functions_guard);
functions_list.push_front(new_data);
LeaveCriticalSection(&functions_guard); // unlock before releasing the semaphore early so we don't lock convoy
ReleaseSemaphore(functions_semaphore, 1, 0);
}
bool ThreadFunctions::PopFunction(api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id)
{
EnterCriticalSection(&functions_guard);
if (!functions_list.empty())
{
ThreadFunctions::Data *data = functions_list.back();
functions_list.pop_back();
LeaveCriticalSection(&functions_guard);
*func = data->func;
*user_data = data->user_data;
*id = data->id;
free(data);
return true;
}
else
{
LeaveCriticalSection(&functions_guard);
return false;
}
}

View file

@ -0,0 +1,31 @@
#pragma once
#include "api_threadpool.h"
#include <map>
#include <deque>
#include "../AutoLock.h"
class ThreadFunctions
{
public:
struct Data
{
api_threadpool::ThreadPoolFunc func;
void *user_data;
intptr_t id;
};
ThreadFunctions(int create_function_list=1);
~ThreadFunctions();
void Add(HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id);
bool Get(HANDLE handle, api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id);
void QueueFunction(api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id);
bool PopFunction(api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id);
typedef std::map<HANDLE, const ThreadFunctions::Data*> DataMap;
DataMap data;
Nullsoft::Utility::LockGuard guard;
typedef std::deque<ThreadFunctions::Data*> FuncList;
FuncList functions_list;
CRITICAL_SECTION functions_guard;
HANDLE functions_semaphore;
};

View file

@ -0,0 +1,274 @@
#include "ThreadID.h"
DWORD ThreadID::thread_func_stub(LPVOID param)
{
ThreadID *t = static_cast<ThreadID*>(param);
if (t != NULL)
{
return t->ThreadFunction();
}
else return 0;
}
void ThreadID::Kill()
{
if (threadHandle && threadHandle != INVALID_HANDLE_VALUE)
{
//cut: WaitForSingleObject(threadHandle, INFINITE);
while (WaitForMultipleObjectsEx(1, &threadHandle, FALSE, INFINITE, TRUE) != WAIT_OBJECT_0)
{
}
}
}
ThreadID::ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore,
ThreadPoolTypes::HandleList &inherited_handles,
volatile LONG *thread_count, HANDLE _max_load_event,
int _reserved, int _com_type) : ThreadFunctions(_reserved)
{
/* initialize values */
released = false;
InitializeCriticalSection(&handle_lock);
/* grab values passed to us */
reserved = _reserved;
com_type = _com_type;
max_load_event = _max_load_event;
global_functions = t_f;
num_threads_available = thread_count;
/* wait_handles[0] is kill switch */
wait_handles.push_back(killswitch);
/* wait_handles[1] is wake switch */
wakeHandle = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0);
wait_handles.push_back(wakeHandle);
if (reserved)
{
/* if thread is reserved,
wait_handles[2] is a Funcion Call wake semaphore
for this thread only. */
wait_handles.push_back(functions_semaphore); // WAIT_OBJECT_0+1 == per-thread queued functions
}
else
{
/* if thread is not reserved,
wait_handles[2] is a Function Call wake semaphore
global to all threads */
wait_handles.push_back(global_functions_semaphore); // WAIT_OBJECT_0+2 == any-thread queued functions
}
/* add inherited handles
(handles added to thread pool before this thread was created) */
for ( ThreadPoolTypes::HandleList::iterator itr = inherited_handles.begin(); itr != inherited_handles.end(); itr++ )
{
wait_handles.push_back( *itr );
}
/* start thread */
threadHandle = CreateThread(0, 0, thread_func_stub, this, 0, 0);
}
ThreadID::~ThreadID()
{
CloseHandle(threadHandle);
CloseHandle(wakeHandle);
DeleteCriticalSection(&handle_lock);
}
bool ThreadID::TryAddHandle(HANDLE new_handle)
{
// let's see if we get lucky and can access the handle list directly
if (TryEnterCriticalSection(&handle_lock))
{
// made it
wait_handles.push_back(new_handle);
LeaveCriticalSection(&handle_lock);
return true;
}
else
{
ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple...
return false;
}
}
void ThreadID::WaitAddHandle(HANDLE handle)
{
// wakeHandle already got released once by nature of this function being called
EnterCriticalSection(&handle_lock);
wait_handles.push_back(handle);
LeaveCriticalSection(&handle_lock);
ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait
}
void ThreadID::AddHandle(HANDLE new_handle)
{
if (!TryAddHandle(new_handle))
WaitAddHandle(new_handle);
}
bool ThreadID::TryRemoveHandle(HANDLE handle)
{
// let's see if we get lucky and can access the handle list directly
if (TryEnterCriticalSection(&handle_lock))
{
RemoveHandle_Internal(handle);
LeaveCriticalSection(&handle_lock);
return true;
}
else
{
ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple...
return false;
}
return false;
}
void ThreadID::WaitRemoveHandle(HANDLE handle)
{
// wakeHandle already got released once by nature of this function being called
EnterCriticalSection(&handle_lock);
RemoveHandle_Internal(handle);
LeaveCriticalSection(&handle_lock);
ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait
}
void ThreadID::RemoveHandle(HANDLE handle)
{
if (!TryRemoveHandle(handle))
WaitRemoveHandle(handle);
}
void ThreadID::RemoveHandle_Internal(HANDLE handle)
{
// first three handles are reserved, so start after that
for (size_t i=3;i<wait_handles.size();i++)
{
if (wait_handles[i] == handle)
{
wait_handles.erase(wait_handles.begin() + i);
i--;
}
}
}
bool ThreadID::IsReserved() const
{
return !!reserved;
}
DWORD CALLBACK ThreadID::ThreadFunction()
{
switch(com_type)
{
case api_threadpool::FLAG_REQUIRE_COM_MT:
CoInitializeEx(0, COINIT_MULTITHREADED);
break;
case api_threadpool::FLAG_REQUIRE_COM_STA:
CoInitialize(0);
break;
}
while (1)
{
InterlockedIncrement(num_threads_available);
EnterCriticalSection(&handle_lock);
DWORD ret = WaitForMultipleObjectsEx((DWORD)wait_handles.size(), wait_handles.data(), FALSE, INFINITE, TRUE);
// cut: LeaveCriticalSection(&handle_lock);
if (InterlockedDecrement(num_threads_available) == 0 && !reserved)
SetEvent(max_load_event); // notify the watch dog if all the threads are used up
if (ret == WAIT_OBJECT_0)
{
// killswitch
LeaveCriticalSection(&handle_lock);
break;
}
else if (ret == WAIT_OBJECT_0 + 1)
{
// we got woken up to release the handles lock
// wait for the second signal
LeaveCriticalSection(&handle_lock);
InterlockedIncrement(num_threads_available);
WaitForSingleObject(wakeHandle, INFINITE);
InterlockedDecrement(num_threads_available);
}
else if (ret == WAIT_OBJECT_0 + 2)
{
LeaveCriticalSection(&handle_lock);
api_threadpool::ThreadPoolFunc func;
void *user_data;
intptr_t id;
if (reserved)
{
// per-thread queued functions
if (PopFunction(&func, &user_data, &id))
{
func(0, user_data, id);
}
}
else
{
// global queued functions
if (global_functions->PopFunction(&func, &user_data, &id))
{
func(0, user_data, id);
}
}
}
else if (ret > WAIT_OBJECT_0 && ret < (WAIT_OBJECT_0 + wait_handles.size()))
{
DWORD index = ret - WAIT_OBJECT_0;
HANDLE handle = wait_handles[index];
LeaveCriticalSection(&handle_lock);
/* !!! race condition here if someone calls ThreadPool::RemoveHandle and then CloseHandle() !!!
before calling RemoveHandle, caller needs to either
ensure that Event is unsignalled (And won't be signalled)
or call RemoveHandle from within the function callback */
api_threadpool::ThreadPoolFunc func;
void *user_data;
intptr_t id;
if (global_functions->Get(handle, &func, &user_data, &id))
{
func(handle, user_data, id);
}
}
else
{
LeaveCriticalSection(&handle_lock);
}
}
if (com_type & api_threadpool::MASK_COM_FLAGS)
CoUninitialize();
return 0;
}
bool ThreadID::CanRunCOM(int flags) const
{
switch(com_type)
{
case api_threadpool::FLAG_REQUIRE_COM_MT: // if we're a CONIT_MULTITHREADEX thread (default)
return !(flags & api_threadpool::FLAG_REQUIRE_COM_STA); // don't let STA stuff run
case api_threadpool::FLAG_REQUIRE_COM_STA: // if we're a CoInitialize(0) thread
return !(flags & api_threadpool::FLAG_REQUIRE_COM_MT); // don't let MT stuff run
}
return false; // shouldn't get here
}
bool ThreadID::IsReleased() const
{
return released;
}
void ThreadID::Reserve()
{
released=false;
}
void ThreadID::Release()
{
released=true;
}

View file

@ -0,0 +1,56 @@
#pragma once
#include <windows.h>
#include "ThreadFunctions.h"
#include "threadpool_types.h"
#include <vector>
class ThreadID : private ThreadFunctions
{
public:
static DWORD CALLBACK thread_func_stub(LPVOID param);
ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore, ThreadPoolTypes::HandleList &inherited_handles, volatile LONG *thread_count, HANDLE _max_load_event, int _reserved, int _com_type);
~ThreadID();
void Kill();
/* Try and Wait must be paired!!! */
bool TryAddHandle(HANDLE new_handle);
void WaitAddHandle(HANDLE new_handle);
void AddHandle(HANDLE new_handle);
/* Try and Wait must be paired!!! */
bool TryRemoveHandle(HANDLE handle);
void WaitRemoveHandle(HANDLE handle);
void RemoveHandle(HANDLE handle);
using ThreadFunctions::QueueFunction;
bool IsReserved() const;
bool IsReleased() const;
bool CanRunCOM(int flags) const;
void Reserve(); // re-reserves a released thread
void Release(); // release a reversed thread
private:
void RemoveHandle_Internal(HANDLE handle);
DWORD CALLBACK ThreadFunction();
int reserved;
ThreadFunctions *global_functions;
volatile LONG *num_threads_available;
int com_type;
bool released;
ThreadFunctions local_functions;
// list of handles we're waiting on
typedef std::vector<HANDLE> HandleList;
HandleList wait_handles;
CRITICAL_SECTION handle_lock;
// handles we create/own
HANDLE threadHandle;
HANDLE wakeHandle;
// handles given to us
HANDLE max_load_event;
};

View file

@ -0,0 +1,365 @@
#include "ThreadPool.h"
ThreadPool::ThreadPool()
{
for ( int i = 0; i < THREAD_TYPES; i++ )
{
num_threads_available[ i ] = 0;
max_load_event[ i ] = CreateEvent( NULL, TRUE, FALSE, NULL );
}
killswitch = CreateEvent( NULL, TRUE, FALSE, NULL );
// one thread of each type to start
for ( int i = 0; i < 2; i++ )
CreateNewThread_Internal( i );
watchdog_thread_handle = CreateThread( 0, 0, WatchDogThreadProcedure_stub, this, 0, 0 );
}
void ThreadPool::Kill()
{
SetEvent( killswitch );
WaitForSingleObject( watchdog_thread_handle, INFINITE );
CloseHandle( watchdog_thread_handle );
for ( ThreadID *l_thread : threads )
{
l_thread->Kill();
delete l_thread;
}
CloseHandle( killswitch );
for ( int i = 0; i < THREAD_TYPES; i++ )
CloseHandle( max_load_event[ i ] );
}
DWORD ThreadPool::WatchDogThreadProcedure_stub( LPVOID param )
{
ThreadPool *_this = (ThreadPool *)param;
return _this->WatchDogThreadProcedure();
}
/*
watchdog will get woken up when number of available threads hits zero
it creates a new thread, sleeps for a bit to let things "settle" and then reset the event
it will need a copy of all "any-thread" handles to build the new thread, and will need to manage in a thread safe way
(so a new thread doesn't "miss" a handle that is added in the interim)
*/
DWORD CALLBACK ThreadPool::WatchDogThreadProcedure()
{
// we ignore the max load event for reserved threads
HANDLE events[ 3 ] = { killswitch, max_load_event[ TYPE_MT ], max_load_event[ TYPE_STA ] };
while ( 1 )
{
DWORD ret = WaitForMultipleObjects( 3, events, FALSE, INFINITE );
if ( ret == WAIT_OBJECT_0 )
{
break;
}
else if ( ret == WAIT_OBJECT_0 + 1 || ret == WAIT_OBJECT_0 + 2 )
{
int thread_type = ret - ( WAIT_OBJECT_0 + 1 );
// this signal is for "full thread load reached"
// lets make sure we're actually at max capacity
Sleep( 10 ); // sleep a bit
if ( num_threads_available[ thread_type ] != 0 ) // see if we're still fully-loaded
continue;
guard.Lock();
CreateNewThread_Internal( thread_type );
guard.Unlock();
Sleep( 250 ); // give the system time to 'settle down' so we don't spawn a ton of threads in a row
ResetEvent( max_load_event[ thread_type ] );
}
}
return 0;
}
ThreadID *ThreadPool::ReserveThread( int flags )
{
// first, check to see if there's any released threads we can grab
Nullsoft::Utility::AutoLock threadlock( guard );
for ( ThreadID *t : threads )
{
if ( t->IsReserved() && t->IsReleased() && t->CanRunCOM( flags ) )
{
t->Reserve();
return t;
}
}
// TODO: if there are enough free threads available, mark one as reserved
// this will involve signalling the thread to switch to 'reserved' mode
// swapping out the 'function list' semaphore with a local one
// and removing all 'busy handles' from the queue
// can probably use the 'wake' handle to synchronize this
/*
int thread_type = GetThreadType(flags);
if (num_threads_available[thread_type > 2])
{
for (size_t i=0;i!=threads.size();i++)
{
if (threads[i]->IsReserved() == false && threads[i]->CanRunCOM(flags))
{
}
}
}
*/
ThreadID *new_thread = CreateNewThread_Internal( GetThreadType( flags, 1 ) );
return new_thread;
}
void ThreadPool::ReleaseThread( ThreadID *thread_id )
{
if ( thread_id )
{
// lock so there's no race condition between ReserveThread() and ReleaseThread()
Nullsoft::Utility::AutoLock threadlock( guard );
thread_id->Release();
}
}
int ThreadPool::AddHandle( ThreadID *thread_id, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags )
{
// TODO: need to ensure that handles are not duplicated
thread_functions.Add( handle, func, user_data, id );
if ( thread_id )
{
if ( thread_id->CanRunCOM( flags ) )
thread_id->AddHandle( handle );
else
return 1;
return 0;
}
else
{
/* increment thread counts temporarily - because the massive wake-up
causes thread counts to go to 0 */
for ( int i = 0; i < THREAD_TYPES; i++ )
InterlockedIncrement( &num_threads_available[ i ] );
guard.Lock();
AddHandle_Internal( 0, handle, flags );
bool thread_types[ THREAD_TYPES ];
GetThreadTypes( flags, thread_types );
for ( int i = 0; i < THREAD_TYPES; i++ )
{
if ( thread_types[ i ] )
any_thread_handles[ i ].push_back( handle );
}
guard.Unlock();
for ( int i = 0; i < THREAD_TYPES; i++ )
InterlockedDecrement( &num_threads_available[ i ] );
}
return 0;
}
/* helper functions for adding/removing handles,
we keep going through the list as long as we can add/remove immediately.
once we have to block, we recurse the function starting at the next handle
when the function returns, we wait.
this lets us do some work rather than sit and wait for each thread's lock */
void ThreadPool::RemoveHandle_Internal(size_t start, HANDLE handle)
{
for (;start!=threads.size();start++)
{
ThreadID *t = threads[start];
if (!t->TryRemoveHandle(handle)) // try to remove
{
// have to wait
RemoveHandle_Internal(start+1, handle); // recurse start with the next thread
t->WaitRemoveHandle(handle); // finish the job
return;
}
}
}
void ThreadPool::AddHandle_Internal(size_t start, HANDLE handle, int flags)
{
for (;start<threads.size();start++)
{
ThreadID *t = threads[start];
if ((flags & api_threadpool::FLAG_LONG_EXECUTION) && t->IsReserved())
continue;
if (!t->CanRunCOM(flags))
continue;
if (!t->TryAddHandle(handle)) // try to add
{
// have to wait,
AddHandle_Internal(start+1, handle, flags); // recurse start with the next thread
t->WaitAddHandle(handle); // finish the job
return;
}
}
}
void ThreadPool::RemoveHandle(ThreadID *thread_id, HANDLE handle)
{
if (thread_id)
{
thread_id->RemoveHandle(handle);
}
else
{
/* increment thread counts temporarily - because the massive wake-up
causes thread counts to go to 0 */
for (int i=0;i<THREAD_TYPES;i++)
InterlockedIncrement(&num_threads_available[i]);
guard.Lock();
RemoveHandle_Internal(0, handle);
for (int j=0;j<THREAD_TYPES;j++)
{
//for (ThreadPoolTypes::HandleList::iterator itr = any_thread_handles[j].begin();
// itr != any_thread_handles[j].end();
// itr++)
ThreadPoolTypes::HandleList::iterator itr = any_thread_handles[j].begin();
while(itr != any_thread_handles[j].end())
{
if (*itr == handle)
{
itr = any_thread_handles[j].erase(itr);
}
else
{
itr++;
}
}
}
guard.Unlock();
for (int i=0;i<THREAD_TYPES;i++)
InterlockedDecrement(&num_threads_available[i]);
}
}
int ThreadPool::RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags)
{
if (threadid)
threadid->QueueFunction(func, user_data, id);
else
thread_functions.QueueFunction(func, user_data, id);
return 0;
}
ThreadID *ThreadPool::CreateNewThread_Internal(int thread_type)
{
int reserved=0;
int com_type = api_threadpool::FLAG_REQUIRE_COM_MT; // default
switch(thread_type)
{
case TYPE_STA_RESERVED:
reserved=1;
case TYPE_STA:
com_type = api_threadpool::FLAG_REQUIRE_COM_STA;
break;
case TYPE_MT_RESERVED:
reserved=1;
case TYPE_MT:
com_type = api_threadpool::FLAG_REQUIRE_COM_MT;
break;
}
Nullsoft::Utility::AutoLock threadlock(guard); // lock here (rather than after new ThreadID) to protect any_thread_handles
ThreadID *new_thread = new ThreadID(&thread_functions, killswitch, thread_functions.functions_semaphore,
any_thread_handles[thread_type],
&num_threads_available[thread_type], max_load_event[thread_type],
reserved, com_type);
threads.push_back(new_thread);
return new_thread;
}
size_t ThreadPool::GetNumberOfThreads()
{
Nullsoft::Utility::AutoLock threadlock(guard);
return threads.size();
}
size_t ThreadPool::GetNumberOfActiveThreads()
{
size_t numThreads = GetNumberOfThreads();
for (int i=0;i<THREAD_TYPES;i++)
numThreads -= num_threads_available[i];
return numThreads;
}
int ThreadPool::GetThreadType(int flags, int reserved)
{
flags &= api_threadpool::MASK_COM_FLAGS;
int thread_type=TYPE_MT;
switch(flags)
{
case api_threadpool::FLAG_REQUIRE_COM_STA:
thread_type = reserved?TYPE_STA_RESERVED:TYPE_STA;
break;
case 0: // default
case api_threadpool::FLAG_REQUIRE_COM_MT:
thread_type = reserved?TYPE_MT_RESERVED:TYPE_MT;
break;
}
return thread_type;
}
void ThreadPool::GetThreadTypes(int flags, bool types[THREAD_TYPES])
{
for (int i=0;i<THREAD_TYPES;i++)
{
types[i]=true;
}
if (flags & api_threadpool::FLAG_REQUIRE_COM_STA)
{
types[TYPE_MT] = false;
types[TYPE_MT] = false;
}
if (flags & api_threadpool::FLAG_REQUIRE_COM_STA)
{
types[TYPE_STA] = false;
types[TYPE_STA_RESERVED] = false;
}
if (flags & api_threadpool::FLAG_LONG_EXECUTION)
{
types[TYPE_STA_RESERVED] = false;
types[TYPE_MT_RESERVED] = false;
}
}
#define CBCLASS ThreadPool
START_DISPATCH;
CB(RESERVETHREAD, ReserveThread)
VCB(RELEASETHREAD, ReleaseThread)
CB(ADDHANDLE, AddHandle)
VCB(REMOVEHANDLE, RemoveHandle)
CB(RUNFUNCTION, RunFunction)
CB(GETNUMBEROFTHREADS, GetNumberOfThreads)
CB(GETNUMBEROFACTIVETHREADS, GetNumberOfActiveThreads)
END_DISPATCH;
#undef CBCLASS

View file

@ -0,0 +1,98 @@
#pragma once
#include <windows.h>
#include <bfc/platform/types.h>
#include <vector>
#include "../autolock.h"
#include "ThreadID.h"
#include "ThreadFunctions.h"
#include "threadpool_types.h"
/* random notes
HANDLEs common to all threads
WaitForMultipleObjectsEx() around these
0 - killswitch
1 - shared APC event. since threads might want to use APCs themselves, we'll use a different mechanism (thread-safe FIFO and an event). the intention is that APCs that can go on any thread will use this handle
2 - per thread APC event.
parameters for "run my function" method
function pointer, user data, flags
flags:
interrupt - for very short non-locking functions where it is safe to interrupt another thread, uses QueueUserAPC
no_wait - spawn a new thread if all threads are busy
com_multithreaded - all threads are created with CoInitialize(0), if you need a COINIT_MULTITHREADED thread, use this flag
parameters for "add my handle" method
handle, function pointer, user data, flags
flags:
single_thread - only one thread in the pool will wait on your object, useful if your handle is not auto-reset
parameters for "function call repeat" - calls your function until you return 0
function pointer, user data, flags
flags:
single_thread - keep calling on the same thread
*/
class ThreadPool : public api_threadpool
{
public:
static const char *getServiceName() { return "Thread Pool API"; }
static const GUID getServiceGuid() { return ThreadPoolGUID; }
public:
// Owner API:
ThreadPool();
void Kill();
// User API:
/* If you have multiple events, APCs, etc and you need them to always run on the same thread
you can reserve one */
ThreadID *ReserveThread(int flags);
/* Release a thread you've previously reserved */
void ReleaseThread(ThreadID *thread_id);
/* adds a waitable handle to the thread pool. when the event is signalled, your function ptr will get called
user_data and id values get passed to your function.
your function should return 1 to indicate that it can be removed
flags, see api_threadpool */
int AddHandle(ThreadID *threadid, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags);
void RemoveHandle(ThreadID *threadid, HANDLE handle);
int RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags);
size_t GetNumberOfThreads(); // total number of threads in the threadpool
size_t GetNumberOfActiveThreads(); // number of threads that are currently being used (inside user function but not necessarily busy)
private:
enum
{
TYPE_MT = 0,
TYPE_STA = 1,
TYPE_MT_RESERVED = 2,
TYPE_STA_RESERVED = 3,
THREAD_TYPES = 4, // two thread types, single threaded apartment COM and multithreaded COM
};
private:
static DWORD CALLBACK WatchDogThreadProcedure_stub(LPVOID param);
ThreadID *CreateNewThread_Internal(int thread_type = 0);
DWORD CALLBACK WatchDogThreadProcedure();
static int GetThreadType(int flags, int reserved = 0);
static void GetThreadTypes(int flags, bool types[THREAD_TYPES]);
void RemoveHandle_Internal(size_t start, HANDLE handle); // recursive helper function for RemoveHandle()
void AddHandle_Internal(size_t start, HANDLE handle, int flags); // recursive helper function for RemoveHandle()
Nullsoft::Utility::LockGuard guard; // guards threads, any_thread_handles, and non_reserved_handles data structures
typedef std::vector<ThreadID*> ThreadList;
ThreadList threads;
ThreadPoolTypes::HandleList any_thread_handles[THREAD_TYPES];
HANDLE killswitch;
HANDLE watchdog_thread_handle;
volatile LONG num_threads_available[THREAD_TYPES];
ThreadFunctions thread_functions;
HANDLE max_load_event[THREAD_TYPES];
protected:
RECVS_DISPATCH;
};

View file

@ -0,0 +1,54 @@
#ifndef NU_THREADPOOL_TIMERHANDLE_H
#define NU_THREADPOOL_TIMERHANDLE_H
#if !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x400)
#error Must define _WIN32_WINNT >= 0x400 to use TimerHandle
#endif
#include <windows.h>
#include <bfc/platform/types.h>
/*
TimerHandle() constructor will make a new timer handle
TimerHandle(existing_handle) will "take over" an existing handle
~TimerHandle() DOES NOT CloseHandle as this object is meant as a helper
call Close() to kill the timer handle
The timer will be "one shot" auto-reset.
Because it is meant to be compatible with the threadpool, manual-reset timers and periodic timers
are not recommended!! You will have re-entrancy problems
If you want "periodic" behavior, call Wait() at the end of your ThreadPoolFunc
*/
class TimerHandle
{
public:
TimerHandle() { timerHandle = CreateWaitableTimer( 0, FALSE, 0 ); }
TimerHandle( HANDLE p_handle ) { timerHandle = p_handle; }
void Close() { CloseHandle( timerHandle ); }
void Wait( uint64_t p_milliseconds )
{
/* MSDN notes about SetWaitableTimer: 100 nanosecond resolution, Negative values indicate relative time*/
LARGE_INTEGER timeout = { 0 };
timeout.QuadPart = -( (int64_t)p_milliseconds * 1000LL /*to microseconds*/ * 10LL /* to 100 nanoseconds */ );
SetWaitableTimer( timerHandle, &timeout, 0, 0, 0, FALSE );
}
void Poll( uint64_t p_milliseconds ) // only use on a reserved thread!!!
{
/* MSDN notes about SetWaitableTimer: 100 nanosecond resolution, Negative values indicate relative time*/
LARGE_INTEGER timeout = { 0 };
timeout.QuadPart = -( (int64_t)p_milliseconds * 1000LL /*to microseconds*/ * 10LL /* to 100 nanoseconds */ );
SetWaitableTimer( timerHandle, &timeout, (LONG)p_milliseconds, 0, 0, FALSE );
}
/* TODO: WaitUntil method for absolute times */
void Cancel() { CancelWaitableTimer( timerHandle ); }
operator HANDLE() { return timerHandle; }
private:
HANDLE timerHandle;
};
#endif // !NU_THREADPOOL_TIMERHANDLE_H

View file

@ -0,0 +1,92 @@
#pragma once
#include <windows.h>
#include <bfc/platform/types.h>
#include <bfc/dispatch.h>
class ThreadID;
class api_threadpool : public Dispatchable
{
protected:
api_threadpool() {}
~api_threadpool() {}
public:
typedef int (*ThreadPoolFunc)(HANDLE handle, void *user_data, intptr_t id);
enum
{
// pass this flag to AddHandle or RunFunction indicate that your thread function
// might run for a long time
FLAG_LONG_EXECUTION = 0x1,
FLAG_REQUIRE_COM_STA = 0x2,
FLAG_REQUIRE_COM_MT = 0x4,
MASK_COM_FLAGS = 0x6,
};
public:
ThreadID *ReserveThread(int flags);
/* Release a thread you've previously reserved */
void ReleaseThread(ThreadID *thread_id);
/* adds a waitable handle to the thread pool. when the event is signalled, your function ptr will get called
user_data and id values get passed to your function.
your function should return 1 to indicate that it can be removed
flags, see api_threadpool */
int AddHandle(ThreadID *threadid, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags);
void RemoveHandle(ThreadID *threadid, HANDLE handle);
int RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags);
size_t GetNumberOfThreads(); // total number of threads in the threadpool
size_t GetNumberOfActiveThreads(); // number of threads that are currently being used (inside user function but not necessarily busy)
enum
{
RESERVETHREAD = 0,
RELEASETHREAD = 1,
ADDHANDLE = 2,
REMOVEHANDLE = 3,
RUNFUNCTION = 4,
GETNUMBEROFTHREADS = 5,
GETNUMBEROFACTIVETHREADS = 6,
};
};
inline ThreadID *api_threadpool::ReserveThread(int flags)
{
return _call(RESERVETHREAD, (ThreadID *)0, flags);
}
inline void api_threadpool::ReleaseThread(ThreadID *thread_id)
{
_voidcall(RELEASETHREAD, thread_id);
}
inline int api_threadpool::AddHandle(ThreadID *threadid, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags)
{
return _call(ADDHANDLE, (int)1, threadid, handle, func, user_data, id, flags);
}
inline void api_threadpool::RemoveHandle(ThreadID *threadid, HANDLE handle)
{
_voidcall(REMOVEHANDLE, threadid, handle);
}
inline int api_threadpool::RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags)
{
return _call(RUNFUNCTION, (int)1, threadid, func, user_data, id, flags);
}
inline size_t api_threadpool::GetNumberOfThreads()
{
return _call(GETNUMBEROFTHREADS, (size_t)0);
}
inline size_t api_threadpool::GetNumberOfActiveThreads()
{
return _call(GETNUMBEROFACTIVETHREADS, (size_t)0);
}
// {4DE015D3-11D8-4ac6-A3E6-216DF5252107}
static const GUID ThreadPoolGUID =
{ 0x4de015d3, 0x11d8, 0x4ac6, { 0xa3, 0xe6, 0x21, 0x6d, 0xf5, 0x25, 0x21, 0x7 } };

View file

@ -0,0 +1,23 @@
Microsoft Visual Studio Solution File, Format Version 8.00
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "threadpool", "threadpool.vcproj", "{12CC5DA2-87FF-456A-AF20-A243F168EFE8}"
ProjectSection(ProjectDependencies) = postProject
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfiguration) = preSolution
Debug = Debug
Release = Release
EndGlobalSection
GlobalSection(ProjectDependencies) = postSolution
EndGlobalSection
GlobalSection(ProjectConfiguration) = postSolution
{12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Debug.ActiveCfg = Debug|Win32
{12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Debug.Build.0 = Debug|Win32
{12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Release.ActiveCfg = Release|Win32
{12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Release.Build.0 = Release|Win32
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
EndGlobalSection
GlobalSection(ExtensibilityAddIns) = postSolution
EndGlobalSection
EndGlobal

View file

@ -0,0 +1,137 @@
<?xml version="1.0" encoding="Windows-1252"?>
<VisualStudioProject
ProjectType="Visual C++"
Version="7.10"
Name="threadpool"
ProjectGUID="{12CC5DA2-87FF-456A-AF20-A243F168EFE8}"
Keyword="Win32Proj">
<Platforms>
<Platform
Name="Win32"/>
</Platforms>
<Configurations>
<Configuration
Name="Debug|Win32"
OutputDirectory="Debug"
IntermediateDirectory="Debug"
ConfigurationType="4"
CharacterSet="2">
<Tool
Name="VCCLCompilerTool"
Optimization="0"
AdditionalIncludeDirectories="../../Wasabi"
PreprocessorDefinitions="WIN32;_DEBUG;_LIB;_WIN32_WINNT=0x400"
MinimalRebuild="TRUE"
BasicRuntimeChecks="3"
RuntimeLibrary="5"
UsePrecompiledHeader="0"
WarningLevel="3"
Detect64BitPortabilityProblems="TRUE"
DebugInformationFormat="4"/>
<Tool
Name="VCCustomBuildTool"/>
<Tool
Name="VCLibrarianTool"
OutputFile="$(OutDir)/threadpool.lib"/>
<Tool
Name="VCMIDLTool"/>
<Tool
Name="VCPostBuildEventTool"/>
<Tool
Name="VCPreBuildEventTool"/>
<Tool
Name="VCPreLinkEventTool"/>
<Tool
Name="VCResourceCompilerTool"/>
<Tool
Name="VCWebServiceProxyGeneratorTool"/>
<Tool
Name="VCXMLDataGeneratorTool"/>
<Tool
Name="VCManagedWrapperGeneratorTool"/>
<Tool
Name="VCAuxiliaryManagedWrapperGeneratorTool"/>
</Configuration>
<Configuration
Name="Release|Win32"
OutputDirectory="Release"
IntermediateDirectory="Release"
ConfigurationType="4"
CharacterSet="2">
<Tool
Name="VCCLCompilerTool"
AdditionalIncludeDirectories="../../Wasabi"
PreprocessorDefinitions="WIN32;NDEBUG;_LIB;_WIN32_WINNT=0x400"
RuntimeLibrary="4"
UsePrecompiledHeader="0"
WarningLevel="3"
Detect64BitPortabilityProblems="TRUE"
DebugInformationFormat="3"/>
<Tool
Name="VCCustomBuildTool"/>
<Tool
Name="VCLibrarianTool"
OutputFile="$(OutDir)/threadpool.lib"/>
<Tool
Name="VCMIDLTool"/>
<Tool
Name="VCPostBuildEventTool"/>
<Tool
Name="VCPreBuildEventTool"/>
<Tool
Name="VCPreLinkEventTool"/>
<Tool
Name="VCResourceCompilerTool"/>
<Tool
Name="VCWebServiceProxyGeneratorTool"/>
<Tool
Name="VCXMLDataGeneratorTool"/>
<Tool
Name="VCManagedWrapperGeneratorTool"/>
<Tool
Name="VCAuxiliaryManagedWrapperGeneratorTool"/>
</Configuration>
</Configurations>
<References>
</References>
<Files>
<Filter
Name="Source Files"
Filter="cpp;c;cxx;def;odl;idl;hpj;bat;asm;asmx"
UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}">
<File
RelativePath=".\ThreadFunctions.cpp">
</File>
<File
RelativePath=".\ThreadID.cpp">
</File>
<File
RelativePath=".\ThreadPool.cpp">
</File>
</Filter>
<Filter
Name="Header Files"
Filter="h;hpp;hxx;hm;inl;inc;xsd"
UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}">
<File
RelativePath=".\api_threadpool.h">
</File>
<File
RelativePath=".\ThreadFunctions.h">
</File>
<File
RelativePath=".\ThreadID.h">
</File>
<File
RelativePath=".\ThreadPool.h">
</File>
</Filter>
<Filter
Name="Resource Files"
Filter="rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx"
UniqueIdentifier="{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}">
</Filter>
</Files>
<Globals>
</Globals>
</VisualStudioProject>

View file

@ -0,0 +1,8 @@
#pragma once
#include <deque>
#include <windows.h>
namespace ThreadPoolTypes
{
typedef std::deque<HANDLE> HandleList;
const int MAX_SEMAPHORE_VALUE = 1024; //some arbitrarily high amount*
}