Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

638 lines
17 KiB

  1. /*++
  2. Copyright (c) 1996 Microsoft Corporation
  3. Module Name:
  4. threadq.c
  5. Abstract:
  6. Generic Worker Thread Queue Package
  7. Author:
  8. Mike Massa (mikemas) April 5, 1996
  9. Revision History:
  10. Who When What
  11. -------- -------- ----------------------------------------------
  12. mikemas 04-05-96 created
  13. Notes:
  14. Worker Thread Queues provide a single mechanism for processing
  15. overlapped I/O completions as well as deferred work items. Work
  16. queues are created and destroyed using ClRtlCreateWorkQueue()
  17. and ClRtlDestroyWorkQueue(). Overlapped I/O completions are
  18. directed to a work queue by associating an I/O handle with a work
  19. queue using ClRtlAssociateIoHandleWorkQueue(). Deferred work items
  20. are posted to a work queue using ClRtlPostItemWorkQueue().
  21. Work queues are implemented using I/O completion ports. Each work
  22. queue is serviced by a set of threads which dispatch work items
  23. to specified work routines. Threads are created dynamically, up to
  24. a specified maximum, to ensure that there is always a thread waiting
  25. to service new work items. The priority of the threads servicing a
  26. work queue can be specified.
  27. [Future enhancement: dynamically shrink the thread pool when the
  28. incoming work rate drops off. Currently, threads continue to service
  29. the work queue until it is destroyed.]
  30. Special care must be taken when destroying a work queue to ensure
  31. that all threads terminate properly and no work items are lost.
  32. See the notes under ClRtlDestroyWorkQueue.
  33. --*/
  34. #include "clusrtlp.h"
  35. //
  36. // Private Types
  37. //
  38. typedef struct _CLRTL_WORK_QUEUE {
  39. HANDLE IoCompletionPort;
  40. LONG MaximumThreads;
  41. LONG TotalThreads;
  42. LONG WaitingThreads;
  43. LONG ReserveThreads;
  44. LONG ConcurrentThreads;
  45. DWORD Timeout;
  46. int ThreadPriority;
  47. HANDLE StopEvent;
  48. } CLRTL_WORK_QUEUE;
  49. //
  50. // Private Routines
  51. //
  52. DWORD
  53. ClRtlpWorkerThread(
  54. LPDWORD Context
  55. )
  56. {
  57. PCLRTL_WORK_QUEUE workQueue = (PCLRTL_WORK_QUEUE) Context;
  58. DWORD bytesTransferred;
  59. BOOL ioSuccess;
  60. ULONG_PTR ioContext;
  61. LPOVERLAPPED overlapped;
  62. PCLRTL_WORK_ITEM workItem;
  63. DWORD status;
  64. LONG interlockedResult;
  65. DWORD threadId;
  66. HANDLE threadHandle;
  67. DWORD timeout;
  68. LONG myThreadId;
  69. timeout = workQueue->Timeout;
  70. myThreadId = GetCurrentThreadId();
  71. if (!SetThreadPriority(GetCurrentThread(), workQueue->ThreadPriority)) {
  72. status = GetLastError();
  73. ClRtlLogPrint(
  74. LOG_UNUSUAL,
  75. "[WTQ] Thread %1!u! unable to set priority to %2!d!, status %3!u!\n",
  76. myThreadId,
  77. workQueue->ThreadPriority,
  78. status
  79. );
  80. }
  81. #if THREADQ_VERBOSE
  82. ClRtlLogPrint(
  83. LOG_CRITICAL,
  84. "[WTQ] Thread %1!u! started, queue %2!lx!.\n",
  85. myThreadId,
  86. workQueue
  87. );
  88. #endif
  89. while (TRUE) {
  90. InterlockedIncrement(&(workQueue->WaitingThreads));
  91. ioSuccess = GetQueuedCompletionStatus(
  92. workQueue->IoCompletionPort,
  93. &bytesTransferred,
  94. &ioContext,
  95. &overlapped,
  96. timeout
  97. );
  98. interlockedResult = InterlockedDecrement(
  99. &(workQueue->WaitingThreads)
  100. );
  101. if (overlapped) {
  102. //
  103. // Something was dequeued.
  104. //
  105. workItem = CONTAINING_RECORD(
  106. overlapped,
  107. CLRTL_WORK_ITEM,
  108. Overlapped
  109. );
  110. if (interlockedResult == 0) {
  111. //
  112. // No more threads are waiting. Fire another one up.
  113. // Make sure we haven't started too many first.
  114. //
  115. interlockedResult = InterlockedDecrement(
  116. &(workQueue->ReserveThreads)
  117. );
  118. if (interlockedResult > 0) {
  119. //
  120. // We haven't started too many
  121. //
  122. #if THREADQ_VERBOSE
  123. ClRtlLogPrint(
  124. LOG_NOISE,
  125. "[WTQ] Thread %1!u! starting another thread for queue %2!lx!.\n",
  126. myThreadId,
  127. workQueue
  128. );
  129. #endif // 0
  130. InterlockedIncrement(&(workQueue->TotalThreads));
  131. threadHandle = CreateThread(
  132. NULL,
  133. 0,
  134. ClRtlpWorkerThread,
  135. workQueue,
  136. 0,
  137. &threadId
  138. );
  139. if (threadHandle == NULL) {
  140. InterlockedDecrement(&(workQueue->TotalThreads));
  141. InterlockedIncrement(&(workQueue->ReserveThreads));
  142. status = GetLastError();
  143. ClRtlLogPrint(
  144. LOG_CRITICAL,
  145. "[WTQ] Thread %1!u! failed to create thread, %2!u!\n",
  146. myThreadId,
  147. status
  148. );
  149. }
  150. else {
  151. CloseHandle(threadHandle);
  152. }
  153. }
  154. else {
  155. InterlockedIncrement(&(workQueue->ReserveThreads));
  156. }
  157. } // end if (interlockedResult == 0)
  158. if (ioSuccess) {
  159. (*(workItem->WorkRoutine))(
  160. workItem,
  161. ERROR_SUCCESS,
  162. bytesTransferred,
  163. ioContext
  164. );
  165. }
  166. else {
  167. //
  168. // The item was posted with an error.
  169. //
  170. status = GetLastError();
  171. (*(workItem->WorkRoutine))(
  172. workItem,
  173. status,
  174. bytesTransferred,
  175. ioContext
  176. );
  177. }
  178. continue;
  179. }
  180. else {
  181. //
  182. // No item was dequeued
  183. //
  184. if (ioSuccess) {
  185. //
  186. // This is our cue to start the termination process.
  187. // Set the timeout to zero to make sure we don't block
  188. // after the port is drained.
  189. //
  190. timeout = 0;
  191. #if THREADQ_VERBOSE
  192. ClRtlLogPrint(
  193. LOG_NOISE,
  194. "[WTQ] Thread %1!u! beginning termination process\n",
  195. myThreadId
  196. );
  197. #endif // 0
  198. }
  199. else {
  200. status = GetLastError();
  201. if (status == WAIT_TIMEOUT) {
  202. //
  203. // No more items pending, time to exit.
  204. //
  205. CL_ASSERT(timeout == 0);
  206. break;
  207. }
  208. CL_ASSERT(status == WAIT_TIMEOUT);
  209. ClRtlLogPrint(
  210. LOG_CRITICAL,
  211. "[WTQ] Thread %1!u! No item, strange status %2!u! on queue %3!lx!\n",
  212. myThreadId,
  213. status,
  214. workQueue
  215. );
  216. }
  217. } // end if (overlapped)
  218. } // end while(TRUE)
  219. CL_ASSERT(workQueue->TotalThreads > 0);
  220. InterlockedIncrement(&(workQueue->ReserveThreads));
  221. InterlockedDecrement(&(workQueue->TotalThreads));
  222. #if THREADQ_VERBOSE
  223. ClRtlLogPrint(LOG_NOISE, "[WTQ] Thread %1!u! exiting.\n", myThreadId);
  224. #endif // 0
  225. //
  226. // Let the ClRtlDestroyWorkQueue know we are terminating.
  227. //
  228. SetEvent(workQueue->StopEvent);
  229. return(ERROR_SUCCESS);
  230. }
  231. //
  232. // Public Routines
  233. //
  234. PCLRTL_WORK_QUEUE
  235. ClRtlCreateWorkQueue(
  236. IN DWORD MaximumThreads,
  237. IN int ThreadPriority
  238. )
  239. /*++
  240. Routine Description:
  241. Creates a work queue and a dynamic pool of threads to service it.
  242. Arguments:
  243. MaximumThreads - The maximum number of threads to create to service
  244. the queue.
  245. ThreadPriority - The priority level at which the queue worker threads
  246. should run.
  247. Return Value:
  248. A pointer to the created queue if the routine is successful.
  249. NULL if the routine fails. Call GetLastError for extended
  250. error information.
  251. --*/
  252. {
  253. DWORD status;
  254. PCLRTL_WORK_QUEUE workQueue = NULL;
  255. DWORD threadId;
  256. HANDLE threadHandle = NULL;
  257. HANDLE bogusHandle = NULL;
  258. if (MaximumThreads == 0) {
  259. SetLastError(ERROR_INVALID_PARAMETER);
  260. return(NULL);
  261. }
  262. bogusHandle = CreateFileW(
  263. L"NUL",
  264. GENERIC_READ,
  265. FILE_SHARE_READ,
  266. NULL,
  267. OPEN_EXISTING,
  268. FILE_FLAG_OVERLAPPED,
  269. NULL
  270. );
  271. if (bogusHandle == INVALID_HANDLE_VALUE) {
  272. status = GetLastError();
  273. ClRtlLogPrint(LOG_CRITICAL, "[WTQ] bogus file creation failed, %1!u!\n", status);
  274. return(NULL);
  275. }
  276. workQueue = LocalAlloc(
  277. LMEM_FIXED | LMEM_ZEROINIT,
  278. sizeof(CLRTL_WORK_QUEUE)
  279. );
  280. if (workQueue == NULL) {
  281. status = ERROR_NOT_ENOUGH_MEMORY;
  282. goto error_exit;
  283. }
  284. workQueue->MaximumThreads = MaximumThreads;
  285. workQueue->TotalThreads = 1;
  286. workQueue->WaitingThreads = 0;
  287. workQueue->ReserveThreads = MaximumThreads - 1;
  288. workQueue->ConcurrentThreads = 0;
  289. workQueue->Timeout = INFINITE;
  290. workQueue->ThreadPriority = ThreadPriority;
  291. workQueue->IoCompletionPort = CreateIoCompletionPort(
  292. bogusHandle,
  293. NULL,
  294. 0,
  295. workQueue->ConcurrentThreads
  296. );
  297. CloseHandle(bogusHandle); bogusHandle = NULL;
  298. if (workQueue->IoCompletionPort == NULL) {
  299. status = GetLastError();
  300. ClRtlLogPrint(LOG_CRITICAL, "[WTQ] Creation of I/O Port failed, %1!u!\n", status);
  301. }
  302. workQueue->StopEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  303. if (workQueue->StopEvent == NULL) {
  304. status = GetLastError();
  305. ClRtlLogPrint(LOG_CRITICAL, "[WTQ] Creation of stop event failed, %1!u!\n", status);
  306. goto error_exit;
  307. }
  308. threadHandle = CreateThread(
  309. NULL,
  310. 0,
  311. ClRtlpWorkerThread,
  312. workQueue,
  313. 0,
  314. &threadId
  315. );
  316. if (threadHandle == NULL) {
  317. status = GetLastError();
  318. ClRtlLogPrint(LOG_CRITICAL, "[WTQ] Failed to create worker thread, %1!u!\n", status);
  319. goto error_exit;
  320. }
  321. CloseHandle(threadHandle);
  322. return(workQueue);
  323. error_exit:
  324. if (bogusHandle != NULL) {
  325. CloseHandle(bogusHandle);
  326. }
  327. if (workQueue != NULL) {
  328. if (workQueue->IoCompletionPort != NULL) {
  329. CloseHandle(workQueue->IoCompletionPort);
  330. }
  331. if (workQueue->StopEvent != NULL) {
  332. CloseHandle(workQueue->StopEvent);
  333. }
  334. LocalFree(workQueue);
  335. }
  336. SetLastError(status);
  337. return(NULL);
  338. }
  339. VOID
  340. ClRtlDestroyWorkQueue(
  341. IN PCLRTL_WORK_QUEUE WorkQueue
  342. )
  343. /*++
  344. Routine Description:
  345. Destroys a work queue and its thread pool.
  346. Arguments:
  347. WorkQueue - The queue to destroy.
  348. Return Value:
  349. None.
  350. Notes:
  351. The following rules must be observed in order to safely destroy a
  352. work queue:
  353. 1) No new work items may be posted to the queue once all previously
  354. posted items have been processed by this routine.
  355. 2) WorkRoutines must be able to process items until this
  356. call returns. After the call returns, no more items will
  357. be delivered from the specified queue.
  358. One workable cleanup procedure is as follows: First, direct the
  359. WorkRoutines to silently discard completed items. Next, eliminate
  360. all sources of new work. Finally, destroy the work queue. Note that
  361. when in discard mode, the WorkRoutines may not access any structures
  362. which will be destroyed by eliminating the sources of new work.
  363. --*/
  364. {
  365. BOOL posted;
  366. DWORD status;
  367. #if THREADQ_VERBOSE
  368. ClRtlLogPrint(LOG_NOISE, "[WTQ] Destroying work queue %1!lx!\n", WorkQueue);
  369. #endif // 0
  370. while (WorkQueue->TotalThreads != 0) {
  371. #if THREADQ_VERBOSE
  372. ClRtlLogPrint(
  373. LOG_NOISE,
  374. "[WTQ] Destroy: Posting terminate item, thread cnt %1!u!\n",
  375. WorkQueue->TotalThreads
  376. );
  377. #endif // 0
  378. posted = PostQueuedCompletionStatus(
  379. WorkQueue->IoCompletionPort,
  380. 0,
  381. 0,
  382. NULL
  383. );
  384. if (!posted) {
  385. status = GetLastError();
  386. ClRtlLogPrint(
  387. LOG_CRITICAL,
  388. "[WTQ] Destroy: Failed to post termination item, %1!u!\n",
  389. status
  390. );
  391. CL_ASSERT(status == ERROR_SUCCESS);
  392. break;
  393. }
  394. #if THREADQ_VERBOSE
  395. ClRtlLogPrint(LOG_NOISE, "[WTQ] Destroy: Waiting for a thread to terminate.\n");
  396. #endif // 0
  397. status = WaitForSingleObject(WorkQueue->StopEvent, INFINITE);
  398. CL_ASSERT(status == WAIT_OBJECT_0);
  399. #if THREADQ_VERBOSE
  400. ClRtlLogPrint(LOG_NOISE, "[WTQ] Destroy: A thread terminated.\n");
  401. #endif // 0
  402. }
  403. CloseHandle(WorkQueue->IoCompletionPort);
  404. CloseHandle(WorkQueue->StopEvent);
  405. LocalFree(WorkQueue);
  406. #if THREADQ_VERBOSE
  407. ClRtlLogPrint(LOG_NOISE, "[WTQ] Work queue %1!lx! destroyed\n", WorkQueue);
  408. #endif // 0
  409. return;
  410. }
  411. DWORD
  412. ClRtlPostItemWorkQueue(
  413. IN PCLRTL_WORK_QUEUE WorkQueue,
  414. IN PCLRTL_WORK_ITEM WorkItem,
  415. IN DWORD BytesTransferred,
  416. IN ULONG_PTR IoContext
  417. )
  418. /*++
  419. Routine Description:
  420. Posts a specified work item to a specified work queue.
  421. Arguments:
  422. WorkQueue - A pointer to the work queue to which to post the item.
  423. WorkItem - A pointer to the item to post.
  424. BytesTransferred - If the work item represents a completed I/O operation,
  425. this parameter contains the number of bytes
  426. transferred during the operation. For other work items,
  427. the semantics of this parameter may be defined by
  428. the caller.
  429. IoContext - If the work item represents a completed I/O operation,
  430. this parameter contains the context value associated
  431. with the handle on which the operation was submitted.
  432. Of other work items, the semantics of this parameter
  433. may be defined by the caller.
  434. Return Value:
  435. ERROR_SUCCESS if the item was posted successfully.
  436. A Win32 error code if the post operation fails.
  437. --*/
  438. {
  439. BOOL posted;
  440. posted = PostQueuedCompletionStatus(
  441. WorkQueue->IoCompletionPort,
  442. BytesTransferred,
  443. IoContext,
  444. &(WorkItem->Overlapped)
  445. );
  446. if (posted) {
  447. return(ERROR_SUCCESS);
  448. }
  449. return(GetLastError());
  450. }
  451. DWORD
  452. ClRtlAssociateIoHandleWorkQueue(
  453. IN PCLRTL_WORK_QUEUE WorkQueue,
  454. IN HANDLE IoHandle,
  455. IN ULONG_PTR IoContext
  456. )
  457. /*++
  458. Routine Description:
  459. Associates a specified I/O handle, opened for overlapped I/O
  460. completion, with a work queue. All pending I/O operations on
  461. the specified handle will be posted to the work queue when
  462. completed. An initialized CLRTL_WORK_ITEM must be used to supply
  463. the OVERLAPPED structure whenever an I/O operation is submitted on
  464. the specified handle.
  465. Arguments:
  466. WorkQueue - The work queue with which to associate the I/O handle.
  467. IoHandle - The I/O handle to associate.
  468. IoContext - A context value to associate with the specified handle.
  469. This value will be supplied as a parameter to the
  470. WorkRoutine which processes completions for this
  471. handle.
  472. Return Value:
  473. ERROR_SUCCESS if the association completes successfully.
  474. A Win32 error code if the association fails.
  475. --*/
  476. {
  477. HANDLE portHandle;
  478. portHandle = CreateIoCompletionPort(
  479. IoHandle,
  480. WorkQueue->IoCompletionPort,
  481. IoContext,
  482. WorkQueue->ConcurrentThreads
  483. );
  484. if (portHandle != NULL) {
  485. CL_ASSERT(portHandle == WorkQueue->IoCompletionPort);
  486. return(ERROR_SUCCESS);
  487. }
  488. return(GetLastError());
  489. }