Counter Strike : Global Offensive Source Code
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1470 lines
48 KiB

  1. //========== Copyright © 2005, Valve Corporation, All rights reserved. ========
  2. //
  3. // Purpose: A utility for a discrete job-oriented worker thread.
  4. //
  5. // The class CThreadPool is both the job queue, and the
  6. // worker thread. Except when the main thread attempts to
  7. // synchronously execute a job, most of the inter-thread locking
  8. // happens on the queue.
  9. //
  10. // The queue threading model uses a manual reset event for optimal
  11. // throughput. Adding to the queue is guarded by a semaphore that
  12. // will block the inserting thread if the queue has overflowed.
  13. // This prevents the worker thread from being starved out even if
  14. // not running at a higher priority than the master thread.
  15. //
  16. // The thread function waits for jobs, services jobs, and manages
  17. // communication between the worker and master threads. The nature
  18. // of the work is opaque to the Executer.
  19. //
  20. // CJob instances actually do the work. The base class
  21. // calls virtual methods for job primitives, so derivations don't
  22. // need to worry about threading models. All of the variants of
  23. // job and OS can be expressed in this hierarchy. Instances of
  24. // CJob are the items placed in the queue, and by
  25. // overriding the job primitives they are the manner by which
  26. // users of the Executer control the state of the job.
  27. //
  28. //=============================================================================
  29. #include <limits.h>
  30. #include "tier0/threadtools.h"
  31. #include "tier1/refcount.h"
  32. #include "tier1/utllinkedlist.h"
  33. #include "tier1/utlvector.h"
  34. #include "tier1/functors.h"
  35. #include "vstdlib/vstdlib.h"
  36. #ifndef JOBTHREAD_H
  37. #define JOBTHREAD_H
  // The windows.h print-spooler headers define AddJob/GetJob as macros, which
  // would rename the IThreadPool members below; drop them when present.
  #if defined( AddJob )
  #undef AddJob
  #undef GetJob
  #endif

  // Linkage decoration for the thread-pool API. VSTDLIB_DLL_EXPORT is presumably
  // defined only while building vstdlib itself (export); every other module imports.
  #if !defined( VSTDLIB_DLL_EXPORT )
  #define JOB_INTERFACE DLL_IMPORT
  #define JOB_OVERLOAD DLL_GLOBAL_IMPORT
  #define JOB_CLASS DLL_CLASS_IMPORT
  #else
  #define JOB_INTERFACE DLL_EXPORT
  #define JOB_OVERLOAD DLL_GLOBAL_EXPORT
  #define JOB_CLASS DLL_CLASS_EXPORT
  #endif

  #ifdef _WIN32
  #pragma once
  #endif
  54. //-----------------------------------------------------------------------------
  55. //
  56. //-----------------------------------------------------------------------------
  class CJob;

  //-----------------------------------------------------------------------------
  // Status values reported by CJob::GetStatus(). The enumerators below are the
  // non-negative life-cycle states; negative values are reserved for errors.
  //-----------------------------------------------------------------------------
  enum JobStatusEnum_t
  {
      // Use negative for errors
      JOB_OK,                 // job executed successfully (CJob::Executed())
      JOB_STATUS_PENDING,     // job is queued, waiting for service
      JOB_STATUS_INPROGRESS,  // job is currently being executed
      JOB_STATUS_ABORTED,     // job was aborted by the caller
      JOB_STATUS_UNSERVICED,  // job has not been queued yet
  };

  // Plain int rather than the enum, presumably so that negative error codes
  // can be carried alongside the values above.
  typedef int JobStatus_t;
  // Bit flags stored on a CJob (CJob::SetFlags / GetFlags); combine with '|'.
  enum JobFlags_t
  {
      JF_IO           = 0x01, // the job primarily blocks on IO or hardware
      JF_BOOST_THREAD = 0x02, // raise the thread priority to the max allowed while the task runs
      JF_SERIAL       = 0x04, // job cannot run out of order relative to other "strict" jobs
      JF_QUEUE        = 0x08, // queue the job even if it is not an IO job
  };
  // Job scheduling priority, from JP_LOW up to JP_IMMEDIATE.
  enum JobPriority_t
  {
      JP_LOW = 0,
      JP_NORMAL = 1,
      JP_HIGH = 2,
      JP_IMMEDIATE = 3,
      JP_NUM_PRIORITIES = 4,  // number of distinct priorities above, not a priority itself

      // Aliases used by game (per-frame) jobs
      JP_FRAME = JP_NORMAL,
      JP_FRAME_SEGMENT = JP_HIGH,
  };
  89. #define TP_MAX_POOL_THREADS 64
  90. struct ThreadPoolStartParams_t
  91. {
  92. ThreadPoolStartParams_t( bool bIOThreads = false, unsigned nThreads = (unsigned)-1, int *pAffinities = NULL, ThreeState_t fDistribute = TRS_NONE, unsigned nStackSize = (unsigned)-1, int iThreadPriority = SHRT_MIN )
  93. : bIOThreads( bIOThreads ), nThreads( nThreads ), nThreadsMax( -1 ), fDistribute( fDistribute ), nStackSize( nStackSize ), iThreadPriority( iThreadPriority )
  94. {
  95. bExecOnThreadPoolThreadsOnly = false;
  96. #if defined( DEDICATED ) && IsPlatformLinux()
  97. bEnableOnLinuxDedicatedServer = false; // by default, thread pools don't start up on Linux DS
  98. #endif
  99. bUseAffinityTable = ( pAffinities != NULL ) && ( fDistribute == TRS_TRUE ) && ( nThreads != (unsigned)-1 );
  100. if ( bUseAffinityTable )
  101. {
  102. // user supplied an optional 1:1 affinity mapping to override normal distribute behavior
  103. nThreads = MIN( TP_MAX_POOL_THREADS, nThreads );
  104. for ( unsigned int i = 0; i < nThreads; i++ )
  105. {
  106. iAffinityTable[i] = pAffinities[i];
  107. }
  108. }
  109. }
  110. int nThreads;
  111. int nThreadsMax;
  112. ThreeState_t fDistribute;
  113. int nStackSize;
  114. int iThreadPriority;
  115. int iAffinityTable[TP_MAX_POOL_THREADS];
  116. bool bIOThreads : 1;
  117. bool bUseAffinityTable : 1;
  118. bool bExecOnThreadPoolThreadsOnly : 1;
  119. #if defined( DEDICATED ) && IsPlatformLinux()
  120. bool bEnableOnLinuxDedicatedServer : 1;
  121. #endif
  122. };
  123. //-----------------------------------------------------------------------------
  124. //
  125. // IThreadPool
  126. //
  127. //-----------------------------------------------------------------------------
  // Job predicate accepted by the bulk operations (ExecuteAll / ExecuteToPriority);
  // presumably returns true for the jobs that should be acted on.
  typedef bool ( *JobFilter_t )( class CJob * );

  //---------------------------------------------------------
  // Messages supported through the CallWorker() method
  //---------------------------------------------------------
  enum ThreadPoolMessages_t
  {
      TPM_EXIT = 0,    // Exit the thread
      TPM_SUSPEND = 1, // Suspend after next operation
  };

  //---------------------------------------------------------
  // Some platform headers (presumably windows.h) define Yield as a macro, which
  // would clash with IThreadPool::Yield; remove it.
  #if defined( Yield )
  #undef Yield
  #endif
  //-----------------------------------------------------------------------------
  // IThreadPool: reference-counted (IRefCounted) interface to a pool of worker
  // threads that service a queue of CJob instances. Pools are created with
  // CreateNewThreadPool(); g_pThreadPool is the shared process-wide pool.
  //-----------------------------------------------------------------------------
  abstract_class IThreadPool : public IRefCounted
  {
  public:
      virtual ~IThreadPool() {};
      //-----------------------------------------------------
      // Thread functions
      //-----------------------------------------------------
      // Launch the worker threads described by startParams.
      virtual bool Start( const ThreadPoolStartParams_t &startParams = ThreadPoolStartParams_t() ) = 0;
      // Shut the workers down; timeout presumably bounds the wait (TT_INFINITE by default).
      virtual bool Stop( int timeout = TT_INFINITE ) = 0;
      //-----------------------------------------------------
      // Functions for any thread
      //-----------------------------------------------------
      virtual unsigned GetJobCount() = 0;
      virtual int NumThreads() = 0;
      // When this returns 0, the AddCall/AddRefCall helpers below run the call inline.
      virtual int NumIdleThreads() = 0;
      //-----------------------------------------------------
      // Pause/resume processing jobs
      //-----------------------------------------------------
      virtual int SuspendExecution() = 0;
      virtual int ResumeExecution() = 0;
      //-----------------------------------------------------
      // Offer the current thread to the pool
      //-----------------------------------------------------
      // Wait on several events / jobs; bWaitAll presumably selects all-of vs any-of.
      virtual int YieldWait( CThreadEvent **pEvents, int nEvents, bool bWaitAll = true, unsigned timeout = TT_INFINITE ) = 0;
      virtual int YieldWait( CJob **, int nJobs, bool bWaitAll = true, unsigned timeout = TT_INFINITE ) = 0;
      virtual void Yield( unsigned timeout ) = 0;
      // Single-object convenience overloads (non-virtual; defined out of line).
      bool YieldWait( CThreadEvent &event, unsigned timeout = TT_INFINITE );
      bool YieldWait( CJob *, unsigned timeout = TT_INFINITE );
      //-----------------------------------------------------
      // Add a native job to the queue (master thread)
      // See AddPerFrameJob below if you want to add a job that
      // wants to be run before the end of the frame
      //-----------------------------------------------------
      virtual void AddJob( CJob * ) = 0;
      //-----------------------------------------------------
      // Add a function object to the queue (master thread)
      //
      // Forwards RetAddRef( pFunctor ) -- presumably an AddRef that returns the
      // pointer -- to AddFunctorInternal; ppJob, if non-NULL, presumably receives
      // the job created for the functor.
      //-----------------------------------------------------
      virtual void AddFunctor( CFunctor *pFunctor, CJob **ppJob = NULL, const char *pszDescription = NULL, unsigned flags = 0 ) { AddFunctorInternal( RetAddRef( pFunctor ), ppJob, pszDescription, flags ); }
      //-----------------------------------------------------
      // Change the priority of an active job
      //-----------------------------------------------------
      virtual void ChangePriority( CJob *p, JobPriority_t priority ) = 0;
      //-----------------------------------------------------
      // Bulk job manipulation (blocking)
      //-----------------------------------------------------
      // ExecuteAll is ExecuteToPriority( JP_LOW ), i.e. down to the lowest priority.
      int ExecuteAll( JobFilter_t pfnFilter = NULL ) { return ExecuteToPriority( JP_LOW, pfnFilter ); }
      virtual int ExecuteToPriority( JobPriority_t toPriority, JobFilter_t pfnFilter = NULL ) = 0;
      virtual int AbortAll() = 0;
      //-----------------------------------------------------
      // Add a native job to the queue (master thread)
      // Call YieldWaitPerFrameJobs() to wait only until all per-frame jobs are done
      //-----------------------------------------------------
      virtual void AddPerFrameJob( CJob * ) = 0;
      //-----------------------------------------------------
      // Add an arbitrary call to the queue (master thread)
      //
      // Avert thy eyes! Imagine rather:
      //
      // CJob *AddCall( <function>, [args1, [arg2,]...]
      // CJob *AddCall( <object>, <function>, [args1, [arg2,]...]
      // CJob *AddRefCall( <object>, <function>, [args1, [arg2,]...]
      // CJob *QueueCall( <function>, [args1, [arg2,]...]
      // CJob *QueueCall( <object>, <function>, [args1, [arg2,]...]
      // CJob *QueueRefCall( <object>, <function>, [args1, [arg2,]...]
      //
      // AddCall / AddRefCall: if NumIdleThreads() is 0 the call is executed
      // immediately on the calling thread via FunctorDirectCall and GetDummyJob()
      // is returned; otherwise the call is wrapped in a functor and handed to
      // AddFunctorInternal, which fills in the returned job.
      // QueueCall / QueueRefCall: always wrap and queue, passing JF_QUEUE.
      // The *RefCall variants build the functor with CreateRefCountingFunctor
      // (presumably holding a reference on <object> until the call has run).
      // Member variants exist for both non-const and const member functions;
      // FUNC_GENERATE_ALL instantiates each macro for every supported arg count.
      //-----------------------------------------------------
  #define DEFINE_NONMEMBER_ADD_CALL(N) \
      template <typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
      CJob *AddCall(FUNCTION_RETTYPE (*pfnProxied)( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
      { \
          CJob *pJob; \
          if ( !NumIdleThreads() ) \
          { \
              pJob = GetDummyJob(); \
              FunctorDirectCall( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \
          } \
          else \
          { \
              AddFunctorInternal( CreateFunctor( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \
          } \
          \
          return pJob; \
      }
      //-------------------------------------
  #define DEFINE_MEMBER_ADD_CALL(N) \
      template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
      CJob *AddCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
      { \
          CJob *pJob; \
          if ( !NumIdleThreads() ) \
          { \
              pJob = GetDummyJob(); \
              FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \
          } \
          else \
          { \
              AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \
          } \
          \
          return pJob; \
      }
      //-------------------------------------
  #define DEFINE_CONST_MEMBER_ADD_CALL(N) \
      template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
      CJob *AddCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \
      { \
          CJob *pJob; \
          if ( !NumIdleThreads() ) \
          { \
              pJob = GetDummyJob(); \
              FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \
          } \
          else \
          { \
              AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \
          } \
          \
          return pJob; \
      }
      //-------------------------------------
  #define DEFINE_REF_COUNTING_MEMBER_ADD_CALL(N) \
      template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
      CJob *AddRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
      { \
          CJob *pJob; \
          if ( !NumIdleThreads() ) \
          { \
              pJob = GetDummyJob(); \
              FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \
          } \
          else \
          { \
              AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \
          } \
          \
          return pJob; \
      }
      //-------------------------------------
  #define DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL(N) \
      template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
      CJob *AddRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \
      { \
          CJob *pJob; \
          if ( !NumIdleThreads() ) \
          { \
              pJob = GetDummyJob(); \
              FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \
          } \
          else \
          { \
              AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \
          } \
          \
          return pJob; \
      }
      //-----------------------------------------------------------------------------
  #define DEFINE_NONMEMBER_QUEUE_CALL(N) \
      template <typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
      CJob *QueueCall(FUNCTION_RETTYPE (*pfnProxied)( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
      { \
          CJob *pJob; \
          AddFunctorInternal( CreateFunctor( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \
          return pJob; \
      }
      //-------------------------------------
  #define DEFINE_MEMBER_QUEUE_CALL(N) \
      template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
      CJob *QueueCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
      { \
          CJob *pJob; \
          AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \
          return pJob; \
      }
      //-------------------------------------
  #define DEFINE_CONST_MEMBER_QUEUE_CALL(N) \
      template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
      CJob *QueueCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \
      { \
          CJob *pJob; \
          AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \
          return pJob; \
      }
      //-------------------------------------
  #define DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL(N) \
      template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
      CJob *QueueRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
      { \
          CJob *pJob; \
          AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \
          return pJob; \
      }
      //-------------------------------------
  #define DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL(N) \
      template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
      CJob *QueueRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \
      { \
          CJob *pJob; \
          AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \
          \
          return pJob; \
      }
      // Instantiate every helper for each supported argument count.
      FUNC_GENERATE_ALL( DEFINE_NONMEMBER_ADD_CALL );
      FUNC_GENERATE_ALL( DEFINE_MEMBER_ADD_CALL );
      FUNC_GENERATE_ALL( DEFINE_CONST_MEMBER_ADD_CALL );
      FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_MEMBER_ADD_CALL );
      FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL );
      FUNC_GENERATE_ALL( DEFINE_NONMEMBER_QUEUE_CALL );
      FUNC_GENERATE_ALL( DEFINE_MEMBER_QUEUE_CALL );
      FUNC_GENERATE_ALL( DEFINE_CONST_MEMBER_QUEUE_CALL );
      FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL );
      FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL );
      // The generator macros are private to this class; do not leak them.
  #undef DEFINE_NONMEMBER_ADD_CALL
  #undef DEFINE_MEMBER_ADD_CALL
  #undef DEFINE_CONST_MEMBER_ADD_CALL
  #undef DEFINE_REF_COUNTING_MEMBER_ADD_CALL
  #undef DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL
  #undef DEFINE_NONMEMBER_QUEUE_CALL
  #undef DEFINE_MEMBER_QUEUE_CALL
  #undef DEFINE_CONST_MEMBER_QUEUE_CALL
  #undef DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL
  #undef DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL
  private:
      // Common sink for AddFunctor and every Add*/Queue*Call helper above.
      virtual void AddFunctorInternal( CFunctor *, CJob ** = NULL, const char *pszDescription = NULL, unsigned flags = 0 ) = 0;
      //-----------------------------------------------------
      // Services for internal use by job instances
      //-----------------------------------------------------
      friend class CJob;
      // Placeholder job returned by AddCall/AddRefCall when the call already ran inline.
      virtual CJob *GetDummyJob() = 0;
  public:
      virtual void Distribute( bool bDistribute = true, int *pAffinityTable = NULL ) = 0;
      // Start() overload taking pszNameOverride (presumably used to name the pool/threads).
      virtual bool Start( const ThreadPoolStartParams_t &startParams, const char *pszNameOverride ) = 0;
      virtual int YieldWaitPerFrameJobs( ) = 0;
  };
  372. //-----------------------------------------------------------------------------
  373. JOB_INTERFACE IThreadPool *CreateNewThreadPool();
  374. JOB_INTERFACE void DestroyThreadPool( IThreadPool *pPool );
  375. //-------------------------------------
  376. JOB_INTERFACE void RunThreadPoolTests();
  377. //-----------------------------------------------------------------------------
  378. JOB_INTERFACE IThreadPool *g_pThreadPool;
  379. #ifdef _X360
  380. JOB_INTERFACE IThreadPool *g_pAlternateThreadPool;
  381. #endif
  382. //-----------------------------------------------------------------------------
  383. // Class to combine the metadata for an operation and the ability to perform
  384. // the operation. Meant for inheritance. All functions inline, defers to executor
  385. //-----------------------------------------------------------------------------
  386. DECLARE_POINTER_HANDLE( ThreadPoolData_t );
  387. #define JOB_NO_DATA ((ThreadPoolData_t)-1)
  388. class CJob : public CRefCounted1<IRefCounted, CRefCountServiceMT>
  389. {
  390. public:
  391. CJob( JobPriority_t priority = JP_NORMAL )
  392. : m_status( JOB_STATUS_UNSERVICED ),
  393. m_ThreadPoolData( JOB_NO_DATA ),
  394. m_priority( priority ),
  395. m_flags( 0 ),
  396. m_pThreadPool( NULL ),
  397. m_CompleteEvent( true ),
  398. m_iServicingThread( -1 )
  399. {
  400. }
  401. //-----------------------------------------------------
  402. // Priority (not thread safe)
  403. //-----------------------------------------------------
  404. void SetPriority( JobPriority_t priority ) { m_priority = priority; }
  405. JobPriority_t GetPriority() const { return m_priority; }
  406. //-----------------------------------------------------
  407. void SetFlags( unsigned flags ) { m_flags = flags; }
  408. unsigned GetFlags() const { return m_flags; }
  409. //-----------------------------------------------------
  410. void SetServiceThread( int iServicingThread ) { m_iServicingThread = (char)iServicingThread; }
  411. int GetServiceThread() const { return m_iServicingThread; }
  412. void ClearServiceThread() { m_iServicingThread = -1; }
  413. //-----------------------------------------------------
  414. // Fast queries
  415. //-----------------------------------------------------
  416. bool Executed() const { return ( m_status == JOB_OK ); }
  417. bool CanExecute() const { return ( m_status == JOB_STATUS_PENDING || m_status == JOB_STATUS_UNSERVICED ); }
  418. bool IsFinished() const { return ( m_status != JOB_STATUS_PENDING && m_status != JOB_STATUS_INPROGRESS && m_status != JOB_STATUS_UNSERVICED ); }
  419. JobStatus_t GetStatus() const { return m_status; }
  420. //-----------------------------------------------------
  421. // Try to acquire ownership (to satisfy). If you take the lock, you must either execute or abort.
  422. //-----------------------------------------------------
  423. bool TryLock() { return m_mutex.TryLock(); }
  424. void Lock() { m_mutex.Lock(); }
  425. void Unlock() { m_mutex.Unlock(); }
  426. //-----------------------------------------------------
  427. // Thread event support (safe for NULL this to simplify code )
  428. //-----------------------------------------------------
  429. bool WaitForFinish( uint32 dwTimeout = TT_INFINITE ) { if (!this) return true; return ( !IsFinished() ) ? g_pThreadPool->YieldWait( this, dwTimeout ) : true; }
  430. bool WaitForFinishAndRelease( uint32 dwTimeout = TT_INFINITE ) { if (!this) return true; bool bResult = WaitForFinish( dwTimeout); Release(); return bResult; }
  431. CThreadEvent *AccessEvent() { return &m_CompleteEvent; }
  432. //-----------------------------------------------------
  433. // Perform the job
  434. //-----------------------------------------------------
  435. JobStatus_t Execute();
  436. JobStatus_t TryExecute();
  437. JobStatus_t ExecuteAndRelease() { JobStatus_t status = Execute(); Release(); return status; }
  438. JobStatus_t TryExecuteAndRelease() { JobStatus_t status = TryExecute(); Release(); return status; }
  439. //-----------------------------------------------------
  440. // Terminate the job, discard if partially or wholly fulfilled
  441. //-----------------------------------------------------
  442. JobStatus_t Abort( bool bDiscard = true );
  443. virtual char const *Describe() { return "Job"; }
  444. private:
  445. //-----------------------------------------------------
  446. friend class CThreadPool;
  447. JobStatus_t m_status;
  448. JobPriority_t m_priority;
  449. CThreadMutex m_mutex;
  450. unsigned char m_flags;
  451. char m_iServicingThread;
  452. short m_reserved;
  453. ThreadPoolData_t m_ThreadPoolData;
  454. IThreadPool * m_pThreadPool;
  455. CThreadEvent m_CompleteEvent;
  456. #if defined( THREAD_PARENT_STACK_TRACE_ENABLED )
  457. void * m_ParentStackTrace[THREAD_PARENT_STACK_TRACE_LENGTH];
  458. #endif
  459. private:
  460. //-----------------------------------------------------
  461. CJob( const CJob &fromRequest );
  462. void operator=(const CJob &fromRequest );
  463. virtual JobStatus_t DoExecute() = 0;
  464. virtual JobStatus_t DoAbort( bool bDiscard ) { return JOB_STATUS_ABORTED; }
  465. virtual void DoCleanup() {}
  466. };
  467. //-----------------------------------------------------------------------------
  468. class CFunctorJob : public CJob
  469. {
  470. public:
  471. CFunctorJob( CFunctor *pFunctor, const char *pszDescription = NULL )
  472. : m_pFunctor( pFunctor )
  473. {
  474. if ( pszDescription )
  475. {
  476. Q_strncpy( m_szDescription, pszDescription, sizeof(m_szDescription) );
  477. }
  478. else
  479. {
  480. m_szDescription[0] = 0;
  481. }
  482. }
  483. virtual JobStatus_t DoExecute()
  484. {
  485. (*m_pFunctor)();
  486. return JOB_OK;
  487. }
  488. const char *Describe()
  489. {
  490. return m_szDescription;
  491. }
  492. private:
  493. CRefPtr<CFunctor> m_pFunctor;
  494. char m_szDescription[16];
  495. };
  496. //-----------------------------------------------------------------------------
  497. // Utility for managing multiple jobs
  498. //-----------------------------------------------------------------------------
  499. class CJobSet
  500. {
  501. public:
  502. CJobSet( CJob *pJob = NULL )
  503. {
  504. if ( pJob )
  505. {
  506. m_jobs.AddToTail( pJob );
  507. }
  508. }
  509. CJobSet( CJob **ppJobs, int nJobs )
  510. {
  511. if ( ppJobs )
  512. {
  513. m_jobs.AddMultipleToTail( nJobs, ppJobs );
  514. }
  515. }
  516. ~CJobSet()
  517. {
  518. for ( int i = 0; i < m_jobs.Count(); i++ )
  519. {
  520. m_jobs[i]->Release();
  521. }
  522. }
  523. void operator+=( CJob *pJob )
  524. {
  525. m_jobs.AddToTail( pJob );
  526. }
  527. void operator-=( CJob *pJob )
  528. {
  529. m_jobs.FindAndRemove( pJob );
  530. }
  531. void Execute( bool bRelease = true )
  532. {
  533. for ( int i = 0; i < m_jobs.Count(); i++ )
  534. {
  535. m_jobs[i]->Execute();
  536. if ( bRelease )
  537. {
  538. m_jobs[i]->Release();
  539. }
  540. }
  541. if ( bRelease )
  542. {
  543. m_jobs.RemoveAll();
  544. }
  545. }
  546. void Abort( bool bRelease = true )
  547. {
  548. for ( int i = 0; i < m_jobs.Count(); i++ )
  549. {
  550. m_jobs[i]->Abort();
  551. if ( bRelease )
  552. {
  553. m_jobs[i]->Release();
  554. }
  555. }
  556. if ( bRelease )
  557. {
  558. m_jobs.RemoveAll();
  559. }
  560. }
  561. void WaitForFinish( bool bRelease = true )
  562. {
  563. for ( int i = 0; i < m_jobs.Count(); i++ )
  564. {
  565. m_jobs[i]->WaitForFinish();
  566. if ( bRelease )
  567. {
  568. m_jobs[i]->Release();
  569. }
  570. }
  571. if ( bRelease )
  572. {
  573. m_jobs.RemoveAll();
  574. }
  575. }
  576. void WaitForFinish( IThreadPool *pPool, bool bRelease = true )
  577. {
  578. pPool->YieldWait( m_jobs.Base(), m_jobs.Count() );
  579. if ( bRelease )
  580. {
  581. for ( int i = 0; i < m_jobs.Count(); i++ )
  582. {
  583. m_jobs[i]->Release();
  584. }
  585. m_jobs.RemoveAll();
  586. }
  587. }
  588. private:
  589. CUtlVectorFixed<CJob *, 16> m_jobs;
  590. };
  591. //-----------------------------------------------------------------------------
  592. // Job helpers
  593. //-----------------------------------------------------------------------------
  // ThreadExecute / ThreadExecuteRef: queue a call on g_pThreadPool (QueueCall /
  // QueueRefCall always queue, with JF_QUEUE).
  #define ThreadExecute g_pThreadPool->QueueCall
  #define ThreadExecuteRef g_pThreadPool->QueueRefCall
  // Fork/join helpers:
  //     BeginExecuteParallel();
  //         ExecuteParallel( [pObject,] pfn, args... );
  //         ExecuteRefParallel( pObject, pfn, args... );
  //     EndExecuteParallel();
  // BeginExecuteParallel opens a scope holding a CJobSet named jobSet, and
  // EndExecuteParallel waits for all of its jobs on g_pThreadPool and closes it.
  #define BeginExecuteParallel() do { CJobSet jobSet
  #define EndExecuteParallel() jobSet.WaitForFinish( g_pThreadPool ); } while (0)
  #define ExecuteParallel jobSet += g_pThreadPool->QueueCall
  // IThreadPool has no QueueCallRef; the ref-counting queue helper is QueueRefCall.
  #define ExecuteRefParallel jobSet += g_pThreadPool->QueueRefCall
  600. //-----------------------------------------------------------------------------
  601. // Work splitting: array split, best when cost per item is roughly equal
  602. //-----------------------------------------------------------------------------
  603. #pragma warning(push)
  604. #pragma warning(disable:4389)
  605. #pragma warning(disable:4018)
  606. #pragma warning(disable:4701)
  607. #define DEFINE_NON_MEMBER_ITER_RANGE_PARALLEL(N) \
  608. template <typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N, typename ITERTYPE1, typename ITERTYPE2> \
  609. void IterRangeParallel(FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( ITERTYPE1, ITERTYPE2 FUNC_SEPARATOR_##N FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ), ITERTYPE1 from, ITERTYPE2 to FUNC_ARG_FORMAL_PARAMS_##N ) \
  610. { \
  611. const int MAX_THREADS = 16; \
  612. int nIdle = g_pThreadPool->NumIdleThreads(); \
  613. ITERTYPE1 range = to - from; \
  614. int nThreads = min( nIdle + 1, range ); \
  615. if ( nThreads > MAX_THREADS ) \
  616. { \
  617. nThreads = MAX_THREADS; \
  618. } \
  619. if ( nThreads < 2 ) \
  620. { \
  621. FunctorDirectCall( pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \
  622. } \
  623. else \
  624. { \
  625. ITERTYPE1 nIncrement = range / nThreads; \
  626. \
  627. CJobSet jobSet; \
  628. while ( --nThreads ) \
  629. { \
  630. ITERTYPE2 thisTo = from + nIncrement; \
  631. jobSet += g_pThreadPool->AddCall( pfnProxied, from, thisTo FUNC_FUNCTOR_CALL_ARGS_##N ); \
  632. from = thisTo; \
  633. } \
  634. FunctorDirectCall( pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \
  635. jobSet.WaitForFinish( g_pThreadPool ); \
  636. } \
  637. \
  638. }
  639. FUNC_GENERATE_ALL( DEFINE_NON_MEMBER_ITER_RANGE_PARALLEL );
  640. #define DEFINE_MEMBER_ITER_RANGE_PARALLEL(N) \
  641. template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N, typename ITERTYPE1, typename ITERTYPE2> \
  642. void IterRangeParallel(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( ITERTYPE1, ITERTYPE2 FUNC_SEPARATOR_##N FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ), ITERTYPE1 from, ITERTYPE2 to FUNC_ARG_FORMAL_PARAMS_##N ) \
  643. { \
  644. const int MAX_THREADS = 16; \
  645. int nIdle = g_pThreadPool->NumIdleThreads(); \
  646. ITERTYPE1 range = to - from; \
  647. int nThreads = min( nIdle + 1, range ); \
  648. if ( nThreads > MAX_THREADS ) \
  649. { \
  650. nThreads = MAX_THREADS; \
  651. } \
  652. if ( nThreads < 2 ) \
  653. { \
  654. FunctorDirectCall( pObject, pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \
  655. } \
  656. else \
  657. { \
  658. ITERTYPE1 nIncrement = range / nThreads; \
  659. \
  660. CJobSet jobSet; \
  661. while ( --nThreads ) \
  662. { \
  663. ITERTYPE2 thisTo = from + nIncrement; \
  664. jobSet += g_pThreadPool->AddCall( pObject, pfnProxied, from, thisTo FUNC_FUNCTOR_CALL_ARGS_##N ); \
  665. from = thisTo; \
  666. } \
  667. FunctorDirectCall( pObject, pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \
  668. jobSet.WaitForFinish( g_pThreadPool ); \
  669. } \
  670. \
  671. }
  672. FUNC_GENERATE_ALL( DEFINE_MEMBER_ITER_RANGE_PARALLEL );
  673. //-----------------------------------------------------------------------------
  674. // Work splitting: competitive, best when cost per item varies a lot
  675. //-----------------------------------------------------------------------------
  // CJobItemProcessor<T>: base for item processors, supplying the item type and
  // no-op Begin()/End() hooks. Derived processors provide Process( ItemType_t & ).
  template <typename T>
  class CJobItemProcessor
  {
  public:
      typedef T ItemType_t;

      // Start-of-work hook; intentionally empty here.
      void Begin()
      {
      }

      // Derived classes supply: void Process( ItemType_t & );

      // End-of-work hook; intentionally empty here.
      void End()
      {
      }
  };
  685. template <typename T>
  686. class CFuncJobItemProcessor : public CJobItemProcessor<T>
  687. {
  688. public:
  689. void Init(void (*pfnProcess)( T & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL )
  690. {
  691. m_pfnProcess = pfnProcess;
  692. m_pfnBegin = pfnBegin;
  693. m_pfnEnd = pfnEnd;
  694. }
  695. //CFuncJobItemProcessor(OBJECT_TYPE_PTR pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL );
  696. void Begin() { if ( m_pfnBegin ) (*m_pfnBegin)(); }
  697. void Process( T &item ) { (*m_pfnProcess)( item ); }
  698. void End() { if ( m_pfnEnd ) (*m_pfnEnd)(); }
  699. protected:
  700. void (*m_pfnProcess)( T & );
  701. void (*m_pfnBegin)();
  702. void (*m_pfnEnd)();
  703. };
  704. template <typename T, class OBJECT_TYPE, class FUNCTION_CLASS = OBJECT_TYPE >
  705. class CMemberFuncJobItemProcessor : public CJobItemProcessor<T>
  706. {
  707. public:
  708. void Init( OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( T & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL )
  709. {
  710. m_pObject = pObject;
  711. m_pfnProcess = pfnProcess;
  712. m_pfnBegin = pfnBegin;
  713. m_pfnEnd = pfnEnd;
  714. }
  715. void Begin() { if ( m_pfnBegin ) ((*m_pObject).*m_pfnBegin)(); }
  716. void Process( T &item ) { ((*m_pObject).*m_pfnProcess)( item ); }
  717. void End() { if ( m_pfnEnd ) ((*m_pObject).*m_pfnEnd)(); }
  718. protected:
  719. OBJECT_TYPE *m_pObject;
  720. void (FUNCTION_CLASS::*m_pfnProcess)( T & );
  721. void (FUNCTION_CLASS::*m_pfnBegin)();
  722. void (FUNCTION_CLASS::*m_pfnEnd)();
  723. };
  724. template <typename T>
  725. class CLoopFuncJobItemProcessor : public CJobItemProcessor<T>
  726. {
  727. public:
  728. void Init(void (*pfnProcess)( T*, int, int ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL )
  729. {
  730. m_pfnProcess = pfnProcess;
  731. m_pfnBegin = pfnBegin;
  732. m_pfnEnd = pfnEnd;
  733. }
  734. void Begin() { if ( m_pfnBegin ) (*m_pfnBegin)(); }
  735. void Process( T* pContext, int nFirst, int nCount ) { (*m_pfnProcess)( pContext, nFirst, nCount ); }
  736. void End() { if ( m_pfnEnd ) (*m_pfnEnd)(); }
  737. protected:
  738. void (*m_pfnProcess)( T*, int, int );
  739. void (*m_pfnBegin)();
  740. void (*m_pfnEnd)();
  741. };
  742. template <typename T, class OBJECT_TYPE, class FUNCTION_CLASS = OBJECT_TYPE >
  743. class CLoopMemberFuncJobItemProcessor : public CJobItemProcessor<T>
  744. {
  745. public:
  746. void Init( OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( T*, int, int ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL )
  747. {
  748. m_pObject = pObject;
  749. m_pfnProcess = pfnProcess;
  750. m_pfnBegin = pfnBegin;
  751. m_pfnEnd = pfnEnd;
  752. }
  753. void Begin() { if ( m_pfnBegin ) ((*m_pObject).*m_pfnBegin)(); }
  754. void Process( T *item, int nFirst, int nCount ) { ((*m_pObject).*m_pfnProcess)( item, nFirst, nCount ); }
  755. void End() { if ( m_pfnEnd ) ((*m_pObject).*m_pfnEnd)(); }
  756. protected:
  757. OBJECT_TYPE *m_pObject;
  758. void (FUNCTION_CLASS::*m_pfnProcess)( T*, int, int );
  759. void (FUNCTION_CLASS::*m_pfnBegin)();
  760. void (FUNCTION_CLASS::*m_pfnEnd)();
  761. };
  762. #pragma warning(push)
  763. #pragma warning(disable:4189)
  764. template <typename ITEM_TYPE, class ITEM_PROCESSOR_TYPE, int ID_TO_PREVENT_COMDATS_IN_PROFILES = 1>
  765. class CParallelProcessor
  766. {
  767. public:
  768. CParallelProcessor()
  769. {
  770. m_pItems = m_pLimit= 0;
  771. }
  772. void Run( ITEM_TYPE *pItems, unsigned nItems, int nChunkSize = 1, int nMaxParallel = INT_MAX, IThreadPool *pThreadPool = NULL )
  773. {
  774. if ( nItems == 0 )
  775. return;
  776. #if defined(_X360)
  777. volatile int ignored = ID_TO_PREVENT_COMDATS_IN_PROFILES;
  778. #endif
  779. m_nChunkSize = nChunkSize;
  780. if ( !pThreadPool )
  781. {
  782. pThreadPool = g_pThreadPool;
  783. }
  784. m_pItems = pItems;
  785. m_pLimit = pItems + nItems;
  786. int nJobs = nItems - 1;
  787. if ( nJobs > nMaxParallel )
  788. {
  789. nJobs = nMaxParallel;
  790. }
  791. if (! pThreadPool ) // only possible on linux
  792. {
  793. DoExecute( );
  794. return;
  795. }
  796. int nThreads = pThreadPool->NumThreads();
  797. if ( nJobs > nThreads )
  798. {
  799. nJobs = nThreads;
  800. }
  801. if ( nJobs > 0 )
  802. {
  803. CJob **jobs = (CJob **)stackalloc( nJobs * sizeof(CJob **) );
  804. int i = nJobs;
  805. while( i-- )
  806. {
  807. jobs[i] = pThreadPool->QueueCall( this, &CParallelProcessor<ITEM_TYPE, ITEM_PROCESSOR_TYPE, ID_TO_PREVENT_COMDATS_IN_PROFILES>::DoExecute );
  808. }
  809. DoExecute();
  810. for ( i = 0; i < nJobs; i++ )
  811. {
  812. jobs[i]->Abort(); // will either abort ones that never got a thread, or noop on ones that did
  813. jobs[i]->Release();
  814. }
  815. }
  816. else
  817. {
  818. DoExecute();
  819. }
  820. }
  821. ITEM_PROCESSOR_TYPE m_ItemProcessor;
  822. private:
  823. void DoExecute()
  824. {
  825. if ( m_pItems < m_pLimit )
  826. {
  827. #if defined(_X360)
  828. volatile int ignored = ID_TO_PREVENT_COMDATS_IN_PROFILES;
  829. #endif
  830. m_ItemProcessor.Begin();
  831. ITEM_TYPE *pLimit = m_pLimit;
  832. int nChunkSize = m_nChunkSize;
  833. for (;;)
  834. {
  835. ITEM_TYPE *pCurrent = m_pItems.AtomicAdd( nChunkSize );
  836. ITEM_TYPE *pLast = MIN( pLimit, pCurrent + nChunkSize );
  837. while( pCurrent < pLast )
  838. {
  839. m_ItemProcessor.Process( *pCurrent );
  840. pCurrent++;
  841. }
  842. if ( pCurrent >= pLimit )
  843. {
  844. break;
  845. }
  846. }
  847. m_ItemProcessor.End();
  848. }
  849. }
  850. CInterlockedPtr<ITEM_TYPE> m_pItems;
  851. ITEM_TYPE * m_pLimit;
  852. int m_nChunkSize;
  853. };
  854. #pragma warning(pop)
  855. template <typename ITEM_TYPE>
  856. inline void ParallelProcess( ITEM_TYPE *pItems, unsigned nItems, void (*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
  857. {
  858. CParallelProcessor<ITEM_TYPE, CFuncJobItemProcessor<ITEM_TYPE> > processor;
  859. processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd );
  860. processor.Run( pItems, nItems, 1, nMaxParallel );
  861. }
  862. template <typename ITEM_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS >
  863. inline void ParallelProcess( ITEM_TYPE *pItems, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
  864. {
  865. CParallelProcessor<ITEM_TYPE, CMemberFuncJobItemProcessor<ITEM_TYPE, OBJECT_TYPE, FUNCTION_CLASS> > processor;
  866. processor.m_ItemProcessor.Init( pObject, pfnProcess, pfnBegin, pfnEnd );
  867. processor.Run( pItems, nItems, 1, nMaxParallel );
  868. }
  869. // Parallel Process that lets you specify threadpool
  870. template <typename ITEM_TYPE>
  871. inline void ParallelProcess( IThreadPool *pPool, ITEM_TYPE *pItems, unsigned nItems, void (*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
  872. {
  873. CParallelProcessor<ITEM_TYPE, CFuncJobItemProcessor<ITEM_TYPE> > processor;
  874. processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd );
  875. processor.Run( pItems, nItems, 1, nMaxParallel, pPool );
  876. }
  877. template <typename ITEM_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS >
  878. inline void ParallelProcess( IThreadPool *pPool, ITEM_TYPE *pItems, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
  879. {
  880. CParallelProcessor<ITEM_TYPE, CMemberFuncJobItemProcessor<ITEM_TYPE, OBJECT_TYPE, FUNCTION_CLASS> > processor;
  881. processor.m_ItemProcessor.Init( pObject, pfnProcess, pfnBegin, pfnEnd );
  882. processor.Run( pItems, nItems, 1, nMaxParallel, pPool );
  883. }
  884. // ParallelProcessChunks lets you specify a minimum # of items to process per job. Use this when
  885. // you may have a large set of work items which only take a small amount of time per item, and so
  886. // need to reduce dispatch overhead.
  887. template <typename ITEM_TYPE>
  888. inline void ParallelProcessChunks( ITEM_TYPE *pItems, unsigned nItems, void (*pfnProcess)( ITEM_TYPE & ), int nChunkSize, int nMaxParallel = INT_MAX )
  889. {
  890. CParallelProcessor<ITEM_TYPE, CFuncJobItemProcessor<ITEM_TYPE> > processor;
  891. processor.m_ItemProcessor.Init( pfnProcess, NULL, NULL );
  892. processor.Run( pItems, nItems, nChunkSize, nMaxParallel );
  893. }
  894. template <typename ITEM_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS >
  895. inline void ParallelProcessChunks( ITEM_TYPE *pItems, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), int nChunkSize, int nMaxParallel = INT_MAX )
  896. {
  897. CParallelProcessor<ITEM_TYPE, CMemberFuncJobItemProcessor<ITEM_TYPE, OBJECT_TYPE, FUNCTION_CLASS> > processor;
  898. processor.m_ItemProcessor.Init( pObject, pfnProcess, NULL, NULL );
  899. processor.Run( pItems, nItems, nChunkSize, nMaxParallel );
  900. }
  901. template <typename ITEM_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS >
  902. inline void ParallelProcessChunks( IThreadPool *pPool, ITEM_TYPE *pItems, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), int nChunkSize, int nMaxParallel = INT_MAX )
  903. {
  904. CParallelProcessor<ITEM_TYPE, CMemberFuncJobItemProcessor<ITEM_TYPE, OBJECT_TYPE, FUNCTION_CLASS> > processor;
  905. processor.m_ItemProcessor.Init( pObject, pfnProcess, NULL, NULL );
  906. processor.Run( pItems, nItems, nChunkSize, nMaxParallel, pPool );
  907. }
  908. template <class CONTEXT_TYPE, class ITEM_PROCESSOR_TYPE>
  909. class CParallelLoopProcessor
  910. {
  911. public:
  912. CParallelLoopProcessor()
  913. {
  914. m_nIndex = m_nLimit = 0;
  915. m_nChunkCount = 0;
  916. m_nActive = 0;
  917. }
  918. void Run( CONTEXT_TYPE *pContext, int nBegin, int nItems, int nChunkCount, int nMaxParallel = INT_MAX, IThreadPool *pThreadPool = NULL )
  919. {
  920. if ( !nItems )
  921. return;
  922. if ( !pThreadPool )
  923. {
  924. pThreadPool = g_pThreadPool;
  925. }
  926. m_pContext = pContext;
  927. m_nIndex = nBegin;
  928. m_nLimit = nBegin + nItems;
  929. nChunkCount = MAX( MIN( nItems, nChunkCount ), 1 );
  930. m_nChunkCount = ( nItems + nChunkCount - 1 ) / nChunkCount;
  931. int nJobs = ( nItems + m_nChunkCount - 1 ) / m_nChunkCount;
  932. if ( nJobs > nMaxParallel )
  933. {
  934. nJobs = nMaxParallel;
  935. }
  936. if ( !pThreadPool ) // only possible on linux
  937. {
  938. DoExecute( );
  939. return;
  940. }
  941. int nThreads = pThreadPool->NumThreads();
  942. if ( nJobs > nThreads )
  943. {
  944. nJobs = nThreads;
  945. }
  946. if ( nJobs > 0 )
  947. {
  948. CJob **jobs = (CJob **)stackalloc( nJobs * sizeof(CJob **) );
  949. int i = nJobs;
  950. while( i-- )
  951. {
  952. jobs[i] = pThreadPool->QueueCall( this, &CParallelLoopProcessor<CONTEXT_TYPE, ITEM_PROCESSOR_TYPE>::DoExecute );
  953. }
  954. DoExecute();
  955. for ( i = 0; i < nJobs; i++ )
  956. {
  957. jobs[i]->Abort(); // will either abort ones that never got a thread, or noop on ones that did
  958. jobs[i]->Release();
  959. }
  960. }
  961. else
  962. {
  963. DoExecute();
  964. }
  965. }
  966. ITEM_PROCESSOR_TYPE m_ItemProcessor;
  967. private:
  968. void DoExecute()
  969. {
  970. m_ItemProcessor.Begin();
  971. for (;;)
  972. {
  973. int nIndex = m_nIndex.AtomicAdd( m_nChunkCount );
  974. if ( nIndex < m_nLimit )
  975. {
  976. int nCount = MIN( m_nChunkCount, m_nLimit - nIndex );
  977. m_ItemProcessor.Process( m_pContext, nIndex, nCount );
  978. }
  979. else
  980. {
  981. break;
  982. }
  983. }
  984. m_ItemProcessor.End();
  985. --m_nActive;
  986. }
  987. CONTEXT_TYPE *m_pContext;
  988. CInterlockedInt m_nIndex;
  989. int m_nLimit;
  990. int m_nChunkCount;
  991. CInterlockedInt m_nActive;
  992. };
  993. template < typename CONTEXT_TYPE >
  994. inline void ParallelLoopProcess( IThreadPool *pPool, CONTEXT_TYPE *pContext, int nStart, int nCount, void (*pfnProcess)( CONTEXT_TYPE*, int, int ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
  995. {
  996. CParallelLoopProcessor< CONTEXT_TYPE, CLoopFuncJobItemProcessor< CONTEXT_TYPE > > processor;
  997. processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd );
  998. processor.Run( pContext, nStart, nCount, 1, nMaxParallel, pPool );
  999. }
  1000. template < typename CONTEXT_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS >
  1001. inline void ParallelLoopProcess( IThreadPool *pPool, CONTEXT_TYPE *pContext, int nStart, int nCount, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( CONTEXT_TYPE*, int, int ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
  1002. {
  1003. CParallelLoopProcessor< CONTEXT_TYPE, CLoopMemberFuncJobItemProcessor<CONTEXT_TYPE, OBJECT_TYPE, FUNCTION_CLASS> > processor;
  1004. processor.m_ItemProcessor.Init( pObject, pfnProcess, pfnBegin, pfnEnd );
  1005. processor.Run( pContext, nStart, nCount, 1, nMaxParallel, pPool );
  1006. }
  1007. template < typename CONTEXT_TYPE >
  1008. inline void ParallelLoopProcessChunks( IThreadPool *pPool, CONTEXT_TYPE *pContext, int nStart, int nCount, int nChunkSize, void (*pfnProcess)( CONTEXT_TYPE*, int, int ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
  1009. {
  1010. CParallelLoopProcessor< CONTEXT_TYPE, CLoopFuncJobItemProcessor< CONTEXT_TYPE > > processor;
  1011. processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd );
  1012. processor.Run( pContext, nStart, nCount, nChunkSize, nMaxParallel, pPool );
  1013. }
  1014. template < typename CONTEXT_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS >
  1015. inline void ParallelLoopProcessChunks( IThreadPool *pPool, CONTEXT_TYPE *pContext, int nStart, int nCount, int nChunkSize, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( CONTEXT_TYPE*, int, int ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
  1016. {
  1017. CParallelLoopProcessor< CONTEXT_TYPE, CLoopMemberFuncJobItemProcessor<CONTEXT_TYPE, OBJECT_TYPE, FUNCTION_CLASS> > processor;
  1018. processor.m_ItemProcessor.Init( pObject, pfnProcess, pfnBegin, pfnEnd );
  1019. processor.Run( pContext, nStart, nCount, nChunkSize, nMaxParallel, pPool );
  1020. }
  1021. template <class Derived>
  1022. class CParallelProcessorBase
  1023. {
  1024. protected:
  1025. typedef CParallelProcessorBase<Derived> ThisParallelProcessorBase_t;
  1026. typedef Derived ThisParallelProcessorDerived_t;
  1027. public:
  1028. CParallelProcessorBase()
  1029. {
  1030. m_nActive = 0;
  1031. }
  1032. protected:
  1033. void Run( int nMaxParallel = INT_MAX, int threadOverride = -1 )
  1034. {
  1035. int i = g_pThreadPool->NumIdleThreads();
  1036. if ( nMaxParallel < i)
  1037. {
  1038. i = nMaxParallel;
  1039. }
  1040. while( i -- > 0 )
  1041. {
  1042. if ( threadOverride == -1 || i == threadOverride - 1 )
  1043. {
  1044. ++ m_nActive;
  1045. ThreadExecute( this, &ThisParallelProcessorBase_t::DoExecute )->Release();
  1046. }
  1047. }
  1048. if ( threadOverride == -1 || threadOverride == 0 )
  1049. {
  1050. ++ m_nActive;
  1051. DoExecute();
  1052. }
  1053. while ( m_nActive )
  1054. {
  1055. ThreadPause();
  1056. }
  1057. }
  1058. protected:
  1059. void OnBegin() {}
  1060. bool OnProcess() { return false; }
  1061. void OnEnd() {}
  1062. private:
  1063. void DoExecute()
  1064. {
  1065. static_cast<Derived *>( this )->OnBegin();
  1066. while ( static_cast<Derived *>( this )->OnProcess() )
  1067. continue;
  1068. static_cast<Derived *>(this)->OnEnd();
  1069. -- m_nActive;
  1070. }
  1071. CInterlockedInt m_nActive;
  1072. };
  1073. //-----------------------------------------------------------------------------
  1074. // Raw thread launching
  1075. //-----------------------------------------------------------------------------
  1076. inline uintp FunctorExecuteThread( void *pParam )
  1077. {
  1078. CFunctor *pFunctor = (CFunctor *)pParam;
  1079. (*pFunctor)();
  1080. pFunctor->Release();
  1081. return 0;
  1082. }
  1083. inline ThreadHandle_t ThreadExecuteSoloImpl( CFunctor *pFunctor, const char *pszName = NULL )
  1084. {
  1085. ThreadHandle_t hThread;
  1086. hThread = CreateSimpleThread( FunctorExecuteThread, pFunctor );
  1087. if ( pszName )
  1088. {
  1089. ThreadSetDebugName( hThread, pszName );
  1090. }
  1091. return hThread;
  1092. }
  1093. inline ThreadHandle_t ThreadExecuteSolo( CJob *pJob ) { return ThreadExecuteSoloImpl( CreateFunctor( pJob, &CJob::Execute ), pJob->Describe() ); }
  1094. template <typename T1>
  1095. inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1 ), pszName ); }
  1096. template <typename T1, typename T2>
  1097. inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2 ), pszName ); }
  1098. template <typename T1, typename T2, typename T3>
  1099. inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3 ), pszName ); }
  1100. template <typename T1, typename T2, typename T3, typename T4>
  1101. inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4 ), pszName ); }
  1102. template <typename T1, typename T2, typename T3, typename T4, typename T5>
  1103. inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5 ), pszName ); }
  1104. template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
  1105. inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5, a6 ), pszName ); }
  1106. template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7>
  1107. inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5, a6, a7 ), pszName ); }
  1108. template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8>
  1109. inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7, T8 a8 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5, a6, a7, a8 ), pszName ); }
  1110. template <typename T1, typename T2>
  1111. inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2 ), pszName ); }
  1112. template <typename T1, typename T2, typename T3>
  1113. inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3 ), pszName ); }
  1114. template <typename T1, typename T2, typename T3, typename T4>
  1115. inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4 ), pszName ); }
  1116. template <typename T1, typename T2, typename T3, typename T4, typename T5>
  1117. inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5 ), pszName ); }
  1118. template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
  1119. inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6 ), pszName ); }
  1120. template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7>
  1121. inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6, a7 ), pszName ); }
  1122. template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8>
  1123. inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7, T8 a8 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6, a7, a8 ), pszName ); }
  1124. //-----------------------------------------------------------------------------
  1125. inline bool IThreadPool::YieldWait( CThreadEvent &theEvent, unsigned timeout )
  1126. {
  1127. CThreadEvent *pEvent = &theEvent;
  1128. return ( YieldWait( &pEvent, 1, true, timeout ) != TW_TIMEOUT );
  1129. }
  1130. inline bool IThreadPool::YieldWait( CJob *pJob, unsigned timeout )
  1131. {
  1132. return ( YieldWait( &pJob, 1, true, timeout ) != TW_TIMEOUT );
  1133. }
//-----------------------------------------------------------------------------
// Runs the job on the calling thread if it is still unserviced or pending, and
// returns the resulting status. A job in any other state is not run again;
// its current status is returned unchanged.
inline JobStatus_t CJob::Execute()
{
	// Unlocked early-out for jobs that have already finished. The switch below
	// re-examines m_status once the mutex is held.
	if ( IsFinished() )
	{
		return m_status;
	}
	// Held for the whole run. CJob::Abort() takes the same m_mutex, so it
	// cannot change the state while DoExecute() is running.
	AUTO_LOCK( m_mutex );
	// Keep the job alive while it is being worked on; balanced by the
	// Release() just before returning.
	AddRef();
	JobStatus_t result;
	switch ( m_status )
	{
	case JOB_STATUS_UNSERVICED:
	case JOB_STATUS_PENDING:
		{
			// Service it
			m_status = JOB_STATUS_INPROGRESS;
#if defined( THREAD_PARENT_STACK_TRACE_ENABLED )
			//replace thread parent trace with job parent
			{
				CStackTop_ReferenceParentStack stackTop( m_ParentStackTrace, ARRAYSIZE( m_ParentStackTrace ) );
				result = m_status = DoExecute();
			}
#else
			result = m_status = DoExecute();
#endif
			// Cleanup runs before the completion event is signalled.
			DoCleanup();
			m_CompleteEvent.Set();
			break;
		}
	case JOB_STATUS_INPROGRESS:
		AssertMsg(0, "Mutex Should have protected use while processing");
		// fall through...
	case JOB_OK:
	case JOB_STATUS_ABORTED:
		// Already run or aborted: report the existing status.
		result = m_status;
		break;
	default:
		// Any other status is expected to be an error code below JOB_OK.
		AssertMsg( m_status < JOB_OK, "Unknown job state");
		result = m_status;
	}
	Release();
	return result;
}
  1178. //---------------------------------------------------------
  1179. inline JobStatus_t CJob::TryExecute()
  1180. {
  1181. // TryLock() would only fail if another thread has entered
  1182. // Execute() or Abort()
  1183. if ( !IsFinished() && TryLock() )
  1184. {
  1185. // ...service the request
  1186. Execute();
  1187. Unlock();
  1188. }
  1189. return m_status;
  1190. }
//---------------------------------------------------------
// Cancels a job that has not started yet. DoAbort( bDiscard ) supplies the new
// status. DoCleanup() runs only when bDiscard is true. The completion event
// is signalled in either case.
// A job that is in progress, finished or already aborted is left untouched,
// and its current status is returned. Because Execute() holds m_mutex for the
// whole run, an Abort() issued mid-run blocks on the lock until that run ends.
inline JobStatus_t CJob::Abort( bool bDiscard )
{
	// Unlocked early-out for jobs that have already finished.
	if ( IsFinished() )
	{
		return m_status;
	}
	AUTO_LOCK( m_mutex );
	// Keep the job alive while it is being aborted; balanced by the Release()
	// just before returning.
	AddRef();
	JobStatus_t result;
	switch ( m_status )
	{
	case JOB_STATUS_UNSERVICED:
	case JOB_STATUS_PENDING:
		{
			result = m_status = DoAbort( bDiscard );
			if ( bDiscard )
				DoCleanup();
			m_CompleteEvent.Set();
		}
		break;
	case JOB_STATUS_ABORTED:
	case JOB_STATUS_INPROGRESS:
	case JOB_OK:
		// Nothing to abort: report the existing status.
		result = m_status;
		break;
	default:
		// Any other status is expected to be an error code below JOB_OK.
		AssertMsg( m_status < JOB_OK, "Unknown job state");
		result = m_status;
	}
	Release();
	return result;
}
  1224. //-----------------------------------------------------------------------------
  1225. #endif // JOBTHREAD_H