Team Fortress 2 Source Code as on 22/4/2020
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

251 lines
7.0 KiB

  1. //========= Copyright Valve Corporation, All rights reserved. ============//
  2. //
  3. // Purpose:
  4. //
  5. //=============================================================================
  6. #ifndef VMPI_DISTRIBUTE_WORK_INTERNAL_H
  7. #define VMPI_DISTRIBUTE_WORK_INTERNAL_H
  8. #ifdef _WIN32
  9. #pragma once
  10. #endif
  11. #define VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE 50
  12. typedef uint64 WUIndexType;
  13. class CDSInfo;
  14. extern bool g_bVMPIEarlyExit;
  15. // These classes are overridden to handle the details of communicating and scheduling work units.
  16. class IWorkUnitDistributorMaster
  17. {
  18. public:
  19. virtual void Release() = 0;
  20. virtual void DistributeWork_Master( CDSInfo *pInfo ) = 0;
  21. virtual void OnWorkerReady( int iWorker ) = 0;
  22. virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) = 0;
  23. // Called when the results for a work unit arrive. This function must return false if
  24. // we've already received the results for this work unit.
  25. virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit ) = 0;
  26. virtual void DisconnectHandler( int workerID ) = 0;
  27. };
  28. class IWorkUnitDistributorWorker
  29. {
  30. public:
  31. virtual void Release() = 0;
  32. virtual void Init( CDSInfo *pInfo ) = 0;
  33. // Called by worker threads to get the next work unit to do.
  34. virtual bool GetNextWorkUnit( WUIndexType *pWUIndex ) = 0;
  35. // Called by the worker threads after a work unit is completed.
  36. virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU ) = 0;
  37. // Called by the main thread when a packet comes in.
  38. virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) = 0;
  39. };
  40. template < typename T, typename Derived >
  41. class CVisibleWindowVectorT : protected CUtlVector < T >
  42. {
  43. typedef CUtlVector< T > BaseClass;
  44. public:
  45. CVisibleWindowVectorT() : m_uiBase( 0 ), m_uiTotal( 0 ) {}
  46. public:
  47. inline void PostInitElement( uint64 uiRealIdx, T &newElement ) { /* do nothing */ return; }
  48. inline void PostInitElement( uint64 uiRealIdx, T &newElement, T const &x ) { newElement = x; }
  49. public:
  50. // Resets the content and makes size "uiTotal"
  51. void Reset( uint64 uiTotal ) {
  52. BaseClass::RemoveAll();
  53. BaseClass::EnsureCapacity( min( 100, (int)uiTotal ) );
  54. m_uiBase = 0;
  55. m_uiTotal = uiTotal;
  56. }
  57. // Gets the element from the window, otherwise NULL
  58. T & Get( uint64 idx ) {
  59. T *pElement = (( idx >= m_uiBase && idx < m_uiBase + BaseClass::Count() ) ? ( BaseClass::Base() + size_t( idx - m_uiBase ) ) : NULL );
  60. Assert( pElement );
  61. return *pElement;
  62. }
  63. // Expands the window to see the element by its index
  64. void ExpandWindow( uint64 idxAccessible ) {
  65. Assert( idxAccessible >= m_uiBase );
  66. if ( idxAccessible >= m_uiBase + BaseClass::Count() ) {
  67. int iOldCount = BaseClass::Count();
  68. int iAddElements = int(idxAccessible - m_uiBase) - iOldCount + 1;
  69. Assert( iOldCount + iAddElements <= 100000000 /* really need 100 Mb at once? */ );
  70. BaseClass::AddMultipleToTail( iAddElements );
  71. for ( int iNewCount = BaseClass::Count(); iOldCount < iNewCount; ++ iOldCount )
  72. static_cast< Derived * >( this )->PostInitElement( m_uiBase + iOldCount, BaseClass::Element( iOldCount ) );
  73. }
  74. }
  75. // Expands the window and initializes the new elements to a given value
  76. void ExpandWindow( uint64 idxAccessible, T const& x ) {
  77. Assert( idxAccessible >= m_uiBase );
  78. if ( idxAccessible >= m_uiBase + BaseClass::Count() ) {
  79. int iOldCount = BaseClass::Count();
  80. int iAddElements = int(idxAccessible - m_uiBase) - iOldCount + 1;
  81. Assert( unsigned(iAddElements) <= 50000000 /* growing 50 Mb at once? */ );
  82. BaseClass::AddMultipleToTail( iAddElements );
  83. for ( int iNewCount = BaseClass::Count(); iOldCount < iNewCount; ++ iOldCount )
  84. static_cast< Derived * >( this )->PostInitElement( m_uiBase + iOldCount, BaseClass::Element( iOldCount ), x );
  85. }
  86. }
  87. // Shrinks the window to drop some of the elements from the head
  88. void ShrinkWindow( uint64 idxDrop ) {
  89. Assert( idxDrop >= m_uiBase && idxDrop <= m_uiBase + BaseClass::Count() );
  90. if ( idxDrop >= m_uiBase && idxDrop <= m_uiBase + BaseClass::Count() ) {
  91. int iDropElements = int( idxDrop - m_uiBase ) + 1;
  92. m_uiBase += iDropElements;
  93. BaseClass::RemoveMultiple( 0, min( iDropElements, BaseClass::Count() ) );
  94. }
  95. }
  96. // First possible index in this vector (only past dropped items)
  97. uint64 FirstPossibleIndex() const { return m_uiBase; }
  98. // Last possible index in this vector
  99. uint64 PastPossibleIndex() const { return m_uiTotal; }
  100. // Past visible window index in this vector
  101. uint64 PastVisibleIndex() const { return m_uiBase + BaseClass::Count(); }
  102. protected:
  103. uint64 m_uiBase, m_uiTotal;
  104. };
  105. template < typename T >
  106. class CVisibleWindowVector : public CVisibleWindowVectorT < T, CVisibleWindowVector< T > >
  107. {
  108. public:
  109. CVisibleWindowVector() {}
  110. };
  111. template < typename T >
  112. T const * GenericFind( T const *pBegin, T const *pEnd, T const &x )
  113. {
  114. for ( ; pBegin != pEnd; ++ pBegin )
  115. if ( *pBegin == x )
  116. break;
  117. return pBegin;
  118. }
  119. template < typename T >
  120. T * GenericFind( T *pBegin, T *pEnd, T const &x )
  121. {
  122. for ( ; pBegin != pEnd; ++ pBegin )
  123. if ( *pBegin == x )
  124. break;
  125. return pBegin;
  126. }
  127. class CWorkerInfo
  128. {
  129. public:
  130. ProcessWorkUnitFn m_pProcessFn;
  131. CVisibleWindowVector<WUIndexType> m_WorkUnitsRunning; // A list of work units currently running, index is the thread index
  132. CCriticalSection m_WorkUnitsRunningCS;
  133. };
  134. class CMasterInfo
  135. {
  136. public:
  137. // Only used by the master.
  138. ReceiveWorkUnitFn m_ReceiveFn;
  139. };
  140. class CDSInfo
  141. {
  142. public:
  143. inline void WriteWUIndex( WUIndexType iWU, MessageBuffer *pBuf )
  144. {
  145. if ( m_nWorkUnits <= 0xFFFF )
  146. {
  147. Assert( iWU <= 0xFFFF );
  148. unsigned short val = (unsigned short)iWU;
  149. pBuf->write( &val, sizeof( val ) );
  150. }
  151. else if ( m_nWorkUnits <= 0xFFFFFFFF )
  152. {
  153. Assert( iWU <= 0xFFFF );
  154. unsigned int val = (unsigned int)iWU;
  155. pBuf->write( &val, sizeof( val ) );
  156. }
  157. else
  158. {
  159. pBuf->write( &iWU, sizeof( iWU ) );
  160. }
  161. }
  162. inline void ReadWUIndex( WUIndexType *pWU, MessageBuffer *pBuf )
  163. {
  164. if ( m_nWorkUnits <= 0xFFFF )
  165. {
  166. unsigned short val;
  167. pBuf->read( &val, sizeof( val ) );
  168. *pWU = val;
  169. }
  170. else if ( m_nWorkUnits <= 0xFFFFFFFF )
  171. {
  172. unsigned int val;
  173. pBuf->read( &val, sizeof( val ) );
  174. *pWU = val;
  175. }
  176. else
  177. {
  178. pBuf->read( pWU, sizeof( *pWU ) );
  179. }
  180. }
  181. public:
  182. CWorkerInfo m_WorkerInfo;
  183. CMasterInfo m_MasterInfo;
  184. bool m_bMasterReady; // Set to true when the master is ready to go.
  185. bool m_bMasterFinished;
  186. WUIndexType m_nWorkUnits;
  187. char m_cPacketID;
  188. };
  189. // Called to write the packet ID, the subpacket ID, and the ID of which DistributeWork() call we're at.
  190. void PrepareDistributeWorkHeader( MessageBuffer *pBuf, unsigned char cSubpacketID );
  191. // Called from threads on the master to process a completed work unit.
  192. void NotifyLocalMasterCompletedWorkUnit( WUIndexType iWorkUnit );
  193. void CheckLocalMasterCompletedWorkUnits();
  194. // Creation functions for different distributors.
  195. IWorkUnitDistributorMaster* CreateWUDistributor_DefaultMaster();
  196. IWorkUnitDistributorWorker* CreateWUDistributor_DefaultWorker();
  197. IWorkUnitDistributorMaster* CreateWUDistributor_SDKMaster();
  198. IWorkUnitDistributorWorker* CreateWUDistributor_SDKWorker();
  199. #endif // VMPI_DISTRIBUTE_WORK_INTERNAL_H