Leaked source code of windows server 2003
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1078 lines
31 KiB

  1. /************************************************************************
  2. Copyright (c) 2000 - 2000 Microsoft Corporation
  3. Module Name :
  4. tasksched.cpp
  5. Abstract :
  6. Source file for task manager classes and routines.
  7. Author :
  8. Revision History :
  9. ***********************************************************************/
  10. #include "stdafx.h"
  11. #if !defined(BITS_V12_ON_NT4)
  12. #include "tasksched.tmh"
  13. #endif
  14. ////////////////////////////////////////////////////////////////////////////////////
  15. //
  16. // TaskSchedulerWorkItem
  17. //
  18. ////////////////////////////////////////////////////////////////////////////////////
  19. ////////////////////////////////////////////////////////////////////////////////////
  20. // Constructor/Destructor
  21. ////////////////////////////////////////////////////////////////////////////////////
  22. TaskSchedulerWorkItem::TaskSchedulerWorkItem( FILETIME *pTimeToRun ) :
  23. m_Container( NULL ),
  24. m_CancelEvent(NULL),
  25. m_ItemComplete(NULL),
  26. m_State(TASK_STATE_NOTHING),
  27. m_WorkGroup(NULL)
  28. {
  29. try
  30. {
  31. // All events are manual reset.
  32. // new items are complete
  33. m_CancelEvent = CreateEvent( NULL, TRUE, TRUE, NULL );
  34. if ( !m_CancelEvent )
  35. throw ComError( HRESULT_FROM_WIN32(GetLastError()));
  36. m_ItemComplete = CreateEvent( NULL, TRUE, FALSE, NULL );
  37. if ( !m_ItemComplete )
  38. throw ComError( HRESULT_FROM_WIN32(GetLastError()));
  39. }
  40. catch ( ComError Error )
  41. {
  42. this->~TaskSchedulerWorkItem();
  43. throw;
  44. }
  45. }
  46. TaskSchedulerWorkItem::~TaskSchedulerWorkItem()
  47. {
  48. if ( m_ItemComplete ) SetEvent( m_ItemComplete );
  49. if ( m_CancelEvent ) CloseHandle( m_CancelEvent );
  50. if ( m_ItemComplete ) CloseHandle( m_ItemComplete );
  51. }
  52. void
  53. TaskSchedulerWorkItem::Serialize(
  54. HANDLE hFile
  55. )
  56. {
  57. //
  58. // If this function changes, be sure that the metadata extension
  59. // constants are adequate.
  60. //
  61. bool fActive = g_Manager->m_TaskScheduler.IsWorkItemInScheduler( this );
  62. SafeWriteFile( hFile, fActive );
  63. if (fActive)
  64. {
  65. SafeWriteFile( hFile, m_InsertionTime );
  66. SafeWriteFile( hFile, m_TimeToRun );
  67. }
  68. }
  69. void
  70. TaskSchedulerWorkItem::Unserialize(
  71. HANDLE hFile
  72. )
  73. {
  74. bool fActive;
  75. SafeReadFile( hFile, &fActive );
  76. if (fActive)
  77. {
  78. SafeReadFile( hFile, &m_InsertionTime );
  79. SafeReadFile( hFile, &m_TimeToRun );
  80. LogTask("workitem %p : adding to scheduler for %I64d", this, FILETIMEToUINT64(m_TimeToRun) );
  81. g_Manager->m_TaskScheduler.InsertWorkItem( this, &m_TimeToRun );
  82. }
  83. else
  84. {
  85. LogTask("workitem %p: not in scheduler", this );
  86. }
  87. }
  88. ////////////////////////////////////////////////////////////////////////////////////
  89. //
  90. // TaskScheduler
  91. //
  92. ////////////////////////////////////////////////////////////////////////////////////
  93. ////////////////////////////////////////////////////////////////////////////////////
  94. // Constructor/Destructor
  95. ////////////////////////////////////////////////////////////////////////////////////
  96. TaskScheduler::TaskScheduler() :
  97. m_bShouldDie(false),
  98. m_WaitableTimer(NULL),
  99. m_ReaderLock(NULL),
  100. m_WriterSemaphore(NULL),
  101. m_ReaderCount(0),
  102. m_WorkItemTLS((DWORD)-1),
  103. m_WriterOwner(0),
  104. m_WorkerInitialized(NULL)
  105. {
  106. try
  107. {
  108. m_WorkItemTLS = TlsAlloc();
  109. if ( (DWORD)-1 == m_WorkItemTLS)
  110. throw ComError( HRESULT_FROM_WIN32(GetLastError()));
  111. m_SchedulerLock = CreateMutex( NULL, FALSE, NULL );
  112. if ( !m_SchedulerLock )
  113. throw ComError( HRESULT_FROM_WIN32(GetLastError()));
  114. m_WaitableTimer = CreateWaitableTimer( NULL, FALSE, NULL );
  115. if ( !m_WaitableTimer )
  116. throw ComError( HRESULT_FROM_WIN32(GetLastError()));
  117. // Create and autoreset event for synchronization on startup
  118. m_WorkerInitialized = CreateEvent( NULL, FALSE, FALSE, NULL );
  119. if ( !m_WorkerInitialized )
  120. throw ComError( HRESULT_FROM_WIN32(GetLastError()));
  121. m_ReaderLock = CreateMutex( NULL, FALSE, NULL );
  122. if ( !m_ReaderLock )
  123. throw ComError( HRESULT_FROM_WIN32(GetLastError()));
  124. m_WriterSemaphore = CreateSemaphore( NULL, 1, 1, NULL );
  125. if ( !m_WriterSemaphore )
  126. throw ComError( HRESULT_FROM_WIN32(GetLastError()));
  127. }
  128. catch ( ComError Error )
  129. {
  130. this->~TaskScheduler();
  131. throw;
  132. }
  133. }
  134. TaskScheduler::~TaskScheduler()
  135. {
  136. if ((DWORD)-1 != m_WorkItemTLS)
  137. TlsFree( m_WorkItemTLS );
  138. if ( m_SchedulerLock )
  139. CloseHandle( m_SchedulerLock );
  140. if ( m_WaitableTimer )
  141. CloseHandle( m_WaitableTimer );
  142. if ( m_WorkerInitialized )
  143. CloseHandle( m_WorkerInitialized );
  144. if ( m_ReaderLock )
  145. CloseHandle( m_ReaderLock );
  146. if ( m_WriterSemaphore )
  147. CloseHandle( m_WriterSemaphore );
  148. }
  149. //////////////////////////////////////////////////////////////////////////////////////////
  150. // WorkItem control
  151. //////////////////////////////////////////////////////////////////////////////////////////
  152. bool TaskScheduler::CancelWorkItem( TaskSchedulerWorkItem * pWorkItem )
  153. {
  154. LogTask( "cancelling %p", pWorkItem );
  155. RTL_VERIFY( WAIT_OBJECT_0 == WaitForSingleObject( m_SchedulerLock, INFINITE ) );
  156. DWORD dwResult = WaitForSingleObject( pWorkItem->m_ItemComplete , 0 );
  157. if ( WAIT_OBJECT_0 == dwResult )
  158. {
  159. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  160. return true; // Job completed before the cancel
  161. }
  162. // If canceling the current work item, call Acknowlege immedialtly
  163. if ( GetCurrentWorkItem() == pWorkItem )
  164. {
  165. LogTask( "Canceling work item %p, we are the owner", pWorkItem );
  166. RTL_VERIFY( SetEvent( pWorkItem->m_CancelEvent ) );
  167. AcknowledgeWorkItemCancel();
  168. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  169. return false; // Job canceled
  170. }
  171. //
  172. // Remove the work item from its list.
  173. //
  174. switch( pWorkItem->m_State )
  175. {
  176. case TASK_STATE_WAITING:
  177. {
  178. m_WaitingList.erase( *pWorkItem );
  179. pWorkItem->m_State = TASK_STATE_CANCELED;
  180. pWorkItem->m_WorkGroup = NULL;
  181. Reschedule();
  182. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  183. return false;
  184. }
  185. case TASK_STATE_READY:
  186. {
  187. TaskSchedulerWorkGroup *pGroup =
  188. static_cast<TaskSchedulerWorkGroup*>(pWorkItem->m_WorkGroup);
  189. pGroup->m_ReadyList.erase( *pWorkItem );
  190. // Kill one on the semaphore
  191. RTL_VERIFY( WAIT_OBJECT_0 == WaitForSingleObject( pGroup->m_ItemAvailableSemaphore, 0 ) );
  192. pWorkItem->m_State = TASK_STATE_CANCELED;
  193. pWorkItem->m_WorkGroup = NULL;
  194. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  195. return false;
  196. }
  197. case TASK_STATE_RUNNING:
  198. {
  199. // cancelling on another thread
  200. RTL_VERIFY( SetEvent( pWorkItem->m_CancelEvent ) );
  201. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  202. dwResult = WaitForSingleObject( pWorkItem->m_ItemComplete, INFINITE );
  203. ASSERT( WAIT_OBJECT_0 == dwResult );
  204. return WAIT_OBJECT_0 != dwResult;
  205. }
  206. case TASK_STATE_CANCELED:
  207. case TASK_STATE_COMPLETE:
  208. case TASK_STATE_NOTHING:
  209. default:
  210. ASSERT( TASK_STATE_CANCELED == pWorkItem->m_State ||
  211. TASK_STATE_COMPLETE == pWorkItem->m_State ||
  212. TASK_STATE_NOTHING == pWorkItem->m_State );
  213. ASSERT( NULL == pWorkItem->m_WorkGroup );
  214. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  215. return true;
  216. }
  217. }
  218. void TaskScheduler::CompleteWorkItem( bool bCancel )
  219. {
  220. RTL_VERIFY( WaitForSingleObject( m_SchedulerLock, INFINITE ) == WAIT_OBJECT_0 );
  221. TaskSchedulerWorkItem *pWorkItem = GetCurrentWorkItem();
  222. LogTask( "completing %p", pWorkItem );
  223. // ASSERT( pWorkItem );
  224. if (pWorkItem)
  225. {
  226. RTL_VERIFY( TlsSetValue( m_WorkItemTLS, NULL ) );
  227. TaskSchedulerWorkGroup *pGroup =
  228. static_cast<TaskSchedulerWorkGroup*>(pWorkItem->m_WorkGroup);
  229. pGroup->m_RunningList.erase( *pWorkItem );
  230. pWorkItem->m_WorkGroup = NULL;
  231. pWorkItem->m_State = bCancel ? TASK_STATE_CANCELED : TASK_STATE_COMPLETE;
  232. RTL_VERIFY( SetEvent( pWorkItem->m_ItemComplete ));
  233. }
  234. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  235. }
  236. void TaskScheduler::DispatchWorkItem()
  237. {
  238. TaskSchedulerWorkItem *pWorkItem = NULL;
  239. RTL_VERIFY( WaitForSingleObject( m_SchedulerLock, INFINITE ) == WAIT_OBJECT_0 );
  240. // Move all the jobs that are available from waiting
  241. // to ready
  242. while ( !m_WaitingList.empty() )
  243. {
  244. FILETIME ftCurrentTime;
  245. GetSystemTimeAsFileTime( &ftCurrentTime );
  246. TaskSchedulerWorkItem * pHeadItem = &(*m_WaitingList.begin());
  247. UINT64 CurrentTime = FILETIMEToUINT64( ftCurrentTime );
  248. UINT64 HeadTime = FILETIMEToUINT64( pHeadItem->m_TimeToRun );
  249. if ( HeadTime > CurrentTime )
  250. {
  251. // All the jobs in the list are still waiting,
  252. // let them continue waiting
  253. break;
  254. }
  255. // transfer the head work item from the waiting list
  256. // to the ready list of the correct work group
  257. m_WaitingList.erase( *pHeadItem );
  258. AddItemToWorkGroup( pHeadItem->GetSid(), pHeadItem );
  259. }
  260. Reschedule();
  261. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  262. }
  263. void
  264. TaskScheduler::InsertDelayedWorkItem(
  265. TaskSchedulerWorkItem *pWorkItem,
  266. UINT64 Delay100Nsec
  267. )
  268. {
  269. FILETIME ftCurrentTime;
  270. GetSystemTimeAsFileTime( &ftCurrentTime );
  271. UINT64 TimeToRun = Delay100Nsec + FILETIMEToUINT64( ftCurrentTime );
  272. FILETIME ftTimeToRun = UINT64ToFILETIME( TimeToRun );
  273. InsertWorkItem( pWorkItem, &ftTimeToRun );
  274. }
  275. void
  276. TaskScheduler::RescheduleDelayedTask(
  277. TaskSchedulerWorkItem *pWorkItem,
  278. UINT64 Delay100Nsec
  279. )
  280. {
  281. // Resets the time for the work item to run to be Delay100NSec after
  282. // the insertion time.
  283. // If the work item is not in the queue, running, completed,
  284. // or canceled then this operation is ignored.
  285. // Otherwise, the job is rescheduled.
  286. LogTask( "rescheduling %p", pWorkItem );
  287. RTL_VERIFY( WaitForSingleObject( m_SchedulerLock, INFINITE ) == WAIT_OBJECT_0 );
  288. // If the work item is not on a running list or the pending list,
  289. // ignore the call.
  290. if ( TASK_STATE_READY == pWorkItem->m_State )
  291. {
  292. TaskSchedulerWorkGroup *pGroup =
  293. static_cast<TaskSchedulerWorkGroup*>( pWorkItem->m_WorkGroup );
  294. pGroup->m_ReadyList.erase( *pWorkItem );
  295. RTL_VERIFY( WAIT_OBJECT_0 == WaitForSingleObject( pGroup->m_ItemAvailableSemaphore, 0 ) );
  296. }
  297. else if ( TASK_STATE_WAITING == pWorkItem->m_State )
  298. {
  299. m_WaitingList.erase( *pWorkItem );
  300. }
  301. else
  302. {
  303. LogTask( "item %p not pending. Ignoring.", pWorkItem );
  304. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  305. return;
  306. }
  307. UINT64 TimeToRun = Delay100Nsec + FILETIMEToUINT64( pWorkItem->m_InsertionTime );
  308. pWorkItem->m_TimeToRun = UINT64ToFILETIME( TimeToRun );
  309. m_WaitingList.insert( *pWorkItem );
  310. pWorkItem->m_State = TASK_STATE_WAITING;
  311. pWorkItem->m_WorkGroup = NULL;
  312. Reschedule();
  313. LogTask( "item %p rescheduled", pWorkItem );
  314. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  315. }
  316. inline INT64 abs(INT64 x)
  317. {
  318. if (x >= 0)
  319. {
  320. return x;
  321. }
  322. else
  323. {
  324. return -x;
  325. }
  326. }
  327. void TaskScheduler::InsertWorkItem( TaskSchedulerWorkItem *pWorkItem, FILETIME *pTimeToRun )
  328. {
  329. {
  330. INT64 Difference;
  331. FILETIME ftCurrentTime;
  332. GetSystemTimeAsFileTime( &ftCurrentTime );
  333. if (pTimeToRun)
  334. {
  335. Difference = INT64(FILETIMEToUINT64( *pTimeToRun )) - INT64(FILETIMEToUINT64( ftCurrentTime ));
  336. if (abs(Difference) > 86400 * NanoSec100PerSec)
  337. {
  338. LogTask( "inserting %p; activates in %f days", pWorkItem, float(Difference) / (float(NanoSec100PerSec) * 86400) );
  339. }
  340. else
  341. {
  342. LogTask( "inserting %p; activates in %f seconds", pWorkItem, float(Difference) / float(NanoSec100PerSec) );
  343. }
  344. }
  345. else
  346. {
  347. LogTask( "inserting %p; activates now", pWorkItem );
  348. }
  349. }
  350. RTL_VERIFY( WaitForSingleObject( m_SchedulerLock, INFINITE ) == WAIT_OBJECT_0 );
  351. GetSystemTimeAsFileTime( &pWorkItem->m_InsertionTime );
  352. RTL_VERIFY( ResetEvent( pWorkItem->m_CancelEvent ) );
  353. RTL_VERIFY( ResetEvent( pWorkItem->m_ItemComplete ) );
  354. if ( !pTimeToRun && !m_bShouldDie )
  355. {
  356. pWorkItem->m_TimeToRun = pWorkItem->m_InsertionTime;
  357. AddItemToWorkGroup( pWorkItem->GetSid(), pWorkItem );
  358. }
  359. else
  360. {
  361. if (pTimeToRun)
  362. {
  363. pWorkItem->m_TimeToRun = *pTimeToRun;
  364. }
  365. else
  366. {
  367. GetSystemTimeAsFileTime( &pWorkItem->m_TimeToRun );
  368. }
  369. pWorkItem->m_State = TASK_STATE_WAITING;
  370. m_WaitingList.insert( *pWorkItem );
  371. Reschedule();
  372. }
  373. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  374. }
  375. bool TaskScheduler::IsWorkItemInScheduler( TaskSchedulerWorkItem *pWorkItem )
  376. {
  377. bool b;
  378. RTL_VERIFY( WaitForSingleObject( m_SchedulerLock, INFINITE ) == WAIT_OBJECT_0 );
  379. b = ( TASK_STATE_WAITING == pWorkItem->m_State ||
  380. TASK_STATE_READY == pWorkItem->m_State ||
  381. TASK_STATE_RUNNING == pWorkItem->m_State );
  382. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  383. return b;
  384. }
  385. void TaskScheduler::Reschedule()
  386. {
  387. if ( m_WaitingList.empty() )
  388. {
  389. // Nothing to do, cancel waitable timer.
  390. RTL_VERIFY( CancelWaitableTimer( m_WaitableTimer ) );
  391. return;
  392. }
  393. LARGE_INTEGER NextItemTime;
  394. FILETIME ftNextItemTime = (*m_WaitingList.begin()).m_TimeToRun;
  395. NextItemTime.QuadPart = (INT64)FILETIMEToUINT64( ftNextItemTime );
  396. RTL_VERIFY(
  397. SetWaitableTimer(
  398. m_WaitableTimer,
  399. &NextItemTime,
  400. 0,
  401. NULL,
  402. NULL,
  403. FALSE ) );
  404. }
  405. /////////////////////////////////////////////////////////////////////////////////////////////////
  406. // Reader/Writer lock
  407. //
  408. // Algorithm:
  409. //
  410. // Writer:
  411. // Wait on writer lock and cancel event. Return when either is signaled
  412. //
  413. // Unlock writer:
  414. // Release the writer lock
  415. //
  416. // Lock reader:
  417. // Lock reader lock to protect count. If I am the first reader, grab the writer semaphore.
  418. // Unlock reader lock. If on either wait the cancel event is signaled, abort.
  419. //
  420. // Unlock reader:
  421. // Decrement the reader count. If last reader, release the writer lock.
  422. //
  423. /////////////////////////////////////////////////////////////////////////////////////////////////
  424. bool TaskScheduler::LockReader()
  425. {
  426. LogLock( "reader" );
  427. HANDLE hCancel = GetCancelEvent();
  428. if ( !hCancel )
  429. {
  430. RTL_VERIFY( WaitForSingleObject( m_ReaderLock, INFINITE ) == WAIT_OBJECT_0 );
  431. // InterlockedIncrement returns the new value
  432. if ( InterlockedIncrement( &m_ReaderCount ) == 1 )
  433. {
  434. RTL_VERIFY( WaitForSingleObject( m_WriterSemaphore, INFINITE ) == WAIT_OBJECT_0 );
  435. }
  436. RTL_VERIFY( ReleaseMutex( m_ReaderLock ) );
  437. LogLock("reader lock acquired");
  438. ASSERT( !m_WriterOwner );
  439. return false;
  440. }
  441. DWORD dwResult;
  442. HANDLE hReaderLockHandles[2];
  443. hReaderLockHandles[0] = hCancel;
  444. hReaderLockHandles[1] = m_ReaderLock;
  445. dwResult = WaitForMultipleObjects( 2, hReaderLockHandles, false, INFINITE );
  446. switch ( dwResult )
  447. {
  448. case WAIT_OBJECT_0 + 0:
  449. // cancel request
  450. LogLock( "Cancel requested, aborting read lock" );
  451. return true;
  452. case WAIT_OBJECT_0 + 1:
  453. // lock acquired
  454. break;
  455. default:
  456. ASSERT(0);
  457. }
  458. bool bReturnVal = false;
  459. ULONG NewReaderCount = InterlockedIncrement( &m_ReaderCount );
  460. if (1 == NewReaderCount )
  461. {
  462. LogLock("First reader, need to block writers");
  463. HANDLE hWriterLockHandles[2];
  464. hWriterLockHandles[0] = hCancel;
  465. hWriterLockHandles[1] = m_WriterSemaphore;
  466. dwResult = WaitForMultipleObjects( 2, hWriterLockHandles, false, INFINITE );
  467. switch ( dwResult )
  468. {
  469. case WAIT_OBJECT_0 + 0:
  470. // cancel request
  471. LogLock( "Cancel requested, aborting acquire of writer lock");
  472. InterlockedDecrement( &m_ReaderCount );
  473. bReturnVal = true;
  474. break;
  475. case WAIT_OBJECT_0 + 1:
  476. // lock acquired
  477. break;
  478. default:
  479. ASSERT(0);
  480. }
  481. }
  482. RTL_VERIFY( ReleaseMutex( m_ReaderLock ) );
  483. if (!bReturnVal)
  484. {
  485. LogLock("reader lock acquired");
  486. ASSERT( !m_WriterOwner );
  487. }
  488. return bReturnVal;
  489. }
  490. void TaskScheduler::UnlockReader()
  491. {
  492. LogLock( "reader unlock" );
  493. LONG lNewReaderCount = InterlockedDecrement( &m_ReaderCount );
  494. ASSERT( lNewReaderCount >= 0 );
  495. if (!lNewReaderCount ) //Last reader
  496. {
  497. LogLock( "Last reader, letting writers pass" );
  498. RTL_VERIFY( ReleaseSemaphore( m_WriterSemaphore, 1, NULL ) );
  499. }
  500. LogLock( "Unlocked read access to lock" );
  501. }
  502. bool TaskScheduler::LockWriter()
  503. {
  504. LogLock( "writer lock" );
  505. HANDLE hCancel = GetCancelEvent();
  506. if (!hCancel)
  507. {
  508. RTL_VERIFY( WaitForSingleObject( m_WriterSemaphore, INFINITE ) == WAIT_OBJECT_0 );
  509. ASSERT( !m_WriterOwner );
  510. m_WriterOwner = GetCurrentThreadId();
  511. LogLock("Lock acquired with write access");
  512. return false;
  513. }
  514. HANDLE hHandles[2];
  515. hHandles[0] = hCancel;
  516. hHandles[1] = m_WriterSemaphore;
  517. DWORD dwResult = WaitForMultipleObjects( 2, hHandles, false, INFINITE );
  518. switch ( dwResult )
  519. {
  520. case WAIT_OBJECT_0 + 0:
  521. // cancel request
  522. LogLock("Cancel requested, aborting lock with write access");
  523. return true;
  524. case WAIT_OBJECT_0 + 1:
  525. // lock acquired
  526. ASSERT( !m_WriterOwner );
  527. m_WriterOwner = GetCurrentThreadId();
  528. LogLock("Lock acquired with write access");
  529. return false;
  530. default:
  531. ASSERT(0);
  532. return false;
  533. }
  534. }
  535. void TaskScheduler::UnlockWriter()
  536. {
  537. LogLock( "writer unlock" );
  538. ASSERT( GetCurrentThreadId() == m_WriterOwner );
  539. m_WriterOwner = 0;
  540. RTL_VERIFY( ReleaseSemaphore( m_WriterSemaphore, 1, NULL ) );
  541. LogLock("Unlocked lock with write access");
  542. }
  543. TaskScheduler::TaskSchedulerWorkGroup::TaskSchedulerWorkGroup(
  544. SidHandle Sid ) :
  545. m_Sid(Sid),
  546. m_ItemAvailableSemaphore(NULL),
  547. m_Threads(0),
  548. m_BusyThreads(0)
  549. {
  550. memset( m_Thread, 0, sizeof( m_Thread ) );
  551. memset( m_ThreadId, 0, sizeof( m_ThreadId ) );
  552. m_ItemAvailableSemaphore =
  553. CreateSemaphore(
  554. NULL,
  555. 0, // InitialCount
  556. 0x7FFFFFFF, // MaxCount
  557. NULL );
  558. if ( !m_ItemAvailableSemaphore )
  559. throw ComError( HRESULT_FROM_WIN32( GetLastError() ) );
  560. }
  561. TaskScheduler::TaskSchedulerWorkGroup::~TaskSchedulerWorkGroup()
  562. {
  563. if ( m_ItemAvailableSemaphore )
  564. CloseHandle( m_ItemAvailableSemaphore );
  565. }
  566. void
  567. TaskScheduler::AddItemToWorkGroup(
  568. SidHandle Sid,
  569. TaskSchedulerWorkItem *pWorkItem )
  570. {
  571. // If the work group has alread been created,
  572. // don't create it again
  573. WorkGroupMapType::iterator i = m_WorkGroupMap.find( Sid );
  574. TaskSchedulerWorkGroup *pWorkGroup = NULL;
  575. if ( m_WorkGroupMap.end() != i )
  576. {
  577. pWorkGroup = (*i).second;
  578. }
  579. else
  580. {
  581. LogTask( "Creating a new work group" );
  582. while(1)
  583. {
  584. try
  585. {
  586. pWorkGroup = new TaskSchedulerWorkGroup( Sid );
  587. m_WorkGroupMap.insert( WorkGroupMapType::value_type( Sid, pWorkGroup ) );
  588. LogTask( "Created new workgroup %p", pWorkGroup );
  589. break;
  590. }
  591. catch( ComError Error )
  592. {
  593. LogError( "Unable to create new workgroup sleeping, error %!winerr!", Error.Error() );
  594. m_WorkGroupMap.erase( Sid );
  595. delete pWorkGroup;
  596. pWorkGroup = NULL;
  597. Sleep( 5000 );
  598. }
  599. }
  600. }
  601. LogInfo( "Adding %p to workgroup %p", pWorkItem, pWorkGroup );
  602. pWorkGroup->m_ReadyList.insert( *pWorkItem );
  603. pWorkItem->m_State = TASK_STATE_READY;
  604. pWorkItem->m_WorkGroup = pWorkGroup;
  605. RTL_VERIFY( ReleaseSemaphore( pWorkGroup->m_ItemAvailableSemaphore, 1, NULL ) );
  606. // use a very aproximative heuristic to determine when to add more threads.
  607. // The load is the number of work items that are ready to run plus the number
  608. // of items being worked on(busy threads). See the note below why the number of
  609. // ready work items is not a good estimate.
  610. size_t Load = pWorkGroup->m_ReadyList.size() + pWorkGroup->m_BusyThreads;
  611. if ( Load > pWorkGroup->m_Threads &&
  612. pWorkGroup->m_Threads < MAX_WORKGROUP_THREADS )
  613. {
  614. LogInfo( "load of %u and %u threads. Add another thread",
  615. Load, pWorkGroup->m_Threads );
  616. while(1)
  617. {
  618. m_NewWorkerGroup = pWorkGroup;
  619. ASSERT( m_WorkGroupMap.end() != m_WorkGroupMap.find( m_NewWorkerGroup->m_Sid ) );
  620. RTL_VERIFY( ResetEvent( m_WorkerInitialized ) );
  621. HANDLE & ThreadHandle = pWorkGroup->m_Thread[ pWorkGroup->m_Threads ];
  622. DWORD & ThreadId = pWorkGroup->m_ThreadId[ pWorkGroup->m_Threads ];
  623. ThreadHandle =
  624. CreateThread(
  625. NULL, // security descriptor
  626. 0, // Use default stack
  627. TaskScheduler::WorkGroupWorkerThunk,
  628. static_cast<LPVOID>( this ),
  629. 0,
  630. &ThreadId );
  631. if ( !ThreadHandle )
  632. {
  633. LogError( "Unable to create new worker, error %!winerr!", GetLastError() );
  634. Sleep( 5000 );
  635. continue;
  636. }
  637. LogTask( "Created new worker with a handle %p, ID %u", ThreadHandle, ThreadId );
  638. HANDLE WaitHandles[2] = { ThreadHandle, m_WorkerInitialized };
  639. DWORD dwResult =
  640. WaitForMultipleObjectsEx(
  641. 2,
  642. WaitHandles,
  643. FALSE,
  644. INFINITE,
  645. FALSE );
  646. switch( dwResult )
  647. {
  648. case WAIT_OBJECT_0:
  649. GetExitCodeThread( ThreadHandle, &dwResult );
  650. LogError( "Thread exited with code %!winerr!, sleeping", dwResult );
  651. CloseHandle( ThreadHandle );
  652. ThreadHandle = 0;
  653. ThreadId = 0;
  654. Sleep( 5000 );
  655. continue;
  656. case WAIT_OBJECT_0 + 1:
  657. break;
  658. default:
  659. LogError( "Unexpected error, %!winerr!", dwResult );
  660. ASSERT( 0 );
  661. }
  662. LogTask( "Worker signaled success" );
  663. m_NewWorkerGroup = NULL;
  664. pWorkGroup->m_Threads++;
  665. break;
  666. }
  667. }
  668. }
  669. void
  670. TaskScheduler::KillBackgroundTasks()
  671. {
  672. LogTask( "Killing background threads" );
  673. RTL_VERIFY( WaitForSingleObject( m_SchedulerLock, INFINITE ) == WAIT_OBJECT_0 );
  674. m_bShouldDie = TRUE;
  675. DWORD Result;
  676. while(1)
  677. {
  678. if ( m_WorkGroupMap.empty() )
  679. {
  680. LogTask( "No more work groups, all done" );
  681. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  682. return;
  683. }
  684. TaskSchedulerWorkGroup *pGroup = (*m_WorkGroupMap.begin()).second;
  685. RTL_VERIFY( ReleaseSemaphore( pGroup->m_ItemAvailableSemaphore, pGroup->m_Threads, NULL ) );
  686. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  687. Result = WaitForMultipleObjects( pGroup->m_Threads, pGroup->m_Thread, TRUE, INFINITE );
  688. // WAIT_OBJECT_0 == 0 so Result >= WAIT_OBJECT_0 is always true
  689. ASSERT( Result < WAIT_OBJECT_0 + pGroup->m_Threads );
  690. RTL_VERIFY( WaitForSingleObject( m_SchedulerLock, INFINITE ) == WAIT_OBJECT_0 );
  691. for(size_t c=0; c < pGroup->m_Threads; c++ )
  692. {
  693. CloseHandle( pGroup->m_Thread[c] );
  694. }
  695. m_WorkGroupMap.erase( pGroup->m_Sid );
  696. delete pGroup;
  697. LogTask( "Killed everyone in work group %p", pGroup );
  698. }
  699. }
  700. DWORD BackgroundThreadProcFilter(
  701. LPEXCEPTION_POINTERS ExceptionPointers );
  702. DWORD
  703. TaskScheduler::WorkGroupWorkerThunk( void *pContext )
  704. {
  705. __try
  706. {
  707. return
  708. static_cast<TaskScheduler*>( pContext )->WorkGroupWorker();
  709. }
  710. __except( BackgroundThreadProcFilter(
  711. GetExceptionInformation() ) )
  712. {
  713. ASSERT( 0 );
  714. }
  715. ASSERT( 0 );
  716. return 0;
  717. }
  718. DWORD
  719. TaskScheduler::WorkGroupWorker( )
  720. {
  721. HRESULT Hr;
  722. LogTask( "I'm alive!" );
  723. Hr = CoInitializeEx( NULL, COINIT_MULTITHREADED );
  724. if ( FAILED( Hr ) )
  725. {
  726. LogError( "CoInitializeEx failed, %!winerr!", Hr );
  727. return (DWORD)(Hr);
  728. }
  729. TaskSchedulerWorkGroup *pGroup = m_NewWorkerGroup;
  730. ASSERT( m_WorkGroupMap.end() != m_WorkGroupMap.find( pGroup->m_Sid ) );
  731. RTL_VERIFY( SetEvent( m_WorkerInitialized ) );
  732. LogTask( "Initialization complete" );
  733. while(1)
  734. {
  735. TaskSchedulerWorkItem *pWorkItem = NULL;
  736. HANDLE Handles[] = { pGroup->m_ItemAvailableSemaphore, m_SchedulerLock };
  737. DWORD dwWaitResult =
  738. WaitForMultipleObjectsEx(
  739. sizeof(Handles)/sizeof(*Handles),
  740. Handles,
  741. TRUE, // Wait for all events
  742. 30000,
  743. FALSE ); // ablertable wait
  744. switch( dwWaitResult )
  745. {
  746. case WAIT_OBJECT_0:
  747. case WAIT_OBJECT_0+1:
  748. break;
  749. case WAIT_TIMEOUT:
  750. {
  751. LogInfo( "Timeout expired, check if we have something to do");
  752. RTL_VERIFY( WaitForSingleObject( m_SchedulerLock, INFINITE ) == WAIT_OBJECT_0 );
  753. if ( pGroup->m_ReadyList.empty() )
  754. {
  755. goto cleanup_on_timeout;
  756. }
  757. else
  758. {
  759. LogTask( "Still stuff to do, stay alive" );
  760. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  761. continue;
  762. }
  763. }
  764. default:
  765. ASSERT(0);
  766. }
  767. if ( m_bShouldDie )
  768. {
  769. LogTask( "Ordered to die, do so" );
  770. goto dodie;
  771. }
  772. ASSERT( !pGroup->m_ReadyList.empty() );
  773. // Get first item in ready list and move
  774. // it over to running list.
  775. pWorkItem = &(*pGroup->m_ReadyList.begin());
  776. pGroup->m_ReadyList.erase( *pWorkItem );
  777. pGroup->m_RunningList.insert( *pWorkItem );
  778. pWorkItem->m_State = TASK_STATE_RUNNING;
  779. ASSERT( pGroup == pWorkItem->m_WorkGroup );
  780. // Mark this thread as busy
  781. // NOTE: This counter is needed because some
  782. // code marks work items as complete even though
  783. // the really arn't complete yet. So we need
  784. // to have this to indicatate has many threads
  785. // are really available.
  786. InterlockedIncrement( &pGroup->m_BusyThreads );
  787. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  788. // Now do the real dispatching
  789. LogTask( "dispatching %p", pWorkItem );
  790. RTL_VERIFY( TlsSetValue( m_WorkItemTLS, pWorkItem ) );
  791. pWorkItem->OnDispatch();
  792. if (GetCurrentWorkItem())
  793. CompleteWorkItem();
  794. // Mark this thread as free
  795. InterlockedDecrement( &pGroup->m_BusyThreads );
  796. }
  797. cleanup_on_timeout:
  798. if ( 1 == pGroup->m_Threads )
  799. {
  800. // If were the last thread, destroy the workgroup
  801. LogTask( "We are the only thread, destroy work group %p", pGroup );
  802. CloseHandle( pGroup->m_Thread[0] );
  803. WorkGroupMapType::iterator i = m_WorkGroupMap.find( pGroup->m_Sid );
  804. ASSERT( m_WorkGroupMap.end() != i );
  805. m_WorkGroupMap.erase( i );
  806. delete pGroup;
  807. }
  808. else
  809. {
  810. // we were not the last thread, so remove ourselves from the list.
  811. // First, find the slot for this thread.
  812. size_t index = 0;
  813. for (;index < pGroup->m_Threads; index++ )
  814. {
  815. if ( GetCurrentThreadId() == pGroup->m_ThreadId[index] )
  816. break;
  817. }
  818. ASSERT( index < pGroup->m_Threads );
  819. LogTask( "We are not the only thread, remove thread in slot %u", index );
  820. CloseHandle( pGroup->m_Thread[index] );
  821. // collapse the list
  822. size_t slots = pGroup->m_Threads - index - 1;
  823. memmove( &pGroup->m_Thread[index], &pGroup->m_Thread[index+1], slots * sizeof(*pGroup->m_Thread) );
  824. memmove( &pGroup->m_ThreadId[index], &pGroup->m_ThreadId[index+1], slots * sizeof(*pGroup->m_ThreadId) );
  825. pGroup->m_Threads--;
  826. pGroup->m_Thread[pGroup->m_Threads] = 0;
  827. pGroup->m_ThreadId[pGroup->m_Threads] = 0;
  828. }
  829. dodie:
  830. RTL_VERIFY( ReleaseMutex( m_SchedulerLock ) );
  831. CoUninitialize();
  832. return 0;
  833. }
  834. //------------------------------------------------------------------------
  835. void
  836. ReleaseWriteLock( bool & bNeedLock )
  837. {
  838. bNeedLock = false;
  839. if (g_Manager->m_TaskScheduler.IsWriter())
  840. {
  841. g_Manager->m_TaskScheduler.UnlockWriter();
  842. bNeedLock = true;
  843. }
  844. }
  845. void
  846. ReclaimWriteLock( bool & bNeedLock )
  847. {
  848. bool bCancelled = false;
  849. if (bNeedLock && !g_Manager->m_TaskScheduler.IsWriter())
  850. {
  851. while (g_Manager->m_TaskScheduler.LockWriter() )
  852. {
  853. g_Manager->m_TaskScheduler.AcknowledgeWorkItemCancel();
  854. bCancelled = true;
  855. }
  856. bNeedLock = false;
  857. }
  858. if (bCancelled)
  859. {
  860. LogInfo("can't retake writer lock: the workitem was cancelled");
  861. throw ComError( S_FALSE );
  862. }
  863. }