winamp/Src/nu/threadpool/ThreadPool.cpp

366 lines
9.0 KiB
C++

#include "ThreadPool.h"
/*
Builds the pool:
 - zeroes the per-type "threads available" counters,
 - creates one manual-reset, initially non-signalled "max load" event per thread type,
 - creates the manual-reset killswitch event,
 - spawns the initial worker threads,
 - and finally launches the watchdog thread (which receives `this` as its parameter).
*/
ThreadPool::ThreadPool()
{
	int type = 0;
	while ( type < THREAD_TYPES )
	{
		num_threads_available[ type ] = 0;
		max_load_event[ type ]        = CreateEvent( NULL, TRUE, FALSE, NULL );
		++type;
	}

	killswitch = CreateEvent( NULL, TRUE, FALSE, NULL );

	// start out with a single thread for each of the first two thread types
	// (indices 0 and 1; the reserved types are only created on demand)
	CreateNewThread_Internal( 0 );
	CreateNewThread_Internal( 1 );

	watchdog_thread_handle = CreateThread( 0, 0, WatchDogThreadProcedure_stub, this, 0, 0 );
}
void ThreadPool::Kill()
{
SetEvent( killswitch );
WaitForSingleObject( watchdog_thread_handle, INFINITE );
CloseHandle( watchdog_thread_handle );
for ( ThreadID *l_thread : threads )
{
l_thread->Kill();
delete l_thread;
}
CloseHandle( killswitch );
for ( int i = 0; i < THREAD_TYPES; i++ )
CloseHandle( max_load_event[ i ] );
}
/*
Thread entry point handed to CreateThread().
`param` is the ThreadPool instance passed by the constructor; this simply forwards
to the member function and returns its result as the thread exit code.
*/
DWORD ThreadPool::WatchDogThreadProcedure_stub( LPVOID param )
{
	return static_cast<ThreadPool *>( param )->WatchDogThreadProcedure();
}
/*
The watchdog is woken up when the number of available threads hits zero.
It creates a new thread, sleeps for a bit to let things "settle", and then resets the event.
It will need a copy of all "any-thread" handles to build the new thread, and will need to manage them in a thread-safe way
(so a new thread doesn't "miss" a handle that is added in the interim).
*/
/*
Watchdog loop. Waits on the killswitch and on the max-load events of the two
non-reserved thread types (the max-load events of reserved threads are ignored).
Returns 0 once the killswitch is signalled.
*/
DWORD CALLBACK ThreadPool::WatchDogThreadProcedure()
{
	// index 0 = killswitch, index 1 = TYPE_MT, index 2 = TYPE_STA
	HANDLE wait_handles[ 3 ] = { killswitch, max_load_event[ TYPE_MT ], max_load_event[ TYPE_STA ] };

	for ( ;; )
	{
		const DWORD signalled = WaitForMultipleObjects( 3, wait_handles, FALSE, INFINITE );

		// killswitch: time to go
		if ( signalled == WAIT_OBJECT_0 )
			return 0;

		// anything that is not one of the two max-load events: just wait again
		if ( signalled != WAIT_OBJECT_0 + 1 && signalled != WAIT_OBJECT_0 + 2 )
			continue;

		// position in wait_handles (minus the killswitch slot) is the thread type
		const int type_index = static_cast<int>( signalled - ( WAIT_OBJECT_0 + 1 ) );

		// the event means "full thread load reached" - pause briefly and
		// double-check that we are really still out of available threads
		Sleep( 10 );
		if ( num_threads_available[ type_index ] != 0 )
			continue;

		guard.Lock();
		CreateNewThread_Internal( type_index );
		guard.Unlock();

		// let the system settle before re-arming, so we don't spawn a burst of threads
		Sleep( 250 );
		ResetEvent( max_load_event[ type_index ] );
	}
}
/*
Hands out a reserved thread compatible with `flags`.

Under the pool lock, the first existing thread that is a reserved thread, has been
released, and can run the requested COM model is re-reserved and returned.
If there is none, a brand new reserved thread of the matching type is created.
*/
ThreadID *ThreadPool::ReserveThread( int flags )
{
	Nullsoft::Utility::AutoLock threadlock( guard );

	// look for a previously released reserved thread we can recycle
	for ( ThreadID *candidate : threads )
	{
		if ( !candidate->IsReserved() )
			continue;
		if ( !candidate->IsReleased() )
			continue;
		if ( !candidate->CanRunCOM( flags ) )
			continue;

		candidate->Reserve();
		return candidate;
	}

	// TODO: if there are enough free threads available, mark one as reserved
	// this will involve signalling the thread to switch to 'reserved' mode
	// swapping out the 'function list' semaphore with a local one
	// and removing all 'busy handles' from the queue
	// can probably use the 'wake' handle to synchronize this
	//
	// unfinished sketch:
	//   int thread_type = GetThreadType(flags);
	//   if (num_threads_available[thread_type] > 2)
	//   {
	//       for (size_t i=0;i!=threads.size();i++)
	//       {
	//           if (threads[i]->IsReserved() == false && threads[i]->CanRunCOM(flags))
	//           {
	//           }
	//       }
	//   }

	// nothing to recycle: spin up a fresh reserved thread
	return CreateNewThread_Internal( GetThreadType( flags, 1 ) );
}
/*
Gives a thread obtained from ReserveThread() back to the pool.
A null `thread_id` is ignored.
*/
void ThreadPool::ReleaseThread( ThreadID *thread_id )
{
	if ( !thread_id )
		return;

	// taken so that ReserveThread() and ReleaseThread() cannot race each other
	Nullsoft::Utility::AutoLock threadlock( guard );
	thread_id->Release();
}
/*
Registers `func` (with `user_data` / `id`) to be run when `handle` is signalled.

thread_id != NULL : the handle is attached to that specific thread.
                    Returns 1 if the thread cannot run the COM model requested in
                    `flags`, 0 otherwise.
thread_id == NULL : the handle is offered to every compatible thread in the pool and
                    recorded in the per-type "any thread" lists. Returns 0.

Note that the function is registered in thread_functions before the COM check, so it
stays registered even when 1 is returned.
*/
int ThreadPool::AddHandle( ThreadID *thread_id, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags )
{
	// TODO: need to ensure that handles are not duplicated
	thread_functions.Add( handle, func, user_data, id );

	// caller asked for one particular thread
	if ( thread_id )
	{
		if ( !thread_id->CanRunCOM( flags ) )
			return 1;

		thread_id->AddHandle( handle );
		return 0;
	}

	// bump the availability counters for the duration of the update - the
	// massive wake-up below would otherwise drive the counts to 0
	for ( int type = 0; type < THREAD_TYPES; ++type )
		InterlockedIncrement( &num_threads_available[ type ] );

	guard.Lock();

	// hand the handle to every thread that is currently running ...
	AddHandle_Internal( 0, handle, flags );

	// ... and remember it per thread type
	bool eligible[ THREAD_TYPES ];
	GetThreadTypes( flags, eligible );
	for ( int type = 0; type < THREAD_TYPES; ++type )
	{
		if ( !eligible[ type ] )
			continue;
		any_thread_handles[ type ].push_back( handle );
	}

	guard.Unlock();

	// undo the temporary bump
	for ( int type = 0; type < THREAD_TYPES; ++type )
		InterlockedDecrement( &num_threads_available[ type ] );

	return 0;
}
/* Helper functions for adding/removing handles.
We keep going through the thread list as long as we can add/remove immediately.
Once we would have to block, we recurse into the function starting at the next thread;
when the recursive call returns, we wait.
This lets us do some work rather than sit and wait for each thread's lock. */
/*
Removes `handle` from every thread at position `start` or later.
Threads are tried without blocking first; at the first thread that cannot be
updated immediately, the remaining threads are processed (recursively) before we
come back and wait on that thread.
*/
void ThreadPool::RemoveHandle_Internal(size_t start, HANDLE handle)
{
	for (size_t index = start; index != threads.size(); ++index)
	{
		ThreadID *current = threads[index];

		// removed right away - on to the next thread
		if (current->TryRemoveHandle(handle))
			continue;

		// this one would block: take care of the rest of the list first ...
		RemoveHandle_Internal(index + 1, handle);
		// ... then wait for this thread to finish the job
		current->WaitRemoveHandle(handle);
		return;
	}
}
/*
Adds `handle` to every suitable thread at position `start` or later.

A thread is skipped when
 - the work is flagged FLAG_LONG_EXECUTION and the thread is a reserved one, or
 - the thread cannot run the COM model requested in `flags`.

Threads are tried without blocking first; at the first thread that cannot be
updated immediately, the remaining threads are processed (recursively) before we
come back and wait on that thread.
*/
void ThreadPool::AddHandle_Internal(size_t start, HANDLE handle, int flags)
{
	const bool long_running = (flags & api_threadpool::FLAG_LONG_EXECUTION) != 0;

	for (size_t index = start; index < threads.size(); ++index)
	{
		ThreadID *current = threads[index];

		const bool unsuitable = (long_running && current->IsReserved()) || !current->CanRunCOM(flags);
		if (unsuitable)
			continue;

		// added right away - on to the next thread
		if (current->TryAddHandle(handle))
			continue;

		// this one would block: take care of the rest of the list first ...
		AddHandle_Internal(index + 1, handle, flags);
		// ... then wait for this thread to finish the job
		current->WaitAddHandle(handle);
		return;
	}
}
/*
Stops watching `handle`.

thread_id != NULL : the handle is removed from that thread only.
thread_id == NULL : the handle is removed from every thread in the pool and from
                    all per-type "any thread" lists.
*/
void ThreadPool::RemoveHandle(ThreadID *thread_id, HANDLE handle)
{
	if (thread_id)
	{
		thread_id->RemoveHandle(handle);
		return;
	}

	// bump the availability counters for the duration of the update - the
	// massive wake-up below would otherwise drive the counts to 0
	for (int type = 0; type < THREAD_TYPES; ++type)
		InterlockedIncrement(&num_threads_available[type]);

	guard.Lock();

	RemoveHandle_Internal(0, handle);

	// purge every occurrence of the handle from each per-type list
	for (int type = 0; type < THREAD_TYPES; ++type)
	{
		for (ThreadPoolTypes::HandleList::iterator pos = any_thread_handles[type].begin();
		     pos != any_thread_handles[type].end();
		     /* advanced in the body */)
		{
			if (*pos == handle)
				pos = any_thread_handles[type].erase(pos); // erase() hands back the next element
			else
				++pos;
		}
	}

	guard.Unlock();

	// undo the temporary bump
	for (int type = 0; type < THREAD_TYPES; ++type)
		InterlockedDecrement(&num_threads_available[type]);
}
/*
Queues `func` (with `user_data` / `id`) for execution.
With a non-null `threadid` the job goes to that thread's own queue, otherwise it
goes to the pool-wide function queue. `flags` is not consulted here.
Always returns 0.
*/
int ThreadPool::RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags)
{
	if (!threadid)
	{
		// no particular thread requested: use the shared queue
		thread_functions.QueueFunction(func, user_data, id);
		return 0;
	}

	threadid->QueueFunction(func, user_data, id);
	return 0;
}
/*
Creates a new ThreadID of the given type (TYPE_MT, TYPE_STA, TYPE_MT_RESERVED or
TYPE_STA_RESERVED), appends it to the thread list and returns it.

The *_RESERVED types produce a reserved thread; the STA types use
FLAG_REQUIRE_COM_STA, everything else uses FLAG_REQUIRE_COM_MT.
*/
ThreadID *ThreadPool::CreateNewThread_Internal(int thread_type)
{
	const bool wants_sta      = (thread_type == TYPE_STA || thread_type == TYPE_STA_RESERVED);
	const bool wants_reserved = (thread_type == TYPE_STA_RESERVED || thread_type == TYPE_MT_RESERVED);

	int reserved = wants_reserved ? 1 : 0;
	int com_type = api_threadpool::FLAG_REQUIRE_COM_MT; // default
	if (wants_sta)
		com_type = api_threadpool::FLAG_REQUIRE_COM_STA;

	// lock before constructing the ThreadID (rather than after) so that
	// any_thread_handles is protected while the new thread is built from it
	Nullsoft::Utility::AutoLock threadlock(guard);

	ThreadID *created = new ThreadID(&thread_functions, killswitch, thread_functions.functions_semaphore,
	                                 any_thread_handles[thread_type],
	                                 &num_threads_available[thread_type], max_load_event[thread_type],
	                                 reserved, com_type);
	threads.push_back(created);
	return created;
}
/*
Returns how many threads the pool currently holds (read under the pool lock).
*/
size_t ThreadPool::GetNumberOfThreads()
{
	Nullsoft::Utility::AutoLock threadlock(guard);
	const size_t thread_count = threads.size();
	return thread_count;
}
/*
Returns the total thread count minus the "available" counters of every thread type.
The counters are read after the pool lock has been released again, so the result
is only a snapshot.
*/
size_t ThreadPool::GetNumberOfActiveThreads()
{
	size_t active = GetNumberOfThreads();

	int type = 0;
	while (type < THREAD_TYPES)
	{
		active -= num_threads_available[type];
		++type;
	}

	return active;
}
/*
Maps the COM bits of `flags` to a thread type.

 FLAG_REQUIRE_COM_STA          -> TYPE_STA (TYPE_STA_RESERVED if `reserved` is non-zero)
 no COM flag / FLAG_REQUIRE_COM_MT -> TYPE_MT  (TYPE_MT_RESERVED  if `reserved` is non-zero)
 any other combination         -> TYPE_MT, regardless of `reserved`
*/
int ThreadPool::GetThreadType(int flags, int reserved)
{
	const int com_flags = flags & api_threadpool::MASK_COM_FLAGS;

	if (com_flags == api_threadpool::FLAG_REQUIRE_COM_STA)
		return reserved ? TYPE_STA_RESERVED : TYPE_STA;

	// 0 is the default and means the same as an explicit MT request
	if (com_flags == 0 || com_flags == api_threadpool::FLAG_REQUIRE_COM_MT)
		return reserved ? TYPE_MT_RESERVED : TYPE_MT;

	return TYPE_MT;
}
/*
Fills `types` (one entry per thread type) with true for every thread type that is
allowed to service work carrying `flags`.

 - FLAG_REQUIRE_COM_STA rules out both MT types,
 - FLAG_REQUIRE_COM_MT  rules out both STA types,
 - FLAG_LONG_EXECUTION  rules out both reserved types.

With no flags set, every type is eligible.
*/
void ThreadPool::GetThreadTypes(int flags, bool types[THREAD_TYPES])
{
	for (int i = 0; i < THREAD_TYPES; i++)
		types[i] = true;

	// STA-only work must never land on an MT thread (reserved or not)
	if (flags & api_threadpool::FLAG_REQUIRE_COM_STA)
	{
		types[TYPE_MT] = false;
		types[TYPE_MT_RESERVED] = false;
	}

	// MT-only work must never land on an STA thread (reserved or not)
	if (flags & api_threadpool::FLAG_REQUIRE_COM_MT)
	{
		types[TYPE_STA] = false;
		types[TYPE_STA_RESERVED] = false;
	}

	// long-running work is kept off the reserved threads
	if (flags & api_threadpool::FLAG_LONG_EXECUTION)
	{
		types[TYPE_STA_RESERVED] = false;
		types[TYPE_MT_RESERVED] = false;
	}
}
// Dispatch table for ThreadPool: each entry pairs a message identifier with the
// member function that handles it. The START_DISPATCH / CB / VCB / END_DISPATCH
// macros are defined elsewhere; in this table CB is used for the methods that
// return a value and VCB for the two void methods (ReleaseThread, RemoveHandle).
// CBCLASS names the class the macros expand against and is undefined again afterwards.
#define CBCLASS ThreadPool
START_DISPATCH;
CB(RESERVETHREAD, ReserveThread)
VCB(RELEASETHREAD, ReleaseThread)
CB(ADDHANDLE, AddHandle)
VCB(REMOVEHANDLE, RemoveHandle)
CB(RUNFUNCTION, RunFunction)
CB(GETNUMBEROFTHREADS, GetNumberOfThreads)
CB(GETNUMBEROFACTIVETHREADS, GetNumberOfActiveThreads)
END_DISPATCH;
#undef CBCLASS