Team Fortress 2 Source Code as on 22/4/2020
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

430 lines
13 KiB

  1. //========= Copyright Valve Corporation, All rights reserved. ============//
  2. //
  3. // Purpose:
  4. //
  5. // $NoKeywords: $
  6. //=============================================================================
  7. #ifndef GC_JOBMGR_H
  8. #define GC_JOBMGR_H
  9. #ifdef _WIN32
  10. #pragma once
  11. #endif
  12. #include "tier0/fasttimer.h"
  13. #include "tier1/utlpriorityqueue.h"
  14. #include "job.h"
  15. #include "workthreadpool.h"
  16. class GCConVar;
  17. #include "tier0/memdbgon.h"
  18. namespace GCSDK
  19. {
  // DEBUG_JOB_LIST switches on the static sm_listAllJobs debug list declared in CJobMgr.
  // It is only defined for debug builds because of its performance cost; lifting that
  // restriction would mean removing the expensive sm_listAllJobs.Find() call first.
  #ifdef _DEBUG
  #define DEBUG_JOB_LIST
  #endif // _DEBUG
  25. struct JobStats_t
  26. {
  27. uint m_cJobsCurrent;
  28. uint m_cJobsTotal;
  29. uint m_cJobsFailed;
  30. uint64 m_cJobsTimedOut; // # of jobs timed out ever
  31. double m_flSumJobTimeMicrosec;
  32. double m_flSumSqJobTimeMicrosec;
  33. uint64 m_unMaxJobTimeMicrosec;
  34. uint m_cTimeslices;
  35. JobStats_t()
  36. {
  37. memset( this, 0, sizeof(JobStats_t) );
  38. }
  39. };
  40. struct JobStatsBucket_t
  41. {
  42. JobStatsBucket_t()
  43. {
  44. memset( this, 0, sizeof(JobStatsBucket_t) );
  45. }
  46. char m_rgchName[64];
  47. uint64 m_cCompletes;
  48. uint64 m_u64RunTimeMax;
  49. uint64 m_cTimeoutNetMsg;
  50. uint64 m_cLongInterYieldTime;
  51. uint64 m_cLocksAttempted;
  52. uint64 m_cLocksWaitedFor;
  53. uint64 m_cLocksFailed;
  54. uint64 m_cLocksLongHeld;
  55. uint64 m_cLocksLongWait;
  56. uint64 m_cWaitTimeout;
  57. uint64 m_u64JobDuration;
  58. uint64 m_cJobsPaused;
  59. uint64 m_cJobsFailed;
  60. uint64 m_u64RunTime;
  61. // use by ListJobs
  62. uint64 m_cPauseReasonNetworkMsg;
  63. uint64 m_cPauseReasonSleepForTime;
  64. uint64 m_cPauseReasonWaitingForLock;
  65. uint64 m_cPauseReasonYield;
  66. uint64 m_cPauseReasonSQL;
  67. uint64 m_cPauseReasonWorkItem;
  68. #ifdef DBGFLAG_VALIDATE
  69. void Validate( CValidator &validator, const char *pchName )
  70. {
  71. VALIDATE_SCOPE();
  72. }
  73. #endif
  74. };
  // Action selector taken by CJobMgr::ProfileJobs().
  enum EJobProfileAction
  {
      k_EJobProfileAction_ErrorReport = 0,
      k_EJobProfileAction_Start       = 1,
      k_EJobProfileAction_Stop        = 2,
      k_EJobProfileAction_Dump        = 3,
      k_EJobProfileAction_Clear       = 4,
  };
  // Sort-order selector taken by CJobMgr::ProfileJobs(); Alpha is that function's default.
  enum EJobProfileSortOrder
  {
      k_EJobProfileSortOrder_Alpha        = 0,
      k_EJobProfileSortOrder_Count        = 1,
      k_EJobProfileSortOrder_TotalRuntime = 2,
  };
  89. struct JobProfileStats_t
  90. {
  91. int m_iJobProfileSort;
  92. CUtlMap< uint32, JobStatsBucket_t, int > *pmapStatsBucket;
  93. };
  94. //-----------------------------------------------------------------------------
  95. // Purpose: This keeps track of all jobs that belong to a given hub.
  96. // It's primarily used for routing incoming messages to jobs.
  97. //-----------------------------------------------------------------------------
  98. class CJobMgr
  99. {
  100. public:
  101. // Constructors & destructors
  102. CJobMgr();
  103. ~CJobMgr();
  104. // gets the next available job ID
  105. JobID_t GetNewJobID();
  106. // Set the thread count for the internal thread pool(s)
  107. void SetThreadPoolSize( uint cThreads );
  108. // Run any sleeping jobs who's wakeup time has arrived and check for timeouts
  109. bool BFrameFuncRunSleepingJobs( CLimitTimer &limitTimer );
  110. // Run any yielding jobs, even low priority ones
  111. bool BFrameFuncRunYieldingJobs( CLimitTimer &limitTimer );
  112. // Route this message to an existing Job, or create a new one if that JobID does not exist
  113. bool BRouteMsgToJob( void *pParent, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo );
  114. // Adds a new Job to the mgr and generates a JobID for it.
  115. void InsertJob( CJob &job );
  116. // Removes a Job from the mgr (the caller is still responsible for freeing it)
  117. void RemoveJob( CJob &job );
  118. //called by a job that has just been started to place itself on the yield queue instead of running
  119. void AddDelayedJobToYieldList( CJob &job );
  120. #ifdef GC
  121. // resumes the specified job if it is, in fact, waiting for a SQL query to return
  122. bool BResumeSQLJob( JobID_t jobID );
  123. // yields waiting for a query response
  124. bool BYieldingRunQuery( CJob &job, CGCSQLQueryGroup *pQueryGroup, ESchemaCatalog eSchemaCatalog );
  125. // SQL profiling
  126. enum ESQLProfileSort
  127. {
  128. k_ESQLProfileSortTotalTime,
  129. k_ESQLProfileSortTotalCount,
  130. k_ESQLProfileSortAvgTime,
  131. k_ESQLProfileSortName
  132. };
  133. void StartSQLProfiling();
  134. void StopSQLProfiling();
  135. void DumpSQLProfile( ESQLProfileSort eSort );
  136. #endif
  137. // returns true if we're running any jobs of the specified name
  138. // slow to call if lots of jobs are running, should only be used by tests
  139. bool BIsJobRunning( const char *pchJobName );
  140. // passes a network msg directly to the specified job
  141. void PassMsgToJob( CJob &job, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo );
  142. // yields until a network message is received
  143. bool BYieldingWaitForMsg( CJob &job );
  144. // yields for a set amount of time
  145. bool BYieldingWaitTime( CJob &job, uint32 cMicrosecondsToSleep );
  146. // simple yield until Run() called again
  147. bool BYield( CJob &job );
  148. // Yield only if job manager decides we need to
  149. bool BYieldIfNeeded( CJob &job, bool *pbYielded );
  150. // Thread pool work item
  151. bool BYieldingWaitForWorkItem( CJob &job, const char *pszWorkItemName = NULL );
  152. bool BRouteWorkItemCompleted( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ true ); }
  153. bool BRouteWorkItemCompletedIfExists( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ true ); }
  154. bool BRouteWorkItemCompletedDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ false ); }
  155. bool BRouteWorkItemCompletedIfExistsDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ false ); }
  156. void AddThreadedJobWorkItem( CWorkItem *pWorkItem );
  157. void StopWorkThreads() { m_WorkThreadPool.StopWorkThreads(); }
  158. static int ProfileSortFunc( void *pCtx, const int *lhs, const int *rhs );
  159. void ProfileJobs( EJobProfileAction ejobProfileAction, EJobProfileSortOrder iSortOrder = k_EJobProfileSortOrder_Alpha );
  160. int DumpJobSummary();
  161. void DumpJob( JobID_t jobID, int nPrintLocksMax = 20 ) const;
  162. int CountJobs() const; // counts currently active jobs
  163. void CheckThreadID(); // make sure we are still in the correct thread
  164. int CountYieldingJobs() const { return m_ListJobsYieldingRegPri.Count(); } // counts jobs currently in a yielding state
  165. bool HasOutstandingThreadPoolWorkItems();
  166. void SetIsShuttingDown();
  167. bool GetIsShuttingDown() const { return m_bIsShuttingDown; }
  168. void *GetMainMemoryDebugInfo() { return g_memMainDebugInfo.Base(); }
  169. #ifdef DBGFLAG_VALIDATE
  170. void Validate( CValidator &validator, const char *pchName ); // Validate our internal structures
  171. static void ValidateStatics( CValidator &validator, const char *pchName );
  172. #endif /* DBGFLAG_VALIDATE */
  173. // wakes up a job that was waiting on a lock
  174. void WakeupLockedJob( CJob &job );
  175. // returns true if there is a job active with the specified ID
  176. bool BJobExists( JobID_t jobID ) const;
  177. // returns a job
  178. CJob *GetPJob( JobID_t jobID );
  179. const CJob *GetPJob( JobID_t jobID ) const;
  180. JobStats_t& GetJobStats() { return m_JobStats; }
  181. // Access work thread pool directly
  182. CWorkThreadPool *AccessWorkThreadPool() { return &m_WorkThreadPool; }
  183. // Debug helpers
  184. // dumps a list of all running jobs across ALL job managers
  185. void DumpJobs( const char *pszJobName, int nMax, int nPrintLocksMax = 1 ) const;
  186. // cause a debug break in the given job
  187. static void DebugJob( int iJob );
  188. // disable/enable yielding for debugging
  189. void SetPauseAllowed( bool bNewPauseAllowed ) { m_bDebugDisallowPause = !bNewPauseAllowed; }
  190. private:
  191. bool BRouteWorkItemCompletedInternal( JobID_t jobID, bool bWorkItemCanceled, bool bShouldExist, bool bResumeImmediately );
  192. // Create a new job for this message
  193. bool BLaunchJobFromNetworkMsg( void *pParent, const JobMsgInfo_t &jobMsgInfo, IMsgNetPacket *pNetPacket );
  194. // Internal add to yield list (looks at priority)
  195. void AddToYieldList( CJob &job );
  196. // Get an IJob given a job ID and pause reason
  197. bool BGetIJob( JobID_t jobID, EJobPauseReason eJobPauseReason, bool bShouldExist, int *pIJob );
  198. // Map containing all of our jobs
  199. CUtlMap<JobID_t, CJob *, int> m_MapJob;
  200. // jobs simply waiting until the next Run()
  201. struct JobYielding_t
  202. {
  203. JobID_t m_JobID;
  204. uint m_nIteration;
  205. };
  206. CUtlLinkedList<JobYielding_t, int> m_ListJobsYieldingRegPri;
  207. bool BResumeYieldingJobs( CLimitTimer &limitTimer );
  208. bool BResumeYieldingJobsFromList( CUtlLinkedList<JobYielding_t, int> &listJobsYielding, uint nCurrentIteration, CLimitTimer &limitTimer );
  209. uint m_nCurrentYieldIterationRegPri;
  210. // jobs waiting on a timer
  211. struct JobSleeping_t
  212. {
  213. JobID_t m_JobID;
  214. CJobTime m_SWakeupTime;
  215. CJobTime m_STimeTouched;
  216. };
  217. CUtlPriorityQueue<JobSleeping_t> m_QueueJobSleeping;
  218. bool BResumeSleepingJobs( CLimitTimer &limitTimer );
  219. static bool JobSleepingLessFunc( JobSleeping_t const &lhs, JobSleeping_t const &rhs );
  220. // timeout list of jobs, ordered from oldest to newest
  221. struct JobTimeout_t
  222. {
  223. JobID_t m_JobID;
  224. CJobTime m_STimePaused;
  225. CJobTime m_STimeTouched;
  226. uint32 m_cHeartbeatsBeforeTimeout;
  227. };
  228. CUtlLinkedList<JobTimeout_t, int> m_ListJobTimeouts;
  229. CUtlMap<JobID_t, int, int> m_MapJobTimeoutsIndexByJobID;
  230. void PauseJob( CJob &job, EJobPauseReason eJobPauseReason );
  231. void CheckForJobTimeouts( CLimitTimer &limitTimer );
  232. void TimeoutJob( CJob &job );
  233. bool m_bJobTimedOut;
  234. // thread pool usage, for running job functions in other threads
  235. CWorkThreadPool m_WorkThreadPool;
  236. void AccumulateStatsofJob( CJob &job );
  237. void RecordOrphanedMessage( MsgType_t eMsg, JobID_t jobIDTarget );
  238. // stats info
  239. JobStats_t m_JobStats;
  240. // static job registration
  241. static void RegisterJobType( const JobType_t *pJobType );
  242. friend void Job_RegisterJobType( const JobType_t *pJobType );
  243. JobID_t m_unNextJobID;
  244. uint m_unFrameFuncThreadID; // the thread is JobMgr is working in
  245. bool m_bProfiling;
  246. bool m_bIsShuttingDown;
  247. int m_cErrorsToReport;
  248. CUtlMap< uint32, JobStatsBucket_t, int > m_mapStatsBucket;
  249. CUtlMap<MsgType_t, int, int> m_mapOrphanMessages;
  250. CUtlMemory<unsigned char> g_memMainDebugInfo;
  251. #ifdef GC
  252. // sql profiling
  253. bool m_bSQLProfiling;
  254. CFastTimer m_sqlTimer;
  255. struct PendingSQLJob_t
  256. {
  257. int64 m_nStartMicrosec;
  258. int32 m_iBucket;
  259. };
  260. struct SQLProfileBucket_t
  261. {
  262. int64 m_nTotalMicrosec;
  263. uint32 m_unCount;
  264. };
  265. CUtlHashMapLarge<GID_t, PendingSQLJob_t> m_mapSQLQueriesInFlight;
  266. CUtlDict<SQLProfileBucket_t> m_dictSQLBuckets;
  267. struct SQLProfileCtx_t
  268. {
  269. ESQLProfileSort m_eSort;
  270. CUtlDict<SQLProfileBucket_t> *pdictBuckets;
  271. };
  272. static int SQLProfileSortFunc( void *pCtx, const int *lhs, const int *rhs );
  273. #endif
  274. #ifdef DEBUG_JOB_LIST
  275. // static job debug list
  276. static CUtlLinkedList<CJob *, int> sm_listAllJobs;
  277. #endif
  278. bool m_bDebugDisallowPause;
  279. };
  280. //-----------------------------------------------------------------------------
  281. // Purpose: passthrough function just so the CJob internal data can be kept private
  282. //-----------------------------------------------------------------------------
  283. inline void Job_RegisterJobType( const JobType_t *pJobType )
  284. {
  285. CJobMgr::RegisterJobType( pJobType );
  286. }
  287. //-----------------------------------------------------------------------------
  288. // Purpose: passthrough function just so the CJob internal data can be kept private
  289. //-----------------------------------------------------------------------------
  290. inline void Job_SetJobType( CJob &job, const JobType_t *pJobType )
  291. {
  292. job.m_pJobType = pJobType;
  293. }
  //-----------------------------------------------------------------------------
  // Purpose: job registration macro
  //
  // One use expands to three things:
  //   1. CreateJob_<jobclass>() - factory that calls CJob::AllocateJob<jobclass>(),
  //      tags the new job with its JobType_t, and stores pvStartParam on it when
  //      that pointer is non-NULL. A NULL allocation asserts and returns NULL.
  //   2. g_JobType_<jobclass>   - static JobType_t initialized from
  //      { jobname, msg, servertype, factory }.
  //   3. g_RegJob_<jobclass>    - static object whose constructor passes the
  //      JobType_t to Job_RegisterJobType().
  //
  // GCSDK types are fully qualified, so the macro does not need a
  // using-directive at the point of use.
  //-----------------------------------------------------------------------------
  #define GC_REG_JOB( parentclass, jobclass, jobname, msg, servertype ) \
      GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \
      static const GCSDK::JobType_t g_JobType_##jobclass = \
          { jobname, (GCSDK::MsgType_t)msg, servertype, (GCSDK::JobCreationFunc_t)CreateJob_##jobclass }; \
      GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \
      { \
          GCSDK::CJob *job = GCSDK::CJob::AllocateJob<jobclass>( pParent ); \
          if ( !job ) \
          { \
              AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \
          } \
          else \
          { \
              Job_SetJobType( *job, &g_JobType_##jobclass ); \
              if ( pvStartParam ) \
              { \
                  job->SetStartParam( pvStartParam ); \
              } \
          } \
          return job; \
      } \
      static class CRegJob_##jobclass \
      { \
      public: \
          CRegJob_##jobclass() \
          { \
              Job_RegisterJobType( &g_JobType_##jobclass ); \
          } \
      } g_RegJob_##jobclass;
  //-----------------------------------------------------------------------------
  // Purpose: job registration macro for a job triggered by a web api request
  //
  // Expands to the same three pieces as GC_REG_JOB: a CreateJob_<jobclass>()
  // factory, a static g_JobType_<jobclass> descriptor, and a static
  // g_RegJob_<jobclass> object whose constructor passes the descriptor to
  // Job_RegisterJobType(). The differences are that there is no msg parameter
  // and the descriptor's message slot is always k_EGCMsgInvalid.
  //
  // Input:   parentclass - pointee type of the factory's pParent argument
  //          jobclass    - job class handed to CJob::AllocateJob<>()
  //          jobname     - first field of the JobType_t initializer
  //          servertype  - third field of the JobType_t initializer
  //
  // GCSDK types are fully qualified, matching GC_REG_JOB, so the macro no longer
  // depends on a using-directive for CJob / JobType_t / JobCreationFunc_t at the
  // point of use. k_EGCMsgInvalid is left unqualified because its enclosing
  // namespace is not visible from this header; it must still be in scope.
  //-----------------------------------------------------------------------------
  #define REG_WEBAPI_JOB( parentclass, jobclass, jobname, servertype ) \
      GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \
      static const GCSDK::JobType_t g_JobType_##jobclass = { jobname, k_EGCMsgInvalid, servertype, (GCSDK::JobCreationFunc_t)CreateJob_##jobclass }; \
      GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \
      { \
          GCSDK::CJob *job = GCSDK::CJob::AllocateJob<jobclass>( pParent ); \
          if ( job ) \
          { \
              Job_SetJobType( *job, &g_JobType_##jobclass ); \
              if ( pvStartParam ) job->SetStartParam( pvStartParam ); \
          } \
          else \
          { \
              AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \
          } \
          return job; \
      } \
      static class CRegJob_##jobclass \
      { \
      public: CRegJob_##jobclass() \
          { \
              Job_RegisterJobType( &g_JobType_##jobclass ); \
          } \
      } g_RegJob_##jobclass;
  348. } // namespace GCSDK
  349. #include "tier0/memdbgoff.h"
  350. #endif // GC_JOBMGR_H