/*++

Copyright (c) 1989  Microsoft Corporation

Module Name:

    worker.c

Abstract:

    This module implements the LAN Manager server FSP worker thread
    function.  It also implements routines for managing (i.e., starting
    and stopping) worker threads, and balancing load.

Author:

    Chuck Lenzmeier (chuckl) 01-Oct-1989
    David Treadwell (davidtr)

Environment:

    Kernel mode

Revision History:

--*/

#include "precomp.h"
#include "worker.tmh"
#pragma hdrstop

#define BugCheckFileId SRV_FILE_WORKER

//
// Local declarations
//

NTSTATUS
CreateQueueThread (
    IN PWORK_QUEUE Queue
    );

VOID
InitializeWorkerThread (
    IN PWORK_QUEUE WorkQueue,
    IN KPRIORITY ThreadPriority
    );

VOID
WorkerThread (
    IN PWORK_QUEUE WorkQueue
    );

#ifdef ALLOC_PRAGMA
#pragma alloc_text( PAGE, SrvCreateWorkerThreads )
#pragma alloc_text( PAGE, CreateQueueThread )
#pragma alloc_text( PAGE, InitializeWorkerThread )
#pragma alloc_text( PAGE, WorkerThread )
#endif
#if 0
NOT PAGEABLE -- SrvQueueWorkToBlockingThread
NOT PAGEABLE -- SrvQueueWorkToFsp
NOT PAGEABLE -- SrvQueueWorkToFspAtSendCompletion
NOT PAGEABLE -- SrvBalanceLoad
#endif


NTSTATUS
SrvCreateWorkerThreads (
    VOID
    )

/*++

Routine Description:

    This function creates the worker threads for the LAN Manager server
    FSP.

Arguments:

    None.

Return Value:

    NTSTATUS - Status of thread creation

--*/

{
    NTSTATUS status;
    PWORK_QUEUE queue;

    PAGED_CODE( );

    //
    // Create the nonblocking worker threads.
    //
    for ( queue = SrvWorkQueues; queue < eSrvWorkQueues; queue++ ) {

        status = CreateQueueThread( queue );

        if ( !NT_SUCCESS( status ) ) {
            return status;
        }
    }

    //
    // Create the blocking worker threads.
    //
    return CreateQueueThread( &SrvBlockingWorkQueue );

} // SrvCreateWorkerThreads


NTSTATUS
CreateQueueThread (
    IN PWORK_QUEUE Queue
    )

/*++

Routine Description:

    This function creates a worker thread to service a queue.

    NOTE: The scavenger occasionally kills off threads on a queue.  If
    logic here is modified, you may need to look there too.

Arguments:

    Queue - the queue to service

Return Value:

    NTSTATUS - Status of thread creation

--*/

{
    HANDLE threadHandle;
    LARGE_INTEGER interval;
    NTSTATUS status;

    PAGED_CODE();

    //
    // Another thread is coming into being.  Keep the counts up to date.
    //
    InterlockedIncrement( &Queue->Threads );
    InterlockedIncrement( &Queue->AvailableThreads );

    status = PsCreateSystemThread(
                 &threadHandle,
                 PROCESS_ALL_ACCESS,
                 NULL,
                 NtCurrentProcess(),
                 NULL,
                 WorkerThread,
                 Queue
                 );

    if ( !NT_SUCCESS(status) ) {

        INTERNAL_ERROR(
            ERROR_LEVEL_EXPECTED,
            "CreateQueueThread: PsCreateSystemThread for "
                "queue %X returned %X",
            Queue,
            status
            );

        InterlockedDecrement( &Queue->Threads );
        InterlockedDecrement( &Queue->AvailableThreads );

        SrvLogServiceFailure( SRV_SVC_PS_CREATE_SYSTEM_THREAD, status );
        return status;
    }

    //
    // Close the handle so the thread can die when needed.
    //
    SrvNtClose( threadHandle, FALSE );

    //
    // If we just created the first queue thread, wait for it to store
    // its thread pointer in IrpThread.  This pointer is stored in all
    // IRPs issued for this queue by the server.
    //
    while ( Queue->IrpThread == NULL ) {
        interval.QuadPart = -1*10*1000*10;      // .01 second
        KeDelayExecutionThread( KernelMode, FALSE, &interval );
    }

    return STATUS_SUCCESS;

} // CreateQueueThread


VOID
InitializeWorkerThread (
    IN PWORK_QUEUE WorkQueue,
    IN KPRIORITY ThreadPriority
    )
{
    NTSTATUS status;
    KPRIORITY basePriority;

    PAGED_CODE( );

#if SRVDBG_LOCK
    {
        //
        // Create a special system thread TEB.  The size of this TEB is
        // just large enough to accommodate the first three user-reserved
        // longwords.  These three locations are used for lock debugging.
        // If the allocation fails, then no lock debugging will be
        // performed for this thread.
        //

        PETHREAD Thread = PsGetCurrentThread( );
        ULONG TebSize = FIELD_OFFSET( TEB, UserReserved[0] ) + SRV_TEB_USER_SIZE;

        Thread->Tcb.Teb = ExAllocatePoolWithTag( NonPagedPool, TebSize, BlockTypeMisc );
        if ( Thread->Tcb.Teb != NULL ) {
            RtlZeroMemory( Thread->Tcb.Teb, TebSize );
        }
    }
#endif // SRVDBG_LOCK

    //
    // Set this thread's priority.
    //
    basePriority = ThreadPriority;

    status = NtSetInformationThread(
                 NtCurrentThread( ),
                 ThreadBasePriority,
                 &basePriority,
                 sizeof(basePriority)
                 );

    if ( !NT_SUCCESS(status) ) {

        INTERNAL_ERROR(
            ERROR_LEVEL_UNEXPECTED,
            "InitializeWorkerThread: NtSetInformationThread failed: %X\n",
            status,
            NULL
            );

        SrvLogServiceFailure( SRV_SVC_NT_SET_INFO_THREAD, status );
    }

#if MULTIPROCESSOR
    //
    // If this is a nonblocking worker thread, set its ideal processor
    // affinity.  Setting ideal affinity informs ntos that the thread
    // would rather run on its ideal processor if reasonable, but if ntos
    // can't schedule it on that processor then it is OK to schedule it
    // on a different processor.
    //
    if ( SrvNumberOfProcessors > 1 &&
         WorkQueue >= SrvWorkQueues &&
         WorkQueue < eSrvWorkQueues ) {

        KeSetIdealProcessorThread(
            KeGetCurrentThread(),
            (CCHAR)(WorkQueue - SrvWorkQueues)
            );
    }
#endif

    //
    // Disable hard error popups for this thread.
    //
    IoSetThreadHardErrorMode( FALSE );

    return;

} // InitializeWorkerThread


VOID
WorkerThread (
    IN PWORK_QUEUE WorkQueue
    )
{
    PLIST_ENTRY listEntry;
    PWORK_CONTEXT workContext;
    ULONG timeDifference;
    ULONG updateSmbCount = 0;
    ULONG updateTime = 0;
    ULONG iAmBlockingThread = (WorkQueue == &SrvBlockingWorkQueue);
    PLARGE_INTEGER Timeout = NULL;

    PAGED_CODE();

    //
    // If this is the first worker thread, save the thread pointer.
    //
    if ( WorkQueue->IrpThread == NULL ) {
        WorkQueue->IrpThread = PsGetCurrentThread( );
    }

    InitializeWorkerThread( WorkQueue, SrvThreadPriority );

    //
    // If we are the IrpThread, we don't want to die.
    //
    if ( WorkQueue->IrpThread != PsGetCurrentThread( ) ) {
        Timeout = &WorkQueue->IdleTimeOut;
    }

    //
    // Loop infinitely dequeueing and processing work items.
    //
    while ( TRUE ) {

        listEntry = KeRemoveQueue(
                        &WorkQueue->Queue,
                        WorkQueue->WaitMode,
                        Timeout
                        );

        if ( (ULONG_PTR)listEntry == STATUS_TIMEOUT ) {

            //
            // We have a noncritical thread that hasn't gotten any work
            // for a while.  Time to die.
            //
            InterlockedDecrement( &WorkQueue->AvailableThreads );
            InterlockedDecrement( &WorkQueue->Threads );
            SrvTerminateWorkerThread( NULL );
        }

        if ( InterlockedDecrement( &WorkQueue->AvailableThreads ) == 0 &&
             !SrvFspTransitioning &&
             WorkQueue->Threads < WorkQueue->MaxThreads ) {

            //
            // We are running low on threads for this queue.  Spin up
            // another one before handling this request.
            //
            CreateQueueThread( WorkQueue );
        }

        //
        // Get the address of the work item.
        //
        workContext = CONTAINING_RECORD( listEntry, WORK_CONTEXT, ListEntry );

        ASSERT( KeGetCurrentIrql() == 0 );

        //
        // There is work available.  It may be a work context block or
        // an RFCB.  (Blocking threads won't get RFCBs.)
        //
        ASSERT( (GET_BLOCK_TYPE(workContext) == BlockTypeWorkContextInitial) ||
                (GET_BLOCK_TYPE(workContext) == BlockTypeWorkContextNormal) ||
                (GET_BLOCK_TYPE(workContext) == BlockTypeWorkContextRaw) ||
                (GET_BLOCK_TYPE(workContext) == BlockTypeWorkContextSpecial) ||
                (GET_BLOCK_TYPE(workContext) == BlockTypeRfcb) );

#if DBG
        if ( GET_BLOCK_TYPE( workContext ) == BlockTypeRfcb ) {
            ((PRFCB)workContext)->ListEntry.Flink =
                ((PRFCB)workContext)->ListEntry.Blink = NULL;
        }
#endif

        IF_DEBUG(WORKER1) {
            KdPrint(( "WorkerThread working on work context %p", workContext ));
        }

        //
        // Make sure we have a reasonable idea of the system time.
        //
        if ( ++updateTime == TIME_SMB_INTERVAL ) {
            updateTime = 0;
            SET_SERVER_TIME( WorkQueue );
        }

        //
        // Update statistics.
        //
        if ( ++updateSmbCount == STATISTICS_SMB_INTERVAL ) {
            updateSmbCount = 0;
            GET_SERVER_TIME( WorkQueue, &timeDifference );
            timeDifference = timeDifference - workContext->Timestamp;
            ++(WorkQueue->stats.WorkItemsQueued.Count);
            WorkQueue->stats.WorkItemsQueued.Time.QuadPart += timeDifference;
        }

        {
            //
            // Put the workContext out relative to bp so we can find it
            // later if we need to debug.  The block of memory we're
            // writing to is likely already in cache, so this should be
            // relatively cheap.
            //
            PWORK_CONTEXT volatile savedWorkContext;
            savedWorkContext = workContext;
        }

        //
        // Make sure the WorkContext knows if it is on the blocking work
        // queue.
        //
        workContext->UsingBlockingThread = iAmBlockingThread;

        //
        // Call the restart routine for the work item.
        //
        IF_SMB_DEBUG( TRACE ) {
            KdPrint(( "Blocking %d, Count %d -> %p( %p )\n",
                      iAmBlockingThread,
                      workContext->ProcessingCount,
                      workContext->FspRestartRoutine,
                      workContext ));
        }

        workContext->FspRestartRoutine( workContext );

        //
        // Make sure we are still at normal level.
        //
        ASSERT( KeGetCurrentIrql() == 0 );

        //
        // We're getting ready to be available (i.e. waiting on the queue).
        //
        InterlockedIncrement( &WorkQueue->AvailableThreads );
    }

} // WorkerThread


VOID SRVFASTCALL
SrvQueueWorkToBlockingThread (
    IN OUT PWORK_CONTEXT WorkContext
    )

/*++

Routine Description:

    This routine queues a work item to a blocking thread.  These threads
    are used to service requests that may block for a long time, so we
    don't want to tie up our normal worker threads.

Arguments:

    WorkContext - Supplies a pointer to the work context block
        representing the work item

Return Value:

    None.

--*/

{
    //
    // Increment the processing count.
    //
    WorkContext->ProcessingCount++;

    //
    // Insert the work item at the tail of the blocking work queue.
    //
    SrvInsertWorkQueueTail(
        &SrvBlockingWorkQueue,
        (PQUEUEABLE_BLOCK_HEADER)WorkContext
        );

    return;

} // SrvQueueWorkToBlockingThread


VOID SRVFASTCALL
SrvQueueWorkToFsp (
    IN OUT PWORK_CONTEXT WorkContext
    )

/*++

Routine Description:

    This is the restart routine for work items that are to be queued to
    a nonblocking worker thread in the FSP.  This function is also
    called from elsewhere in the server to transfer work to the FSP.
    This function should not be called at dispatch level -- use
    SrvQueueWorkToFspAtDpcLevel instead.

Arguments:

    WorkContext - Supplies a pointer to the work context block
        representing the work item

Return Value:

    None.

--*/

{
    //
    // Increment the processing count.
    //
    WorkContext->ProcessingCount++;

    //
    // Insert the work item on the nonblocking work queue.
    //
    if ( WorkContext->QueueToHead ) {
        SrvInsertWorkQueueHead(
            WorkContext->CurrentWorkQueue,
            (PQUEUEABLE_BLOCK_HEADER)WorkContext
            );
    } else {
        SrvInsertWorkQueueTail(
            WorkContext->CurrentWorkQueue,
            (PQUEUEABLE_BLOCK_HEADER)WorkContext
            );
    }

} // SrvQueueWorkToFsp


NTSTATUS
SrvQueueWorkToFspAtSendCompletion (
    IN PDEVICE_OBJECT DeviceObject,
    IN PIRP Irp,
    IN PWORK_CONTEXT WorkContext
    )

/*++

Routine Description:

    Send completion handler for work items that are to be queued to a
    nonblocking worker thread in the FSP.

Arguments:

    DeviceObject - Pointer to target device object for the request.

    Irp - Pointer to I/O request packet

    WorkContext - Caller-specified context parameter associated with the
        IRP.  This is actually a pointer to a Work Context block.

Return Value:

    STATUS_MORE_PROCESSING_REQUIRED.

--*/

{
    //
    // Check the status of the send completion.
    //
    CHECK_SEND_COMPLETION_STATUS( Irp->IoStatus.Status );

    //
    // Reset the IRP cancelled bit.
    //
    Irp->Cancel = FALSE;

    //
    // Increment the processing count.
    //
    WorkContext->ProcessingCount++;

    //
    // Insert the work item on the nonblocking work queue.
    //
    if ( WorkContext->QueueToHead ) {
        SrvInsertWorkQueueHead(
            WorkContext->CurrentWorkQueue,
            (PQUEUEABLE_BLOCK_HEADER)WorkContext
            );
    } else {
        SrvInsertWorkQueueTail(
            WorkContext->CurrentWorkQueue,
            (PQUEUEABLE_BLOCK_HEADER)WorkContext
            );
    }

    return STATUS_MORE_PROCESSING_REQUIRED;

} // SrvQueueWorkToFspAtSendCompletion


VOID SRVFASTCALL
SrvTerminateWorkerThread (
    IN OUT PWORK_CONTEXT WorkItem OPTIONAL
    )

/*++

Routine Description:

    This routine is called when a thread is being requested to terminate.
    There are two cases when this happens.  One is at server shutdown --
    in this case we need to keep requeueing the termination request until
    all the threads on the queue have terminated.  The other is when a
    thread has not received work for some amount of time.

--*/

{
    LONG priority = 16;

    //
    // Raise our priority to ensure that this thread has a chance to get
    // completely done before the main thread causes the driver to unload.
    //
    NtSetInformationThread(
        NtCurrentThread( ),
        ThreadBasePriority,
        &priority,
        sizeof(priority)
        );

    if ( ARGUMENT_PRESENT( WorkItem ) &&
         InterlockedDecrement( &WorkItem->CurrentWorkQueue->Threads ) != 0 ) {

        //
        // We are being asked to terminate all of the worker threads on
        // this queue.  Since we're not the last thread, requeue the work
        // item so the other threads servicing this queue will terminate
        // as well.
        //
        SrvInsertWorkQueueTail(
            WorkItem->CurrentWorkQueue,
            (PQUEUEABLE_BLOCK_HEADER)WorkItem
            );
    }

    PsTerminateSystemThread( STATUS_SUCCESS );
    // no return
}


#if MULTIPROCESSOR
VOID
SrvBalanceLoad (
    IN PCONNECTION connection
    )

/*++

Routine Description:

    Ensure that the processor handling 'connection' is the best one for
    the job.  This routine is called periodically per connection from
    DPC level.  It cannot be paged.

Arguments:

    connection - the connection to inspect

Return Value:

    None.

--*/

{
    ULONG MyQueueLength, OtherQueueLength;
    ULONG i;
    PWORK_QUEUE tmpqueue;
    PWORK_QUEUE queue = connection->CurrentWorkQueue;

    ASSERT( queue >= SrvWorkQueues );
    ASSERT( queue < eSrvWorkQueues );

    //
    // Reset the countdown.  After the client performs BalanceCount more
    // operations, we'll call this routine again.
    //
    connection->BalanceCount = SrvBalanceCount;

    //
    // Figure out the load on the current work queue.  The load is the
    // sum of the average work queue depth and the current work queue
    // depth.  This gives us some history mixed in with the load *right
    // now*.
    //
    MyQueueLength = queue->AvgQueueDepthSum >> LOG2_QUEUE_SAMPLES;
    MyQueueLength += KeReadStateQueue( &queue->Queue );

    //
    // If we are not on our preferred queue, look to see if we want to
    // go back to it.  The preferred queue is the queue for the processor
    // handling this client's network card DPCs.  We prefer to run on
    // that processor to avoid sloshing data between CPUs in an MP system.
    //
    tmpqueue = connection->PreferredWorkQueue;

    ASSERT( tmpqueue >= SrvWorkQueues );
    ASSERT( tmpqueue < eSrvWorkQueues );

    if ( tmpqueue != queue ) {

        //
        // We are not queueing to our preferred queue.  See if we should
        // go back to our preferred queue.
        //
        ULONG PreferredQueueLength;

        PreferredQueueLength = tmpqueue->AvgQueueDepthSum >> LOG2_QUEUE_SAMPLES;
        PreferredQueueLength += KeReadStateQueue( &tmpqueue->Queue );

        if ( PreferredQueueLength <= MyQueueLength + SrvPreferredAffinity ) {

            //
            // We want to switch back to our preferred processor!
            //
            IF_DEBUG( REBALANCE ) {
                KdPrint(( "%p C%d(%p) > P%p(%d)\n",
                          connection,
                          MyQueueLength,
                          (PVOID)(connection->CurrentWorkQueue - SrvWorkQueues),
                          (PVOID)(tmpqueue - SrvWorkQueues),
                          PreferredQueueLength ));
            }

            InterlockedDecrement( &queue->CurrentClients );
            InterlockedExchangePointer( &connection->CurrentWorkQueue, tmpqueue );
            InterlockedIncrement( &tmpqueue->CurrentClients );

            SrvReBalanced++;
            return;
        }
    }

    //
    // We didn't hop to the preferred processor, so let's see if another
    // processor looks more lightly loaded than we are.
    //

    //
    // SrvNextBalanceProcessor is the next processor we should consider
    // moving to.  It is a global to ensure everybody doesn't pick the
    // same processor as the next candidate.
    //
    tmpqueue = &SrvWorkQueues[ SrvNextBalanceProcessor ];

    //
    // Advance SrvNextBalanceProcessor to the next processor in the system.
    //
    i = SrvNextBalanceProcessor + 1;
    if ( i >= SrvNumberOfProcessors ) {
        i = 0;
    }
    SrvNextBalanceProcessor = i;

    //
    // Look at the other processors, and pick the next one that is doing
    // enough less work than we are to make the jump worthwhile.
    //
    for ( i = SrvNumberOfProcessors; i > 1; --i ) {

        ASSERT( tmpqueue >= SrvWorkQueues );
        ASSERT( tmpqueue < eSrvWorkQueues );

        OtherQueueLength = tmpqueue->AvgQueueDepthSum >> LOG2_QUEUE_SAMPLES;
        OtherQueueLength += KeReadStateQueue( &tmpqueue->Queue );

        if ( OtherQueueLength + SrvOtherQueueAffinity < MyQueueLength ) {

            //
            // This processor looks promising.  Switch to it.
            //
            IF_DEBUG( REBALANCE ) {
                KdPrint(( "%p %c%p(%d) > %c%p(%d)\n",
                          connection,
                          queue == connection->PreferredWorkQueue ? 'P' : 'C',
                          (PVOID)(queue - SrvWorkQueues),
                          MyQueueLength,
                          tmpqueue == connection->PreferredWorkQueue ? 'P' : 'C',
                          (PVOID)(tmpqueue - SrvWorkQueues),
                          OtherQueueLength ));
            }

            InterlockedDecrement( &queue->CurrentClients );
            InterlockedExchangePointer( &connection->CurrentWorkQueue, tmpqueue );
            InterlockedIncrement( &tmpqueue->CurrentClients );

            SrvReBalanced++;
            return;
        }

        if ( ++tmpqueue == eSrvWorkQueues ) {
            tmpqueue = SrvWorkQueues;
        }
    }

    //
    // No rebalancing necessary.
    //
    return;
}
#endif