|
|
//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose:
//
//=============================================================================
#include "vmpi.h"
#include "vmpi_distribute_work.h"
#include "tier0/platform.h"
#include "tier0/dbg.h"
#include "utlvector.h"
#include "utllinkedlist.h"
#include "vmpi_dispatch.h"
#include "pacifier.h"
#include "vstdlib/random.h"
#include "mathlib/mathlib.h"
#include "threadhelpers.h"
#include "threads.h"
#include "tier1/strtools.h"
#include "tier1/utlmap.h"
#include "tier1/smartptr.h"
#include "tier0/icommandline.h"
#include "cmdlib.h"
#include "vmpi_distribute_tracker.h"
#include "vmpi_distribute_work_internal.h"
// Sub-packet IDs used by the SDK work unit distributor in this file. They are
// allocated starting at VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE.

// Master -> worker: carries the worker count, the shuffle CRC and the worker's
// index in the shuffle (see CDistributor_SDKMaster::Shuffle).
#define DW_SUBPACKETID_SHUFFLE (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+0)
// Worker -> master: asks the master to reshuffle (see CDistributor_SDKWorker::RequestShuffle).
#define DW_SUBPACKETID_REQUEST_SHUFFLE (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+1)
// Master -> worker: a count followed by that many completed work unit indices
// (see CDistributor_SDKMaster::BuildWUsCompletedMessage).
#define DW_SUBPACKETID_WUS_COMPLETED_LIST (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+2)
// This is a pretty simple iterator. Basically, it holds a matrix of numbers.
// Each row is assigned to a worker, and the worker just walks through his row.
//
// When a worker reaches the end of his row, it gets a little trickier.
// They'll start doing their neighbor's row
// starting at the back and continue on. At about this time, the master should reshuffle the
// remaining work units to evenly distribute them amongst the workers.
class CWorkUnitWalker { public: CWorkUnitWalker() { m_nWorkUnits = 0; } // This is all that's needed for it to start assigning work units.
void Init( WUIndexType matrixWidth, WUIndexType matrixHeight, WUIndexType nWorkUnits ) { m_nWorkUnits = nWorkUnits; m_MatrixWidth = matrixWidth; m_MatrixHeight = matrixHeight; Assert( m_MatrixWidth * m_MatrixHeight >= nWorkUnits ); m_WorkerInfos.RemoveAll(); m_WorkerInfos.EnsureCount( m_MatrixHeight ); for ( int i=0; i < m_MatrixHeight; i++ ) { m_WorkerInfos[i].m_iStartWorkUnit = matrixWidth * i; m_WorkerInfos[i].m_iWorkUnitOffset = 0; } }
// This is the main function of the shuffler
bool GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex, bool *bWorkerFinishedHisColumn ) { if ( iWorker < 0 || iWorker >= m_WorkerInfos.Count() ) { Assert( false ); return false; }
// If this worker has walked through all the work units, then he's done.
CWorkerInfo *pWorker = &m_WorkerInfos[iWorker]; if ( pWorker->m_iWorkUnitOffset >= m_nWorkUnits ) return false; // If we've gone past the end of our work unit list, then we start at the BACK of the other rows of work units
// in the hopes that we won't collide with the guy working there. We also should tell the master to reshuffle.
WUIndexType iWorkUnitOffset = pWorker->m_iWorkUnitOffset; if ( iWorkUnitOffset >= m_MatrixWidth ) { WUIndexType xOffset = iWorkUnitOffset % m_MatrixWidth; WUIndexType yOffset = iWorkUnitOffset / m_MatrixWidth; xOffset = m_MatrixWidth - xOffset - 1; iWorkUnitOffset = yOffset * m_MatrixWidth + xOffset; *bWorkerFinishedHisColumn = true; } else { *bWorkerFinishedHisColumn = false; }
*pWUIndex = (pWorker->m_iStartWorkUnit + iWorkUnitOffset) % m_nWorkUnits; ++pWorker->m_iWorkUnitOffset; return true; }
private: class CWorkerInfo { public: WUIndexType m_iStartWorkUnit; WUIndexType m_iWorkUnitOffset; // Which work unit in my list of work units am I working on?
}; WUIndexType m_nWorkUnits; WUIndexType m_MatrixWidth; WUIndexType m_MatrixHeight; CUtlVector<CWorkerInfo> m_WorkerInfos; };
// Callback interface implemented by whoever is able to ask for a reshuffle of
// the remaining work units.
class IShuffleRequester
{
public:
	// Called to request that the work units be reshuffled.
	virtual void RequestShuffle() = 0;
};
// This is updated every time the master decides to reshuffle.
// In-between shuffles, you can call Thread_NoteWorkUnitCompleted when a work unit is completed
// and it'll avoid returning that work unit from Thread_GetNextWorkUnit again, but it WON'T
// rebuild the shuffled work unit map until Shuffle() is called again.
class CShuffledWorkUnitWalker { public: void Init( WUIndexType nWorkUnits, IShuffleRequester *pRequester ) { m_iLastShuffleRequest = 0; m_iCurShuffle = 1; m_flLastShuffleTime = Plat_FloatTime(); m_pShuffleRequester = pRequester; int nBytes = PAD_NUMBER( nWorkUnits, 8 ) / 8; m_CompletedWUBits.SetSize( nBytes ); m_LocalCompletedWUBits.SetSize( nBytes ); for ( WUIndexType i=0; i < m_CompletedWUBits.Count(); i++ ) m_LocalCompletedWUBits[i] = m_CompletedWUBits[i] = 0; // Setup our list of work units remaining.
for ( WUIndexType iWU=0; iWU < nWorkUnits; iWU++ ) { // Note: we're making an assumption here that if we add entries to a CUtlLinkedList in ascending order, their indices
// will be ascending 1-by-1 as well. If that assumption breaks, we can create an extra array here to map WU indices to the linked list indices.
WUIndexType index = m_WorkUnitsRemaining.AddToTail( iWU ); if ( index != iWU ) { Error( "CShuffledWorkUnitWalker: assumption on CUtlLinkedList indexing failed.\n" ); } } } void Shuffle( int nWorkers ) { if ( nWorkers == 0 ) return;
++m_iCurShuffle; m_flLastShuffleTime = Plat_FloatTime(); CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); m_WorkUnitsMap.RemoveAll(); m_WorkUnitsMap.EnsureCount( m_WorkUnitsRemaining.Count() ); // Here's the shuffle. The CWorkUnitWalker is going to walk each worker through its own group from 0-W,
// and our job is to interleave it so when worker 0 goes [0,1,2] and worker 1 goes [100,101,102], they're actually
// doing [0,N,2N] and [1,N+1,2N+1] where N=# of workers.
// The grid is RxW long, and R*W is >= nWorkUnits.
// R = # units per worker = width of the matrix
// W = # workers = height of the matrix
WUIndexType matrixHeight = nWorkers; WUIndexType matrixWidth = m_WorkUnitsRemaining.Count() / matrixHeight; if ( (m_WorkUnitsRemaining.Count() % matrixHeight) != 0 ) ++matrixWidth;
Assert( matrixWidth * matrixHeight >= m_WorkUnitsRemaining.Count() ); WUIndexType iWorkUnit = 0; FOR_EACH_LL( m_WorkUnitsRemaining, i ) { WUIndexType xCoord = iWorkUnit / matrixHeight; WUIndexType yCoord = iWorkUnit % matrixHeight; Assert( xCoord < matrixWidth ); Assert( yCoord < matrixHeight ); m_WorkUnitsMap[yCoord*matrixWidth+xCoord] = m_WorkUnitsRemaining[i]; ++iWorkUnit; }
m_Walker.Init( matrixWidth, matrixHeight, m_WorkUnitsRemaining.Count() ); } // Threadsafe.
bool Thread_IsWorkUnitCompleted( WUIndexType iWU ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7)); return (val != 0); } WUIndexType Thread_NumWorkUnitsRemaining() { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); return m_WorkUnitsRemaining.Count(); } bool Thread_GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); while ( 1 ) { WUIndexType iUnmappedWorkUnit; bool bWorkerFinishedHisColumn; if ( !m_Walker.GetNextWorkUnit( iWorker, &iUnmappedWorkUnit, &bWorkerFinishedHisColumn ) ) return false; // If we've done all the work units assigned to us in the last shuffle, then request a reshuffle.
if ( bWorkerFinishedHisColumn ) HandleWorkerFinishedColumn(); // Check the pending list.
*pWUIndex = m_WorkUnitsMap[iUnmappedWorkUnit]; byte bIsCompleted = m_CompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7)); byte bIsCompletedLocally = m_LocalCompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7)); if ( !bIsCompleted && !bIsCompletedLocally ) return true; } } void HandleWorkerFinishedColumn() { if ( m_iLastShuffleRequest != m_iCurShuffle ) { double flCurTime = Plat_FloatTime(); if ( flCurTime - m_flLastShuffleTime > 2.0f ) { m_pShuffleRequester->RequestShuffle(); m_iLastShuffleRequest = m_iCurShuffle; } } } void Thread_NoteWorkUnitCompleted( WUIndexType iWU ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock();
byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7)); if ( val == 0 ) { m_WorkUnitsRemaining.Remove( iWU ); m_CompletedWUBits[iWU >> 3] |= (1 << (iWU & 7)); } } void Thread_NoteLocalWorkUnitCompleted( WUIndexType iWU ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); m_LocalCompletedWUBits[iWU >> 3] |= (1 << (iWU & 7)); } CRC32_t GetShuffleCRC() { #ifdef _DEBUG
static bool bCalcShuffleCRC = true; #else
static bool bCalcShuffleCRC = VMPI_IsParamUsed( mpi_CalcShuffleCRC ); #endif
if ( bCalcShuffleCRC ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock();
CRC32_t ret; CRC32_Init( &ret ); FOR_EACH_LL( m_WorkUnitsRemaining, i ) { WUIndexType iWorkUnit = m_WorkUnitsRemaining[i]; CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) ); } for ( int i=0; i < m_WorkUnitsMap.Count(); i++ ) { WUIndexType iWorkUnit = m_WorkUnitsMap[i]; CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) ); } CRC32_Final( &ret ); return ret; } else { return false; } }
private: // These are PENDING WU completions until we call Shuffle() again, at which point we actually reorder the list
// based on the completed WUs.
CUtlVector<byte> m_CompletedWUBits; // Bit vector of completed WUs.
CUtlLinkedList<WUIndexType, WUIndexType> m_WorkUnitsRemaining; CUtlVector<WUIndexType> m_WorkUnitsMap; // Maps the 0-N indices in the CWorkUnitWalker to the list of remaining work units.
// Helps us avoid some duplicates that happen during shuffling if we've completed some WUs and sent them
// to the master, but the master hasn't included them in the DW_SUBPACKETID_WUS_COMPLETED_LIST yet.
CUtlVector<byte> m_LocalCompletedWUBits; // Bit vector of completed WUs.
// Used to control how frequently we request a reshuffle.
unsigned int m_iCurShuffle; unsigned int m_iLastShuffleRequest; // The index of the shuffle we last requested a reshuffle on (don't request a reshuffle on the same one).
double m_flLastShuffleTime; IShuffleRequester *m_pShuffleRequester; CWorkUnitWalker m_Walker; CCriticalSection m_CS; };
class CDistributor_SDKMaster : public IWorkUnitDistributorMaster, public IShuffleRequester { public: virtual void Release() { delete this; }
static void Master_WorkerThread_Static( int iThread, void *pUserData ) { ((CDistributor_SDKMaster*)pUserData)->Master_WorkerThread( iThread ); } void Master_WorkerThread( int iThread ) { while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 && !g_bVMPIEarlyExit ) { WUIndexType iWU; if ( !m_WorkUnitWalker.Thread_GetNextWorkUnit( 0, &iWU ) ) { // Wait until there are some WUs to do.
VMPI_Sleep( 10 ); continue; } // Do this work unit.
m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU ); // We do this before it's completed because otherwise if a Shuffle() occurs,
// the other thread might happen to pickup this work unit and we don't want that.
m_pInfo->m_WorkerInfo.m_pProcessFn( iThread, iWU, NULL ); NotifyLocalMasterCompletedWorkUnit( iWU ); } }
virtual void DistributeWork_Master( CDSInfo *pInfo ) { m_pInfo = pInfo; m_bForceShuffle = false; m_bShuffleRequested = false; m_flLastShuffleRequestServiceTime = Plat_FloatTime(); // Spawn idle-priority worker threads right here.
m_bUsingMasterLocalThreads = (pInfo->m_WorkerInfo.m_pProcessFn != 0); if ( VMPI_IsParamUsed( mpi_NoMasterWorkerThreads ) ) { Msg( "%s found. No worker threads will be created.\n", VMPI_GetParamString( mpi_NoMasterWorkerThreads ) ); m_bUsingMasterLocalThreads = false; } m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this ); Shuffle();
if ( m_bUsingMasterLocalThreads ) RunThreads_Start( Master_WorkerThread_Static, this, k_eRunThreadsPriority_Idle );
uint64 lastShuffleTime = Plat_MSTime(); while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 ) { VMPI_DispatchNextMessage( 200 ); CheckLocalMasterCompletedWorkUnits(); VMPITracker_HandleDebugKeypresses(); if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() ) break;
// Reshuffle the work units optimally every certain interval.
if ( m_bForceShuffle || CheckShuffleRequest() ) { Shuffle(); lastShuffleTime = Plat_MSTime(); m_bForceShuffle = false; } } RunThreads_End(); }
virtual void RequestShuffle() { m_bShuffleRequested = true; } bool CheckShuffleRequest() { if ( m_bShuffleRequested ) { double flCurTime = Plat_FloatTime(); if ( flCurTime - m_flLastShuffleRequestServiceTime > 2.0f ) // Only handle shuffle requests every so often.
{ m_flLastShuffleRequestServiceTime = flCurTime; m_bShuffleRequested = false; return true; } } return false; } void Shuffle() { // Build a list of who's working.
CUtlVector<unsigned short> whosWorking; if ( m_bUsingMasterLocalThreads ) { whosWorking.AddToTail( VMPI_MASTER_ID ); Assert( VMPI_MASTER_ID == 0 ); } { CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); for ( int i=0; i < pWorkersReady->m_WorkersReady.Count(); i++ ) { int iWorker = pWorkersReady->m_WorkersReady[i]; if ( VMPI_IsProcConnected( iWorker ) ) whosWorking.AddToTail( iWorker ); } m_WorkersReadyCS.Unlock(); }
// Before sending the shuffle command, tell any of these active workers about the pending WUs completed.
CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); m_WUSCompletedMessageBuffer.setLen( 0 ); if ( BuildWUsCompletedMessage( pWUsCompleted->m_Pending, m_WUSCompletedMessageBuffer ) > 0 ) { for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ ) { VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), whosWorking[i] ); } } pWUsCompleted->m_Completed.AddMultipleToTail( pWUsCompleted->m_Pending.Count(), pWUsCompleted->m_Pending.Base() ); // Add the pending ones to the full list now.
pWUsCompleted->m_Pending.RemoveAll(); m_WUsCompletedCS.Unlock(); // Shuffle ourselves.
m_WorkUnitWalker.Shuffle( whosWorking.Count() );
// Send the shuffle command to the workers.
MessageBuffer mb; PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_SHUFFLE );
unsigned short nWorkers = whosWorking.Count(); mb.write( &nWorkers, sizeof( nWorkers ) );
CRC32_t shuffleCRC = m_WorkUnitWalker.GetShuffleCRC(); mb.write( &shuffleCRC, sizeof( shuffleCRC ) );
// Now for each worker, assign him an index in the shuffle and send the shuffle command.
int workerIDPos = mb.getLen(); unsigned short id = 0; mb.write( &id, sizeof( id ) ); for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ ) { id = (unsigned short)i; mb.update( workerIDPos, &id, sizeof( id ) ); VMPI_SendData( mb.data, mb.getLen(), whosWorking[i] ); } }
int BuildWUsCompletedMessage( CUtlVector<WUIndexType> &wusCompleted, MessageBuffer &mb ) { PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WUS_COMPLETED_LIST ); m_pInfo->WriteWUIndex( wusCompleted.Count(), &mb ); for ( int i=0; i < wusCompleted.Count(); i++ ) { m_pInfo->WriteWUIndex( wusCompleted[i], &mb ); } return wusCompleted.Count(); }
virtual void OnWorkerReady( int iSource ) { CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); if ( pWorkersReady->m_WorkersReady.Find( iSource ) == -1 ) { pWorkersReady->m_WorkersReady.AddToTail( iSource ); // Get this guy up to speed on which WUs are done.
{ CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); m_WUSCompletedMessageBuffer.setLen( 0 ); BuildWUsCompletedMessage( pWUsCompleted->m_Completed, m_WUSCompletedMessageBuffer ); m_WUsCompletedCS.Unlock(); } VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), iSource ); m_bForceShuffle = true; } m_WorkersReadyCS.Unlock(); }
virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit ) { return Thread_HandleWorkUnitResults( iWorkUnit ); } bool Thread_HandleWorkUnitResults( WUIndexType iWorkUnit ) { if ( m_WorkUnitWalker.Thread_IsWorkUnitCompleted( iWorkUnit ) ) { return false; } else { m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWorkUnit ); // We need the lock on here because our own worker threads can call into here.
CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); pWUsCompleted->m_Pending.AddToTail( iWorkUnit ); m_WUsCompletedCS.Unlock(); return true; } }
virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) { if ( pBuf->data[1] == DW_SUBPACKETID_REQUEST_SHUFFLE ) { if ( bIgnoreContents ) return true; m_bShuffleRequested = true; } return false; }
virtual void DisconnectHandler( int workerID ) { CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); if ( pWorkersReady->m_WorkersReady.Find( workerID ) != -1 ) m_bForceShuffle = true; m_WorkersReadyCS.Unlock(); }
public: CDSInfo *m_pInfo; class CWorkersReady { public: CUtlVector<int> m_WorkersReady; // The list of workers who have said they're ready to participate.
}; CCriticalSectionData<CWorkersReady> m_WorkersReadyCS;
class CWUsCompleted { public: CUtlVector<WUIndexType> m_Completed; // WUs completed that we have sent to workers.
CUtlVector<WUIndexType> m_Pending; // WUs completed that we haven't sent to workers.
}; CCriticalSectionData<CWUsCompleted> m_WUsCompletedCS; MessageBuffer m_WUSCompletedMessageBuffer; // Used to send lists of completed WUs.
int m_bUsingMasterLocalThreads; bool m_bForceShuffle; bool m_bShuffleRequested; double m_flLastShuffleRequestServiceTime; CShuffledWorkUnitWalker m_WorkUnitWalker; };
class CDistributor_SDKWorker : public IWorkUnitDistributorWorker, public IShuffleRequester { public: virtual void Init( CDSInfo *pInfo ) { m_iMyWorkUnitWalkerID = -1; m_pInfo = pInfo; m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this ); } virtual void Release() { delete this; } virtual bool GetNextWorkUnit( WUIndexType *pWUIndex ) { // If we don't have an ID yet, we haven't received a Shuffle() command, so we're waiting for that before working.
// TODO: we could do some random WUs here while we're waiting, although that could suck if the WUs take forever to do
// and they're duplicates.
if ( m_iMyWorkUnitWalkerID == -1 ) return false; // Look in our current shuffled list of work units for the next one.
return m_WorkUnitWalker.Thread_GetNextWorkUnit( m_iMyWorkUnitWalkerID, pWUIndex ); } virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU ) { m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU ); }
virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) { // If it's a SHUFFLE message, then shuffle..
if ( pBuf->data[1] == DW_SUBPACKETID_SHUFFLE ) { if ( bIgnoreContents ) return true; unsigned short nWorkers, myID; CRC32_t shuffleCRC; pBuf->read( &nWorkers, sizeof( nWorkers ) ); pBuf->read( &shuffleCRC, sizeof( shuffleCRC ) ); pBuf->read( &myID, sizeof( myID ) ); m_iMyWorkUnitWalkerID = myID; m_WorkUnitWalker.Shuffle( nWorkers ); if ( m_WorkUnitWalker.GetShuffleCRC() != shuffleCRC ) { static int nWarnings = 1; if ( ++nWarnings <= 2 ) Warning( "\nShuffle CRC mismatch\n" ); } return true; } else if ( pBuf->data[1] == DW_SUBPACKETID_WUS_COMPLETED_LIST ) { if ( bIgnoreContents ) return true; WUIndexType nCompleted; m_pInfo->ReadWUIndex( &nCompleted, pBuf ); for ( WUIndexType i=0; i < nCompleted; i++ ) { WUIndexType iWU; m_pInfo->ReadWUIndex( &iWU, pBuf ); m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWU ); } return true; } return false; }
virtual void RequestShuffle() { // Ok.. request a reshuffle.
MessageBuffer mb; PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_REQUEST_SHUFFLE ); VMPI_SendData( mb.data, mb.getLen(), VMPI_MASTER_ID ); } private: CDSInfo *m_pInfo; CShuffledWorkUnitWalker m_WorkUnitWalker; int m_iMyWorkUnitWalkerID; };
// Creates the master side of the SDK work unit distributor.
// The object deletes itself when Release() is called on it.
IWorkUnitDistributorMaster* CreateWUDistributor_SDKMaster()
{
	IWorkUnitDistributorMaster *pDistributor = new CDistributor_SDKMaster;
	return pDistributor;
}
// Creates the worker side of the SDK work unit distributor.
// The object deletes itself when Release() is called on it.
IWorkUnitDistributorWorker* CreateWUDistributor_SDKWorker()
{
	IWorkUnitDistributorWorker *pDistributor = new CDistributor_SDKWorker;
	return pDistributor;
}
|