Team Fortress 2 Source Code as of 22/4/2020


//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose:
//
// $NoKeywords: $
//
//=============================================================================//
#include <windows.h>
#include "vis.h"
#include "threads.h"
#include "stdlib.h"
#include "pacifier.h"
#include "mpi_stats.h"
#include "vmpi.h"
#include "vmpi_dispatch.h"
#include "vmpi_filesystem.h"
#include "vmpi_distribute_work.h"
#include "iphelpers.h"
#include "threadhelpers.h"
#include "vstdlib/random.h"
#include "vmpi_tools_shared.h"
#include <conio.h>
#include "scratchpad_helpers.h"
#define VMPI_VVIS_PACKET_ID 1

// Sub packet IDs.
#define VMPI_SUBPACKETID_DISCONNECT_NOTIFY  3 // We send ourselves this when there is a disconnect.
#define VMPI_SUBPACKETID_BASEPORTALVIS      5
#define VMPI_SUBPACKETID_PORTALFLOW         6
#define VMPI_BASEPORTALVIS_RESULTS          7
#define VMPI_BASEPORTALVIS_WORKER_DONE      8
#define VMPI_PORTALFLOW_RESULTS             9
#define VMPI_SUBPACKETID_BASEPORTALVIS_SYNC 11
#define VMPI_SUBPACKETID_PORTALFLOW_SYNC    12
#define VMPI_SUBPACKETID_MC_ADDR            13

// DistributeWork owns this packet ID.
#define VMPI_DISTRIBUTEWORK_PACKETID 2
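
// Wire format, as inferred from VVIS_DispatchFn and the senders below:
// byte 0 = packet ID (VMPI_VVIS_PACKET_ID), byte 1 = sub-packet ID, payload at offset 2.
// e.g. an MC-address packet is { VMPI_VVIS_PACKET_ID, VMPI_SUBPACKETID_MC_ADDR, CIPAddr }.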
extern bool fastvis;

// The worker waits until these are true.
bool g_bBasePortalVisSync = false;
bool g_bPortalFlowSync = false;

CUtlVector<char> g_BasePortalVisResultsFilename;

CCycleCount g_CPUTime;

// This stuff is all for the multicast channel the master uses to send out the portal results.
ISocket *g_pPortalMCSocket = NULL;
CIPAddr g_PortalMCAddr;
bool g_bGotMCAddr = false;
HANDLE g_hMCThread = NULL;
CEvent g_MCThreadExitEvent;
unsigned long g_PortalMCThreadUniqueID = 0;
int g_nMulticastPortalsReceived = 0;
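
// Flow, as implemented below: the master picks a random multicast address and sends it
// to the workers (VMPI_SUBPACKETID_MC_ADDR); each worker spins up PortalMCThreadFn to
// listen on that address; then, as the master receives finished portals in
// ReceivePortalFlow, it multicasts each result so every worker can seed its own copy.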
// Handle VVIS packets.
bool VVIS_DispatchFn( MessageBuffer *pBuf, int iSource, int iPacketID )
{
    switch ( pBuf->data[1] )
    {
        case VMPI_SUBPACKETID_MC_ADDR:
        {
            pBuf->setOffset( 2 );
            pBuf->read( &g_PortalMCAddr, sizeof( g_PortalMCAddr ) );
            g_bGotMCAddr = true;
            return true;
        }

        case VMPI_SUBPACKETID_DISCONNECT_NOTIFY:
        {
            // This is just used to cause nonblocking dispatches to jump out so loops like the one
            // in AppBarrier can handle the fact that there are disconnects.
            return true;
        }

        case VMPI_SUBPACKETID_BASEPORTALVIS_SYNC:
        {
            g_bBasePortalVisSync = true;
            return true;
        }

        case VMPI_SUBPACKETID_PORTALFLOW_SYNC:
        {
            g_bPortalFlowSync = true;
            return true;
        }

        case VMPI_BASEPORTALVIS_RESULTS:
        {
            const char *pFilename = &pBuf->data[2];
            g_BasePortalVisResultsFilename.CopyArray( pFilename, strlen( pFilename ) + 1 );
            return true;
        }

        default:
        {
            return false;
        }
    }
}
CDispatchReg g_VVISDispatchReg( VMPI_VVIS_PACKET_ID, VVIS_DispatchFn ); // register to handle the messages we want
CDispatchReg g_DistributeWorkReg( VMPI_DISTRIBUTEWORK_PACKETID, DistributeWorkDispatch );
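
// A CDispatchReg presumably hooks its handler into VMPI's dispatch table at static-init
// time: any incoming message whose first byte matches the registered packet ID is routed
// to that function, and a handler returns false for sub-packets it doesn't recognize.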
void VMPI_DeletePortalMCSocket()
{
    // Stop the thread if it exists.
    if ( g_hMCThread )
    {
        g_MCThreadExitEvent.SetEvent();
        WaitForSingleObject( g_hMCThread, INFINITE );
        CloseHandle( g_hMCThread );
        g_hMCThread = NULL;
    }

    if ( g_pPortalMCSocket )
    {
        g_pPortalMCSocket->Release();
        g_pPortalMCSocket = NULL;
    }
}
void VVIS_SetupMPI( int &argc, char **&argv )
{
    if ( !VMPI_FindArg( argc, argv, "-mpi", "" ) && !VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Worker ), "" ) )
        return;

    CmdLib_AtCleanup( VMPI_Stats_Term );
    CmdLib_AtCleanup( VMPI_DeletePortalMCSocket );

    VMPI_Stats_InstallSpewHook();

    // Force local mode?
    VMPIRunMode mode;
    if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Local ), "" ) )
        mode = VMPI_RUN_LOCAL;
    else
        mode = VMPI_RUN_NETWORKED;

    //
    // Extract mpi specific arguments
    //
    Msg( "Initializing VMPI...\n" );
    if ( !VMPI_Init( argc, argv, "dependency_info_vvis.txt", HandleMPIDisconnect, mode ) )
    {
        Error( "VMPI_Init failed." );
    }

    StatsDB_InitStatsDatabase( argc, argv, "dbinfo_vvis.txt" );
}
void ProcessBasePortalVis( int iThread, uint64 iPortal, MessageBuffer *pBuf )
{
    CTimeAdder adder( &g_CPUTime );

    BasePortalVis( iThread, iPortal );

    // Send my result to the master
    if ( pBuf )
    {
        portal_t *p = &portals[iPortal];
        pBuf->write( p->portalfront, portalbytes );
        pBuf->write( p->portalflood, portalbytes );
    }
}
void ReceiveBasePortalVis( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker )
{
    portal_t *p = &portals[iWorkUnit];
    if ( p->portalflood != 0 || p->portalfront != 0 || p->portalvis != 0 )
    {
        Msg( "Duplicate portal %llu\n", iWorkUnit );
    }

    if ( pBuf->getLen() - pBuf->getOffset() != portalbytes*2 )
        Error( "Invalid packet in ReceiveBasePortalVis." );

    //
    // allocate memory for bitwise vis solutions for this portal
    //
    p->portalfront = (byte*)malloc( portalbytes );
    pBuf->read( p->portalfront, portalbytes );

    p->portalflood = (byte*)malloc( portalbytes );
    pBuf->read( p->portalflood, portalbytes );

    p->portalvis = (byte*)malloc( portalbytes );
    memset( p->portalvis, 0, portalbytes );

    p->nummightsee = CountBits( p->portalflood, g_numportals*2 );
}
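
// A note on the three bit vectors (each portalbytes long, one bit per directed portal):
// portalfront marks portals in front of this portal's plane, portalflood is the
// flood-filled superset BasePortalVis derives from it, and portalvis is the final
// answer PortalFlow refines. nummightsee counts portalflood's set bits and is
// presumably what sorted_portals is ordered by, so cheap portals flow first.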
//-----------------------------------------
//
// Run BasePortalVis across all available processing nodes
// Then collect and redistribute the results.
//
void RunMPIBasePortalVis()
{
    int i;

    Msg( "\n\nportalbytes: %d\nNum Work Units: %d\nTotal data size: %d\n", portalbytes, g_numportals*2, portalbytes*g_numportals*2 );

    Msg( "%-20s ", "BasePortalVis:" );
    if ( g_bMPIMaster )
        StartPacifier("");

    VMPI_SetCurrentStage( "RunMPIBasePortalVis" );

    // Note: we're aiming for about 1500 portals in a map, so about 3000 work units.
    g_CPUTime.Init();
    double elapsed = DistributeWork(
        g_numportals * 2,              // # work units
        VMPI_DISTRIBUTEWORK_PACKETID,  // packet ID
        ProcessBasePortalVis,          // Worker function to process work units
        ReceiveBasePortalVis           // Master function to receive work results
        );
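
    // As used here, DistributeWork blocks until every work unit is accounted for and
    // returns elapsed wall-clock seconds. Workers run the process callback per unit and
    // the master runs the receive callback as results arrive. The "if ( pBuf )" guard in
    // the process functions suggests pBuf is NULL when a result needn't be shipped over
    // the wire (e.g. when the master processes units itself).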
    if ( g_bMPIMaster )
    {
        EndPacifier( false );
        Msg( " (%d)\n", (int)elapsed );
    }
    //
    // Distribute the results to all the workers.
    //
    if ( g_bMPIMaster )
    {
        if ( !fastvis )
        {
            VMPI_SetCurrentStage( "SendPortalResults" );

            // Store all the portal results in a temp file and multicast that to the workers.
            CUtlVector<char> allPortalData;
            allPortalData.SetSize( g_numportals * 2 * portalbytes * 2 );

            char *pOut = allPortalData.Base();
            for ( i=0; i < g_numportals * 2; i++ )
            {
                portal_t *p = &portals[i];
                memcpy( pOut, p->portalfront, portalbytes );
                pOut += portalbytes;
                memcpy( pOut, p->portalflood, portalbytes );
                pOut += portalbytes;
            }
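
            // The blob is published under a made-up name in VMPI's virtual file system:
            // only the filename is broadcast below (VMPI_PERSISTENT, so it presumably
            // also reaches workers that connect late), and each worker then reads the
            // contents back through g_pFileSystem with VMPI_VIRTUAL_FILES_PATH_ID.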
            const char *pVirtualFilename = "--portal-results--";
            VMPI_FileSystem_CreateVirtualFile( pVirtualFilename, allPortalData.Base(), allPortalData.Count() );

            char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_BASEPORTALVIS_RESULTS };
            VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), pVirtualFilename, strlen( pVirtualFilename ) + 1, VMPI_PERSISTENT );
        }
    }
    else
    {
        VMPI_SetCurrentStage( "RecvPortalResults" );

        // Wait until we've received the filename from the master.
        while ( g_BasePortalVisResultsFilename.Count() == 0 )
        {
            VMPI_DispatchNextMessage();
        }

        // Open
        FileHandle_t fp = g_pFileSystem->Open( g_BasePortalVisResultsFilename.Base(), "rb", VMPI_VIRTUAL_FILES_PATH_ID );
        if ( !fp )
            Error( "Can't open '%s' to read portal info.", g_BasePortalVisResultsFilename.Base() );

        for ( i=0; i < g_numportals * 2; i++ )
        {
            portal_t *p = &portals[i];

            p->portalfront = (byte*)malloc( portalbytes );
            g_pFileSystem->Read( p->portalfront, portalbytes, fp );

            p->portalflood = (byte*)malloc( portalbytes );
            g_pFileSystem->Read( p->portalflood, portalbytes, fp );

            p->portalvis = (byte*)malloc( portalbytes );
            memset( p->portalvis, 0, portalbytes );

            p->nummightsee = CountBits( p->portalflood, g_numportals*2 );
        }

        g_pFileSystem->Close( fp );
    }
    if ( !g_bMPIMaster )
    {
        if ( g_iVMPIVerboseLevel >= 1 )
            Msg( "\n%% worker CPU utilization during BasePortalVis: %.1f\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads );
    }
}
void ProcessPortalFlow( int iThread, uint64 iPortal, MessageBuffer *pBuf )
{
    // Process Portal and distribute results
    CTimeAdder adder( &g_CPUTime );

    PortalFlow( iThread, iPortal );

    // Send my result to root and potentially the other slaves
    // The slave results are read in RecursiveLeafFlow
    //
    if ( pBuf )
    {
        portal_t *p = sorted_portals[iPortal];
        pBuf->write( p->portalvis, portalbytes );
    }
}
void ReceivePortalFlow( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker )
{
    portal_t *p = sorted_portals[iWorkUnit];
    if ( p->status != stat_done )
    {
        pBuf->read( p->portalvis, portalbytes );
        p->status = stat_done;

        // Multicast the status of this portal out.
        if ( g_pPortalMCSocket )
        {
            // Send the work unit index as a 32-bit int: PortalMCThreadFn validates the
            // datagram length against sizeof( int ) and reads the index at data[6], so
            // multicasting the raw uint64 would make every packet fail that length check.
            int iWorkUnitInt = (int)iWorkUnit;

            char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_PORTALFLOW_RESULTS };
            void *chunks[4] = { cPacketID, &g_PortalMCThreadUniqueID, &iWorkUnitInt, p->portalvis };
            int chunkLengths[4] = { sizeof( cPacketID ), sizeof( g_PortalMCThreadUniqueID ), sizeof( iWorkUnitInt ), portalbytes };

            g_pPortalMCSocket->SendChunksTo( &g_PortalMCAddr, chunks, chunkLengths, ARRAYSIZE( chunks ) );
        }
    }
}
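
// Multicast datagram layout, matching what ReceivePortalFlow sends above
// (assuming 32-bit int and unsigned long, as on Win32):
//   data[0]      VMPI_VVIS_PACKET_ID
//   data[1]      VMPI_PORTALFLOW_RESULTS
//   data[2..5]   g_PortalMCThreadUniqueID - rejects strays from other jobs
//   data[6..9]   work unit index (int)
//   data[10..]   portalvis bits (portalbytes bytes)
// RecvFrom below is evidently nonblocking: the thread backs off 20ms between polls
// while idle and polls continuously while results are streaming in.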
DWORD WINAPI PortalMCThreadFn( LPVOID p )
{
    CUtlVector<char> data;
    data.SetSize( portalbytes + 128 );

    DWORD waitTime = 0;
    while ( WaitForSingleObject( g_MCThreadExitEvent.GetEventHandle(), waitTime ) != WAIT_OBJECT_0 )
    {
        CIPAddr ipFrom;
        int len = g_pPortalMCSocket->RecvFrom( data.Base(), data.Count(), &ipFrom );
        if ( len == -1 )
        {
            waitTime = 20;
        }
        else
        {
            // These lengths must match exactly what is sent in ReceivePortalFlow.
            if ( len == 2 + sizeof( g_PortalMCThreadUniqueID ) + sizeof( int ) + portalbytes )
            {
                // Perform more validation...
                if ( data[0] == VMPI_VVIS_PACKET_ID && data[1] == VMPI_PORTALFLOW_RESULTS )
                {
                    if ( *((unsigned long*)&data[2]) == g_PortalMCThreadUniqueID )
                    {
                        int iWorkUnit = *((int*)&data[6]);
                        if ( iWorkUnit >= 0 && iWorkUnit < g_numportals*2 )
                        {
                            portal_t *p = sorted_portals[iWorkUnit];
                            if ( p )
                            {
                                ++g_nMulticastPortalsReceived;
                                memcpy( p->portalvis, &data[10], portalbytes );
                                p->status = stat_done;
                                waitTime = 0;
                            }
                        }
                    }
                }
            }
        }
    }

    return 0;
}
void MCThreadCleanupFn()
{
    g_MCThreadExitEvent.SetEvent();
}
// --------------------------------------------------------------------------------- //
// Cheesy hack to let them stop the job early and keep the results of what has
// been done so far.
// --------------------------------------------------------------------------------- //
class CVisDistributeWorkCallbacks : public IWorkUnitDistributorCallbacks
{
public:
    CVisDistributeWorkCallbacks()
    {
        m_bExitedEarly = false;
        m_iState = STATE_NONE;
    }

    virtual bool Update()
    {
        if ( kbhit() )
        {
            int key = toupper( getch() );
            if ( m_iState == STATE_NONE )
            {
                if ( key == 'M' )
                {
                    m_iState = STATE_AT_MENU;
                    Warning("\n\n"
                        "----------------------\n"
                        "1. Write scratchpad file.\n"
                        "2. Exit early and use fast vis for remaining portals.\n"
                        "\n"
                        "0. Exit menu.\n"
                        "----------------------\n"
                        "\n"
                        );
                }
            }
            else if ( m_iState == STATE_AT_MENU )
            {
                if ( key == '1' )
                {
                    Warning(
                        "\n"
                        "\nWriting scratchpad file."
                        "\nCommand line: scratchpad3dviewer -file scratch.pad\n"
                        "\nRed portals are the portals that are fast vis'd."
                        "\n"
                        );
                    m_iState = STATE_NONE;

                    IScratchPad3D *pPad = ScratchPad3D_Create( "scratch.pad" );
                    if ( pPad )
                    {
                        ScratchPad_DrawWorld( pPad, false );

                        // Draw the portals that haven't been vis'd.
                        for ( int i=0; i < g_numportals*2; i++ )
                        {
                            portal_t *p = sorted_portals[i];
                            ScratchPad_DrawWinding( pPad, p->winding->numpoints, p->winding->points, Vector( 1, 0, 0 ), Vector( .3, .3, .3 ) );
                        }

                        pPad->Release();
                    }
                }
                else if ( key == '2' )
                {
                    // Exit the process early.
                    m_bExitedEarly = true;
                    return true;
                }
                else if ( key == '0' )
                {
                    m_iState = STATE_NONE;
                    Warning( "\n\nExited menu.\n\n" );
                }
            }
        }

        return false;
    }

public:
    enum
    {
        STATE_NONE,
        STATE_AT_MENU
    };

    bool m_bExitedEarly;
    int m_iState; // STATE_ enum.
};

CVisDistributeWorkCallbacks g_VisDistributeWorkCallbacks;
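
// Update() is presumably polled by the work unit distributor on the master while
// DistributeWork runs (see g_pDistributeWorkCallbacks below); returning true tells it
// to stop handing out work, which is how option '2' above bails out and lets
// CheckExitedEarly() backfill the unfinished portals.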
void CheckExitedEarly()
{
    if ( g_VisDistributeWorkCallbacks.m_bExitedEarly )
    {
        Warning( "\nExited early, using fastvis results...\n" );

        // Use the fastvis results for portals that we didn't get results for.
        for ( int i=0; i < g_numportals*2; i++ )
        {
            if ( sorted_portals[i]->status != stat_done )
            {
                sorted_portals[i]->portalvis = sorted_portals[i]->portalflood;
                sorted_portals[i]->status = stat_done;
            }
        }
    }
}
//-----------------------------------------
//
// Run PortalFlow across all available processing nodes
//
void RunMPIPortalFlow()
{
    Msg( "%-20s ", "MPIPortalFlow:" );
    if ( g_bMPIMaster )
        StartPacifier("");

    // Workers wait until we get the MC socket address.
    g_PortalMCThreadUniqueID = StatsDB_GetUniqueJobID();
    if ( g_bMPIMaster )
    {
        CCycleCount cnt;
        cnt.Sample();
        CUniformRandomStream randomStream;
        randomStream.SetSeed( cnt.GetMicroseconds() );

        g_PortalMCAddr.port = randomStream.RandomInt( 22000, 25000 ); // Pulled out of something else.
        g_PortalMCAddr.ip[0] = (unsigned char)RandomInt( 225, 238 );
        g_PortalMCAddr.ip[1] = (unsigned char)RandomInt( 0, 255 );
        g_PortalMCAddr.ip[2] = (unsigned char)RandomInt( 0, 255 );
        g_PortalMCAddr.ip[3] = (unsigned char)RandomInt( 3, 255 );
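
        // The random group address lands inside the IPv4 multicast block (224.0.0.0/4);
        // limiting the first octet to 225-238 presumably steers clear of the reserved
        // 224.0.0.x control range and the administratively-scoped 239.x.x.x block, and
        // a unique address per job keeps concurrent compiles from cross-talking.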
        g_pPortalMCSocket = CreateIPSocket();

        int i;
        for ( i=0; i < 5; i++ )
        {
            if ( g_pPortalMCSocket->BindToAny( randomStream.RandomInt( 20000, 30000 ) ) )
                break;
        }

        if ( i == 5 )
        {
            Error( "RunMPIPortalFlow: can't open a socket to multicast on." );
        }

        char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_SUBPACKETID_MC_ADDR };
        VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), &g_PortalMCAddr, sizeof( g_PortalMCAddr ), VMPI_PERSISTENT );
    }
    else
    {
        VMPI_SetCurrentStage( "wait for MC address" );

        while ( !g_bGotMCAddr )
        {
            VMPI_DispatchNextMessage();
        }

        // Open our multicast receive socket.
        g_pPortalMCSocket = CreateMulticastListenSocket( g_PortalMCAddr );
        if ( !g_pPortalMCSocket )
        {
            char err[512];
            IP_GetLastErrorString( err, sizeof( err ) );
            Error( "RunMPIPortalFlow: CreateMulticastListenSocket failed. (%s).", err );
        }

        // Make a thread to listen for the data on the multicast socket.
        DWORD dwDummy = 0;
        g_MCThreadExitEvent.Init( false, false );

        // Make sure we kill the MC thread if the app exits ungracefully.
        CmdLib_AtCleanup( MCThreadCleanupFn );

        g_hMCThread = CreateThread(
            NULL,
            0,
            PortalMCThreadFn,
            NULL,
            0,
            &dwDummy );

        if ( !g_hMCThread )
        {
            Error( "RunMPIPortalFlow: CreateThread failed for multicast receive thread." );
        }
    }
    VMPI_SetCurrentStage( "RunMPIPortalFlow" );

    g_pDistributeWorkCallbacks = &g_VisDistributeWorkCallbacks;

    g_CPUTime.Init();
    double elapsed = DistributeWork(
        g_numportals * 2,              // # work units
        VMPI_DISTRIBUTEWORK_PACKETID,  // packet ID
        ProcessPortalFlow,             // Worker function to process work units
        ReceivePortalFlow              // Master function to receive work results
        );

    g_pDistributeWorkCallbacks = NULL;

    CheckExitedEarly();

    // Stop the multicast stuff.
    VMPI_DeletePortalMCSocket();

    if ( !g_bMPIMaster )
    {
        if ( g_iVMPIVerboseLevel >= 1 )
        {
            Msg( "Received %d (out of %d) portals from multicast.\n", g_nMulticastPortalsReceived, g_numportals * 2 );
            Msg( "%.1f%% CPU utilization during PortalFlow\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads );
        }

        Msg( "VVIS worker finished. Over and out.\n" );
        VMPI_SetCurrentStage( "worker done" );

        CmdLib_Exit( 0 );
    }

    if ( g_bMPIMaster )
    {
        EndPacifier( false );
        Msg( " (%d)\n", (int)elapsed );
    }
}