Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

592 lines
16 KiB

/*++
Copyright (c) 1995 Microsoft Corporation
Module Name:
XactRT.cpp
Abstract:
This module contains RT code involved with transactions.
Author:
Alexander Dadiomov (alexdad) 19-Jun-96
Revision History:
--*/
#include "stdh.h"
#include "TXDTC.H"
#include "txcoord.h"
#include "xactsink.h"
#include "cs.h"
#include "mqutil.h"
#include "rtprpc.h"
#include "xactmq.h"
#include "xactrt.tmh"
//RT transactions cache: ring buffer of transaction UOWs
#define XACT_RING_BUF_SIZE 16 // size of the transactions ring buffer
static XACTUOW s_uowXactRingBuf[XACT_RING_BUF_SIZE]; // transaction ring buffer
ULONG s_ulXrbFirst = XACT_RING_BUF_SIZE; // First used element in transaction ring buffer
ULONG s_ulXrbLast = XACT_RING_BUF_SIZE; // Last used element in transaction ring buffer
static BOOL g_DtcInit = FALSE;
static ULONG g_StubRmCounter = 0;
static CCriticalSection s_RingBufCS;
// Whereabouts of the controlling DTC for the QM
// For the dependent client it will be non-local
ULONG g_cbQmTmWhereabouts = 0; // length of DTC whereabouts
BYTE *g_pbQmTmWhereabouts = NULL; // DTC whereabouts
static ITransactionExport *g_pExport = NULL; // cached DTC export object
HANDLE g_hMutexDTC = NULL; // Serializes calls to DTC
extern HRESULT DepGetTmWhereabouts(
IN DWORD cbBufSize,
OUT UCHAR *pbWhereabouts,
OUT DWORD *pcbWhereabouts);
/*====================================================
GetMutex
Internal: creates/opens global mutex and waits for it
=====================================================*/
HRESULT GetMutex()
{
if (!g_hMutexDTC)
{
g_hMutexDTC = CreateMutexA(NULL, FALSE, "MSMQ_DTC");
}
if (!g_hMutexDTC)
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("CreateMutex failed: %x "), 0));
return MQ_ERROR_DTC_CONNECT;
}
WaitForSingleObject(g_hMutexDTC, 5 * 60 * 1000);
return MQ_OK;
}
//---------------------------------------------------------
// BOOL FindTransaction( XACTUOW *pUow )
//
// Description:
//
// Linear search in the ring buffer; *not* adds
// returns TRUE if xaction was found, FALSE - if not
//---------------------------------------------------------
static BOOL FindTransaction(XACTUOW *pUow)
{
CS lock(s_RingBufCS);
// Look for the UOW in the ring buffer
for (ULONG i = s_ulXrbFirst; i <= s_ulXrbLast && i < XACT_RING_BUF_SIZE; i++)
{
if (memcmp(&s_uowXactRingBuf[i], pUow, sizeof(XACTUOW))==0)
{
return TRUE;
}
}
return FALSE;
}
//---------------------------------------------------------
// BOOL RememberTransaction( XACTUOW *pUow )
//
// Description:
//
// Linear search in the ring buffer; adds there if not found;
// returns TRUE if xaction was found, FALSE - if it was added
//---------------------------------------------------------
static BOOL RememberTransaction(XACTUOW *pUow)
{
CS lock(s_RingBufCS);
// Look for the UOW in the ring buffer
for (ULONG i = s_ulXrbFirst; i <= s_ulXrbLast && i < XACT_RING_BUF_SIZE; i++)
{
if (memcmp(&s_uowXactRingBuf[i], pUow, sizeof(XACTUOW))==0)
{
return TRUE;
}
}
// No check for ring buffer overflow, because it is not dangerous (maximum RT will go to QM)
// adding transaction to the ring buffer
if (s_ulXrbFirst == XACT_RING_BUF_SIZE)
{
// Ring buffer is empty
s_ulXrbFirst = s_ulXrbLast = 0;
memcpy(&s_uowXactRingBuf[s_ulXrbFirst], pUow, sizeof(XACTUOW));
}
else
{
s_ulXrbLast = (s_ulXrbLast == XACT_RING_BUF_SIZE-1 ? 0 : s_ulXrbLast+1);
memcpy(&s_uowXactRingBuf[s_ulXrbLast], pUow, sizeof(XACTUOW));
}
return FALSE;
}
//---------------------------------------------------------
// HRESULT RTpGetExportObject
//
// Description:
//
// Creates and caches the DTC export object
//---------------------------------------------------------
HRESULT RTpGetExportObject(IUnknown *punkDtc,
ULONG cbTmWhereabouts,
BYTE *pbTmWhereabouts)
{
HRESULT hr = MQ_OK;
R<ITransactionExportFactory> pTxExpFac = NULL;
if (g_pExport)
{
g_pExport->Release();
g_pExport = NULL;
}
// Get the DTC's ITransactionExportFactory interface
hr = punkDtc->QueryInterface (IID_ITransactionExportFactory, (void **)(&pTxExpFac.ref()));
if (FAILED(hr))
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("QueryInterface failed: %x "), hr));
return hr;
}
// Create Export object
hr = pTxExpFac->Create (cbTmWhereabouts, pbTmWhereabouts, &g_pExport);
if (FAILED(hr))
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("Create Export Object failed: %x "), hr));
return hr;
}
return(MQ_OK);
}
//---------------------------------------------------------
// HRESULT RTpBuildTransactionCookie
//
// Description:
//
// Builds transaction Cookie
//---------------------------------------------------------
HRESULT RTpBuildTransactionCookie(ITransaction *pTrans,
ULONG *pcbCookie,
BYTE **ppbCookie)
{
HRESULT hr = MQ_OK;
ULONG cbUsed;
R<IUnknown> punkTx = NULL;
*pcbCookie = 0;
*ppbCookie = NULL;
// Get transaction's Unknown
hr = pTrans->QueryInterface (IID_IUnknown, (void **)(&punkTx.ref()));
if (FAILED(hr))
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("QueryInterface failed: %x "), hr));
return hr;
}
// Get transaction cookie size
hr = g_pExport->Export (punkTx.get(), pcbCookie);
if (FAILED(hr) || *pcbCookie == 0)
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("Export failed: %x "), hr));
return hr;
}
// Allocate memory for transaction Cookie
try
{
*ppbCookie = new BYTE[*pcbCookie];
}
catch(const bad_alloc&)
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("Allocation failed: %x "), hr));
return MQ_ERROR_INSUFFICIENT_RESOURCES;
}
// Get transaction Cookie itself
hr = g_pExport->GetTransactionCookie (punkTx.get(), *pcbCookie, *ppbCookie, &cbUsed);
if (FAILED(hr))
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("GetTransactionCookie failed: %x "), hr));
return hr;
}
return(MQ_OK);
}
//---------------------------------------------------------
// HRESULT RTXactGetDTC
//
// Description:
//
// Obtains DTC transaction manager. Defers to mqutil
//
// Outputs:
// ppunkDTC pointers to DTC transaction manager
//---------------------------------------------------------
DLL_EXPORT HRESULT RTXactGetDTC(IUnknown **ppunkDTC)
{
HRESULT hr = MQ_ERROR;
__try
{
GetMutex(); // Isolate export creation from others
hr = XactGetDTC(ppunkDTC, NULL, NULL);//, g_fDependentClient);
}
__finally
{
ReleaseMutex(g_hMutexDTC);
}
return (SUCCEEDED(hr) ? MQ_OK : hr);
}
//---------------------------------------------------------
// HRESULT RTpProvideTransactionEnlist
//
// Description:
//
// Provides that QM is enlisted in this transaction,
// checks the transaction state
//---------------------------------------------------------
HRESULT RTpProvideTransactionEnlist(ITransaction *pTrans, XACTUOW *pUow)
{
HRESULT hr = MQ_OK;
IUnknown *punkDtc = NULL;
IMSMQTransaction *pIntXact = NULL;
ULONG cbTmWhereabouts;
BYTE *pbTmWhereabouts = NULL;
ULONG cbCookie;
BYTE *pbCookie = NULL;
XACTTRANSINFO xinfo;
BOOL fMutexTaken = FALSE;
__try
{
//
// Get the transaction info. UOW resides there.
//
hr = pTrans->GetTransactionInfo(&xinfo);
if (FAILED(hr))
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("GetTransactionInfo failed: %x "), hr));
__leave;
}
// Put pointer to UOW in the output parameter
CopyMemory(pUow, &xinfo.uow, sizeof(XACTUOW));
//
// Is it internal transaction?
//
pTrans->QueryInterface (IID_IMSMQTransaction, (void **)(&pIntXact));
if (pIntXact)
{
// Internal transactions
//------------------------
hr = pIntXact->EnlistTransaction(pUow);
if (FAILED(hr))
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("EnlistTransaction failed: %x "), hr));
}
}
else
{
// External transactions
//------------------------
// Look for the transaction in the cache
//
if (FindTransaction(pUow)) // this xaction is known already; QM must have been enlisted
{
hr = MQ_OK;
__leave;
}
// Get global mutex to isolate enlistment
//
GetMutex(); // Isolate export creation from others
fMutexTaken = TRUE;
//
// Get the DTC IUnknown and TM whereabouts
//
hr = XactGetDTC(&punkDtc, &cbTmWhereabouts, &pbTmWhereabouts);//, g_fDependentClient);
if (FAILED(hr))
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("XactGetDTC failed: %x "), hr));
__leave;
}
// XactGetDTC could return success code of 1 if it reconnected to DTC
if (hr == 1)
{
// No Release: DTC object is not alive anymore
g_pExport = NULL;
}
// Get the QM's controlling DTC whereabouts
//
if (!g_pbQmTmWhereabouts)
{
g_cbQmTmWhereabouts = 128;
g_pbQmTmWhereabouts = new BYTE[128];
DWORD cbNeeded;
hr = DepGetTmWhereabouts(g_cbQmTmWhereabouts, g_pbQmTmWhereabouts, &cbNeeded);
if (hr == MQ_ERROR_USER_BUFFER_TOO_SMALL)
{
delete [] g_pbQmTmWhereabouts;
g_cbQmTmWhereabouts = cbNeeded;
g_pbQmTmWhereabouts = new BYTE[cbNeeded];
hr = DepGetTmWhereabouts(g_cbQmTmWhereabouts, g_pbQmTmWhereabouts, &cbNeeded);
}
if (FAILED(hr))
{
delete [] g_pbQmTmWhereabouts;
g_cbQmTmWhereabouts = 0;
g_pbQmTmWhereabouts = NULL;
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("DepGetTmWhereabouts failed: %x "), hr));
__leave;
}
else
{
g_cbQmTmWhereabouts = cbNeeded;
}
}
//
// Get and cache Export object
//
if (g_pExport == NULL)
{
hr = RTpGetExportObject(
punkDtc,
g_cbQmTmWhereabouts,
g_pbQmTmWhereabouts);
if (FAILED(hr))
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("RTpGetExportObject failed: %x "), hr));
__leave;
}
}
//
// Prepare the transaction Cookie
//
hr = RTpBuildTransactionCookie(
pTrans,
&cbCookie,
&pbCookie);
if (FAILED(hr))
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("RTpBuildTransactionCookie failed: %x "), hr));
__leave;
}
//
// RPC call to QM for enlistment
//
__try
{
INIT_RPC_HANDLE ;
if(tls_hBindRpc == 0)
return MQ_ERROR_SERVICE_NOT_AVAILABLE;
hr = QMEnlistTransaction(tls_hBindRpc, pUow, cbCookie, pbCookie);
}
__except(EXCEPTION_EXECUTE_HANDLER)
{
DWORD rc = GetExceptionCode();
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("RTpProvideTransactionEnlist failed: RPC code=%x "), rc));
DBG_USED(rc);
hr = MQ_ERROR_SERVICE_NOT_AVAILABLE;
}
//Now that transaction is actually enlisted we remember it in ring buffer
if (SUCCEEDED(hr))
{
RememberTransaction(pUow);
}
else
{
DBGMSG((DBGMOD_XACT, DBGLVL_ERROR, TEXT("QMEnlistTransaction failed: %x "), hr));
}
}
if (FAILED(hr)) {
__leave;
}
hr = MQ_OK;
}
__finally
{
if (SUCCEEDED(hr) && AbnormalTermination())
hr = MQ_ERROR;
#ifdef _DEBUG
DWORD cRef = 0;
if (punkDtc)
cRef = punkDtc->Release();
#else
if (punkDtc)
punkDtc->Release();
#endif
if (pIntXact)
pIntXact->Release();
if (pbCookie)
delete pbCookie;
if (fMutexTaken)
ReleaseMutex(g_hMutexDTC);
}
return(hr);
}
//---------------------------------------------------------
// HRESULT InitStubRm
//
// Description:
//
// Initializes stub RM manager - now needed only for performance managements
//
//---------------------------------------------------------
IResourceManager *g_pIResMgr = 0;
HRESULT InitStubRm()
{
HRESULT hRc;
IResourceManagerFactory *pIRmFactory = 0;
CStubIResourceManagerSink *pIResMgrSink = 0;
IUnknown *punkDTC = 0;
UUID guid;
//CS lock(s_RingBufCS);
DBGMSG((DBGMOD_XACT, DBGLVL_WARNING, TEXT("InitStubRM called!")));
if (g_DtcInit)
{
return MQ_OK;
}
//
// (1) Establish contact with the MS DTC transaction manager.
// First the application obtains the IUnknown interface to the DTC TM.
//
hRc = XactGetDTC(&punkDTC, NULL, NULL);//, g_fDependentClient);
if (FAILED(hRc)) {
return hRc;
}
// Get the resource manager factory from the IUnknown
hRc = punkDTC->QueryInterface(IID_IResourceManagerFactory,(LPVOID *) &pIRmFactory);
punkDTC->Release();
if (S_OK != hRc)
{
return hRc;
}
// (2) Create and instance of the resource manager interface. A
// pointer to this interface is retrned to the client appliction
// through the pIResMgr member variable.
//
// Create a resource manager sink and create an instance of the
// resource manager through the resource manager factory.
pIResMgrSink = new CStubIResourceManagerSink; //stub implementation
if ( 0 == pIResMgrSink )
{
pIRmFactory->Release();
return E_FAIL;
}
//pIResMgrSink->AddRef();
// Create a new guid for each resource manager.
hRc = UuidCreate (&guid);
if ( S_OK != hRc)
{
return E_FAIL;
}
// Prepare stub RM name (ANSI)
CHAR szStubBaseName[MAX_REG_DEFAULT_LEN];
READ_REG_STRING(wszStubBaseName, FALCON_RM_STUB_NAME_REGNAME, FALCON_DEFAULT_STUB_RM_NAME ) ;
size_t res = wcstombs(szStubBaseName, wszStubBaseName, sizeof(szStubBaseName));
ASSERT(res != (size_t)(-1));
DBG_USED(res);
CHAR szStubRmName[60];
sprintf(szStubRmName, "%s%d", szStubBaseName, g_StubRmCounter++);
// Create instance of the resource manager interface.
hRc = pIRmFactory->Create (&guid,
szStubRmName,
(IResourceManagerSink *) pIResMgrSink,
&g_pIResMgr );
//pIRmFactory->Release();
if (S_OK != hRc)
{
return hRc;
}
//g_pIResMgr->AddRef(); // we want to keep it. BUGBUG: When will we release it ?
g_DtcInit = TRUE;
return S_OK;
}
//---------------------------------------------------------
// void RTpInitXactRingBuf()
//
// Description:
//
// Initiates the ring buffer data
//---------------------------------------------------------
void RTpInitXactRingBuf()
{
CS lock(s_RingBufCS);
s_ulXrbFirst = XACT_RING_BUF_SIZE;
s_ulXrbLast = XACT_RING_BUF_SIZE;
g_DtcInit = FALSE;
}