Leaked source code of windows server 2003
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

911 lines
23 KiB

  1. /*++
  2. Copyright (c) 1989 Microsoft Corporation
  3. Module Name:
  4. worker.c
  5. Abstract:
  6. This module implements the LAN Manager server FSP worker thread
  7. function. It also implements routines for managing (i.e., starting
  8. and stopping) worker threads, and balancing load.
  9. Author:
  10. Chuck Lenzmeier (chuckl) 01-Oct-1989
  11. David Treadwell (davidtr)
  12. Environment:
  13. Kernel mode
  14. Revision History:
  15. --*/
  16. #include "precomp.h"
  17. #include "worker.tmh"
  18. #pragma hdrstop
  19. #define BugCheckFileId SRV_FILE_WORKER
  20. // This is the maximum number of workitems that can be in the queue to be
  21. // LPC'd up to user mode. This prevents us from consuming all the resources if
  22. // spooler or some other user-mode process gets wedged or overly stressed
  23. #define SRV_MAX_LPC_CALLS 250
  24. //
  25. // Local declarations
  26. //
  27. NTSTATUS
  28. CreateQueueThread (
  29. IN PWORK_QUEUE Queue
  30. );
  31. VOID
  32. InitializeWorkerThread (
  33. IN PWORK_QUEUE WorkQueue,
  34. IN KPRIORITY ThreadPriority
  35. );
  36. VOID
  37. WorkerThread (
  38. IN PWORK_QUEUE WorkQueue
  39. );
  40. #ifdef ALLOC_PRAGMA
  41. #pragma alloc_text( PAGE, SrvCreateWorkerThreads )
  42. #pragma alloc_text( PAGE, CreateQueueThread )
  43. #pragma alloc_text( PAGE, InitializeWorkerThread )
  44. #pragma alloc_text( PAGE, WorkerThread )
  45. #endif
  46. #if 0
  47. NOT PAGEABLE -- SrvQueueWorkToBlockingThread
  48. NOT PAGEABLE -- SrvQueueWorkToFsp
  49. NOT PAGEABLE -- SrvQueueWorkToFspAtSendCompletion
  50. NOT PAGEABLE -- SrvBalanceLoad
  51. #endif
  52. NTSTATUS
  53. SrvCreateWorkerThreads (
  54. VOID
  55. )
  56. /*++
  57. Routine Description:
  58. This function creates the worker threads for the LAN Manager server
  59. FSP.
  60. Arguments:
  61. None.
  62. Return Value:
  63. NTSTATUS - Status of thread creation
  64. --*/
  65. {
  66. NTSTATUS status;
  67. PWORK_QUEUE queue;
  68. PAGED_CODE( );
  69. //
  70. // Create the nonblocking worker threads.
  71. //
  72. for( queue = SrvWorkQueues; queue < eSrvWorkQueues; queue++ ) {
  73. status = CreateQueueThread( queue );
  74. if( !NT_SUCCESS( status ) ) {
  75. return status;
  76. }
  77. }
  78. //
  79. // Create the blocking worker threads.
  80. //
  81. for( queue = SrvBlockingWorkQueues; queue < eSrvBlockingWorkQueues; queue++ ) {
  82. status = CreateQueueThread( queue );
  83. if( !NT_SUCCESS( status ) ) {
  84. return status;
  85. }
  86. }
  87. status = CreateQueueThread( &SrvLpcWorkQueue );
  88. if( !NT_SUCCESS( status ) )
  89. {
  90. return status;
  91. }
  92. return STATUS_SUCCESS;
  93. } // SrvCreateWorkerThreads
  94. NTSTATUS
  95. CreateQueueThread (
  96. IN PWORK_QUEUE Queue
  97. )
  98. /*++
  99. Routine Description:
  100. This function creates a worker thread to service a queue.
  101. NOTE: The scavenger occasionally kills off threads on a queue. If logic
  102. here is modified, you may need to look there too.
  103. Arguments:
  104. Queue - the queue to service
  105. Return Value:
  106. NTSTATUS - Status of thread creation
  107. --*/
  108. {
  109. HANDLE threadHandle;
  110. LARGE_INTEGER interval;
  111. NTSTATUS status;
  112. PAGED_CODE();
  113. //
  114. // Another thread is coming into being. Keep the counts up to date
  115. //
  116. InterlockedIncrement( &Queue->Threads );
  117. InterlockedIncrement( &Queue->AvailableThreads );
  118. status = PsCreateSystemThread(
  119. &threadHandle,
  120. PROCESS_ALL_ACCESS,
  121. NULL,
  122. NtCurrentProcess(),
  123. NULL,
  124. WorkerThread,
  125. Queue
  126. );
  127. if ( !NT_SUCCESS(status) ) {
  128. INTERNAL_ERROR(
  129. ERROR_LEVEL_EXPECTED,
  130. "CreateQueueThread: PsCreateSystemThread for "
  131. "queue %X returned %X",
  132. Queue,
  133. status
  134. );
  135. InterlockedDecrement( &Queue->Threads );
  136. InterlockedDecrement( &Queue->AvailableThreads );
  137. SrvLogServiceFailure( SRV_SVC_PS_CREATE_SYSTEM_THREAD, status );
  138. return status;
  139. }
  140. //
  141. // Close the handle so the thread can die when needed
  142. //
  143. SrvNtClose( threadHandle, FALSE );
  144. //
  145. // If we just created the first queue thread, wait for it
  146. // to store its thread pointer in IrpThread. This pointer is
  147. // stored in all IRPs issued for this queue by the server.
  148. //
  149. while ( Queue->IrpThread == NULL ) {
  150. interval.QuadPart = -1*10*1000*10; // .01 second
  151. KeDelayExecutionThread( KernelMode, FALSE, &interval );
  152. }
  153. return STATUS_SUCCESS;
  154. } // CreateQueueThread
  155. VOID
  156. InitializeWorkerThread (
  157. IN PWORK_QUEUE WorkQueue,
  158. IN KPRIORITY ThreadPriority
  159. )
  160. {
  161. NTSTATUS status;
  162. KPRIORITY basePriority;
  163. PAGED_CODE( );
  164. #if SRVDBG_LOCK
  165. {
  166. //
  167. // Create a special system thread TEB. The size of this TEB is just
  168. // large enough to accommodate the first three user-reserved
  169. // longwords. These three locations are used for lock debugging. If
  170. // the allocation fails, then no lock debugging will be performed
  171. // for this thread.
  172. //
  173. //
  174. PETHREAD Thread = PsGetCurrentThread( );
  175. ULONG TebSize = FIELD_OFFSET( TEB, UserReserved[0] ) + SRV_TEB_USER_SIZE;
  176. Thread->Tcb.Teb = ExAllocatePoolWithTag( NonPagedPool, TebSize, TAG_FROM_TYPE(BlockTypeMisc) );
  177. if ( Thread->Tcb.Teb != NULL ) {
  178. RtlZeroMemory( Thread->Tcb.Teb, TebSize );
  179. }
  180. }
  181. #endif // SRVDBG_LOCK
  182. //
  183. // Set this thread's priority.
  184. //
  185. basePriority = ThreadPriority;
  186. status = NtSetInformationThread (
  187. NtCurrentThread( ),
  188. ThreadBasePriority,
  189. &basePriority,
  190. sizeof(basePriority)
  191. );
  192. if ( !NT_SUCCESS(status) ) {
  193. INTERNAL_ERROR(
  194. ERROR_LEVEL_UNEXPECTED,
  195. "InitializeWorkerThread: NtSetInformationThread failed: %X\n",
  196. status,
  197. NULL
  198. );
  199. SrvLogServiceFailure( SRV_SVC_NT_SET_INFO_THREAD, status );
  200. }
  201. #if MULTIPROCESSOR
  202. //
  203. // If this is a nonblocking worker thread, set its ideal processor affinity. Setting
  204. // ideal affinity informs ntos that the thread would rather run on its ideal
  205. // processor if reasonable, but if ntos can't schedule it on that processor then it is
  206. // ok to schedule it on a different processor.
  207. //
  208. if( SrvNumberOfProcessors > 1 && WorkQueue >= SrvWorkQueues && WorkQueue < eSrvWorkQueues ) {
  209. KeSetIdealProcessorThread( KeGetCurrentThread(), (CCHAR)(WorkQueue - SrvWorkQueues) );
  210. }
  211. //
  212. // Blocking threads are now also affinitized
  213. //
  214. if( SrvNumberOfProcessors >= 4 && WorkQueue >= SrvBlockingWorkQueues && WorkQueue < eSrvBlockingWorkQueues ) {
  215. KeSetIdealProcessorThread( KeGetCurrentThread(), (CCHAR)(WorkQueue - SrvBlockingWorkQueues) );
  216. }
  217. #endif
  218. //
  219. // Disable hard error popups for this thread.
  220. //
  221. IoSetThreadHardErrorMode( FALSE );
  222. return;
  223. } // InitializeWorkerThread
  224. VOID
  225. WorkerThread (
  226. IN PWORK_QUEUE WorkQueue
  227. )
  228. {
  229. PLIST_ENTRY listEntry;
  230. PWORK_CONTEXT workContext;
  231. ULONG timeDifference;
  232. ULONG updateSmbCount = 0;
  233. ULONG updateTime = 0;
  234. BOOLEAN iAmBlockingThread = ((WorkQueue >= SrvBlockingWorkQueues) && (WorkQueue < eSrvBlockingWorkQueues)) ? TRUE : FALSE;
  235. PLARGE_INTEGER Timeout = NULL;
  236. BOOLEAN iAmLpcThread = (WorkQueue == &SrvLpcWorkQueue) ? TRUE : FALSE;
  237. PAGED_CODE();
  238. //
  239. // If this is the first worker thread, save the thread pointer.
  240. //
  241. if( WorkQueue->IrpThread == NULL ) {
  242. WorkQueue->IrpThread = PsGetCurrentThread( );
  243. }
  244. InitializeWorkerThread( WorkQueue, SrvThreadPriority );
  245. //
  246. // If we are the IrpThread, we don't want to die
  247. //
  248. if( WorkQueue->IrpThread != PsGetCurrentThread( ) ) {
  249. Timeout = &WorkQueue->IdleTimeOut;
  250. }
  251. //
  252. // Loop infinitely dequeueing and processing work items.
  253. //
  254. while ( TRUE ) {
  255. listEntry = KeRemoveQueue(
  256. &WorkQueue->Queue,
  257. WorkQueue->WaitMode,
  258. Timeout
  259. );
  260. if( (ULONG_PTR)listEntry == STATUS_TIMEOUT ) {
  261. //
  262. // We have a non critical thread that hasn't gotten any work for
  263. // awhile. Time to die.
  264. //
  265. InterlockedDecrement( &WorkQueue->AvailableThreads );
  266. InterlockedDecrement( &WorkQueue->Threads );
  267. SrvTerminateWorkerThread( NULL );
  268. }
  269. if( InterlockedDecrement( &WorkQueue->AvailableThreads ) == 0 &&
  270. !SrvFspTransitioning &&
  271. WorkQueue->Threads < WorkQueue->MaxThreads ) {
  272. //
  273. // We are running low on threads for this queue. Spin up
  274. // another one before handling this request
  275. //
  276. CreateQueueThread( WorkQueue );
  277. }
  278. //
  279. // Get the address of the work item.
  280. //
  281. workContext = CONTAINING_RECORD(
  282. listEntry,
  283. WORK_CONTEXT,
  284. ListEntry
  285. );
  286. ASSERT( KeGetCurrentIrql() == 0 );
  287. //
  288. // There is work available. It may be a work contect block or
  289. // an RFCB. (Blocking threads won't get RFCBs.)
  290. //
  291. ASSERT( (GET_BLOCK_TYPE(workContext) == BlockTypeWorkContextInitial) ||
  292. (GET_BLOCK_TYPE(workContext) == BlockTypeWorkContextNormal) ||
  293. (GET_BLOCK_TYPE(workContext) == BlockTypeWorkContextRaw) ||
  294. (GET_BLOCK_TYPE(workContext) == BlockTypeWorkContextSpecial) ||
  295. (GET_BLOCK_TYPE(workContext) == BlockTypeRfcb) );
  296. #if DBG
  297. if ( GET_BLOCK_TYPE( workContext ) == BlockTypeRfcb ) {
  298. ((PRFCB)workContext)->ListEntry.Flink =
  299. ((PRFCB)workContext)->ListEntry.Blink = NULL;
  300. }
  301. #endif
  302. IF_DEBUG(WORKER1) {
  303. KdPrint(( "WorkerThread working on work context %p", workContext ));
  304. }
  305. //
  306. // Make sure we have a resaonable idea of the system time
  307. //
  308. if( ++updateTime == TIME_SMB_INTERVAL ) {
  309. updateTime = 0;
  310. SET_SERVER_TIME( WorkQueue );
  311. }
  312. //
  313. // Update statistics.
  314. //
  315. if ( ++updateSmbCount == STATISTICS_SMB_INTERVAL ) {
  316. updateSmbCount = 0;
  317. GET_SERVER_TIME( WorkQueue, &timeDifference );
  318. timeDifference = timeDifference - workContext->Timestamp;
  319. ++(WorkQueue->stats.WorkItemsQueued.Count);
  320. WorkQueue->stats.WorkItemsQueued.Time.QuadPart += timeDifference;
  321. }
  322. {
  323. //
  324. // Put the workContext out relative to bp so we can find it later if we need
  325. // to debug. The block of memory we're writing to is likely already in cache,
  326. // so this should be relatively cheap.
  327. //
  328. PWORK_CONTEXT volatile savedWorkContext;
  329. savedWorkContext = workContext;
  330. }
  331. //
  332. // Make sure the WorkContext knows if it is on the blocking work queue
  333. //
  334. workContext->UsingBlockingThread = iAmBlockingThread;
  335. workContext->UsingLpcThread = iAmLpcThread;
  336. //
  337. // Call the restart routine for the work item.
  338. //
  339. IF_SMB_DEBUG( TRACE ) {
  340. KdPrint(( "Blocking %d, Count %d -> %p( %p )\n",
  341. iAmBlockingThread,
  342. workContext->ProcessingCount,
  343. workContext->FspRestartRoutine,
  344. workContext
  345. ));
  346. }
  347. workContext->FspRestartRoutine( workContext );
  348. //
  349. // Make sure we are still at normal level.
  350. //
  351. ASSERT( KeGetCurrentIrql() == 0 );
  352. //
  353. // We're getting ready to be available (i.e. waiting on the queue)
  354. //
  355. InterlockedIncrement( &WorkQueue->AvailableThreads );
  356. }
  357. } // WorkerThread
  358. VOID SRVFASTCALL
  359. SrvQueueWorkToBlockingThread (
  360. IN OUT PWORK_CONTEXT WorkContext
  361. )
  362. /*++
  363. Routine Description:
  364. This routine queues a work item to a blocking thread. These threads
  365. are used to service requests that may block for a long time, so we
  366. don't want to tie up our normal worker threads.
  367. Arguments:
  368. WorkContext - Supplies a pointer to the work context block
  369. representing the work item
  370. Return Value:
  371. None.
  372. --*/
  373. {
  374. //
  375. // Increment the processing count.
  376. //
  377. WorkContext->ProcessingCount++;
  378. //
  379. // Insert the work item at the tail of the blocking work queue.
  380. //
  381. SrvInsertWorkQueueTail(
  382. GET_BLOCKING_WORK_QUEUE(),
  383. (PQUEUEABLE_BLOCK_HEADER)WorkContext
  384. );
  385. return;
  386. } // SrvQueueWorkToBlockingThread
  387. NTSTATUS SRVFASTCALL
  388. SrvQueueWorkToLpcThread (
  389. IN OUT PWORK_CONTEXT WorkContext,
  390. IN BOOLEAN ThrottleRequest
  391. )
  392. /*++
  393. Routine Description:
  394. This routine queues a work item to a blocking thread. These threads
  395. are used to service requests that may block for a long time, so we
  396. don't want to tie up our normal worker threads.
  397. Arguments:
  398. WorkContext - Supplies a pointer to the work context block
  399. representing the work item
  400. Return Value:
  401. None.
  402. --*/
  403. {
  404. // The most LPC calls we allow is the number of work items / 4 with a maximum of 250.
  405. // The minimum is 4. If the SrvSvc is not processing it at this fast of a rate, we need
  406. // to turn them away.
  407. if ( ThrottleRequest &&
  408. (KeReadStateQueue( &SrvLpcWorkQueue.Queue ) > (LONG)MAX( 4, MIN(SrvMaxReceiveWorkItemCount>>2, 250) )) ) {
  409. return STATUS_INSUFFICIENT_RESOURCES;
  410. }
  411. //
  412. // Increment the processing count.
  413. //
  414. WorkContext->ProcessingCount++;
  415. //
  416. // Insert the work item at the tail of the blocking work queue.
  417. //
  418. SrvInsertWorkQueueTail(
  419. &SrvLpcWorkQueue,
  420. (PQUEUEABLE_BLOCK_HEADER)WorkContext
  421. );
  422. return STATUS_SUCCESS;
  423. } // SrvQueueWorkToBlockingThread
  424. VOID SRVFASTCALL
  425. SrvQueueWorkToFsp (
  426. IN OUT PWORK_CONTEXT WorkContext
  427. )
  428. /*++
  429. Routine Description:
  430. This is the restart routine for work items that are to be queued to
  431. a nonblocking worker thread in the FSP. This function is also
  432. called from elsewhere in the server to transfer work to the FSP.
  433. This function should not be called at dispatch level -- use
  434. SrvQueueWorkToFspAtDpcLevel instead.
  435. Arguments:
  436. WorkContext - Supplies a pointer to the work context block
  437. representing the work item
  438. Return Value:
  439. None.
  440. --*/
  441. {
  442. //
  443. // Increment the processing count.
  444. //
  445. WorkContext->ProcessingCount++;
  446. //
  447. // Insert the work item at the tail of the nonblocking work queue.
  448. //
  449. if( WorkContext->QueueToHead ) {
  450. SrvInsertWorkQueueHead(
  451. WorkContext->CurrentWorkQueue,
  452. (PQUEUEABLE_BLOCK_HEADER)WorkContext
  453. );
  454. } else {
  455. SrvInsertWorkQueueTail(
  456. WorkContext->CurrentWorkQueue,
  457. (PQUEUEABLE_BLOCK_HEADER)WorkContext
  458. );
  459. }
  460. } // SrvQueueWorkToFsp
  461. NTSTATUS
  462. SrvQueueWorkToFspAtSendCompletion (
  463. IN PDEVICE_OBJECT DeviceObject,
  464. IN PIRP Irp,
  465. IN PWORK_CONTEXT WorkContext
  466. )
  467. /*++
  468. Routine Description:
  469. Send completion handler for work items that are to be queued to
  470. a nonblocking worker thread in the FSP. This function is also
  471. called from elsewhere in the server to transfer work to the FSP.
  472. This function should not be called at dispatch level -- use
  473. SrvQueueWorkToFspAtDpcLevel instead.
  474. Arguments:
  475. DeviceObject - Pointer to target device object for the request.
  476. Irp - Pointer to I/O request packet
  477. WorkContext - Caller-specified context parameter associated with IRP.
  478. This is actually a pointer to a Work Context block.
  479. Return Value:
  480. STATUS_MORE_PROCESSING_REQUIRED.
  481. --*/
  482. {
  483. //
  484. // Check the status of the send completion.
  485. //
  486. CHECK_SEND_COMPLETION_STATUS( Irp->IoStatus.Status );
  487. //
  488. // Reset the IRP cancelled bit.
  489. //
  490. Irp->Cancel = FALSE;
  491. //
  492. // Increment the processing count.
  493. //
  494. WorkContext->ProcessingCount++;
  495. //
  496. // Insert the work item on the nonblocking work queue.
  497. //
  498. if( WorkContext->QueueToHead ) {
  499. SrvInsertWorkQueueHead(
  500. WorkContext->CurrentWorkQueue,
  501. (PQUEUEABLE_BLOCK_HEADER)WorkContext
  502. );
  503. } else {
  504. SrvInsertWorkQueueTail(
  505. WorkContext->CurrentWorkQueue,
  506. (PQUEUEABLE_BLOCK_HEADER)WorkContext
  507. );
  508. }
  509. return STATUS_MORE_PROCESSING_REQUIRED;
  510. } // SrvQueueWorkToFspAtSendCompletion
  511. VOID SRVFASTCALL
  512. SrvTerminateWorkerThread (
  513. IN OUT PWORK_CONTEXT WorkItem OPTIONAL
  514. )
  515. /*++
  516. Routine Description:
  517. This routine is called when a thread is being requested to terminate. There
  518. are two cases when this happens. One is at server shutdown -- in this
  519. case we need to keep requeueing the termination request until all the
  520. threads on the queue have terminated. The other time is if a thread has
  521. not received work for some amount of time.
  522. --*/
  523. {
  524. LONG priority = 16;
  525. //
  526. // Raise our priority to ensure that this thread has a chance to get completely
  527. // done before the main thread causes the driver to unload or something
  528. //
  529. NtSetInformationThread (
  530. NtCurrentThread( ),
  531. ThreadBasePriority,
  532. &priority,
  533. sizeof(priority)
  534. );
  535. if( ARGUMENT_PRESENT( WorkItem ) &&
  536. InterlockedDecrement( &WorkItem->CurrentWorkQueue->Threads ) != 0 ) {
  537. //
  538. // We are being asked to terminate all of the worker threads on this queue.
  539. // So, if we're not the last thread, we should requeue the workitem so
  540. // the other threads will terminate
  541. //
  542. //
  543. // There are still other threads servicing this queue, so requeue
  544. // the workitem
  545. //
  546. SrvInsertWorkQueueTail( WorkItem->CurrentWorkQueue,
  547. (PQUEUEABLE_BLOCK_HEADER)WorkItem );
  548. }
  549. PsTerminateSystemThread( STATUS_SUCCESS ); // no return;
  550. }
  551. #if MULTIPROCESSOR
  552. VOID
  553. SrvBalanceLoad(
  554. IN PCONNECTION connection
  555. )
  556. /*++
  557. Routine Description:
  558. Ensure that the processor handling 'connection' is the best one
  559. for the job. This routine is called periodically per connection from
  560. DPC level. It can not be paged.
  561. Arguments:
  562. connection - the connection to inspect
  563. Return Value:
  564. none.
  565. --*/
  566. {
  567. ULONG MyQueueLength, OtherQueueLength;
  568. ULONG i;
  569. PWORK_QUEUE tmpqueue;
  570. PWORK_QUEUE queue = connection->CurrentWorkQueue;
  571. ASSERT( queue >= SrvWorkQueues );
  572. ASSERT( queue < eSrvWorkQueues );
  573. //
  574. // Reset the countdown. After the client performs BalanceCount
  575. // more operations, we'll call this routine again.
  576. //
  577. connection->BalanceCount = SrvBalanceCount;
  578. //
  579. // Figure out the load on the current work queue. The load is
  580. // the sum of the average work queue depth and the current work
  581. // queue depth. This gives us some history mixed in with the
  582. // load *right now*
  583. //
  584. MyQueueLength = queue->AvgQueueDepthSum >> LOG2_QUEUE_SAMPLES;
  585. MyQueueLength += KeReadStateQueue( &queue->Queue );
  586. //
  587. // If we are not on our preferred queue, look to see if we want to
  588. // go back to it. The preferred queue is the queue for the processor
  589. // handling this client's network card DPCs. We prefer to run on that
  590. // processor to avoid sloshing data between CPUs in an MP system.
  591. //
  592. tmpqueue = connection->PreferredWorkQueue;
  593. ASSERT( tmpqueue >= SrvWorkQueues );
  594. ASSERT( tmpqueue < eSrvWorkQueues );
  595. if( tmpqueue != queue ) {
  596. //
  597. // We are not queueing to our preferred queue. See if we
  598. // should go back to our preferred queue
  599. //
  600. ULONG PreferredQueueLength;
  601. PreferredQueueLength = tmpqueue->AvgQueueDepthSum >> LOG2_QUEUE_SAMPLES;
  602. PreferredQueueLength += KeReadStateQueue( &tmpqueue->Queue );
  603. if( PreferredQueueLength <= MyQueueLength + SrvPreferredAffinity ) {
  604. //
  605. // We want to switch back to our preferred processor!
  606. //
  607. IF_DEBUG( REBALANCE ) {
  608. KdPrint(( "%p C%d(%p) > P%p(%d)\n",
  609. connection,
  610. MyQueueLength,
  611. (PVOID)(connection->CurrentWorkQueue - SrvWorkQueues),
  612. (PVOID)(tmpqueue - SrvWorkQueues),
  613. PreferredQueueLength ));
  614. }
  615. InterlockedDecrement( &queue->CurrentClients );
  616. InterlockedExchangePointer( &connection->CurrentWorkQueue, tmpqueue );
  617. InterlockedIncrement( &tmpqueue->CurrentClients );
  618. SrvReBalanced++;
  619. return;
  620. }
  621. }
  622. //
  623. // We didn't hop to the preferred processor, so let's see if
  624. // another processor looks more lightly loaded than we are.
  625. //
  626. //
  627. // SrvNextBalanceProcessor is the next processor we should consider
  628. // moving to. It is a global to ensure everybody doesn't pick the
  629. // the same processor as the next candidate.
  630. //
  631. tmpqueue = &SrvWorkQueues[ SrvNextBalanceProcessor ];
  632. //
  633. // Advance SrvNextBalanceProcessor to the next processor in the system
  634. //
  635. i = SrvNextBalanceProcessor + 1;
  636. if( i >= SrvNumberOfProcessors )
  637. i = 0;
  638. SrvNextBalanceProcessor = i;
  639. //
  640. // Look at the other processors, and pick the next one which is doing
  641. // enough less work than we are to make the jump worthwhile
  642. //
  643. for( i = SrvNumberOfProcessors; i > 1; --i ) {
  644. ASSERT( tmpqueue >= SrvWorkQueues );
  645. ASSERT( tmpqueue < eSrvWorkQueues );
  646. OtherQueueLength = tmpqueue->AvgQueueDepthSum >> LOG2_QUEUE_SAMPLES;
  647. OtherQueueLength += KeReadStateQueue( &tmpqueue->Queue );
  648. if( OtherQueueLength + SrvOtherQueueAffinity < MyQueueLength ) {
  649. //
  650. // This processor looks promising. Switch to it
  651. //
  652. IF_DEBUG( REBALANCE ) {
  653. KdPrint(( "%p %c%p(%d) > %c%p(%d)\n",
  654. connection,
  655. queue == connection->PreferredWorkQueue ? 'P' : 'C',
  656. (PVOID)(queue - SrvWorkQueues),
  657. MyQueueLength,
  658. tmpqueue == connection->PreferredWorkQueue ? 'P' : 'C',
  659. (PVOID)(tmpqueue - SrvWorkQueues),
  660. OtherQueueLength ));
  661. }
  662. InterlockedDecrement( &queue->CurrentClients );
  663. InterlockedExchangePointer( &connection->CurrentWorkQueue, tmpqueue );
  664. InterlockedIncrement( &tmpqueue->CurrentClients );
  665. SrvReBalanced++;
  666. return;
  667. }
  668. if( ++tmpqueue == eSrvWorkQueues )
  669. tmpqueue = SrvWorkQueues;
  670. }
  671. //
  672. // No rebalancing necessary
  673. //
  674. return;
  675. }
  676. #endif