Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1327 lines
29 KiB

  1. //+---------------------------------------------------------------------------
  2. //
  3. // Microsoft Windows
  4. // Copyright (C) Microsoft Corporation, 1992 - 1997.
  5. //
  6. // File: thdpool.c
  7. //
  8. // Contents: Home of the SPM thread pool
  9. //
  10. // Classes:
  11. //
  12. // Functions:
  13. //
  14. // History: 6-08-93 RichardW Created
  15. //
  16. //----------------------------------------------------------------------------
  17. #include <lsapch.hxx>
  18. #define POOL_SEM_LIMIT 0x7FFFFFFF
  19. #define MAX_POOL_THREADS_HARD 256
  20. #define MAX_SUBQUEUE_THREADS (4 * MAX_POOL_THREADS_HARD)
  21. LSAP_TASK_QUEUE GlobalQueue;
  22. //
  23. // Local Prototypes:
  24. //
  25. typedef enum _LSAP_TASK_STATUS {
  26. TaskNotQueued,
  27. TaskQueued,
  28. TaskUnknown
  29. } LSAP_TASK_STATUS ;
  30. LSAP_TASK_STATUS
  31. EnqueueThreadTask(
  32. PLSAP_TASK_QUEUE pQueue,
  33. PLSAP_THREAD_TASK pTask,
  34. BOOLEAN fUrgent);
  35. #define LockQueue(q) EnterCriticalSection(&((q)->Lock))
  36. #define UnlockQueue(q) LeaveCriticalSection(&((q)->Lock))
  37. //+---------------------------------------------------------------------------
  38. //
  39. // Function: InitializeTaskQueue
  40. //
  41. // Synopsis: Initialize a Queue Structure
  42. //
  43. // Arguments: [pQueue] --
  44. // [Type] --
  45. //
  46. // History: 11-10-95 RichardW Created
  47. //
  48. // Notes:
  49. //
  50. //----------------------------------------------------------------------------
  51. BOOL
  52. InitializeTaskQueue(
  53. PLSAP_TASK_QUEUE pQueue,
  54. LSAP_TASK_QUEUE_TYPE Type)
  55. {
  56. OBJECT_HANDLE_FLAG_INFORMATION FlagInfo ;
  57. RtlZeroMemory( pQueue, sizeof(LSAP_TASK_QUEUE) );
  58. InitializeListHead(
  59. &pQueue->pTasks
  60. );
  61. pQueue->Type = Type;
  62. pQueue->hSemaphore = CreateSemaphore(NULL, 0, POOL_SEM_LIMIT, NULL);
  63. __try
  64. {
  65. InitializeCriticalSectionAndSpinCount( &pQueue->Lock, 5000 );
  66. }
  67. __except (EXCEPTION_EXECUTE_HANDLER)
  68. {
  69. if ( pQueue->hSemaphore )
  70. {
  71. NtClose( pQueue->hSemaphore );
  72. pQueue->hSemaphore = NULL ;
  73. }
  74. }
  75. if (!pQueue->hSemaphore)
  76. {
  77. DebugLog((DEB_ERROR, "Could not create semaphore, %d\n",
  78. GetLastError() ));
  79. return( FALSE );
  80. }
  81. FlagInfo.Inherit = FALSE ;
  82. FlagInfo.ProtectFromClose = TRUE ;
  83. NtSetInformationObject(
  84. pQueue->hSemaphore,
  85. ObjectHandleFlagInformation,
  86. &FlagInfo,
  87. sizeof( FlagInfo ) );
  88. pQueue->StartSync = CreateEvent( NULL, TRUE, FALSE, NULL );
  89. if ( pQueue->StartSync == NULL )
  90. {
  91. DebugLog(( DEB_ERROR, "Could not create start sync event\n" ));
  92. CloseHandle( pQueue->hSemaphore );
  93. return FALSE ;
  94. }
  95. return( TRUE );
  96. }
  97. //+---------------------------------------------------------------------------
  98. //
  99. // Function: InitializeThreadPool
  100. //
  101. // Synopsis: Initializes necessary data for the thread pool
  102. //
  103. // Arguments: (none)
  104. //
  105. // History: 7-13-93 RichardW Created
  106. //
  107. // Notes:
  108. //
  109. //----------------------------------------------------------------------------
  110. BOOL
  111. InitializeThreadPool(void)
  112. {
  113. if (!InitializeTaskQueue(&GlobalQueue, QueueShared))
  114. {
  115. return( FALSE );
  116. }
  117. LsaIAddTouchAddress( &GlobalQueue, sizeof( GlobalQueue ) );
  118. return(TRUE);
  119. }
  120. //+---------------------------------------------------------------------------
  121. //
  122. // Function: QueueAssociateThread
  123. //
  124. // Synopsis: Associates the thread with the queue
  125. //
  126. // Arguments: [pQueue] --
  127. //
  128. // History: 11-09-95 RichardW Created
  129. //
  130. // Notes:
  131. //
  132. //----------------------------------------------------------------------------
  133. VOID
  134. QueueAssociateThread(
  135. PLSAP_TASK_QUEUE pQueue)
  136. {
  137. PSession pSession;
  138. LockQueue( pQueue );
  139. pQueue->TotalThreads++ ;
  140. #if 0
  141. //
  142. // We were already idle, so decrement this so the later call
  143. // to wait for thread task will leave the count correct.
  144. //
  145. DsysAssert(pQueue->IdleThreads > 0);
  146. pQueue->IdleThreads--;
  147. #endif
  148. //
  149. // Update the statistics:
  150. //
  151. if ( pQueue->MaxThreads < pQueue->TotalThreads )
  152. {
  153. pQueue->MaxThreads = pQueue->TotalThreads ;
  154. }
  155. UnlockQueue( pQueue );
  156. }
  157. //+---------------------------------------------------------------------------
  158. //
  159. // Function: QueueDisassociateThread
  160. //
  161. // Synopsis: Disconnects a thread and a queue
  162. //
  163. // Arguments: [pQueue] --
  164. // [pLastThread] -- OPTIONAL flag indicating last thread of queue
  165. //
  166. // History: 11-09-95 RichardW Created
  167. //
  168. // Notes:
  169. //
  170. //----------------------------------------------------------------------------
  171. BOOL
  172. QueueDisassociateThread(
  173. PLSAP_TASK_QUEUE pQueue,
  174. BOOLEAN * pLastThread)
  175. {
  176. PSession pSession;
  177. LockQueue( pQueue );
  178. DsysAssert(pQueue->TotalThreads > 0);
  179. if ( pQueue->TotalThreads == 1 )
  180. {
  181. if ( pLastThread )
  182. {
  183. *pLastThread = TRUE ;
  184. //
  185. // If the queue is being run down, set the
  186. // event since we are the last thread
  187. //
  188. if ( pQueue->Type == QueueZombie )
  189. {
  190. SetEvent( pQueue->StartSync );
  191. }
  192. }
  193. if ( pQueue->Tasks )
  194. {
  195. //
  196. // Make sure that we never have more tasks queued
  197. // to a zombie
  198. //
  199. DsysAssert( pQueue->Type != QueueZombie );
  200. UnlockQueue( pQueue );
  201. return FALSE ;
  202. }
  203. }
  204. pQueue->TotalThreads--;
  205. UnlockQueue( pQueue );
  206. return TRUE ;
  207. }
  208. //+---------------------------------------------------------------------------
  209. //
  210. // Function: DequeueAnyTask
  211. //
  212. // Synopsis: Returns a task from this queue or any shared, if available
  213. //
  214. // Arguments: [pQueue] --
  215. //
  216. // Requires: pQueue must be locked!
  217. //
  218. // History: 11-09-95 RichardW Created
  219. //
  220. // Notes:
  221. //
  222. //----------------------------------------------------------------------------
  223. PLSAP_THREAD_TASK
  224. DequeueAnyTask(
  225. PLSAP_TASK_QUEUE pQueue)
  226. {
  227. PLSAP_THREAD_TASK pTask;
  228. PLSAP_TASK_QUEUE pShared;
  229. PLSAP_TASK_QUEUE pPrev;
  230. if ( !IsListEmpty(&pQueue->pTasks) )
  231. {
  232. pTask = (PLSAP_THREAD_TASK) RemoveHeadList(&pQueue->pTasks);
  233. pQueue->Tasks --;
  234. pQueue->TaskCounter++;
  235. //
  236. // Reset the pointers. This is required by the recovery logic
  237. // in the Enqueue function below.
  238. //
  239. pTask->Next.Flink = NULL ;
  240. pTask->Next.Blink = NULL ;
  241. return( pTask );
  242. }
  243. //
  244. // No pending on primary queue. Check secondaries:
  245. //
  246. if (pQueue->Type == QueueShared)
  247. {
  248. pShared = pQueue->pShared;
  249. while ( pShared )
  250. {
  251. DWORD WaitStatus;
  252. //
  253. // We need to wait now to change the semaphore count
  254. //
  255. WaitStatus = WaitForSingleObject(pShared->hSemaphore, 0);
  256. LockQueue( pShared );
  257. if ((WaitStatus == WAIT_OBJECT_0) && !IsListEmpty(&pShared->pTasks) )
  258. {
  259. pTask = (PLSAP_THREAD_TASK) RemoveHeadList(&pShared->pTasks);
  260. pShared->Tasks--;
  261. pShared->TaskCounter++;
  262. UnlockQueue( pShared ); // Unlock shared queue
  263. return( pTask );
  264. }
  265. pPrev = pShared;
  266. pShared = pShared->pNext;
  267. UnlockQueue( pPrev );
  268. }
  269. }
  270. return( NULL );
  271. }
  272. //+---------------------------------------------------------------------------
  273. //
  274. // Function: WaitForThreadTask
  275. //
  276. // Synopsis: Function called by queue waiters
  277. //
  278. // Arguments: [pQueue] -- Queue to wait on
  279. // [TimeOut] -- timeout in seconds
  280. //
  281. // History: 11-10-95 RichardW Created
  282. //
  283. // Notes:
  284. //
  285. //----------------------------------------------------------------------------
  286. PLSAP_THREAD_TASK
  287. WaitForThreadTask(
  288. PLSAP_TASK_QUEUE pQueue,
  289. DWORD TimeOut)
  290. {
  291. PLSAP_THREAD_TASK pTask;
  292. PLSAP_TASK_QUEUE pShared;
  293. PLSAP_TASK_QUEUE pPrev;
  294. int WaitResult;
  295. LockQueue( pQueue );
  296. pTask = DequeueAnyTask( pQueue );
  297. if (pTask)
  298. {
  299. UnlockQueue( pQueue );
  300. return( pTask );
  301. }
  302. //
  303. // No pending anywhere.
  304. //
  305. if (TimeOut == 0)
  306. {
  307. UnlockQueue( pQueue );
  308. return( NULL );
  309. }
  310. //
  311. // Principal of the loop: We do this loop so long as we were awakened
  312. // by the semaphore being released. If there was no task to pick up, then
  313. // we go back and wait again. We return NULL only after a timeout, or an
  314. // error.
  315. //
  316. do
  317. {
  318. pQueue->IdleThreads++;
  319. UnlockQueue( pQueue );
  320. WaitResult = WaitForSingleObject( pQueue->hSemaphore, TimeOut );
  321. LockQueue( pQueue );
  322. //
  323. // In between the wait returning and the lock succeeding, another
  324. // thread might have queued up a request.
  325. DsysAssert(pQueue->IdleThreads > 0);
  326. pQueue->IdleThreads--;
  327. //
  328. // In between the wait returning and the lock succeeding, another
  329. // thread might have queued up a request, so don't blindly
  330. // bail out. Check the pending count, and skip over this
  331. // exit path if something is there
  332. //
  333. if ( pQueue->Tasks == 0 )
  334. {
  335. if (WaitResult != WAIT_OBJECT_0)
  336. {
  337. UnlockQueue( pQueue );
  338. if ( WaitResult == -1 )
  339. {
  340. DebugLog((DEB_ERROR, "Error on waiting for semaphore, %d\n", GetLastError()));
  341. }
  342. return( NULL );
  343. }
  344. }
  345. //
  346. // If the queue type is reset to Zombie, then this queue is
  347. // being killed. Return NULL immediately. If we're the last
  348. // thread, set the event so the thread deleting the queue will
  349. // wake up in a timely fashion.
  350. //
  351. if ( pQueue->Type == QueueZombie )
  352. {
  353. return NULL ;
  354. }
  355. pTask = DequeueAnyTask( pQueue );
  356. if (pTask)
  357. {
  358. UnlockQueue( pQueue );
  359. return( pTask );
  360. }
  361. //
  362. // Track number of times we woke up but didn't have anything to do
  363. //
  364. pQueue->MissedTasks ++ ;
  365. } while ( WaitResult != WAIT_TIMEOUT );
  366. UnlockQueue( pQueue );
  367. return( NULL );
  368. }
  369. //+---------------------------------------------------------------------------
  370. //
  371. // Function: SpmPoolThreadBase
  372. //
  373. // Synopsis: New Pool Thread Base
  374. //
  375. // Arguments: [pvQueue] -- OPTIONAL queue to use
  376. //
  377. // History: 11-09-95 RichardW Created
  378. //
  379. // Notes:
  380. //
  381. //----------------------------------------------------------------------------
  382. DWORD
  383. SpmPoolThreadBase(
  384. PVOID pvSession)
  385. {
  386. PLSAP_THREAD_TASK pTask;
  387. PLSAP_TASK_QUEUE pQueue;
  388. PSession pSession;
  389. BOOLEAN ShrinkWS = FALSE;
  390. DWORD dwResult;
  391. DWORD Timeout;
  392. PSession ThreadSession ;
  393. PSession OriginalSession ;
  394. OriginalSession = GetCurrentSession();
  395. if ( pvSession )
  396. {
  397. ThreadSession = (PSession) pvSession ;
  398. SpmpReferenceSession( ThreadSession );
  399. SetCurrentSession( ThreadSession );
  400. }
  401. else
  402. {
  403. ThreadSession = OriginalSession ;
  404. SpmpReferenceSession( ThreadSession );
  405. }
  406. pQueue = ThreadSession->SharedData->pQueue ;
  407. if (!pQueue)
  408. {
  409. pQueue = &GlobalQueue;
  410. }
  411. QueueAssociateThread( pQueue );
  412. //
  413. // Share pool threads have short lifespans. Dedicated single, or
  414. // single read threads have infinite life span.
  415. //
  416. if (pQueue->Type == QueueShared)
  417. {
  418. Timeout = LsaTuningParameters.ThreadLifespan * 1000;
  419. }
  420. else
  421. {
  422. //
  423. // If we are the dedicated thread for this queue, the timeout
  424. // is infinite. If we are a temporary thread, the timeout is
  425. // the subqueue
  426. //
  427. if ( ThreadSession->ThreadId != GetCurrentThreadId() )
  428. {
  429. Timeout = LsaTuningParameters.SubQueueLifespan ;
  430. }
  431. else
  432. {
  433. Timeout = INFINITE ;
  434. }
  435. }
  436. if ( pQueue->StartSync )
  437. {
  438. DebugLog(( DEB_TRACE, "ThreadPool: Signaling start event\n" ));
  439. //
  440. // If a queue was passed in, the caller of CreateXxxQueue is blocked
  441. // waiting for us to strobe the start sync event.
  442. //
  443. SetEvent( pQueue->StartSync );
  444. }
  445. while ( TRUE )
  446. {
  447. pTask = WaitForThreadTask( pQueue, Timeout );
  448. if ( pTask )
  449. {
  450. SetCurrentSession( pTask->pSession );
  451. dwResult = pTask->pFunction(pTask->pvParameter);
  452. #if DBG
  453. RtlCheckForOrphanedCriticalSections( NtCurrentThread() );
  454. #endif
  455. SetCurrentSession( ThreadSession );
  456. //
  457. // The session in this task was referenced during the AssignThread call.
  458. // This dereference cleans that up.
  459. //
  460. SpmpDereferenceSession(pTask->pSession);
  461. LsapFreePrivateHeap( pTask );
  462. }
  463. else
  464. {
  465. //
  466. // We can never leave the queue empty of threads if
  467. // there are still tasks pending. QueueDisassociateThread
  468. // will fail if there are tasks pending. In that case,
  469. // skip up to the top of the loop again.
  470. //
  471. if ( !QueueDisassociateThread( pQueue, &ShrinkWS ) )
  472. {
  473. continue;
  474. }
  475. //
  476. // Now that we are not part of that queue, reset our thread session
  477. //
  478. SpmpDereferenceSession( ThreadSession );
  479. SetCurrentSession( OriginalSession );
  480. if ( LsaTuningParameters.Options & TUNE_TRIM_WORKING_SET )
  481. {
  482. if (ShrinkWS)
  483. {
  484. LsaTuningParameters.ShrinkOn = TRUE ;
  485. LsaTuningParameters.ShrinkCount++;
  486. SetProcessWorkingSetSize( GetCurrentProcess(),
  487. (SIZE_T)(-1),
  488. (SIZE_T)(-1) );
  489. }
  490. else
  491. {
  492. LsaTuningParameters.ShrinkSkip++;
  493. }
  494. }
  495. DebugLog(( DEB_TRACE,
  496. "No tasks pending on queue %x, thread exiting\n",
  497. pQueue ));
  498. return( 0 );
  499. }
  500. }
  501. return( 0 );
  502. }
  503. //+---------------------------------------------------------------------------
  504. //
  505. // Function: EnqueueThreadTask
  506. //
  507. // Synopsis: Enqueue a task, update counts, etc.
  508. //
  509. // Arguments: [pTask] -- Task to add
  510. //
  511. // History: 7-13-93 RichardW Created
  512. //
  513. // Notes:
  514. //
  515. //----------------------------------------------------------------------------
  516. LSAP_TASK_STATUS
  517. EnqueueThreadTask(
  518. PLSAP_TASK_QUEUE pQueue,
  519. PLSAP_THREAD_TASK pTask,
  520. BOOLEAN fUrgent)
  521. {
  522. BOOLEAN NeedMoreThreads = FALSE;
  523. HANDLE hQueueSem ;
  524. HANDLE hParentSem = NULL ;
  525. PLSAP_TASK_QUEUE pThreadQueue = NULL;
  526. PSession pSession = NULL;
  527. LockQueue(pQueue);
  528. if ( pQueue->Type == QueueZombie )
  529. {
  530. UnlockQueue( pQueue );
  531. return TaskNotQueued ;
  532. }
  533. if (fUrgent)
  534. {
  535. InsertHeadList(
  536. &pQueue->pTasks,
  537. &pTask->Next
  538. );
  539. }
  540. else
  541. {
  542. InsertTailList(
  543. &pQueue->pTasks,
  544. &pTask->Next
  545. );
  546. }
  547. pQueue->Tasks++;
  548. pQueue->QueuedCounter++;
  549. if ( pQueue->Tasks > pQueue->TaskHighWater )
  550. {
  551. pQueue->TaskHighWater = pQueue->Tasks ;
  552. }
  553. if (pQueue->Type == QueueShared)
  554. {
  555. if ((pQueue->Tasks > pQueue->IdleThreads) &&
  556. (pQueue->TotalThreads < MAX_POOL_THREADS_HARD))
  557. {
  558. NeedMoreThreads = TRUE;
  559. pThreadQueue = pQueue;
  560. }
  561. hParentSem = NULL ;
  562. }
  563. else if (pQueue->Type == QueueShareRead )
  564. {
  565. DsysAssert( pQueue->pOriginal );
  566. //
  567. // Here's the race potential. If the queue we have has no idle thread,
  568. // then make sure there is an idle thread at the parent queue, otherwise
  569. // we can deadlock (e.g. this call is in response to the job being executed
  570. // by the dedicated thread. Of course, this can also be a problem correctly
  571. // determining the parent queue's status, since locks should always flow down,
  572. // not up. So, if the number of jobs that we have pending exceeds the number
  573. // of idle threads, *always*, regardless of the other queue's real state or
  574. // total threads, queue up another thread.
  575. //
  576. if ( pQueue->Tasks > pQueue->IdleThreads )
  577. {
  578. NeedMoreThreads = TRUE ;
  579. pSession = pTask->pSession ;
  580. if ( pQueue->TotalThreads < MAX_SUBQUEUE_THREADS )
  581. {
  582. pThreadQueue = pQueue ;
  583. }
  584. else
  585. {
  586. pThreadQueue = pQueue->pOriginal;
  587. }
  588. }
  589. //
  590. // This is a safe read. The semaphore is not subject to change after creation,
  591. // and the worst that can happen is a bad handle.
  592. //
  593. hParentSem = pQueue->pOriginal->hSemaphore ;
  594. }
  595. hQueueSem = pQueue->hSemaphore ;
  596. UnlockQueue( pQueue );
  597. //
  598. // Kick our semaphore.
  599. //
  600. ReleaseSemaphore( hQueueSem, 1, NULL );
  601. //
  602. // Kick the parent semaphore
  603. //
  604. if ( hParentSem )
  605. {
  606. ReleaseSemaphore( hParentSem, 1, NULL );
  607. }
  608. if (NeedMoreThreads)
  609. {
  610. HANDLE hThread;
  611. DWORD tid;
  612. DebugLog((DEB_TRACE_QUEUE, "Queue %x needs more threads\n", pQueue));
  613. //
  614. // Increment the number of threads now so we don't create more threads
  615. // while we wait for the first one to be created.
  616. //
  617. //// LockQueue(pThreadQueue);
  618. #if 0
  619. pThreadQueue->TotalThreads++;
  620. pThreadQueue->IdleThreads++;
  621. //
  622. // Update the statistics:
  623. //
  624. if ( pThreadQueue->MaxThreads < (LONG) pThreadQueue->TotalThreads )
  625. {
  626. pThreadQueue->MaxThreads = (LONG) pThreadQueue->TotalThreads ;
  627. }
  628. #endif
  629. //// pThreadQueue->ReqThread++;
  630. //// UnlockQueue(pThreadQueue);
  631. InterlockedIncrement( &pThreadQueue->ReqThread );
  632. //
  633. // If the queue is a dedicated queue, supply the session from the task.
  634. // if the queue is a shared (global) queue, pass in NULL:
  635. //
  636. hThread = LsapCreateThread( NULL, 0,
  637. SpmPoolThreadBase,
  638. (pThreadQueue->Type == QueueShareRead ?
  639. pThreadQueue->OwnerSession : NULL ),
  640. 0, &tid);
  641. //
  642. // Check for failure
  643. //
  644. if (hThread == NULL)
  645. {
  646. #if 0
  647. LockQueue(pThreadQueue);
  648. DsysAssert(pThreadQueue->TotalThreads > 0);
  649. DsysAssert(pThreadQueue->IdleThreads > 0);
  650. pThreadQueue->TotalThreads--;
  651. pThreadQueue->IdleThreads--;
  652. UnlockQueue(pThreadQueue);
  653. #endif
  654. //
  655. // This is extremely painful. The thread creation attempt
  656. // failed, but because of the nature of the queue, we don't
  657. // know if it was picked up and executed, or it was dropped,
  658. // or anything about it.
  659. //
  660. return TaskUnknown ;
  661. }
  662. else
  663. {
  664. NtClose(hThread);
  665. }
  666. }
  667. return TaskQueued ;
  668. }
  669. //+---------------------------------------------------------------------------
  670. //
  671. // Function: SpmAssignThread
  672. //
  673. // Synopsis: Assigns a task to a thread pool thread.
  674. //
  675. // Arguments: [pFunction] -- Function to execute
  676. // [pvParameter] -- Parameter to function
  677. // [pSession] -- Session to execute as
  678. //
  679. // History: 11-24-93 RichardW Created
  680. //
  681. // Notes:
  682. //
  683. //----------------------------------------------------------------------------
  684. PVOID
  685. LsapAssignThread(
  686. LPTHREAD_START_ROUTINE pFunction,
  687. PVOID pvParameter,
  688. PSession pSession,
  689. BOOLEAN fUrgent)
  690. {
  691. PLSAP_THREAD_TASK pTask;
  692. PLSAP_TASK_QUEUE pQueue;
  693. LSAP_TASK_STATUS TaskStatus ;
  694. pTask = (PLSAP_THREAD_TASK) LsapAllocatePrivateHeap( sizeof( LSAP_THREAD_TASK ) );
  695. if (!pTask)
  696. {
  697. return( NULL );
  698. }
  699. pTask->pFunction = pFunction;
  700. pTask->pvParameter = pvParameter;
  701. pTask->pSession = pSession;
  702. if ( pSession->SharedData->pQueue )
  703. {
  704. LockSession(pSession);
  705. if( pSession->SharedData->pQueue )
  706. {
  707. pQueue = pSession->SharedData->pQueue;
  708. } else
  709. {
  710. pQueue = &GlobalQueue;
  711. }
  712. UnlockSession(pSession);
  713. }
  714. else
  715. {
  716. pQueue = &GlobalQueue;
  717. }
  718. LsaTuningParameters.ShrinkOn = FALSE;
  719. //
  720. // Reference the session so that it will never go away while a thread
  721. // is working on this task. The worker function will deref the session.
  722. //
  723. SpmpReferenceSession( pSession );
  724. TaskStatus = EnqueueThreadTask( pQueue,
  725. pTask,
  726. fUrgent );
  727. if ( ( TaskStatus == TaskQueued ) ||
  728. ( TaskStatus == TaskUnknown ) )
  729. {
  730. return pTask ;
  731. }
  732. if ( TaskStatus == TaskNotQueued )
  733. {
  734. //
  735. // Failed, therefore deref this session.
  736. //
  737. SpmpDereferenceSession( pSession );
  738. LsapFreePrivateHeap( pTask );
  739. }
  740. return NULL ;
  741. }
  742. //+---------------------------------------------------------------------------
  743. //
  744. // Function: CreateSubordinateQueue
  745. //
  746. // Synopsis: Create a Queue hanging off an original queue.
  747. //
  748. // Arguments: [pQueue] --
  749. // [pOriginalQueue] --
  750. //
  751. // History: 11-17-95 RichardW Created
  752. //
  753. // Notes:
  754. //
  755. //----------------------------------------------------------------------------
  756. BOOL
  757. CreateSubordinateQueue(
  758. PSession pSession,
  759. PLSAP_TASK_QUEUE pOriginalQueue)
  760. {
  761. HANDLE hThread;
  762. DWORD tid;
  763. PLSAP_TASK_QUEUE pQueue ;
  764. pQueue = (LSAP_TASK_QUEUE *) LsapAllocatePrivateHeap( sizeof( LSAP_TASK_QUEUE ) );
  765. if ( !pQueue )
  766. {
  767. return FALSE ;
  768. }
  769. DebugLog(( DEB_TRACE_QUEUE, "Creating sub queue %x\n", pQueue ));
  770. if (InitializeTaskQueue( pQueue, QueueShareRead ))
  771. {
  772. LockQueue( pQueue );
  773. LockQueue( pOriginalQueue );
  774. pQueue->pNext = pOriginalQueue->pShared;
  775. pOriginalQueue->pShared = pQueue;
  776. pQueue->pOriginal = pOriginalQueue;
  777. pQueue->OwnerSession = pSession ;
  778. #if 0
  779. pQueue->TotalThreads++;
  780. pQueue->IdleThreads++;
  781. #endif
  782. UnlockQueue( pOriginalQueue );
  783. UnlockQueue( pQueue );
  784. pSession->SharedData->pQueue = pQueue ;
  785. hThread = LsapCreateThread( NULL, 0,
  786. SpmPoolThreadBase, pSession,
  787. 0, &pSession->ThreadId );
  788. if (hThread != NULL)
  789. {
  790. NtClose( hThread );
  791. //
  792. // Wait for the thread to signal the event, so that
  793. // we know it's ready
  794. //
  795. WaitForSingleObject( pQueue->StartSync, INFINITE );
  796. NtClose( pQueue->StartSync );
  797. pQueue->StartSync = NULL ;
  798. return( TRUE );
  799. }
  800. else
  801. {
  802. RtlDeleteCriticalSection( &pQueue->Lock );
  803. LsapFreePrivateHeap( pQueue );
  804. pQueue = NULL ;
  805. }
  806. }
  807. if ( pQueue )
  808. {
  809. LsapFreePrivateHeap( pQueue );
  810. }
  811. return( FALSE );
  812. }
  813. //+---------------------------------------------------------------------------
  814. //
  815. // Function: DeleteSubordinateQueue
  816. //
  817. // Synopsis:
  818. //
  819. // Effects:
  820. //
  821. // Arguments: [pQueue] --
  822. //
  823. // Requires:
  824. //
  825. // Returns:
  826. //
  827. // Signals:
  828. //
  829. // Modifies:
  830. //
  831. // Algorithm:
  832. //
  833. // History: 8-05-96 RichardW Created
  834. //
  835. // Notes:
  836. //
  837. //----------------------------------------------------------------------------
  838. BOOL
  839. DeleteSubordinateQueue(
  840. PLSAP_TASK_QUEUE pQueue,
  841. ULONG Flags)
  842. {
  843. PLSAP_TASK_QUEUE pOriginal;
  844. PLSAP_TASK_QUEUE pScan;
  845. PLSAP_TASK_QUEUE pPrev ;
  846. PLSAP_THREAD_TASK pTask ;
  847. DWORD dwResult ;
  848. PLIST_ENTRY List ;
  849. PSession ThreadSession = GetCurrentSession();
  850. OBJECT_HANDLE_FLAG_INFORMATION FlagInfo ;
  851. //
  852. // Lock it
  853. //
  854. DebugLog(( DEB_TRACE, "Deleting queue %x\n", pQueue ));
  855. LockQueue( pQueue );
  856. if ( pQueue->pShared )
  857. {
  858. pOriginal = pQueue->pOriginal ;
  859. LockQueue( pOriginal );
  860. //
  861. // Unlink Queue from parent:
  862. //
  863. if ( pOriginal->pShared != pQueue )
  864. {
  865. pScan = pOriginal->pShared;
  866. LockQueue( pScan );
  867. while ( pScan->pNext && (pScan->pNext != pQueue) )
  868. {
  869. pPrev = pScan ;
  870. pScan = pScan->pNext;
  871. UnlockQueue( pPrev );
  872. }
  873. if ( pScan->pNext )
  874. {
  875. pScan->pNext = pQueue->pNext ;
  876. }
  877. UnlockQueue( pScan );
  878. }
  879. else
  880. {
  881. pOriginal->pShared = pQueue->pNext;
  882. }
  883. pQueue->pNext = NULL ;
  884. //
  885. // Done with parent
  886. //
  887. UnlockQueue( pOriginal );
  888. }
  889. //
  890. // Drain queue by removing all the tasks.
  891. //
  892. while ( !IsListEmpty( &pQueue->pTasks ) )
  893. {
  894. List = RemoveHeadList( &pQueue->pTasks );
  895. pQueue->Tasks-- ;
  896. pTask = CONTAINING_RECORD( List, LSAP_THREAD_TASK, Next );
  897. //
  898. // A synchronous drain will have this thread execute
  899. // all remaining tasks.
  900. //
  901. if ( Flags & DELETEQ_SYNC_DRAIN )
  902. {
  903. SpmpReferenceSession(pTask->pSession);
  904. SetCurrentSession( pTask->pSession );
  905. dwResult = pTask->pFunction(pTask->pvParameter);
  906. SetCurrentSession( ThreadSession );
  907. SpmpDereferenceSession(pTask->pSession);
  908. LsapFreePrivateHeap( pTask );
  909. }
  910. else
  911. {
  912. //
  913. // Otherwise, send them to the global queue to be
  914. // executed by other threads
  915. //
  916. EnqueueThreadTask(
  917. &GlobalQueue,
  918. pTask,
  919. FALSE );
  920. }
  921. }
  922. //
  923. // Now, kill off all the threads
  924. //
  925. pQueue->Type = QueueZombie ;
  926. //
  927. // We might be executing on our own worker thread. If there are
  928. // more than one thread associated with this queue, we also
  929. // need to do the sync.
  930. //
  931. if ( ( pQueue->OwnerSession != ThreadSession ) ||
  932. ( pQueue->TotalThreads > 1 ) )
  933. {
  934. //
  935. // We are not a worker thread. Sync with the other
  936. // threads to clean up:
  937. //
  938. pQueue->StartSync = CreateEvent( NULL, FALSE, FALSE, NULL );
  939. //
  940. // Kick the semaphore for all the threads that need it
  941. // (all of them if we aren't a worker thread, or n-1 if
  942. // we are:
  943. //
  944. ReleaseSemaphore(
  945. pQueue->hSemaphore,
  946. (pQueue->OwnerSession == ThreadSession ?
  947. pQueue->TotalThreads - 1 :
  948. pQueue->TotalThreads),
  949. NULL );
  950. UnlockQueue( pQueue );
  951. //
  952. // if we failed to create an event, then we may cause an invalid handle
  953. // problem in the client threads. Sleep a little to let the other threads
  954. // go (hopefully), then close the semaphore and let the error handling
  955. // in the threads deal with it.
  956. //
  957. if ( pQueue->StartSync )
  958. {
  959. WaitForSingleObjectEx( pQueue->StartSync, INFINITE, FALSE );
  960. //
  961. // Synchronize with the last thread to own the queue:
  962. //
  963. LockQueue( pQueue );
  964. CloseHandle( pQueue->StartSync );
  965. pQueue->StartSync = NULL ;
  966. }
  967. else
  968. {
  969. //
  970. // kludge up a retry loop:
  971. //
  972. int i = 50 ;
  973. while ( i && pQueue->TotalThreads )
  974. {
  975. Sleep( 100 );
  976. i-- ;
  977. }
  978. //
  979. // If they're still there, forget it. Return FALSE. Leak.
  980. //
  981. if ( pQueue->TotalThreads )
  982. {
  983. return FALSE ;
  984. }
  985. }
  986. }
  987. //
  988. // At this point, we close the queue down:
  989. //
  990. FlagInfo.Inherit = FALSE ;
  991. FlagInfo.ProtectFromClose = FALSE ;
  992. NtSetInformationObject(
  993. pQueue->hSemaphore,
  994. ObjectHandleFlagInformation,
  995. &FlagInfo,
  996. sizeof( FlagInfo ) );
  997. CloseHandle( pQueue->hSemaphore );
  998. UnlockQueue( pQueue );
  999. RtlDeleteCriticalSection( &pQueue->Lock );
  1000. LsapFreePrivateHeap( pQueue );
  1001. return( TRUE );
  1002. }