Counter-Strike: Global Offensive Source Code

//===== Copyright © 1996-2005, Valve Corporation, All rights reserved. ======//
//
// Purpose:
//
// $NoKeywords: $
//
//===========================================================================//

#include <windows.h>
#include "vis.h"
#include "threads.h"
#include "stdlib.h"
#include "pacifier.h"
#include "mpi_stats.h"
#include "vmpi.h"
#include "vmpi_dispatch.h"
#include "vmpi_filesystem.h"
#include "vmpi_distribute_work.h"
#include "iphelpers.h"
#include "threadhelpers.h"
#include "vstdlib/random.h"
#include "vmpi_tools_shared.h"
#include <conio.h>
#include "scratchpad_helpers.h"
#include "tier0/fasttimer.h"


#define VMPI_VVIS_PACKET_ID						1

// Sub packet IDs.
#define VMPI_SUBPACKETID_DISCONNECT_NOTIFY		3	// We send ourselves this when there is a disconnect.
#define VMPI_SUBPACKETID_BASEPORTALVIS			5
#define VMPI_SUBPACKETID_PORTALFLOW				6
#define VMPI_BASEPORTALVIS_RESULTS				7
#define VMPI_BASEPORTALVIS_WORKER_DONE			8
#define VMPI_PORTALFLOW_RESULTS					9
#define VMPI_SUBPACKETID_BASEPORTALVIS_SYNC		11
#define VMPI_SUBPACKETID_PORTALFLOW_SYNC		12
#define VMPI_SUBPACKETID_MC_ADDR				13
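
// Framing used by every message tagged with the IDs above: two header bytes --
// VMPI_VVIS_PACKET_ID, then one of the sub-packet IDs -- followed by an optional
// payload starting at byte 2. The ones handled in this file arrive through the
// normal VMPI channel and land in VVIS_DispatchFn below, which switches on byte 1;
// VMPI_PORTALFLOW_RESULTS uses the same two-byte header but is sent over the
// multicast socket and consumed by PortalMCThreadFn instead.
//
//     [ byte 0: VMPI_VVIS_PACKET_ID ][ byte 1: sub-packet ID ][ byte 2..n: payload ]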

extern bool fastvis;

// The worker waits until these are true.
bool g_bBasePortalVisSync = false;
bool g_bPortalFlowSync = false;

CUtlVector<char> g_BasePortalVisResultsFilename;

CCycleCount g_CPUTime;

// This stuff is all for the multicast channel the master uses to send out the portal results.
ISocket *g_pPortalMCSocket = NULL;
CIPAddr g_PortalMCAddr;
bool g_bGotMCAddr = false;
HANDLE g_hMCThread = NULL;
CEvent g_MCThreadExitEvent;
unsigned long g_PortalMCThreadUniqueID = 0;
int g_nMulticastPortalsReceived = 0;


// Handle VVIS packets.
bool VVIS_DispatchFn( MessageBuffer *pBuf, int iSource, int iPacketID )
{
	switch ( pBuf->data[1] )
	{
		case VMPI_SUBPACKETID_MC_ADDR:
		{
			pBuf->setOffset( 2 );
			pBuf->read( &g_PortalMCAddr, sizeof( g_PortalMCAddr ) );
			g_bGotMCAddr = true;
			return true;
		}

		case VMPI_SUBPACKETID_DISCONNECT_NOTIFY:
		{
			// This is just used to cause nonblocking dispatches to jump out so loops like the one
			// in AppBarrier can handle the fact that there are disconnects.
			return true;
		}

		case VMPI_SUBPACKETID_BASEPORTALVIS_SYNC:
		{
			g_bBasePortalVisSync = true;
			return true;
		}

		case VMPI_SUBPACKETID_PORTALFLOW_SYNC:
		{
			g_bPortalFlowSync = true;
			return true;
		}

		case VMPI_BASEPORTALVIS_RESULTS:
		{
			const char *pFilename = &pBuf->data[2];
			g_BasePortalVisResultsFilename.CopyArray( pFilename, strlen( pFilename ) + 1 );
			return true;
		}

		default:
		{
			return false;
		}
	}
}
CDispatchReg g_VVISDispatchReg( VMPI_VVIS_PACKET_ID, VVIS_DispatchFn ); // register to handle the messages we want
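
// For reference, the sending side of one of these packets looks like this (this exact
// call appears in RunMPIPortalFlow further down): the two header bytes go out as the
// first chunk and the payload as the second, and VMPI_PERSISTENT presumably keeps the
// message queued so that workers which connect later still receive it.
//
//     char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_SUBPACKETID_MC_ADDR };
//     VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), &g_PortalMCAddr, sizeof( g_PortalMCAddr ), VMPI_PERSISTENT );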

void VMPI_DeletePortalMCSocket()
{
	// Stop the thread if it exists.
	if ( g_hMCThread )
	{
		g_MCThreadExitEvent.SetEvent();
		WaitForSingleObject( g_hMCThread, INFINITE );
		CloseHandle( g_hMCThread );
		g_hMCThread = NULL;
	}

	if ( g_pPortalMCSocket )
	{
		g_pPortalMCSocket->Release();
		g_pPortalMCSocket = NULL;
	}
}


void VVIS_SetupMPI( int &argc, char **&argv )
{
	if ( !VMPI_FindArg( argc, argv, "-mpi", "" ) && !VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Worker ), "" ) )
		return;

	CmdLib_AtCleanup( VMPI_Stats_Term );
	CmdLib_AtCleanup( VMPI_DeletePortalMCSocket );

	VMPI_Stats_InstallSpewHook();

	// Force local mode?
	VMPIRunMode mode;
	if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Local ), "" ) )
		mode = VMPI_RUN_LOCAL;
	else
		mode = VMPI_RUN_NETWORKED;

	//
	// Extract mpi specific arguments
	//
	Msg( "Initializing VMPI...\n" );
	if ( !VMPI_Init( argc, argv, "dependency_info_vvis.txt", HandleMPIDisconnect, mode ) )
	{
		Error( "MPI_Init failed." );
	}

	StatsDB_InitStatsDatabase( argc, argv, "dbinfo_vvis.txt" );
}

void ProcessBasePortalVis( int iThread, uint64 iPortal, MessageBuffer *pBuf )
{
	CTimeAdder adder( &g_CPUTime );

	BasePortalVis( iThread, iPortal );

	// Send my result to the master
	if ( pBuf )
	{
		portal_t * p = &portals[iPortal];
		pBuf->write( p->portalfront, portalbytes );
		pBuf->write( p->portalflood, portalbytes );
	}
}

void ReceiveBasePortalVis( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker )
{
	portal_t * p = &portals[iWorkUnit];
	if ( p->portalflood != 0 || p->portalfront != 0 || p->portalvis != 0)
	{
		Msg("Duplicate portal %d\n", (int)iWorkUnit);	// cast: iWorkUnit is a uint64 but %d expects an int
	}

	if ( pBuf->getLen() - pBuf->getOffset() != portalbytes*2 )
		Error( "Invalid packet in ReceiveBasePortalVis." );

	//
	// allocate memory for bitwise vis solutions for this portal
	//
	p->portalfront = (byte*)malloc (portalbytes);
	pBuf->read( p->portalfront, portalbytes );

	p->portalflood = (byte*)malloc (portalbytes);
	pBuf->read( p->portalflood, portalbytes );

	p->portalvis = (byte*)malloc (portalbytes);
	memset (p->portalvis, 0, portalbytes);

	p->nummightsee = CountBits( p->portalflood, g_numportals*2 );
}
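
// Payload of one BasePortalVis work-unit result, as written by ProcessBasePortalVis
// above and checked here ( getLen() - getOffset() must equal portalbytes*2 ):
//
//     [ portalfront : portalbytes bytes ]   bit vector computed by BasePortalVis
//     [ portalflood : portalbytes bytes ]   bit vector computed by BasePortalVis
//
// The master mallocs fresh buffers, reads both vectors, zeroes portalvis for the
// later PortalFlow pass, and recounts nummightsee from the flood bits.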

//-----------------------------------------
//
// Run BasePortalVis across all available processing nodes
// Then collect and redistribute the results.
//
void RunMPIBasePortalVis()
{
	int i;

	Msg( "\n\nportalbytes: %d\nNum Work Units: %d\nTotal data size: %d\n", portalbytes, g_numportals*2, portalbytes*g_numportals*2 );
	Msg("%-20s ", "BasePortalVis:");
	if ( g_bMPIMaster )
		StartPacifier("");

	VMPI_SetCurrentStage( "RunMPIBasePortalVis" );

	// Note: we're aiming for about 1500 portals in a map, so about 3000 work units.
	g_CPUTime.Init();
	double elapsed = DistributeWork(
		g_numportals * 2,		// # work units
		ProcessBasePortalVis,	// Worker function to process work units
		ReceiveBasePortalVis	// Master function to receive work results
		);

	if ( g_bMPIMaster )
	{
		EndPacifier( false );
		Msg( " (%d)\n", (int)elapsed );
	}

	//
	// Distribute the results to all the workers.
	//
	if ( g_bMPIMaster )
	{
		if ( !fastvis )
		{
			VMPI_SetCurrentStage( "SendPortalResults" );

			// Store all the portal results in a temp file and multicast that to the workers.
			CUtlVector<char> allPortalData;
			allPortalData.SetSize( g_numportals * 2 * portalbytes * 2 );

			char *pOut = allPortalData.Base();
			for ( i=0; i < g_numportals * 2; i++)
			{
				portal_t *p = &portals[i];

				memcpy( pOut, p->portalfront, portalbytes );
				pOut += portalbytes;

				memcpy( pOut, p->portalflood, portalbytes );
				pOut += portalbytes;
			}

			const char *pVirtualFilename = "--portal-results--";
			VMPI_FileSystem_CreateVirtualFile( pVirtualFilename, allPortalData.Base(), allPortalData.Count() );

			char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_BASEPORTALVIS_RESULTS };
			VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), pVirtualFilename, strlen( pVirtualFilename ) + 1, VMPI_PERSISTENT );
		}
	}
	else
	{
		VMPI_SetCurrentStage( "RecvPortalResults" );

		// Wait until we've received the filename from the master.
		while ( g_BasePortalVisResultsFilename.Count() == 0 )
		{
			VMPI_DispatchNextMessage();
		}

		// Open
		FileHandle_t fp = g_pFileSystem->Open( g_BasePortalVisResultsFilename.Base(), "rb", VMPI_VIRTUAL_FILES_PATH_ID );
		if ( !fp )
			Error( "Can't open '%s' to read portal info.", g_BasePortalVisResultsFilename.Base() );

		for ( i=0; i < g_numportals * 2; i++)
		{
			portal_t *p = &portals[i];

			p->portalfront = (byte*)malloc (portalbytes);
			g_pFileSystem->Read( p->portalfront, portalbytes, fp );

			p->portalflood = (byte*)malloc (portalbytes);
			g_pFileSystem->Read( p->portalflood, portalbytes, fp );

			p->portalvis = (byte*)malloc (portalbytes);
			memset (p->portalvis, 0, portalbytes);

			p->nummightsee = CountBits (p->portalflood, g_numportals*2);
		}

		g_pFileSystem->Close( fp );
	}

	if ( !g_bMPIMaster )
	{
		if ( g_iVMPIVerboseLevel >= 1 )
			Msg( "\n%% worker CPU utilization during BasePortalVis: %.1f\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads );
	}
}
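
// Layout of the "--portal-results--" virtual file built above: for each of the
// g_numportals*2 portals, portalfront (portalbytes bytes) followed by portalflood
// (portalbytes bytes). That is why allPortalData is sized g_numportals * 2 * portalbytes * 2
// and why the worker-side read loop mirrors the master-side memcpy loop exactly.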

void ProcessPortalFlow( int iThread, uint64 iPortal, MessageBuffer *pBuf )
{
	// Process Portal and distribute results
	CTimeAdder adder( &g_CPUTime );

	PortalFlow( iThread, iPortal );

	// Send my result to root and potentially the other slaves
	// The slave results are read in RecursiveLeafFlow
	//
	if ( pBuf )
	{
		portal_t * p = sorted_portals[iPortal];
		pBuf->write( p->portalvis, portalbytes );
	}
}

void ReceivePortalFlow( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker )
{
	portal_t *p = sorted_portals[iWorkUnit];

	if ( p->status != stat_done )
	{
		pBuf->read( p->portalvis, portalbytes );
		p->status = stat_done;

		// Multicast the status of this portal out.
		if ( g_pPortalMCSocket )
		{
			// Send the work unit index as an int so the chunk size matches what
			// PortalMCThreadFn validates and reads (sizeof( int ) at offset 6);
			// iWorkUnit itself is a uint64.
			int iWorkUnitToSend = (int)iWorkUnit;

			char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_PORTALFLOW_RESULTS };
			void *chunks[4] = { cPacketID, &g_PortalMCThreadUniqueID, &iWorkUnitToSend, p->portalvis };
			int chunkLengths[4] = { sizeof( cPacketID ), sizeof( g_PortalMCThreadUniqueID ), sizeof( iWorkUnitToSend ), portalbytes };

			g_pPortalMCSocket->SendChunksTo( &g_PortalMCAddr, chunks, chunkLengths, ARRAYSIZE( chunks ) );
		}
	}
}
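
// The multicast datagram assembled above, laid out as a struct for illustration only
// (the struct itself is not part of vvis; the listener in PortalMCThreadFn works off
// raw byte offsets 2, 6 and 10, which assumes a 4-byte unsigned long and int):
#if 0
#pragma pack( push, 1 )
struct PortalFlowResultPacket_Illustration
{
	char			cPacketID[2];	// { VMPI_VVIS_PACKET_ID, VMPI_PORTALFLOW_RESULTS }
	unsigned long	jobUniqueID;	// must match g_PortalMCThreadUniqueID (offset 2)
	int				iWorkUnit;		// index into sorted_portals (offset 6)
	byte			portalvis[1];	// portalbytes worth of vis bits (offset 10)
};
#pragma pack( pop )
#endif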

DWORD WINAPI PortalMCThreadFn( LPVOID p )
{
	CUtlVector<char> data;
	data.SetSize( portalbytes + 128 );

	DWORD waitTime = 0;
	while ( WaitForSingleObject( g_MCThreadExitEvent.GetEventHandle(), waitTime ) != WAIT_OBJECT_0 )
	{
		CIPAddr ipFrom;
		int len = g_pPortalMCSocket->RecvFrom( data.Base(), data.Count(), &ipFrom );
		if ( len == -1 )
		{
			waitTime = 20;
		}
		else
		{
			// These lengths must match exactly what is sent in ReceivePortalFlow.
			if ( len == 2 + sizeof( g_PortalMCThreadUniqueID ) + sizeof( int ) + portalbytes )
			{
				// Perform more validation...
				if ( data[0] == VMPI_VVIS_PACKET_ID && data[1] == VMPI_PORTALFLOW_RESULTS )
				{
					if ( *((unsigned long*)&data[2]) == g_PortalMCThreadUniqueID )
					{
						int iWorkUnit = *((int*)&data[6]);
						if ( iWorkUnit >= 0 && iWorkUnit < g_numportals*2 )
						{
							portal_t *p = sorted_portals[iWorkUnit];
							if ( p )
							{
								++g_nMulticastPortalsReceived;
								memcpy( p->portalvis, &data[10], portalbytes );
								p->status = stat_done;
								waitTime = 0;
							}
						}
					}
				}
			}
		}
	}

	return 0;
}


void MCThreadCleanupFn()
{
	g_MCThreadExitEvent.SetEvent();
}

// --------------------------------------------------------------------------------- //
// Cheesy hack to let them stop the job early and keep the results of what has
// been done so far.
// --------------------------------------------------------------------------------- //
class CVisDistributeWorkCallbacks : public IWorkUnitDistributorCallbacks
{
public:
	CVisDistributeWorkCallbacks()
	{
		m_bExitedEarly = false;
		m_iState = STATE_NONE;
	}

	virtual bool Update()
	{
		if ( kbhit() )
		{
			int key = toupper( getch() );
			if ( m_iState == STATE_NONE )
			{
				if ( key == 'M' )
				{
					m_iState = STATE_AT_MENU;
					Warning("\n\n"
						"----------------------\n"
						"1. Write scratchpad file.\n"
						"2. Exit early and use fast vis for remaining portals.\n"
						"\n"
						"0. Exit menu.\n"
						"----------------------\n"
						"\n"
						);
				}
			}
			else if ( m_iState == STATE_AT_MENU )
			{
				if ( key == '1' )
				{
					Warning(
						"\n"
						"\nWriting scratchpad file."
						"\nCommand line: scratchpad3dviewer -file scratch.pad\n"
						"\nRed portals are the portals that are fast vis'd."
						"\n"
						);
					m_iState = STATE_NONE;
					IScratchPad3D *pPad = ScratchPad3D_Create( "scratch.pad" );
					if ( pPad )
					{
						ScratchPad_DrawWorld( pPad, false );

						// Draw the portals that haven't been vis'd.
						for ( int i=0; i < g_numportals*2; i++ )
						{
							portal_t *p = sorted_portals[i];
							ScratchPad_DrawWinding( pPad, p->winding->numpoints, p->winding->points, Vector( 1, 0, 0 ), Vector( .3, .3, .3 ) );
						}

						pPad->Release();
					}
				}
				else if ( key == '2' )
				{
					// Exit the process early.
					m_bExitedEarly = true;
					return true;
				}
				else if ( key == '0' )
				{
					m_iState = STATE_NONE;
					Warning( "\n\nExited menu.\n\n" );
				}
			}
		}

		return false;
	}

public:
	enum
	{
		STATE_NONE,
		STATE_AT_MENU
	};

	bool m_bExitedEarly;
	int m_iState; // STATE_ enum.
};

CVisDistributeWorkCallbacks g_VisDistributeWorkCallbacks;

void CheckExitedEarly()
{
	if ( g_VisDistributeWorkCallbacks.m_bExitedEarly )
	{
		Warning( "\nExited early, using fastvis results...\n" );
		Warning( "Exited early, using fastvis results...\n" );

		// Use the fastvis results for portals that we didn't get results for.
		for ( int i=0; i < g_numportals*2; i++ )
		{
			if ( sorted_portals[i]->status != stat_done )
			{
				sorted_portals[i]->portalvis = sorted_portals[i]->portalflood;
				sorted_portals[i]->status = stat_done;
			}
		}
	}
}
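
// Flow of RunMPIPortalFlow below: the master picks a random multicast group and port,
// announces it to the workers via a persistent VMPI_SUBPACKETID_MC_ADDR packet, and
// multicasts every finished portal from ReceivePortalFlow; each worker joins that
// group and runs PortalMCThreadFn on a thread to pick up the broadcast results while
// both sides run DistributeWork over PortalFlow. Workers exit once the stage is done.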

//-----------------------------------------
//
// Run PortalFlow across all available processing nodes
//
void RunMPIPortalFlow()
{
	Msg( "%-20s ", "MPIPortalFlow:" );
	if ( g_bMPIMaster )
		StartPacifier("");

	// Workers wait until we get the MC socket address.
	g_PortalMCThreadUniqueID = StatsDB_GetUniqueJobID();
	if ( g_bMPIMaster )
	{
		CCycleCount cnt;
		cnt.Sample();
		CUniformRandomStream randomStream;
		randomStream.SetSeed( cnt.GetMicroseconds() );

		g_PortalMCAddr.port = randomStream.RandomInt( 22000, 25000 ); // Pulled out of something else.

		// Pick a random group address inside the IPv4 multicast range (224.0.0.0 - 239.255.255.255).
		g_PortalMCAddr.ip[0] = (unsigned char)RandomInt( 225, 238 );
		g_PortalMCAddr.ip[1] = (unsigned char)RandomInt( 0, 255 );
		g_PortalMCAddr.ip[2] = (unsigned char)RandomInt( 0, 255 );
		g_PortalMCAddr.ip[3] = (unsigned char)RandomInt( 3, 255 );

		g_pPortalMCSocket = CreateIPSocket();

		int i;
		for ( i=0; i < 5; i++ )
		{
			if ( g_pPortalMCSocket->BindToAny( randomStream.RandomInt( 20000, 30000 ) ) )
				break;
		}
		if ( i == 5 )
		{
			Error( "RunMPIPortalFlow: can't open a socket to multicast on." );
		}

		char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_SUBPACKETID_MC_ADDR };
		VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), &g_PortalMCAddr, sizeof( g_PortalMCAddr ), VMPI_PERSISTENT );
	}
	else
	{
		VMPI_SetCurrentStage( "wait for MC address" );

		while ( !g_bGotMCAddr )
		{
			VMPI_DispatchNextMessage();
		}

		// Open our multicast receive socket.
		g_pPortalMCSocket = CreateMulticastListenSocket( g_PortalMCAddr );
		if ( !g_pPortalMCSocket )
		{
			char err[512];
			IP_GetLastErrorString( err, sizeof( err ) );
			Error( "RunMPIPortalFlow: CreateMulticastListenSocket failed. (%s).", err );
		}

		// Make a thread to listen for the data on the multicast socket.
		DWORD dwDummy = 0;
		g_MCThreadExitEvent.Init( false, false );

		// Make sure we kill the MC thread if the app exits ungracefully.
		CmdLib_AtCleanup( MCThreadCleanupFn );

		g_hMCThread = CreateThread(
			NULL,
			0,
			PortalMCThreadFn,
			NULL,
			0,
			&dwDummy );

		if ( !g_hMCThread )
		{
			Error( "RunMPIPortalFlow: CreateThread failed for multicast receive thread." );
		}
	}

	VMPI_SetCurrentStage( "RunMPIBasePortalFlow" );

	g_pDistributeWorkCallbacks = &g_VisDistributeWorkCallbacks;

	g_CPUTime.Init();
	double elapsed = DistributeWork(
		g_numportals * 2,		// # work units
		ProcessPortalFlow,		// Worker function to process work units
		ReceivePortalFlow		// Master function to receive work results
		);

	g_pDistributeWorkCallbacks = NULL;

	CheckExitedEarly();

	// Stop the multicast stuff.
	VMPI_DeletePortalMCSocket();

	if( !g_bMPIMaster )
	{
		if ( g_iVMPIVerboseLevel >= 1 )
		{
			Msg( "Received %d (out of %d) portals from multicast.\n", g_nMulticastPortalsReceived, g_numportals * 2 );
			Msg( "%.1f%% CPU utilization during PortalFlow\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads );
		}

		Msg( "VVIS worker finished. Over and out.\n" );
		VMPI_SetCurrentStage( "worker done" );

		Plat_ExitProcess( 0 );
	}

	if ( g_bMPIMaster )
	{
		EndPacifier( false );
		Msg( " (%d)\n", (int)elapsed );
	}
}