Leaked source code of windows server 2003
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1354 lines
44 KiB

  1. //-----------------------------------------------------------------------------
  2. //
  3. //
  4. // File: asyncq.inl
  5. //
  6. // Description: Implementation of templated CAsyncQueue class.
  7. //
  8. // Author: Mike Swafford (MikeSwa)
  9. //
  10. // History:
  11. // 7/17/98 - MikeSwa Created
  12. //
  13. // Copyright (C) 1998 Microsoft Corporation
  14. //
  15. //-----------------------------------------------------------------------------
  16. #ifndef __ASYNCQ_INL__
  17. #define __ASYNCQ_INL__
  18. #include "asyncq.h"
  19. #include "fifoqimp.h"
  20. #include "aqinst.h"
  21. //---[ CAsyncQueueBase::CAsyncQueueBase ]--------------------------------------
  22. //
  23. //
  24. // Description:
  25. // Default constructor for CAsyncQueueBase class
  26. // Parameters:
  27. // dwTemplateSignature - Signature used to identify the type of
  28. // templated super class this is associated with
  29. // when an ATQ completion routine is called
  30. // Returns:
  31. // -
  32. // History:
  33. // 7/18/98 - MikeSwa Created
  34. //
  35. //-----------------------------------------------------------------------------
  36. CAsyncQueueBase::CAsyncQueueBase(DWORD dwTemplateSignature) :
  37. CStateMachineBase(ASYNC_QUEUE_STATUS_NORMAL, ASYNC_QUEUE_STATE_MACHINE_SIG)
  38. {
  39. m_dwSignature = ASYNC_QUEUE_SIG;
  40. m_dwTemplateSignature = dwTemplateSignature;
  41. m_cMaxSyncThreads = 0;
  42. m_cCurrentSyncThreads = 0;
  43. m_cCurrentAsyncThreads = 0;
  44. m_cItemsPending = 0;
  45. m_cItemsPerATQThread = 0;
  46. m_cItemsPerSyncThread = 0;
  47. m_cScheduledWorkItems = 0;
  48. m_cCurrentCompletionThreads = 0;
  49. m_cCompletionThreadsRequested = 0;
  50. m_pvContext = NULL;
  51. m_pAtqContext = NULL;
  52. m_hAtqHandle = INVALID_SOCKET;
  53. m_cTotalAsyncCompletionThreads = 0;
  54. m_cTotalSyncCompletionThreads = 0;
  55. m_cTotalShortCircuitThreads = 0;
  56. m_cPendingAsyncCompletions = 0;
  57. m_cMaxPendingAsyncCompletions = 0;
  58. m_cThreadsNeeded = 0;
  59. }
  60. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::CAsyncQueue<PQDATA, TEMPLATE_SIG> ]--
  61. //
  62. //
  63. // Description:
  64. // Default constructor for CAsyncQueue
  65. // Parameters:
  66. // -
  67. // Returns:
  68. // -
  69. // History:
  70. // 7/17/98 - MikeSwa Created
  71. //
  72. //-----------------------------------------------------------------------------
  73. template<class PQDATA, DWORD TEMPLATE_SIG>
  74. CAsyncQueue<PQDATA, TEMPLATE_SIG>::CAsyncQueue<PQDATA, TEMPLATE_SIG>() :
  75. CAsyncQueueBase(TEMPLATE_SIG)
  76. {
  77. m_pfnQueueCompletion = NULL;
  78. m_pfnQueueFailure = NULL;
  79. m_pfnFailedItem = NULL;
  80. }
  81. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::~CAsyncQueue<PQDATA, TEMPLATE_SIG> ]--
  82. //
  83. //
  84. // Description:
  85. // Default desctructor for CAsyncQueue. Call Queue-mapping function to
  86. // to clear out the queue
  87. // Parameters:
  88. // -
  89. // Returns:
  90. // -
  91. // History:
  92. // 7/17/98 - MikeSwa Created
  93. //
  94. //-----------------------------------------------------------------------------
  95. template<class PQDATA, DWORD TEMPLATE_SIG>
  96. CAsyncQueue<PQDATA, TEMPLATE_SIG>::~CAsyncQueue<PQDATA, TEMPLATE_SIG>()
  97. {
  98. TraceFunctEnterEx((LPARAM) this, "CAsyncQueue<PQDATA, TEMPLATE_SIG>::~CAsyncQueue<PQDATA, TEMPLATE_SIG>");
  99. HRESULT hr = S_OK;
  100. DWORD cItems = 0;
  101. //
  102. // If this is off, then we may not actually be processing anything
  103. // (if we have hit the limit).
  104. //
  105. _ASSERT(!m_cPendingAsyncCompletions);
  106. hr = m_fqQueue.HrMapFn(HrClearQueueMapFn, m_pvContext, &cItems);
  107. if (FAILED(hr))
  108. ErrorTrace((LPARAM) this, "ERROR: Unable to Cleanup CAsyncQueue - hr 0x%08X", hr);
  109. else
  110. DecrementPendingCount(-((LONG) cItems));
  111. if (m_pAtqContext)
  112. {
  113. //Freeing context will close handle
  114. AtqFreeContext(m_pAtqContext, FALSE);
  115. }
  116. TraceFunctLeave();
  117. }
  118. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrInitialize ]-----------------------
  119. //
  120. //
  121. // Description:
  122. // Initializes CAsyncQueue with the neccessary information
  123. // Parameters:
  124. // cMaxSyncThreads The maximum # of threads that will be "stolen" from
  125. // the enqueuing threads and used to process items
  126. // from the front of the queue
  127. // cItemsPerATQThread Max # of items an ATQ thread will process from the
  128. // front of the queue before being released
  129. // cItemsPerSyncThread Max # of items a stolen sync thread will process
  130. // from the fron of the queeu before being released
  131. // pvContext Context pass to completion routines and queue-map
  132. // functions (can be NULL)
  133. // pfnQueueCompletion Function called to process a single item from
  134. // the front of the queue.
  135. // pfnFailedItem Function called if an internal resource failure
  136. // prevents an item from being queued or requeued
  137. // pfnQueueFailure Function called to walk the queues when the
  138. // completion function fails
  139. //
  140. // Note:
  141. // Queue completion functino has the following prototype
  142. // BOOL (*QCOMPFN)(PQDATA pqData, PVOID pvContext)
  143. // Returns TRUE if the item has been handles
  144. // FALSE if the item needs to be requeued
  145. //
  146. // Returns:
  147. // S_OK on success
  148. // E_INVALIDARG if the params are invalid:
  149. // cItemsPerThread is 0
  150. // pfnQueueCompletion is NULL
  151. // History:
  152. // 7/17/98 - MikeSwa Created
  153. // 2/3/99 - MikeSwa Added pfnFailedItem
  154. // 12/11/2000 - MikeSwa Added t-toddc's state table work
  155. //
  156. //-----------------------------------------------------------------------------
  157. template<class PQDATA, DWORD TEMPLATE_SIG>
  158. HRESULT CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrInitialize(
  159. DWORD cMaxSyncThreads,
  160. DWORD cItemsPerATQThread,
  161. DWORD cItemsPerSyncThread,
  162. PVOID pvContext,
  163. QCOMPFN pfnQueueCompletion,
  164. QCOMPFN pfnFailedItem,
  165. typename CFifoQueue<PQDATA>::MAPFNAPI pfnQueueFailure,
  166. DWORD cMaxPendingAsyncCompletions)
  167. {
  168. TraceFunctEnterEx((LPARAM) this, "CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrInitialize");
  169. HRESULT hr = S_OK;
  170. ThreadPoolInitialize();
  171. if (!cItemsPerATQThread || !cItemsPerSyncThread || !pfnQueueCompletion)
  172. {
  173. hr = E_INVALIDARG;
  174. goto Exit;
  175. }
  176. m_cMaxSyncThreads = cMaxSyncThreads;
  177. m_cItemsPerATQThread = (DWORD) cItemsPerATQThread;
  178. m_cItemsPerSyncThread = (DWORD) cItemsPerSyncThread;
  179. _ASSERT(m_cItemsPerATQThread > 0);
  180. m_pvContext = pvContext;
  181. m_pfnFailedItem = pfnFailedItem;
  182. m_pfnQueueCompletion = pfnQueueCompletion;
  183. m_pfnQueueFailure = pfnQueueFailure;
  184. m_cMaxPendingAsyncCompletions = cMaxPendingAsyncCompletions;
  185. //Create a dummy socket to handle async completion
  186. m_hAtqHandle = socket(AF_INET, SOCK_STREAM, 0);
  187. if (INVALID_SOCKET == m_hAtqHandle)
  188. {
  189. hr = HRESULT_FROM_WIN32(WSAGetLastError());
  190. ErrorTrace((LPARAM) this, "ERROR socket() failed - hr 0x%08X", hr);
  191. if (SUCCEEDED(hr))
  192. hr = E_FAIL;
  193. goto Exit;
  194. }
  195. //associate socket handle with ATQ
  196. if (!AtqAddAsyncHandle(&m_pAtqContext, NULL, this,
  197. AsyncQueueAtqCompletion, INFINITE, (HANDLE) m_hAtqHandle))
  198. {
  199. hr = HRESULT_FROM_WIN32(GetLastError());
  200. ErrorTrace((LPARAM) this, "ERROR AtqAddAsyncHandle failed - hr 0x%08X", hr);
  201. if (SUCCEEDED(hr))
  202. hr = E_FAIL;
  203. goto Exit;
  204. }
  205. // make sure state transition table is valid
  206. if (!fValidateStateTable())
  207. {
  208. _ASSERT(0 && "State Transition Table Invalid");
  209. hr = E_FAIL;
  210. goto Exit;
  211. }
  212. Exit:
  213. if (FAILED(hr))
  214. ThreadPoolDeinitialize();
  215. TraceFunctLeave();
  216. return hr;
  217. }
  218. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrDeinitialize ]---------------------
  219. //
  220. //
  221. // Description:
  222. // Walks queues with given function for shutdown
  223. // Parameters:
  224. // pfnQueueShutdown Queue-mapping function called on shutdown to
  225. // clean queues. If NULL, it will substitute
  226. // HrClearQueueMapFn which walks the queues and
  227. // releases all PQDATA in it
  228. // paqinst Shutdown context with server stop hint function
  229. // Returns:
  230. // S_OK on success
  231. // History:
  232. // 7/20/98 - MikeSwa Created
  233. //
  234. //-----------------------------------------------------------------------------
  235. template<class PQDATA, DWORD TEMPLATE_SIG>
  236. HRESULT CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrDeinitialize(
  237. typename CFifoQueue<PQDATA>::MAPFNAPI pfnQueueShutdown,
  238. CAQSvrInst *paqinst)
  239. {
  240. TraceFunctEnterEx((LPARAM) this, "CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrDeinitialize");
  241. HRESULT hr = S_OK;
  242. _ASSERT(paqinst);
  243. DWORD cItems = 0;
  244. //shutdown action has occurred.
  245. dwGetNextState(ASYNC_QUEUE_ACTION_SHUTDOWN);
  246. //wait until all requested threads have returned
  247. while (m_cCurrentCompletionThreads || m_cCompletionThreadsRequested)
  248. {
  249. if (paqinst)
  250. paqinst->ServerStopHintFunction();
  251. Sleep(1000);
  252. }
  253. //map shutdown function
  254. hr = m_fqQueue.HrMapFn(pfnQueueShutdown, paqinst, &cItems);
  255. if (FAILED(hr))
  256. ErrorTrace((LPARAM) this, "ERROR: Unable to Cleanup CAsyncQueue - hr 0x%08X", hr);
  257. else
  258. DecrementPendingCount(-((LONG)cItems));
  259. ThreadPoolDeinitialize();
  260. TraceFunctLeave();
  261. return hr;
  262. }
  263. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrQueueRequest ]---------------------
  264. //
  265. //
  266. // Description:
  267. // Queues request for async completion.
  268. // Parameters:
  269. // pqdata Data to pass to completion function
  270. // fRetry TRUE => Put item at front of queue (and don't use this
  271. // thread to process it).
  272. // FALSE => Queue normaly
  273. // Returns:
  274. // S_OK on success
  275. // E_OUTOFMEMORY if queue-related resources could not be allocated
  276. // History:
  277. // 7/17/98 - MikeSwa Created
  278. //
  279. //-----------------------------------------------------------------------------
  280. template<class PQDATA, DWORD TEMPLATE_SIG>
  281. HRESULT CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrQueueRequest(PQDATA pqdata,
  282. BOOL fRetry)
  283. {
  284. TraceFunctEnterEx((LPARAM) this, "CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrQueueRequest");
  285. HRESULT hr = S_OK;
  286. DWORD cCurrentSyncThreads;
  287. _ASSERT(m_pfnQueueCompletion);
  288. cCurrentSyncThreads = InterlockedIncrement((PLONG) &m_cCurrentSyncThreads);
  289. //If we are shutting down... do not bother to queue the message
  290. if (fInShutdown())
  291. goto Exit;
  292. IncrementPendingCount();
  293. //Only enqueue if there are others waiting
  294. if (fRetry || (m_cItemsPending > 1) ||
  295. (m_cMaxSyncThreads < cCurrentSyncThreads) || fShouldStopProcessing())
  296. {
  297. //Enqueue data
  298. if (fRetry && g_fRetryAtFrontOfAsyncQueue)
  299. hr = m_fqQueue.HrRequeue(pqdata);
  300. else
  301. hr = m_fqQueue.HrEnqueue(pqdata);
  302. if (FAILED(hr))
  303. {
  304. DecrementPendingCount();
  305. ErrorTrace((LPARAM) this, "ERROR: Unable to queue item for async handling - hr 0x%08X", hr);
  306. goto Exit;
  307. }
  308. //see if we can steal this thread thread to process queue entries
  309. //Only steal a thread if the following conditions are met:
  310. // - We have not exceeded our sync thread limit
  311. // - There are no async threads that could be doing the work
  312. // - We are not retrying something
  313. if (!fRetry && !fShouldStopProcessing() &&
  314. (m_cMaxSyncThreads >= cCurrentSyncThreads) &&
  315. !m_cCurrentAsyncThreads && !m_cCompletionThreadsRequested)
  316. {
  317. //Make sure there is work to be done
  318. if (fThreadNeededAndMarkWorkPending(TRUE))
  319. {
  320. //Steal thread
  321. StartThreadCompletionRoutine(TRUE);
  322. }
  323. }
  324. }
  325. else
  326. {
  327. //Steal this thread thread to proccess this item w/o hitting queue
  328. DecrementPendingCount();
  329. InterlockedIncrement((PLONG) &m_cTotalShortCircuitThreads);
  330. //Process Item & handle failure case
  331. if (!m_pfnQueueCompletion(pqdata, m_pvContext))
  332. {
  333. fHandleCompletionFailure(pqdata);
  334. }
  335. }
  336. //Always make sure there are enough threads to do the work (unless we are retrying)
  337. if (!fRetry)
  338. RequestCompletionThreadIfNeeded();
  339. Exit:
  340. InterlockedDecrement((PLONG) &m_cCurrentSyncThreads);
  341. return hr;
  342. TraceFunctLeave();
  343. }
  344. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::ProcessSingleQueueItem ]-------------
  345. //
  346. //
  347. // Description:
  348. // Processes a single item at the head of the queue. All failures need
  349. // to be handled internally
  350. // Parameters:
  351. // -
  352. // Returns:
  353. // SUCCEEDED(hr)) on success and we should continue
  354. // AQUEUE_E_QUEUE_EMPTY when there are no more items to process
  355. // E_FAIL if the completion call failed
  356. // Error code from HrDequeue on other failure.
  357. // History:
  358. // 7/17/98 - MikeSwa Created
  359. // 2/3/2000 - MikeSwa Modified to return an HRESULT
  360. //
  361. //-----------------------------------------------------------------------------
  362. template<class PQDATA, DWORD TEMPLATE_SIG>
  363. HRESULT CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrProcessSingleQueueItem()
  364. {
  365. HRESULT hr = S_OK;
  366. PQDATA pqdata = NULL;
  367. DWORD cItemsLeft = 0;
  368. BOOL fSucceeded = TRUE;
  369. hr = m_fqQueue.HrDequeue(&pqdata);
  370. if (SUCCEEDED(hr))
  371. {
  372. DecrementPendingCount();
  373. //We have data item - now process it
  374. fSucceeded = m_pfnQueueCompletion(pqdata, m_pvContext);
  375. if (fSucceeded || fHandleCompletionFailure(pqdata))
  376. {
  377. //Request another thread if
  378. // - we had at least 1 success.
  379. // - handle failure told use to continue
  380. RequestCompletionThreadIfNeeded();
  381. //If fHandleCompletionFailure said we succeeded, then continue
  382. fSucceeded = TRUE;
  383. }
  384. pqdata->Release();
  385. }
  386. //
  387. // If the dequeue succeeded but the completion failed, then return E_FAIL
  388. //
  389. if (!fSucceeded && (SUCCEEDED(hr)))
  390. hr = E_FAIL;
  391. return hr;
  392. }
  393. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::StartThreadCompletionRoutine ]-------
  394. //
  395. //
  396. // Description:
  397. // Starting point to completion threads. Each thread will attempt to
  398. // process m_cItemsPerATQThread items from the front of the queue
  399. // Parameters:
  400. // fSync TRUE if a sync thread, FALSE... this is an ATQ thread
  401. // Returns:
  402. // -
  403. // History:
  404. // 7/17/98 - MikeSwa Created
  405. // 2/3/2000 - MikeSwa Modified to fix window that would leave items
  406. // "stranded" in queue.
  407. //
  408. //-----------------------------------------------------------------------------
  409. template<class PQDATA, DWORD TEMPLATE_SIG>
  410. void CAsyncQueue<PQDATA, TEMPLATE_SIG>::StartThreadCompletionRoutine(BOOL fSync)
  411. {
  412. TraceFunctEnterEx((LPARAM) this, "CAsyncQueue::StartThreadCompletionRoutine");
  413. DWORD cItemsToProcess = (DWORD) (fSync ? m_cItemsPerSyncThread: m_cItemsPerATQThread);
  414. HRESULT hr = S_OK;
  415. DWORD dwInitialTickCount = 0;
  416. DWORD dwCurrentTickCount = 0;
  417. BOOL fRequestNewThread = TRUE;
  418. InterlockedIncrement((PLONG) &m_cCurrentCompletionThreads);
  419. if (fSync)
  420. InterlockedIncrement((PLONG) &m_cTotalSyncCompletionThreads);
  421. // obtain tick count immediately before processing
  422. dwInitialTickCount = GetTickCount();
  423. //process items until we fail or are done
  424. while (cItemsToProcess) { // all cases for quitting are handled in the loop
  425. hr = HrProcessSingleQueueItem();
  426. // If we fail to process an item we will stop working in this thread
  427. if (FAILED(hr)) {
  428. // We failed, do not request a thread to replace this one unless
  429. // the failure was "AQUEUE_E_QUEUE_EMPTY" in which case we need
  430. // to try another thread in case an item is added between here
  431. // and this thread's termination.
  432. if (hr != AQUEUE_E_QUEUE_EMPTY) {
  433. fRequestNewThread = FALSE;
  434. InterlockedIncrement((PLONG)&s_cThreadCompletion_Failure);
  435. }
  436. else {
  437. InterlockedIncrement((PLONG)&s_cThreadCompletion_QueueEmpty);
  438. }
  439. break;
  440. }
  441. // A note about cItemsToProcess and m_cScheduledWorkItems : It is
  442. // important that when this function completes we have decremented the
  443. // original value of cItemsToProcess from m_cScheduledWorkItems because
  444. // that is the number that was added when this thread was requested. We
  445. // will either subtract them here one by one (items that were processed)
  446. // or at the end (items that were not processed) but what is most
  447. // important is that the exact number is subtracted when the thread
  448. // completes (to maintain the validity of m_cScheduledWorkItems)
  449. // Decrement number of items to process and scheduled work count
  450. cItemsToProcess--;
  451. InterlockedDecrement((PLONG)&m_cScheduledWorkItems);
  452. // If we have processed all our scheduled items - drop out now
  453. if (!cItemsToProcess) {
  454. InterlockedIncrement((PLONG)&s_cThreadCompletion_CompletedScheduledItems);
  455. break;
  456. }
  457. // If there's nothing left in the queue - drop out now
  458. if (!m_cItemsPending) {
  459. InterlockedIncrement((PLONG)&s_cThreadCompletion_QueueEmpty);
  460. break;
  461. }
  462. // If we have been paused - drop out now
  463. if (fShouldStopProcessing()){
  464. InterlockedIncrement((PLONG)&s_cThreadCompletion_Paused);
  465. break;
  466. }
  467. // If we are using too many threads - drop out now
  468. if (!fIsThreadCountAcceptable()) {
  469. InterlockedIncrement((PLONG)&s_cThreadCompletion_UnacceptableThreadCount);
  470. break;
  471. }
  472. // If we have been processing for too long - drop out now
  473. dwCurrentTickCount = GetTickCount();
  474. if (dwCurrentTickCount - dwInitialTickCount > g_cMaxTicksPerATQThread) {
  475. InterlockedIncrement((PLONG)&s_cThreadCompletion_Timeout);
  476. break;
  477. }
  478. }
  479. // Subtract from the scheduled item count the number of items we did not process
  480. if (cItemsToProcess) {
  481. InterlockedExchangeAdd((PLONG)&m_cScheduledWorkItems, -((LONG)cItemsToProcess));
  482. }
  483. InterlockedDecrement((PLONG) &m_cCurrentCompletionThreads);
  484. // Always request another thread when completing unless we failed - we will let
  485. // the ThreadsNeeded logic handle throttling how many threads act on this queue
  486. // at a time.
  487. if (fRequestNewThread)
  488. RequestCompletionThreadIfNeeded();
  489. TraceFunctLeave();
  490. }
  491. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::fThreadNeededAndMarkWorkPending ]----
  492. //
  493. //
  494. // Description:
  495. // Determines if another worker thread is needed, and adjusts
  496. // m_cScheduledWorkItems accordingly. Callee is repsonsible for determining
  497. // if a thread can be allocated.
  498. // Parameters:
  499. // fSync TRUE if checking for a sync thread, FALSE... checking for an ATQ thread
  500. // Returns:
  501. // TRUE if another thread is needed (and member values adjusted accordingly)
  502. // FALSE if another thread is not needed to do work
  503. // History:
  504. // 7/18/98 - MikeSwa Created
  505. //
  506. //-----------------------------------------------------------------------------
  507. template<class PQDATA, DWORD TEMPLATE_SIG>
  508. BOOL CAsyncQueue<PQDATA, TEMPLATE_SIG>::fThreadNeededAndMarkWorkPending(BOOL fSync)
  509. {
  510. if (fInShutdown())
  511. {
  512. _ASSERT(!fSync && "CAQSvrInst should not call now!!!");
  513. return FALSE;
  514. }
  515. else if (fShouldStopProcessing())
  516. {
  517. return FALSE;
  518. }
  519. else if (m_cScheduledWorkItems < m_cItemsPending)
  520. {
  521. // There are unscheduled items - we need a thread
  522. // Schedule the right number of items
  523. InterlockedExchangeAdd((PLONG)&m_cScheduledWorkItems,
  524. (fSync ? m_cItemsPerSyncThread : m_cItemsPerATQThread));
  525. return TRUE;
  526. }
  527. else
  528. {
  529. return FALSE;
  530. }
  531. }
  532. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::RequestCompletionThreadIfNeeded ]----
  533. //
  534. //
  535. // Description:
  536. // Requests a queue completion thread if needed. Uses ATQ and handle
  537. // allocated to POQS for another thread. Makes sure that we do not
  538. // exceed the max # of async threads.
  539. // Parameters:
  540. // -
  541. // Returns:
  542. // -
  543. // History:
  544. // 7/18/98 - MikeSwa Created
  545. //
  546. //-----------------------------------------------------------------------------
  547. template<class PQDATA, DWORD TEMPLATE_SIG>
  548. void CAsyncQueue<PQDATA, TEMPLATE_SIG>::RequestCompletionThreadIfNeeded()
  549. {
  550. DWORD cThreadsRequested = m_cCompletionThreadsRequested;
  551. BOOL fThreadRequested = FALSE;
  552. // Can we have a thread?
  553. InterlockedIncrement((PLONG) &m_cCompletionThreadsRequested);
  554. if (fIsThreadCountAcceptable()) {
  555. // Do we want a thread?
  556. if (fThreadNeededAndMarkWorkPending(FALSE)) {
  557. // Request a thread
  558. fThreadRequested = TRUE;
  559. AtqPostCompletionStatus(m_pAtqContext, GetTickCount());
  560. }
  561. }
  562. // If we didn't request a thread, decrement the request count
  563. if (!fThreadRequested)
  564. InterlockedDecrement((PLONG) &m_cCompletionThreadsRequested);
  565. }
  566. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::fHandleCompletionFailure ]------------
  567. //
  568. //
  569. // Description:
  570. // Called when async completion function returns false... handles requeuing
  571. // data and record-keeping. Needs to handle the following:
  572. // Parameters:
  573. // pqdata - Data that triggered failure
  574. // Returns:
  575. // -
  576. // History:
  577. // 7/18/98 - MikeSwa Created
  578. // 8/14/98 - MikeSwa Modified to add failure handling
  579. //
  580. //-----------------------------------------------------------------------------
  581. template<class PQDATA, DWORD TEMPLATE_SIG>
  582. BOOL CAsyncQueue<PQDATA, TEMPLATE_SIG>::fHandleCompletionFailure(PQDATA pqdata)
  583. {
  584. HRESULT hr;
  585. DWORD cItemsRemoved = 0;
  586. if (fInShutdown())
  587. return FALSE;
  588. if (g_fRetryAtFrontOfAsyncQueue)
  589. hr = m_fqQueue.HrRequeue(pqdata);
  590. else
  591. hr = m_fqQueue.HrEnqueue(pqdata);
  592. if (SUCCEEDED(hr))
  593. {
  594. IncrementPendingCount();
  595. }
  596. else
  597. HandleDroppedItem(pqdata);
  598. //call failure routine (if present)
  599. if (m_pfnQueueFailure)
  600. {
  601. hr = m_fqQueue.HrMapFn(m_pfnQueueFailure, m_pvContext, &cItemsRemoved);
  602. if (SUCCEEDED(hr))
  603. {
  604. //Adjust appropriate counters
  605. DecrementPendingCount(-((LONG)cItemsRemoved));
  606. }
  607. }
  608. return FALSE;
  609. }
  610. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::HandleDroppedItem ]------------------
  611. //
  612. //
  613. // Description:
  614. // Handles a dropped PQDATA by calling the callback provided at start
  615. // up
  616. // Parameters:
  617. // pqData
  618. // Returns:
  619. //
  620. // History:
  621. // 2/3/99 - MikeSwa Created
  622. //
  623. //-----------------------------------------------------------------------------
  624. template<class PQDATA, DWORD TEMPLATE_SIG>
  625. void CAsyncQueue<PQDATA, TEMPLATE_SIG>::HandleDroppedItem(PQDATA pqData)
  626. {
  627. if (m_pfnFailedItem)
  628. m_pfnFailedItem(pqData, m_pvContext);
  629. }
  630. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrMapFn ]----------------------------
  631. //
  632. //
  633. // Description:
  634. // Calls a function on every message in the queue
  635. // Parameters:
  636. // IN pfnQueueFn Function to call for every message
  637. // IN pvContext Context passed to completion function
  638. // Returns:
  639. // S_OK on success
  640. // Error code from CFifoQueue<PQDATA>::HrMapFn
  641. // History:
  642. // 2/23/99 - MikeSwa Created
  643. //
  644. //-----------------------------------------------------------------------------
  645. template<class PQDATA, DWORD TEMPLATE_SIG>
  646. HRESULT CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrMapFn(
  647. typename CFifoQueue<PQDATA>::MAPFNAPI pfnQueueFn,
  648. PVOID pvContext)
  649. {
  650. DWORD cItems = 0;
  651. HRESULT hr = S_OK;
  652. hr = m_fqQueue.HrMapFn(pfnQueueFn, pvContext, &cItems);
  653. if (SUCCEEDED(hr))
  654. {
  655. DecrementPendingCount(-((LONG)cItems));
  656. }
  657. return hr;
  658. }
  659. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::UnpauseQueue ]-----------------------
  660. //
  661. //
  662. // Description:
  663. // Unpauses a queue by unsetting the ASYNC_QUEUE_STATUS_PAUSED bit and
  664. // requesting threads if neccessary.
  665. // Parameters:
  666. // -
  667. // Returns:
  668. // -
  669. // History:
  670. // 1/24/2000 - MikeSwa Created
  671. // 6/12/2000 - t-toodc modified to use state machine functionality
  672. //
  673. //-----------------------------------------------------------------------------
  674. template<class PQDATA, DWORD TEMPLATE_SIG>
  675. void CAsyncQueue<PQDATA, TEMPLATE_SIG>::UnpauseQueue()
  676. {
  677. BOOL fStopped = fShouldStopProcessing();
  678. dwGetNextState(ASYNC_QUEUE_ACTION_UNPAUSE);
  679. //
  680. // The queue *was* paused. We should make sure that we reqest threads
  681. // if there are items to process.
  682. //
  683. if (fStopped && !fShouldStopProcessing()) {
  684. UpdateThreadsNeeded();
  685. RequestCompletionThreadIfNeeded();
  686. }
  687. }
  688. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::ThawQueue ]-----------------------
  689. //
  690. // Description:
  691. // Thaws a queue and sets the next state, requesting a completion thread
  692. // if necessary
  693. // Parameters:
  694. // -
  695. // Returns:
  696. // -
  697. // History:
  698. // 1/24/2000 - MikeSwa Created
  699. // 6/12/2000 - t-toodc modified to use state machine functionality
  700. //
  701. //-----------------------------------------------------------------------------
  702. template<class PQDATA, DWORD TEMPLATE_SIG>
  703. void CAsyncQueue<PQDATA, TEMPLATE_SIG>::ThawQueue()
  704. {
  705. BOOL fStopped = fShouldStopProcessing();
  706. dwGetNextState(ASYNC_QUEUE_ACTION_THAW);
  707. //
  708. // The queue *was* frozen. We should make sure that we reqest threads
  709. // if there are items to process.
  710. //
  711. if (fStopped && !fShouldStopProcessing())
  712. {
  713. // Update threads needed, we need threads again
  714. UpdateThreadsNeeded();
  715. RequestCompletionThreadIfNeeded();
  716. }
  717. }
  718. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::IncPendingAsyncCompletions ]---------
  719. //
  720. //
  721. // Description:
  722. // Increments the pending async completion count. If the async queue
  723. // feeds into something that may complete async (like CatMsg). In this
  724. // case, we may want to throttle the number of outstanding completions
  725. // we have (i.e.- too avoid having too many active messages)
  726. //
  727. // If we have hit our limit, then this call with pause the queue.
  728. // Parameters:
  729. // -
  730. // Returns:
  731. // -
  732. // History:
  733. // 1/24/2000 - MikeSwa Created
  734. //
  735. //-----------------------------------------------------------------------------
  736. template<class PQDATA, DWORD TEMPLATE_SIG>
  737. void CAsyncQueue<PQDATA, TEMPLATE_SIG>::IncPendingAsyncCompletions()
  738. {
  739. InterlockedIncrement((PLONG) &m_cPendingAsyncCompletions);
  740. //
  741. // Check against limit if we have one
  742. //
  743. if (m_cMaxPendingAsyncCompletions &&
  744. (m_cPendingAsyncCompletions > m_cMaxPendingAsyncCompletions))
  745. {
  746. PauseQueue();
  747. }
  748. }
  749. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::DecPendingAsyncCompletions ]---------
  750. //
  751. //
  752. // Description:
  753. // Decrements the pending async completion count. If we drop below our
  754. // threshold, then we will unpause the queue.
  755. // Parameters:
  756. //
  757. // Returns:
  758. //
  759. // History:
  760. // 1/24/2000 - MikeSwa Created
  761. //
  762. //-----------------------------------------------------------------------------
  763. template<class PQDATA, DWORD TEMPLATE_SIG>
  764. void CAsyncQueue<PQDATA, TEMPLATE_SIG>::DecPendingAsyncCompletions()
  765. {
  766. InterlockedDecrement((PLONG) &m_cPendingAsyncCompletions);
  767. if (m_cMaxPendingAsyncCompletions &&
  768. (m_cPendingAsyncCompletions < m_cMaxPendingAsyncCompletions))
  769. {
  770. UnpauseQueue();
  771. }
  772. }
  773. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::fNoPendingAsyncCompletions ]---------
  774. //
  775. //
  776. // Description:
  777. // Are there any pending async completions?
  778. // Parameters:
  779. //
  780. // Returns:
  781. // BOOL
  782. //
  783. // History:
  784. // 11/01/2000 - Awetmore created
  785. //
  786. //-----------------------------------------------------------------------------
  787. template<class PQDATA, DWORD TEMPLATE_SIG>
  788. BOOL CAsyncQueue<PQDATA, TEMPLATE_SIG>::fNoPendingAsyncCompletions()
  789. {
  790. return (m_cPendingAsyncCompletions == 0);
  791. }
  792. //---[ CAsyncQueue<PQDATA, TEMPLATE_SIG>::dwQueueAdminLinkGetLinkState ]-------
  793. //
  794. //
  795. // Description:
  796. // Gets the Queue admin state of this queue. This is different depending
  797. // on the type of async queue this is (normal vs. retry).
  798. // Parameters:
  799. // -
  800. // Returns:
  801. // returns the QAPI link flags describing what state this link is in
  802. // History:
  803. // 3/3/2000 - MikeSwa Created (moved from mailadmq.cpp)
  804. //
  805. //-----------------------------------------------------------------------------
  806. template<class PQDATA, DWORD TEMPLATE_SIG>
  807. DWORD CAsyncQueue<PQDATA, TEMPLATE_SIG>::dwQueueAdminLinkGetLinkState()
  808. {
  809. //
  810. //Queue is in retry if there are items pending and no threads are
  811. //processing them or it is active if there are items pending and
  812. //threads processing them. If there are no items then it is ready.
  813. //
  814. if (fIsFrozen())
  815. return LI_FROZEN;
  816. else if (fIsPaused())
  817. return LI_READY;
  818. else if (0 != cGetItemsPending() && 0 == dwGetTotalThreads())
  819. return LI_RETRY;
  820. else if (0 != m_pammq->cGetItemsPending())
  821. return LI_ACTIVE;
  822. else
  823. return LI_READY;
  824. }
  825. //---[ CAsyncQueueBase::UpdateThreadsNeeded ]----------------------------------
  826. //
  827. //
  828. // Description:
  829. // Update the threads needed counter locally and globally. This thread
  830. // need is only the need for ASYNC threads but does take into account the
  831. // fact that some items may already be scheduled for sync threads.
  832. // Parameters:
  833. // -
  834. // Returns:
  835. // -
  836. // History:
  837. // 11/16/2000 - dbraun - created
  838. //
  839. //-----------------------------------------------------------------------------
  840. template<class PQDATA, DWORD TEMPLATE_SIG>
  841. void CAsyncQueue<PQDATA, TEMPLATE_SIG>::UpdateThreadsNeeded()
  842. {
  843. DWORD cNewThreadsNeeded = 0;
  844. DWORD cOldThreadsNeeded = 0;
  845. LONG lUnscheduledItems = 0; // may be negative if we over-committed
  846. // We only get threads if we are not paused and we have items
  847. if(!fShouldStopProcessing() && m_cItemsPending) {
  848. // We always need the threads we have (or have requested)
  849. cNewThreadsNeeded = m_cCompletionThreadsRequested + m_cCurrentCompletionThreads;
  850. // Number of pending items that we have not already scheduled threads for
  851. lUnscheduledItems = m_cItemsPending - m_cScheduledWorkItems;
  852. // If we have unscheduled items, we need some more threads than we have
  853. if (lUnscheduledItems > 0) {
  854. _ASSERT(m_cItemsPerATQThread);
  855. cNewThreadsNeeded += (lUnscheduledItems / m_cItemsPerATQThread) + 1;
  856. }
  857. }
  858. if (cNewThreadsNeeded == m_cThreadsNeeded)
  859. return; // nothing to do here ...
  860. cOldThreadsNeeded = InterlockedExchange ((LPLONG) &m_cThreadsNeeded, cNewThreadsNeeded);
  861. InterlockedExchangeAdd((LPLONG) &g_cTotalThreadsNeeded, cNewThreadsNeeded - cOldThreadsNeeded);
  862. }
  863. //---[ CAsyncQueueBase::fIsThreadCountAcceptable ]-----------------------------
  864. //
  865. //
  866. // Description:
  867. // Checks whether the current thread count for this queue is acceptable.
  868. // This is used to release threads and to allow new threads to start on
  869. // this queue.
  870. // Parameters:
  871. // -
  872. // Returns:
  873. // TRUE : Count is acceptable
  874. // History:
  875. // 11/16/2000 - dbraun - created
  876. //
  877. //-----------------------------------------------------------------------------
  878. template<class PQDATA, DWORD TEMPLATE_SIG>
  879. BOOL CAsyncQueue<PQDATA, TEMPLATE_SIG>::fIsThreadCountAcceptable()
  880. {
  881. // Get these values once to make sure we are consistent and to prevent div/0
  882. DWORD cGlobalThreadsNeeded = g_cTotalThreadsNeeded;
  883. DWORD cThreadsNeeded = m_cThreadsNeeded;
  884. DWORD cThreadsAllowed = 0;
  885. DWORD cThreadsActiveAndPending = m_cCompletionThreadsRequested + m_cCurrentCompletionThreads;
  886. // We are only allowed threads if we are not paused and we have items pending
  887. if(!fShouldStopProcessing() && m_cItemsPending) {
  888. // Below we calculate how many threads are allowed. This can be more or less than
  889. // the number of threads needed and is only used to limit thread counts, it does
  890. // not mean that we will actually use the total threads allowed.
  891. // The number of threads allowed is based on the max threads and how many threads
  892. // this queue needs when compared with the rest of the queues that want threads
  893. if (cGlobalThreadsNeeded) {
  894. cThreadsAllowed = s_cDefaultMaxAsyncThreads * cThreadsNeeded / cGlobalThreadsNeeded;
  895. }
  896. // One thread is always acceptable
  897. if (!cThreadsAllowed) {
  898. cThreadsAllowed = 1;
  899. }
  900. }
  901. return (cThreadsAllowed >= cThreadsActiveAndPending);
  902. }
  903. //---[ CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::CAsyncRetryQueue ]--------------
  904. //
  905. //
  906. // Description:
  907. // Default constructor for CAsyncRetryQueue
  908. // Parameters:
  909. // -
  910. // Returns:
  911. // -
  912. // History:
  913. // 2/5/99 - MikeSwa Created
  914. //
  915. //-----------------------------------------------------------------------------
  916. template<class PQDATA, DWORD TEMPLATE_SIG>
  917. CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::CAsyncRetryQueue()
  918. {
  919. m_dwRetrySignature = ASYNC_RETRY_QUEUE_SIG;
  920. m_cRetryItems = 0;
  921. }
  922. //---[ CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::~CAsyncRetryQueue ]-------------
  923. //
  924. //
  925. // Description:
  926. // Default destructor for CAsyncRetryQueue. Walks retry queue to release
  927. // items on it.
  928. // Parameters:
  929. // -
  930. // Returns:
  931. // -
  932. // History:
  933. // 2/5/99 - MikeSwa Created
  934. //
  935. //-----------------------------------------------------------------------------
  936. template<class PQDATA, DWORD TEMPLATE_SIG>
  937. CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::~CAsyncRetryQueue()
  938. {
  939. m_fqRetryQueue.HrMapFn(HrClearQueueMapFn, m_pvContext, NULL);
  940. }
  941. //---[ CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrDeinitialize ]----------------
  942. //
  943. //
  944. // Description:
  945. // Walks queues with given function for shutdown
  946. // Parameters:
  947. // pfnQueueShutdown Queue-mapping function called on shutdown to
  948. // clean queues. If NULL, it will substitute
  949. // HrClearQueueMapFn which walks the queues and
  950. // releases all PQDATA in it
  951. // paqinst Shutdown context with server stop hint function
  952. // Returns:
  953. // S_OK on success
  954. // History:
  955. // 2/5/99 - MikeSwa Created
  956. //
  957. //-----------------------------------------------------------------------------
  958. template<class PQDATA, DWORD TEMPLATE_SIG>
  959. HRESULT CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrDeinitialize(
  960. typename CFifoQueue<PQDATA>::MAPFNAPI pfnQueueShutdown,
  961. CAQSvrInst *paqinst)
  962. {
  963. TraceFunctEnterEx((LPARAM) this, "CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrDeinitialize");
  964. HRESULT hr = S_OK;
  965. DWORD cItems = 0;
  966. _ASSERT(paqinst);
  967. CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrDeinitialize(pfnQueueShutdown,
  968. paqinst);
  969. //map shutdown function
  970. hr = m_fqRetryQueue.HrMapFn(pfnQueueShutdown, paqinst, &cItems);
  971. if (FAILED(hr))
  972. ErrorTrace((LPARAM) this, "ERROR: Unable to Cleanup CAsyncQueue - hr 0x%08X", hr);
  973. else
  974. dwInterlockedAddSubtractDWORD(&m_cRetryItems, cItems, FALSE);
  975. TraceFunctLeave();
  976. return hr;
  977. }
  978. //---[ CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::fHandleCompletionFailure ]------
  979. //
  980. //
  981. // Description:
  982. // Called when async completion function returns false... handles requeuing
  983. // data to the retry queue
  984. // Parameters:
  985. // pqdata - Data that triggered failure
  986. // Returns:
  987. // -
  988. // History:
  989. // 2/5/99 - MikeSwa Created
  990. //
  991. //-----------------------------------------------------------------------------
  992. template<class PQDATA, DWORD TEMPLATE_SIG>
  993. BOOL CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::fHandleCompletionFailure(PQDATA pqdata)
  994. {
  995. HRESULT hr;
  996. DWORD cItemsRemoved = 0;
  997. if (fInShutdown())
  998. return FALSE;
  999. //Requeue failed item to retry queue... a possible interesting thing to do
  1000. //here would be to run the failed item through the failure function
  1001. //(without the queue) to generate DSNs and see if the item actually need to
  1002. //be queues.
  1003. hr = m_fqRetryQueue.HrRequeue(pqdata);
  1004. if (SUCCEEDED(hr))
  1005. InterlockedIncrement((PLONG) &m_cRetryItems);
  1006. else
  1007. HandleDroppedItem(pqdata);
  1008. return TRUE;
  1009. }
  1010. //---[ CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrQueueRequest ]----------------
  1011. //
  1012. //
  1013. // Description:
  1014. // Queue a request for a retry queue.
  1015. // Parameters:
  1016. // pqdata Data to pass to completion function
  1017. // fRetry TRUE => Put item in retry queue until queue is kicked
  1018. // FALSE => Queue normaly
  1019. // Returns:
  1020. // S_OK on success
  1021. // E_OUTOFMEMORY if queue-related resources could not be allocated
  1022. // History:
  1023. // 3/3/2000 - MikeSwa Created
  1024. //
  1025. //-----------------------------------------------------------------------------
  1026. template<class PQDATA, DWORD TEMPLATE_SIG>
  1027. HRESULT CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrQueueRequest(PQDATA pqdata,
  1028. BOOL fRetry)
  1029. {
  1030. TraceFunctEnterEx((LPARAM) this, "CAsyncRetryQueue<>::HrQueueRequest");
  1031. HRESULT hr = S_OK;
  1032. //
  1033. // Handle as failure if retry (will put the item in the retry queue).
  1034. // Otherwise pass to base implementation (queue to normal asyncq).
  1035. //
  1036. if (fRetry)
  1037. fHandleCompletionFailure(pqdata);
  1038. else
  1039. hr = CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrQueueRequest(pqdata, fRetry);
  1040. TraceFunctLeave();
  1041. return hr;
  1042. }
  1043. //---[ CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrMapFn ]-----------------------
  1044. //
  1045. //
  1046. // Description:
  1047. // Calls a function on every message in the queue
  1048. // Parameters:
  1049. // IN pfnQueueFn Function to call for every message
  1050. // IN pvContext Context passed to completion function
  1051. // Returns:
  1052. // S_OK on success
  1053. // Error code from CFifoQueue<PQDATA>::HrMapFn
  1054. // History:
  1055. // 2/23/99 - MikeSwa Created
  1056. //
  1057. //-----------------------------------------------------------------------------
  1058. template<class PQDATA, DWORD TEMPLATE_SIG>
  1059. HRESULT CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrMapFn(
  1060. typename CFifoQueue<PQDATA>::MAPFNAPI pfnQueueFn,
  1061. PVOID pvContext)
  1062. {
  1063. HRESULT hr = S_OK;
  1064. hr = HrMapFnBaseQueue(pfnQueueFn, pvContext);
  1065. if (FAILED(hr))
  1066. goto Exit;
  1067. hr = HrMapFnRetryQueue(pfnQueueFn, pvContext);
  1068. if (FAILED(hr))
  1069. goto Exit;
  1070. Exit:
  1071. return hr;
  1072. }
  1073. //---[ CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrMapFnBaseQueue ]--------------
  1074. //
  1075. //
  1076. // Description:
  1077. // Calls a function on every message in the base (non-retry) queue
  1078. // Parameters:
  1079. // IN pfnQueueFn Function to call for every message
  1080. // IN pvContext Context passed to completion function
  1081. // Returns:
  1082. // S_OK on success
  1083. // Error code from CFifoQueue<PQDATA>::HrMapFn
  1084. // History:
  1085. // 2/23/99 - MikeSwa Created
  1086. // 1/10/2001 - MikeSwa Modified from HrMapFn
  1087. //
  1088. //-----------------------------------------------------------------------------
  1089. template<class PQDATA, DWORD TEMPLATE_SIG>
  1090. HRESULT CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrMapFnBaseQueue(
  1091. typename CFifoQueue<PQDATA>::MAPFNAPI pfnQueueFn,
  1092. PVOID pvContext)
  1093. {
  1094. HRESULT hr = S_OK;
  1095. hr = CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrMapFn(pfnQueueFn, pvContext);
  1096. return hr;
  1097. }
  1098. //---[ CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrMapFnRetryQueue ]-------------
  1099. //
  1100. //
  1101. // Description:
  1102. // Calls a function on every message in the queue retry
  1103. // Parameters:
  1104. // IN pfnQueueFn Function to call for every message
  1105. // IN pvContext Context passed to completion function
  1106. // Returns:
  1107. // S_OK on success
  1108. // Error code from CFifoQueue<PQDATA>::HrMapFn
  1109. // History:
  1110. // 2/23/99 - MikeSwa Created
  1111. // 1/10/2001 - MikeSwa Modified from HrMapFn
  1112. //
  1113. //-----------------------------------------------------------------------------
  1114. template<class PQDATA, DWORD TEMPLATE_SIG>
  1115. HRESULT CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::HrMapFnRetryQueue(
  1116. typename CFifoQueue<PQDATA>::MAPFNAPI pfnQueueFn,
  1117. PVOID pvContext)
  1118. {
  1119. DWORD cItems = 0;
  1120. HRESULT hr = S_OK;
  1121. hr = m_fqRetryQueue.HrMapFn(pfnQueueFn, pvContext, &cItems);
  1122. if (SUCCEEDED(hr))
  1123. dwInterlockedAddSubtractDWORD(&m_cRetryItems, cItems, FALSE);
  1124. return hr;
  1125. }
  1126. //---[ CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::MergeRetryQueue ]---------------
  1127. //
  1128. //
  1129. // Description:
  1130. // Merges retry queue into normal queue
  1131. // Parameters:
  1132. // -
  1133. // Returns:
  1134. // -
  1135. // History:
  1136. // 2/5/99 - MikeSwa Created
  1137. //
  1138. //-----------------------------------------------------------------------------
  1139. template<class PQDATA, DWORD TEMPLATE_SIG>
  1140. void CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::MergeRetryQueue()
  1141. {
  1142. DWORD cItemsRemoved = 0;
  1143. PQDATA pqData = NULL;
  1144. HRESULT hr = S_OK;
  1145. if (fInShutdown())
  1146. return;
  1147. //call failure routine (if present)
  1148. if (m_pfnQueueFailure)
  1149. {
  1150. hr = m_fqRetryQueue.HrMapFn(m_pfnQueueFailure, m_pvContext, &cItemsRemoved);
  1151. if (SUCCEEDED(hr))
  1152. {
  1153. //Adjust appropriate counters
  1154. InterlockedExchangeAdd((PLONG) &m_cRetryItems, -((LONG) cItemsRemoved));
  1155. }
  1156. }
  1157. //Now remerge queue
  1158. hr = S_OK;
  1159. while (SUCCEEDED(hr))
  1160. {
  1161. pqData = NULL;
  1162. hr = m_fqRetryQueue.HrDequeue(&pqData);
  1163. if (FAILED(hr))
  1164. break;
  1165. _ASSERT(pqData);
  1166. InterlockedDecrement((PLONG) &m_cRetryItems);
  1167. //Queue request as retry so we know thread will not be stolen
  1168. hr = CAsyncQueue<PQDATA, TEMPLATE_SIG>::HrQueueRequest(pqData, TRUE);
  1169. if (FAILED(hr))
  1170. HandleDroppedItem(pqData);
  1171. pqData->Release();
  1172. }
  1173. }
  1174. //---[ CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::dwQueueAdminLinkGetLinkState ]--
  1175. //
  1176. //
  1177. // Description:
  1178. // Gets the Queue admin state of this queue. This is different depending
  1179. // on the type of async queue this is (normal vs. retry).
  1180. // Parameters:
  1181. // -
  1182. // Returns:
  1183. // returns the QAPI link flags describing what state this link is in
  1184. // History:
  1185. // 3/3/2000 - MikeSwa Created (moved from localq.cpp)
  1186. //
  1187. //-----------------------------------------------------------------------------
  1188. template<class PQDATA, DWORD TEMPLATE_SIG>
  1189. DWORD CAsyncRetryQueue<PQDATA, TEMPLATE_SIG>::dwQueueAdminLinkGetLinkState()
  1190. {
  1191. //If we items in retry and others... mark it in retry
  1192. //If we have items pending.. it is active
  1193. //Otherwise it is ready
  1194. if (fIsFrozen())
  1195. return LI_FROZEN;
  1196. else if (fIsPaused())
  1197. return LI_READY;
  1198. else if ((0 != cGetItemsPendingRetry()) && (0 == cGetItemsPending()))
  1199. return LI_RETRY;
  1200. else if (0 != cGetItemsPending())
  1201. return LI_ACTIVE;
  1202. else
  1203. return LI_READY;
  1204. }
  1205. #endif //__ASYNCQ_INL__