Team Fortress 2 Source Code as on 22/4/2020
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1457 lines
33 KiB

  1. //========= Copyright Valve Corporation, All rights reserved. ============//
  2. //
  3. // Purpose:
  4. //
  5. //=============================================================================
  6. #if defined( _WIN32 ) && !defined( _X360 )
  7. #define WIN32_LEAN_AND_MEAN
  8. #include <windows.h>
  9. #endif
  10. #include "tier0/dbg.h"
  11. #include "tier0/tslist.h"
  12. #include "tier0/icommandline.h"
  13. #include "vstdlib/jobthread.h"
  14. #include "vstdlib/random.h"
  15. #include "tier1/functors.h"
  16. #include "tier1/fmtstr.h"
  17. #include "tier1/utlvector.h"
  18. #include "tier1/generichash.h"
  19. #include "tier0/vprof.h"
  20. #if defined( _X360 )
  21. #include "xbox/xbox_win32stubs.h"
  22. #endif
  23. #include "tier0/memdbgon.h"
  24. class CJobThread;
  25. //-----------------------------------------------------------------------------
  26. inline void ServiceJobAndRelease( CJob *pJob, int iThread = -1 )
  27. {
  28. // TryLock() would only fail if another thread has entered
  29. // Execute() or Abort()
  30. if ( !pJob->IsFinished() && pJob->TryLock() )
  31. {
  32. // ...service the request
  33. pJob->SetServiceThread( iThread );
  34. pJob->Execute();
  35. pJob->Unlock();
  36. }
  37. pJob->Release();
  38. }
  39. //-----------------------------------------------------------------------------
  40. class ALIGN16 CJobQueue
  41. {
  42. public:
  43. CJobQueue() :
  44. m_nItems( 0 ),
  45. m_nMaxItems( INT_MAX )
  46. {
  47. for ( int i = 0; i < ARRAYSIZE( m_pQueues ); i++ )
  48. {
  49. m_pQueues[i] = new CTSQueue<CJob *>;
  50. }
  51. }
  52. ~CJobQueue()
  53. {
  54. for ( int i = 0; i < ARRAYSIZE( m_pQueues ); i++ )
  55. {
  56. delete m_pQueues[i];
  57. }
  58. }
  59. int Count()
  60. {
  61. return m_nItems;
  62. }
  63. int Count( JobPriority_t priority )
  64. {
  65. return m_pQueues[priority]->Count();
  66. }
  67. CJob *PrePush()
  68. {
  69. if ( m_nItems >= m_nMaxItems )
  70. {
  71. CJob *pOverflowJob;
  72. if ( Pop( &pOverflowJob ) )
  73. {
  74. return pOverflowJob;
  75. }
  76. }
  77. return NULL;
  78. }
  79. int Push( CJob *pJob, int iThread = -1 )
  80. {
  81. pJob->AddRef();
  82. CJob *pOverflowJob;
  83. int nOverflow = 0;
  84. while ( ( pOverflowJob = PrePush() ) != NULL )
  85. {
  86. ServiceJobAndRelease( pJob );
  87. nOverflow++;
  88. }
  89. m_pQueues[pJob->GetPriority()]->PushItem( pJob );
  90. m_mutex.Lock();
  91. if ( ++m_nItems == 1 )
  92. {
  93. m_JobAvailableEvent.Set();
  94. }
  95. m_mutex.Unlock();
  96. return nOverflow;
  97. }
  98. bool Pop( CJob **ppJob )
  99. {
  100. m_mutex.Lock();
  101. if ( !m_nItems )
  102. {
  103. m_mutex.Unlock();
  104. *ppJob = NULL;
  105. return false;
  106. }
  107. if ( --m_nItems == 0 )
  108. {
  109. m_JobAvailableEvent.Reset();
  110. }
  111. m_mutex.Unlock();
  112. for ( int i = JP_HIGH; i >= 0; --i )
  113. {
  114. if ( m_pQueues[i]->PopItem( ppJob ) )
  115. {
  116. return true;
  117. }
  118. }
  119. AssertMsg( 0, "Expected at least one queue item" );
  120. *ppJob = NULL;
  121. return false;
  122. }
  123. CThreadEvent &GetEventHandle()
  124. {
  125. return m_JobAvailableEvent;
  126. }
  127. void Flush()
  128. {
  129. // Only safe to call when system is suspended
  130. m_mutex.Lock();
  131. m_nItems = 0;
  132. m_JobAvailableEvent.Reset();
  133. CJob *pJob;
  134. for ( int i = JP_HIGH; i >= 0; --i )
  135. {
  136. while ( m_pQueues[i]->PopItem( &pJob ) )
  137. {
  138. pJob->Abort();
  139. pJob->Release();
  140. }
  141. }
  142. m_mutex.Unlock();
  143. }
  144. private:
  145. CTSQueue<CJob *> *m_pQueues[JP_HIGH + 1];
  146. int m_nItems;
  147. int m_nMaxItems;
  148. CThreadMutex m_mutex;
  149. CThreadManualEvent m_JobAvailableEvent;
  150. } ALIGN16_POST;
  151. //-----------------------------------------------------------------------------
  152. //
  153. // CThreadPool
  154. //
  155. //-----------------------------------------------------------------------------
  156. class CThreadPool : public CRefCounted1<IThreadPool, CRefCountServiceMT>
  157. {
  158. public:
  159. CThreadPool();
  160. ~CThreadPool();
  161. //-----------------------------------------------------
  162. // Thread functions
  163. //-----------------------------------------------------
  164. bool Start( const ThreadPoolStartParams_t &startParams = ThreadPoolStartParams_t() ) { return Start( startParams, NULL ); }
  165. bool Start( const ThreadPoolStartParams_t &startParams, const char *pszNameOverride );
  166. bool Stop( int timeout = TT_INFINITE );
  167. void Distribute( bool bDistribute = true, int *pAffinityTable = NULL );
  168. //-----------------------------------------------------
  169. // Functions for any thread
  170. //-----------------------------------------------------
  171. unsigned GetJobCount() { return m_nJobs; }
  172. int NumThreads();
  173. int NumIdleThreads();
  174. //-----------------------------------------------------
  175. // Pause/resume processing jobs
  176. //-----------------------------------------------------
  177. int SuspendExecution();
  178. int ResumeExecution();
  179. //-----------------------------------------------------
  180. // Offer the current thread to the pool
  181. //-----------------------------------------------------
  182. virtual int YieldWait( CThreadEvent **pEvents, int nEvents, bool bWaitAll = true, unsigned timeout = TT_INFINITE );
  183. virtual int YieldWait( CJob **, int nJobs, bool bWaitAll = true, unsigned timeout = TT_INFINITE );
  184. void Yield( unsigned timeout );
  185. //-----------------------------------------------------
  186. // Add a native job to the queue (master thread)
  187. //-----------------------------------------------------
  188. void AddJob( CJob * );
  189. void InsertJobInQueue( CJob * );
  190. //-----------------------------------------------------
  191. // All threads execute pFunctor asap. Thread will either wake up
  192. // and execute or execute pFunctor right after completing current job and
  193. // before looking for another job.
  194. //-----------------------------------------------------
  195. void ExecuteHighPriorityFunctor( CFunctor *pFunctor );
  196. //-----------------------------------------------------
  197. // Add an function object to the queue (master thread)
  198. //-----------------------------------------------------
  199. void AddFunctorInternal( CFunctor *, CJob ** = NULL, const char *pszDescription = NULL, unsigned flags = 0 );
  200. //-----------------------------------------------------
  201. // Remove a job from the queue (master thread)
  202. //-----------------------------------------------------
  203. virtual void ChangePriority( CJob *p, JobPriority_t priority );
  204. //-----------------------------------------------------
  205. // Bulk job manipulation (blocking)
  206. //-----------------------------------------------------
  207. int ExecuteToPriority( JobPriority_t toPriority, JobFilter_t pfnFilter = NULL );
  208. int AbortAll();
  209. virtual void Reserved1() {}
  210. void WaitForIdle( bool bAll = true );
  211. private:
  212. enum
  213. {
  214. IO_STACKSIZE = ( 64 * 1024 ),
  215. COMPUTATION_STACKSIZE = 0,
  216. };
  217. //-----------------------------------------------------
  218. //
  219. //-----------------------------------------------------
  220. CJob *PeekJob();
  221. CJob *GetDummyJob();
  222. //-----------------------------------------------------
  223. // Thread functions
  224. //-----------------------------------------------------
  225. int Run();
  226. private:
  227. friend class CJobThread;
  228. CJobQueue m_SharedQueue;
  229. CInterlockedInt m_nIdleThreads;
  230. CUtlVector<CJobThread *> m_Threads;
  231. CUtlVector<CThreadEvent *> m_IdleEvents;
  232. CThreadMutex m_SuspendMutex;
  233. int m_nSuspend;
  234. CInterlockedInt m_nJobs;
  235. // Some jobs should only be executed on the threadpool thread(s). Ie: the rendering thread has the GL context
  236. // and the main thread coming in and "helping" with jobs breaks that pretty nicely. This flag states that
  237. // only the threadpool threads should execute these jobs.
  238. bool m_bExecOnThreadPoolThreadsOnly;
  239. };
  240. //-----------------------------------------------------------------------------
  241. JOB_INTERFACE IThreadPool *CreateThreadPool()
  242. {
  243. return new CThreadPool;
  244. }
  245. JOB_INTERFACE void DestroyThreadPool( IThreadPool *pPool )
  246. {
  247. delete pPool;
  248. }
  249. //-----------------------------------------------------------------------------
  250. class CGlobalThreadPool : public CThreadPool
  251. {
  252. public:
  253. virtual bool Start( const ThreadPoolStartParams_t &startParamsIn )
  254. {
  255. int nThreads = ( CommandLine()->ParmValue( "-threads", -1 ) - 1 );
  256. ThreadPoolStartParams_t startParams = startParamsIn;
  257. if ( nThreads >= 0 )
  258. {
  259. startParams.nThreads = nThreads;
  260. }
  261. else
  262. {
  263. // Cap the GlobPool threads at 4.
  264. startParams.nThreadsMax = 4;
  265. }
  266. return CThreadPool::Start( startParams, "Glob" );
  267. }
  268. virtual bool OnFinalRelease()
  269. {
  270. AssertMsg( 0, "Releasing global thread pool object!" );
  271. return false;
  272. }
  273. };
  274. //-----------------------------------------------------------------------------
  275. class CJobThread : public CWorkerThread
  276. {
  277. public:
  278. CJobThread( CThreadPool *pOwner, int iThread ) :
  279. m_SharedQueue( pOwner->m_SharedQueue ),
  280. m_pOwner( pOwner ),
  281. m_iThread( iThread )
  282. {
  283. }
  284. CThreadEvent &GetIdleEvent()
  285. {
  286. return m_IdleEvent;
  287. }
  288. CJobQueue &AccessDirectQueue()
  289. {
  290. return m_DirectQueue;
  291. }
  292. private:
  293. unsigned Wait()
  294. {
  295. unsigned waitResult;
  296. tmZone( TELEMETRY_LEVEL0, TMZF_IDLE, "%s", __FUNCTION__ );
  297. #ifdef WIN32
  298. enum Event_t
  299. {
  300. CALL_FROM_MASTER,
  301. SHARED_QUEUE,
  302. DIRECT_QUEUE,
  303. NUM_EVENTS
  304. };
  305. HANDLE waitHandles[NUM_EVENTS];
  306. waitHandles[CALL_FROM_MASTER] = GetCallHandle().GetHandle();
  307. waitHandles[SHARED_QUEUE] = m_SharedQueue.GetEventHandle().GetHandle();
  308. waitHandles[DIRECT_QUEUE] = m_DirectQueue.GetEventHandle().GetHandle();
  309. #ifdef _DEBUG
  310. while ( ( waitResult = WaitForMultipleObjects( ARRAYSIZE(waitHandles), waitHandles, FALSE, 10 ) ) == WAIT_TIMEOUT )
  311. {
  312. waitResult = waitResult; // break here
  313. }
  314. #else
  315. waitResult = WaitForMultipleObjects( ARRAYSIZE(waitHandles), waitHandles, FALSE, INFINITE );
  316. #endif
  317. #else // !win32
  318. bool bSet = false;
  319. int nWaitTime = 100;
  320. while( !bSet )
  321. {
  322. // Jobs are typically enqueued to the shared job queue so wait on it first.
  323. bSet = m_SharedQueue.GetEventHandle().Wait( nWaitTime );
  324. if( !bSet )
  325. bSet = m_DirectQueue.GetEventHandle().Wait( 10 );
  326. if ( !bSet )
  327. bSet = GetCallHandle().Wait( 0 );
  328. }
  329. if ( !bSet )
  330. waitResult = WAIT_TIMEOUT;
  331. else
  332. waitResult = WAIT_OBJECT_0;
  333. #endif
  334. return waitResult;
  335. }
  336. int Run()
  337. {
  338. // Wait for either a call from the master thread, or an item in the queue...
  339. unsigned waitResult;
  340. bool bExit = false;
  341. tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s", __FUNCTION__ );
  342. m_pOwner->m_nIdleThreads++;
  343. m_IdleEvent.Set();
  344. while (!bExit && ( ( waitResult = Wait() ) != WAIT_FAILED ) )
  345. {
  346. if ( PeekCall() )
  347. {
  348. CFunctor *pFunctor = NULL;
  349. tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s PeekCall():%d", __FUNCTION__, GetCallParam() );
  350. switch ( GetCallParam( &pFunctor ) )
  351. {
  352. case TPM_EXIT:
  353. Reply( true );
  354. bExit = TRUE;
  355. break;
  356. case TPM_SUSPEND:
  357. Reply( true );
  358. SuspendCooperative();
  359. break;
  360. case TPM_RUNFUNCTOR:
  361. if( pFunctor )
  362. {
  363. ( *pFunctor )();
  364. Reply( true );
  365. }
  366. else
  367. {
  368. Assert( pFunctor );
  369. Reply( false );
  370. }
  371. break;
  372. default:
  373. AssertMsg( 0, "Unknown call to thread" );
  374. Reply( false );
  375. break;
  376. }
  377. }
  378. else
  379. {
  380. tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s !PeekCall()", __FUNCTION__ );
  381. CJob *pJob;
  382. bool bTookJob = false;
  383. do
  384. {
  385. if ( !m_DirectQueue.Pop( &pJob) )
  386. {
  387. if ( !m_SharedQueue.Pop( &pJob ) )
  388. {
  389. // Nothing to process, return to wait state
  390. break;
  391. }
  392. }
  393. if ( !bTookJob )
  394. {
  395. m_IdleEvent.Reset();
  396. m_pOwner->m_nIdleThreads--;
  397. bTookJob = true;
  398. }
  399. ServiceJobAndRelease( pJob, m_iThread );
  400. m_pOwner->m_nJobs--;
  401. } while ( !PeekCall() );
  402. if ( bTookJob )
  403. {
  404. m_pOwner->m_nIdleThreads++;
  405. m_IdleEvent.Set();
  406. }
  407. }
  408. }
  409. m_pOwner->m_nIdleThreads--;
  410. m_IdleEvent.Reset();
  411. return 0;
  412. }
  413. CJobQueue m_DirectQueue;
  414. CJobQueue & m_SharedQueue;
  415. CThreadPool * m_pOwner;
  416. CThreadManualEvent m_IdleEvent;
  417. int m_iThread;
  418. };
  419. //-----------------------------------------------------------------------------
// The global pool object, and the IThreadPool pointer that refers to it.
CGlobalThreadPool g_ThreadPool;
IThreadPool *g_pThreadPool = &g_ThreadPool;
  422. //-----------------------------------------------------------------------------
  423. //
  424. // CThreadPool
  425. //
  426. //-----------------------------------------------------------------------------
  427. CThreadPool::CThreadPool() :
  428. m_nIdleThreads( 0 ),
  429. m_nJobs( 0 ),
  430. m_nSuspend( 0 )
  431. {
  432. }
  433. //---------------------------------------------------------
  434. CThreadPool::~CThreadPool()
  435. {
  436. Stop();
  437. }
  438. //---------------------------------------------------------
  439. //
  440. //---------------------------------------------------------
  441. int CThreadPool::NumThreads()
  442. {
  443. return m_Threads.Count();
  444. }
  445. //---------------------------------------------------------
  446. //
  447. //---------------------------------------------------------
  448. int CThreadPool::NumIdleThreads()
  449. {
  450. return m_nIdleThreads;
  451. }
  452. void CThreadPool::ExecuteHighPriorityFunctor( CFunctor *pFunctor )
  453. {
  454. int i;
  455. for ( i = 0; i < m_Threads.Count(); i++ )
  456. {
  457. m_Threads[i]->CallWorker( TPM_RUNFUNCTOR, 0, false, pFunctor );
  458. }
  459. for ( i = 0; i < m_Threads.Count(); i++ )
  460. {
  461. m_Threads[i]->WaitForReply();
  462. }
  463. }
  464. //---------------------------------------------------------
  465. // Pause/resume processing jobs
  466. //---------------------------------------------------------
  467. int CThreadPool::SuspendExecution()
  468. {
  469. AUTO_LOCK( m_SuspendMutex );
  470. // If not already suspended
  471. if ( m_nSuspend == 0 )
  472. {
  473. // Make sure state is correct
  474. int i;
  475. for ( i = 0; i < m_Threads.Count(); i++ )
  476. {
  477. m_Threads[i]->CallWorker( TPM_SUSPEND, 0 );
  478. }
  479. for ( i = 0; i < m_Threads.Count(); i++ )
  480. {
  481. m_Threads[i]->WaitForReply();
  482. }
  483. // Because worker must signal before suspending, we could reach
  484. // here with the thread not actually suspended
  485. for ( i = 0; i < m_Threads.Count(); i++ )
  486. {
  487. m_Threads[i]->BWaitForThreadSuspendCooperative();
  488. }
  489. }
  490. return m_nSuspend++;
  491. }
  492. //---------------------------------------------------------
  493. int CThreadPool::ResumeExecution()
  494. {
  495. AUTO_LOCK( m_SuspendMutex );
  496. AssertMsg( m_nSuspend >= 1, "Attempted resume when not suspended");
  497. int result = m_nSuspend--;
  498. if (m_nSuspend == 0 )
  499. {
  500. for ( int i = 0; i < m_Threads.Count(); i++ )
  501. {
  502. m_Threads[i]->ResumeCooperative();
  503. }
  504. }
  505. return result;
  506. }
  507. //---------------------------------------------------------
  508. void CThreadPool::WaitForIdle( bool bAll )
  509. {
  510. ThreadWaitForEvents( m_IdleEvents.Count(), m_IdleEvents.Base(), bAll, 60000 );
  511. }
  512. //---------------------------------------------------------
  513. int CThreadPool::YieldWait( CThreadEvent **pEvents, int nEvents, bool bWaitAll, unsigned timeout )
  514. {
  515. tmZone( TELEMETRY_LEVEL0, TMZF_IDLE, "%s(%d) SPINNING %t", __FUNCTION__, timeout, tmSendCallStack( TELEMETRY_LEVEL0, 0 ) );
  516. Assert( timeout == TT_INFINITE ); // unimplemented
  517. int result;
  518. CJob *pJob;
  519. // Always wait for zero milliseconds initially, to let us process jobs on this thread.
  520. timeout = 0;
  521. while ( ( result = ThreadWaitForEvents( nEvents, pEvents, bWaitAll, timeout ) ) == WAIT_TIMEOUT )
  522. {
  523. if ( !m_bExecOnThreadPoolThreadsOnly && m_SharedQueue.Pop( &pJob ) )
  524. {
  525. ServiceJobAndRelease( pJob );
  526. m_nJobs--;
  527. }
  528. else
  529. {
  530. // Since there are no jobs for the main thread set the timeout to infinite.
  531. // The only disadvantage to this is that if a job thread creates a new job
  532. // then the main thread will not be available to pick it up, but if that
  533. // is a problem you can just create more worker threads. Debugging test runs
  534. // of TF2 suggests that jobs are only ever added from the main thread which
  535. // means that there is no disadvantage.
  536. // Waiting on the events instead of busy spinning has multiple advantages.
  537. // It avoids wasting CPU time/electricity, it makes it more obvious in profiles
  538. // when the main thread is idle versus busy, and it allows ready thread analysis
  539. // in xperf to find out what woke up a waiting thread.
  540. // It also avoids unnecessary CPU starvation -- seen on customer traces of TF2.
  541. timeout = TT_INFINITE;
  542. }
  543. }
  544. return result;
  545. }
  546. //---------------------------------------------------------
  547. int CThreadPool::YieldWait( CJob **ppJobs, int nJobs, bool bWaitAll, unsigned timeout )
  548. {
  549. CUtlVectorFixed<CThreadEvent *, 64> handles;
  550. if ( nJobs > handles.NumAllocated() - 2 )
  551. {
  552. return TW_FAILED;
  553. }
  554. for ( int i = 0; i < nJobs; i++ )
  555. {
  556. handles.AddToTail( ppJobs[i]->AccessEvent() );
  557. }
  558. return YieldWait( handles.Base(), handles.Count(), bWaitAll, timeout);
  559. }
  560. //---------------------------------------------------------
  561. void CThreadPool::Yield( unsigned timeout )
  562. {
  563. // @MULTICORE (toml 10/24/2006): not implemented
  564. Assert( ThreadInMainThread() );
  565. if ( !ThreadInMainThread() )
  566. {
  567. ThreadSleep( timeout );
  568. return;
  569. }
  570. ThreadSleep( timeout );
  571. }
  572. //---------------------------------------------------------
  573. // Add a job to the queue
  574. //---------------------------------------------------------
  575. void CThreadPool::AddJob( CJob *pJob )
  576. {
  577. if ( !pJob )
  578. {
  579. return;
  580. }
  581. if ( pJob->m_ThreadPoolData != JOB_NO_DATA )
  582. {
  583. Warning( "Cannot add a thread job already committed to another thread pool\n" );
  584. return;
  585. }
  586. if ( m_Threads.Count() == 0 )
  587. {
  588. // So only threadpool jobs are supposed to execute the jobs, but there are no threadpool threads?
  589. Assert( !m_bExecOnThreadPoolThreadsOnly );
  590. pJob->Execute();
  591. return;
  592. }
  593. int flags = pJob->GetFlags();
  594. if ( !m_bExecOnThreadPoolThreadsOnly && ( ( flags & ( JF_IO | JF_QUEUE ) ) == 0 ) /* @TBD && !m_queue.Count() */ )
  595. {
  596. if ( !NumIdleThreads() )
  597. {
  598. pJob->Execute();
  599. return;
  600. }
  601. pJob->SetPriority( JP_HIGH );
  602. }
  603. if ( !pJob->CanExecute() )
  604. {
  605. // Already handled
  606. ExecuteOnce( Warning( "Attempted to add job to job queue that has already been completed\n" ) );
  607. return;
  608. }
  609. pJob->m_pThreadPool = this;
  610. pJob->m_status = JOB_STATUS_PENDING;
  611. InsertJobInQueue( pJob );
  612. ++m_nJobs;
  613. }
  614. //---------------------------------------------------------
  615. //
  616. //---------------------------------------------------------
  617. void CThreadPool::InsertJobInQueue( CJob *pJob )
  618. {
  619. CJobQueue *pQueue;
  620. if ( !( pJob->GetFlags() & JF_SERIAL ) )
  621. {
  622. int iThread = pJob->GetServiceThread();
  623. if ( iThread == -1 || !m_Threads.IsValidIndex( iThread ) )
  624. {
  625. pQueue = &m_SharedQueue;
  626. }
  627. else
  628. {
  629. pQueue = &(m_Threads[iThread]->AccessDirectQueue());
  630. }
  631. }
  632. else
  633. {
  634. pQueue = &(m_Threads[0]->AccessDirectQueue());
  635. }
  636. m_nJobs -= pQueue->Push( pJob );
  637. }
  638. //---------------------------------------------------------
  639. // Add an function object to the queue (master thread)
  640. //---------------------------------------------------------
  641. void CThreadPool::AddFunctorInternal( CFunctor *pFunctor, CJob **ppJob, const char *pszDescription, unsigned flags )
  642. {
  643. // Note: assumes caller has handled refcount
  644. CJob *pJob = new CFunctorJob( pFunctor, pszDescription );
  645. pJob->SetFlags( flags );
  646. AddJob( pJob );
  647. if ( ppJob )
  648. {
  649. *ppJob = pJob;
  650. }
  651. else
  652. {
  653. pJob->Release();
  654. }
  655. }
  656. //---------------------------------------------------------
  657. // Remove a job from the queue
  658. //---------------------------------------------------------
  659. void CThreadPool::ChangePriority( CJob *pJob, JobPriority_t priority )
  660. {
  661. // Right now, only support upping the priority
  662. if ( pJob->GetPriority() < priority )
  663. {
  664. pJob->SetPriority( priority );
  665. m_SharedQueue.Push( pJob );
  666. }
  667. else
  668. {
  669. ExecuteOnce( if ( pJob->GetPriority() != priority ) DevMsg( "CThreadPool::RemoveJob not implemented right now" ) );
  670. }
  671. }
  672. //---------------------------------------------------------
  673. // Execute to a specified priority
  674. //---------------------------------------------------------
  675. int CThreadPool::ExecuteToPriority( JobPriority_t iToPriority, JobFilter_t pfnFilter )
  676. {
  677. SuspendExecution();
  678. CJob *pJob;
  679. int nExecuted = 0;
  680. int i;
  681. int nJobsTotal = GetJobCount();
  682. CUtlVector<CJob *> jobsToPutBack;
  683. for ( int iCurPriority = JP_HIGH; iCurPriority >= iToPriority; --iCurPriority )
  684. {
  685. for ( i = 0; i < m_Threads.Count(); i++ )
  686. {
  687. CJobQueue &queue = m_Threads[i]->AccessDirectQueue();
  688. while ( queue.Count( (JobPriority_t)iCurPriority ) )
  689. {
  690. queue.Pop( &pJob );
  691. if ( pfnFilter && !(*pfnFilter)( pJob ) )
  692. {
  693. if ( pJob->CanExecute() )
  694. {
  695. jobsToPutBack.EnsureCapacity( nJobsTotal );
  696. jobsToPutBack.AddToTail( pJob );
  697. }
  698. else
  699. {
  700. m_nJobs--;
  701. pJob->Release(); // an already serviced job in queue, may as well ditch it (as in, main thread probably force executed)
  702. }
  703. continue;
  704. }
  705. ServiceJobAndRelease( pJob );
  706. m_nJobs--;
  707. nExecuted++;
  708. }
  709. }
  710. while ( m_SharedQueue.Count( (JobPriority_t)iCurPriority ) )
  711. {
  712. m_SharedQueue.Pop( &pJob );
  713. if ( pfnFilter && !(*pfnFilter)( pJob ) )
  714. {
  715. if ( pJob->CanExecute() )
  716. {
  717. jobsToPutBack.EnsureCapacity( nJobsTotal );
  718. jobsToPutBack.AddToTail( pJob );
  719. }
  720. else
  721. {
  722. m_nJobs--;
  723. pJob->Release(); // see above
  724. }
  725. continue;
  726. }
  727. ServiceJobAndRelease( pJob );
  728. m_nJobs--;
  729. nExecuted++;
  730. }
  731. }
  732. for ( i = 0; i < jobsToPutBack.Count(); i++ )
  733. {
  734. InsertJobInQueue( jobsToPutBack[i] );
  735. jobsToPutBack[i]->Release();
  736. }
  737. ResumeExecution();
  738. return nExecuted;
  739. }
  740. //---------------------------------------------------------
  741. //
  742. //---------------------------------------------------------
  743. int CThreadPool::AbortAll()
  744. {
  745. SuspendExecution();
  746. CJob *pJob;
  747. int iAborted = 0;
  748. while ( m_SharedQueue.Pop( &pJob ) )
  749. {
  750. pJob->Abort();
  751. pJob->Release();
  752. iAborted++;
  753. }
  754. for ( int i = 0; i < m_Threads.Count(); i++ )
  755. {
  756. CJobQueue &queue = m_Threads[i]->AccessDirectQueue();
  757. while ( queue.Pop( &pJob ) )
  758. {
  759. pJob->Abort();
  760. pJob->Release();
  761. iAborted++;
  762. }
  763. }
  764. m_nJobs = 0;
  765. ResumeExecution();
  766. return iAborted;
  767. }
  768. //---------------------------------------------------------
  769. // CThreadPool thread functions
  770. //---------------------------------------------------------
  771. bool CThreadPool::Start( const ThreadPoolStartParams_t &startParams, const char *pszName )
  772. {
  773. int nThreads = startParams.nThreads;
  774. m_bExecOnThreadPoolThreadsOnly = startParams.bExecOnThreadPoolThreadsOnly;
  775. if ( nThreads < 0 )
  776. {
  777. const CPUInformation &ci = *GetCPUInformation();
  778. if ( startParams.bIOThreads )
  779. {
  780. nThreads = ci.m_nLogicalProcessors;
  781. }
  782. else
  783. {
  784. nThreads = ( ci.m_nLogicalProcessors / (( ci.m_bHT ) ? 2 : 1) ) - 1; // One per
  785. if ( IsPC() )
  786. {
  787. if ( nThreads > 3 )
  788. {
  789. DevMsg( "Defaulting to limit of 3 worker threads, use -threads on command line if want more\n" ); // Current >4 processor configs don't really work so well, probably due to cache issues? (toml 7/12/2007)
  790. nThreads = 3;
  791. }
  792. }
  793. }
  794. if ( ( startParams.nThreadsMax >= 0 ) && ( nThreads > startParams.nThreadsMax ) )
  795. {
  796. nThreads = startParams.nThreadsMax;
  797. }
  798. }
  799. if ( nThreads <= 0 )
  800. {
  801. return true;
  802. }
  803. int nStackSize = startParams.nStackSize;
  804. if ( nStackSize < 0 )
  805. {
  806. if ( startParams.bIOThreads )
  807. {
  808. nStackSize = IO_STACKSIZE;
  809. }
  810. else
  811. {
  812. nStackSize = COMPUTATION_STACKSIZE;
  813. }
  814. }
  815. int priority = startParams.iThreadPriority;
  816. if ( priority == SHRT_MIN )
  817. {
  818. if ( startParams.bIOThreads )
  819. {
  820. priority = THREAD_PRIORITY_HIGHEST;
  821. }
  822. else
  823. {
  824. priority = ThreadGetPriority();
  825. }
  826. }
  827. bool bDistribute;
  828. if ( startParams.fDistribute != TRS_NONE )
  829. {
  830. bDistribute = ( startParams.fDistribute == TRS_TRUE );
  831. }
  832. else
  833. {
  834. bDistribute = !startParams.bIOThreads;
  835. }
  836. //--------------------------------------------------------
  837. m_Threads.EnsureCapacity( nThreads );
  838. m_IdleEvents.EnsureCapacity( nThreads );
  839. if ( !pszName )
  840. {
  841. pszName = ( startParams.bIOThreads ) ? "IOJobX" : "CmpJobX";
  842. }
  843. while ( nThreads-- )
  844. {
  845. int iThread = m_Threads.AddToTail();
  846. m_IdleEvents.AddToTail();
  847. m_Threads[iThread] = new CJobThread( this, iThread );
  848. m_IdleEvents[iThread] = &m_Threads[iThread]->GetIdleEvent();
  849. m_Threads[iThread]->SetName( CFmtStr( "%s%d", pszName, iThread ) );
  850. m_Threads[iThread]->Start( nStackSize );
  851. m_Threads[iThread]->GetIdleEvent().Wait();
  852. #ifdef WIN32
  853. ThreadSetPriority( (ThreadHandle_t)m_Threads[iThread]->GetThreadHandle(), priority );
  854. #endif
  855. }
  856. Distribute( bDistribute, startParams.bUseAffinityTable ? (int *)startParams.iAffinityTable : NULL );
  857. return true;
  858. }
  859. //---------------------------------------------------------
  860. void CThreadPool::Distribute( bool bDistribute, int *pAffinityTable )
  861. {
  862. if ( bDistribute )
  863. {
  864. const CPUInformation &ci = *GetCPUInformation();
  865. int nHwThreadsPer = (( ci.m_bHT ) ? 2 : 1);
  866. if ( ci.m_nLogicalProcessors > 1 )
  867. {
  868. if ( !pAffinityTable )
  869. {
  870. #if defined( IS_WINDOWS_PC )
  871. // no affinity table, distribution is cycled across all available
  872. HINSTANCE hInst = LoadLibrary( "kernel32.dll" );
  873. if ( hInst )
  874. {
  875. typedef DWORD (WINAPI *SetThreadIdealProcessorFn)(ThreadHandle_t hThread, DWORD dwIdealProcessor);
  876. SetThreadIdealProcessorFn Thread_SetIdealProcessor = (SetThreadIdealProcessorFn)GetProcAddress( hInst, "SetThreadIdealProcessor" );
  877. if ( Thread_SetIdealProcessor )
  878. {
  879. ThreadHandle_t hMainThread = ThreadGetCurrentHandle();
  880. Thread_SetIdealProcessor( hMainThread, 0 );
  881. int iProc = 0;
  882. for ( int i = 0; i < m_Threads.Count(); i++ )
  883. {
  884. iProc += nHwThreadsPer;
  885. if ( iProc >= ci.m_nLogicalProcessors )
  886. {
  887. iProc %= ci.m_nLogicalProcessors;
  888. if ( nHwThreadsPer > 1 )
  889. {
  890. iProc = ( iProc + 1 ) % nHwThreadsPer;
  891. }
  892. }
  893. Thread_SetIdealProcessor((ThreadHandle_t)m_Threads[i]->GetThreadHandle(), iProc);
  894. }
  895. }
  896. FreeLibrary( hInst );
  897. }
  898. #else
  899. // no affinity table, distribution is cycled across all available
  900. int iProc = 0;
  901. for ( int i = 0; i < m_Threads.Count(); i++ )
  902. {
  903. iProc += nHwThreadsPer;
  904. if ( iProc >= ci.m_nLogicalProcessors )
  905. {
  906. iProc %= ci.m_nLogicalProcessors;
  907. if ( nHwThreadsPer > 1 )
  908. {
  909. iProc = ( iProc + 1 ) % nHwThreadsPer;
  910. }
  911. }
  912. #ifdef WIN32
  913. ThreadSetAffinity( (ThreadHandle_t)m_Threads[i]->GetThreadHandle(), 1 << iProc );
  914. #endif
  915. }
  916. #endif
  917. }
  918. else
  919. {
  920. // distribution is from affinity table
  921. for ( int i = 0; i < m_Threads.Count(); i++ )
  922. {
  923. #ifdef WIN32
  924. ThreadSetAffinity( (ThreadHandle_t)m_Threads[i]->GetThreadHandle(), pAffinityTable[i] );
  925. #endif
  926. }
  927. }
  928. }
  929. }
  930. else
  931. {
  932. #ifdef WIN32
  933. DWORD_PTR dwProcessAffinity, dwSystemAffinity;
  934. if ( GetProcessAffinityMask( GetCurrentProcess(), &dwProcessAffinity, &dwSystemAffinity ) )
  935. {
  936. for ( int i = 0; i < m_Threads.Count(); i++ )
  937. {
  938. ThreadSetAffinity( (ThreadHandle_t)m_Threads[i]->GetThreadHandle(), dwProcessAffinity );
  939. }
  940. }
  941. #endif
  942. }
  943. }
  944. //---------------------------------------------------------
  945. bool CThreadPool::Stop( int timeout )
  946. {
  947. for ( int i = 0; i < m_Threads.Count(); i++ )
  948. {
  949. m_Threads[i]->CallWorker( TPM_EXIT );
  950. }
  951. for ( int i = 0; i < m_Threads.Count(); ++i )
  952. {
  953. while( m_Threads[i]->IsAlive() )
  954. {
  955. ThreadSleep( 0 );
  956. }
  957. delete m_Threads[i];
  958. }
  959. m_nJobs = 0;
  960. m_SharedQueue.Flush();
  961. m_nIdleThreads = 0;
  962. m_Threads.RemoveAll();
  963. m_IdleEvents.RemoveAll();
  964. return true;
  965. }
  966. //---------------------------------------------------------
  967. CJob *CThreadPool::GetDummyJob()
  968. {
  969. class CDummyJob : public CJob
  970. {
  971. public:
  972. CDummyJob()
  973. {
  974. Execute();
  975. }
  976. virtual JobStatus_t DoExecute() { return JOB_OK; }
  977. };
  978. static CDummyJob dummyJob;
  979. dummyJob.AddRef();
  980. return &dummyJob;
  981. }
  982. //-----------------------------------------------------------------------------
  983. namespace ThreadPoolTest
  984. {
// State shared between the test driver and the jobs it runs.
int g_iSleep;						// when >= 0, each job sleeps for this value via ThreadSleep()
CThreadEvent g_done;				// set by the job that sees the count reach g_nTotalToComplete
int g_nTotalToComplete;				// job count at which g_done is signalled
CThreadPool *g_pTestThreadPool;		// the pool under test
  989. class CCountJob : public CJob
  990. {
  991. public:
  992. virtual JobStatus_t DoExecute()
  993. {
  994. m_nCount++;
  995. ThreadPause();
  996. if ( g_iSleep >= 0)
  997. ThreadSleep( g_iSleep );
  998. if ( bDoWork )
  999. {
  1000. byte pMemory[1024];
  1001. int i;
  1002. for ( i = 0; i < 1024; i++ )
  1003. {
  1004. pMemory[i] = rand();
  1005. }
  1006. for ( i = 0; i < 50; i++ )
  1007. {
  1008. sqrt( (float)HashBlock( pMemory, 1024 ) + HashBlock( pMemory, 1024 ) + 10.0 );
  1009. }
  1010. bDoWork = false;
  1011. }
  1012. if ( m_nCount == g_nTotalToComplete )
  1013. g_done.Set();
  1014. return 0;
  1015. }
  1016. static CInterlockedInt m_nCount;
  1017. bool bDoWork;
  1018. };
  1019. CInterlockedInt CCountJob::m_nCount;
  1020. int g_nTotalAtFinish;
  1021. void Test( bool bDistribute, bool bSleep = true, bool bFinishExecute = false, bool bDoWork = false )
  1022. {
  1023. for ( int bInterleavePushPop = 0; bInterleavePushPop < 2; bInterleavePushPop++ )
  1024. {
  1025. for ( g_iSleep = -10; g_iSleep <= 10; g_iSleep += 10 )
  1026. {
  1027. Msg( "ThreadPoolTest: Testing! Sleep %d, interleave %d \n", g_iSleep, bInterleavePushPop );
  1028. int nMaxThreads = ( IsX360() ) ? 6 : 8;
  1029. int nIncrement = ( IsX360() ) ? 1 : 2;
  1030. for ( int i = 1; i <= nMaxThreads; i += nIncrement )
  1031. {
  1032. CCountJob::m_nCount = 0;
  1033. g_nTotalAtFinish = 0;
  1034. ThreadPoolStartParams_t params;
  1035. params.nThreads = i;
  1036. params.fDistribute = ( bDistribute) ? TRS_TRUE : TRS_FALSE;
  1037. g_pTestThreadPool->Start( params, "Tst" );
  1038. if ( !bInterleavePushPop )
  1039. {
  1040. g_pTestThreadPool->SuspendExecution();
  1041. }
  1042. CCountJob jobs[4000];
  1043. g_nTotalToComplete = ARRAYSIZE(jobs);
  1044. CFastTimer timer, suspendTimer;
  1045. suspendTimer.Start();
  1046. timer.Start();
  1047. for ( int j = 0; j < ARRAYSIZE(jobs); j++ )
  1048. {
  1049. jobs[j].SetFlags( JF_QUEUE );
  1050. jobs[j].bDoWork = bDoWork;
  1051. g_pTestThreadPool->AddJob( &jobs[j] );
  1052. if ( bSleep && j % 16 == 0 )
  1053. {
  1054. ThreadSleep( 0 );
  1055. }
  1056. }
  1057. if ( !bInterleavePushPop )
  1058. {
  1059. g_pTestThreadPool->ResumeExecution();
  1060. }
  1061. if ( bFinishExecute && g_iSleep <= 1 )
  1062. {
  1063. g_done.Wait();
  1064. }
  1065. g_nTotalAtFinish = CCountJob::m_nCount;
  1066. timer.End();
  1067. g_pTestThreadPool->SuspendExecution();
  1068. suspendTimer.End();
  1069. g_pTestThreadPool->ResumeExecution();
  1070. g_pTestThreadPool->Stop();
  1071. g_done.Reset();
  1072. int counts[8] = { 0 };
  1073. for ( int j = 0; j < ARRAYSIZE(jobs); j++ )
  1074. {
  1075. if ( jobs[j].GetServiceThread() != -1 )
  1076. {
  1077. counts[jobs[j].GetServiceThread()]++;
  1078. jobs[j].ClearServiceThread();
  1079. }
  1080. }
  1081. Msg( "ThreadPoolTest: %d threads -- %d (%d) jobs processed in %fms, %fms to suspend (%f/%f) [%d, %d, %d, %d, %d, %d, %d, %d]\n",
  1082. i, g_nTotalAtFinish, (int)CCountJob::m_nCount, timer.GetDuration().GetMillisecondsF(), suspendTimer.GetDuration().GetMillisecondsF() - timer.GetDuration().GetMillisecondsF(),
  1083. timer.GetDuration().GetMillisecondsF() / (float)CCountJob::m_nCount, (suspendTimer.GetDuration().GetMillisecondsF())/(float)g_nTotalAtFinish,
  1084. counts[0], counts[1], counts[2], counts[3], counts[4], counts[5], counts[6], counts[7] );
  1085. }
  1086. }
  1087. }
  1088. }
  1089. bool g_bOutputError;
  1090. volatile int g_ReadyToExecute;
  1091. CInterlockedInt g_nReady;
  1092. class CExecuteTestJob : public CJob
  1093. {
  1094. public:
  1095. virtual JobStatus_t DoExecute()
  1096. {
  1097. byte pMemory[1024];
  1098. int i;
  1099. for ( i = 0; i < 1024; i++ )
  1100. {
  1101. pMemory[i] = rand();
  1102. }
  1103. for ( i = 0; i < 50; i++ )
  1104. {
  1105. sqrt( (float)HashBlock( pMemory, 1024 ) + HashBlock( pMemory, 1024 ) + 10.0 );
  1106. }
  1107. if ( AccessEvent()->Check() || IsFinished() )
  1108. {
  1109. if ( !g_bOutputError )
  1110. {
  1111. Msg( "Forced execute test failed!\n" );
  1112. DebuggerBreakIfDebugging();
  1113. }
  1114. }
  1115. return 0;
  1116. }
  1117. };
  1118. class CExecuteTestExecuteJob : public CJob
  1119. {
  1120. public:
  1121. virtual JobStatus_t DoExecute()
  1122. {
  1123. bool bAbort = ( RandomInt( 1, 10 ) == 1 );
  1124. g_nReady++;
  1125. while ( !g_ReadyToExecute )
  1126. {
  1127. ThreadPause();
  1128. }
  1129. if ( !bAbort )
  1130. m_pTestJob->Execute();
  1131. else
  1132. m_pTestJob->Abort();
  1133. g_nReady--;
  1134. return 0;
  1135. }
  1136. CExecuteTestJob *m_pTestJob;
  1137. };
  1138. void TestForcedExecute()
  1139. {
  1140. Msg( "TestForcedExecute\n" );
  1141. for ( int tests = 0; tests < 30; tests++ )
  1142. {
  1143. for ( int i = 1; i <= 5; i += 2 )
  1144. {
  1145. g_nReady = 0;
  1146. ThreadPoolStartParams_t params;
  1147. params.nThreads = i;
  1148. params.fDistribute = TRS_TRUE;
  1149. g_pTestThreadPool->Start( params, "Tst" );
  1150. static CExecuteTestJob jobs[4000];
  1151. for ( int j = 0; j < ARRAYSIZE(jobs); j++ )
  1152. {
  1153. g_ReadyToExecute = false;
  1154. for ( int k = 0; k < i; k++ )
  1155. {
  1156. CExecuteTestExecuteJob *pJob = new CExecuteTestExecuteJob;
  1157. pJob->SetFlags( JF_QUEUE );
  1158. pJob->m_pTestJob = &jobs[j];
  1159. g_pTestThreadPool->AddJob( pJob );
  1160. pJob->Release();
  1161. }
  1162. while ( g_nReady < i )
  1163. {
  1164. ThreadPause();
  1165. }
  1166. g_ReadyToExecute = true;
  1167. ThreadSleep();
  1168. jobs[j].Execute();
  1169. while ( g_nReady > 0 )
  1170. {
  1171. ThreadPause();
  1172. }
  1173. }
  1174. g_pTestThreadPool->Stop();
  1175. }
  1176. }
  1177. Msg( "TestForcedExecute DONE\n" );
  1178. }
  1179. } // namespace ThreadPoolTest
  1180. void RunThreadPoolTests()
  1181. {
  1182. CThreadPool pool;
  1183. ThreadPoolTest::g_pTestThreadPool = &pool;
  1184. RunTSQueueTests(10000);
  1185. RunTSListTests(10000);
  1186. #ifdef _WIN32
  1187. DWORD_PTR mask1 = 0;
  1188. --mask1;
  1189. DWORD_PTR mask2 = 0;
  1190. --mask2;
  1191. GetProcessAffinityMask( GetCurrentProcess(), &mask1, &mask2 );
  1192. #else
  1193. int32 mask1=-1;
  1194. #endif
  1195. Msg( "ThreadPoolTest: Job distribution speed\n" );
  1196. for ( int i = 0; i < 2; i++ )
  1197. {
  1198. bool bToCompletion = ( i % 2 != 0 );
  1199. if ( !IsX360() )
  1200. {
  1201. Msg( "ThreadPoolTest: Non-distribute\n" );
  1202. ThreadPoolTest::Test( false, true, bToCompletion );
  1203. }
  1204. Msg( "ThreadPoolTest: Distribute\n" );
  1205. ThreadPoolTest::Test( true, true, bToCompletion );
  1206. Msg( "ThreadPoolTest: One core\n" );
  1207. ThreadSetAffinity( 0, 1 );
  1208. ThreadPoolTest::Test( false, true, bToCompletion );
  1209. ThreadSetAffinity( 0, mask1 );
  1210. Msg( "ThreadPoolTest: NO Sleep\n" );
  1211. ThreadPoolTest::Test( false, false, bToCompletion );
  1212. Msg( "ThreadPoolTest: Distribute\n" );
  1213. ThreadPoolTest::Test( true, false, bToCompletion );
  1214. Msg( "ThreadPoolTest: One core\n" );
  1215. ThreadSetAffinity( 0, 1 );
  1216. ThreadPoolTest::Test( false, false, bToCompletion );
  1217. ThreadSetAffinity( 0, mask1 );
  1218. }
  1219. Msg( "ThreadPoolTest: Jobs doing work\n" );
  1220. for ( int i = 0; i < 2; i++ )
  1221. {
  1222. bool bToCompletion = true;// = ( i % 2 != 0 );
  1223. if ( !IsX360() )
  1224. {
  1225. Msg( "ThreadPoolTest: Non-distribute\n" );
  1226. ThreadPoolTest::Test( false, true, bToCompletion, true );
  1227. }
  1228. Msg( "ThreadPoolTest: Distribute\n" );
  1229. ThreadPoolTest::Test( true, true, bToCompletion, true );
  1230. Msg( "ThreadPoolTest: One core\n" );
  1231. ThreadSetAffinity( 0, 1 );
  1232. ThreadPoolTest::Test( false, true, bToCompletion, true );
  1233. ThreadSetAffinity( 0, mask1 );
  1234. Msg( "ThreadPoolTest: NO Sleep\n" );
  1235. ThreadPoolTest::Test( false, false, bToCompletion, true );
  1236. Msg( "ThreadPoolTest: Distribute\n" );
  1237. ThreadPoolTest::Test( true, false, bToCompletion, true );
  1238. Msg( "ThreadPoolTest: One core\n" );
  1239. ThreadSetAffinity( 0, 1 );
  1240. ThreadPoolTest::Test( false, false, bToCompletion, true );
  1241. ThreadSetAffinity( 0, mask1 );
  1242. }
  1243. #ifdef _WIN32
  1244. GetProcessAffinityMask( GetCurrentProcess(), &mask1, &mask2 );
  1245. #endif
  1246. ThreadPoolTest::TestForcedExecute();
  1247. }