Team Fortress 2 Source Code as on 22/4/2020
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

628 lines
18 KiB

  1. //========= Copyright Valve Corporation, All rights reserved. ============//
  2. //
  3. // Purpose:
  4. //
  5. //=============================================================================//
  6. #include <windows.h>
  7. #include "vmpi.h"
  8. #include "vmpi_distribute_work.h"
  9. #include "tier0/platform.h"
  10. #include "tier0/dbg.h"
  11. #include "utlvector.h"
  12. #include "utllinkedlist.h"
  13. #include "vmpi_dispatch.h"
  14. #include "pacifier.h"
  15. #include "vstdlib/random.h"
  16. #include "mathlib/mathlib.h"
  17. #include "threadhelpers.h"
  18. #include "threads.h"
  19. #include "tier1/strtools.h"
  20. #include "tier1/utlmap.h"
  21. #include "tier1/smartptr.h"
  22. #include "tier0/icommandline.h"
  23. #include "cmdlib.h"
  24. #include "vmpi_distribute_tracker.h"
  25. #include "vmpi_distribute_work_internal.h"
  26. // To catch some bugs with 32-bit vs 64-bit and etc.
  27. #pragma warning( default : 4244 )
  28. #pragma warning( default : 4305 )
  29. #pragma warning( default : 4267 )
  30. #pragma warning( default : 4311 )
  31. #pragma warning( default : 4312 )
  32. const int MAX_DW_CALLS = 255; // DistributeWork() call indices at or above this value are rejected with Error().
  33. extern bool g_bSetThreadPriorities; // When true, DistributeWork_Worker starts its threads at idle priority.
  34. // Subpacket IDs owned by DistributeWork.
  35. #define DW_SUBPACKETID_MASTER_READY 0 // Master -> workers: sent by PreDistributeWorkSync when the master is ready for a call.
  36. #define DW_SUBPACKETID_WORKER_READY 1 // Worker -> master: sent by PreDistributeWorkSync once the worker has seen MASTER_READY.
  37. #define DW_SUBPACKETID_MASTER_FINISHED 2 // Master -> workers: sent by DistributeWork_Master when the call is done.
  38. #define DW_SUBPACKETID_WU_RESULTS 4 // Worker -> master: work unit index followed by the results written by the process function.
  39. #define DW_SUBPACKETID_WU_STARTED 6 // A worker telling the master it has started processing a work unit.
  40. // NOTE VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE is where the IWorkUnitDistributorX classes start their subpackets.
  41. IWorkUnitDistributorCallbacks *g_pDistributeWorkCallbacks = NULL;
  42. static CDSInfo g_DSInfo; // State for the current DistributeWork call (packet ID, work unit count, master/worker info).
  43. static unsigned short g_iCurDSInfo = (unsigned short)-1; // This is incremented each time DistributeWork is called.
  44. static int g_iMasterFinishedDistributeWorkCall = -1; // The worker stores this to know which DistributeWork() calls the master has finished.
  45. static int g_iMasterReadyForDistributeWorkCall = -1; // Call index from the last MASTER_READY packet; workers wait on this in PreDistributeWorkSync.
  46. // This is only valid if we're a worker and if the worker currently has threads chewing on work units.
  47. static CDSInfo *g_pCurWorkerThreadsInfo = NULL;
  48. static CUtlVector<uint64> g_wuCountByProcess; // Work units credited to each source process; sized to 512 in DistributeWork.
  49. static uint64 g_totalWUCountByProcess[512]; // Same per-process count, but never cleared in this file; read by VMPI_GetNumWorkUnitsCompleted.
  50. static uint64 g_nWUs; // How many work units there were this time around.
  51. static uint64 g_nCompletedWUs; // How many work units completed.
  52. static uint64 g_nDuplicatedWUs; // How many times a worker sent results for a work unit that was already completed.
  53. // Set to true if Error() is called and we want to exit early. vrad and vvis check for this in their
  54. // thread functions, so the workers quit early when the master is done rather than finishing up
  55. // potentially time-consuming work units they're working on.
  56. bool g_bVMPIEarlyExit = false;
  57. static bool g_bMasterDistributingWork = false; // True only while DistributeWork_Master is inside the distributor's DistributeWork_Master call.
  58. static IWorkUnitDistributorWorker *g_pCurDistributorWorker = NULL; // Created and released by DistributeWork on workers; NULL between calls.
  59. static IWorkUnitDistributorMaster *g_pCurDistributorMaster = NULL; // Created and released by DistributeWork on the master; NULL between calls.
  60. // For the stats database.
  61. WUIndexType g_ThreadWUs[4] = { ~0ull, ~0ull, ~0ull, ~0ull }; // Work unit each of the first 4 worker threads is on; ~0ull means none.
  62. class CMasterWorkUnitCompletedList // Work units finished locally on the master, queued until CheckLocalMasterCompletedWorkUnits drains them.
  63. {
  64. public:
  65. CUtlVector<WUIndexType> m_CompletedWUs; // Indices appended by NotifyLocalMasterCompletedWorkUnit.
  66. };
  67. static CCriticalSectionData<CMasterWorkUnitCompletedList> g_MasterWorkUnitCompletedList; // Accessed only between Lock() and Unlock().
  68. int SortByWUCount( const void *elem1, const void *elem2 )
  69. {
  70. uint64 a = g_wuCountByProcess[ *((const int*)elem1) ];
  71. uint64 b = g_wuCountByProcess[ *((const int*)elem2) ];
  72. if ( a < b )
  73. return 1;
  74. else if ( a == b )
  75. return 0;
  76. else
  77. return -1;
  78. }
  79. void PrepareDistributeWorkHeader( MessageBuffer *pBuf, unsigned char cSubpacketID )
  80. {
  81. char cPacketID[2] = { g_DSInfo.m_cPacketID, cSubpacketID };
  82. pBuf->write( cPacketID, 2 );
  83. pBuf->write( &g_iCurDSInfo, sizeof( g_iCurDSInfo ) );
  84. }
  85. void ShowMPIStats(
  86. double flTimeSpent,
  87. unsigned long nBytesSent,
  88. unsigned long nBytesReceived,
  89. unsigned long nMessagesSent,
  90. unsigned long nMessagesReceived )
  91. {
  92. double flKSent = (nBytesSent + 511) / 1024;
  93. double flKRecv = (nBytesReceived + 511) / 1024;
  94. bool bShowOutput = VMPI_IsParamUsed( mpi_ShowDistributeWorkStats );
  95. bool bOldSuppress = g_bSuppressPrintfOutput;
  96. g_bSuppressPrintfOutput = !bShowOutput;
  97. Msg( "\n\n--------------------------------------------------------------\n");
  98. Msg( "Total Time : %.2f\n", flTimeSpent );
  99. Msg( "Total Bytes Sent : %dk (%.2fk/sec, %d messages)\n", (int)flKSent, flKSent / flTimeSpent, nMessagesSent );
  100. Msg( "Total Bytes Recv : %dk (%.2fk/sec, %d messages)\n", (int)flKRecv, flKRecv / flTimeSpent, nMessagesReceived );
  101. if ( g_bMPIMaster )
  102. {
  103. Msg( "Duplicated WUs : %I64u (%.1f%%)\n", g_nDuplicatedWUs, (float)g_nDuplicatedWUs * 100.0f / g_nWUs );
  104. Msg( "\nWU count by proc:\n" );
  105. int nProcs = VMPI_GetCurrentNumberOfConnections();
  106. CUtlVector<int> sortedProcs;
  107. sortedProcs.SetSize( nProcs );
  108. for ( int i=0; i < nProcs; i++ )
  109. sortedProcs[i] = i;
  110. qsort( sortedProcs.Base(), nProcs, sizeof( int ), SortByWUCount );
  111. for ( int i=0; i < nProcs; i++ )
  112. {
  113. const char *pMachineName = VMPI_GetMachineName( sortedProcs[i] );
  114. Msg( "%s", pMachineName );
  115. char formatStr[512];
  116. Q_snprintf( formatStr, sizeof( formatStr ), "%%%ds %I64u\n", 30 - strlen( pMachineName ), g_wuCountByProcess[ sortedProcs[i] ] );
  117. Msg( formatStr, ":" );
  118. }
  119. }
  120. Msg( "--------------------------------------------------------------\n\n ");
  121. g_bSuppressPrintfOutput = bOldSuppress;
  122. }
  123. void VMPI_DistributeWork_DisconnectHandler( int procID, const char *pReason )
  124. {
  125. if ( g_bMasterDistributingWork )
  126. {
  127. // Show the disconnect in the database but not on the screen.
  128. bool bOldSuppress = g_bSuppressPrintfOutput;
  129. g_bSuppressPrintfOutput = true;
  130. Msg( "VMPI_DistributeWork_DisconnectHandler( %d )\n", procID );
  131. g_bSuppressPrintfOutput = bOldSuppress;
  132. // Redistribute the WUs from this guy's partition to another worker.
  133. g_pCurDistributorMaster->DisconnectHandler( procID );
  134. }
  135. }
  136. uint64 VMPI_GetNumWorkUnitsCompleted( int iProc )
  137. {
  138. Assert( iProc >= 0 && iProc <= ARRAYSIZE( g_totalWUCountByProcess ) );
  139. return g_totalWUCountByProcess[iProc];
  140. }
  141. void HandleWorkUnitCompleted( CDSInfo *pInfo, int iSource, WUIndexType iWorkUnit, MessageBuffer *pBuf )
  142. {
  143. VMPITracker_WorkUnitCompleted( ( int ) iWorkUnit, iSource );
  144. if ( g_pCurDistributorMaster->HandleWorkUnitResults( iWorkUnit ) )
  145. {
  146. if ( g_iVMPIVerboseLevel >= 1 )
  147. Msg( "-" );
  148. ++ g_nCompletedWUs;
  149. ++ g_wuCountByProcess[iSource];
  150. ++ g_totalWUCountByProcess[iSource];
  151. // Let the master process the incoming WU data.
  152. if ( pBuf )
  153. {
  154. pInfo->m_MasterInfo.m_ReceiveFn( iWorkUnit, pBuf, iSource );
  155. }
  156. UpdatePacifier( float( g_nCompletedWUs ) / pInfo->m_nWorkUnits );
  157. }
  158. else
  159. {
  160. // Ignore it if we already got the results for this work unit.
  161. ++ g_nDuplicatedWUs;
  162. if ( g_iVMPIVerboseLevel >= 1 )
  163. Msg( "*" );
  164. }
  165. }
  166. bool DistributeWorkDispatch( MessageBuffer *pBuf, int iSource, int iPacketID )
  167. {
  168. unsigned short iCurDistributeWorkCall = *((unsigned short*)&pBuf->data[2]);
  169. if ( iCurDistributeWorkCall >= MAX_DW_CALLS )
  170. Error( "Got an invalid DistributeWork packet (id: %d, sub: %d) (iCurDW: %d).", pBuf->data[0], pBuf->data[1], iCurDistributeWorkCall );
  171. CDSInfo *pInfo = &g_DSInfo;
  172. pBuf->setOffset( 4 );
  173. switch ( pBuf->data[1] )
  174. {
  175. case DW_SUBPACKETID_MASTER_READY:
  176. {
  177. g_iMasterReadyForDistributeWorkCall = iCurDistributeWorkCall;
  178. return true;
  179. }
  180. case DW_SUBPACKETID_WORKER_READY:
  181. {
  182. if ( iCurDistributeWorkCall > g_iCurDSInfo || !g_bMPIMaster )
  183. Error( "State incorrect on master for DW_SUBPACKETID_WORKER_READY packet from %s.", VMPI_GetMachineName( iSource ) );
  184. if ( iCurDistributeWorkCall == g_iCurDSInfo )
  185. {
  186. // Ok, give this guy some WUs.
  187. if ( g_pCurDistributorMaster )
  188. g_pCurDistributorMaster->OnWorkerReady( iSource );
  189. }
  190. return true;
  191. }
  192. case DW_SUBPACKETID_MASTER_FINISHED:
  193. {
  194. g_iMasterFinishedDistributeWorkCall = iCurDistributeWorkCall;
  195. return true;
  196. }
  197. // Worker sends this to tell the master it has started on a work unit.
  198. case DW_SUBPACKETID_WU_STARTED:
  199. {
  200. if ( iCurDistributeWorkCall != g_iCurDSInfo )
  201. return true;
  202. WUIndexType iWU;
  203. pBuf->read( &iWU, sizeof( iWU ) );
  204. VMPITracker_WorkUnitStarted( ( int ) iWU, iSource );
  205. return true;
  206. }
  207. case DW_SUBPACKETID_WU_RESULTS:
  208. {
  209. // We only care about work results for the iteration we're in.
  210. if ( iCurDistributeWorkCall != g_iCurDSInfo )
  211. return true;
  212. WUIndexType iWorkUnit;
  213. pBuf->read( &iWorkUnit, sizeof( iWorkUnit ) );
  214. if ( iWorkUnit >= pInfo->m_nWorkUnits )
  215. {
  216. Error( "DistributeWork: got an invalid work unit index (%I64u for WU count of %I64u).", iWorkUnit, pInfo->m_nWorkUnits );
  217. }
  218. HandleWorkUnitCompleted( pInfo, iSource, iWorkUnit, pBuf );
  219. return true;
  220. }
  221. default:
  222. {
  223. if ( g_pCurDistributorMaster )
  224. return g_pCurDistributorMaster->HandlePacket( pBuf, iSource, iCurDistributeWorkCall != g_iCurDSInfo );
  225. else if ( g_pCurDistributorWorker )
  226. return g_pCurDistributorWorker->HandlePacket( pBuf, iSource, iCurDistributeWorkCall != g_iCurDSInfo );
  227. else
  228. return false;
  229. }
  230. }
  231. }
  232. EWorkUnitDistributor VMPI_GetActiveWorkUnitDistributor()
  233. {
  234. if ( VMPI_IsParamUsed( mpi_UseSDKDistributor ) )
  235. {
  236. Msg( "Found %s.\n", VMPI_GetParamString( mpi_UseSDKDistributor ) );
  237. return k_eWorkUnitDistributor_SDK;
  238. }
  239. else if ( VMPI_IsParamUsed( mpi_UseDefaultDistributor ) )
  240. {
  241. Msg( "Found %s.\n", VMPI_GetParamString( mpi_UseDefaultDistributor ) );
  242. return k_eWorkUnitDistributor_Default;
  243. }
  244. else
  245. {
  246. if ( VMPI_IsSDKMode() )
  247. return k_eWorkUnitDistributor_SDK;
  248. else
  249. return k_eWorkUnitDistributor_Default;
  250. }
  251. }
  252. void PreDistributeWorkSync( CDSInfo *pInfo )
  253. {
  254. if ( g_bMPIMaster )
  255. {
  256. // Send a message telling all the workers we're ready to go on this DistributeWork call.
  257. MessageBuffer mb;
  258. PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_MASTER_READY );
  259. VMPI_SendData( mb.data, mb.getLen(), VMPI_PERSISTENT );
  260. }
  261. else
  262. {
  263. if ( g_iVMPIVerboseLevel >= 1 )
  264. Msg( "PreDistributeWorkSync: waiting for master\n" );
  265. // Wait for the master's message saying it's ready to go.
  266. while ( g_iMasterReadyForDistributeWorkCall < g_iCurDSInfo )
  267. {
  268. VMPI_DispatchNextMessage();
  269. }
  270. if ( g_iVMPIVerboseLevel >= 1 )
  271. Msg( "PreDistributeWorkSync: master ready\n" );
  272. // Now tell the master we're ready.
  273. MessageBuffer mb;
  274. PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WORKER_READY );
  275. VMPI_SendData( mb.data, mb.getLen(), VMPI_MASTER_ID );
  276. }
  277. }
  278. void DistributeWork_Master( CDSInfo *pInfo, ProcessWorkUnitFn processFn, ReceiveWorkUnitFn receiveFn )
  279. {
  280. pInfo->m_WorkerInfo.m_pProcessFn = processFn;
  281. pInfo->m_MasterInfo.m_ReceiveFn = receiveFn;
  282. VMPITracker_Start( (int) pInfo->m_nWorkUnits );
  283. g_bMasterDistributingWork = true;
  284. g_pCurDistributorMaster->DistributeWork_Master( pInfo );
  285. g_bMasterDistributingWork = false;
  286. VMPITracker_End();
  287. // Tell all workers to move on.
  288. MessageBuffer mb;
  289. PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_MASTER_FINISHED );
  290. VMPI_SendData( mb.data, mb.getLen(), VMPI_PERSISTENT );
  291. // Clear the master's local completed work unit list.
  292. CMasterWorkUnitCompletedList *pList = g_MasterWorkUnitCompletedList.Lock();
  293. pList->m_CompletedWUs.RemoveAll();
  294. g_MasterWorkUnitCompletedList.Unlock();
  295. }
  296. void NotifyLocalMasterCompletedWorkUnit( WUIndexType iWorkUnit )
  297. {
  298. CMasterWorkUnitCompletedList *pList = g_MasterWorkUnitCompletedList.Lock();
  299. pList->m_CompletedWUs.AddToTail( iWorkUnit );
  300. g_MasterWorkUnitCompletedList.Unlock();
  301. }
  302. void CheckLocalMasterCompletedWorkUnits()
  303. {
  304. CMasterWorkUnitCompletedList *pList = g_MasterWorkUnitCompletedList.Lock();
  305. for ( int i=0; i < pList->m_CompletedWUs.Count(); i++ )
  306. {
  307. HandleWorkUnitCompleted( &g_DSInfo, 0, pList->m_CompletedWUs[i], NULL );
  308. }
  309. pList->m_CompletedWUs.RemoveAll();
  310. g_MasterWorkUnitCompletedList.Unlock();
  311. }
  312. void TellMasterThatWorkerStartedAWorkUnit( MessageBuffer &mb, CDSInfo *pInfo, WUIndexType iWU )
  313. {
  314. mb.setLen( 0 );
  315. PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WU_STARTED );
  316. mb.write( &iWU, sizeof( iWU ) );
  317. VMPI_SendData( mb.data, mb.getLen(), VMPI_MASTER_ID, k_eVMPISendFlags_GroupPackets );
  318. }
  319. void VMPI_WorkerThread( int iThread, void *pUserData )
  320. {
  321. CDSInfo *pInfo = (CDSInfo*)pUserData;
  322. CWorkerInfo *pWorkerInfo = &pInfo->m_WorkerInfo;
  323. // Get our index for running work units
  324. uint64 idxRunningWorkUnit = (uint64) iThread;
  325. {
  326. CCriticalSectionLock csLock( &pWorkerInfo->m_WorkUnitsRunningCS );
  327. csLock.Lock();
  328. pWorkerInfo->m_WorkUnitsRunning.ExpandWindow( idxRunningWorkUnit, ~0ull );
  329. csLock.Unlock();
  330. }
  331. MessageBuffer mb;
  332. PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WU_RESULTS );
  333. MessageBuffer mbStartedWorkUnit; // Special messagebuffer used to tell the master when we started a work unit.
  334. while ( g_iMasterFinishedDistributeWorkCall < g_iCurDSInfo && !g_bVMPIEarlyExit )
  335. {
  336. WUIndexType iWU;
  337. // Quit out when there are no more work units.
  338. if ( !g_pCurDistributorWorker->GetNextWorkUnit( &iWU ) )
  339. {
  340. // Wait until there are some WUs to do. This should probably use event handles.
  341. VMPI_Sleep( 10 );
  342. continue;
  343. }
  344. CCriticalSectionLock csLock( &pWorkerInfo->m_WorkUnitsRunningCS );
  345. csLock.Lock();
  346. // Check if this WU is not running
  347. WUIndexType const *pBegin = &pWorkerInfo->m_WorkUnitsRunning.Get( 0ull ), *pEnd = pBegin + pWorkerInfo->m_WorkUnitsRunning.PastVisibleIndex();
  348. WUIndexType const *pRunningWu = GenericFind( pBegin, pEnd, iWU );
  349. if ( pRunningWu != pEnd )
  350. continue;
  351. // We are running it
  352. pWorkerInfo->m_WorkUnitsRunning.Get( idxRunningWorkUnit ) = iWU;
  353. csLock.Unlock();
  354. // Process this WU and send the results to the master.
  355. mb.setLen( 4 );
  356. mb.write( &iWU, sizeof( iWU ) );
  357. // Set the current WU for the stats database.
  358. if ( iThread >= 0 && iThread < 4 )
  359. {
  360. g_ThreadWUs[iThread] = iWU;
  361. }
  362. // Tell the master we're starting on this WU.
  363. TellMasterThatWorkerStartedAWorkUnit( mbStartedWorkUnit, pInfo, iWU );
  364. pWorkerInfo->m_pProcessFn( iThread, iWU, &mb );
  365. g_pCurDistributorWorker->NoteLocalWorkUnitCompleted( iWU );
  366. VMPI_SendData( mb.data, mb.getLen(), VMPI_MASTER_ID, /*k_eVMPISendFlags_GroupPackets*/0 );
  367. // Flush grouped packets every once in a while.
  368. //VMPI_FlushGroupedPackets( 1000 );
  369. }
  370. if ( g_iVMPIVerboseLevel >= 1 )
  371. Msg( "Worker thread exiting.\n" );
  372. }
  373. void DistributeWork_Worker( CDSInfo *pInfo, ProcessWorkUnitFn processFn )
  374. {
  375. if ( g_iVMPIVerboseLevel >= 1 )
  376. Msg( "VMPI_DistributeWork call %d started.\n", g_iCurDSInfo+1 );
  377. CWorkerInfo *pWorkerInfo = &pInfo->m_WorkerInfo;
  378. pWorkerInfo->m_pProcessFn = processFn;
  379. g_pCurWorkerThreadsInfo = pInfo;
  380. g_pCurDistributorWorker->Init( pInfo );
  381. // Start a couple threads to do the work.
  382. RunThreads_Start( VMPI_WorkerThread, pInfo, g_bSetThreadPriorities ? k_eRunThreadsPriority_Idle : k_eRunThreadsPriority_UseGlobalState );
  383. if ( g_iVMPIVerboseLevel >= 1 )
  384. Msg( "RunThreads_Start finished successfully.\n" );
  385. if ( VMPI_IsSDKMode() )
  386. {
  387. Msg( "\n" );
  388. while ( g_iMasterFinishedDistributeWorkCall < g_iCurDSInfo )
  389. {
  390. VMPI_DispatchNextMessage( 300 );
  391. Msg( "\rThreads status: " );
  392. for ( int i=0; i < ARRAYSIZE( g_ThreadWUs ); i++ )
  393. {
  394. if ( g_ThreadWUs[i] != ~0ull )
  395. Msg( "%d: WU %5d ", i, (int)g_ThreadWUs[i] );
  396. }
  397. VMPI_FlushGroupedPackets();
  398. }
  399. Msg( "\n" );
  400. }
  401. else
  402. {
  403. while ( g_iMasterFinishedDistributeWorkCall < g_iCurDSInfo )
  404. {
  405. VMPI_DispatchNextMessage();
  406. }
  407. }
  408. // Close the threads.
  409. g_pCurWorkerThreadsInfo = NULL;
  410. RunThreads_End();
  411. if ( g_iVMPIVerboseLevel >= 1 )
  412. Msg( "VMPI_DistributeWork call %d finished.\n", g_iCurDSInfo+1 );
  413. }
  414. // This is called by VMPI_Finalize in case it's shutting down due to an Error() call.
  415. // In this case, it's important that the worker threads here are shut down before VMPI shuts
  416. // down its sockets.
  417. void DistributeWork_Cancel()
  418. {
  419. if ( g_pCurWorkerThreadsInfo )
  420. {
  421. Msg( "\nDistributeWork_Cancel saves the day!\n" );
  422. g_pCurWorkerThreadsInfo->m_bMasterFinished = true;
  423. g_bVMPIEarlyExit = true;
  424. RunThreads_End();
  425. }
  426. }
  427. // Returns time it took to finish the work.
  428. double DistributeWork(
  429. uint64 nWorkUnits, // how many work units to dole out
  430. char cPacketID,
  431. ProcessWorkUnitFn processFn, // workers implement this to process a work unit and send results back
  432. ReceiveWorkUnitFn receiveFn // the master implements this to receive a work unit
  433. )
  434. {
  435. ++g_iCurDSInfo;
  436. if ( g_iCurDSInfo == 0 )
  437. {
  438. // Register our disconnect handler so we can deal with it if clients bail out.
  439. if ( g_bMPIMaster )
  440. {
  441. VMPI_AddDisconnectHandler( VMPI_DistributeWork_DisconnectHandler );
  442. }
  443. }
  444. else if ( g_iCurDSInfo >= MAX_DW_CALLS )
  445. {
  446. Error( "DistributeWork: called more than %d times.\n", MAX_DW_CALLS );
  447. }
  448. CDSInfo *pInfo = &g_DSInfo;
  449. pInfo->m_cPacketID = cPacketID;
  450. pInfo->m_nWorkUnits = nWorkUnits;
  451. // Make all the workers wait until the master is ready.
  452. PreDistributeWorkSync( pInfo );
  453. g_nWUs = nWorkUnits;
  454. g_nCompletedWUs = 0ull;
  455. g_nDuplicatedWUs = 0ull;
  456. // Setup stats info.
  457. double flMPIStartTime = Plat_FloatTime();
  458. g_wuCountByProcess.SetCount( 512 );
  459. memset( g_wuCountByProcess.Base(), 0, sizeof( int ) * g_wuCountByProcess.Count() );
  460. unsigned long nBytesSentStart = g_nBytesSent;
  461. unsigned long nBytesReceivedStart = g_nBytesReceived;
  462. unsigned long nMessagesSentStart = g_nMessagesSent;
  463. unsigned long nMessagesReceivedStart = g_nMessagesReceived;
  464. EWorkUnitDistributor eWorkUnitDistributor = VMPI_GetActiveWorkUnitDistributor();
  465. if ( g_bMPIMaster )
  466. {
  467. Assert( !g_pCurDistributorMaster );
  468. g_pCurDistributorMaster = ( eWorkUnitDistributor == k_eWorkUnitDistributor_SDK ? CreateWUDistributor_SDKMaster() : CreateWUDistributor_DefaultMaster() );
  469. DistributeWork_Master( pInfo, processFn, receiveFn );
  470. g_pCurDistributorMaster->Release();
  471. g_pCurDistributorMaster = NULL;
  472. }
  473. else
  474. {
  475. Assert( !g_pCurDistributorWorker );
  476. g_pCurDistributorWorker = ( eWorkUnitDistributor == k_eWorkUnitDistributor_SDK ? CreateWUDistributor_SDKWorker() : CreateWUDistributor_DefaultWorker() );
  477. DistributeWork_Worker( pInfo, processFn );
  478. g_pCurDistributorWorker->Release();
  479. g_pCurDistributorWorker = NULL;
  480. }
  481. double flTimeSpent = Plat_FloatTime() - flMPIStartTime;
  482. ShowMPIStats(
  483. flTimeSpent,
  484. g_nBytesSent - nBytesSentStart,
  485. g_nBytesReceived - nBytesReceivedStart,
  486. g_nMessagesSent - nMessagesSentStart,
  487. g_nMessagesReceived - nMessagesReceivedStart
  488. );
  489. // Mark that the threads aren't working on anything at the moment.
  490. for ( int i=0; i < ARRAYSIZE( g_ThreadWUs ); i++ )
  491. g_ThreadWUs[i] = ~0ull;
  492. return flTimeSpent;
  493. }