Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

742 lines
24 KiB

  1. /*++
  2. Copyright (c) 1995 Microsoft Corporation
  3. Module Name:
  4. common\trace\worker.c
  5. Abstract:
  6. Worker threads for the router process
  7. Revision History:
  8. Gurdeep Singh Pall 7/28/95 Created
  9. 12-10-97: lokeshs: removed blocking of InitializeWorkerThread()
  10. on initialization of the first AlertableThread() created
  11. --*/
  12. #include <nt.h>
  13. #include <ntrtl.h>
  14. #include <nturtl.h>
  15. #include <windows.h>
  16. #include <stdio.h>
  17. #include <string.h>
  18. #include <malloc.h>
  19. #include <rtutils.h>
  20. //
  21. // #includes for the wait-server threads
  22. //
  23. #include "wt.h"
  24. #include "wtproto.h"
  25. #include "trace.h"
  26. #include "workerinc.h"
  27. #include "wttrace.h"
  28. //
  29. // Global variables for wait-server threads
  30. //
  // File-scope instance of WAIT_THREAD_GLOBAL.  In the part of the file shown
  // here it is referenced only from commented-out code in
  // InitializeWorkerThread (WTG.g_InitializedEvent).
  // NOTE(review): presumably set up by InitializeWaitGlobal() and declared
  // extern in one of the wait-thread headers included above - confirm.
  31. WAIT_THREAD_GLOBAL WTG;
  32. //
  33. // Functions for asynchronous receives.
  34. // Currently disabled with #if 0. If these functions are re-enabled, then
  35. // all files that #include rtutils.h must use winsock2.h
  36. //
  37. #if 0
  38. //------------------------------------------------------------------------------//
  39. // ProcessAsyncRecv //
  40. // This function runs the callback registered with the asynchronous receive //
  41. //------------------------------------------------------------------------------//
  42. VOID
  43. WINAPI
  44. ProcessAsyncRecv(
  45. DWORD dwErrorCode,
  46. DWORD dwNumBytesRecv,
  47. LPOVERLAPPED lpOverlapped
  48. )
  49. {
  50. PASYNC_SOCKET_DATA pSockData;
  51. TRACE_ENTER("ProcessAsyncRecv: going to run the user function");
  52. pSockData = CONTAINING_RECORD(lpOverlapped, ASYNC_SOCKET_DATA, Overlapped);
  53. UNREFERENCED_PARAMETER (dwErrorCode);
  54. pSockData->NumBytesReceived = dwNumBytesRecv;
  55. TRACE1(RECEIVE, "Received %d bytes size packet", dwNumBytesRecv);
  56. ((WORKERFUNCTION)pSockData->pFunction)(pSockData);
  57. TRACE_LEAVE("ProcessAsyncRecv: ran the user function");
  58. return;
  59. }
  60. //------------------------------------------------------------------------------//
  61. // AsyncSocketInit //
  62. // This function binds the socket with the IoCompletionPort, that is used to //
  63. // wait for the asynchronous receives //
  64. //------------------------------------------------------------------------------//
  65. DWORD
  66. APIENTRY
  67. AsyncSocketInit (
  68. SOCKET sock
  69. )
  70. {
  71. HANDLE hTemp;
  72. DWORD dwErr;
  73. // initialize worker threads if not yet done
  74. if (!ENTER_WORKER_API) {
  75. dwErr = GetLastError();
  76. return (dwErr == NO_ERROR ? ERROR_CAN_NOT_COMPLETE : dwErr);
  77. }
  78. TRACE_ENTER("AsyncSocketInit()");
  79. hTemp = CreateIoCompletionPort((HANDLE)sock, WorkQueuePort,
  80. (ULONG)ProcessAsyncRecv, 0);
  81. if (hTemp!=WorkQueuePort) {
  82. dwErr = GetLastError();
  83. return dwErr;
  84. }
  85. TRACE_LEAVE("AsyncSocketInit()");
  86. return NO_ERROR;
  87. }
  88. //------------------------------------------------------------------------------//
  89. // AsyncWSARecvFrom //
  90. // This function calls the WSARecvFrom // //
  91. //------------------------------------------------------------------------------//
  92. DWORD
  93. APIENTRY
  94. AsyncWSARecvFrom (
  95. SOCKET sock,
  96. PASYNC_SOCKET_DATA pSockData
  97. )
  98. {
  99. DWORD dwFromLen;
  100. DWORD dwErr;
  101. TRACE_ENTER("AsyncWSARecvFrom()");
  102. dwFromLen = sizeof (pSockData->SrcAddress);
  103. ZeroMemory((PVOID)&pSockData->Overlapped, sizeof(OVERLAPPED));
  104. pSockData->Overlapped.hEvent = NULL;
  105. dwErr = WSARecvFrom(sock, &pSockData->WsaBuf, 1, &pSockData->NumBytesReceived,
  106. &pSockData->Flags,
  107. (SOCKADDR FAR *)&pSockData->SrcAddress,
  108. &dwFromLen, &pSockData->Overlapped, NULL);
  109. if (dwErr!=SOCKET_ERROR) {
  110. TRACE0(RECEIVE, "the WSAReceiveFrom returned immediately\n");
  111. }
  112. else {
  113. dwErr = WSAGetLastError();
  114. if (dwErr!=WSA_IO_PENDING) {
  115. TRACE1(RECEIVE, "WSARecvFrom() returned in AsyncWSARecvFrom() with error code: %0x", dwErr);
  116. LOGERR0(WSA_RECV_FROM_FAILED, dwErr);
  117. }
  118. else
  119. TRACE0(RECEIVE, "WSARecvFrom() returned WSA_IO_PENDING in AsyncWSARecvFrom()");
  120. }
  121. TRACE_LEAVE("AsyncWSARecvFrom()");
  122. return NO_ERROR;
  123. }
  124. #endif //#def 0 of the Asynchronous Receive From functions
  125. // Time that thread has to be idle before exiting
  126. LARGE_INTEGER ThreadIdleTO = {
  127. (ULONG)(THREAD_IDLE_TIMEOUT*(-10000000)),
  128. 0xFFFFFFFF};
  129. // Time that the worker queue is not served before starting new thread
  130. CONST LARGE_INTEGER WorkQueueTO = {
  131. (ULONG)(WORK_QUEUE_TIMEOUT*(-10000000)),
  132. 0xFFFFFFFF};
  133. // Total number of threads
  134. LONG ThreadCount;
  135. // Number of threads waiting on the completion port
  136. LONG ThreadsWaiting;
  137. // Min allowed number of threads
  138. LONG MinThreads;
  139. // Completion port for threads to wait on
  140. HANDLE WorkQueuePort;
  141. // Timer to intiate creation of new thread if worker queue is not
  142. // server within a timeout
  143. HANDLE WorkQueueTimer;
  144. // Queue for alertable work items
  145. LIST_ENTRY AlertableWorkQueue ;
  146. // Lock for the alertable work item queue
  147. CRITICAL_SECTION AlertableWorkQueueLock ;
  148. // Heap for alertable work items
  149. HANDLE AlertableWorkerHeap ;
  150. // Worker Semaphore used for releasing alertable worker threads
  151. HANDLE AlertableThreadSemaphore;
  152. // Number of alertable threads
  153. LONG AlertableThreadCount;
  154. volatile LONG WorkersInitialized=WORKERS_NOT_INITIALIZED;
  155. //* WorkerThread()
  156. //
  157. // Function: Thread to execute work items in.
  158. //
  159. // Returns: Nothing
  160. //
  161. //*
  162. DWORD APIENTRY
  163. WorkerThread (
  164. LPVOID param
  165. ) {
  166. // It'll be waiting
  167. InterlockedIncrement (&ThreadsWaiting);
  168. do {
  169. LPOVERLAPPED_COMPLETION_ROUTINE completionRoutine;
  170. NTSTATUS status;
  171. PVOID context;
  172. IO_STATUS_BLOCK ioStatus;
  173. status = NtRemoveIoCompletion (
  174. WorkQueuePort,
  175. (PVOID *)&completionRoutine,
  176. &context,
  177. &ioStatus,
  178. &ThreadIdleTO);
  179. if (NT_SUCCESS (status)) {
  180. switch (status) {
  181. // We did dequeue a work item
  182. case STATUS_SUCCESS:
  183. if (InterlockedExchangeAdd (&ThreadsWaiting, -1)==1) {
  184. // Last thread to wait, start the timer
  185. // to create a new thread
  186. SetWaitableTimer (WorkQueueTimer,
  187. &WorkQueueTO,
  188. 0,
  189. NULL, NULL,
  190. FALSE);
  191. }
  192. // Execute work item/completion routine
  193. completionRoutine (
  194. // Quick check for success that all work items
  195. // and most of IRP complete with
  196. (ioStatus.Status==STATUS_SUCCESS)
  197. ? NO_ERROR
  198. : RtlNtStatusToDosError (ioStatus.Status),
  199. (DWORD)ioStatus.Information,
  200. (LPOVERLAPPED)context);
  201. if (InterlockedExchangeAdd (&ThreadsWaiting, 1)==0) {
  202. // Cancel time if this is the first thread
  203. // to return
  204. CancelWaitableTimer (WorkQueueTimer);
  205. }
  206. break;
  207. // Thread was not used for ThreadIdle timeout, see
  208. // if we need to quit
  209. case STATUS_TIMEOUT:
  210. while (1) {
  211. // Make a local copy of the count and
  212. // attempt to atomically check and update
  213. // it if necessary
  214. LONG count = ThreadCount;
  215. // Quick check for min thread condition
  216. if (count<=MinThreads)
  217. break;
  218. else {
  219. // Attempt to decrease the count
  220. // use another local variable
  221. // because of MIPS optimizer bug
  222. LONG newCount = count-1;
  223. if (InterlockedCompareExchange (&ThreadCount,
  224. newCount, count)==count) {
  225. // Succeded, exit the thread
  226. goto ExitThread;
  227. }
  228. // else try again
  229. }
  230. }
  231. break;
  232. default:
  233. ASSERTMSG ("Unexpected status code returned ", FALSE);
  234. break;
  235. }
  236. }
  237. // Execute while we are initialized
  238. }
  239. while (WorkersInitialized==WORKERS_INITIALIZED);
  240. ExitThread:
  241. InterlockedDecrement (&ThreadsWaiting);
  242. return 0;
  243. }
  244. //* AlertableWorkerThread()
  245. //
  246. // Function: Alertable work item thread
  247. //
  248. // Returns: Nothing
  249. //
  250. //*
  251. DWORD APIENTRY
  252. AlertableWorkerThread (
  253. LPVOID param
  254. ) {
  255. HANDLE WaitArray [] = {
  256. #define ALERTABLE_THREAD_SEMAPHORE WAIT_OBJECT_0
  257. AlertableThreadSemaphore,
  258. #define WORK_QUEUE_TIMER (WAIT_OBJECT_0+1)
  259. WorkQueueTimer
  260. };
  261. do {
  262. WorkItem *workitem;
  263. DWORD rc;
  264. // Wait for signal to run
  265. //
  266. rc = WaitForMultipleObjectsEx (
  267. sizeof (WaitArray)/sizeof (WaitArray[0]),
  268. WaitArray,
  269. FALSE,
  270. INFINITE,
  271. TRUE);
  272. switch (rc) {
  273. case ALERTABLE_THREAD_SEMAPHORE:
  274. // Pick up and execute the worker
  275. EnterCriticalSection (&AlertableWorkQueueLock);
  276. ASSERT (!IsListEmpty (&AlertableWorkQueue));
  277. workitem = (WorkItem *) RemoveHeadList (&AlertableWorkQueue) ;
  278. LeaveCriticalSection (&AlertableWorkQueueLock);
  279. (workitem->WI_Function) (workitem->WI_Context);
  280. HeapFree (AlertableWorkerHeap, 0, workitem);
  281. break;
  282. case WORK_QUEUE_TIMER:
  283. // Work queue has not been served wothin specified
  284. // timeout
  285. while (1) {
  286. // Make a local copy of the count
  287. LONG count = ThreadCount;
  288. // Make sure we havn't exceded the limit
  289. if (count>=MAX_WORKER_THREADS)
  290. break;
  291. else {
  292. // Try to increment the value
  293. // use another local variable
  294. // because of MIPS optimizer bug
  295. LONG newCount = count+1;
  296. if (InterlockedCompareExchange (&ThreadCount,
  297. newCount, count)==count) {
  298. HANDLE hThread;
  299. DWORD tid;
  300. // Create new thread if increment succeded
  301. hThread = CreateThread (NULL, 0, WorkerThread, NULL, 0, &tid);
  302. if (hThread!=NULL) {
  303. CloseHandle (hThread);
  304. }
  305. else // Restore the value if thread creation
  306. // failed
  307. InterlockedDecrement (&ThreadCount);
  308. break;
  309. }
  310. // else repeat the loop if ThreadCount was modified
  311. // while we were checking
  312. }
  313. }
  314. break;
  315. case WAIT_IO_COMPLETION:
  316. // Handle IO completion
  317. break;
  318. case 0xFFFFFFFF:
  319. // Error, we must have closed the semaphore handle
  320. break;
  321. default:
  322. ASSERTMSG ("Unexpected rc from WaitForObject ", FALSE);
  323. }
  324. }
  325. while (WorkersInitialized==WORKERS_INITIALIZED);
  326. return 0 ;
  327. }
  328. //* WorkerCompletionRoutine
  329. //
  330. // Function: Worker function wrapper for non-io work items
  331. //
  332. VOID WINAPI
  333. WorkerCompletionRoutine (
  334. DWORD dwErrorCode,
  335. PVOID ActualContext,
  336. LPOVERLAPPED ActualCompletionRoutine
  337. ) {
  338. UNREFERENCED_PARAMETER (dwErrorCode);
  339. ((WORKERFUNCTION)ActualCompletionRoutine)(ActualContext);
  340. }
  341. //* InitializeWorkerThread()
  342. //
  343. // Function: Called by the first work item
  344. //
  // NOTE: everything between the "#if 0" below and its matching "#endif" is
  // compiled out (the "#if DBG"/"#endif" pairs nest inside it), so as written
  // this routine only declares dwErr and returns WORKERS_NOT_INITIALIZED.
  // The disabled code shows the start-up sequence: critical section and list,
  // private heap, semaphore, completion port, waitable timer, wait globals,
  // one alertable wait thread, then MinThreads worker threads.
  //
  // initFlag: only examined by the disabled code, which compares it with
  //           WORKERS_INITIALIZING.
  //
  345. // Returns: WORKERS_INITIALIZED if successful.
  346. // WORKERS_NOT_INITIALIZED not.
  347. //*
  348. LONG
  349. InitializeWorkerThread (
  350. LONG initFlag
  351. ) {
  352. DWORD dwErr;
  // ---- start of disabled code ----
  353. #if 0
  // Another thread is initializing: poll every 100 ms until it is done
  354. if (initFlag==WORKERS_INITIALIZING) {
  355. #if DBG
  356. DbgPrint ("RTUTILS: %lx - waiting for worker initialization.\n", GetCurrentThreadId ());
  357. #endif
  358. while (WorkersInitialized==WORKERS_INITIALIZING)
  359. Sleep (100);
  360. #if DBG
  361. DbgPrint ("RTUTILS: %lx - wait for worker initialization done.\n", GetCurrentThreadId ());
  362. #endif
  363. }
  364. if (WorkersInitialized==WORKERS_INITIALIZED) {
  365. return WORKERS_INITIALIZED;
  366. }
  367. else {
  368. INT i;
  369. DWORD tid;
  370. HANDLE threadhandle;
  371. SYSTEM_INFO systeminfo;
  372. // Get number of processors
  373. //
  // MinThreads is set to one worker thread per processor
  374. GetSystemInfo (&systeminfo) ;
  375. MinThreads = systeminfo.dwNumberOfProcessors;
  376. ThreadsWaiting = 0;
  377. // Init critical section
  378. //
  379. InitializeCriticalSection (&AlertableWorkQueueLock);
  380. // Initialize work queue
  381. //
  382. InitializeListHead (&AlertableWorkQueue) ;
  383. // Allocate private heap
  384. //
  // Each resource below is created inside the "if" that checked the previous
  // one, so leaving a nested block runs the cleanup for exactly the resources
  // that were created.
  385. AlertableWorkerHeap = HeapCreate (0, // no flags
  386. systeminfo.dwPageSize,// initial heap size
  387. 0); // no maximum size
  388. if (AlertableWorkerHeap != NULL) {
  389. // Create counting semaphore for releasing alertable threads
  390. AlertableThreadSemaphore = CreateSemaphore(NULL, // No security
  391. 0, // Initial value
  392. MAXLONG, // Max items to queue
  393. NULL); // No name
  394. if (AlertableThreadSemaphore!=NULL) {
  395. // Create completion port for work items
  396. WorkQueuePort = CreateIoCompletionPort (
  397. INVALID_HANDLE_VALUE, // Just create a port, no file yet
  398. NULL, // New port
  399. 0, // Key is ignored
  400. MAX_WORKER_THREADS); // Number of active threads
  401. if (WorkQueuePort!=NULL) {
  402. // Create timer to trigger creation of
  403. // new threads if work queue is not served
  404. // for the specified timeout
  405. WorkQueueTimer = CreateWaitableTimer (
  406. NULL, // No security
  407. FALSE, // auto-reset
  408. NULL); // No name
  409. if (WorkQueueTimer!=NULL) {
  410. // Start Alertable threads
  411. //
  412. //
  413. // initialize the global structure for wait threads
  414. //
  415. dwErr = InitializeWaitGlobal();
  416. if (dwErr!=NO_ERROR) {
  417. DeInitializeWaitGlobal();
  418. goto ThreadCreationError;
  419. }
  420. /*WTG.g_InitializedEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  421. if (WTG.g_InitializedEvent==NULL)
  422. goto ThreadCreationError;
  423. */
  424. //
  425. // create one alertable thread and wait for it to get initialized
  426. // This makes sure that at least one server thread is initialized
  427. // before the others attempt to use it.
  428. //
  // The wait itself is commented out further down; the thread is only created.
  // NOTE(review): the thread-creation failure paths below jump to
  // ThreadCreationError without calling DeInitializeWaitGlobal(), unlike the
  // InitializeWaitGlobal() failure path above - confirm whether that is intended.
  429. i =0;
  430. threadhandle = CreateThread (
  431. NULL, // No security
  432. 0, // Default stack
  433. AlertableWaitWorkerThread,// Start routine
  434. (PVOID)(LONG_PTR)i, // Thread param
  435. 0, // No flags
  436. &tid);
  437. if (threadhandle!=NULL)
  438. CloseHandle (threadhandle);
  439. else
  440. goto ThreadCreationError;
  441. /*
  442. WaitForSingleObject(WTG.g_InitializedEvent, INFINITE);
  443. CloseHandle(WTG.g_InitializedEvent);
  444. */
  445. /*
  446. //
  447. // create the other alertable threads but dont wait on them to
  448. // get initialization
  449. //
  450. for (i=1; i < NUM_ALERTABLE_THREADS; i++) {
  451. threadhandle = CreateThread (
  452. NULL, // No security
  453. 0, // Default stack
  454. AlertableWaitWorkerThread,// Start routine
  455. (PVOID)(LONG_PTR)i, // Thread id
  456. 0, // No flags
  457. &tid);
  458. if (threadhandle!=NULL)
  459. CloseHandle (threadhandle);
  460. else
  461. goto ThreadCreationError;
  462. }
  463. */
  464. // Start the rest of worker threads
  465. //
  466. for (i=0; i < MinThreads; i++) {
  467. threadhandle = CreateThread (
  468. NULL, // No security
  469. 0, // Default stack
  470. WorkerThread,// Start routine
  471. NULL, // No parameter
  472. 0, // No flags
  473. &tid) ;
  474. if (threadhandle!=NULL)
  475. CloseHandle (threadhandle);
  476. else
  477. goto ThreadCreationError;
  478. }
  // All MinThreads workers were started: record the count and publish the
  // initialized state
  479. ThreadCount = i;
  480. WorkersInitialized = WORKERS_INITIALIZED;
  481. return WORKERS_INITIALIZED;
  // The label sits inside the innermost "if"; control then falls out through
  // each enclosing block, releasing the timer, port, semaphore, heap and
  // critical section in that order.
  482. ThreadCreationError:
  483. // Cleanup in case of failure
  484. // Threads will exit by themselves when objects are
  485. // deleted
  486. CloseHandle (WorkQueueTimer);
  487. }
  488. CloseHandle (WorkQueuePort);
  489. }
  490. CloseHandle (AlertableThreadSemaphore);
  491. }
  492. HeapDestroy (AlertableWorkerHeap);
  493. }
  494. DeleteCriticalSection (&AlertableWorkQueueLock);
  // NOTE(review): GetLastError() is read after the cleanup calls above, so it
  // may no longer hold the code of the original failure - confirm.
  495. #if DBG
  496. DbgPrint ("RTUTILS: %lx - worker initialization failed (%ld).\n",
  497. GetCurrentThreadId (), GetLastError ());
  498. #endif
  499. return WORKERS_NOT_INITIALIZED;
  500. }
  501. #endif
  // ---- end of disabled code ----
  // The only statement that is actually compiled
  502. return WORKERS_NOT_INITIALIZED;
  503. }
  504. //* StopWorkers()
  505. //
  506. // Function: Cleanup worker thread when dll is unloaded
  507. //
  508. //*
  509. VOID
  510. StopWorkers (
  511. VOID
  512. ) {
  513. // Make sure we were initialized
  514. if (WorkersInitialized==WORKERS_INITIALIZED) {
  515. // All work items should have been completed
  516. // already (no other components should be using
  517. // our routines or we shouldn't have been unloaded)
  518. // Set the flag telling all threads to quit
  519. WorkersInitialized = WORKERS_NOT_INITIALIZED;
  520. // Close all syncronization objects (this should
  521. // terminate the wait)
  522. CloseHandle (WorkQueueTimer);
  523. CloseHandle (WorkQueuePort);
  524. CloseHandle (AlertableThreadSemaphore);
  525. // Let threads complete
  526. Sleep (1000);
  527. // Destroy the rest
  528. HeapDestroy (AlertableWorkerHeap);
  529. DeleteCriticalSection (&AlertableWorkQueueLock);
  530. }
  531. }
  532. //* QueueWorkItem()
  533. //
  534. // Function: Queues the supplied work item in the work queue.
  535. //
  536. // Returns: 0 (success)
  537. // Win32 error codes for cases like out of memory
  538. //
  539. //*
  540. DWORD APIENTRY
  541. QueueWorkItem (WORKERFUNCTION functionptr, PVOID context, BOOL serviceinalertablethread)
  542. {
  543. DWORD retcode ;
  544. LONG initFlag;
  545. // if uninitialized, attempt to initialize worker threads
  546. //
  547. if (!ENTER_WORKER_API) {
  548. retcode = GetLastError();
  549. return (retcode == NO_ERROR ? ERROR_CAN_NOT_COMPLETE : retcode);
  550. }
  551. // based on this flag insert in either the alertablequeue or the workerqueue
  552. //
  553. if (!serviceinalertablethread) {
  554. NTSTATUS status;
  555. // Use completion port to execute the item
  556. status = NtSetIoCompletion (
  557. WorkQueuePort, // Port
  558. (PVOID)WorkerCompletionRoutine, // Completion routine
  559. functionptr, // Apc context
  560. STATUS_SUCCESS, // Status
  561. (ULONG_PTR)context); // Information ()
  562. if (status==STATUS_SUCCESS)
  563. retcode = NO_ERROR;
  564. else
  565. retcode = RtlNtStatusToDosError (status);
  566. }
  567. else {
  568. // Create and queue work item
  569. WorkItem *workitem ;
  570. workitem = (WorkItem *) HeapAlloc (
  571. AlertableWorkerHeap,
  572. 0, // No flags
  573. sizeof (WorkItem));
  574. if (workitem != NULL) {
  575. workitem->WI_Function = functionptr ;
  576. workitem->WI_Context = context ;
  577. EnterCriticalSection (&AlertableWorkQueueLock) ;
  578. InsertTailList (&AlertableWorkQueue, &workitem->WI_List) ;
  579. LeaveCriticalSection (&AlertableWorkQueueLock) ;
  580. // let a worker thread run if waiting
  581. //
  582. ReleaseSemaphore (AlertableThreadSemaphore, 1, NULL) ;
  583. retcode = 0 ;
  584. }
  585. else
  586. retcode = GetLastError () ;
  587. }
  588. return retcode ;
  589. }
  590. // Function: Associates file handle with the completion port (all
  591. // asynchronous io on this handle will be queued to the
  592. // completion port)
  593. //
  594. // FileHandle: file handle to be associated with completion port
  595. // CompletionProc: procedure to be called when io associated with
  596. // the file handle completes. This function will be
  597. // executed in the context of non-alertable worker thread
  598. DWORD
  599. APIENTRY
  600. SetIoCompletionProc (
  601. IN HANDLE FileHandle,
  602. IN LPOVERLAPPED_COMPLETION_ROUTINE CompletionProc
  603. ) {
  604. HANDLE hdl;
  605. LONG initFlag;
  606. DWORD retcode;
  607. // if uninitialized, attempt to initialize worker threads
  608. //
  609. if (!ENTER_WORKER_API) {
  610. retcode = GetLastError();
  611. return (retcode == NO_ERROR ? ERROR_CAN_NOT_COMPLETE : retcode);
  612. }
  613. hdl = CreateIoCompletionPort (FileHandle,
  614. WorkQueuePort,
  615. (UINT_PTR)CompletionProc,
  616. 0);
  617. if (hdl!=NULL)
  618. return NO_ERROR;
  619. else
  620. return GetLastError ();
  621. }