//========= Copyright Valve Corporation, All rights reserved. ============//
// Purpose:
#include "vmpi.h"
#include "vmpi_distribute_work.h"
#include "tier0/platform.h"
#include "tier0/dbg.h"
#include "utlvector.h"
#include "utllinkedlist.h"
#include "vmpi_dispatch.h"
#include "pacifier.h"
#include "vstdlib/random.h"
#include "mathlib/mathlib.h"
#include "threadhelpers.h"
#include "threads.h"
#include "tier1/strtools.h"
#include "tier1/utlmap.h"
#include "tier1/smartptr.h"
#include "tier0/icommandline.h"
#include "cmdlib.h"
#include "vmpi_distribute_tracker.h"
#include "vmpi_distribute_work_internal.h"
static int s_numWusToDeal = -1;
void VMPI_SetWorkUnitsPartitionSize( int numWusToDeal ) { s_numWusToDeal = numWusToDeal; }
class CWorkUnitInfo { public: WUIndexType m_iWorkUnit; };
class CWULookupInfo { public: CWULookupInfo() : m_iWUInfo( -1 ), m_iPartition( -222222 ), m_iPartitionListIndex( -1 ) {}
public: int m_iWUInfo; // Index into m_WUInfo.
int m_iPartition; // Which partition it's in.
int m_iPartitionListIndex; // Index into its partition's m_WUs.
class CPartitionInfo { public: typedef CUtlLinkedList< WUIndexType, int > PartitionWUs;
public: int m_iPartition; // Index into m_Partitions.
int m_iWorker; // Who owns this partition?
PartitionWUs m_WUs; // Which WUs are in this partition?
// Work units tracker to track consecutive finished blocks
class CWorkUnitsTracker { public: CWorkUnitsTracker() {}
public: // Initializes the unit tracker to receive numUnits in future
void PrepareForWorkUnits( uint64 numUnits ); // Signals that a work unit has been finished
// returns a zero-based index of the next pending work unit
// up to which the task list has been processed fully now
// because the received work unit filled the gap or was the next pending work unit.
// returns 0 to indicate that this work unit is a "faster processed future work unit".
uint64 WorkUnitFinished( uint64 iWorkUnit );
public: enum WUInfo { kNone, kTrigger, kDone }; CVisibleWindowVector< uint8 > m_arrInfo; };
void CWorkUnitsTracker::PrepareForWorkUnits( uint64 numUnits ) { m_arrInfo.Reset( numUnits + 1 );
if ( numUnits ) { m_arrInfo.ExpandWindow( 2ull, kNone ); m_arrInfo.Get( 0ull ) = kTrigger; } }
uint64 CWorkUnitsTracker::WorkUnitFinished( uint64 iWorkUnit ) { uint64 uiResult = uint64( 0 );
if ( iWorkUnit >= m_arrInfo.FirstPossibleIndex() && iWorkUnit < m_arrInfo.PastPossibleIndex() ) { // Need to access the element
m_arrInfo.ExpandWindow( iWorkUnit + 1, kNone );
// Set it done
uint8 &rchThere = m_arrInfo.Get( iWorkUnit ), chThere = rchThere; rchThere = kDone;
// Should we trigger?
if ( kTrigger == chThere ) { // Go along all "done" work units and trigger the last found one
while ( ( ( ++ iWorkUnit ) < m_arrInfo.PastVisibleIndex() ) && ( kDone == m_arrInfo.Get( iWorkUnit ) ) ) continue;
m_arrInfo.Get( iWorkUnit ) = kTrigger; m_arrInfo.ShrinkWindow( iWorkUnit - 1 ); uiResult = iWorkUnit; } else if( iWorkUnit == m_arrInfo.FirstPossibleIndex() ) { // Go along all "done" work units and shrink including the last found one
while ( ( ( ++ iWorkUnit ) < m_arrInfo.PastVisibleIndex() ) && ( kDone == m_arrInfo.Get( iWorkUnit ) ) ) continue;
m_arrInfo.ShrinkWindow( iWorkUnit - 1 ); } }
return uiResult; }
CWorkUnitsTracker g_MasterWorkUnitsTracker;
static bool CompareSoonestWorkUnitSets( CPartitionInfo::PartitionWUs * const &x, CPartitionInfo::PartitionWUs * const &y ) { // Compare by fourth/second/first job in the partitions
WUIndexType missing = ~WUIndexType(0); WUIndexType jobsX[4] = { missing, missing, missing, missing }; WUIndexType jobsY[4] = { missing, missing, missing, missing }; int counter = 0;
counter = 0; FOR_EACH_LL( (*x), i ) { jobsX[ counter ++ ] = (*x)[i]; if ( counter >= 4 ) break; }
counter = 0; FOR_EACH_LL( (*y), i ) { jobsY[ counter ++ ] = (*y)[i]; if ( counter >= 4 ) break; }
// Compare
if ( jobsX[3] != jobsY[3] ) return ( jobsX[3] < jobsY[3] );
if ( jobsX[1] != jobsY[1] ) return ( jobsX[1] < jobsY[1] );
return jobsX[0] < jobsY[0]; }
class CDistributor_DefaultMaster : public IWorkUnitDistributorMaster { public: virtual void Release() { delete this; } virtual void DistributeWork_Master( CDSInfo *pInfo ) { m_pInfo = pInfo; g_MasterWorkUnitsTracker.PrepareForWorkUnits( m_pInfo->m_nWorkUnits ); m_WULookup.Reset( pInfo->m_nWorkUnits ); while ( m_WULookup.FirstPossibleIndex() < m_WULookup.PastPossibleIndex() ) { VMPI_DispatchNextMessage( 200 );
VMPITracker_HandleDebugKeypresses(); if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() ) break; } }
virtual void OnWorkerReady( int iSource ) { AssignWUsToWorker( iSource ); }
virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit ) { CWULookupInfo *pLookup = NULL; if ( iWorkUnit >= m_WULookup.FirstPossibleIndex() && iWorkUnit < m_WULookup.PastVisibleIndex() ) pLookup = &m_WULookup.Get( iWorkUnit );
if ( !pLookup || pLookup->m_iWUInfo == -1 ) return false; // Mark this WU finished and remove it from the list of pending WUs.
m_WUInfo.Remove( pLookup->m_iWUInfo ); pLookup->m_iWUInfo = -1;
// Get rid of the WU from its partition.
int iPartition = pLookup->m_iPartition; CPartitionInfo *pPartition = m_Partitions[iPartition]; pPartition->m_WUs.Remove( pLookup->m_iPartitionListIndex );
// Shrink the window of the lookup work units
if ( iWorkUnit == m_WULookup.FirstPossibleIndex() ) { WUIndexType kwu = iWorkUnit; for ( WUIndexType kwuEnd = m_WULookup.PastVisibleIndex(); kwu < kwuEnd; ++ kwu ) { if ( -1 != m_WULookup.Get( kwu ).m_iWUInfo && kwu > iWorkUnit ) break; } m_WULookup.ShrinkWindow( kwu - 1 ); }
// Give the worker some new work if need be.
if ( pPartition->m_WUs.Count() == 0 ) { int iPartitionWorker = pPartition->m_iWorker; delete pPartition; m_Partitions.Remove( iPartition ); // If there are any more WUs remaining, give the worker from this partition some more of them.
if ( m_WULookup.FirstPossibleIndex() < m_WULookup.PastPossibleIndex() ) { AssignWUsToWorker( iPartitionWorker ); } }
uint64 iDoneWorkUnits = g_MasterWorkUnitsTracker.WorkUnitFinished( iWorkUnit ); if ( iDoneWorkUnits && g_pDistributeWorkCallbacks ) { g_pDistributeWorkCallbacks->OnWorkUnitsCompleted( iDoneWorkUnits ); }
return true; }
virtual void DisconnectHandler( int workerID ) { int iPartitionLookup = FindPartitionByWorker( workerID ); if ( iPartitionLookup != -1 ) { // Mark this guy's partition as unowned so another worker can get it.
CPartitionInfo *pPartition = m_Partitions[iPartitionLookup]; pPartition->m_iWorker = -1; } }
CPartitionInfo* AddPartition( int iWorker ) { CPartitionInfo *pNew = new CPartitionInfo; pNew->m_iPartition = m_Partitions.AddToTail( pNew ); pNew->m_iWorker = iWorker; return pNew; }
bool SplitWUsPartition( CPartitionInfo *pPartitionLarge, CPartitionInfo **ppFirstHalf, CPartitionInfo **ppSecondHalf, int iFirstHalfWorker, int iSecondHalfWorker ) { int nCount = pPartitionLarge->m_WUs.Count(); if ( nCount > 1 ) // Allocate the partitions for the two workers
{ *ppFirstHalf = AddPartition( iFirstHalfWorker ); *ppSecondHalf = AddPartition( iSecondHalfWorker ); } else // Specially transfer a partition with too few work units
{ *ppFirstHalf = NULL; *ppSecondHalf = AddPartition( iSecondHalfWorker ); }
// Prepare for transfer
CPartitionInfo *arrNewParts[2] = { *ppFirstHalf ? *ppFirstHalf : *ppSecondHalf, *ppSecondHalf }; // Transfer the work units:
// alternate first/second halves
// don't put more than "half deal units" tasks into the second half
// e.g. { 1, 2, 3, 4 }
// becomes: 1st half { 1, 2 }, 2nd half { 3, 4 }
for ( int k = 0; k < nCount; ++ k ) { int iHead = pPartitionLarge->m_WUs.Head(); WUIndexType iWU = pPartitionLarge->m_WUs[ iHead ]; pPartitionLarge->m_WUs.Remove( iHead );
int nHalf = !!( ( k % 2 ) || ( k >= nCount - 1 ) ); if ( k == 5 ) // no more than 2 jobs to branch off
arrNewParts[ 1 ] = arrNewParts[ 0 ]; */ int nHalf = !( k < nCount/2 ); CPartitionInfo *pTo = arrNewParts[ nHalf ];
CWULookupInfo &li = m_WULookup.Get( iWU ); li.m_iPartition = pTo->m_iPartition; li.m_iPartitionListIndex = pTo->m_WUs.AddToTail( iWU ); }
// LogPartitionsWorkUnits( pInfo );
return true; }
void AssignWUsToWorker( int iWorker ) { // Get rid of this worker's old partition.
int iPrevious = FindPartitionByWorker( iWorker ); if ( iPrevious != -1 ) { delete m_Partitions[iPrevious]; m_Partitions.Remove( iPrevious ); }
if ( g_iVMPIVerboseLevel >= 1 ) Msg( "A" );
CVisibleWindowVector< CWULookupInfo > &vlkup = m_WULookup; if ( CommandLine()->FindParm( "-mpi_NoScheduler" ) ) { Warning( "\n\n-mpi_NoScheduler found: Warning - this should only be used for testing and with 1 worker!\n\n" ); vlkup.ExpandWindow( m_pInfo->m_nWorkUnits ); CPartitionInfo *pPartition = AddPartition( iWorker ); for ( int i=0; i < m_pInfo->m_nWorkUnits; i++ ) { CWorkUnitInfo info; info.m_iWorkUnit = i;
CWULookupInfo &li = vlkup.Get( i ); li.m_iPartition = pPartition->m_iPartition; li.m_iPartitionListIndex = pPartition->m_WUs.AddToTail( i ); li.m_iWUInfo = m_WUInfo.AddToTail( info ); }
SendPartitionToWorker( pPartition, iWorker ); return; }
// Any partitions abandoned by workers?
int iAbandonedPartition = FindPartitionByWorker( -1 ); if ( -1 != iAbandonedPartition ) { CPartitionInfo *pPartition = m_Partitions[ iAbandonedPartition ]; pPartition->m_iWorker = iWorker; SendPartitionToWorker( pPartition, iWorker ); } // Any absolutely untouched partitions yet?
else if ( vlkup.PastVisibleIndex() < vlkup.PastPossibleIndex() ) { // Figure out how many WUs to include in a batch
int numWusToDeal = s_numWusToDeal; if ( numWusToDeal <= 0 ) { uint64 uiFraction = vlkup.PastPossibleIndex() / g_nMaxWorkerCount; Assert( uiFraction < INT_MAX/2 ); numWusToDeal = int( uiFraction ); if ( numWusToDeal <= 0 ) numWusToDeal = 8; }
// Allocate room for upcoming work units lookup
WUIndexType iBegin = vlkup.PastVisibleIndex(); WUIndexType iEnd = min( iBegin + g_nMaxWorkerCount * numWusToDeal, vlkup.PastPossibleIndex() ); vlkup.ExpandWindow( iEnd - 1 );
// Allocate a partition
size_t numPartitions = min( ( size_t )(iEnd - iBegin), ( size_t )g_nMaxWorkerCount ); CArrayAutoPtr< CPartitionInfo * > spArrPartitions( new CPartitionInfo* [ numPartitions ] ); CPartitionInfo **arrPartitions = spArrPartitions.Get();
arrPartitions[0] = AddPartition( iWorker ); for ( size_t k = 1; k < numPartitions; ++ k ) arrPartitions[k] = AddPartition( -1 );
// Assign upcoming work units to the partitions.
for ( WUIndexType i = iBegin ; i < iEnd; ++ i ) { CWorkUnitInfo info; info.m_iWorkUnit = i;
CPartitionInfo *pPartition = arrPartitions[ size_t( (i - iBegin) % numPartitions ) ];
CWULookupInfo &li = vlkup.Get( i ); li.m_iPartition = pPartition->m_iPartition; li.m_iPartitionListIndex = pPartition->m_WUs.AddToTail( i ); li.m_iWUInfo = m_WUInfo.AddToTail( info ); }
// Now send this guy the WU list in his partition.
SendPartitionToWorker( arrPartitions[0], iWorker ); }
// Split one of the last partitions to finish sooner
else { // Find a partition to split.
int iPartToSplit = FindSoonestPartition(); if ( iPartToSplit >= 0 ) { CPartitionInfo *pPartition = m_Partitions[ iPartToSplit ];
CPartitionInfo *pOldHalf = NULL, *pNewHalf = NULL; int iOldWorker = pPartition->m_iWorker, iNewWorker = iWorker; if ( SplitWUsPartition( pPartition, &pOldHalf, &pNewHalf, iOldWorker, iNewWorker ) ) { if ( pOldHalf ) SendPartitionToWorker( pOldHalf, iOldWorker ); if ( pNewHalf ) SendPartitionToWorker( pNewHalf, iNewWorker );
// Delete the partition that got split
Assert( pPartition->m_WUs.Count() == 0 ); delete pPartition; m_Partitions.Remove( iPartToSplit ); } } } }
int FindSoonestPartition() { CUtlLinkedList < CPartitionInfo *, int > &lst = m_Partitions;
// Sorted partitions
CUtlMap< CPartitionInfo::PartitionWUs *, int > sortedPartitions ( CompareSoonestWorkUnitSets ); sortedPartitions.EnsureCapacity( lst.Count() ); FOR_EACH_LL( lst, i ) { sortedPartitions.Insert( &lst[i]->m_WUs, i ); }
if ( sortedPartitions.Count() ) { return sortedPartitions.Element( sortedPartitions.FirstInorder() ); }
return lst.Head(); }
int FindPartitionByWorker( int iWorker ) { FOR_EACH_LL( m_Partitions, i ) { if ( m_Partitions[i]->m_iWorker == iWorker ) return i; } return -1; }
void SendPartitionToWorker( CPartitionInfo *pPartition, int iWorker ) { // Stuff the next nWUs work units into the buffer.
MessageBuffer mb; PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WU_ASSIGNMENT ); FOR_EACH_LL( pPartition->m_WUs, i ) { WUIndexType iWU = pPartition->m_WUs[i]; mb.write( &iWU, sizeof( iWU ) ); VMPITracker_WorkUnitSentToWorker( ( int ) iWU, iWorker ); }
VMPI_SendData( mb.data, mb.getLen(), iWorker ); }
virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) { return false; } private: CDSInfo *m_pInfo;
CUtlLinkedList<CPartitionInfo*,int> m_Partitions; CVisibleWindowVector<CWULookupInfo> m_WULookup; // Map work unit index to CWorkUnitInfo.
CUtlLinkedList<CWorkUnitInfo,int> m_WUInfo; // Sorted with most elegible WU at the head.
class CDistributor_DefaultWorker : public IWorkUnitDistributorWorker { public: virtual void Release() { delete this; }
virtual void Init( CDSInfo *pInfo ) { } virtual bool GetNextWorkUnit( WUIndexType *pWUIndex ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock();
// NOTE: this is called from INSIDE worker threads.
if ( m_WorkUnits.Count() == 0 ) { return false; } else { *pWUIndex = m_WorkUnits[ m_WorkUnits.Head() ]; m_WorkUnits.Remove( m_WorkUnits.Head() ); return true; } }
virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU ) { }
virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) { if ( pBuf->data[1] == DW_SUBPACKETID_WU_ASSIGNMENT ) { // If the message wasn't even related to the current DistributeWork() call we're on, ignore it.
if ( bIgnoreContents ) return true;
if ( ((pBuf->getLen() - pBuf->getOffset()) % sizeof( WUIndexType )) != 0 ) { Error( "DistributeWork: invalid work units packet from master" ); }
// Parse out the work unit indices.
CCriticalSectionLock csLock( &m_CS ); csLock.Lock();
int nIndices = (pBuf->getLen() - pBuf->getOffset()) / sizeof( WUIndexType ); for ( int i=0; i < nIndices; i++ ) { WUIndexType iWU; pBuf->read( &iWU, sizeof( iWU ) );
// Add the index to the list.
m_WorkUnits.AddToTail( iWU ); } csLock.Unlock();
return true; } else { return false; } }
// Threads eat up the list of WUs in here.
CCriticalSection m_CS; CUtlLinkedList<WUIndexType, int> m_WorkUnits; // A list of work units assigned to this worker
IWorkUnitDistributorMaster* CreateWUDistributor_DefaultMaster() { return new CDistributor_DefaultMaster; } IWorkUnitDistributorWorker* CreateWUDistributor_DefaultWorker() { return new CDistributor_DefaultWorker; }