Team Fortress 2 Source Code as on 22/4/2020
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

699 lines
20 KiB

  1. //========= Copyright Valve Corporation, All rights reserved. ============//
  2. //
  3. // Purpose:
  4. //
  5. //=============================================================================
  6. #include "vmpi.h"
  7. #include "vmpi_distribute_work.h"
  8. #include "tier0/platform.h"
  9. #include "tier0/dbg.h"
  10. #include "utlvector.h"
  11. #include "utllinkedlist.h"
  12. #include "vmpi_dispatch.h"
  13. #include "pacifier.h"
  14. #include "vstdlib/random.h"
  15. #include "mathlib/mathlib.h"
  16. #include "threadhelpers.h"
  17. #include "threads.h"
  18. #include "tier1/strtools.h"
  19. #include "tier1/utlmap.h"
  20. #include "tier1/smartptr.h"
  21. #include "tier0/icommandline.h"
  22. #include "cmdlib.h"
  23. #include "vmpi_distribute_tracker.h"
  24. #include "vmpi_distribute_work_internal.h"
  25. #define DW_SUBPACKETID_SHUFFLE (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+0)
  26. #define DW_SUBPACKETID_REQUEST_SHUFFLE (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+1)
  27. #define DW_SUBPACKETID_WUS_COMPLETED_LIST (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+2)
  28. // This is a pretty simple iterator. Basically, it holds a matrix of numbers.
  29. // Each row is assigned to a worker, and the worker just walks through his row.
  30. //
  31. // When a worker reaches the end of his row, it gets a little trickier.
  32. // They'll start doing their neighbor's row
  33. // starting at the back and continue on. At about this time, the master should reshuffle the
  34. // remaining work units to evenly distribute them amongst the workers.
  35. class CWorkUnitWalker
  36. {
  37. public:
  38. CWorkUnitWalker()
  39. {
  40. m_nWorkUnits = 0;
  41. }
  42. // This is all that's needed for it to start assigning work units.
  43. void Init( WUIndexType matrixWidth, WUIndexType matrixHeight, WUIndexType nWorkUnits )
  44. {
  45. m_nWorkUnits = nWorkUnits;
  46. m_MatrixWidth = matrixWidth;
  47. m_MatrixHeight = matrixHeight;
  48. Assert( m_MatrixWidth * m_MatrixHeight >= nWorkUnits );
  49. m_WorkerInfos.RemoveAll();
  50. m_WorkerInfos.EnsureCount( m_MatrixHeight );
  51. for ( int i=0; i < m_MatrixHeight; i++ )
  52. {
  53. m_WorkerInfos[i].m_iStartWorkUnit = matrixWidth * i;
  54. m_WorkerInfos[i].m_iWorkUnitOffset = 0;
  55. }
  56. }
  57. // This is the main function of the shuffler
  58. bool GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex, bool *bWorkerFinishedHisColumn )
  59. {
  60. if ( iWorker < 0 || iWorker >= m_WorkerInfos.Count() )
  61. {
  62. Assert( false );
  63. return false;
  64. }
  65. // If this worker has walked through all the work units, then he's done.
  66. CWorkerInfo *pWorker = &m_WorkerInfos[iWorker];
  67. if ( pWorker->m_iWorkUnitOffset >= m_nWorkUnits )
  68. return false;
  69. // If we've gone past the end of our work unit list, then we start at the BACK of the other rows of work units
  70. // in the hopes that we won't collide with the guy working there. We also should tell the master to reshuffle.
  71. WUIndexType iWorkUnitOffset = pWorker->m_iWorkUnitOffset;
  72. if ( iWorkUnitOffset >= m_MatrixWidth )
  73. {
  74. WUIndexType xOffset = iWorkUnitOffset % m_MatrixWidth;
  75. WUIndexType yOffset = iWorkUnitOffset / m_MatrixWidth;
  76. xOffset = m_MatrixWidth - xOffset - 1;
  77. iWorkUnitOffset = yOffset * m_MatrixWidth + xOffset;
  78. *bWorkerFinishedHisColumn = true;
  79. }
  80. else
  81. {
  82. *bWorkerFinishedHisColumn = false;
  83. }
  84. *pWUIndex = (pWorker->m_iStartWorkUnit + iWorkUnitOffset) % m_nWorkUnits;
  85. ++pWorker->m_iWorkUnitOffset;
  86. return true;
  87. }
  88. private:
  89. class CWorkerInfo
  90. {
  91. public:
  92. WUIndexType m_iStartWorkUnit;
  93. WUIndexType m_iWorkUnitOffset; // Which work unit in my list of work units am I working on?
  94. };
  95. WUIndexType m_nWorkUnits;
  96. WUIndexType m_MatrixWidth;
  97. WUIndexType m_MatrixHeight;
  98. CUtlVector<CWorkerInfo> m_WorkerInfos;
  99. };
  // Callback used by CShuffledWorkUnitWalker to ask for a reshuffle once a worker
  // has run past the end of its own row of work units.
  class IShuffleRequester
  {
  public:
      virtual void RequestShuffle() = 0;

  protected:
      // Not an owning interface: the walker only calls through this pointer and
      // never deletes it. A protected, non-virtual destructor forbids deleting an
      // implementation through an IShuffleRequester* (which would otherwise be UB
      // without a virtual destructor), while derived classes can still be destroyed.
      ~IShuffleRequester() {}
  };
  105. // This is updated every time the master decides to reshuffle.
  106. // In-between shuffles, you can call NoteWorkUnitCompleted when a work unit is completed
  107. // and it'll avoid returning that work unit from GetNextWorkUnit again, but it WON'T
  108. class CShuffledWorkUnitWalker
  109. {
  110. public:
  111. void Init( WUIndexType nWorkUnits, IShuffleRequester *pRequester )
  112. {
  113. m_iLastShuffleRequest = 0;
  114. m_iCurShuffle = 1;
  115. m_flLastShuffleTime = Plat_FloatTime();
  116. m_pShuffleRequester = pRequester;
  117. int nBytes = PAD_NUMBER( nWorkUnits, 8 ) / 8;
  118. m_CompletedWUBits.SetSize( nBytes );
  119. m_LocalCompletedWUBits.SetSize( nBytes );
  120. for ( WUIndexType i=0; i < m_CompletedWUBits.Count(); i++ )
  121. m_LocalCompletedWUBits[i] = m_CompletedWUBits[i] = 0;
  122. // Setup our list of work units remaining.
  123. for ( WUIndexType iWU=0; iWU < nWorkUnits; iWU++ )
  124. {
  125. // Note: we're making an assumption here that if we add entries to a CUtlLinkedList in ascending order, their indices
  126. // will be ascending 1-by-1 as well. If that assumption breaks, we can create an extra array here to map WU indices to the linked list indices.
  127. WUIndexType index = m_WorkUnitsRemaining.AddToTail( iWU );
  128. if ( index != iWU )
  129. {
  130. Error( "CShuffledWorkUnitWalker: assumption on CUtlLinkedList indexing failed.\n" );
  131. }
  132. }
  133. }
  134. void Shuffle( int nWorkers )
  135. {
  136. if ( nWorkers == 0 )
  137. return;
  138. ++m_iCurShuffle;
  139. m_flLastShuffleTime = Plat_FloatTime();
  140. CCriticalSectionLock csLock( &m_CS );
  141. csLock.Lock();
  142. m_WorkUnitsMap.RemoveAll();
  143. m_WorkUnitsMap.EnsureCount( m_WorkUnitsRemaining.Count() );
  144. // Here's the shuffle. The CWorkUnitWalker is going to walk each worker through its own group from 0-W,
  145. // and our job is to interleave it so when worker 0 goes [0,1,2] and worker 1 goes [100,101,102], they're actually
  146. // doing [0,N,2N] and [1,N+1,2N+1] where N=# of workers.
  147. // The grid is RxW long, and R*W is >= nWorkUnits.
  148. // R = # units per worker = width of the matrix
  149. // W = # workers = height of the matrix
  150. WUIndexType matrixHeight = nWorkers;
  151. WUIndexType matrixWidth = m_WorkUnitsRemaining.Count() / matrixHeight;
  152. if ( (m_WorkUnitsRemaining.Count() % matrixHeight) != 0 )
  153. ++matrixWidth;
  154. Assert( matrixWidth * matrixHeight >= m_WorkUnitsRemaining.Count() );
  155. WUIndexType iWorkUnit = 0;
  156. FOR_EACH_LL( m_WorkUnitsRemaining, i )
  157. {
  158. WUIndexType xCoord = iWorkUnit / matrixHeight;
  159. WUIndexType yCoord = iWorkUnit % matrixHeight;
  160. Assert( xCoord < matrixWidth );
  161. Assert( yCoord < matrixHeight );
  162. m_WorkUnitsMap[yCoord*matrixWidth+xCoord] = m_WorkUnitsRemaining[i];
  163. ++iWorkUnit;
  164. }
  165. m_Walker.Init( matrixWidth, matrixHeight, m_WorkUnitsRemaining.Count() );
  166. }
  167. // Threadsafe.
  168. bool Thread_IsWorkUnitCompleted( WUIndexType iWU )
  169. {
  170. CCriticalSectionLock csLock( &m_CS );
  171. csLock.Lock();
  172. byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7));
  173. return (val != 0);
  174. }
  175. WUIndexType Thread_NumWorkUnitsRemaining()
  176. {
  177. CCriticalSectionLock csLock( &m_CS );
  178. csLock.Lock();
  179. return m_WorkUnitsRemaining.Count();
  180. }
  181. bool Thread_GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex )
  182. {
  183. CCriticalSectionLock csLock( &m_CS );
  184. csLock.Lock();
  185. while ( 1 )
  186. {
  187. WUIndexType iUnmappedWorkUnit;
  188. bool bWorkerFinishedHisColumn;
  189. if ( !m_Walker.GetNextWorkUnit( iWorker, &iUnmappedWorkUnit, &bWorkerFinishedHisColumn ) )
  190. return false;
  191. // If we've done all the work units assigned to us in the last shuffle, then request a reshuffle.
  192. if ( bWorkerFinishedHisColumn )
  193. HandleWorkerFinishedColumn();
  194. // Check the pending list.
  195. *pWUIndex = m_WorkUnitsMap[iUnmappedWorkUnit];
  196. byte bIsCompleted = m_CompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7));
  197. byte bIsCompletedLocally = m_LocalCompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7));
  198. if ( !bIsCompleted && !bIsCompletedLocally )
  199. return true;
  200. }
  201. }
  202. void HandleWorkerFinishedColumn()
  203. {
  204. if ( m_iLastShuffleRequest != m_iCurShuffle )
  205. {
  206. double flCurTime = Plat_FloatTime();
  207. if ( flCurTime - m_flLastShuffleTime > 2.0f )
  208. {
  209. m_pShuffleRequester->RequestShuffle();
  210. m_iLastShuffleRequest = m_iCurShuffle;
  211. }
  212. }
  213. }
  214. void Thread_NoteWorkUnitCompleted( WUIndexType iWU )
  215. {
  216. CCriticalSectionLock csLock( &m_CS );
  217. csLock.Lock();
  218. byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7));
  219. if ( val == 0 )
  220. {
  221. m_WorkUnitsRemaining.Remove( iWU );
  222. m_CompletedWUBits[iWU >> 3] |= (1 << (iWU & 7));
  223. }
  224. }
  225. void Thread_NoteLocalWorkUnitCompleted( WUIndexType iWU )
  226. {
  227. CCriticalSectionLock csLock( &m_CS );
  228. csLock.Lock();
  229. m_LocalCompletedWUBits[iWU >> 3] |= (1 << (iWU & 7));
  230. }
  231. CRC32_t GetShuffleCRC()
  232. {
  233. #ifdef _DEBUG
  234. static bool bCalcShuffleCRC = true;
  235. #else
  236. static bool bCalcShuffleCRC = VMPI_IsParamUsed( mpi_CalcShuffleCRC );
  237. #endif
  238. if ( bCalcShuffleCRC )
  239. {
  240. CCriticalSectionLock csLock( &m_CS );
  241. csLock.Lock();
  242. CRC32_t ret;
  243. CRC32_Init( &ret );
  244. FOR_EACH_LL( m_WorkUnitsRemaining, i )
  245. {
  246. WUIndexType iWorkUnit = m_WorkUnitsRemaining[i];
  247. CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) );
  248. }
  249. for ( int i=0; i < m_WorkUnitsMap.Count(); i++ )
  250. {
  251. WUIndexType iWorkUnit = m_WorkUnitsMap[i];
  252. CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) );
  253. }
  254. CRC32_Final( &ret );
  255. return ret;
  256. }
  257. else
  258. {
  259. return false;
  260. }
  261. }
  262. private:
  263. // These are PENDING WU completions until we call Shuffle() again, at which point we actually reorder the list
  264. // based on the completed WUs.
  265. CUtlVector<byte> m_CompletedWUBits; // Bit vector of completed WUs.
  266. CUtlLinkedList<WUIndexType, WUIndexType> m_WorkUnitsRemaining;
  267. CUtlVector<WUIndexType> m_WorkUnitsMap; // Maps the 0-N indices in the CWorkUnitWalker to the list of remaining work units.
  268. // Helps us avoid some duplicates that happen during shuffling if we've completed some WUs and sent them
  269. // to the master, but the master hasn't included them in the DW_SUBPACKETID_WUS_COMPLETED_LIST yet.
  270. CUtlVector<byte> m_LocalCompletedWUBits; // Bit vector of completed WUs.
  271. // Used to control how frequently we request a reshuffle.
  272. unsigned int m_iCurShuffle;
  273. unsigned int m_iLastShuffleRequest; // The index of the shuffle we last requested a reshuffle on (don't request a reshuffle on the same one).
  274. double m_flLastShuffleTime;
  275. IShuffleRequester *m_pShuffleRequester;
  276. CWorkUnitWalker m_Walker;
  277. CCriticalSection m_CS;
  278. };
  279. class CDistributor_SDKMaster : public IWorkUnitDistributorMaster, public IShuffleRequester
  280. {
  281. public:
  282. virtual void Release()
  283. {
  284. delete this;
  285. }
  286. static void Master_WorkerThread_Static( int iThread, void *pUserData )
  287. {
  288. ((CDistributor_SDKMaster*)pUserData)->Master_WorkerThread( iThread );
  289. }
  290. void Master_WorkerThread( int iThread )
  291. {
  292. while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 && !g_bVMPIEarlyExit )
  293. {
  294. WUIndexType iWU;
  295. if ( !m_WorkUnitWalker.Thread_GetNextWorkUnit( 0, &iWU ) )
  296. {
  297. // Wait until there are some WUs to do.
  298. VMPI_Sleep( 10 );
  299. continue;
  300. }
  301. // Do this work unit.
  302. m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU ); // We do this before it's completed because otherwise if a Shuffle() occurs,
  303. // the other thread might happen to pickup this work unit and we don't want that.
  304. m_pInfo->m_WorkerInfo.m_pProcessFn( iThread, iWU, NULL );
  305. NotifyLocalMasterCompletedWorkUnit( iWU );
  306. }
  307. }
  308. virtual void DistributeWork_Master( CDSInfo *pInfo )
  309. {
  310. m_pInfo = pInfo;
  311. m_bForceShuffle = false;
  312. m_bShuffleRequested = false;
  313. m_flLastShuffleRequestServiceTime = Plat_FloatTime();
  314. // Spawn idle-priority worker threads right here.
  315. m_bUsingMasterLocalThreads = (pInfo->m_WorkerInfo.m_pProcessFn != 0);
  316. if ( VMPI_IsParamUsed( mpi_NoMasterWorkerThreads ) )
  317. {
  318. Msg( "%s found. No worker threads will be created.\n", VMPI_GetParamString( mpi_NoMasterWorkerThreads ) );
  319. m_bUsingMasterLocalThreads = false;
  320. }
  321. m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this );
  322. Shuffle();
  323. if ( m_bUsingMasterLocalThreads )
  324. RunThreads_Start( Master_WorkerThread_Static, this, k_eRunThreadsPriority_Idle );
  325. uint64 lastShuffleTime = Plat_MSTime();
  326. while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 )
  327. {
  328. VMPI_DispatchNextMessage( 200 );
  329. CheckLocalMasterCompletedWorkUnits();
  330. VMPITracker_HandleDebugKeypresses();
  331. if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() )
  332. break;
  333. // Reshuffle the work units optimally every certain interval.
  334. if ( m_bForceShuffle || CheckShuffleRequest() )
  335. {
  336. Shuffle();
  337. lastShuffleTime = Plat_MSTime();
  338. m_bForceShuffle = false;
  339. }
  340. }
  341. RunThreads_End();
  342. }
  343. virtual void RequestShuffle()
  344. {
  345. m_bShuffleRequested = true;
  346. }
  347. bool CheckShuffleRequest()
  348. {
  349. if ( m_bShuffleRequested )
  350. {
  351. double flCurTime = Plat_FloatTime();
  352. if ( flCurTime - m_flLastShuffleRequestServiceTime > 2.0f ) // Only handle shuffle requests every so often.
  353. {
  354. m_flLastShuffleRequestServiceTime = flCurTime;
  355. m_bShuffleRequested = false;
  356. return true;
  357. }
  358. }
  359. return false;
  360. }
  361. void Shuffle()
  362. {
  363. // Build a list of who's working.
  364. CUtlVector<unsigned short> whosWorking;
  365. if ( m_bUsingMasterLocalThreads )
  366. {
  367. whosWorking.AddToTail( VMPI_MASTER_ID );
  368. Assert( VMPI_MASTER_ID == 0 );
  369. }
  370. {
  371. CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock();
  372. for ( int i=0; i < pWorkersReady->m_WorkersReady.Count(); i++ )
  373. {
  374. int iWorker = pWorkersReady->m_WorkersReady[i];
  375. if ( VMPI_IsProcConnected( iWorker ) )
  376. whosWorking.AddToTail( iWorker );
  377. }
  378. m_WorkersReadyCS.Unlock();
  379. }
  380. // Before sending the shuffle command, tell any of these active workers about the pending WUs completed.
  381. CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock();
  382. m_WUSCompletedMessageBuffer.setLen( 0 );
  383. if ( BuildWUsCompletedMessage( pWUsCompleted->m_Pending, m_WUSCompletedMessageBuffer ) > 0 )
  384. {
  385. for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ )
  386. {
  387. VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), whosWorking[i] );
  388. }
  389. }
  390. pWUsCompleted->m_Completed.AddMultipleToTail( pWUsCompleted->m_Pending.Count(), pWUsCompleted->m_Pending.Base() ); // Add the pending ones to the full list now.
  391. pWUsCompleted->m_Pending.RemoveAll();
  392. m_WUsCompletedCS.Unlock();
  393. // Shuffle ourselves.
  394. m_WorkUnitWalker.Shuffle( whosWorking.Count() );
  395. // Send the shuffle command to the workers.
  396. MessageBuffer mb;
  397. PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_SHUFFLE );
  398. unsigned short nWorkers = whosWorking.Count();
  399. mb.write( &nWorkers, sizeof( nWorkers ) );
  400. CRC32_t shuffleCRC = m_WorkUnitWalker.GetShuffleCRC();
  401. mb.write( &shuffleCRC, sizeof( shuffleCRC ) );
  402. // Now for each worker, assign him an index in the shuffle and send the shuffle command.
  403. int workerIDPos = mb.getLen();
  404. unsigned short id = 0;
  405. mb.write( &id, sizeof( id ) );
  406. for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ )
  407. {
  408. id = (unsigned short)i;
  409. mb.update( workerIDPos, &id, sizeof( id ) );
  410. VMPI_SendData( mb.data, mb.getLen(), whosWorking[i] );
  411. }
  412. }
  413. int BuildWUsCompletedMessage( CUtlVector<WUIndexType> &wusCompleted, MessageBuffer &mb )
  414. {
  415. PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WUS_COMPLETED_LIST );
  416. m_pInfo->WriteWUIndex( wusCompleted.Count(), &mb );
  417. for ( int i=0; i < wusCompleted.Count(); i++ )
  418. {
  419. m_pInfo->WriteWUIndex( wusCompleted[i], &mb );
  420. }
  421. return wusCompleted.Count();
  422. }
  423. virtual void OnWorkerReady( int iSource )
  424. {
  425. CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock();
  426. if ( pWorkersReady->m_WorkersReady.Find( iSource ) == -1 )
  427. {
  428. pWorkersReady->m_WorkersReady.AddToTail( iSource );
  429. // Get this guy up to speed on which WUs are done.
  430. {
  431. CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock();
  432. m_WUSCompletedMessageBuffer.setLen( 0 );
  433. BuildWUsCompletedMessage( pWUsCompleted->m_Completed, m_WUSCompletedMessageBuffer );
  434. m_WUsCompletedCS.Unlock();
  435. }
  436. VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), iSource );
  437. m_bForceShuffle = true;
  438. }
  439. m_WorkersReadyCS.Unlock();
  440. }
  441. virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit )
  442. {
  443. return Thread_HandleWorkUnitResults( iWorkUnit );
  444. }
  445. bool Thread_HandleWorkUnitResults( WUIndexType iWorkUnit )
  446. {
  447. if ( m_WorkUnitWalker.Thread_IsWorkUnitCompleted( iWorkUnit ) )
  448. {
  449. return false;
  450. }
  451. else
  452. {
  453. m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWorkUnit );
  454. // We need the lock on here because our own worker threads can call into here.
  455. CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock();
  456. pWUsCompleted->m_Pending.AddToTail( iWorkUnit );
  457. m_WUsCompletedCS.Unlock();
  458. return true;
  459. }
  460. }
  461. virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents )
  462. {
  463. if ( pBuf->data[1] == DW_SUBPACKETID_REQUEST_SHUFFLE )
  464. {
  465. if ( bIgnoreContents )
  466. return true;
  467. m_bShuffleRequested = true;
  468. }
  469. return false;
  470. }
  471. virtual void DisconnectHandler( int workerID )
  472. {
  473. CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock();
  474. if ( pWorkersReady->m_WorkersReady.Find( workerID ) != -1 )
  475. m_bForceShuffle = true;
  476. m_WorkersReadyCS.Unlock();
  477. }
  478. public:
  479. CDSInfo *m_pInfo;
  480. class CWorkersReady
  481. {
  482. public:
  483. CUtlVector<int> m_WorkersReady; // The list of workers who have said they're ready to participate.
  484. };
  485. CCriticalSectionData<CWorkersReady> m_WorkersReadyCS;
  486. class CWUsCompleted
  487. {
  488. public:
  489. CUtlVector<WUIndexType> m_Completed; // WUs completed that we have sent to workers.
  490. CUtlVector<WUIndexType> m_Pending; // WUs completed that we haven't sent to workers.
  491. };
  492. CCriticalSectionData<CWUsCompleted> m_WUsCompletedCS;
  493. MessageBuffer m_WUSCompletedMessageBuffer; // Used to send lists of completed WUs.
  494. int m_bUsingMasterLocalThreads;
  495. bool m_bForceShuffle;
  496. bool m_bShuffleRequested;
  497. double m_flLastShuffleRequestServiceTime;
  498. CShuffledWorkUnitWalker m_WorkUnitWalker;
  499. };
  500. class CDistributor_SDKWorker : public IWorkUnitDistributorWorker, public IShuffleRequester
  501. {
  502. public:
  503. virtual void Init( CDSInfo *pInfo )
  504. {
  505. m_iMyWorkUnitWalkerID = -1;
  506. m_pInfo = pInfo;
  507. m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this );
  508. }
  509. virtual void Release()
  510. {
  511. delete this;
  512. }
  513. virtual bool GetNextWorkUnit( WUIndexType *pWUIndex )
  514. {
  515. // If we don't have an ID yet, we haven't received a Shuffle() command, so we're waiting for that before working.
  516. // TODO: we could do some random WUs here while we're waiting, although that could suck if the WUs take forever to do
  517. // and they're duplicates.
  518. if ( m_iMyWorkUnitWalkerID == -1 )
  519. return false;
  520. // Look in our current shuffled list of work units for the next one.
  521. return m_WorkUnitWalker.Thread_GetNextWorkUnit( m_iMyWorkUnitWalkerID, pWUIndex );
  522. }
  523. virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU )
  524. {
  525. m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU );
  526. }
  527. virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents )
  528. {
  529. // If it's a SHUFFLE message, then shuffle..
  530. if ( pBuf->data[1] == DW_SUBPACKETID_SHUFFLE )
  531. {
  532. if ( bIgnoreContents )
  533. return true;
  534. unsigned short nWorkers, myID;
  535. CRC32_t shuffleCRC;
  536. pBuf->read( &nWorkers, sizeof( nWorkers ) );
  537. pBuf->read( &shuffleCRC, sizeof( shuffleCRC ) );
  538. pBuf->read( &myID, sizeof( myID ) );
  539. m_iMyWorkUnitWalkerID = myID;
  540. m_WorkUnitWalker.Shuffle( nWorkers );
  541. if ( m_WorkUnitWalker.GetShuffleCRC() != shuffleCRC )
  542. {
  543. static int nWarnings = 1;
  544. if ( ++nWarnings <= 2 )
  545. Warning( "\nShuffle CRC mismatch\n" );
  546. }
  547. return true;
  548. }
  549. else if ( pBuf->data[1] == DW_SUBPACKETID_WUS_COMPLETED_LIST )
  550. {
  551. if ( bIgnoreContents )
  552. return true;
  553. WUIndexType nCompleted;
  554. m_pInfo->ReadWUIndex( &nCompleted, pBuf );
  555. for ( WUIndexType i=0; i < nCompleted; i++ )
  556. {
  557. WUIndexType iWU;
  558. m_pInfo->ReadWUIndex( &iWU, pBuf );
  559. m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWU );
  560. }
  561. return true;
  562. }
  563. return false;
  564. }
  565. virtual void RequestShuffle()
  566. {
  567. // Ok.. request a reshuffle.
  568. MessageBuffer mb;
  569. PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_REQUEST_SHUFFLE );
  570. VMPI_SendData( mb.data, mb.getLen(), VMPI_MASTER_ID );
  571. }
  572. private:
  573. CDSInfo *m_pInfo;
  574. CShuffledWorkUnitWalker m_WorkUnitWalker;
  575. int m_iMyWorkUnitWalkerID;
  576. };
  577. IWorkUnitDistributorMaster* CreateWUDistributor_SDKMaster()
  578. {
  579. return new CDistributor_SDKMaster;
  580. }
  581. IWorkUnitDistributorWorker* CreateWUDistributor_SDKWorker()
  582. {
  583. return new CDistributor_SDKWorker;
  584. }