Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1325 lines
32 KiB

  1. //******************************************************************************
  2. //
  3. // QSINK.CPP
  4. //
  5. // Copyright (C) 1996-1999 Microsoft Corporation
  6. //
  7. //******************************************************************************
  8. #include "precomp.h"
  9. #include <stdio.h>
  10. #include <genutils.h>
  11. #include <cominit.h>
  12. #include "ess.h"
  13. #include "evsink.h"
  14. #include "delivrec.h"
  15. /*****************************************************************************
  16. CQueueingEventSink
  17. ******************************************************************************/
  //
  // CSpinLock
  //
  // Lock object built around a single long counter, which starts at -1.
  // Enter() and Leave() are defined out of line.
  //
  class CSpinLock
  {
  public:
      CSpinLock()
      {
          m_lCount = -1;
      }

      ~CSpinLock()
      {
      }

      void Enter();
      void Leave();

  protected:
      long m_lCount;  // lock counter; -1 on construction
  };
  28. class CInSpinLock
  29. {
  30. protected:
  31. CSpinLock* m_p;
  32. public:
  33. CInSpinLock(CSpinLock* p) : m_p(p) {m_p->Enter();}
  34. ~CInSpinLock() {m_p->Leave();}
  35. };
  36. void CSpinLock::Leave()
  37. {
  38. InterlockedDecrement(&m_lCount);
  39. }
  // Upper bound, in bytes, returned by GetMaxDeliverySize() for one batch.
  #define MAX_EVENT_DELIVERY_SIZE   10000000

  // AddRecord() drops a record when the computed sleep exceeds this value
  // and the caller did not ask to be slowed down.
  #define SLOWDOWN_DROP_LIMIT       1000

  // Only referenced from the disabled spin loop in WaitABit().
  #define DELIVER_SPIN_COUNT        1000

  // Scoped guard type used with m_sl throughout this file.
  #define IN_SPIN_LOCK              CInCritSec
  44. CQueueingEventSink::CQueueingEventSink(CEssNamespace* pNamespace)
  45. : m_pNamespace(pNamespace), m_bDelivering(FALSE), m_dwTotalSize(0),
  46. m_dwMaxSize(0xFFFFFFFF), m_wszName(NULL), m_bRecovering(FALSE),
  47. m_hRecoveryComplete(NULL), m_hrRecovery(S_OK)
  48. {
  49. m_pNamespace->AddRef();
  50. m_pNamespace->AddCache();
  51. }
  52. CQueueingEventSink::~CQueueingEventSink()
  53. {
  54. if ( m_hRecoveryComplete != NULL )
  55. {
  56. CloseHandle( m_hRecoveryComplete );
  57. }
  58. delete m_wszName;
  59. m_pNamespace->RemoveCache();
  60. m_pNamespace->Release();
  61. }
  62. HRESULT CQueueingEventSink::SetName( LPCWSTR wszName )
  63. {
  64. if ( m_wszName != NULL )
  65. {
  66. return WBEM_E_CRITICAL_ERROR;
  67. }
  68. m_wszName = new WCHAR[wcslen(wszName)+1];
  69. if ( m_wszName == NULL )
  70. {
  71. return WBEM_E_OUT_OF_MEMORY;
  72. }
  73. wcscpy( m_wszName, wszName );
  74. return WBEM_S_NO_ERROR;
  75. }
  76. STDMETHODIMP CQueueingEventSink::SecureIndicate( long lNumEvents,
  77. IWbemEvent** apEvents,
  78. BOOL bMaintainSecurity,
  79. BOOL bSlowDown,
  80. DWORD dwQoS,
  81. CEventContext* pContext)
  82. {
  83. // BUGBUG: context. levn: no security implications at this level --- we
  84. // are past the filter
  85. HRESULT hres;
  86. DWORD dwSleep = 0;
  87. // If security needs to be maintained, record the calling security
  88. // context
  89. // ===============================================================
  90. IWbemCallSecurity* pSecurity = NULL;
  91. if(bMaintainSecurity && IsNT())
  92. {
  93. pSecurity = CWbemCallSecurity::CreateInst();
  94. if (pSecurity == 0)
  95. return WBEM_E_OUT_OF_MEMORY;
  96. hres = pSecurity->CloneThreadContext(FALSE);
  97. if(FAILED(hres))
  98. {
  99. pSecurity->Release();
  100. return hres;
  101. }
  102. }
  103. CReleaseMe rmpSecurity( pSecurity );
  104. HRESULT hr;
  105. BOOL bSchedule = FALSE;
  106. for(int i = 0; i < lNumEvents; i++)
  107. {
  108. CWbemPtr<CDeliveryRecord> pRecord;
  109. //
  110. // TODO: Fix this so that we put multiple events in the record.
  111. //
  112. hr = GetDeliveryRecord( 1,
  113. &apEvents[i],
  114. dwQoS,
  115. pContext,
  116. pSecurity,
  117. &pRecord );
  118. if ( FAILED(hr) )
  119. {
  120. ERRORTRACE((LOG_ESS, "Couldn't create delivery record for %S "
  121. " sink. HR = 0x%x\n", m_wszName, hr ));
  122. ReportQosFailure( apEvents[i], hr );
  123. continue;
  124. }
  125. DWORD dwThisSleep;
  126. BOOL bFirst;
  127. if( !AddRecord( pRecord, bSlowDown, &dwThisSleep, &bFirst) )
  128. {
  129. //
  130. // make sure that we give the record a chance to perform any post
  131. // deliver actions before getting rid of it.
  132. //
  133. pRecord->PostDeliverAction( NULL, S_OK );
  134. return WBEM_E_OUT_OF_MEMORY;
  135. }
  136. dwSleep += dwThisSleep;
  137. if(bFirst)
  138. bSchedule = TRUE;
  139. }
  140. if(bSchedule)
  141. {
  142. // DeliverAll();
  143. // TRACE((LOG_ESS, "Scheduling delivery!!\n"));
  144. hres = m_pNamespace->ScheduleDelivery(this);
  145. }
  146. else
  147. {
  148. // TRACE((LOG_ESS, "NOT Scheduling delivery!!\n"));
  149. hres = WBEM_S_FALSE;
  150. }
  151. if(dwSleep && bSlowDown)
  152. m_pNamespace->AddSleepCharge(dwSleep);
  153. return hres;
  154. }
  155. BOOL CQueueingEventSink::AddRecord( CDeliveryRecord* pRecord,
  156. BOOL bSlowDown,
  157. DWORD* pdwSleep,
  158. BOOL* pbFirst )
  159. {
  160. // Inform the system of the additional space in the queue
  161. // ======================================================
  162. DWORD dwRecordSize = pRecord->GetTotalBytes();
  163. pRecord->AddToCache( m_pNamespace, m_dwTotalSize, pdwSleep );
  164. BOOL bDrop = FALSE;
  165. // Check if the sleep is such as to cause us to drop the event
  166. // ===========================================================
  167. if(!bSlowDown && *pdwSleep > SLOWDOWN_DROP_LIMIT)
  168. {
  169. bDrop = TRUE;
  170. }
  171. else
  172. {
  173. // Check if our queue size is so large as to cause us to drop
  174. // ==============================================================
  175. if(m_dwTotalSize + dwRecordSize > m_dwMaxSize)
  176. bDrop = TRUE;
  177. }
  178. if( bDrop )
  179. {
  180. //
  181. // Report that we're dropping the events. Call for each event.
  182. //
  183. IWbemClassObject** apEvents = pRecord->GetEvents();
  184. for( ULONG i=0; i < pRecord->GetNumEvents(); i++ )
  185. {
  186. ReportQueueOverflow( apEvents[i], m_dwTotalSize + dwRecordSize );
  187. }
  188. *pdwSleep = 0;
  189. *pbFirst = FALSE;
  190. }
  191. else
  192. {
  193. IN_SPIN_LOCK isl(&m_sl);
  194. *pbFirst = (m_qpEvents.GetQueueSize() == 0) && !m_bDelivering;
  195. m_dwTotalSize += dwRecordSize;
  196. if(!m_qpEvents.Enqueue(pRecord))
  197. {
  198. *pdwSleep = 0;
  199. return FALSE;
  200. }
  201. pRecord->AddRef();
  202. }
  203. return TRUE;
  204. }
  205. HRESULT CQueueingEventSink::DeliverAll()
  206. {
  207. HRESULT hr = WBEM_S_NO_ERROR;
  208. BOOL bSomeLeft = TRUE;
  209. while( bSomeLeft )
  210. {
  211. try
  212. {
  213. {
  214. IN_SPIN_LOCK ics(&m_sl);
  215. m_bDelivering = TRUE;
  216. }
  217. hr = DeliverSome( );
  218. }
  219. catch( CX_MemoryException )
  220. {
  221. hr = WBEM_E_OUT_OF_MEMORY;
  222. }
  223. catch ( ... )
  224. {
  225. hr = WBEM_E_FAILED;
  226. }
  227. {
  228. IN_SPIN_LOCK ics(&m_sl);
  229. m_bDelivering = FALSE;
  230. if ( SUCCEEDED( hr ) )
  231. {
  232. bSomeLeft = (m_qpEvents.GetQueueSize() != 0);
  233. }
  234. else
  235. {
  236. m_qpEvents.Clear();
  237. bSomeLeft = FALSE;
  238. }
  239. }
  240. }
  241. return hr;
  242. }
  243. void CQueueingEventSink::ClearAll()
  244. {
  245. IN_SPIN_LOCK isl(&m_sl);
  246. m_qpEvents.Clear();
  247. }
  248. #pragma optimize("", off)
  249. void CQueueingEventSink::WaitABit()
  250. {
  251. SwitchToThread();
  252. /*
  253. int nCount = 0;
  254. while(m_qpEvents.GetQueueSize() == 0 && nCount++ < DELIVER_SPIN_COUNT);
  255. */
  256. }
  257. #pragma optimize("", on)
  258. HRESULT CQueueingEventSink::DeliverSome( )
  259. {
  260. // Retrieve records until maximum size is reached and while the same
  261. // security context is used for all
  262. // ==================================================================
  263. CTempArray<CDeliveryRecord*> apRecords;
  264. m_sl.Enter(); // CANNOT USE SCOPE BECAUSE CTempArray uses _alloca
  265. DWORD dwMaxRecords = m_qpEvents.GetQueueSize();
  266. m_sl.Leave();
  267. if(!INIT_TEMP_ARRAY(apRecords, dwMaxRecords))
  268. {
  269. return WBEM_E_OUT_OF_MEMORY;
  270. }
  271. CDeliveryRecord* pEventRec;
  272. DWORD dwDeliverySize = 0;
  273. DWORD dwTotalEvents = 0;
  274. int cRecords = 0;
  275. LUID luidBatch;
  276. IWbemCallSecurity* pBatchSecurity = NULL;
  277. m_sl.Enter();
  278. while( dwDeliverySize < GetMaxDeliverySize() &&
  279. cRecords < dwMaxRecords &&
  280. (pEventRec = m_qpEvents.Dequeue()) != NULL )
  281. {
  282. // Compare it to the last context
  283. // ==============================
  284. m_sl.Leave();
  285. if( dwDeliverySize > 0 )
  286. {
  287. if(!DoesRecordFitBatch(pEventRec, pBatchSecurity, luidBatch))
  288. {
  289. // Put it back and that's it for the batch
  290. // =======================================
  291. IN_SPIN_LOCK ics(&m_sl);
  292. m_qpEvents.Requeue(pEventRec);
  293. break;
  294. }
  295. }
  296. else
  297. {
  298. // First --- record luid
  299. // =====================
  300. pBatchSecurity = pEventRec->GetCallSecurity();
  301. if( pBatchSecurity )
  302. {
  303. pBatchSecurity->AddRef();
  304. pBatchSecurity->GetAuthenticationId( luidBatch );
  305. }
  306. }
  307. apRecords[cRecords++] = pEventRec;
  308. dwTotalEvents += pEventRec->GetNumEvents();
  309. // Matched batch parameters --- add it to the batch
  310. // ================================================
  311. DWORD dwRecordSize = pEventRec->GetTotalBytes();
  312. m_dwTotalSize -= dwRecordSize;
  313. dwDeliverySize += dwRecordSize;
  314. //
  315. // Remove this size from the total of events held
  316. //
  317. m_sl.Enter();
  318. }
  319. m_sl.Leave();
  320. //
  321. // we've now got one or more delivery records to handle.
  322. //
  323. //
  324. // we now need to initialize the event array that we're going to indicate
  325. // to the client.
  326. //
  327. CTempArray<IWbemClassObject*> apEvents;
  328. if( !INIT_TEMP_ARRAY( apEvents, dwTotalEvents ))
  329. {
  330. return WBEM_E_OUT_OF_MEMORY;
  331. }
  332. //
  333. // go through the delivery records and add their events to the
  334. // events to deliver. Also perform any PreDeliverAction on the
  335. // record.
  336. //
  337. CWbemPtr<ITransaction> pTxn;
  338. HRESULT hr;
  339. int cEvents = 0;
  340. int i;
  341. for(i=0; i < cRecords; i++ )
  342. {
  343. //if ( apRecords[i]->RequiresTransaction() && pTxn == NULL )
  344. //{
  345. // TODO : XACT - aquire txn from DTC.
  346. //}
  347. hr = apRecords[i]->PreDeliverAction( pTxn );
  348. if ( FAILED(hr) )
  349. {
  350. //
  351. // TODO : handle error reporting here.
  352. //
  353. continue;
  354. }
  355. IWbemEvent** apRecordEvents = apRecords[i]->GetEvents();
  356. DWORD cRecordEvents = apRecords[i]->GetNumEvents();
  357. for( DWORD j=0; j < cRecordEvents; j++ )
  358. {
  359. apEvents[cEvents++] = apRecordEvents[j];
  360. }
  361. }
  362. // Actually Deliver
  363. // =======
  364. HRESULT hres = WBEM_S_NO_ERROR;
  365. if( dwDeliverySize > 0 )
  366. {
  367. //
  368. // Error returns are already logged in ActuallyDeliver
  369. // we do not need to return return value of DeliverEvents
  370. //
  371. hres = DeliverEvents( pBatchSecurity, cEvents, apEvents );
  372. }
  373. //
  374. // call postdeliveryaction on all the records. Then clean them up.
  375. //
  376. for(i=0; i < cRecords; i++ )
  377. {
  378. apRecords[i]->PostDeliverAction( pTxn, hres );
  379. apRecords[i]->Release();
  380. }
  381. // Release all of the events.
  382. // ================
  383. if( pBatchSecurity )
  384. {
  385. pBatchSecurity->Release();
  386. }
  387. // Check if we need to continue
  388. // ============================
  389. WaitABit();
  390. return WBEM_S_NO_ERROR;
  391. }
  392. HRESULT CQueueingEventSink::DeliverEvents(IWbemCallSecurity* pBatchSecurity,
  393. long lNumEvents, IWbemEvent** apEvents)
  394. {
  395. HRESULT hres = WBEM_S_NO_ERROR;
  396. IUnknown* pOldSec = NULL;
  397. if(pBatchSecurity)
  398. {
  399. hres = WbemCoSwitchCallContext(pBatchSecurity, &pOldSec);
  400. if(FAILED(hres))
  401. {
  402. // Unable to set security --- cannot deliver
  403. // =========================================
  404. }
  405. }
  406. if(SUCCEEDED(hres))
  407. {
  408. // BUGBUG: propagate context. levn: no security implications at this
  409. // point --- we are past the filter
  410. hres = ActuallyDeliver(lNumEvents, apEvents, (pBatchSecurity != NULL),
  411. NULL);
  412. }
  413. if(pBatchSecurity)
  414. {
  415. IUnknown* pTemp;
  416. WbemCoSwitchCallContext(pOldSec, &pTemp);
  417. }
  418. return hres;
  419. }
  420. BOOL CQueueingEventSink::DoesRecordFitBatch( CDeliveryRecord* pEventRec,
  421. IWbemCallSecurity* pBatchSecurity,
  422. LUID luidBatch )
  423. {
  424. IWbemCallSecurity* pEventSec = pEventRec->GetCallSecurity();
  425. if( pEventSec != NULL || pBatchSecurity != NULL )
  426. {
  427. if( pEventSec == NULL || pBatchSecurity == NULL )
  428. {
  429. // Definite mistatch --- one NULL, one not
  430. // =======================================
  431. return FALSE;
  432. }
  433. else
  434. {
  435. LUID luidThis;
  436. pEventSec->GetAuthenticationId(luidThis);
  437. if( luidThis.LowPart != luidBatch.LowPart ||
  438. luidThis.HighPart != luidBatch.HighPart )
  439. {
  440. return FALSE;
  441. }
  442. else
  443. {
  444. return TRUE;
  445. }
  446. }
  447. }
  448. else
  449. {
  450. return TRUE;
  451. }
  452. }
  453. DWORD CQueueingEventSink::GetMaxDeliverySize()
  454. {
  455. return MAX_EVENT_DELIVERY_SIZE;
  456. }
  457. #ifdef __WHISTLER_UNCUT
  458. //
  459. // used to capture callbacks from MsgReceiver. The Msg Receiver interfaces
  460. // use callbacks to avoid unnecessary copying.
  461. //
  462. struct MsgReceive
  463. : public CUnkBase<IWmiMessageSendReceive, &IID_IWmiMessageSendReceive>
  464. {
  465. BYTE* m_pData;
  466. ULONG m_cData;
  467. BYTE* m_pAuxData;
  468. ULONG m_cAuxData;
  469. STDMETHOD(SendReceive)( BYTE* pData,
  470. ULONG cData,
  471. BYTE* pAuxData,
  472. ULONG cAuxData,
  473. DWORD dwFlagStatus,
  474. IUnknown* pUnk )
  475. {
  476. m_pData = pData;
  477. m_cData = cData;
  478. m_pAuxData = pAuxData;
  479. m_cAuxData = cAuxData;
  480. return S_OK;
  481. }
  482. };
  483. HRESULT CQueueingEventSink::OpenReceiver( LPCWSTR wszQueueName,
  484. DWORD dwQos,
  485. IWmiMessageSendReceive* pRecv,
  486. IWmiMessageQueueReceiver** ppRcvr )
  487. {
  488. HRESULT hr;
  489. *ppRcvr = NULL;
  490. CWbemPtr<IWmiMessageQueue> pQueue;
  491. hr = CoCreateInstance( CLSID_WmiMessageQueue,
  492. NULL,
  493. CLSCTX_INPROC,
  494. IID_IWmiMessageQueue,
  495. (void**)&pQueue );
  496. if ( FAILED(hr) )
  497. {
  498. return hr;
  499. }
  500. CWbemPtr<IWmiMessageQueueReceiver> pRcvr;
  501. hr = pQueue->Open( wszQueueName, dwQos, pRecv, &pRcvr );
  502. if ( FAILED(hr) )
  503. {
  504. return hr;
  505. }
  506. pRcvr->AddRef();
  507. *ppRcvr = pRcvr;
  508. return WBEM_S_NO_ERROR;
  509. }
  510. HRESULT CQueueingEventSink::OpenSender( LPCWSTR wszQueueName,
  511. DWORD dwQos,
  512. IWmiMessageSendReceive** ppSend )
  513. {
  514. HRESULT hr;
  515. *ppSend = NULL;
  516. //
  517. // make sure that the queue has been created.
  518. //
  519. hr = m_pNamespace->GetEss()->CreatePersistentQueue( wszQueueName, dwQos );
  520. if ( FAILED(hr) )
  521. {
  522. if ( hr != WBEM_E_ALREADY_EXISTS )
  523. {
  524. return hr;
  525. }
  526. }
  527. //
  528. // now open a sender on the queue.
  529. //
  530. CWbemPtr<IWmiMessageSender> pSender;
  531. hr = CoCreateInstance( CLSID_WmiMessageMsmqSender,
  532. NULL,
  533. CLSCTX_INPROC,
  534. IID_IWmiMessageSender,
  535. (void**)&pSender );
  536. if ( FAILED(hr) )
  537. {
  538. return hr;
  539. }
  540. hr = pSender->Open( wszQueueName, dwQos, NULL, NULL, NULL, ppSend );
  541. return hr;
  542. }
  543. HRESULT CQueueingEventSink::GetPersistentRecord( ULONG cEvents,
  544. IWbemEvent** apEvents,
  545. DWORD dwQos,
  546. CEventContext* pContext,
  547. CDeliveryRecord** ppRecord )
  548. {
  549. HRESULT hr;
  550. CInCritSec ics( &m_csQueue );
  551. //
  552. // the idea here is that the act of saving/removing messages and
  553. // performing recovery can never take place at the same time. This
  554. // is we because we must ensure that the message removed from the
  555. // front of the persistent queues corresponds with the guaranteed
  556. // delivery pulled off of the transient queue.
  557. //
  558. while ( m_bRecovering )
  559. {
  560. DEBUGTRACE((LOG_ESS, "%S queue sink waiting for recovery.\n", m_wszName ));
  561. assert( m_hRecoveryComplete != NULL );
  562. LeaveCriticalSection( &m_csQueue );
  563. WaitForSingleObject( m_hRecoveryComplete, INFINITE );
  564. EnterCriticalSection( &m_csQueue );
  565. DEBUGTRACE((LOG_ESS, "%S queue sink waited for recovery.\n", m_wszName ));
  566. }
  567. //
  568. // check to see if we're in a bad state. If so, return the error
  569. // that got us there.
  570. //
  571. if ( FAILED(m_hrRecovery) )
  572. {
  573. return m_hrRecovery;
  574. }
  575. //
  576. // first ensure that the objects associated with the QoS are initialized.
  577. // once this happens, we'll save the record in the appropriate queue.
  578. // there's a bit of indirect referencing here to address the case when
  579. // we have multiple types of senders and receivers ( right now only one ).
  580. //
  581. IWmiMessageSendReceive** ppSend;
  582. IWmiMessageQueueReceiver** ppRcvr;
  583. //
  584. // TODO:XACT later use persistent base class
  585. //
  586. CWbemPtr<CGuaranteedDeliveryRecord> pRecord;
  587. if ( dwQos == WMIMSG_FLAG_QOS_GUARANTEED )
  588. {
  589. pRecord = new CGuaranteedDeliveryRecord;
  590. ppSend = &m_pSend;
  591. ppRcvr = &m_pRcvr;
  592. }
  593. else
  594. {
  595. //
  596. // TODO : XACT delivery.
  597. //
  598. return WBEM_E_CRITICAL_ERROR;
  599. }
  600. if ( pRecord == NULL )
  601. {
  602. return WBEM_E_OUT_OF_MEMORY;
  603. }
  604. WString wsQueueName;
  605. if ( *ppSend == NULL || *ppRcvr == NULL )
  606. {
  607. //
  608. // construct the queue name from our sinkname, namespace, and qos.
  609. //
  610. hr = SinkNameToQueueName( m_wszName,
  611. m_pNamespace->GetName(),
  612. dwQos,
  613. wsQueueName );
  614. if ( FAILED(hr) )
  615. {
  616. return hr;
  617. }
  618. }
  619. if ( *ppSend == NULL )
  620. {
  621. hr = OpenSender( wsQueueName, dwQos, ppSend );
  622. if ( FAILED(hr) )
  623. {
  624. return hr;
  625. }
  626. }
  627. if ( *ppRcvr == NULL )
  628. {
  629. //
  630. // we don't need to pass a callback for receiving messages,
  631. // because all we're ever going to do with this receiver is
  632. // remove messages.
  633. //
  634. hr = OpenReceiver( wsQueueName, dwQos, NULL, ppRcvr );
  635. if ( FAILED(hr) )
  636. {
  637. return hr;
  638. }
  639. }
  640. hr = pRecord->Initialize( apEvents, cEvents );
  641. if ( FAILED(hr) )
  642. {
  643. return hr;
  644. }
  645. //
  646. // set the receiver on the record so that later it can come back and
  647. // remove the message from the queue.
  648. //
  649. pRecord->SetCB( this, *ppRcvr );
  650. hr = SaveDeliveryRecord( *ppSend, pRecord );
  651. if ( FAILED(hr) )
  652. {
  653. return hr;
  654. }
  655. pRecord->AddRef();
  656. *ppRecord = pRecord;
  657. return WBEM_S_NO_ERROR;
  658. }
  659. HRESULT CQueueingEventSink::SaveDeliveryRecord( IWmiMessageSendReceive* pSend,
  660. CDeliveryRecord* pRecord )
  661. {
  662. HRESULT hr;
  663. //
  664. // reset the message buffer.
  665. //
  666. m_MsgData.Reset();
  667. //
  668. // first set the message data.
  669. //
  670. hr = pRecord->Persist( &m_MsgData );
  671. if ( FAILED(hr) )
  672. {
  673. return hr;
  674. }
  675. //
  676. // TODO: Later we need to store some random bytes with the header so
  677. // that it can act as a signature. Since no one can read the messages
  678. // but us, the data of the message does not have to be hashed. We just
  679. // want to know if the sender has the private key.
  680. // Since the header will be the same for all messages, we can probably
  681. // set it up somewhere once.
  682. //
  683. return pSend->SendReceive( m_MsgData.GetRawData(),
  684. m_MsgData.GetIndex(),
  685. NULL,
  686. 0,
  687. 0,
  688. NULL );
  689. }
  690. #endif
  691. HRESULT CQueueingEventSink::GetDeliveryRecord( ULONG cEvents,
  692. IWbemEvent** apEvents,
  693. DWORD dwQos,
  694. CEventContext* pContext,
  695. IWbemCallSecurity* pCallSec,
  696. CDeliveryRecord** ppRecord )
  697. {
  698. HRESULT hr;
  699. *ppRecord = NULL;
  700. CWbemPtr<CDeliveryRecord> pRecord;
  701. if ( dwQos == WMIMSG_FLAG_QOS_EXPRESS )
  702. {
  703. pRecord = new CExpressDeliveryRecord;
  704. if ( pRecord == NULL )
  705. {
  706. return WBEM_E_OUT_OF_MEMORY;
  707. }
  708. hr = pRecord->Initialize( apEvents, cEvents, pCallSec );
  709. }
  710. #ifdef __WHISTLER_UNCUT
  711. else
  712. {
  713. //
  714. // this is a guaranteed type of QoS, we will need to save the
  715. // record before returning it.
  716. //
  717. if ( pCallSec != NULL )
  718. {
  719. return WBEM_E_NOT_SUPPORTED;
  720. }
  721. hr = GetPersistentRecord( cEvents,
  722. apEvents,
  723. dwQos,
  724. pContext,
  725. &pRecord );
  726. if ( FAILED(hr) && HandlePersistentQueueError(hr, dwQos ) === S_OK )
  727. {
  728. //
  729. // we should retry once more
  730. //
  731. hr = GetPersistentRecord( cEvents,
  732. apEvents,
  733. dwQos,
  734. pContext,
  735. &pRecord );
  736. }
  737. }
  738. #endif
  739. if ( FAILED(hr) )
  740. {
  741. return hr;
  742. }
  743. pRecord->AddRef();
  744. *ppRecord = pRecord;
  745. return WBEM_S_NO_ERROR;
  746. }
  747. #ifdef __WHISTLER_UNCUT
  748. HRESULT CQueueingEventSink::GuaranteedPostDeliverAction(
  749. IWmiMessageQueueReceiver* pRcvr )
  750. {
  751. //
  752. // we pass along the receiver that existed at the time the delivery
  753. // was saved because Recovery may have occurred and completed
  754. // between the time the delivery was saved and now. If this was the
  755. // case, we would have released that receiver connection. To see
  756. // if this case has occurred we just compare the receiver pointers.
  757. //
  758. HRESULT hr;
  759. CInCritSec ics( &m_cs );
  760. //
  761. // XACT note: when a message is read using a transaction we cannot
  762. // allow recovery to occur until that transaction is completed.
  763. // this is because recovery uses a cursor on the queue and if a txn
  764. // is aborted, it goes back into the queue and screws up the cursor.
  765. // what we'll do is hold the lock when there are outstanding txns.
  766. //
  767. if ( m_bRecovering || FAILED(m_hrRecovery) || m_pRcvr != pRcvr )
  768. {
  769. DEBUGTRACE((LOG_ESS, "ignoring removal of persistent delivery "
  770. "due to recovery of %S sink\n", m_wszName));
  771. return S_OK; // recovery will handle removing this message.
  772. }
  773. hr = pRcvr->ReceiveMessage( INFINITE,
  774. NULL,
  775. WMIMSG_ACTION_QRCV_REMOVE,
  776. NULL );
  777. if ( FAILED(hr) )
  778. {
  779. ERRORTRACE((LOG_ESS, "Couldn't remove persistent delivery for %S "
  780. " sink. HR = 0x%x\n", m_wszName, hr ));
  781. HandlePersistentQueueError( hr, WMIMSG_FLAG_QOS_GUARANTEED );
  782. }
  783. return hr;
  784. }
  785. HRESULT CQueueingEventSink::HandlePersistentQueueError( HRESULT hr,
  786. DWORD dwQos )
  787. {
  788. //
  789. // returns S_OK if caller should retry the request that caused the error.
  790. //
  791. DEBUGTRACE((LOG_ESS, "Received error from persistent queue for %S sink "
  792. "HR = 0x%x\n", m_wszName, hr));
  793. if ( hr != WMIMSG_E_REQSVCNOTAVAIL )
  794. {
  795. return S_FALSE;
  796. }
  797. //
  798. // the msmq service is down. Restart it and initiate recovery.
  799. //
  800. WString wsQueueName;
  801. hr = SinkNameToQueueName( m_wszName,
  802. m_pNamespace->GetName(),
  803. dwQos,
  804. wsQueueName );
  805. if ( FAILED(hr) )
  806. {
  807. return hr;
  808. }
  809. //
  810. // reset msmq connections.
  811. //
  812. if ( dwQos == WMIMSG_FLAG_QOS_GUARANTEED )
  813. {
  814. m_pSend.Release();
  815. m_pRcvr.Release();
  816. }
  817. else
  818. {
  819. m_pXactSend.Release();
  820. m_pXactRcvr.Release();
  821. }
  822. Recover( wsQueueName, dwQos );
  823. return S_OK; // if recovery failed we'll pick it up later.
  824. }
  825. HRESULT CQueueingEventSink::Recover( LPCWSTR wszQueueName, DWORD dwQoS )
  826. {
  827. HRESULT hr;
  828. DEBUGTRACE((LOG_ESS, "Recovering Queue %S\n", wszQueueName ));
  829. {
  830. CInCritSec ics( &m_cs );
  831. m_hRecoveryComplete = CreateEvent( NULL, TRUE, FALSE, NULL );
  832. if ( m_hRecoveryComplete == NULL )
  833. {
  834. return HRESULT_FROM_WIN32( GetLastError() );
  835. }
  836. ResetEvent( m_hRecoveryComplete );
  837. m_bRecovering = TRUE;
  838. }
  839. hr = InternalRecover( wszQueueName, dwQoS );
  840. {
  841. CInCritSec ics( &m_cs );
  842. SetEvent( m_hRecoveryComplete );
  843. CloseHandle( m_hRecoveryComplete );
  844. m_bRecovering = FALSE;
  845. m_hrRecovery = hr;
  846. }
  847. if ( FAILED(hr) )
  848. {
  849. ERRORTRACE(( LOG_ESS, "Failed Recovering %S queue. HR=0x%x\n",
  850. wszQueueName, hr ));
  851. return hr;
  852. }
  853. DEBUGTRACE((LOG_ESS, "Recovered Queue %S\n", wszQueueName ));
  854. return hr;
  855. }
  856. HRESULT CQueueingEventSink::InternalRecover(LPCWSTR wszQueueName, DWORD dwQoS)
  857. {
  858. HRESULT hr;
  859. CWbemPtr<MsgReceive> pRecv = new MsgReceive;
  860. //
  861. // here we open a new receiver. We don't want to keep this receiver
  862. // open afterwards though because recovery would open all the persistent
  863. // queues - which could be a lot of handles. We'll close this receiver
  864. // after we're done and wait until someone actually indicates a persistent
  865. // message before initializing the receiver that we'll hold on to.
  866. //
  867. CWbemPtr<IWmiMessageQueueReceiver> pRcvr;
  868. hr = OpenReceiver( wszQueueName, dwQoS, pRecv, &pRcvr );
  869. if ( FAILED(hr) )
  870. {
  871. return hr;
  872. }
  873. PVOID pvCursor;
  874. hr = pRcvr->CreateCursor( &pvCursor );
  875. if ( FAILED(hr) )
  876. {
  877. return hr;
  878. }
  879. BOOL bSchedule = FALSE;
  880. hr = pRcvr->ReceiveMessage( 0,
  881. pvCursor,
  882. WMIMSG_ACTION_QRCV_PEEK_CURRENT,
  883. NULL );
  884. while( SUCCEEDED(hr) )
  885. {
  886. CBuffer Data( pRecv->m_pData, pRecv->m_cData, FALSE );
  887. CWbemPtr<CGuaranteedDeliveryRecord> pRecord;
  888. pRecord = new CGuaranteedDeliveryRecord; // TODO : XACT
  889. if ( pRecord == NULL )
  890. {
  891. hr = WBEM_E_OUT_OF_MEMORY;
  892. break;
  893. }
  894. pRecord->SetCB( this, pRcvr );
  895. hr = pRecord->Unpersist( &Data );
  896. if ( FAILED(hr) )
  897. {
  898. ERRORTRACE(( LOG_ESS, "Invalid Delivery Message in %S queue\n",
  899. m_wszName));
  900. hr = WBEM_S_NO_ERROR;
  901. continue;
  902. }
  903. //
  904. // add the record to the transient queue.
  905. //
  906. DWORD dwThisSleep;
  907. BOOL bFirst;
  908. if( !AddRecord( pRecord, FALSE, &dwThisSleep, &bFirst) )
  909. {
  910. //
  911. // We can't add the record because of out of memory.
  912. // we're going to have to bail on our recovery.
  913. //
  914. return WBEM_E_OUT_OF_MEMORY;
  915. break;
  916. }
  917. bSchedule = TRUE; // at least one was added successfully .
  918. hr = pRcvr->ReceiveMessage( 0,
  919. pvCursor,
  920. WMIMSG_ACTION_QRCV_PEEK_NEXT,
  921. NULL );
  922. }
  923. if ( SUCCEEDED(hr) && bSchedule )
  924. {
  925. m_pNamespace->ScheduleDelivery( this );
  926. }
  927. pRcvr->DestroyCursor( pvCursor );
  928. if ( hr != WMIMSG_E_TIMEDOUT )
  929. {
  930. return hr;
  931. }
  932. return WBEM_S_NO_ERROR;
  933. }
  934. HRESULT CQueueingEventSink::CleanupPersistentQueues()
  935. {
  936. HRESULT hr;
  937. if ( m_wszName == NULL )
  938. {
  939. return WBEM_S_NO_ERROR; // not a permanent event consumer. Temporary.
  940. }
  941. DWORD dwQoS = WMIMSG_FLAG_QOS_GUARANTEED; // TODO : XACT
  942. WString wsQueueName;
  943. hr = SinkNameToQueueName( m_wszName,
  944. m_pNamespace->GetName(),
  945. dwQoS,
  946. wsQueueName );
  947. if ( FAILED(hr) )
  948. {
  949. return hr;
  950. }
  951. return m_pNamespace->GetEss()->DestroyPersistentQueue( wsQueueName );
  952. }
  953. const LPCWSTR g_wszGuaranteed = L"Guaranteed";
  954. //
  955. // queue name must be a valid msmq pathname to a private queue where the
  956. // logical name is of the formate sinkname!namespace!qos
  957. //
  958. HRESULT CQueueingEventSink::QueueNameToSinkName( LPCWSTR wszQueueName,
  959. WString& rwsSinkName,
  960. WString& rwsNamespace,
  961. DWORD& rdwQoS )
  962. {
  963. wszQueueName = wcschr( wszQueueName, '\\');
  964. if ( wszQueueName == NULL )
  965. {
  966. return WBEM_E_INVALID_PARAMETER;
  967. }
  968. wszQueueName++; // advance past delimiter.
  969. //
  970. // pathname is always private so advance one more slash.
  971. //
  972. wszQueueName = wcschr( wszQueueName, '\\');
  973. if ( wszQueueName == NULL )
  974. {
  975. return WBEM_E_INVALID_PARAMETER;
  976. }
  977. wszQueueName++; // advance past delimiter.
  978. WCHAR* pwchNamespace = wcschr( wszQueueName, '!' );
  979. if ( pwchNamespace == NULL )
  980. {
  981. return WBEM_E_INVALID_PARAMETER;
  982. }
  983. pwchNamespace++;
  984. WCHAR* pwchQoS = wcschr( pwchNamespace, '!' );
  985. if ( pwchQoS == NULL )
  986. {
  987. return WBEM_E_INVALID_PARAMETER;
  988. }
  989. pwchQoS++;
  990. rwsNamespace = pwchNamespace;
  991. rwsSinkName = wszQueueName;
  992. LPWSTR wszSinkName = rwsSinkName;
  993. LPWSTR wszNamespace = rwsNamespace;
  994. wszSinkName[pwchNamespace-wszQueueName-1] = '\0';
  995. wszNamespace[pwchQoS-pwchNamespace-1] = '\0';
  996. //
  997. // substitute the slashes back into the namespace.
  998. //
  999. WCHAR* pwch = wszNamespace;
  1000. while( (pwch=wcschr(pwch,'~')) != NULL )
  1001. {
  1002. *pwch++ = '\\';
  1003. }
  1004. rdwQoS = WMIMSG_FLAG_QOS_GUARANTEED; // TODO : XACT check
  1005. return WBEM_S_NO_ERROR;
  1006. }
  1007. HRESULT CQueueingEventSink::SinkNameToQueueName( LPCWSTR wszSinkName,
  1008. LPCWSTR wszNamespace,
  1009. DWORD dwQoS,
  1010. WString& rwsQueueName )
  1011. {
  1012. LPCWSTR wszQos;
  1013. if ( dwQoS != WMIMSG_FLAG_QOS_GUARANTEED ) // TODO : XACT
  1014. {
  1015. return WBEM_E_CRITICAL_ERROR;
  1016. }
  1017. //
  1018. // the logical part of the pathname cannot contain any slashes, so
  1019. // when saving the namespace, we must remove them and replace them with
  1020. // something else.
  1021. //
  1022. WString wsNormNamespace = wszNamespace;
  1023. WCHAR* pwch = wsNormNamespace;
  1024. while( (pwch=wcschr(pwch,'\\')) != NULL )
  1025. {
  1026. *pwch++ = '~';
  1027. }
  1028. wszQos = g_wszGuaranteed; // TODO : XACT
  1029. rwsQueueName = L".\\private$\\";
  1030. rwsQueueName += wszSinkName;
  1031. rwsQueueName += L"!";
  1032. rwsQueueName += wsNormNamespace;
  1033. rwsQueueName += L"!";
  1034. rwsQueueName += wszQos;
  1035. return WBEM_S_NO_ERROR;
  1036. }
  1037. #endif