Leaked source code of windows server 2003
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

304 lines
7.3 KiB

  1. ///////////////////////////////////////////////////////////////////////////////
  2. //
  3. // Copyright (c) Microsoft Corp. All rights reserved.
  4. //
  5. // FILE
  6. //
  7. // Dispatcher.cpp
  8. //
  9. // SYNOPSIS
  10. //
  11. // This file implements the class Dispatcher.
  12. //
  13. // MODIFICATION HISTORY
  14. //
  15. // 07/31/1997 Original version.
  16. // 12/04/1997 Check return value of _beginthreadex.
  17. // 02/24/1998 Initialize COM run-time for all threads.
  18. // 04/16/1998 Block in Finalize until all the threads have returned.
  19. // 05/20/1998 GetQueuedCompletionStatus signature changed.
  20. // 08/07/1998 Wait on thread handle to ensure all threads have exited.
  21. //
  22. ///////////////////////////////////////////////////////////////////////////////
  23. #include <nt.h>
  24. #include <ntrtl.h>
  25. #include <nturtl.h>
  26. #include <windows.h>
  27. #include <iascore.h>
  28. #include <process.h>
  29. #include <cstddef>
  30. #include <dispatcher.h>
  31. ///////////////////////////////////////////////////////////////////////////////
  32. //
  33. // METHOD
  34. //
  35. // Dispatcher::initialize
  36. //
  37. ///////////////////////////////////////////////////////////////////////////////
  38. BOOL Dispatcher::initialize(DWORD dwMaxThreads, DWORD dwMaxIdle) throw ()
  39. {
  40. // Initialize the various parameters.
  41. numThreads = 0;
  42. maxThreads = dwMaxThreads;
  43. available = 0;
  44. maxIdle = dwMaxIdle;
  45. // If maxThreads == 0, then we compute a suitable default.
  46. if (maxThreads == 0)
  47. {
  48. // Threads defaults to 64 times the number of processors.
  49. SYSTEM_INFO sinf;
  50. ::GetSystemInfo(&sinf);
  51. maxThreads = sinf.dwNumberOfProcessors * 64;
  52. }
  53. // Initialize the handles.
  54. hPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
  55. if (hPort == NULL)
  56. {
  57. return FALSE;
  58. }
  59. hEmpty = CreateEvent(NULL, TRUE, TRUE, NULL);
  60. if (hEmpty == NULL)
  61. {
  62. CloseHandle(hPort);
  63. hPort = NULL;
  64. return FALSE;
  65. }
  66. hLastOut = NULL;
  67. return TRUE;
  68. }
  69. ///////////////////////////////////////////////////////////////////////////////
  70. //
  71. // METHOD
  72. //
  73. // Dispatcher::finalize
  74. //
  75. ///////////////////////////////////////////////////////////////////////////////
  76. void Dispatcher::finalize()
  77. {
  78. Lock();
  79. // Block any new threads from being created.
  80. maxThreads = 0;
  81. // How many threads are still in the pool?
  82. DWORD remaining = numThreads;
  83. Unlock();
  84. // Post a null request for each existing thread.
  85. while (remaining--)
  86. {
  87. PostQueuedCompletionStatus(hPort, 0, 0, NULL);
  88. }
  89. // Wait until the pool is empty.
  90. WaitForSingleObject(hEmpty, INFINITE);
  91. if (hLastOut != NULL)
  92. {
  93. // Wait for the last thread to exit.
  94. WaitForSingleObject(hLastOut, INFINITE);
  95. }
  96. //////////
  97. // Clean-up the handles.
  98. //////////
  99. CloseHandle(hLastOut);
  100. hLastOut = NULL;
  101. CloseHandle(hEmpty);
  102. hEmpty = NULL;
  103. CloseHandle(hPort);
  104. hPort = NULL;
  105. }
  106. ///////////////////////////////////////////////////////////////////////////////
  107. //
  108. // METHOD
  109. //
  110. // Dispatcher::Dispatch
  111. //
  112. // DESCRIPTION
  113. //
  114. // This is the main loop for all the threads in the pool.
  115. //
  116. ///////////////////////////////////////////////////////////////////////////////
  117. inline void Dispatcher::fillRequests() throw ()
  118. {
  119. DWORD dwNumBytes;
  120. ULONG_PTR ulKey;
  121. PIAS_CALLBACK pRequest;
  122. //////////
  123. // Loop until we either timeout or get a null request.
  124. //////////
  125. next:
  126. BOOL success = GetQueuedCompletionStatus(hPort,
  127. &dwNumBytes,
  128. &ulKey,
  129. (OVERLAPPED**)&pRequest,
  130. maxIdle);
  131. if (pRequest)
  132. {
  133. pRequest->CallbackRoutine(pRequest);
  134. Lock();
  135. ++available;
  136. Unlock();
  137. goto next;
  138. }
  139. Lock();
  140. // We never want to timeout a thread while there's a backlog.
  141. if (available <= 0 && success == FALSE && GetLastError() == WAIT_TIMEOUT)
  142. {
  143. Unlock();
  144. goto next;
  145. }
  146. // Save the current value of 'last out' and replace it with our handle.
  147. HANDLE previousThread = hLastOut;
  148. hLastOut = NULL;
  149. DuplicateHandle(
  150. NtCurrentProcess(),
  151. NtCurrentThread(),
  152. NtCurrentProcess(),
  153. &hLastOut,
  154. 0,
  155. FALSE,
  156. DUPLICATE_SAME_ACCESS
  157. );
  158. // We're removing a thread from the pool, so update our state.
  159. --available;
  160. --numThreads;
  161. // If there are no threads left, set the 'empty' event.
  162. if (numThreads == 0) { SetEvent(hEmpty); }
  163. Unlock();
  164. // Wait until the previous thread exits. This guarantees that when the
  165. // 'last out' thread exits, all threads have exited.
  166. if (previousThread != NULL)
  167. {
  168. WaitForSingleObject(previousThread, INFINITE);
  169. CloseHandle(previousThread);
  170. }
  171. }
  172. ///////////////////////////////////////////////////////////////////////////////
  173. //
  174. // METHOD
  175. //
  176. // Dispatcher::RequestThread
  177. //
  178. ///////////////////////////////////////////////////////////////////////////////
  179. BOOL Dispatcher::requestThread(PIAS_CALLBACK OnStart) throw ()
  180. {
  181. Lock();
  182. // If there are no threads available AND we're below our limit,
  183. // create a new thread.
  184. if (--available < 0 && numThreads < maxThreads)
  185. {
  186. unsigned nThreadID;
  187. HANDLE hThread = (HANDLE)_beginthreadex(NULL,
  188. 0,
  189. startRoutine,
  190. (void*)this,
  191. 0,
  192. &nThreadID);
  193. if (hThread)
  194. {
  195. // We don't need the thread handle.
  196. CloseHandle(hThread);
  197. // We added a thread to the pool, so update our state.
  198. if (numThreads == 0) { ResetEvent(hEmpty); }
  199. ++numThreads;
  200. ++available;
  201. }
  202. }
  203. Unlock();
  204. //////////
  205. // Post it to the I/O Completion Port.
  206. //////////
  207. return PostQueuedCompletionStatus(hPort, 0, 0, (OVERLAPPED*)OnStart);
  208. }
  209. ///////////////////////////////////////////////////////////////////////////////
  210. //
  211. // METHOD
  212. //
  213. // Dispatcher::setMaxNumberOfThreads
  214. //
  215. ///////////////////////////////////////////////////////////////////////////////
  216. DWORD Dispatcher::setMaxNumberOfThreads(DWORD dwMaxThreads) throw ()
  217. {
  218. Lock();
  219. DWORD oldval = maxThreads;
  220. maxThreads = dwMaxThreads;
  221. Unlock();
  222. return oldval;
  223. }
  224. ///////////////////////////////////////////////////////////////////////////////
  225. //
  226. // METHOD
  227. //
  228. // Dispatcher::setMaxThreadIdle
  229. //
  230. ///////////////////////////////////////////////////////////////////////////////
  231. DWORD Dispatcher::setMaxThreadIdle(DWORD dwMilliseconds)
  232. {
  233. Lock();
  234. DWORD oldval = maxIdle;
  235. maxIdle = dwMilliseconds;
  236. Unlock();
  237. return oldval;
  238. }
  239. ///////////////////////////////////////////////////////////////////////////////
  240. //
  241. // METHOD
  242. //
  243. // Dispatcher::StartRoutine
  244. //
  245. ///////////////////////////////////////////////////////////////////////////////
  246. unsigned __stdcall Dispatcher::startRoutine(void* pArg) throw ()
  247. {
  248. ((Dispatcher*)pArg)->fillRequests();
  249. return 0;
  250. }