Team Fortress 2 Source Code as on 22/4/2020
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1626 lines
54 KiB

  1. //========= Copyright Valve Corporation, All rights reserved. ============//
  2. //
  3. // Purpose:
  4. //
  5. // $NoKeywords: $
  6. //=============================================================================
  7. #include "stdafx.h"
  8. // memdbgon must be the last include file in a .cpp file!!!
  9. #include "tier0/memdbgon.h"
  10. namespace GCSDK
  11. {
  12. #ifdef DEBUG_JOB_LIST
  13. CUtlLinkedList<CJob *,int> CJobMgr::sm_listAllJobs;
  14. #endif
  15. typedef int (__cdecl *QSortCompareFuncCtx_t)(void *, const void *, const void *);
  16. //-----------------------------------------------------------------------------
  17. // Purpose: Constructor
  18. //-----------------------------------------------------------------------------
  19. CJobMgr::CJobMgr()
  20. : m_MapJob( 0, 0, DefLessFunc( GID_t ) ),
  21. m_QueueJobSleeping( 0, 0, &JobSleepingLessFunc ),
  22. m_unNextJobID( 0 ),
  23. m_mapStatsBucket( 0, 0, DefLessFunc(uint32) ),
  24. m_WorkThreadPool( "CJobMgr::m_WorkThreadPool" ),
  25. m_bDebugDisallowPause( false )
  26. {
  27. SetDefLessFunc( m_MapJobTimeoutsIndexByJobID );
  28. SetDefLessFunc( m_mapOrphanMessages );
  29. m_bJobTimedOut = false;
  30. m_nCurrentYieldIterationRegPri = 0;
  31. m_bProfiling = false;
  32. m_bIsShuttingDown = false;
  33. m_cErrorsToReport = 0;
  34. m_unFrameFuncThreadID = 0;
  35. m_WorkThreadPool.SetWorkThreadAutoConstruct( 1, NULL );
  36. if( MemAlloc_GetDebugInfoSize() > 0 )
  37. {
  38. g_memMainDebugInfo.Init( 0, MemAlloc_GetDebugInfoSize() );
  39. }
  40. if( MemAlloc_GetDebugInfoSize() > 0 )
  41. {
  42. g_memMainDebugInfo.EnsureCapacity( MemAlloc_GetDebugInfoSize() );
  43. }
  44. }
  45. //-----------------------------------------------------------------------------
  46. // Purpose: Constructor
  47. //-----------------------------------------------------------------------------
  48. CJobMgr::~CJobMgr()
  49. {
  50. m_WorkThreadPool.StopWorkThreads();
  51. }
  52. //-----------------------------------------------------------------------------
  53. // Purpose: limit the size of our thread pool
  54. //-----------------------------------------------------------------------------
  55. void CJobMgr::SetThreadPoolSize( uint cThreads )
  56. {
  57. m_WorkThreadPool.SetWorkThreadAutoConstruct( cThreads, NULL );
  58. }
  59. //-----------------------------------------------------------------------------
  60. // Purpose: gets the next available job ID
  61. //-----------------------------------------------------------------------------
  62. JobID_t CJobMgr::GetNewJobID()
  63. {
  64. #ifdef GC
  65. return GGCHost()->GenerateGID();
  66. #else
  67. return ++m_unNextJobID;
  68. #endif
  69. }
  70. //-----------------------------------------------------------------------------
  71. // Purpose: Run jobs
  72. // Runs once per frame and resumes any sleeping jobs that are scheduled
  73. // to run again, also checks for jobs which have timed out.
  74. //
  75. // Input: limitTimer - limit timer not to exceed
  76. // Output: true if there is still work remaining to do, false otherwise
  77. //-----------------------------------------------------------------------------
  78. bool CJobMgr::BFrameFuncRunSleepingJobs( CLimitTimer &limitTimer )
  79. {
  80. CheckThreadID(); // make sure frame function is called from correct thread
  81. bool bWorkRemaining = false;
  82. {
  83. VPROF_BUDGET( "CJobMgr::BResumeSleepingJobs", VPROF_BUDGETGROUP_JOBS_COROUTINES );
  84. bWorkRemaining |= BResumeSleepingJobs( limitTimer );
  85. }
  86. {
  87. VPROF_BUDGET( "CJobMgr::CheckForJobTimeouts", VPROF_BUDGETGROUP_JOBS_COROUTINES );
  88. CheckForJobTimeouts( limitTimer );
  89. }
  90. m_JobStats.m_cJobsCurrent = CountJobs();
  91. return bWorkRemaining;
  92. }
  93. //-----------------------------------------------------------------------------
  94. // Purpose: Run jobs
  95. // This function is called repeatedly in a single frame if time is left
  96. // and will first run any yielding jobs
  97. // Input: limitTimer - limit timer not to exceed
  98. // Output: true if there is still work remaining to do, false otherwise
  99. //-----------------------------------------------------------------------------
  100. bool CJobMgr::BFrameFuncRunYieldingJobs( CLimitTimer &limitTimer )
  101. {
  102. CheckThreadID(); // make sure frame function is called from correct thread
  103. bool bWorkRemaining = false;
  104. {
  105. VPROF_BUDGET( "CJobMgr::BResumeYieldingJobs", VPROF_BUDGETGROUP_JOBS_COROUTINES );
  106. bWorkRemaining |= BResumeYieldingJobs( limitTimer );
  107. }
  108. {
  109. VPROF_BUDGET( "CJobMgr -- Dispatch completed work items", VPROF_BUDGETGROUP_JOBS_COROUTINES );
  110. bWorkRemaining |= m_WorkThreadPool.BDispatchCompletedWorkItems( limitTimer, this );
  111. }
  112. m_JobStats.m_cJobsCurrent = CountJobs();
  113. return bWorkRemaining;
  114. }
  115. //-----------------------------------------------------------------------------
  116. // Purpose: Registers a new job for us to keep track of.
  117. // Input: job - The job in question
  118. //-----------------------------------------------------------------------------
  119. void CJobMgr::InsertJob( CJob &job )
  120. {
  121. Assert( m_MapJob.Find( job.GetJobID() ) == m_MapJob.InvalidIndex() );
  122. m_MapJob.Insert( job.GetJobID(), &job );
  123. #ifdef DEBUG_JOB_LIST
  124. sm_listAllJobs.AddToTail( &job );
  125. #endif
  126. }
  127. //-----------------------------------------------------------------------------
  128. // purpose: This job is done, accumulate its stats
  129. //-----------------------------------------------------------------------------
  130. void CJobMgr::AccumulateStatsofJob( CJob &job )
  131. {
  132. // if we are not profiling, but the job experienced some kind of failure
  133. // record it anyway - we will issue a consolidated spew about it
  134. if ( !m_bProfiling && job.m_flags.m_uFlags == 0 )
  135. return;
  136. if ( job.m_flags.m_uFlags )
  137. m_cErrorsToReport++;
  138. job.m_FastTimerDelta.End();
  139. job.m_cyclecountTotal += job.m_FastTimerDelta.GetDuration();
  140. uint32 eBucket = 0;
  141. // the pointer to the name is a pointer to a constant string
  142. // so use this dirty trick to make lookups fast
  143. eBucket = (uint32)job.GetName();
  144. int iBucket = m_mapStatsBucket.Find( eBucket );
  145. if ( iBucket == m_mapStatsBucket.InvalidIndex() )
  146. {
  147. iBucket = m_mapStatsBucket.Insert( eBucket );
  148. V_strcpy_safe( m_mapStatsBucket[iBucket].m_rgchName, job.GetName() );
  149. }
  150. JobStatsBucket_t *pJobStatsBucket = &m_mapStatsBucket[iBucket];
  151. pJobStatsBucket->m_cCompletes++;
  152. pJobStatsBucket->m_cLocksAttempted += job.m_cLocksAttempted;
  153. pJobStatsBucket->m_cLocksWaitedFor += job.m_cLocksWaitedFor;
  154. pJobStatsBucket->m_cLocksFailed += job.m_flags.m_bits.m_bLocksFailed ? 1 : 0;
  155. pJobStatsBucket->m_cLocksLongHeld += job.m_flags.m_bits.m_bLocksLongHeld ? 1 : 0;
  156. pJobStatsBucket->m_cLocksLongWait += job.m_flags.m_bits.m_bLocksLongWait ? 1 : 0;
  157. pJobStatsBucket->m_cWaitTimeout += job.m_flags.m_bits.m_bWaitTimeout ? 1 : 0;
  158. pJobStatsBucket->m_cJobsFailed += job.m_flags.m_bits.m_bJobFailed ? 1 : 0;
  159. pJobStatsBucket->m_cLongInterYieldTime += job.m_flags.m_bits.m_bLongInterYield ? 1 : 0;
  160. pJobStatsBucket->m_cTimeoutNetMsg += job.m_flags.m_bits.m_bTimeoutNetMsg ? 1 : 0;
  161. pJobStatsBucket->m_u64RunTime += job.m_cyclecountTotal.GetLongCycles();
  162. if ( (uint64)job.m_cyclecountTotal.GetLongCycles() > pJobStatsBucket->m_u64RunTimeMax )
  163. pJobStatsBucket->m_u64RunTimeMax = job.m_cyclecountTotal.GetLongCycles();
  164. if ( job.m_STimeSwitched != job.m_STimeStarted )
  165. {
  166. pJobStatsBucket->m_cJobsPaused++;
  167. pJobStatsBucket->m_u64JobDuration += job.m_STimeStarted.CServerMicroSecsPassed();
  168. }
  169. else
  170. {
  171. pJobStatsBucket->m_u64JobDuration += job.m_cyclecountTotal.GetMicroseconds();
  172. }
  173. }
  174. //-----------------------------------------------------------------------------
  175. // purpose: This message was orphaned, accumulate for stats
  176. //-----------------------------------------------------------------------------
  177. void CJobMgr::RecordOrphanedMessage( MsgType_t eMsg, JobID_t jobIDTarget )
  178. {
  179. EG_MSG( SPEW_JOB, "Message %s arrived responding to job %lld which no longer exists, dropping message\n", PchMsgNameFromEMsg( eMsg ), jobIDTarget );
  180. int iBucket = m_mapOrphanMessages.Find( eMsg );
  181. if ( iBucket == m_mapOrphanMessages.InvalidIndex() )
  182. {
  183. int ct = 0;
  184. iBucket = m_mapOrphanMessages.Insert( eMsg, ct );
  185. }
  186. m_mapOrphanMessages[iBucket]++;
  187. }
  188. //-----------------------------------------------------------------------------
  189. // Purpose: Removes a job from the manager. Note that we don't free it.
  190. // Input: job - The job in question
  191. //-----------------------------------------------------------------------------
  192. void CJobMgr::RemoveJob( CJob &job )
  193. {
  194. m_MapJob.Remove( job.GetJobID() );
  195. AccumulateStatsofJob( job );
  196. m_JobStats.m_cJobsTotal++;
  197. if ( job.m_flags.m_bits.m_bJobFailed )
  198. m_JobStats.m_cJobsFailed++;
  199. uint64 u64JobDuration = job.m_STimeStarted.CServerMicroSecsPassed();
  200. m_JobStats.m_flSumJobTimeMicrosec += u64JobDuration;
  201. m_JobStats.m_flSumSqJobTimeMicrosec += ((double)u64JobDuration * (double)u64JobDuration);
  202. if ( u64JobDuration > m_JobStats.m_unMaxJobTimeMicrosec )
  203. {
  204. m_JobStats.m_unMaxJobTimeMicrosec = u64JobDuration;
  205. }
  206. #ifdef DEBUG_JOB_LIST
  207. sm_listAllJobs.FindAndRemove( &job );
  208. #endif
  209. }
  #ifdef GC
  //-----------------------------------------------------------------------------
  // Purpose: Resumes the specified job if it is, in fact, waiting for a SQL
  //          query to return. Also closes out the job's in-flight SQL
  //          profiling record, if it has one.
  // Input:   jobID - job whose query has returned
  // Output:  true if the job was found paused for SQL and queued to resume
  //-----------------------------------------------------------------------------
  bool CJobMgr::BResumeSQLJob( JobID_t jobID )
  {
      const int iInFlight = m_mapSQLQueriesInFlight.Find( jobID );
      if ( m_mapSQLQueriesInFlight.IsValidIndex( iInFlight ) )
      {
          const PendingSQLJob_t &pending = m_mapSQLQueriesInFlight[iInFlight];
          if ( m_bSQLProfiling && m_dictSQLBuckets.IsValidIndex( pending.m_iBucket ) )
          {
              // charge the elapsed time since the query was issued to its bucket
              SQLProfileBucket_t &bucket = m_dictSQLBuckets[ pending.m_iBucket ];
              bucket.m_unCount++;
              bucket.m_nTotalMicrosec += (int64)m_sqlTimer.GetDurationInProgress().GetUlMicroseconds() - pending.m_nStartMicrosec;
          }

          // 'pending' must not be used past this point
          m_mapSQLQueriesInFlight.RemoveAt( iInFlight );
      }

      int iJob;
      if ( !BGetIJob( jobID, k_EJobPauseReasonSQL, true, &iJob ) )
      {
          EG_MSG( SPEW_JOB, "BResumeSQLJob called for a job that could not be found!\n" );
          return false;
      }

      // Just end the SQL pause and add the job to the yield list;
      // it will wake up on the next heartbeat
      CJob *pJob = m_MapJob[iJob];
      pJob->EndPause( k_EJobPauseReasonSQL );
      AddToYieldList( *pJob );
      return true;
  }
  #endif
  241. //-----------------------------------------------------------------------------
  242. // Purpose: returns true if we're running any jobs of the specified name
  243. // Output : Returns true on success, false on failure.
  244. //-----------------------------------------------------------------------------
  245. bool CJobMgr::BIsJobRunning( const char *pchJobName )
  246. {
  247. FOR_EACH_MAP_FAST( m_MapJob, i )
  248. {
  249. if ( !Q_stricmp( m_MapJob[i]->GetName(), pchJobName ) )
  250. return true;
  251. }
  252. return false;
  253. }
  254. //-----------------------------------------------------------------------------
  255. // Purpose: returns true if there is a job active with the specified ID
  256. //-----------------------------------------------------------------------------
  257. bool CJobMgr::BJobExists( JobID_t jobID ) const
  258. {
  259. return ( m_MapJob.Find( jobID ) != m_MapJob.InvalidIndex() );
  260. }
  261. //-----------------------------------------------------------------------------
  262. // Purpose: returns a job pointer by id
  263. //-----------------------------------------------------------------------------
  264. const CJob *CJobMgr::GetPJob( JobID_t jobID ) const
  265. {
  266. int iMap = m_MapJob.Find( jobID );
  267. if ( iMap != m_MapJob.InvalidIndex() )
  268. {
  269. return m_MapJob[iMap];
  270. }
  271. return NULL;
  272. }
  273. CJob *CJobMgr::GetPJob( JobID_t jobID )
  274. {
  275. int iMap = m_MapJob.Find( jobID );
  276. if ( iMap != m_MapJob.InvalidIndex() )
  277. {
  278. return m_MapJob[iMap];
  279. }
  280. return NULL;
  281. }
  282. //-----------------------------------------------------------------------------
  283. // Purpose: Examines an incoming message to see if it belongs to an active job,
  284. // and if so, sends it to that job. Creates a new job if necessary.
  285. // Output: true if the message was routed to a job
  286. //-----------------------------------------------------------------------------
  287. bool CJobMgr::BRouteMsgToJob( void *pParent, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo )
  288. {
  289. if ( pNetPacket == NULL )
  290. {
  291. AssertMsg(pNetPacket, "CJobMgr::BRouteMsgToJob received NULL packet.");
  292. return false;
  293. }
  294. if ( jobMsgInfo.m_JobIDTarget != k_GIDNil )
  295. {
  296. // This message is a reply to a running job
  297. VPROF_BUDGET( "CJobMgr::BRouteMsgToJob() - continue job", VPROF_BUDGETGROUP_JOBS_COROUTINES );
  298. // Find the job that this packet is destined for
  299. int iJob = m_MapJob.Find( jobMsgInfo.m_JobIDTarget );
  300. if ( m_MapJob.InvalidIndex() != iJob )
  301. {
  302. // found the right job, pass it off
  303. PassMsgToJob( *(m_MapJob[iJob]), pNetPacket, jobMsgInfo );
  304. return true;
  305. }
  306. // The job is no longer running, it most likely timed out before the response arrived.
  307. // Continue and see if a job is registered to launch from this message
  308. }
  309. // no job, so try creating a job that can handle the msg
  310. // We pass in a pointer to m_JobIDTarget so that it gets set to the new Job's ID. This ensures
  311. // that anyone replying to this message from within the new job has the right JobIDSource.
  312. VPROF_BUDGET( "CJobMgr::BRouteMsgToJob() - job", VPROF_BUDGETGROUP_JOBS_COROUTINES );
  313. bool bRet = BLaunchJobFromNetworkMsg( pParent, jobMsgInfo, pNetPacket );
  314. if ( !bRet && jobMsgInfo.m_JobIDTarget != k_GIDNil )
  315. {
  316. RecordOrphanedMessage( jobMsgInfo.m_eMsg, jobMsgInfo.m_JobIDTarget );
  317. // return that we've handled this message (as much as it possibly can be) -- was intended for a job that has
  318. // timed out, no one else can do anything with it
  319. return true;
  320. }
  321. return bRet;
  322. }
  323. //-----------------------------------------------------------------------------
  324. // Purpose: Routes a message directly to the specified job
  325. //-----------------------------------------------------------------------------
  326. void CJobMgr::PassMsgToJob( CJob &job, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo )
  327. {
  328. // Check if this job previously failed to wait for this message type,
  329. // then this is probably a late reply. Discard it
  330. if ( job.BHasFailedToReceivedMsgType( jobMsgInfo.m_eMsg ) )
  331. {
  332. EmitInfo( SPEW_JOB, 2, LOG_ALWAYS, "Reply msg type %s to job %s is too late; discarding\n", PchMsgNameFromEMsg( jobMsgInfo.m_eMsg ), job.GetName() );
  333. return;
  334. }
  335. // make sure it's what we're waiting for
  336. if ( job.GetPauseReason() != k_EJobPauseReasonNetworkMsg )
  337. {
  338. AssertMsg3( false, "CJobMgr::PassMsgToJob() job %s received unexpected message %s when paused for %s\n", job.GetName(), PchMsgNameFromEMsg( jobMsgInfo.m_eMsg ), job.GetPauseReasonDescription() );
  339. }
  340. // In case of error, we need to throw this message away
  341. if ( job.GetPauseReason() != k_EJobPauseReasonNetworkMsg )
  342. return;
  343. // Add the packet and resume the job
  344. job.AddPacketToList( pNetPacket, jobMsgInfo.m_JobIDSource );
  345. job.EndPause( k_EJobPauseReasonNetworkMsg );
  346. AddToYieldList( job );
  347. return;
  348. }
  349. //-----------------------------------------------------------------------------
  350. // Purpose: pauses the job until a network msg for the specified job arrives
  351. //-----------------------------------------------------------------------------
  352. bool CJobMgr::BYieldingWaitForMsg( CJob &job )
  353. {
  354. // wait until we're woken up by a networking callback, or a timeout
  355. PauseJob( job, k_EJobPauseReasonNetworkMsg );
  356. return !m_bJobTimedOut;
  357. }
  358. //-----------------------------------------------------------------------------
  359. // Purpose: Returns IJob matching a JobID, if it is paused for the given reason
  360. // Input: jobID - The job that should be paused for the given reason
  361. // eJobPauseReason - Pause reason
  362. // bShouldExist - If true, job should exist, so asserts on not finding it ok
  363. // pIJob - IJob to fill in
  364. // Output: true if job paused for matching reason found
  365. //-----------------------------------------------------------------------------
  366. bool CJobMgr::BGetIJob( JobID_t jobID, EJobPauseReason eJobPauseReason, bool bShouldExist, int *pIJob )
  367. {
  368. // If this isn't owned by a job, we don't handle it
  369. if ( k_GIDNil == jobID )
  370. return false;
  371. // Figure out which job the msg belongs to
  372. int iJob = m_MapJob.Find( jobID );
  373. Assert( m_MapJob.InvalidIndex() != iJob || !bShouldExist );
  374. // If it's not one of ours, ignore it
  375. if ( m_MapJob.InvalidIndex() == iJob )
  376. return false;
  377. // make sure it's what we're waiting for
  378. if ( m_MapJob[iJob]->GetPauseReason() != eJobPauseReason )
  379. return false;
  380. *pIJob = iJob;
  381. return true;
  382. }
  383. //-----------------------------------------------------------------------------
  384. // Purpose: yields for a set amount of time
  385. // Input : &job - job that is yielding
  386. // m_cMicrosecondsToSleep - number of microseconds to wait for before resuming job
  387. // Output : Returns true on success, false on failure.
  388. //-----------------------------------------------------------------------------
  389. bool CJobMgr::BYieldingWaitTime( CJob &job, uint32 cMicrosecondsToSleep )
  390. {
  391. Assert( cMicrosecondsToSleep < k_cMicroSecJobPausedTimeout );
  392. // sleep of zero causes an infinite loop
  393. Assert( 0 != cMicrosecondsToSleep );
  394. #ifdef _DEBUG
  395. for ( int i = 0; i < m_QueueJobSleeping.Count(); i++ )
  396. {
  397. Assert( m_QueueJobSleeping.Element(i).m_JobID != job.GetJobID() );
  398. }
  399. #endif
  400. // insert the job into the sleep list
  401. JobSleeping_t jobSleeping;
  402. jobSleeping.m_JobID = job.GetJobID();
  403. jobSleeping.m_SWakeupTime.SetFromJobTime( cMicrosecondsToSleep );
  404. jobSleeping.m_STimeTouched.SetToJobTime();
  405. m_QueueJobSleeping.Insert( jobSleeping );
  406. // yield
  407. PauseJob( job, k_EJobPauseReasonSleepForTime );
  408. if ( m_bJobTimedOut )
  409. return false;
  410. return true;
  411. }
  412. #ifdef GC
  413. //-----------------------------------------------------------------------------
  414. // Purpose: yields waiting for a query response
  415. // Input : &job - job that is yielding
  416. // Output : Returns true on success, false on failure.
  417. //-----------------------------------------------------------------------------
  418. // yields waiting for a query response
  419. bool CJobMgr::BYieldingRunQuery( CJob &job, CGCSQLQueryGroup *pQueryGroup, ESchemaCatalog eSchemaCatalog )
  420. {
  421. // clear the existing results pointer, if any, to make space for the results
  422. // for this query
  423. pQueryGroup->SetResults( NULL );
  424. if ( m_bSQLProfiling )
  425. {
  426. const char *pchName = pQueryGroup->PchName();
  427. if ( !pchName || !pchName[0] )
  428. {
  429. if ( pQueryGroup->GetStatementCount() == 1 )
  430. {
  431. pchName = pQueryGroup->PchCommand( 0 );
  432. }
  433. if ( !pchName || !pchName[0] )
  434. {
  435. pchName = job.GetName();
  436. }
  437. }
  438. PendingSQLJob_t sqlJob;
  439. sqlJob.m_nStartMicrosec = (int64)m_sqlTimer.GetDurationInProgress().GetUlMicroseconds();
  440. sqlJob.m_iBucket = m_dictSQLBuckets.Find( pchName );
  441. if ( !m_dictSQLBuckets.IsValidIndex( sqlJob.m_iBucket ) )
  442. {
  443. SQLProfileBucket_t bucket = { 0, 0 };
  444. sqlJob.m_iBucket = m_dictSQLBuckets.Insert( pchName, bucket );
  445. }
  446. m_mapSQLQueriesInFlight.Insert( job.GetJobID(), sqlJob );
  447. }
  448. VPROF_BUDGET( "GCHost", VPROF_BUDGETGROUP_STEAM );
  449. {
  450. VPROF_BUDGET( "GCHost - SQLQuery", VPROF_BUDGETGROUP_STEAM );
  451. GGCHost()->SQLQuery( job.GetJobID(), pQueryGroup, eSchemaCatalog );
  452. }
  453. PauseJob( job, k_EJobPauseReasonSQL );
  454. return pQueryGroup->GetResults() && pQueryGroup->GetResults()->GetError() == k_EGCSQLErrorNone;
  455. }
  456. //-----------------------------------------------------------------------------
  457. // Purpose: turns on sql profiling
  458. //-----------------------------------------------------------------------------
  459. void CJobMgr::StartSQLProfiling()
  460. {
  461. if ( m_bSQLProfiling )
  462. return;
  463. m_mapSQLQueriesInFlight.RemoveAll();
  464. m_dictSQLBuckets.RemoveAll();
  465. m_sqlTimer.Start();
  466. m_bSQLProfiling = true;
  467. }
  468. //-----------------------------------------------------------------------------
  469. // Purpose: turns off sql profiling
  470. //-----------------------------------------------------------------------------
  471. void CJobMgr::StopSQLProfiling()
  472. {
  473. if ( !m_bSQLProfiling )
  474. return;
  475. m_mapSQLQueriesInFlight.RemoveAll();
  476. m_sqlTimer.End();
  477. m_bSQLProfiling = false;
  478. }
  479. //-----------------------------------------------------------------------------
  480. // Purpose: sql profile sort func
  481. //-----------------------------------------------------------------------------
  482. int CJobMgr::SQLProfileSortFunc( void *pCtx, const int *lhs, const int *rhs )
  483. {
  484. SQLProfileCtx_t *pSQLProfileCtx = (SQLProfileCtx_t *)pCtx;
  485. CUtlDict<SQLProfileBucket_t> *pDictBuckets = pSQLProfileCtx->pdictBuckets;
  486. SQLProfileBucket_t &lhsBucket = pDictBuckets->Element( *lhs );
  487. SQLProfileBucket_t &rhsBucket = pDictBuckets->Element( *rhs );
  488. switch ( pSQLProfileCtx->m_eSort )
  489. {
  490. default:
  491. case k_ESQLProfileSortTotalTime: return rhsBucket.m_nTotalMicrosec - lhsBucket.m_nTotalMicrosec;
  492. case k_ESQLProfileSortTotalCount: return rhsBucket.m_unCount - lhsBucket.m_unCount;
  493. case k_ESQLProfileSortAvgTime: return ( rhsBucket.m_nTotalMicrosec / rhsBucket.m_unCount ) - ( lhsBucket.m_nTotalMicrosec / lhsBucket.m_unCount );
  494. case k_ESQLProfileSortName: return Q_stricmp( pDictBuckets->GetElementName( *lhs ), pDictBuckets->GetElementName( *rhs ) );
  495. }
  496. }
  497. //-----------------------------------------------------------------------------
  498. // Purpose: dumps the current sql profile
  499. //-----------------------------------------------------------------------------
  500. void CJobMgr::DumpSQLProfile( ESQLProfileSort eSort )
  501. {
  502. CUtlVector<int> vecSort;
  503. for ( int iDict = 0; iDict < m_dictSQLBuckets.MaxElement(); iDict++ )
  504. {
  505. if ( !m_dictSQLBuckets.IsValidIndex( iDict ) )
  506. continue;
  507. if ( m_dictSQLBuckets[iDict].m_unCount > 0 )
  508. {
  509. vecSort.AddToTail( iDict );
  510. }
  511. }
  512. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "SQL statement stats:\n" );
  513. if ( 0 == vecSort.Count() )
  514. {
  515. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "\tNo SQL stats collected; use sql_profile_on / sql_profile_off to collect stats first\n" );
  516. return;
  517. }
  518. // sort
  519. SQLProfileCtx_t ctx;
  520. ctx.m_eSort = eSort;
  521. ctx.pdictBuckets = &m_dictSQLBuckets;
  522. V_qsort_s( vecSort.Base(), vecSort.Count(), sizeof(int), (QSortCompareFuncCtx_t)SQLProfileSortFunc, &ctx );
  523. // display
  524. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%8s %8s %8s\n", "count", "time", "avg" );
  525. FOR_EACH_VEC( vecSort, i )
  526. {
  527. SQLProfileBucket_t &bucket = m_dictSQLBuckets[ vecSort[i] ];
  528. const char *pchStatement = m_dictSQLBuckets.GetElementName( vecSort[i] );
  529. // cleanup the statement text
  530. char rgchCleaned[140];
  531. V_strcpy_safe( rgchCleaned, pchStatement );
  532. for ( int i = 0; NULL != rgchCleaned[i]; i++ )
  533. {
  534. if ( '\n' == rgchCleaned[i] || '\t' == rgchCleaned[i] )
  535. {
  536. rgchCleaned[i] = ' ';
  537. }
  538. }
  539. bool bSeconds = bucket.m_nTotalMicrosec > k_nMillion;
  540. float fTime = bucket.m_nTotalMicrosec / 1000.0f / ( bSeconds ? 1000.0f : 1.0f );
  541. // render
  542. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%8d %8.2f%s %8.2f %s\n",
  543. bucket.m_unCount,
  544. fTime,
  545. bSeconds ? "s " : "ms",
  546. (float)bucket.m_nTotalMicrosec / (float)bucket.m_unCount / 1000.0f,
  547. rgchCleaned );
  548. }
  549. }
  550. #endif
  551. //-----------------------------------------------------------------------------
  552. // Purpose: pauses job until a work item completes
  553. //-----------------------------------------------------------------------------
  554. bool CJobMgr::BYieldingWaitForWorkItem( CJob &job, const char *pszWorkItemName )
  555. {
  556. // wait until we're woken up by a work item completed, or a timeout
  557. PauseJob( job, k_EJobPauseReasonWorkItem );
  558. if ( m_bJobTimedOut || job.m_bWorkItemCanceled )
  559. return false;
  560. return true;
  561. }
  562. //-----------------------------------------------------------------------------
  563. // Purpose: adds a job work item to the thread pool
  564. //-----------------------------------------------------------------------------
  565. void CJobMgr::AddThreadedJobWorkItem( CWorkItem *pWorkItem )
  566. {
  567. m_WorkThreadPool.AddWorkItem( pWorkItem );
  568. }
  569. //-----------------------------------------------------------------------------
  570. // Purpose: returns true if we're still working
  571. //-----------------------------------------------------------------------------
  572. bool CJobMgr::HasOutstandingThreadPoolWorkItems()
  573. {
  574. return m_WorkThreadPool.HasWorkItemsToProcess();
  575. }
  576. //-----------------------------------------------------------------------------
  577. // Purpose: Mark that we're shutting down
  578. //-----------------------------------------------------------------------------
  579. void CJobMgr::SetIsShuttingDown()
  580. {
  581. m_WorkThreadPool.AllowTimeouts( true ); // during shutdown, we might abort jobs before waiting for the work item to complete
  582. m_bIsShuttingDown = true;
  583. }
  584. //-----------------------------------------------------------------------------
  585. // Purpose: Wakes up the specified waiting job.
  586. // Input: jobID - The job that owns this work item
  587. // bWorkItemCanceled - true if this job
  588. // bShouldExist - Do we assert if the job doesn't exist?
  589. // Output: true if the message was routed to a job
  590. //-----------------------------------------------------------------------------
  591. bool CJobMgr::BRouteWorkItemCompletedInternal( JobID_t jobID, bool bWorkItemCanceled, bool bShouldExist, bool bResumeImmediately )
  592. {
  593. int iJob;
  594. // this can resume jobs, make sure we didn't switch threads
  595. CheckThreadID();
  596. if ( !BGetIJob( jobID, k_EJobPauseReasonWorkItem, bShouldExist, &iJob ) )
  597. {
  598. EG_MSG( SPEW_JOB, "BRouteWorkItemCompleted called for a job that could not be found!\n" );
  599. return false;
  600. }
  601. // continue the job
  602. m_MapJob[iJob]->m_bWorkItemCanceled = bWorkItemCanceled;
  603. if ( bResumeImmediately )
  604. {
  605. m_MapJob[iJob]->Continue();
  606. }
  607. else
  608. {
  609. AddToYieldList( *m_MapJob[iJob] );
  610. // reset the sleep reason
  611. m_MapJob[iJob]->m_ePauseReason = k_EJobPauseReasonYield;
  612. }
  613. return true;
  614. }
  615. //-----------------------------------------------------------------------------
  616. // Purpose: Adds job to yield list (without actually pausing it) - internal
  617. // Input : &job - job that is yielding
  618. // Output : Returns true on success, false on failure.
  619. //-----------------------------------------------------------------------------
  620. void CJobMgr::AddToYieldList( CJob &job )
  621. {
  622. #ifdef _DEBUG
  623. FOR_EACH_LL( m_ListJobsYieldingRegPri, i )
  624. {
  625. Assert( m_ListJobsYieldingRegPri[i].m_JobID != job.GetJobID() );
  626. }
  627. #endif
  628. // insert the job into the sleep list
  629. JobYielding_t jobYielding;
  630. jobYielding.m_JobID = job.GetJobID();
  631. jobYielding.m_nIteration = m_nCurrentYieldIterationRegPri;
  632. m_ListJobsYieldingRegPri.AddToTail( jobYielding );
  633. }
  634. //-----------------------------------------------------------------------------
  635. // called by a job that has just been started to place itself on the yield queue instead of running
  636. //-----------------------------------------------------------------------------
  637. void CJobMgr::AddDelayedJobToYieldList( CJob &job )
  638. {
  639. //make sure that this job is setup to be yielded at this point, otherwise it will not resume properly
  640. AssertMsg1( job.GetPauseReason() == k_EJobPauseReasonYield, "Delayed job %s was added to yield list but was not in expected yield state\n", job.GetName() );
  641. AddToYieldList( job );
  642. }
  643. //-----------------------------------------------------------------------------
  644. // Purpose: yields until the next Run()
  645. // Input : &job - job that is yielding
  646. // Output : Returns true on success, false on failure.
  647. //-----------------------------------------------------------------------------
  648. bool CJobMgr::BYield( CJob &job )
  649. {
  650. AddToYieldList( job );
  651. // yield
  652. PauseJob( job, k_EJobPauseReasonYield );
  653. if ( m_bJobTimedOut )
  654. return false;
  655. return true;
  656. }
  657. //-----------------------------------------------------------------------------
  658. // Purpose: yields IF NEEDED until the next Run()
  659. // Input : &job - job that is possibly yielding
  660. // pbYielded - optional, set to true if we did yield
  661. // Output : Returns true on success, false on failure.
  662. //-----------------------------------------------------------------------------
  663. bool CJobMgr::BYieldIfNeeded( CJob &job, bool *pbYielded )
  664. {
  665. if ( pbYielded )
  666. *pbYielded = false;
  667. if ( job.GetMicrosecondsRun() > ( k_cMicroSecTaskGranularity / 2 ) )
  668. {
  669. bool bRet = BYield( job );
  670. if ( pbYielded )
  671. *pbYielded = bRet;
  672. return bRet;
  673. }
  674. return true;
  675. }
  676. //-----------------------------------------------------------------------------
  677. // Purpose: Resumes jobs in list passed in that are ready to be awakened
  678. //-----------------------------------------------------------------------------
  679. bool CJobMgr::BResumeYieldingJobsFromList( CUtlLinkedList<JobYielding_t, int> &listJobsYielding, uint nCurrentIteration,
  680. CLimitTimer &limitTimer )
  681. {
  682. while ( listJobsYielding.Count() )
  683. {
  684. int iJobYielding = listJobsYielding.Head();
  685. const JobYielding_t &jobYielding = listJobsYielding[ iJobYielding ];
  686. if ( jobYielding.m_nIteration > nCurrentIteration )
  687. break;
  688. // pop the sleep off the top of the queue
  689. int iJob = m_MapJob.Find( jobYielding.m_JobID );
  690. listJobsYielding.Remove( iJobYielding );
  691. if ( m_MapJob.InvalidIndex() == iJob )
  692. continue;
  693. Assert( m_MapJob[iJob]->GetPauseReason() == k_EJobPauseReasonYield );
  694. // Should never be false, but if it is we
  695. // don't want to do anything to this job
  696. if ( m_MapJob[iJob]->GetPauseReason() == k_EJobPauseReasonYield )
  697. {
  698. // resume the job
  699. m_MapJob[iJob]->Continue();
  700. }
  701. if ( limitTimer.BLimitReached() )
  702. break;
  703. }
  704. return ( listJobsYielding.Count() > 0 );
  705. }
  706. //-----------------------------------------------------------------------------
  707. // Purpose: Resumes any jobs that have are ready to be awaken
  708. // Input: limitTimer - limit timer not to exceed
  709. // Output: true if there is still work remaining to do, false otherwise
  710. //-----------------------------------------------------------------------------
  711. bool CJobMgr::BResumeYieldingJobs( CLimitTimer &limitTimer )
  712. {
  713. return BResumeYieldingJobsFromList( m_ListJobsYieldingRegPri, m_nCurrentYieldIterationRegPri++, limitTimer );
  714. }
  715. //-----------------------------------------------------------------------------
  716. // Purpose: Resumes any jobs that have are ready to be awaken
  717. // Input: limitTimer - limit timer not to exceed
  718. // Output: true if there is still work remaining to do, false otherwise
  719. //-----------------------------------------------------------------------------
  720. bool CJobMgr::BResumeSleepingJobs( CLimitTimer &limitTimer )
  721. {
  722. while ( m_QueueJobSleeping.Count() )
  723. {
  724. const JobSleeping_t &jobSleeping = m_QueueJobSleeping.ElementAtHead();
  725. if ( jobSleeping.m_SWakeupTime.LTime() > CJobTime::LJobTimeCur() )
  726. {
  727. // Check if we need to heartbeat
  728. if ( jobSleeping.m_STimeTouched.CServerMicroSecsPassed() >= k_cMicroSecJobHeartbeat )
  729. {
  730. int iJob = m_MapJob.Find( jobSleeping.m_JobID );
  731. if ( m_MapJob.InvalidIndex() != iJob )
  732. {
  733. m_MapJob[iJob]->Heartbeat();
  734. }
  735. }
  736. return false;
  737. }
  738. // pop the sleep off the top of the queue
  739. int iJob = m_MapJob.Find( jobSleeping.m_JobID );
  740. m_QueueJobSleeping.RemoveAtHead();
  741. if ( m_MapJob.InvalidIndex() == iJob )
  742. continue;
  743. Assert( m_MapJob[iJob]->GetPauseReason() == k_EJobPauseReasonSleepForTime );
  744. // should never be false, but if it is we don't want to do anything to this job
  745. if ( m_MapJob[iJob]->GetPauseReason() == k_EJobPauseReasonSleepForTime )
  746. {
  747. // resume the job
  748. m_MapJob[iJob]->Continue();
  749. }
  750. if ( limitTimer.BLimitReached() )
  751. break;
  752. }
  753. return ( m_QueueJobSleeping.Count() > 0 );
  754. }
  755. //-----------------------------------------------------------------------------
  756. // Purpose: comparison function for sorting sleeping jobs list by time
  757. // Output : Returns true on if lhs is greater than the rhs
  758. //-----------------------------------------------------------------------------
  759. bool CJobMgr::JobSleepingLessFunc( JobSleeping_t const &lhs, JobSleeping_t const &rhs )
  760. {
  761. // a lower time is a higher priority
  762. return ( lhs.m_SWakeupTime.LTime() > rhs.m_SWakeupTime.LTime() );
  763. }
// Job ID to trace: CheckForJobTimeouts() emits "Heartbeat!" whenever it heartbeats
// the job with this ID. Defaults to k_GIDNil.
JobID_t g_DebugJob = k_GIDNil;
  765. //-----------------------------------------------------------------------------
  766. // Purpose: quickly iterates the list of jobs to make sure none have been paused
  767. // for too long
  768. //-----------------------------------------------------------------------------
  769. void CJobMgr::CheckForJobTimeouts( CLimitTimer &limitTimer )
  770. {
  771. // look through each active jobs
  772. // remove from the list any job that has successfully received it's I/O
  773. // send a failure msg to any job that has timed out
  774. // since the timeout time is constant, we only have to check until we find a job
  775. int cIter = 0;
  776. while ( m_ListJobTimeouts.Head() != m_ListJobTimeouts.InvalidIndex() )
  777. {
  778. cIter ++;
  779. // Break if limit timer is reached and we've already processed at least one item.
  780. if ( cIter > 1 && limitTimer.BLimitReached() )
  781. break;
  782. JobTimeout_t &jobtimeout = m_ListJobTimeouts[ m_ListJobTimeouts.Head() ];
  783. // see if it's timed out
  784. if ( !m_bIsShuttingDown && jobtimeout.m_STimeTouched.CServerMicroSecsPassed() < k_cMicroSecJobHeartbeat )
  785. {
  786. // we haven't reached our recycle or timeout limit, which means none of the jobs passed us in the queue would have either
  787. break;
  788. }
  789. // get the first job in the list, which is the most likely to have timed out
  790. int iJob = m_MapJob.Find( jobtimeout.m_JobID );
  791. if ( m_MapJob.InvalidIndex() == iJob )
  792. {
  793. m_MapJobTimeoutsIndexByJobID.Remove( jobtimeout.m_JobID );
  794. m_ListJobTimeouts.Remove( m_ListJobTimeouts.Head() );
  795. continue;
  796. }
  797. // job still exists, make sure it is still paused at the same point
  798. CJob *pJob = m_MapJob[iJob];
  799. if ( pJob->GetTimeSwitched().LTime() == jobtimeout.m_STimePaused.LTime() )
  800. {
  801. jobtimeout.m_cHeartbeatsBeforeTimeout--;
  802. if ( pJob->GetJobID() == g_DebugJob )
  803. {
  804. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "Heartbeat!\n" );
  805. }
  806. // Always heartbeat so anyone waiting on the job (say on another server) will know it is still alive
  807. // Note that we even do this right before we timeout, since the job will actually be continued and may just loop itself right back into this waiting state
  808. // Note also that we do NOT check pJob->GetNextHeartbeatTime() since we've already been watching our own timer
  809. pJob->Heartbeat();
  810. if ( m_bIsShuttingDown || jobtimeout.m_cHeartbeatsBeforeTimeout <= 0 )
  811. {
  812. // Job finished all its available heartbeats before its timeout limit, timeout if appropriate and remove from the list
  813. m_MapJobTimeoutsIndexByJobID.Remove( jobtimeout.m_JobID );
  814. m_ListJobTimeouts.Remove( m_ListJobTimeouts.Head() );
  815. bool bShouldTimeout = true;
  816. switch ( pJob->m_ePauseReason )
  817. {
  818. case k_EJobPauseReasonWaitingForLock:
  819. case k_EJobPauseReasonYield:
  820. case k_EJobPauseReasonSQL:
  821. bShouldTimeout = false;
  822. break;
  823. case k_EJobPauseReasonSleepForTime:
  824. bShouldTimeout = m_bIsShuttingDown;
  825. break;
  826. } // switch
  827. // If the job WAS waiting on IO but now is waiting on a Lock, Sleeping,
  828. // or Yielding, don't time it out.
  829. // BUGBUG taylor we should fix things so that we can timeout Jobs waiting on
  830. // Locks and have them properly unlink themselves from the Lock chain
  831. if ( bShouldTimeout )
  832. {
  833. TimeoutJob( *( pJob ) );
  834. }
  835. }
  836. else
  837. {
  838. // Job has not yet used up all its available heartbeats before its timeout limit
  839. // We've already decremented its m_cHeartbeatsBeforeTimeout, now Reset its touched time too
  840. jobtimeout.m_STimeTouched.SetToJobTime();
  841. // Move it back to the end of the queue so it can come back up to the top for either another heartbeat or a timeout
  842. m_ListJobTimeouts.LinkToTail( m_ListJobTimeouts.Head() );
  843. int iIndexMap = m_MapJobTimeoutsIndexByJobID.Find( jobtimeout.m_JobID );
  844. if ( iIndexMap != m_MapJobTimeoutsIndexByJobID.InvalidIndex() )
  845. {
  846. int &iListIndex = m_MapJobTimeoutsIndexByJobID.Element( iIndexMap );
  847. iListIndex = m_ListJobTimeouts.Tail();
  848. }
  849. else
  850. {
  851. AssertMsg( false, "Map of jobs to timeout is corrupted" );
  852. }
  853. }
  854. continue;
  855. }
  856. else
  857. {
  858. // This is really the common heartbeating case, where the job waited a short while without ever reaching the k_cMicroSecJobHeartbeat limit
  859. // Thus, we need to heartbeat before removing it from the list IF the job has gone too long without heartbeating
  860. if ( pJob->BJobNeedsToHeartbeat() )
  861. {
  862. pJob->Heartbeat();
  863. }
  864. // Since the job didn't actually time out, clear this timeout event
  865. m_MapJobTimeoutsIndexByJobID.Remove( jobtimeout.m_JobID );
  866. m_ListJobTimeouts.Remove( m_ListJobTimeouts.Head() );
  867. }
  868. }
  869. }
  870. //-----------------------------------------------------------------------------
  871. // Purpose: Continues a job in a timed out state
  872. //-----------------------------------------------------------------------------
  873. void CJobMgr::TimeoutJob( CJob &job )
  874. {
  875. if ( job.GetPauseReason() == k_EJobPauseReasonNetworkMsg )
  876. job.m_flags.m_bits.m_bTimeoutNetMsg = true;
  877. else
  878. {
  879. // these are so rare I dont want to add a column for them in the rollup
  880. EG_WARNING( SPEW_JOB, "Resuming job '%s (id: %lld)' due to timeout while paused for %s\n", job.GetName(),
  881. job.GetJobID(), job.GetPauseReasonDescription() );
  882. job.m_flags.m_bits.m_bTimeoutOther = true;
  883. }
  884. m_JobStats.m_cJobsTimedOut++;
  885. m_bJobTimedOut = true;
  886. job.Continue();
  887. m_bJobTimedOut = false;
  888. }
  889. //-----------------------------------------------------------------------------
  890. // Purpose: wakes up a job that was waiting on a lock
  891. //-----------------------------------------------------------------------------
  892. void CJobMgr::WakeupLockedJob( CJob &job )
  893. {
  894. Assert( job.m_ePauseReason == k_EJobPauseReasonWaitingForLock );
  895. // in case of error, bug out now so as not
  896. // to cause more trouble
  897. if ( job.m_ePauseReason != k_EJobPauseReasonWaitingForLock )
  898. {
  899. return;
  900. }
  901. // insert the job into the yielding list so it will wakeup next Run
  902. AddToYieldList( job );
  903. // reset the sleep reason
  904. job.m_ePauseReason = k_EJobPauseReasonYield;
  905. }
  906. //-----------------------------------------------------------------------------
  907. // Purpose: Pauses a job, and puts it in a list to check for timeouts
  908. //-----------------------------------------------------------------------------
  909. void CJobMgr::PauseJob( CJob &job, EJobPauseReason eJobPauseReason )
  910. {
  911. Assert( !m_bDebugDisallowPause );
  912. if ( m_bDebugDisallowPause )
  913. {
  914. EmitError( SPEW_GC, "Job %s attempted to pause even though pauses were disabled\n", job.GetName() );
  915. }
  916. // add to list to check for timeouts later (or update the existing entry if it is already there)
  917. JobTimeout_t *pJobTimeout;
  918. int iMapIndex = m_MapJobTimeoutsIndexByJobID.Find( job.GetJobID() );
  919. if ( iMapIndex == m_MapJobTimeoutsIndexByJobID.InvalidIndex() )
  920. {
  921. pJobTimeout = &m_ListJobTimeouts[ m_ListJobTimeouts.AddToTail() ];
  922. m_MapJobTimeoutsIndexByJobID.Insert( job.GetJobID(), m_ListJobTimeouts.Tail() );
  923. }
  924. else
  925. {
  926. // There was an existing entry, in addition to updating it, move it to the tail
  927. int &iListIndex = m_MapJobTimeoutsIndexByJobID.Element( iMapIndex );
  928. m_ListJobTimeouts.LinkToTail( iListIndex );
  929. iListIndex = m_ListJobTimeouts.Tail();
  930. pJobTimeout = &m_ListJobTimeouts.Element( iListIndex );
  931. }
  932. pJobTimeout->m_JobID = job.GetJobID();
  933. pJobTimeout->m_STimePaused.SetToJobTime();
  934. pJobTimeout->m_STimeTouched.SetToJobTime();
  935. pJobTimeout->m_cHeartbeatsBeforeTimeout = job.CHeartbeatsBeforeTimeout();
  936. if ( eJobPauseReason == k_EJobPauseReasonWorkItem )
  937. {
  938. // work items control their own schedule - wait up to 6 hours
  939. pJobTimeout->m_cHeartbeatsBeforeTimeout = (6 * 60 * 60 * k_nMillion) / k_cMicroSecJobHeartbeat;
  940. }
  941. if ( pJobTimeout->m_cHeartbeatsBeforeTimeout <= 0 )
  942. {
  943. pJobTimeout->m_cHeartbeatsBeforeTimeout = k_cJobHeartbeatsBeforeTimeoutDefault;
  944. }
  945. // tell the job to pause
  946. job.Pause( eJobPauseReason );
  947. }
  948. //-----------------------------------------------------------------------------
  949. // Purpose: dumps a list of currently active jobs to the console
  950. // Output : int - number of jobs listed
  951. //-----------------------------------------------------------------------------
  952. int CJobMgr::DumpJobSummary()
  953. {
  954. CUtlMap< uint32, JobStatsBucket_t, int > mapStatsBucket( 0, 0, DefLessFunc( uint32 ) );
  955. FOR_EACH_MAP_FAST( m_MapJob, i )
  956. {
  957. CJob &job = *m_MapJob[i];
  958. // the pointer to the name is a pointer to a constant string
  959. // so use this dirty trick to make lookups fast
  960. uint32 eBucket = (uint32)job.GetName();
  961. int iBucket = mapStatsBucket.Find( eBucket );
  962. if ( iBucket == mapStatsBucket.InvalidIndex() )
  963. {
  964. iBucket = mapStatsBucket.Insert( eBucket );
  965. V_strcpy_safe( mapStatsBucket[iBucket].m_rgchName, job.GetName() );
  966. }
  967. JobStatsBucket_t *pJobStatsBucket = &mapStatsBucket[iBucket];
  968. pJobStatsBucket->m_cCompletes++; // overloading this to really mean "jobs running" for this spew
  969. pJobStatsBucket->m_cLocksAttempted += job.m_vecLocks.Count(); // overloading this to really be used for "locks held" for this spew
  970. pJobStatsBucket->m_u64JobDuration += job.m_STimeStarted.CServerMicroSecsPassed();
  971. switch ( job.m_ePauseReason )
  972. {
  973. case k_EJobPauseReasonNetworkMsg: pJobStatsBucket->m_cPauseReasonNetworkMsg++; break;
  974. case k_EJobPauseReasonSleepForTime: pJobStatsBucket->m_cPauseReasonSleepForTime++; break;
  975. case k_EJobPauseReasonWaitingForLock: pJobStatsBucket->m_cPauseReasonWaitingForLock++; break;
  976. case k_EJobPauseReasonYield: pJobStatsBucket->m_cPauseReasonYield++; break;
  977. case k_EJobPauseReasonSQL: pJobStatsBucket->m_cPauseReasonSQL++; break;
  978. case k_EJobPauseReasonWorkItem: pJobStatsBucket->m_cPauseReasonWorkItem++; break;
  979. default: break;
  980. }
  981. }
  982. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS,
  983. "%50s --- running jobs (usec)-- -- locks held -- ----- pause reasons ---------------------------------\n", " " );
  984. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS,
  985. "%50s count aveduration netmsg sql sleep waitlock yield workitem\n", "name" );
  986. JobProfileStats_t jobprofilestats;
  987. jobprofilestats.m_iJobProfileSort = k_EJobProfileSortOrder_Count;
  988. jobprofilestats.pmapStatsBucket = &mapStatsBucket;
  989. CUtlVector<int> vecSort( 0, mapStatsBucket.Count() );
  990. FOR_EACH_MAP_FAST( mapStatsBucket, iBucket )
  991. {
  992. vecSort.AddToTail( iBucket );
  993. }
  994. V_qsort_s( vecSort.Base(), vecSort.Count(), sizeof(int), (QSortCompareFuncCtx_t)ProfileSortFunc, &jobprofilestats );
  995. FOR_EACH_VEC( vecSort, iVec )
  996. {
  997. JobStatsBucket_t &bucket = mapStatsBucket[ vecSort[iVec] ];
  998. int64 msecDurationAve = bucket.m_u64JobDuration / bucket.m_cCompletes;
  999. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%50s %8lld %16lld %13lld %11lld %8lld %8lld %8lld %8lld %8lld \n",
  1000. bucket.m_rgchName,
  1001. bucket.m_cCompletes,
  1002. msecDurationAve,
  1003. bucket.m_cLocksAttempted,
  1004. bucket.m_cPauseReasonNetworkMsg,
  1005. bucket.m_cPauseReasonSQL,
  1006. bucket.m_cPauseReasonSleepForTime,
  1007. bucket.m_cPauseReasonWaitingForLock,
  1008. bucket.m_cPauseReasonYield,
  1009. bucket.m_cPauseReasonWorkItem
  1010. );
  1011. }
  1012. return m_MapJob.Count();
  1013. }
  1014. //-----------------------------------------------------------------------------
  1015. // Purpose: spews details about a job by ID
  1016. //-----------------------------------------------------------------------------
  1017. void CJobMgr::DumpJob( JobID_t jobID, int nPrintLocksMax ) const
  1018. {
  1019. const CJob *pJob = GetPJob( jobID );
  1020. if( !pJob )
  1021. {
  1022. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "Invalid job ID %llu\n", jobID );
  1023. }
  1024. else
  1025. {
  1026. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%llu\t%12s %12s\n",
  1027. pJob->GetJobID(),
  1028. pJob->GetName(),
  1029. pJob->GetPauseReasonDescription() );
  1030. if ( pJob->GetPauseReason() == k_EJobPauseReasonWaitingForLock && pJob->m_pWaitingOnLock != NULL )
  1031. {
  1032. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "\tWaiting for lock %s from: %s line %d\n", pJob->m_pWaitingOnLock->GetName(), pJob->m_pWaitingOnLockFilename, pJob->m_waitingOnLockLine );
  1033. pJob->m_pWaitingOnLock->Dump( "\t ", nPrintLocksMax, true );
  1034. }
  1035. FOR_EACH_VEC( pJob->m_vecLocks, nLock )
  1036. {
  1037. CLock *pLock = pJob->m_vecLocks[nLock];
  1038. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "\tHolding lock %s:\n", pLock->GetName() );
  1039. pLock->Dump( "\t ", nPrintLocksMax, true );
  1040. }
  1041. }
  1042. }
  1043. //-----------------------------------------------------------------------------
  1044. // Purpose: count the number of active jobs
  1045. //-----------------------------------------------------------------------------
  1046. int CJobMgr::CountJobs() const
  1047. {
  1048. return m_MapJob.Count();
  1049. }
  1050. //-----------------------------------------------------------------------------
  1051. // Purpose: verify that current thread is correct
  1052. //-----------------------------------------------------------------------------
  1053. void CJobMgr::CheckThreadID()
  1054. {
  1055. uint unCurrentThread = ThreadGetCurrentId();
  1056. if ( m_unFrameFuncThreadID == 0 )
  1057. {
  1058. m_unFrameFuncThreadID = unCurrentThread;
  1059. }
  1060. else
  1061. {
  1062. // if this Assert goes of, you most likely tried to start
  1063. // a job from a different thread then the frame function thread
  1064. Assert( m_unFrameFuncThreadID == unCurrentThread );
  1065. }
  1066. }
  1067. //-----------------------------------------------------------------------------
  1068. // Purpose: JobType_t comparer, used to sort the list of registered
  1069. // jobs into a tree by msg that creates them
  1070. //-----------------------------------------------------------------------------
  1071. bool JobTypeSortFuncByMsg( JobType_t const * const &lhs, JobType_t const * const &rhs )
  1072. {
  1073. if ( lhs->m_eCreationMsg == rhs->m_eCreationMsg )
  1074. {
  1075. return ( lhs->m_eServerType < rhs->m_eServerType );
  1076. }
  1077. return ( lhs->m_eCreationMsg < rhs->m_eCreationMsg );
  1078. }
  1079. //-----------------------------------------------------------------------------
  1080. // Purpose: JobType_t comparer, used to sort the list of registered
  1081. // jobs into a tree by job name
  1082. //-----------------------------------------------------------------------------
  1083. bool JobTypeSortFuncByName( JobType_t const * const &lhs, JobType_t const * const &rhs )
  1084. {
  1085. int iCompare = Q_strcmp( lhs->m_pchName, rhs->m_pchName );
  1086. if ( iCompare == 0 )
  1087. {
  1088. return ( lhs->m_eServerType < rhs->m_eServerType );
  1089. }
  1090. return ( iCompare < 0 );
  1091. }
  1092. // singeton accessor to list of registered jobs
  1093. CUtlRBTree<const JobType_t *> &GMapJobTypesByMsg()
  1094. {
  1095. static CUtlRBTree<const JobType_t *> s_MapJobTypes( 0, 0, JobTypeSortFuncByMsg );
  1096. return s_MapJobTypes;
  1097. }
  1098. // singeton accessor to list of registered jobs
  1099. CUtlRBTree<const JobType_t *> &GMapJobTypesByName()
  1100. {
  1101. static CUtlRBTree<const JobType_t *> s_MapJobTypes( 0, 0, JobTypeSortFuncByName );
  1102. return s_MapJobTypes;
  1103. }
  1104. //-----------------------------------------------------------------------------
  1105. // Purpose: adds a new type of job into the global list
  1106. //-----------------------------------------------------------------------------
  1107. void CJobMgr::RegisterJobType( const JobType_t *pJobType )
  1108. {
  1109. Assert( pJobType->m_pchName != NULL );
  1110. Assert( pJobType->m_pJobFactory != NULL );
  1111. GMapJobTypesByMsg().Insert( pJobType );
  1112. GMapJobTypesByName().Insert( pJobType );
  1113. }
  1114. //-----------------------------------------------------------------------------
  1115. // Purpose: Creates a new job from the network msg
  1116. // Input : *pServerParent - server to attach job to
  1117. // msg - network msg
  1118. // Output : true if a job was created
  1119. //-----------------------------------------------------------------------------
  1120. bool CJobMgr::BLaunchJobFromNetworkMsg( void *pParent, const JobMsgInfo_t &jobMsgInfo, IMsgNetPacket *pNetPacket )
  1121. {
  1122. if ( pNetPacket == NULL )
  1123. {
  1124. AssertMsg(pNetPacket, "CJobMgr::BLaunchJobFromNetworkMsg received NULL packet.");
  1125. return false;
  1126. }
  1127. if ( pNetPacket->BHasTargetJobName() && BIsValidSystemMsg( pNetPacket->GetEMsg(), NULL ) )
  1128. {
  1129. JobType_t jobSearch = { pNetPacket->GetTargetJobName(), k_EGCMsgInvalid, jobMsgInfo.m_eServerType };
  1130. int iJobType = GMapJobTypesByName().Find( &jobSearch );
  1131. if ( GMapJobTypesByName().IsValidIndex( iJobType ) )
  1132. {
  1133. // Get shortcut to job info
  1134. const JobType_t *pJobType = (GMapJobTypesByName())[iJobType];
  1135. Assert( pJobType );
  1136. Assert( pJobType->m_pchName );
  1137. // Create the job
  1138. CJob *job = pJobType->m_pJobFactory( pParent, NULL );
  1139. // Safety check
  1140. if ( job == NULL )
  1141. {
  1142. AssertMsg1( job, "Job factory returned NULL for job named '%s'!\n", pJobType->m_pchName );
  1143. return false;
  1144. }
  1145. // Start the job
  1146. job->StartJobFromNetworkMsg( pNetPacket, jobMsgInfo.m_JobIDSource );
  1147. return true;
  1148. }
  1149. }
  1150. else
  1151. {
  1152. JobType_t jobSearch = { 0, jobMsgInfo.m_eMsg, jobMsgInfo.m_eServerType };
  1153. int iJobType = GMapJobTypesByMsg().Find( &jobSearch );
  1154. if ( GMapJobTypesByMsg().IsValidIndex( iJobType ) )
  1155. {
  1156. // Get shortcut to job info
  1157. const JobType_t *pJobType = (GMapJobTypesByMsg())[iJobType];
  1158. Assert( pJobType );
  1159. Assert( pJobType->m_pchName );
  1160. // Create the job
  1161. CJob *job = pJobType->m_pJobFactory( pParent, NULL );
  1162. // Safety check
  1163. if ( job == NULL )
  1164. {
  1165. AssertMsg3( job, "Job factory returned NULL for job msg %d, server type %d (named '%s')!\n", (int)jobMsgInfo.m_eMsg, (int)jobMsgInfo.m_eServerType, pJobType->m_pchName );
  1166. return false;
  1167. }
  1168. // Start the job
  1169. job->StartJobFromNetworkMsg( pNetPacket, jobMsgInfo.m_JobIDSource );
  1170. return true;
  1171. }
  1172. }
  1173. return false;
  1174. }
  1175. //-----------------------------------------------------------------------------
  1176. // Purpose: profile sort func
  1177. //-----------------------------------------------------------------------------
  1178. int CJobMgr::ProfileSortFunc( void *pCtx, const int *lhs, const int *rhs )
  1179. {
  1180. JobProfileStats_t *pJobprofilestats = (JobProfileStats_t *)pCtx;
  1181. int64 d = 0;
  1182. switch ( pJobprofilestats->m_iJobProfileSort )
  1183. {
  1184. default:
  1185. case k_EJobProfileSortOrder_Alpha:
  1186. return Q_stricmp( pJobprofilestats->pmapStatsBucket->Element(*lhs).m_rgchName,
  1187. pJobprofilestats->pmapStatsBucket->Element(*rhs).m_rgchName );
  1188. case k_EJobProfileSortOrder_Count:
  1189. d = ((int64)pJobprofilestats->pmapStatsBucket->Element(*rhs).m_cCompletes -
  1190. (int64)pJobprofilestats->pmapStatsBucket->Element(*lhs).m_cCompletes);
  1191. break;
  1192. case k_EJobProfileSortOrder_TotalRuntime:
  1193. d = ((int64)pJobprofilestats->pmapStatsBucket->Element(*rhs).m_u64RunTime -
  1194. (int64)pJobprofilestats->pmapStatsBucket->Element(*lhs).m_u64RunTime);
  1195. break;
  1196. }
  1197. if ( d < 0 )
  1198. return -1;
  1199. if ( d > 0 )
  1200. return 1;
  1201. return 0;
  1202. }
  1203. //-----------------------------------------------------------------------------
  1204. // Purpose: dump out accumulated job profile data
  1205. //-----------------------------------------------------------------------------
  1206. void CJobMgr::ProfileJobs( EJobProfileAction ejobProfileAction, EJobProfileSortOrder iSortOrder )
  1207. {
  1208. bool bClearBuckets = false;
  1209. if ( ejobProfileAction == k_EJobProfileAction_Start )
  1210. {
  1211. if ( !m_bProfiling )
  1212. {
  1213. bClearBuckets = true;
  1214. }
  1215. m_bProfiling = true;
  1216. }
  1217. else if ( ejobProfileAction == k_EJobProfileAction_Stop )
  1218. {
  1219. m_bProfiling = false;
  1220. }
  1221. else if ( ejobProfileAction == k_EJobProfileAction_Clear )
  1222. {
  1223. bClearBuckets = true;
  1224. }
  1225. if ( bClearBuckets )
  1226. {
  1227. m_mapStatsBucket.RemoveAll();
  1228. }
  1229. if ( k_EJobProfileAction_Dump != ejobProfileAction )
  1230. return;
  1231. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS,
  1232. "%44s --- completed jobs (usec)---------------------------------- ------ lock counts---------------------------------- ------ failures -----------\n", " " );
  1233. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS,
  1234. "%44s count averuntime maxruntime aveduration #yielded attempted waited failed longheld longwait wait-t/o t/o-msg jobfailed longslice\n", "name" );
  1235. JobProfileStats_t jobprofilestats;
  1236. jobprofilestats.m_iJobProfileSort = iSortOrder;
  1237. jobprofilestats.pmapStatsBucket = &m_mapStatsBucket;
  1238. CUtlVector<int> vecSort( 0, m_mapStatsBucket.Count() );
  1239. FOR_EACH_MAP_FAST( m_mapStatsBucket, iBucket )
  1240. {
  1241. vecSort.AddToTail( iBucket );
  1242. }
  1243. V_qsort_s( vecSort.Base(), vecSort.Count(), sizeof(int), (QSortCompareFuncCtx_t)ProfileSortFunc, &jobprofilestats );
  1244. FOR_EACH_VEC( vecSort, iVec )
  1245. {
  1246. JobStatsBucket_t &bucket = m_mapStatsBucket[ vecSort[iVec] ];
  1247. if ( bucket.m_cCompletes )
  1248. {
  1249. CCycleCount ccRunTime( bucket.m_u64RunTime / bucket.m_cCompletes );
  1250. int64 usecAve = ccRunTime.GetMicroseconds();
  1251. CCycleCount ccRunTimeMax( bucket.m_u64RunTimeMax );
  1252. int64 usecMax = ccRunTimeMax.GetMicroseconds();
  1253. int64 msecDurationAve = bucket.m_u64JobDuration / bucket.m_cCompletes;
  1254. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%44s %12lld %12lld %12lld %12lld %8lld %8lld %8lld %8lld %8lld %8lld %8lld %8lld %8lld %8lld\n",
  1255. bucket.m_rgchName,
  1256. bucket.m_cCompletes,
  1257. usecAve,
  1258. usecMax,
  1259. msecDurationAve,
  1260. bucket.m_cJobsPaused,
  1261. bucket.m_cLocksAttempted,
  1262. bucket.m_cLocksWaitedFor,
  1263. bucket.m_cLocksFailed,
  1264. bucket.m_cLocksLongHeld,
  1265. bucket.m_cLocksLongWait,
  1266. bucket.m_cWaitTimeout,
  1267. bucket.m_cTimeoutNetMsg,
  1268. bucket.m_cJobsFailed,
  1269. bucket.m_cLongInterYieldTime );
  1270. }
  1271. }
  1272. if ( m_mapOrphanMessages.Count() )
  1273. {
  1274. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "Messages that arrived responding to jobs that no longer exists and were dropped\n" );
  1275. FOR_EACH_MAP_FAST( m_mapOrphanMessages, iBucket )
  1276. {
  1277. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%44s %12d\n", PchMsgNameFromEMsg( m_mapOrphanMessages.Key(iBucket) ), m_mapOrphanMessages[iBucket] );
  1278. }
  1279. m_mapOrphanMessages.RemoveAll();
  1280. }
  1281. }
  1282. //-----------------------------------------------------------------------------
  1283. // Purpose: Dump a list of all jobs to the console
  1284. // Each job is indexed, and that index can be used with
  1285. // DebugJob() to cause a debug break in that job.
  1286. //-----------------------------------------------------------------------------
  1287. void CJobMgr::DumpJobs( const char *pszJobName, int nMax, int nPrintLocksMax ) const
  1288. {
  1289. FOR_EACH_MAP_FAST( m_MapJob, iJob )
  1290. {
  1291. if ( nMax <= 0 )
  1292. break;
  1293. nMax--;
  1294. if ( pszJobName == NULL || V_strcmp( pszJobName, m_MapJob[iJob]->GetName() ) == 0 )
  1295. {
  1296. DumpJob( m_MapJob.Key(iJob), nPrintLocksMax );
  1297. }
  1298. }
  1299. EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "Total job count: %d\n", m_MapJob.Count() );
  1300. }
  1301. //-----------------------------------------------------------------------------
  1302. // Purpose: cause a debug break in the given job
  1303. //-----------------------------------------------------------------------------
  1304. void CJobMgr::DebugJob( int iJob )
  1305. {
  1306. #ifdef DEBUG_JOB_LIST
  1307. if ( sm_listAllJobs.IsValidIndex( iJob ) )
  1308. {
  1309. sm_listAllJobs[iJob]->Debug();
  1310. }
  1311. else
  1312. {
  1313. EmitInfo( SPEW_CONSOLE, 1, 1, "Job not found\n" );
  1314. }
  1315. #else
  1316. EmitInfo( SPEW_CONSOLE, 1, 1, "Job debugging disabled\n" );
  1317. #endif
  1318. }
  1319. #ifdef DBGFLAG_VALIDATE
//-----------------------------------------------------------------------------
// Purpose: Run a global validation pass on all of our data structures and memory
//			allocations: the job map and every job pointer in it, the stats
//			buckets, the yield/timeout/sleep containers and the work thread pool.
// Input:	validator -		Our global validator object
//			pchName -		Our name (typically a member var in our container)
//-----------------------------------------------------------------------------
void CJobMgr::Validate( CValidator &validator, const char *pchName )
{
	VALIDATE_SCOPE();

	// the job map itself, then each job it points at
	ValidateObj( m_MapJob );
	FOR_EACH_MAP_FAST( m_MapJob, iJob )
	{
		ValidatePtr( m_MapJob[iJob] );
	}

	// the profile buckets: the map, then each bucket stored in it
	ValidateObj( m_mapStatsBucket );
	FOR_EACH_MAP_FAST( m_mapStatsBucket, iBucket )
	{
		ValidateObj( m_mapStatsBucket[iBucket] );
	}

	// scheduling containers and the worker pool
	ValidateObj( m_ListJobsYieldingRegPri );
	ValidateObj( m_ListJobTimeouts );
	ValidateObj( m_MapJobTimeoutsIndexByJobID );
	ValidateObj( m_QueueJobSleeping );
	ValidateObj( m_WorkThreadPool );
}
//-----------------------------------------------------------------------------
// Purpose: Run a global validation pass on all of our global data: both
//			registered-job-type trees and, when DEBUG_JOB_LIST is defined, the
//			list of all jobs.
// Input:	validator -		Our global validator object
//			pchName -		not referenced directly in this function
//-----------------------------------------------------------------------------
void CJobMgr::ValidateStatics( CValidator &validator, const char *pchName )
{
	VALIDATE_SCOPE_STATIC( "CJobMgr class statics" );

	// the two job type registries
	ValidateObj( GMapJobTypesByMsg() );
	ValidateObj( GMapJobTypesByName() );

#ifdef DEBUG_JOB_LIST
	// only exists in builds that track every job
	ValidateObj( sm_listAllJobs );
#endif
}
  1358. #endif // DBGFLAG_VALIDATE
  1359. } // namespace GCSDK