#pragma hdrstop #include #include "TSQ.h" #include "TSQPublic.h" #define TSQ_TAG 'QST ' //================================================================================= // // TSInitQueue // This function initializes the TS queue with the specified parameters. // // Inputs: // Flags: Properties of the TS Queue. // MaxThreads: Max number of worker threads to process the item. // // Return value: // Pointer to the TS Queue. NULL if init failed for any reason. // BUGBUG: Better to have a status: // Access denied, Invalid parameters, memory failure or success. // //================================================================================= PVOID TSInitQueue( IN ULONG Flags, IN ULONG MaxThreads, IN PDEVICE_OBJECT pDeviceObject ) { PTSQUEUE pTsQueue = NULL; // Validate the inputs. if ( ( Flags & TSQUEUE_BEING_DELETED ) || ( MaxThreads > MAX_WORKITEMS ) || ( pDeviceObject == NULL ) ) { // BUGBUG: Ideally should check all the bits in the flags using mask. goto Exit; } // If the caller wants TS Queue to use its own thread, then caller must be running at PASSIVE_LEVEL. if ( ( KeGetCurrentIrql() != PASSIVE_LEVEL ) && ( Flags & TSQUEUE_OWN_THREAD ) ) { goto Exit; } // Allocate space for the new TS Queue. pTsQueue = (PTSQUEUE) ExAllocatePoolWithTag( NonPagedPool, sizeof( TSQUEUE ), TSQ_TAG ); if (pTsQueue == NULL) { goto Exit; } // Initialize the terminate event. KeInitializeEvent( &pTsQueue->TerminateEvent, NotificationEvent, FALSE ); // Initialize the TS Queue spin lock. KeInitializeSpinLock( &pTsQueue->TsqSpinLock ); // Initialize the list of work items and number of items being processed. InitializeListHead( &pTsQueue->WorkItemsHead ); pTsQueue->ThreadsCount = 0; // Initialize the rest of the TS Queue fields as specified in the inputs. pTsQueue->Flags = Flags; pTsQueue->MaxThreads = MaxThreads; pTsQueue->pDeviceObject = pDeviceObject; Exit: return ( PVOID ) pTsQueue; } //================================================================================= // // TSAddWorkItemToQueue // This function allocates a work item (TSQ type) and adds it to the queue // from where it is processed by either system queue thread or TS queue // worker thread. // // Inputs: // TS Queue: To which the work item is to be added. // pContext: Caller context. // CallBack: The user's callback routine // // Return value: // Status of the operation: // STATUS_INVALID_PARAMETER: Incorrect TS Queue pointer, OR // STATUS_ACCESS_DENIED: The queue is being deleted, OR // STATUS_NO_MEMORY: Insufficient resources OR // STATUS_SUCCESS: Operation successful. // //================================================================================= NTSTATUS TSAddWorkItemToQueue( IN PTSQUEUE pTsQueue, // Pointer to the TS Queue. IN PVOID pContext, // Context. IN PTSQ_CALLBACK pCallBack // Callback function. ) { KIRQL Irql; NTSTATUS Status; PTSQUEUE_WORK_ITEM pWorkItem = NULL; HANDLE ThreadHandle; // Check if the Input TS Queue pointer is valid. // May be we need a better error check on TS Queue pointer here (Like use of signature) // I don't need to care about validity of the other parameters. if ( pTsQueue == NULL ) { return STATUS_INVALID_PARAMETER; } // Allocate space for the new work item (TSQ type). // NOTE: Allocation is done before validation, because this is a costly operation and we // don't want to do it with spin-lock held for perf reasons. pWorkItem = (PTSQUEUE_WORK_ITEM) ExAllocatePoolWithTag( NonPagedPool, sizeof( TSQUEUE_WORK_ITEM ), TSQ_TAG ); if ( pWorkItem == NULL ) { return STATUS_NO_MEMORY; } // Initialize the TSQ work item. pWorkItem->pContext = pContext; pWorkItem->pCallBack = pCallBack; // Acquire the Queue spin lock first. KeAcquireSpinLock( &pTsQueue->TsqSpinLock, &Irql ); // Check if this queue is being deleted. If so, return error. if ( pTsQueue->Flags & TSQUEUE_BEING_DELETED ) { KeReleaseSpinLock( &pTsQueue->TsqSpinLock, Irql ); if ( pWorkItem ) { ExFreePool( pWorkItem ); } return STATUS_ACCESS_DENIED; } // All right, insert the work item in TS queue. InsertTailList( &pTsQueue->WorkItemsHead, &pWorkItem->Links ); // NOTE: Once the work item is queued, we are going to return status success anyway. // Failed cases, if any, will be handled either by already running worker threads or // later when the queue is deleted. // Check if we need to start another worker thread. if ( pTsQueue->ThreadsCount >= pTsQueue->MaxThreads ) { // Do nothing else, when there are already enough number of worker threads serving the queue. KeReleaseSpinLock( &pTsQueue->TsqSpinLock, Irql ); return STATUS_SUCCESS; } // We are about to start a new thread (own thread or system thread). // So, increment the thread count and release the spin lock. pTsQueue->ThreadsCount ++; KeReleaseSpinLock( &pTsQueue->TsqSpinLock, Irql ); // Check if we are allowed to start a worker thread. if ( ( pTsQueue->Flags & TSQUEUE_OWN_THREAD ) && ( KeGetCurrentIrql() == PASSIVE_LEVEL ) ) { // We can create our own thread for processing work item. Status = PsCreateSystemThread( &ThreadHandle, THREAD_ALL_ACCESS, NULL, NULL, NULL, (PKSTART_ROUTINE) TSQueueWorker, (PVOID) pTsQueue ); if ( Status != STATUS_SUCCESS ) { goto QueueError; } ZwClose( ThreadHandle ); } else { // Means, we can not create thread. Then call IOQueueWorkItem. PIO_WORKITEM pIoWorkItem = NULL; PTSQ_CONTEXT pTsqContext = NULL; WORK_QUEUE_TYPE QueueType = ( pTsQueue->Flags & TSQUEUE_CRITICAL ) ? CriticalWorkQueue : DelayedWorkQueue; // Allocate space for TSQ context. pTsqContext = (PTSQ_CONTEXT) ExAllocatePoolWithTag( NonPagedPool, sizeof( TSQ_CONTEXT ), TSQ_TAG ); if ( pTsqContext == NULL ) { goto QueueError; } // Allocate the IO work item. pIoWorkItem = IoAllocateWorkItem( pTsQueue->pDeviceObject ); if ( pIoWorkItem == NULL ) { ExFreePool( pTsqContext ); goto QueueError; } // Initialize the TSQ context and queue the work item in the system queue. pTsqContext->pTsQueue = pTsQueue; pTsqContext->pWorkItem = pIoWorkItem; // This is IO work item. IoQueueWorkItem( pIoWorkItem, ( PIO_WORKITEM_ROUTINE )TSQueueCallback, QueueType, (PVOID) pTsqContext ); } return STATUS_SUCCESS; QueueError: KeAcquireSpinLock( &pTsQueue->TsqSpinLock, &Irql ); pTsQueue->ThreadsCount --; // If the thread count is zero, we are the last one who finished processing work items. // Now if the queue is marked "being deleted", we should set the Terminate event. if ( ( pTsQueue->Flags & TSQUEUE_BEING_DELETED ) && ( pTsQueue->ThreadsCount == 0 ) ) { KeSetEvent( &pTsQueue->TerminateEvent, 0, FALSE ); } KeReleaseSpinLock( &pTsQueue->TsqSpinLock, Irql ); return STATUS_SUCCESS; } //================================================================================= // // TSDeleteQueue // This function deletes the specified TS Queue. It first processes all the // pending work items before deleting. // // Inputs: // TS Queue: To be deleted. // // Return value: // STATUS_SUCCESS: Operation successful. // BUGBUG: Wondering why we have this. // //================================================================================= NTSTATUS TSDeleteQueue(PVOID pTsq) { KIRQL Irql; PTSQUEUE pTsQueue = (PTSQUEUE) pTsq; NTSTATUS Status; // BUGBUG: There should be better way of checking TS Queue pointer (Use of signature or alike). if ( pTsQueue == NULL ) { return STATUS_SUCCESS; } KeAcquireSpinLock( &pTsQueue->TsqSpinLock, &Irql ); // Check if the queue is already being deleted. // It should not happen, but just in case, if the driver is not good. if ( pTsQueue->Flags & TSQUEUE_BEING_DELETED ) { ASSERT( FALSE ); return STATUS_ACCESS_DENIED; } // Mark the queue "being deleted", so that it won't accept any new work items. pTsQueue->Flags |= TSQUEUE_BEING_DELETED; // Now help other worker threads in processing the pending work items on the queue. // So, increment the thread count, which will be decremented in the TSQueueWorker routine. pTsQueue->ThreadsCount ++; KeReleaseSpinLock( &pTsQueue->TsqSpinLock, Irql ); // NOTE: This will also clean up the queue, if there are work items in the queue and no // worker threads to process them. TSQueueWorker( pTsQueue ); // It is still possible that other threads are still working with their work items. // So, we can not just go and free the queue. So wait for the termination event. KeWaitForSingleObject( &pTsQueue->TerminateEvent, Executive, KernelMode, TRUE, NULL ); // BUGBUG: Now the worker threads have set the event, but they have done that while holding // the spin lock. If we free the TS queue right away, they will access NULL pointer and // bug-check. So, acquire spin-lock, so that the thread which set the event will release it. // We know that there will be only one such thread at a give time. So, we don't need ref-count // now. But using ref-count is more eligent solution here. KeAcquireSpinLock( &pTsQueue->TsqSpinLock, &Irql ); KeReleaseSpinLock( &pTsQueue->TsqSpinLock, Irql ); // We are all done. // Clean up the space allocated for the TS queue. ExFreePool( pTsQueue ); pTsQueue = NULL; return STATUS_SUCCESS; } //================================================================================= // // TSQueueWorker // This is a worker thread for the TS Queue, which goes through the queue and // processes the pending work items (TSQ type) one by one. // //================================================================================= void TSQueueWorker(PTSQUEUE pTsQueue) { PLIST_ENTRY Item; PTSQUEUE_WORK_ITEM pWorkItem; KIRQL Irql; // Acquire the Queue spin lock first. KeAcquireSpinLock( &pTsQueue->TsqSpinLock, &Irql ); // Process the work items on the queue, while the queue is not empty while( !IsListEmpty( &pTsQueue->WorkItemsHead ) ) { // Get the next TSQ work item from the queue. Item = RemoveHeadList( &pTsQueue->WorkItemsHead ); pWorkItem = CONTAINING_RECORD( Item, TSQUEUE_WORK_ITEM, Links ); // Release the Queue spin lock. KeReleaseSpinLock( &pTsQueue->TsqSpinLock, Irql ); // Call the callback routine specified in the work item. if ( pWorkItem->pCallBack ) { ( *pWorkItem->pCallBack ) ( pTsQueue->pDeviceObject, pWorkItem->pContext ); } // Free the TSQ work item. ExFreePool( pWorkItem ); // Acquire the Queue spin lock again. KeAcquireSpinLock( &pTsQueue->TsqSpinLock, &Irql ); } // We are done processing the work items. This thread is going to exit now. // Decrememnt the thread count so that next work item gets processed by a new thread // or queued in the system queue. pTsQueue->ThreadsCount --; // If the thread count is zero, we are the last one who finished processing work items. // Now if the queue is marked "being deleted", we should set the Terminate event. if ( ( pTsQueue->Flags & TSQUEUE_BEING_DELETED ) && ( pTsQueue->ThreadsCount == 0 ) ) { KeSetEvent( &pTsQueue->TerminateEvent, 0, FALSE ); } KeReleaseSpinLock( &pTsQueue->TsqSpinLock, Irql ); } //================================================================================= // // TSQueueCallback // This is the callback routine we specify, when we use system queue for // processing the TSQ work item. This will in turn call the routine that // TS Queue worker thread executes. And that routine will process all the // pending work items from the queue. We use this callback routine just for // cleaning up the IO work item that we allocated for using system queue. // //================================================================================= void TSQueueCallback(PDEVICE_OBJECT pDeviceObject, PVOID pContext) { PTSQ_CONTEXT pTsqContext = (PTSQ_CONTEXT) pContext; // BUGBUG: It's better to have a check on pDeviceObject. // If input context here is NULL, then we sure have a big problem in system worker queue implementation. ASSERT( pTsqContext != NULL ); // Process the TSQ work items on the queue. TSQueueWorker( pTsqContext->pTsQueue ); // Cleanup the IO work Item. if ( pTsqContext->pWorkItem ) { IoFreeWorkItem( pTsqContext->pWorkItem ); pTsqContext->pWorkItem = NULL; } // Free TSQ context. ExFreePool( pTsqContext ); }