Leaked source code of windows server 2003
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

618 lines
21 KiB

  1. /*++
  2. Copyright (c) 1995 Microsoft Corporation
  3. Module Name:
  4. common\trace\worker.c
  5. Abstract:
  6. Worker threads for the router process
  7. Revision History:
  8. Gurdeep Singh Pall 7/28/95 Created
  9. 12-10-97: lokeshs: removed blocking of InitializeWorkerThread()
  10. on initialization of the first AlertableThread() created
  11. --*/
  12. #include <nt.h>
  13. #include <ntrtl.h>
  14. #include <nturtl.h>
  15. #include <windows.h>
  16. #include <stdio.h>
  17. #include <string.h>
  18. #include <malloc.h>
  19. #include <rtutils.h>
  20. //#define STRSAFE_LIB
  21. #include <strsafe.h>
  22. #include "trace.h"
  23. #include "workerinc.h"
  24. // Time that thread has to be idle before exiting
  25. LARGE_INTEGER ThreadIdleTO = {
  26. (ULONG)(THREAD_IDLE_TIMEOUT*(-10000000)),
  27. 0xFFFFFFFF};
  28. // Time that the worker queue is not served before starting new thread
  29. CONST LARGE_INTEGER WorkQueueTO = {
  30. (ULONG)(WORK_QUEUE_TIMEOUT*(-10000000)),
  31. 0xFFFFFFFF};
  32. // Total number of threads
  33. LONG ThreadCount;
  34. // Number of threads waiting on the completion port
  35. LONG ThreadsWaiting;
  36. // Min allowed number of threads
  37. LONG MinThreads;
  38. // Completion port for threads to wait on
  39. HANDLE WorkQueuePort;
  40. // Timer to intiate creation of new thread if worker queue is not
  41. // server within a timeout
  42. HANDLE WorkQueueTimer;
  43. // Queue for alertable work items
  44. LIST_ENTRY AlertableWorkQueue ;
  45. // Lock for the alertable work item queue
  46. CRITICAL_SECTION AlertableWorkQueueLock ;
  47. // Heap for alertable work items
  48. HANDLE AlertableWorkerHeap ;
  49. // Worker Semaphore used for releasing alertable worker threads
  50. HANDLE AlertableThreadSemaphore;
  51. // Number of alertable threads
  52. LONG AlertableThreadCount;
  53. volatile LONG WorkersInitialized=WORKERS_NOT_INITIALIZED;
  54. //* WorkerThread()
  55. //
  56. // Function: Thread to execute work items in.
  57. //
  58. // Returns: Nothing
  59. //
  60. //*
  61. DWORD APIENTRY
  62. WorkerThread (
  63. LPVOID param
  64. ) {
  65. // It'll be waiting
  66. InterlockedIncrement (&ThreadsWaiting);
  67. do {
  68. LPOVERLAPPED_COMPLETION_ROUTINE completionRoutine;
  69. NTSTATUS status;
  70. PVOID context;
  71. IO_STATUS_BLOCK ioStatus;
  72. status = NtRemoveIoCompletion (
  73. WorkQueuePort,
  74. (PVOID *)&completionRoutine,
  75. &context,
  76. &ioStatus,
  77. &ThreadIdleTO);
  78. if (NT_SUCCESS (status)) {
  79. switch (status) {
  80. // We did dequeue a work item
  81. case STATUS_SUCCESS:
  82. if (InterlockedExchangeAdd (&ThreadsWaiting, -1)==1) {
  83. // Last thread to wait, start the timer
  84. // to create a new thread
  85. SetWaitableTimer (WorkQueueTimer,
  86. &WorkQueueTO,
  87. 0,
  88. NULL, NULL,
  89. FALSE);
  90. }
  91. // Execute work item/completion routine
  92. completionRoutine (
  93. // Quick check for success that all work items
  94. // and most of IRP complete with
  95. (ioStatus.Status==STATUS_SUCCESS)
  96. ? NO_ERROR
  97. : RtlNtStatusToDosError (ioStatus.Status),
  98. (DWORD)ioStatus.Information,
  99. (LPOVERLAPPED)context);
  100. if (InterlockedExchangeAdd (&ThreadsWaiting, 1)==0) {
  101. // Cancel time if this is the first thread
  102. // to return
  103. CancelWaitableTimer (WorkQueueTimer);
  104. }
  105. break;
  106. // Thread was not used for ThreadIdle timeout, see
  107. // if we need to quit
  108. case STATUS_TIMEOUT:
  109. while (1) {
  110. // Make a local copy of the count and
  111. // attempt to atomically check and update
  112. // it if necessary
  113. LONG count = ThreadCount;
  114. // Quick check for min thread condition
  115. if (count<=MinThreads)
  116. break;
  117. else {
  118. // Attempt to decrease the count
  119. // use another local variable
  120. // because of MIPS optimizer bug
  121. LONG newCount = count-1;
  122. if (InterlockedCompareExchange (&ThreadCount,
  123. newCount, count)==count) {
  124. // Succeded, exit the thread
  125. goto ExitThread;
  126. }
  127. // else try again
  128. }
  129. }
  130. break;
  131. default:
  132. ASSERTMSG ("Unexpected status code returned ", FALSE);
  133. break;
  134. }
  135. }
  136. // Execute while we are initialized
  137. }
  138. while (WorkersInitialized==WORKERS_INITIALIZED);
  139. ExitThread:
  140. InterlockedDecrement (&ThreadsWaiting);
  141. return 0;
  142. }
  143. //* AlertableWorkerThread()
  144. //
  145. // Function: Alertable work item thread
  146. //
  147. // Returns: Nothing
  148. //
  149. //*
  150. DWORD APIENTRY
  151. AlertableWorkerThread (
  152. LPVOID param
  153. ) {
  154. HANDLE WaitArray [] = {
  155. #define ALERTABLE_THREAD_SEMAPHORE WAIT_OBJECT_0
  156. AlertableThreadSemaphore,
  157. #define WORK_QUEUE_TIMER (WAIT_OBJECT_0+1)
  158. WorkQueueTimer
  159. };
  160. do {
  161. WorkItem *workitem;
  162. DWORD rc;
  163. // Wait for signal to run
  164. //
  165. rc = WaitForMultipleObjectsEx (
  166. sizeof (WaitArray)/sizeof (WaitArray[0]),
  167. WaitArray,
  168. FALSE,
  169. INFINITE,
  170. TRUE);
  171. switch (rc) {
  172. case ALERTABLE_THREAD_SEMAPHORE:
  173. // Pick up and execute the worker
  174. EnterCriticalSection (&AlertableWorkQueueLock);
  175. ASSERT (!IsListEmpty (&AlertableWorkQueue));
  176. workitem = (WorkItem *) RemoveHeadList (&AlertableWorkQueue) ;
  177. LeaveCriticalSection (&AlertableWorkQueueLock);
  178. (workitem->WI_Function) (workitem->WI_Context);
  179. HeapFree (AlertableWorkerHeap, 0, workitem);
  180. break;
  181. case WORK_QUEUE_TIMER:
  182. // Work queue has not been served wothin specified
  183. // timeout
  184. while (1) {
  185. // Make a local copy of the count
  186. LONG count = ThreadCount;
  187. // Make sure we havn't exceded the limit
  188. if (count>=MAX_WORKER_THREADS)
  189. break;
  190. else {
  191. // Try to increment the value
  192. // use another local variable
  193. // because of MIPS optimizer bug
  194. LONG newCount = count+1;
  195. if (InterlockedCompareExchange (&ThreadCount,
  196. newCount, count)==count) {
  197. HANDLE hThread;
  198. DWORD tid;
  199. // Create new thread if increment succeded
  200. hThread = CreateThread (NULL, 0, WorkerThread, NULL, 0, &tid);
  201. if (hThread!=NULL) {
  202. CloseHandle (hThread);
  203. }
  204. else // Restore the value if thread creation
  205. // failed
  206. InterlockedDecrement (&ThreadCount);
  207. break;
  208. }
  209. // else repeat the loop if ThreadCount was modified
  210. // while we were checking
  211. }
  212. }
  213. break;
  214. case WAIT_IO_COMPLETION:
  215. // Handle IO completion
  216. break;
  217. case 0xFFFFFFFF:
  218. // Error, we must have closed the semaphore handle
  219. break;
  220. default:
  221. ASSERTMSG ("Unexpected rc from WaitForObject ", FALSE);
  222. }
  223. }
  224. while (WorkersInitialized==WORKERS_INITIALIZED);
  225. return 0 ;
  226. }
  227. //* WorkerCompletionRoutine
  228. //
  229. // Function: Worker function wrapper for non-io work items
  230. //
  231. VOID WINAPI
  232. WorkerCompletionRoutine (
  233. DWORD dwErrorCode,
  234. PVOID ActualContext,
  235. LPOVERLAPPED ActualCompletionRoutine
  236. ) {
  237. UNREFERENCED_PARAMETER (dwErrorCode);
  238. ((WORKERFUNCTION)ActualCompletionRoutine)(ActualContext);
  239. }
  240. //* InitializeWorkerThread()
  241. //
  242. // Function: Called by the first work item
  243. //
  244. // Returns: WORKERS_INITIALIZED if successful.
  245. // WORKERS_NOT_INITIALIZED not.
  246. //*
  247. LONG
  248. InitializeWorkerThread (
  249. LONG initFlag
  250. ) {
  251. DWORD dwErr;
  252. #if 0
  253. if (initFlag==WORKERS_INITIALIZING) {
  254. #if DBG
  255. DbgPrint ("RTUTILS: %lx - waiting for worker initialization.\n", GetCurrentThreadId ());
  256. #endif
  257. while (WorkersInitialized==WORKERS_INITIALIZING)
  258. Sleep (100);
  259. #if DBG
  260. DbgPrint ("RTUTILS: %lx - wait for worker initialization done.\n", GetCurrentThreadId ());
  261. #endif
  262. }
  263. if (WorkersInitialized==WORKERS_INITIALIZED) {
  264. return WORKERS_INITIALIZED;
  265. }
  266. else {
  267. INT i;
  268. DWORD tid;
  269. HANDLE threadhandle;
  270. SYSTEM_INFO systeminfo;
  271. // Get number of processors
  272. //
  273. GetSystemInfo (&systeminfo) ;
  274. MinThreads = systeminfo.dwNumberOfProcessors;
  275. ThreadsWaiting = 0;
  276. // Init critical section
  277. //
  278. InitializeCriticalSection (&AlertableWorkQueueLock);
  279. // Initialize work queue
  280. //
  281. InitializeListHead (&AlertableWorkQueue) ;
  282. // Allocate private heap
  283. //
  284. AlertableWorkerHeap = HeapCreate (0, // no flags
  285. systeminfo.dwPageSize,// initial heap size
  286. 0); // no maximum size
  287. if (AlertableWorkerHeap != NULL) {
  288. // Create counting semaphore for releasing alertable threads
  289. AlertableThreadSemaphore = CreateSemaphore(NULL, // No security
  290. 0, // Initial value
  291. MAXLONG, // Max items to queue
  292. NULL); // No name
  293. if (AlertableThreadSemaphore!=NULL) {
  294. // Create completion port for work items
  295. WorkQueuePort = CreateIoCompletionPort (
  296. INVALID_HANDLE_VALUE, // Just create a port, no file yet
  297. NULL, // New port
  298. 0, // Key is ignored
  299. MAX_WORKER_THREADS); // Number of active threads
  300. if (WorkQueuePort!=NULL) {
  301. // Create timer to trigger creation of
  302. // new threads if work queue is not served
  303. // for the specified timeout
  304. WorkQueueTimer = CreateWaitableTimer (
  305. NULL, // No security
  306. FALSE, // auto-reset
  307. NULL); // No name
  308. if (WorkQueueTimer!=NULL) {
  309. // Start Alertable threads
  310. //
  311. //
  312. // initialize the global structure for wait threads
  313. //
  314. dwErr = InitializeWaitGlobal();
  315. if (dwErr!=NO_ERROR) {
  316. DeInitializeWaitGlobal();
  317. goto ThreadCreationError;
  318. }
  319. /*WTG.g_InitializedEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  320. if (WTG.g_InitializedEvent==NULL)
  321. goto ThreadCreationError;
  322. */
  323. //
  324. // create one alertable thread and wait for it to get initialized
  325. // This makes sure that at least one server thread is initialized
  326. // before the others attemp to use it.
  327. //
  328. i =0;
  329. threadhandle = CreateThread (
  330. NULL, // No security
  331. 0, // Default stack
  332. AlertableWaitWorkerThread,// Start routine
  333. (PVOID)(LONG_PTR)i, // Thread param
  334. 0, // No flags
  335. &tid);
  336. if (threadhandle!=NULL)
  337. CloseHandle (threadhandle);
  338. else
  339. goto ThreadCreationError;
  340. /*
  341. WaitForSingleObject(WTG.g_InitializedEvent, INFINITE);
  342. CloseHandle(WTG.g_InitializedEvent);
  343. */
  344. /*
  345. //
  346. // create the other alertable threads but dont wait on them to
  347. // get initialization
  348. //
  349. for (i=1; i < NUM_ALERTABLE_THREADS; i++) {
  350. threadhandle = CreateThread (
  351. NULL, // No security
  352. 0, // Default stack
  353. AlertableWaitWorkerThread,// Start routine
  354. (PVOID)(LONG_PTR)i, // Thread id
  355. 0, // No flags
  356. &tid);
  357. if (threadhandle!=NULL)
  358. CloseHandle (threadhandle);
  359. else
  360. goto ThreadCreationError;
  361. }
  362. */
  363. // Start the rest of worker threads
  364. //
  365. for (i=0; i < MinThreads; i++) {
  366. threadhandle = CreateThread (
  367. NULL, // No security
  368. 0, // Default stack
  369. WorkerThread,// Start routine
  370. NULL, // No parameter
  371. 0, // No flags
  372. &tid) ;
  373. if (threadhandle!=NULL)
  374. CloseHandle (threadhandle);
  375. else
  376. goto ThreadCreationError;
  377. }
  378. ThreadCount = i;
  379. WorkersInitialized = WORKERS_INITIALIZED;
  380. return WORKERS_INITIALIZED;
  381. ThreadCreationError:
  382. // Cleanup in case of failure
  383. // Threads will exit by themselves when objects are
  384. // deleted
  385. CloseHandle (WorkQueueTimer);
  386. }
  387. CloseHandle (WorkQueuePort);
  388. }
  389. CloseHandle (AlertableThreadSemaphore);
  390. }
  391. HeapDestroy (AlertableWorkerHeap);
  392. }
  393. DeleteCriticalSection (&AlertableWorkQueueLock);
  394. #if DBG
  395. DbgPrint ("RTUTILS: %lx - worker initialization failed (%ld).\n",
  396. GetCurrentThreadId (), GetLastError ());
  397. #endif
  398. return WORKERS_NOT_INITIALIZED;
  399. }
  400. #endif
  401. return WORKERS_NOT_INITIALIZED;
  402. }
  403. //* StopWorkers()
  404. //
  405. // Function: Cleanup worker thread when dll is unloaded
  406. //
  407. //*
  408. VOID
  409. StopWorkers (
  410. VOID
  411. ) {
  412. // Make sure we were initialized
  413. if (WorkersInitialized==WORKERS_INITIALIZED) {
  414. // All work items should have been completed
  415. // already (no other components should be using
  416. // our routines or we shouldn't have been unloaded)
  417. // Set the flag telling all threads to quit
  418. WorkersInitialized = WORKERS_NOT_INITIALIZED;
  419. // Close all syncronization objects (this should
  420. // terminate the wait)
  421. if (WorkQueueTimer)
  422. CloseHandle (WorkQueueTimer);
  423. if (WorkQueuePort)
  424. CloseHandle (WorkQueuePort);
  425. if (AlertableThreadSemaphore)
  426. CloseHandle (AlertableThreadSemaphore);
  427. // Let threads complete
  428. Sleep (1000);
  429. // Destroy the rest
  430. if (AlertableWorkerHeap)
  431. HeapDestroy (AlertableWorkerHeap);
  432. DeleteCriticalSection (&AlertableWorkQueueLock);
  433. }
  434. }
  435. //* QueueWorkItem()
  436. //
  437. // Function: Queues the supplied work item in the work queue.
  438. //
  439. // Returns: 0 (success)
  440. // Win32 error codes for cases like out of memory
  441. //
  442. //*
  443. DWORD APIENTRY
  444. QueueWorkItem (WORKERFUNCTION functionptr, PVOID context, BOOL serviceinalertablethread)
  445. {
  446. DWORD retcode ;
  447. LONG initFlag;
  448. if (functionptr == NULL)
  449. return ERROR_INVALID_PARAMETER;
  450. // if uninitialized, attempt to initialize worker threads
  451. //
  452. if (!ENTER_WORKER_API) {
  453. retcode = GetLastError();
  454. return (retcode == NO_ERROR ? ERROR_CAN_NOT_COMPLETE : retcode);
  455. }
  456. // based on this flag insert in either the alertablequeue or the workerqueue
  457. //
  458. if (!serviceinalertablethread) {
  459. NTSTATUS status;
  460. // Use completion port to execute the item
  461. status = NtSetIoCompletion (
  462. WorkQueuePort, // Port
  463. (PVOID)WorkerCompletionRoutine, // Completion routine
  464. functionptr, // Apc context
  465. STATUS_SUCCESS, // Status
  466. (ULONG_PTR)context); // Information ()
  467. if (status==STATUS_SUCCESS)
  468. retcode = NO_ERROR;
  469. else
  470. retcode = RtlNtStatusToDosError (status);
  471. }
  472. else {
  473. // Create and queue work item
  474. WorkItem *workitem ;
  475. workitem = (WorkItem *) HeapAlloc (
  476. AlertableWorkerHeap,
  477. 0, // No flags
  478. sizeof (WorkItem));
  479. if (workitem != NULL) {
  480. workitem->WI_Function = functionptr ;
  481. workitem->WI_Context = context ;
  482. EnterCriticalSection (&AlertableWorkQueueLock) ;
  483. InsertTailList (&AlertableWorkQueue, &workitem->WI_List) ;
  484. LeaveCriticalSection (&AlertableWorkQueueLock) ;
  485. // let a worker thread run if waiting
  486. //
  487. ReleaseSemaphore (AlertableThreadSemaphore, 1, NULL) ;
  488. retcode = 0 ;
  489. }
  490. else
  491. retcode = ERROR_NOT_ENOUGH_MEMORY ;
  492. }
  493. return retcode ;
  494. }
  495. // Function: Associates file handle with the completion port (all
  496. // asynchronous io on this handle will be queued to the
  497. // completion port)
  498. //
  499. // FileHandle: file handle to be associated with completion port
  500. // CompletionProc: procedure to be called when io associated with
  501. // the file handle completes. This function will be
  502. // executed in the context of non-alertable worker thread
  503. DWORD
  504. APIENTRY
  505. SetIoCompletionProc (
  506. IN HANDLE FileHandle,
  507. IN LPOVERLAPPED_COMPLETION_ROUTINE CompletionProc
  508. ) {
  509. HANDLE hdl;
  510. LONG initFlag;
  511. DWORD retcode;
  512. if (!CompletionProc)
  513. return ERROR_INVALID_PARAMETER;
  514. if (FileHandle==NULL || FileHandle==INVALID_HANDLE_VALUE)
  515. return ERROR_INVALID_PARAMETER;
  516. // if uninitialized, attempt to initialize worker threads
  517. //
  518. if (!ENTER_WORKER_API) {
  519. retcode = GetLastError();
  520. return (retcode == NO_ERROR ? ERROR_CAN_NOT_COMPLETE : retcode);
  521. }
  522. hdl = CreateIoCompletionPort (FileHandle,
  523. WorkQueuePort,
  524. (UINT_PTR)CompletionProc,
  525. 0);
  526. if (hdl!=NULL)
  527. return NO_ERROR;
  528. else
  529. return GetLastError ();
  530. }