Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1714 lines
50 KiB

  1. /*++
  2. Copyright (c) 1989 Microsoft Corporation
  3. Module Name:
  4. rxworkq.c
  5. Abstract:
  6. This module implements the Work queue routines for the Rx File system.
  7. Author:
  8. JoeLinn [JoeLinn] 8-8-94 Initial Implementation
  9. Balan Sethu Raman [SethuR] 22-11-95 Implemented dispatch support for mini rdrs
  10. Balan Sethu Raman [SethuR] 20-03-96 Delinked from executive worker threads
  11. Notes:
  12. There are two kinds of support for asynchronous resumption provided in the RDBSS.
  13. The I/O requests that cannot be completed in the context of the thread in which
  14. the request was made are posted to a file system process for completion.
  15. The mini redirectors also require support for asynchronously completing requests as
  16. well as post requests that cannot be completed at DPC level.
  17. The requests for posting from the mini redirectors are classified into Critical(blocking and
  18. non blocking requests. In order to ensure progress these requests are handled through
  19. separate resources. There is no well known mechanism to ensure that the hyper critical
  20. requests will not block.
  21. The two functions that are available to all mini redirector writers are
  22. RxDispatchToWorkerThread
  23. RxPostToWorkerThread.
  24. These two routines enable the mini redirector writer to make the appropriate space
  25. time tradeoffs. The RxDispatchToWorkerThread trades time for reduction in space by
  26. dynamically allocating the work item as and when required, while RxPostToWorkerThread
  27. trades space for time by pre allocating a work item.
  28. --*/
  29. #include "precomp.h"
  30. #pragma hdrstop
  31. #ifdef ALLOC_PRAGMA
  32. #pragma alloc_text(PAGE, RxInitializeDispatcher)
  33. #pragma alloc_text(PAGE, RxInitializeMRxDispatcher)
  34. #pragma alloc_text(PAGE, RxSpinDownMRxDispatcher)
  35. #pragma alloc_text(PAGE, RxInitializeWorkQueueDispatcher)
  36. #pragma alloc_text(PAGE, RxInitializeWorkQueue)
  37. #pragma alloc_text(PAGE, RxTearDownDispatcher)
  38. #pragma alloc_text(PAGE, RxTearDownWorkQueueDispatcher)
  39. #pragma alloc_text(PAGE, RxpSpinUpWorkerThreads)
  40. #pragma alloc_text(PAGE, RxBootstrapWorkerThreadDispatcher)
  41. #pragma alloc_text(PAGE, RxWorkerThreadDispatcher)
  42. #endif
  43. //
  44. // The local debug trace level
  45. //
  46. #define Dbg (DEBUG_TRACE_FSP_DISPATCHER)
  47. //
  48. // had to steal this from ntifs.h in order to use ntsrv.h
  49. extern POBJECT_TYPE *PsThreadType;
  50. //
  51. // Prototype forward declarations.
  52. //
  53. extern NTSTATUS
  54. RxInitializeWorkQueueDispatcher(
  55. PRX_WORK_QUEUE_DISPATCHER pDispatcher);
  56. extern
  57. VOID
  58. RxInitializeWorkQueue(
  59. PRX_WORK_QUEUE pWorkQueue,
  60. WORK_QUEUE_TYPE WorkQueueType,
  61. ULONG MaximumNumberOfWorkerThreads,
  62. ULONG MinimumNumberOfWorkerThreads);
  63. extern VOID
  64. RxTearDownWorkQueueDispatcher(
  65. PRX_WORK_QUEUE_DISPATCHER pDispatcher);
  66. extern VOID
  67. RxTearDownWorkQueue(
  68. PRX_WORK_QUEUE pWorkQueue);
  69. extern NTSTATUS
  70. RxSpinUpWorkerThread(
  71. PRX_WORK_QUEUE pWorkQueue,
  72. PRX_WORKERTHREAD_ROUTINE Routine,
  73. PVOID Parameter);
  74. extern VOID
  75. RxSpinUpWorkerThreads(
  76. PRX_WORK_QUEUE pWorkQueue);
  77. extern VOID
  78. RxSpinDownWorkerThreads(
  79. PRX_WORK_QUEUE pWorkQueue);
  80. extern VOID
  81. RxpWorkerThreadDispatcher(
  82. IN PRX_WORK_QUEUE pWorkQueue,
  83. IN PLARGE_INTEGER pWaitInterval);
  84. VOID
  85. RxBootstrapWorkerThreadDispatcher(
  86. IN PRX_WORK_QUEUE pWorkQueue);
  87. VOID
  88. RxWorkerThreadDispatcher(
  89. IN PRX_WORK_QUEUE pWorkQueue);
  90. extern VOID
  91. RxWorkItemDispatcher(
  92. PVOID pContext);
  93. extern VOID
  94. RxSpinUpRequestsDispatcher(
  95. PRX_DISPATCHER pDispatcher);
  96. // The spin up requests thread
  97. PETHREAD RxSpinUpRequestsThread = NULL;
  98. //
  99. // The delay parameter for KQUEUE wait requests in the RX_WORK_QUEUE implemenation
  100. //
  101. LARGE_INTEGER RxWorkQueueWaitInterval[MaximumWorkQueue];
  102. LARGE_INTEGER RxSpinUpDispatcherWaitInterval;
  103. //
  104. // Currently the levels correspond to the three levels defined in ex.h
  105. // Delayed,Critical and HyperCritical. As regards mini redirectors if any work
  106. // is not dependent on any mini redirector/RDBSS resource, i.e., it will not wait
  107. // it can be classified as a hypercritical1 work item. There is no good way to
  108. // enforce this, therefore one should exercise great caution before classifying
  109. // something as hypercritical.
  110. //
  111. NTSTATUS
  112. RxInitializeDispatcher()
  113. /*++
  114. Routine Description:
  115. This routine initializes the work queues dispatcher
  116. Return Value:
  117. STATUS_SUCCESS -- successful
  118. other status codes indicate failure to initialize
  119. Notes:
  120. The dispatching mechanism is implemented as a two tiered approach. Each
  121. processor in the system is associated with a set of work queues. A best
  122. effort is made to schedule all work emanating from a processor onto the
  123. same processor. This prevents excessive sloshing of state information from
  124. one processor cache to another.
  125. For a given processor there are three work queues corresponding to three
  126. levels of classification -- Delayed Work items, Critical Work items and
  127. Hyper Critical work items. Each of these levels is associated with a
  128. Kernel Queue (KQUEUE). The number of threads associated with each of these
  129. queues can be independently controlled.
  130. Currently the tuning parameters for the dispatcher are all hard coded. A
  131. mechanism to intialize them from the registry needs to be implemented.
  132. The following parameters associated with the dispatcher can be tuned ...
  133. 1) the wait time intervals associated with the kernel queue for each level.
  134. 2) the minimum and amximum number of worker threads associated with each
  135. level.
  136. --*/
  137. {
  138. ULONG ProcessorIndex,NumberOfProcessors;
  139. NTSTATUS Status;
  140. PAGED_CODE();
  141. // Currently we set the number of processors to 1. In future the
  142. // dispatcher can be tailored to multi processor implementation
  143. // by appropriately initializing it as follows
  144. // NumberOfProcessors = KeNumberProcessors;
  145. NumberOfProcessors = 1;
  146. RxFileSystemDeviceObject->DispatcherContext.NumberOfWorkerThreads = 0;
  147. RxFileSystemDeviceObject->DispatcherContext.pTearDownEvent = NULL;
  148. // Currently the default values for the wait intervals are set as
  149. // 10 seconds ( expressed in system time units of 100 ns ticks ).
  150. RxWorkQueueWaitInterval[DelayedWorkQueue].QuadPart = -10 * TICKS_PER_SECOND;
  151. RxWorkQueueWaitInterval[CriticalWorkQueue].QuadPart = -10 * TICKS_PER_SECOND;
  152. RxWorkQueueWaitInterval[HyperCriticalWorkQueue].QuadPart = -10 * TICKS_PER_SECOND;
  153. RxSpinUpDispatcherWaitInterval.QuadPart = -60 * TICKS_PER_SECOND;
  154. RxDispatcher.NumberOfProcessors = NumberOfProcessors;
  155. RxDispatcher.OwnerProcess = IoGetCurrentProcess();
  156. RxDispatcher.pWorkQueueDispatcher = &RxDispatcherWorkQueues;
  157. if (RxDispatcher.pWorkQueueDispatcher != NULL) {
  158. for (
  159. ProcessorIndex = 0;
  160. ProcessorIndex < NumberOfProcessors;
  161. ProcessorIndex++
  162. ) {
  163. Status = RxInitializeWorkQueueDispatcher(
  164. &RxDispatcher.pWorkQueueDispatcher[ProcessorIndex]);
  165. if (Status != STATUS_SUCCESS) {
  166. break;
  167. }
  168. }
  169. if (Status == STATUS_SUCCESS) {
  170. Status = RxInitializeMRxDispatcher(RxFileSystemDeviceObject);
  171. }
  172. } else {
  173. Status = STATUS_INSUFFICIENT_RESOURCES;
  174. }
  175. if (Status == STATUS_SUCCESS) {
  176. HANDLE ThreadHandle;
  177. KeInitializeEvent(
  178. &RxDispatcher.SpinUpRequestsEvent,
  179. NotificationEvent,
  180. FALSE);
  181. KeInitializeEvent(
  182. &RxDispatcher.SpinUpRequestsTearDownEvent,
  183. NotificationEvent,
  184. FALSE);
  185. InitializeListHead(
  186. &RxDispatcher.SpinUpRequests);
  187. RxDispatcher.State = RxDispatcherActive;
  188. KeInitializeSpinLock(&RxDispatcher.SpinUpRequestsLock);
  189. Status = PsCreateSystemThread(
  190. &ThreadHandle,
  191. PROCESS_ALL_ACCESS,
  192. NULL,
  193. NULL,
  194. NULL,
  195. RxSpinUpRequestsDispatcher,
  196. &RxDispatcher);
  197. if (NT_SUCCESS(Status)) {
  198. // Close the handle so the thread can die when needed
  199. ZwClose(ThreadHandle);
  200. }
  201. }
  202. return Status;
  203. }
  204. NTSTATUS
  205. RxInitializeMRxDispatcher(PRDBSS_DEVICE_OBJECT pMRxDeviceObject)
  206. /*++
  207. Routine Description:
  208. This routine initializes the dispatcher context for a mini rdr
  209. Return Value:
  210. STATUS_SUCCESS -- successful
  211. other status codes indicate failure to initialize
  212. Notes:
  213. --*/
  214. {
  215. PAGED_CODE();
  216. pMRxDeviceObject->DispatcherContext.NumberOfWorkerThreads = 0;
  217. pMRxDeviceObject->DispatcherContext.pTearDownEvent = NULL;
  218. return STATUS_SUCCESS;
  219. }
  220. NTSTATUS
  221. RxSpinDownMRxDispatcher(PRDBSS_DEVICE_OBJECT pMRxDeviceObject)
  222. /*++
  223. Routine Description:
  224. This routine tears down the dispatcher context for a mini rdr
  225. Return Value:
  226. STATUS_SUCCESS -- successful
  227. other status codes indicate failure to initialize
  228. Notes:
  229. --*/
  230. {
  231. LONG FinalRefCount;
  232. KEVENT TearDownEvent;
  233. KIRQL SavedIrql;
  234. PAGED_CODE();
  235. KeInitializeEvent(
  236. &TearDownEvent,
  237. NotificationEvent,
  238. FALSE);
  239. InterlockedIncrement(&pMRxDeviceObject->DispatcherContext.NumberOfWorkerThreads);
  240. pMRxDeviceObject->DispatcherContext.pTearDownEvent = &TearDownEvent;
  241. FinalRefCount = InterlockedDecrement(&pMRxDeviceObject->DispatcherContext.NumberOfWorkerThreads);
  242. if (FinalRefCount > 0) {
  243. KeWaitForSingleObject(
  244. &TearDownEvent,
  245. Executive,
  246. KernelMode,
  247. FALSE,
  248. NULL);
  249. } else {
  250. InterlockedExchangePointer(
  251. &pMRxDeviceObject->DispatcherContext.pTearDownEvent,
  252. NULL);
  253. }
  254. ASSERT(pMRxDeviceObject->DispatcherContext.pTearDownEvent == NULL);
  255. return STATUS_SUCCESS;
  256. }
  257. NTSTATUS
  258. RxInitializeWorkQueueDispatcher(
  259. PRX_WORK_QUEUE_DISPATCHER pDispatcher)
  260. /*++
  261. Routine Description:
  262. This routine initializes the work queue dispatcher for a particular processor
  263. Arguments:
  264. pDispatcher - Work Queue Dispatcher
  265. Return Value:
  266. STATUS_SUCCESS -- successful
  267. other status codes indicate failure to initialize
  268. Notes:
  269. For each of the work queues associated with a processor the minimum number of
  270. worker threads and maximum number of worker threads can be independently
  271. specified and tuned.
  272. The two factors that influence these decision are (1) the cost of spinnning up/
  273. spinning down worker threads and (2) the amount of resources consumed by an
  274. idle worker thread.
  275. Currently these numbers are hard coded, a desirable extension would be a mechanism
  276. to initialize them from a registry setting. This will enable us to tune the
  277. parameters easily. This has to be implemented.
  278. --*/
  279. {
  280. NTSTATUS Status;
  281. MM_SYSTEMSIZE SystemSize;
  282. ULONG NumberOfCriticalWorkerThreads;
  283. PAGED_CODE();
  284. SystemSize = MmQuerySystemSize();
  285. RxInitializeWorkQueue(
  286. &pDispatcher->WorkQueue[DelayedWorkQueue],DelayedWorkQueue,2,1);
  287. if (SystemSize == MmLargeSystem) {
  288. NumberOfCriticalWorkerThreads = 10;
  289. } else {
  290. NumberOfCriticalWorkerThreads = 5;
  291. }
  292. RxInitializeWorkQueue(
  293. &pDispatcher->WorkQueue[CriticalWorkQueue],
  294. CriticalWorkQueue,
  295. NumberOfCriticalWorkerThreads,
  296. 1);
  297. RxInitializeWorkQueue(
  298. &pDispatcher->WorkQueue[HyperCriticalWorkQueue],
  299. HyperCriticalWorkQueue,
  300. 5,
  301. 1);
  302. Status = RxSpinUpWorkerThread(
  303. &pDispatcher->WorkQueue[HyperCriticalWorkQueue],
  304. RxBootstrapWorkerThreadDispatcher,
  305. &pDispatcher->WorkQueue[HyperCriticalWorkQueue]);
  306. if (Status == STATUS_SUCCESS) {
  307. Status = RxSpinUpWorkerThread(
  308. &pDispatcher->WorkQueue[CriticalWorkQueue],
  309. RxBootstrapWorkerThreadDispatcher,
  310. &pDispatcher->WorkQueue[CriticalWorkQueue]);
  311. }
  312. if (Status == STATUS_SUCCESS) {
  313. Status = RxSpinUpWorkerThread(
  314. &pDispatcher->WorkQueue[DelayedWorkQueue],
  315. RxBootstrapWorkerThreadDispatcher,
  316. &pDispatcher->WorkQueue[DelayedWorkQueue]);
  317. }
  318. return Status;
  319. }
  320. VOID
  321. RxInitializeWorkQueue(
  322. PRX_WORK_QUEUE pWorkQueue,
  323. WORK_QUEUE_TYPE WorkQueueType,
  324. ULONG MaximumNumberOfWorkerThreads,
  325. ULONG MinimumNumberOfWorkerThreads)
  326. /*++
  327. Routine Description:
  328. This routine initializes a work queue
  329. Arguments:
  330. pWorkQueue - Work Queue Dispatcher
  331. MaximumNumberOfWorkerThreads - the upper bound on worker threads
  332. MinimumNumberOfWorkerThreads - the lower bound on the threads.
  333. --*/
  334. {
  335. PAGED_CODE();
  336. pWorkQueue->Type = (UCHAR)WorkQueueType;
  337. pWorkQueue->State = RxWorkQueueActive;
  338. pWorkQueue->SpinUpRequestPending = FALSE;
  339. pWorkQueue->pRundownContext = NULL;
  340. pWorkQueue->NumberOfWorkItemsDispatched = 0;
  341. pWorkQueue->NumberOfWorkItemsToBeDispatched = 0;
  342. pWorkQueue->CumulativeQueueLength = 0;
  343. pWorkQueue->NumberOfSpinUpRequests = 0;
  344. pWorkQueue->MaximumNumberOfWorkerThreads = MaximumNumberOfWorkerThreads;
  345. pWorkQueue->MinimumNumberOfWorkerThreads = MinimumNumberOfWorkerThreads;
  346. pWorkQueue->NumberOfActiveWorkerThreads = 0;
  347. pWorkQueue->NumberOfIdleWorkerThreads = 0;
  348. pWorkQueue->NumberOfFailedSpinUpRequests = 0;
  349. pWorkQueue->WorkQueueItemForSpinUpWorkerThreadInUse = 0;
  350. ExInitializeWorkItem(&pWorkQueue->WorkQueueItemForTearDownWorkQueue,NULL,NULL);
  351. ExInitializeWorkItem(&pWorkQueue->WorkQueueItemForSpinUpWorkerThread,NULL,NULL);
  352. ExInitializeWorkItem(&pWorkQueue->WorkQueueItemForSpinDownWorkerThread,NULL,NULL);
  353. pWorkQueue->WorkQueueItemForSpinDownWorkerThread.pDeviceObject = NULL;
  354. pWorkQueue->WorkQueueItemForSpinUpWorkerThread.pDeviceObject = NULL;
  355. pWorkQueue->WorkQueueItemForTearDownWorkQueue.pDeviceObject = NULL;
  356. KeInitializeQueue(&pWorkQueue->Queue,MaximumNumberOfWorkerThreads);
  357. KeInitializeSpinLock(&pWorkQueue->SpinLock);
  358. }
  359. NTSTATUS
  360. RxTearDownDispatcher()
  361. /*++
  362. Routine Description:
  363. This routine tears down the dispatcher
  364. Return Value:
  365. STATUS_SUCCESS -- successful
  366. other status codes indicate failure to initialize
  367. --*/
  368. {
  369. LONG ProcessorIndex;
  370. NTSTATUS Status;
  371. PAGED_CODE();
  372. if (RxDispatcher.pWorkQueueDispatcher != NULL) {
  373. RxDispatcher.State = RxDispatcherInactive;
  374. KeSetEvent(
  375. &RxDispatcher.SpinUpRequestsEvent,
  376. IO_NO_INCREMENT,
  377. FALSE);
  378. KeWaitForSingleObject(
  379. &RxDispatcher.SpinUpRequestsTearDownEvent,
  380. Executive,
  381. KernelMode,
  382. FALSE,
  383. NULL);
  384. if (RxSpinUpRequestsThread != NULL) {
  385. if (!PsIsThreadTerminating(RxSpinUpRequestsThread)) {
  386. // Wait for the thread to terminate.
  387. KeWaitForSingleObject(
  388. RxSpinUpRequestsThread,
  389. Executive,
  390. KernelMode,
  391. FALSE,
  392. NULL);
  393. ASSERT(PsIsThreadTerminating(RxSpinUpRequestsThread));
  394. }
  395. ObDereferenceObject(RxSpinUpRequestsThread);
  396. }
  397. for (
  398. ProcessorIndex = 0;
  399. ProcessorIndex < RxDispatcher.NumberOfProcessors;
  400. ProcessorIndex++
  401. ) {
  402. RxTearDownWorkQueueDispatcher(&RxDispatcher.pWorkQueueDispatcher[ProcessorIndex]);
  403. }
  404. //RxFreePool(RxDispatcher.pWorkQueueDispatcher);
  405. }
  406. return STATUS_SUCCESS;
  407. }
  408. VOID
  409. RxTearDownWorkQueueDispatcher(
  410. PRX_WORK_QUEUE_DISPATCHER pDispatcher)
  411. /*++
  412. Routine Description:
  413. This routine tears dwon the work queue dispatcher for a particular processor
  414. Arguments:
  415. pDispatcher - Work Queue Dispatcher
  416. --*/
  417. {
  418. PAGED_CODE();
  419. RxTearDownWorkQueue(
  420. &pDispatcher->WorkQueue[DelayedWorkQueue]);
  421. RxTearDownWorkQueue(
  422. &pDispatcher->WorkQueue[CriticalWorkQueue]);
  423. RxTearDownWorkQueue(
  424. &pDispatcher->WorkQueue[HyperCriticalWorkQueue]);
  425. }
  426. VOID
  427. RxTearDownWorkQueue(
  428. PRX_WORK_QUEUE pWorkQueue)
  429. /*++
  430. Routine Description:
  431. This routine tears down a work queue
  432. Arguments:
  433. pWorkQueue - Work Queue
  434. Notes:
  435. Tearing down a work queue is a more complex process when compared to initializing
  436. a work queue. This is because of the threads associated with the queue. In order
  437. to ensure that the work queue can be torn down correctly each of the threads
  438. associated with the queue must be spun down correctly.
  439. This is accomplished by changing the state of the work queue from
  440. RxWorkQueueActive to RxWorkQueueRundownInProgress. This prevents further requests
  441. from being inserted into the queue. Having done that the currently active threads
  442. must be spundown.
  443. The spinning down process is accelerated by posting a dummy work item onto the
  444. work queue so that the waits are immediately satisfied.
  445. --*/
  446. {
  447. KIRQL SavedIrql;
  448. ULONG NumberOfActiveThreads;
  449. PLIST_ENTRY pFirstListEntry,pNextListEntry;
  450. PRX_WORK_QUEUE_RUNDOWN_CONTEXT pRundownContext;
  451. pRundownContext = (PRX_WORK_QUEUE_RUNDOWN_CONTEXT)
  452. RxAllocatePoolWithTag(
  453. NonPagedPool,
  454. sizeof(RX_WORK_QUEUE_RUNDOWN_CONTEXT) +
  455. pWorkQueue->MaximumNumberOfWorkerThreads * sizeof(PETHREAD),
  456. RX_WORKQ_POOLTAG);
  457. if (pRundownContext != NULL) {
  458. KeInitializeEvent(
  459. &pRundownContext->RundownCompletionEvent,
  460. NotificationEvent,
  461. FALSE);
  462. pRundownContext->NumberOfThreadsSpunDown = 0;
  463. pRundownContext->ThreadPointers = (PETHREAD *)(pRundownContext + 1);
  464. KeAcquireSpinLock(&pWorkQueue->SpinLock,&SavedIrql);
  465. ASSERT((pWorkQueue->pRundownContext == NULL) &&
  466. (pWorkQueue->State == RxWorkQueueActive));
  467. pWorkQueue->pRundownContext = pRundownContext;
  468. pWorkQueue->State = RxWorkQueueRundownInProgress;
  469. NumberOfActiveThreads = pWorkQueue->NumberOfActiveWorkerThreads;
  470. KeReleaseSpinLock(&pWorkQueue->SpinLock,SavedIrql);
  471. if (NumberOfActiveThreads > 0) {
  472. pWorkQueue->WorkQueueItemForTearDownWorkQueue.pDeviceObject = RxFileSystemDeviceObject;
  473. InterlockedIncrement(&RxFileSystemDeviceObject->DispatcherContext.NumberOfWorkerThreads);
  474. ExInitializeWorkItem(&pWorkQueue->WorkQueueItemForTearDownWorkQueue,RxSpinDownWorkerThreads,pWorkQueue);
  475. KeInsertQueue(&pWorkQueue->Queue,&pWorkQueue->WorkQueueItemForTearDownWorkQueue.List);
  476. KeWaitForSingleObject(
  477. &pWorkQueue->pRundownContext->RundownCompletionEvent,
  478. Executive,
  479. KernelMode,
  480. FALSE,
  481. NULL);
  482. }
  483. if (pRundownContext->NumberOfThreadsSpunDown > 0) {
  484. LONG Index = 0;
  485. for (
  486. Index = pRundownContext->NumberOfThreadsSpunDown - 1;
  487. Index >= 0;
  488. Index--
  489. ) {
  490. PETHREAD pThread;
  491. pThread = pRundownContext->ThreadPointers[Index];
  492. ASSERT(pThread != NULL);
  493. if (!PsIsThreadTerminating(pThread)) {
  494. // Wait for the thread to terminate.
  495. KeWaitForSingleObject(
  496. pThread,
  497. Executive,
  498. KernelMode,
  499. FALSE,
  500. NULL);
  501. ASSERT(PsIsThreadTerminating(pThread));
  502. }
  503. ObDereferenceObject(pThread);
  504. }
  505. }
  506. RxFreePool(pRundownContext);
  507. }
  508. ASSERT(pWorkQueue->NumberOfActiveWorkerThreads == 0);
  509. pFirstListEntry = KeRundownQueue(&pWorkQueue->Queue);
  510. if (pFirstListEntry != NULL) {
  511. pNextListEntry = pFirstListEntry;
  512. do {
  513. PWORK_QUEUE_ITEM pWorkQueueItem;
  514. pWorkQueueItem = (PWORK_QUEUE_ITEM)
  515. CONTAINING_RECORD(
  516. pNextListEntry,
  517. WORK_QUEUE_ITEM,
  518. List);
  519. pNextListEntry = pNextListEntry->Flink;
  520. if (pWorkQueueItem->WorkerRoutine == RxWorkItemDispatcher) {
  521. RxFreePool(pWorkQueueItem);
  522. }
  523. } while (pNextListEntry != pFirstListEntry);
  524. }
  525. }
  526. NTSTATUS
  527. RxSpinUpWorkerThread(
  528. PRX_WORK_QUEUE pWorkQueue,
  529. PRX_WORKERTHREAD_ROUTINE Routine,
  530. PVOID Parameter)
  531. /*++
  532. Routine Description:
  533. This routine spins up a worker thread associated with the given queue.
  534. Arguments:
  535. pWorkQueue - the WorkQueue instance.
  536. Routine - the thread routine
  537. Parameter - the thread routine parameter
  538. Return Value:
  539. STATUS_SUCCESS if successful,
  540. otherwise appropriate error code
  541. --*/
  542. {
  543. NTSTATUS Status;
  544. HANDLE ThreadHandle;
  545. KIRQL SavedIrql;
  546. PAGED_CODE();
  547. KeAcquireSpinLock(&pWorkQueue->SpinLock,&SavedIrql);
  548. if( pWorkQueue->State == RxWorkQueueActive )
  549. {
  550. pWorkQueue->NumberOfActiveWorkerThreads++;
  551. Status = STATUS_SUCCESS;
  552. //RxLogRetail(("SpinUpWT %x %d %d\n", pWorkQueue, pWorkQueue->State, pWorkQueue->NumberOfActiveWorkerThreads ));
  553. }
  554. else
  555. {
  556. Status = STATUS_UNSUCCESSFUL;
  557. RxLogRetail(("SpinUpWT Fail %x %d %d\n", pWorkQueue, pWorkQueue->State, pWorkQueue->NumberOfActiveWorkerThreads ));
  558. //DbgPrint("[dkruse] RDBSS would have crashed here without this fix!\n");
  559. }
  560. KeReleaseSpinLock(&pWorkQueue->SpinLock, SavedIrql );
  561. if( NT_SUCCESS(Status) )
  562. {
  563. Status = PsCreateSystemThread(
  564. &ThreadHandle,
  565. PROCESS_ALL_ACCESS,
  566. NULL,
  567. NULL,
  568. NULL,
  569. Routine,
  570. Parameter);
  571. if (NT_SUCCESS(Status)) {
  572. // Close the handle so the thread can die when needed
  573. ZwClose(ThreadHandle);
  574. } else {
  575. // Log the inability to create a worker thread.
  576. RxLog(("WorkQ: %lx SpinUpStat %lx\n",pWorkQueue,Status));
  577. RxWmiLogError(Status,
  578. LOG,
  579. RxSpinUpWorkerThread,
  580. LOGPTR(pWorkQueue)
  581. LOGULONG(Status));
  582. // Change the thread count back, and set the rundown completion event if necessary
  583. KeAcquireSpinLock( &pWorkQueue->SpinLock, &SavedIrql );
  584. pWorkQueue->NumberOfActiveWorkerThreads--;
  585. pWorkQueue->NumberOfFailedSpinUpRequests++;
  586. if( (pWorkQueue->NumberOfActiveWorkerThreads == 0) &&
  587. (pWorkQueue->State == RxWorkQueueRundownInProgress) )
  588. {
  589. KeSetEvent(
  590. &pWorkQueue->pRundownContext->RundownCompletionEvent,
  591. IO_NO_INCREMENT,
  592. FALSE);
  593. }
  594. RxLogRetail(("SpinUpWT Fail2 %x %d %d\n", pWorkQueue, pWorkQueue->State, pWorkQueue->NumberOfActiveWorkerThreads ));
  595. KeReleaseSpinLock( &pWorkQueue->SpinLock, SavedIrql );
  596. }
  597. }
  598. return Status;
  599. }
  600. VOID
  601. RxpSpinUpWorkerThreads(
  602. PRX_WORK_QUEUE pWorkQueue)
  603. /*++
  604. Routine Description:
  605. This routine ensures that the dispatcher is not torn down while requests
  606. are pending in the kernel worker threads for spin ups
  607. Arguments:
  608. pWorkQueue - the WorkQueue instance.
  609. Notes:
  610. There is implicit reliance on the fact that the RxDispatcher owner process
  611. is the same as the system process. If this is not TRUE then an alternate
  612. way needs to be implemented for ensuring that spinup requests are not stuck
  613. behind other requests.
  614. --*/
  615. {
  616. LONG NumberOfWorkerThreads;
  617. PAGED_CODE();
  618. ASSERT(IoGetCurrentProcess() == RxDispatcher.OwnerProcess);
  619. RxSpinUpWorkerThreads(pWorkQueue);
  620. NumberOfWorkerThreads = InterlockedDecrement(
  621. &RxFileSystemDeviceObject->DispatcherContext.NumberOfWorkerThreads);
  622. if (NumberOfWorkerThreads == 0) {
  623. PKEVENT pTearDownEvent;
  624. pTearDownEvent = (PKEVENT)
  625. InterlockedExchangePointer(
  626. &RxFileSystemDeviceObject->DispatcherContext.pTearDownEvent,
  627. NULL);
  628. if (pTearDownEvent != NULL) {
  629. KeSetEvent(
  630. pTearDownEvent,
  631. IO_NO_INCREMENT,
  632. FALSE);
  633. }
  634. }
  635. }
  636. VOID
  637. RxSpinUpRequestsDispatcher(
  638. PRX_DISPATCHER pDispatcher)
  639. /*++
  640. Routine Description:
  641. This routine ensures that there is an independent thread to handle spinup
  642. requests for all types of threads. This routine will be active as long as
  643. the dispatcher is active
  644. Arguments:
  645. pDispatcher - the dispatcher instance.
  646. Notes:
  647. There is implicit reliance on the fact that the RxDispatcher owner process
  648. is the same as the system process. If this is not TRUE then an alternate
  649. way needs to be implemented for ensuring that spinup requests are not stuck
  650. behind other requests.
  651. --*/
  652. {
  653. PETHREAD ThisThread;
  654. NTSTATUS Status;
  655. RxDbgTrace(0,Dbg,("+++++ Worker SpinUp Requests Thread Startup %lx\n",PsGetCurrentThread()));
  656. ThisThread = PsGetCurrentThread();
  657. Status = ObReferenceObjectByPointer(
  658. ThisThread,
  659. THREAD_ALL_ACCESS,
  660. *PsThreadType,
  661. KernelMode);
  662. if (Status == STATUS_SUCCESS) {
  663. RxSpinUpRequestsThread = ThisThread;
  664. for (;;) {
  665. NTSTATUS Status;
  666. RX_DISPATCHER_STATE State;
  667. KIRQL SavedIrql;
  668. LIST_ENTRY SpinUpRequests;
  669. PLIST_ENTRY pListEntry;
  670. InitializeListHead(&SpinUpRequests);
  671. Status = KeWaitForSingleObject(
  672. &pDispatcher->SpinUpRequestsEvent,
  673. Executive,
  674. KernelMode,
  675. FALSE,
  676. &RxSpinUpDispatcherWaitInterval);
  677. ASSERT((Status == STATUS_SUCCESS) || (Status == STATUS_TIMEOUT));
  678. KeAcquireSpinLock(
  679. &pDispatcher->SpinUpRequestsLock,
  680. &SavedIrql);
  681. RxTransferList(
  682. &SpinUpRequests,
  683. &pDispatcher->SpinUpRequests);
  684. State = pDispatcher->State;
  685. KeResetEvent(
  686. &pDispatcher->SpinUpRequestsEvent);
  687. KeReleaseSpinLock(
  688. &pDispatcher->SpinUpRequestsLock,
  689. SavedIrql);
  690. // Process the spin up requests
  691. while (!IsListEmpty(&SpinUpRequests)) {
  692. PRX_WORKERTHREAD_ROUTINE Routine;
  693. PVOID pParameter;
  694. PWORK_QUEUE_ITEM pWorkQueueItem;
  695. PRX_WORK_QUEUE pWorkQueue;
  696. LONG ItemInUse;
  697. pListEntry = RemoveHeadList(&SpinUpRequests);
  698. pWorkQueueItem = (PWORK_QUEUE_ITEM)
  699. CONTAINING_RECORD(
  700. pListEntry,
  701. WORK_QUEUE_ITEM,
  702. List);
  703. Routine = pWorkQueueItem->WorkerRoutine;
  704. pParameter = pWorkQueueItem->Parameter;
  705. pWorkQueue = (PRX_WORK_QUEUE)pParameter;
  706. ItemInUse = InterlockedDecrement(&pWorkQueue->WorkQueueItemForSpinUpWorkerThreadInUse);
  707. RxLog(("WORKQ:SR %lx %lx\n", Routine, pParameter ));
  708. RxWmiLog(LOG,
  709. RxSpinUpRequestsDispatcher,
  710. LOGPTR(Routine)
  711. LOGPTR(pParameter));
  712. Routine(pParameter);
  713. }
  714. if (State != RxDispatcherActive) {
  715. KeSetEvent(
  716. &pDispatcher->SpinUpRequestsTearDownEvent,
  717. IO_NO_INCREMENT,
  718. FALSE);
  719. break;
  720. }
  721. }
  722. }
  723. PsTerminateSystemThread(STATUS_SUCCESS);
  724. }
  725. VOID
  726. RxSpinUpWorkerThreads(
  727. PRX_WORK_QUEUE pWorkQueue)
  728. /*++
  729. Routine Description:
  730. This routine spins up one or more worker thread associated with the given queue.
  731. Arguments:
  732. pWorkQueue - the WorkQueue instance.
  733. Return Value:
  734. STATUS_SUCCESS if successful,
  735. otherwise appropriate error code
  736. --*/
  737. {
  738. NTSTATUS Status = STATUS_SUCCESS;
  739. HANDLE ThreadHandle;
  740. LONG NumberOfThreads;
  741. KIRQL SavedIrql;
  742. LONG ItemInUse;
  743. if ((IoGetCurrentProcess() != RxDispatcher.OwnerProcess) ||
  744. (KeGetCurrentIrql() != PASSIVE_LEVEL)) {
  745. ItemInUse = InterlockedIncrement(&pWorkQueue->WorkQueueItemForSpinUpWorkerThreadInUse);
  746. if (ItemInUse > 1) {
  747. // A work queue item is already on the SpinUpRequests waiting to be processed.
  748. // No need to post another one.
  749. InterlockedDecrement(&pWorkQueue->WorkQueueItemForSpinUpWorkerThreadInUse);
  750. return;
  751. }
  752. InterlockedIncrement(
  753. &RxFileSystemDeviceObject->DispatcherContext.NumberOfWorkerThreads);
  754. ExInitializeWorkItem(
  755. (PWORK_QUEUE_ITEM)&pWorkQueue->WorkQueueItemForSpinUpWorkerThread,
  756. RxpSpinUpWorkerThreads,
  757. pWorkQueue);
  758. KeAcquireSpinLock(&RxDispatcher.SpinUpRequestsLock, &SavedIrql);
  759. InsertTailList(
  760. &RxDispatcher.SpinUpRequests,
  761. &pWorkQueue->WorkQueueItemForSpinUpWorkerThread.List);
  762. KeSetEvent(
  763. &RxDispatcher.SpinUpRequestsEvent,
  764. IO_NO_INCREMENT,
  765. FALSE);
  766. KeReleaseSpinLock(&RxDispatcher.SpinUpRequestsLock,SavedIrql);
  767. } else {
  768. // Decide on the number of worker threads that need to be spun up.
  769. KeAcquireSpinLock(&pWorkQueue->SpinLock, &SavedIrql);
  770. if( pWorkQueue->State != RxWorkQueueRundownInProgress )
  771. {
  772. NumberOfThreads = pWorkQueue->MaximumNumberOfWorkerThreads -
  773. pWorkQueue->NumberOfActiveWorkerThreads;
  774. if (NumberOfThreads > pWorkQueue->NumberOfWorkItemsToBeDispatched) {
  775. NumberOfThreads = pWorkQueue->NumberOfWorkItemsToBeDispatched;
  776. }
  777. }
  778. else
  779. {
  780. // We're running down, so don't increment
  781. NumberOfThreads = 0;
  782. //DbgPrint( "[dkruse] Preventing rundown!\n" );
  783. }
  784. pWorkQueue->SpinUpRequestPending = FALSE;
  785. KeReleaseSpinLock(&pWorkQueue->SpinLock, SavedIrql);
  786. while (NumberOfThreads-- > 0) {
  787. Status = RxSpinUpWorkerThread(
  788. pWorkQueue,
  789. RxWorkerThreadDispatcher,
  790. pWorkQueue);
  791. if (Status != STATUS_SUCCESS) {
  792. break;
  793. }
  794. }
  795. if (Status != STATUS_SUCCESS) {
  796. ItemInUse = InterlockedIncrement(&pWorkQueue->WorkQueueItemForSpinUpWorkerThreadInUse);
  797. if (ItemInUse > 1) {
  798. // A work queue item is already on the SpinUpRequests waiting to be processed.
  799. // No need to post another one.
  800. InterlockedDecrement(&pWorkQueue->WorkQueueItemForSpinUpWorkerThreadInUse);
  801. return;
  802. }
  803. ExInitializeWorkItem(
  804. (PWORK_QUEUE_ITEM)&pWorkQueue->WorkQueueItemForSpinUpWorkerThread,
  805. RxpSpinUpWorkerThreads,
  806. pWorkQueue);
  807. KeAcquireSpinLock(&pWorkQueue->SpinLock, &SavedIrql);
  808. pWorkQueue->SpinUpRequestPending = TRUE;
  809. KeReleaseSpinLock(&pWorkQueue->SpinLock, SavedIrql);
  810. KeAcquireSpinLock(&RxDispatcher.SpinUpRequestsLock, &SavedIrql);
  811. // An attempt to spin up a worker thread failed. Reschedule the
  812. // requests to attempt this operation later.
  813. InterlockedIncrement(
  814. &RxFileSystemDeviceObject->DispatcherContext.NumberOfWorkerThreads);
  815. InsertTailList(
  816. &RxDispatcher.SpinUpRequests,
  817. &pWorkQueue->WorkQueueItemForSpinUpWorkerThread.List);
  818. KeReleaseSpinLock(&RxDispatcher.SpinUpRequestsLock,SavedIrql);
  819. }
  820. }
  821. }
  822. VOID
  823. RxSpinDownWorkerThreads(
  824. PRX_WORK_QUEUE pWorkQueue)
  825. /*++
  826. Routine Description:
  827. This routine spins down one or more worker thread associated with the given queue.
  828. Arguments:
  829. pWorkQueue - the WorkQueue instance.
  830. --*/
  831. {
  832. KIRQL SavedIrql;
  833. BOOLEAN RepostSpinDownRequest = FALSE;
  834. // Decide on the number of worker threads that need to be spun up.
  835. KeAcquireSpinLock(&pWorkQueue->SpinLock, &SavedIrql);
  836. if (pWorkQueue->NumberOfActiveWorkerThreads > 1) {
  837. RepostSpinDownRequest = TRUE;
  838. }
  839. KeReleaseSpinLock(&pWorkQueue->SpinLock, SavedIrql);
  840. if (RepostSpinDownRequest) {
  841. if (pWorkQueue->WorkQueueItemForSpinDownWorkerThread.pDeviceObject == NULL) {
  842. pWorkQueue->WorkQueueItemForSpinDownWorkerThread.pDeviceObject = RxFileSystemDeviceObject;
  843. }
  844. ExInitializeWorkItem(&pWorkQueue->WorkQueueItemForSpinDownWorkerThread,RxSpinDownWorkerThreads,pWorkQueue);
  845. KeInsertQueue(&pWorkQueue->Queue,&pWorkQueue->WorkQueueItemForSpinDownWorkerThread.List);
  846. }
  847. }
  848. BOOLEAN DumpDispatchRoutine = FALSE;
  849. VOID
  850. RxpWorkerThreadDispatcher(
  851. IN PRX_WORK_QUEUE pWorkQueue,
  852. IN PLARGE_INTEGER pWaitInterval)
  853. /*++
  854. Routine Description:
  855. This routine dispatches a work item and frees the associated work item
  856. Arguments:
  857. pWorkQueue - the WorkQueue instance.
  858. pWaitInterval - the interval for waiting on the KQUEUE.
  859. --*/
  860. {
  861. NTSTATUS Status;
  862. PLIST_ENTRY pListEntry;
  863. PRX_WORK_QUEUE_ITEM pWorkQueueItem;
  864. PRX_WORKERTHREAD_ROUTINE Routine;
  865. PVOID pParameter;
  866. BOOLEAN SpindownThread,DereferenceThread;
  867. KIRQL SavedIrql;
  868. PETHREAD ThisThread;
  869. RxDbgTrace(0,Dbg,("+++++ Worker Thread Startup %lx\n",PsGetCurrentThread()));
  870. InterlockedIncrement(&pWorkQueue->NumberOfIdleWorkerThreads);
  871. ThisThread = PsGetCurrentThread();
  872. Status = ObReferenceObjectByPointer(
  873. ThisThread,
  874. THREAD_ALL_ACCESS,
  875. *PsThreadType,
  876. KernelMode);
  877. ASSERT(Status == STATUS_SUCCESS);
  878. SpindownThread = FALSE;
  879. DereferenceThread = FALSE;
  880. for (;;) {
  881. pListEntry = KeRemoveQueue(
  882. &pWorkQueue->Queue,
  883. KernelMode,
  884. pWaitInterval);
  885. if ((NTSTATUS)(ULONG_PTR)pListEntry != STATUS_TIMEOUT) {
  886. LONG FinalRefCount;
  887. PRDBSS_DEVICE_OBJECT pMRxDeviceObject;
  888. InterlockedIncrement(&pWorkQueue->NumberOfWorkItemsDispatched);
  889. InterlockedDecrement(&pWorkQueue->NumberOfWorkItemsToBeDispatched);
  890. InterlockedDecrement(&pWorkQueue->NumberOfIdleWorkerThreads);
  891. InitializeListHead(pListEntry);
  892. pWorkQueueItem = (PRX_WORK_QUEUE_ITEM)
  893. CONTAINING_RECORD(
  894. pListEntry,
  895. RX_WORK_QUEUE_ITEM,
  896. List);
  897. pMRxDeviceObject = pWorkQueueItem->pDeviceObject;
  898. // This is a regular work item. Invoke the routine in the context of
  899. // a try catch block.
  900. Routine = pWorkQueueItem->WorkerRoutine;
  901. pParameter = pWorkQueueItem->Parameter;
  902. // Reset the fields in the Work item.
  903. ExInitializeWorkItem(pWorkQueueItem,NULL,NULL);
  904. pWorkQueueItem->pDeviceObject = NULL;
  905. RxDbgTrace(0, Dbg, ("RxWorkerThreadDispatcher Routine(%lx) Parameter(%lx)\n",Routine,pParameter));
  906. //RxLog(("WORKQ:Ex Dev(%lx) %lx %lx\n", pMRxDeviceObject,Routine, pParameter ));
  907. //RxWmiLog(LOG,
  908. // RxpWorkerThreadDispatcher,
  909. // LOGPTR(pMRxDeviceObject)
  910. // LOGPTR(Routine)
  911. // LOGPTR(pParameter));
  912. Routine(pParameter);
  913. FinalRefCount = InterlockedDecrement(&pMRxDeviceObject->DispatcherContext.NumberOfWorkerThreads);
  914. if (FinalRefCount == 0) {
  915. PKEVENT pTearDownEvent;
  916. pTearDownEvent = (PKEVENT)
  917. InterlockedExchangePointer(
  918. &pMRxDeviceObject->DispatcherContext.pTearDownEvent,
  919. NULL);
  920. if (pTearDownEvent != NULL) {
  921. KeSetEvent(
  922. pTearDownEvent,
  923. IO_NO_INCREMENT,
  924. FALSE);
  925. }
  926. }
  927. InterlockedIncrement(&pWorkQueue->NumberOfIdleWorkerThreads);
  928. }
  929. KeAcquireSpinLock(&pWorkQueue->SpinLock,&SavedIrql);
  930. switch (pWorkQueue->State) {
  931. case RxWorkQueueActive:
  932. {
  933. if (pWorkQueue->NumberOfWorkItemsToBeDispatched > 0) {
  934. // Delay spinning down a worker thread till the existing work
  935. // items have been dispatched.
  936. break;
  937. }
  938. }
  939. // lack of break intentional.
  940. // Ensure that the number of idle threads is not more than the
  941. // minimum number of worker threads permitted for the work queue
  942. case RxWorkQueueInactive:
  943. {
  944. ASSERT(pWorkQueue->NumberOfActiveWorkerThreads > 0);
  945. if (pWorkQueue->NumberOfActiveWorkerThreads >
  946. pWorkQueue->MinimumNumberOfWorkerThreads) {
  947. SpindownThread = TRUE;
  948. DereferenceThread = TRUE;
  949. InterlockedDecrement(&pWorkQueue->NumberOfActiveWorkerThreads);
  950. }
  951. }
  952. break;
  953. case RxWorkQueueRundownInProgress:
  954. {
  955. PRX_WORK_QUEUE_RUNDOWN_CONTEXT pRundownContext;
  956. pRundownContext = pWorkQueue->pRundownContext;
  957. // The work queue is no longer active. Spin down all the worker
  958. // threads associated with the work queue.
  959. ASSERT(pRundownContext != NULL);
  960. pRundownContext->ThreadPointers[pRundownContext->NumberOfThreadsSpunDown++] = ThisThread;
  961. InterlockedDecrement(&pWorkQueue->NumberOfActiveWorkerThreads);
  962. SpindownThread = TRUE;
  963. DereferenceThread = FALSE;
  964. if (pWorkQueue->NumberOfActiveWorkerThreads == 0) {
  965. KeSetEvent(
  966. &pWorkQueue->pRundownContext->RundownCompletionEvent,
  967. IO_NO_INCREMENT,
  968. FALSE);
  969. }
  970. }
  971. break;
  972. default:
  973. ASSERT(!"Valid State For Work Queue");
  974. }
  975. if (SpindownThread) {
  976. InterlockedDecrement(&pWorkQueue->NumberOfIdleWorkerThreads);
  977. }
  978. KeReleaseSpinLock(&pWorkQueue->SpinLock,SavedIrql);
  979. if (SpindownThread) {
  980. RxDbgTrace(0,Dbg,("----- Worker Thread Exit %lx\n",PsGetCurrentThread()));
  981. break;
  982. }
  983. }
  984. if (DereferenceThread) {
  985. ObDereferenceObject(ThisThread);
  986. }
  987. if (DumpDispatchRoutine) {
  988. // just to keep them around on free build for debug purpose
  989. DbgPrint("Dispatch routine %lx %lx %lx\n",Routine,pParameter,pWorkQueueItem);
  990. }
  991. PsTerminateSystemThread(STATUS_SUCCESS);
  992. }
  993. VOID
  994. RxBootstrapWorkerThreadDispatcher(
  995. PRX_WORK_QUEUE pWorkQueue)
  996. /*++
  997. Routine Description:
  998. This routine is for worker threads that use a infinite time interval
  999. for waiting on the KQUEUE data structure. These threads cannot be throtled
  1000. back and are used for ensuring that the bare minimum number of threads
  1001. are always active ( primarily startup purposes )
  1002. Arguments:
  1003. pWorkQueue - the WorkQueue instance.
  1004. --*/
  1005. {
  1006. PAGED_CODE();
  1007. RxpWorkerThreadDispatcher(pWorkQueue,NULL);
  1008. }
  1009. VOID
  1010. RxWorkerThreadDispatcher(
  1011. PRX_WORK_QUEUE pWorkQueue)
  1012. /*++
  1013. Routine Description:
  1014. This routine is for worker threads that use a finite time interval to wait
  1015. on the KQUEUE data structure. Such threads have a self regulatory mechanism
  1016. built in which causes them to spin down if the work load eases off. The
  1017. time interval is based on the type of the work queue
  1018. Arguments:
  1019. pWorkQueue - the WorkQueue instance.
  1020. --*/
  1021. {
  1022. PAGED_CODE();
  1023. RxpWorkerThreadDispatcher(
  1024. pWorkQueue,
  1025. &RxWorkQueueWaitInterval[pWorkQueue->Type]);
  1026. }
  1027. NTSTATUS
  1028. RxInsertWorkQueueItem(
  1029. PRDBSS_DEVICE_OBJECT pDeviceObject,
  1030. WORK_QUEUE_TYPE WorkQueueType,
  1031. PRX_WORK_QUEUE_ITEM pWorkQueueItem)
  1032. /*++
  1033. Routine Description:
  1034. This routine inserts a work item into the appropriate queue.
  1035. Arguments:
  1036. pDeviceObject - the device object
  1037. WorkQueueType - the type of work item
  1038. pWorkQueueItem - the work queue item
  1039. Return Value:
  1040. STATUS_SUCCESS -- successful
  1041. other status codes indicate error conditions
  1042. STATUS_INSUFFICIENT_RESOURCES -- could not dispatch
  1043. Notes:
  1044. This routine inserts the work item into the appropriate queue and spins
  1045. up a worker thread if required.
  1046. There are some extensions to this routine that needs to be implemented. These
  1047. have been delayed in order to get an idea of the costs and the benefits of
  1048. the various tradeoffs involved.
  1049. The current implementation follows a very simple logic in queueing work
  1050. from the various sources onto the same processor from which it originated.
  1051. The benefits associated with this approach are the prevention of cache/state
  1052. sloshing as the work is moved around from one processor to another. The
  1053. undesirable charecterstic is the skewing of work load on the various processors.
  1054. The important question that needs to be answered is when is it beneficial to
  1055. sacrifice the affinity to a processor. This depends upon the workload associated
  1056. with the current processor and the amount of information associated with the
  1057. given processor. The later is more difficult to determine.
  1058. --*/
  1059. {
  1060. NTSTATUS Status = STATUS_SUCCESS;
  1061. KIRQL SavedIrql;
  1062. BOOLEAN SpinUpWorkerThread = FALSE;
  1063. ULONG ProcessorNumber;
  1064. // If the dispatcher were on a per processor basis the ProcessorNumber
  1065. // would be indx for accessing the dispatcher data structure
  1066. // ProcessorNumber = KeGetCurrentProcessorNumber();
  1067. PRX_WORK_QUEUE_DISPATCHER pWorkQueueDispatcher;
  1068. PRX_WORK_QUEUE pWorkQueue;
  1069. ProcessorNumber = 0;
  1070. pWorkQueueDispatcher = &RxDispatcher.pWorkQueueDispatcher[ProcessorNumber];
  1071. pWorkQueue = &pWorkQueueDispatcher->WorkQueue[WorkQueueType];
  1072. if (RxDispatcher.State != RxDispatcherActive)
  1073. {
  1074. return STATUS_UNSUCCESSFUL;
  1075. }
  1076. KeAcquireSpinLock(&pWorkQueue->SpinLock, &SavedIrql);
  1077. if ((pWorkQueue->State == RxWorkQueueActive) &&
  1078. (pDeviceObject->DispatcherContext.pTearDownEvent == NULL)) {
  1079. pWorkQueueItem->pDeviceObject = pDeviceObject;
  1080. InterlockedIncrement(&pDeviceObject->DispatcherContext.NumberOfWorkerThreads);
  1081. pWorkQueue->CumulativeQueueLength += pWorkQueue->NumberOfWorkItemsToBeDispatched;
  1082. InterlockedIncrement(&pWorkQueue->NumberOfWorkItemsToBeDispatched);
  1083. if ((pWorkQueue->NumberOfIdleWorkerThreads < pWorkQueue->NumberOfWorkItemsToBeDispatched) &&
  1084. (pWorkQueue->NumberOfActiveWorkerThreads < pWorkQueue->MaximumNumberOfWorkerThreads) &&
  1085. (!pWorkQueue->SpinUpRequestPending)) {
  1086. pWorkQueue->SpinUpRequestPending = TRUE;
  1087. SpinUpWorkerThread = TRUE;
  1088. }
  1089. } else {
  1090. Status = STATUS_UNSUCCESSFUL;
  1091. }
  1092. KeReleaseSpinLock(&pWorkQueue->SpinLock, SavedIrql);
  1093. if (Status == STATUS_SUCCESS) {
  1094. KeInsertQueue(&pWorkQueue->Queue,&pWorkQueueItem->List);
  1095. if (SpinUpWorkerThread) {
  1096. RxSpinUpWorkerThreads(
  1097. pWorkQueue);
  1098. }
  1099. } else {
  1100. RxWmiLogError(Status,
  1101. LOG,
  1102. RxInsertWorkQueueItem,
  1103. LOGPTR(pDeviceObject)
  1104. LOGULONG(WorkQueueType)
  1105. LOGPTR(pWorkQueueItem)
  1106. LOGUSTR(pDeviceObject->DeviceName));
  1107. }
  1108. return Status;
  1109. }
  1110. VOID
  1111. RxWorkItemDispatcher(
  1112. PVOID pContext)
  1113. /*++
  1114. Routine Description:
  1115. This routine serves as a wrapper for dispatching a work item and for
  1116. performing the related cleanup actions
  1117. Arguments:
  1118. pContext - the Context parameter that is passed to the driver routine.
  1119. Notes:
  1120. There are two cases of dispatching to worker threads. When an instance is going to
  1121. be repeatedly dispatched time is conserved by allocating the WORK_QUEUE_ITEM as
  1122. part of the data structure to be dispatched. On the other hand if it is a very
  1123. infrequent operation space can be conserved by dynamically allocating and freeing
  1124. memory for the work queue item. This tradesoff time for space.
  1125. This routine implements a wrapper for those instances in which time was traded
  1126. off for space. It invokes the desired routine and frees the memory.
  1127. --*/
  1128. {
  1129. PRX_WORK_DISPATCH_ITEM pDispatchItem;
  1130. PRX_WORKERTHREAD_ROUTINE Routine;
  1131. PVOID Parameter;
  1132. pDispatchItem = (PRX_WORK_DISPATCH_ITEM)pContext;
  1133. Routine = pDispatchItem->DispatchRoutine;
  1134. Parameter = pDispatchItem->DispatchRoutineParameter;
  1135. //RxLog(("WORKQ:Ds %lx %lx\n", Routine, Parameter ));
  1136. //RxWmiLog(LOG,
  1137. // RxWorkItemDispatcher,
  1138. // LOGPTR(Routine)
  1139. // LOGPTR(Parameter));
  1140. Routine(Parameter);
  1141. RxFreePool(pDispatchItem);
  1142. }
  1143. NTSTATUS
  1144. RxDispatchToWorkerThread(
  1145. IN OUT PRDBSS_DEVICE_OBJECT pMRxDeviceObject,
  1146. IN WORK_QUEUE_TYPE WorkQueueType,
  1147. IN PRX_WORKERTHREAD_ROUTINE Routine,
  1148. IN PVOID pContext)
  1149. /*++
  1150. Routine Description:
  1151. This routine invokes the routine in the context of a worker thread.
  1152. Arguments:
  1153. pMRxDeviceObject - the device object of the corresponding mini redirector
  1154. WorkQueueType - the type of the work queue
  1155. Routine - routine to be invoked
  1156. pContext - the Context parameter that is passed to the driver routine.
  1157. Return Value:
  1158. STATUS_SUCCESS -- successful
  1159. STATUS_INSUFFICIENT_RESOURCES -- could not dispatch
  1160. Notes:
  1161. There are two cases of dispatching to worker threads. When an instance is going to
  1162. be repeatedly dispatched time is conserved by allocating the WORK_QUEUE_ITEM as
  1163. part of the data structure to be dispatched. On the other hand if it is a very
  1164. infrequent operation space can be conserved by dynamically allocating and freeing
  1165. memory for the work queue item. This tradesoff time for space.
  1166. --*/
  1167. {
  1168. NTSTATUS Status;
  1169. PRX_WORK_DISPATCH_ITEM pDispatchItem;
  1170. KIRQL SavedIrql;
  1171. pDispatchItem = RxAllocatePoolWithTag(
  1172. NonPagedPool,
  1173. sizeof(RX_WORK_DISPATCH_ITEM),
  1174. RX_WORKQ_POOLTAG);
  1175. if (pDispatchItem != NULL) {
  1176. pDispatchItem->DispatchRoutine = Routine;
  1177. pDispatchItem->DispatchRoutineParameter = pContext;
  1178. ExInitializeWorkItem(
  1179. &pDispatchItem->WorkQueueItem,
  1180. RxWorkItemDispatcher,
  1181. pDispatchItem);
  1182. Status = RxInsertWorkQueueItem(pMRxDeviceObject,WorkQueueType,&pDispatchItem->WorkQueueItem);
  1183. } else {
  1184. Status = STATUS_INSUFFICIENT_RESOURCES;
  1185. }
  1186. if (Status != STATUS_SUCCESS) {
  1187. if (pDispatchItem != NULL) {
  1188. RxFreePool(pDispatchItem);
  1189. }
  1190. RxLog(("WORKQ:Queue(D) %ld %lx %lx %lx\n", WorkQueueType,Routine,pContext,Status));
  1191. RxWmiLogError(Status,
  1192. LOG,
  1193. RxDispatchToWorkerThread,
  1194. LOGULONG(WorkQueueType)
  1195. LOGPTR(Routine)
  1196. LOGPTR(pContext)
  1197. LOGULONG(Status));
  1198. }
  1199. return Status;
  1200. }
  1201. NTSTATUS
  1202. RxPostToWorkerThread(
  1203. IN OUT PRDBSS_DEVICE_OBJECT pMRxDeviceObject,
  1204. IN WORK_QUEUE_TYPE WorkQueueType,
  1205. IN OUT PRX_WORK_QUEUE_ITEM pWorkQueueItem,
  1206. IN PRX_WORKERTHREAD_ROUTINE Routine,
  1207. IN PVOID pContext)
  1208. /*++
  1209. Routine Description:
  1210. This routine invokes the routine in the context of a worker thread.
  1211. Arguments:
  1212. WorkQueueType - the priority of the task at hand.
  1213. WorkQueueItem - the work queue item
  1214. Routine - routine to be invoked
  1215. pContext - the Context parameter that is passed to the driver routine.
  1216. Return Value:
  1217. STATUS_SUCCESS -- successful
  1218. STATUS_INSUFFICIENT_RESOURCES -- could not dispatch
  1219. Notes:
  1220. There are two cases of dispatching to worker threads. When an instance is going to
  1221. be repeatedly dispatched time is conserved by allocating the WORK_QUEUE_ITEM as
  1222. part of the data structure to be dispatched. On the other hand if it is a very
  1223. infrequent operation space can be conserved by dynamically allocating and freeing
  1224. memory for the work queue item. This tradesoff time for space.
  1225. --*/
  1226. {
  1227. NTSTATUS Status;
  1228. ExInitializeWorkItem( pWorkQueueItem,Routine,pContext );
  1229. Status = RxInsertWorkQueueItem(pMRxDeviceObject,WorkQueueType,pWorkQueueItem);
  1230. if (Status != STATUS_SUCCESS) {
  1231. RxLog(("WORKQ:Queue(P) %ld %lx %lx %lx\n", WorkQueueType,Routine,pContext,Status));
  1232. RxWmiLogError(Status,
  1233. LOG,
  1234. RxPostToWorkerThread,
  1235. LOGULONG(WorkQueueType)
  1236. LOGPTR(Routine)
  1237. LOGPTR(pContext)
  1238. LOGULONG(Status));
  1239. }
  1240. return Status;
  1241. }
  1242. PEPROCESS
  1243. RxGetRDBSSProcess()
  1244. {
  1245. return RxData.OurProcess;
  1246. }