Source code of Windows XP (NT5)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

2357 lines
68 KiB

  1. /*++
  2. Copyright (C) Microsoft Corporation, 1997 - 1999
  3. Module Name:
  4. mq.cxx
  5. Abstract:
  6. This is the code that actually makes Falcon API calls. It is
  7. used by the Falcon/RPC transport (mqtrans.cxx)
  8. Author:
  9. Edward Reus (edwardr) 05-Jul-1997
  10. Revision History:
  11. --*/
  12. #include <precomp.hxx>
  13. #include <trans.hxx>
  14. #include <dgtrans.hxx>
  15. #include <wswrap.hxx>
  16. #include <rpc.h>
  17. #include <rpcdce.h>
  18. #include <mqmgr.h>
  19. #include "mqtrans.hxx"
  20. static HINSTANCE g_hMqRt = 0;
  21. FALCON_API *g_pMqApi = 0;
  22. static CQueueMap *g_pQueueMap = 0;
  23. char placeHolder[sizeof(MUTEX)]; // provide placeholder for the mutex
  24. MUTEX *gp_csContext = (MUTEX *)placeHolder; // the mutex itself
  25. #ifdef UNICODE
  26. #define ptszVal pwszVal
  27. #define VT_LPTSTR VT_LPWSTR
  28. #else
  29. #define ptszVal pszVal
  30. #define VT_LPTSTR VT_LPSTR
  31. #endif
  32. static PCONTEXT_HANDLE g_pContext = 0;
  33. // WARNING: the size and ordering of g_arszMqApis is dependent
  34. // on the definition of the FALCON_API structure in mqtrans.hxx
  35. // WARNING: When you are freeing Falcon allocated strings, do
  36. // *not* use MQFreeMemory. Use MQFreeStringFromProperty. Otherwise
  37. // you will break Win98 code.
  38. const static char *g_arszMqApis[] = {
  39. "MQLocateBegin",
  40. "MQLocateNext",
  41. "MQLocateEnd",
  42. "MQInstanceToFormatName",
  43. "MQOpenQueue",
  44. "MQFreeMemory",
  45. "MQSendMessage",
  46. "MQReceiveMessage",
  47. "MQCloseQueue",
  48. "MQDeleteQueue",
  49. "MQPathNameToFormatName",
  50. "MQCreateQueue",
  51. "MQGetMachineProperties",
  52. "MQGetQueueProperties",
  53. "MQSetQueueProperties",
  54. NULL };
  55. // Error mappings for MQ_MapStatusCode():
  56. typedef struct _STATUS_MAPPING
  57. {
  58. HRESULT hr;
  59. RPC_STATUS status;
  60. } STATUS_MAPPING;
  61. const static STATUS_MAPPING g_StatusMap[] =
  62. {
  63. { MQ_OK, RPC_S_OK },
  64. { MQ_ERROR_IO_TIMEOUT, RPC_P_TIMEOUT },
  65. { MQ_ERROR_INSUFFICIENT_RESOURCES, RPC_S_OUT_OF_MEMORY },
  66. { MQ_ERROR_QUEUE_NOT_FOUND, RPC_S_SERVER_UNAVAILABLE },
  67. { MQ_ERROR, RPC_S_INTERNAL_ERROR },
  68. { MQMSG_CLASS_NACK_BAD_DST_Q, RPC_S_SERVER_UNAVAILABLE },
  69. { MQMSG_CLASS_NACK_PURGED, RPC_S_CALL_FAILED_DNE },
  70. { MQMSG_CLASS_NACK_REACH_QUEUE_TIMEOUT, RPC_S_CALL_FAILED_DNE },
  71. { MQMSG_CLASS_NACK_Q_EXCEED_QUOTA, RPC_S_CALL_FAILED_DNE },
  72. { MQMSG_CLASS_NACK_ACCESS_DENIED, RPC_S_CALL_FAILED_DNE },
  73. { MQMSG_CLASS_NACK_HOP_COUNT_EXCEEDED, RPC_S_CALL_FAILED_DNE },
  74. { MQMSG_CLASS_NACK_BAD_SIGNATURE, RPC_S_CALL_FAILED_DNE },
  75. { MQMSG_CLASS_NACK_BAD_ENCRYPTION, RPC_S_CALL_FAILED_DNE },
  76. { MQMSG_CLASS_NACK_COULD_NOT_ENCRYPT, RPC_S_CALL_FAILED_DNE },
  77. { MQMSG_CLASS_NACK_NOT_TRANSACTIONAL_Q, RPC_S_CALL_FAILED_DNE },
  78. { MQMSG_CLASS_NACK_NOT_TRANSACTIONAL_MSG,RPC_S_CALL_FAILED_DNE },
  79. { MQMSG_CLASS_NACK_Q_DELETED, RPC_S_CALL_FAILED_DNE },
  80. { MQMSG_CLASS_NACK_Q_PURGED, RPC_S_CALL_FAILED_DNE },
  81. { MQMSG_CLASS_NACK_RECEIVE_TIMEOUT, RPC_S_CALL_FAILED_DNE },
  82. { MQ_ERROR_ILLEGAL_QUEUE_PATHNAME, RPC_S_INVALID_ENDPOINT_FORMAT}
  83. };
  84. //----------------------------------------------------------------
  85. // MQ_Initialize()
  86. //
  87. // Called by DG_TransportLoad() to initialize the ncadg_mq
  88. // transport. This function does a dynamic load of the Falcon
  89. // runtime DLL (MQRT.DLL) and creates a call table to access
  90. // the Falcon API. If successful, return TRUE, else return
  91. // FALSE.
  92. //----------------------------------------------------------------
  93. BOOL
  94. MQ_Initialize()
  95. {
  96. BOOL fStatus = TRUE;
  97. RPC_STATUS RpcStatus = RPC_S_OK;
  98. // Make sure the function count is correct...
  99. ASSERT( sizeof(g_arszMqApis)-sizeof(PVOID) == sizeof(FALCON_API) );
  100. //
  101. gp_csContext = new (gp_csContext) MUTEX(&RpcStatus);
  102. if (RpcStatus != RPC_S_OK)
  103. {
  104. return FALSE;
  105. }
  106. // Get the MSMQ runtime library:
  107. g_hMqRt = LoadLibrary(TEXT("mqrt.dll"));
  108. if (!g_hMqRt)
  109. {
  110. fStatus = FALSE;
  111. }
  112. // Build up the MSMQ call table that we will use to access
  113. // the MSMQ C API:
  114. if (fStatus)
  115. {
  116. g_pMqApi = (FALCON_API*)I_RpcAllocate(sizeof(FALCON_API));
  117. if (!g_pMqApi)
  118. {
  119. fStatus = FALSE;
  120. }
  121. }
  122. if (fStatus)
  123. {
  124. FARPROC *ppFn = (FARPROC*)g_pMqApi;
  125. int i = 0;
  126. while (g_arszMqApis[i])
  127. {
  128. *ppFn = GetProcAddress(g_hMqRt,g_arszMqApis[i++]);
  129. if (!*ppFn)
  130. {
  131. fStatus = FALSE;
  132. break;
  133. }
  134. ppFn++;
  135. }
  136. }
  137. if (fStatus)
  138. {
  139. g_pQueueMap = new CQueueMap;
  140. if (!g_pQueueMap)
  141. {
  142. fStatus = FALSE;
  143. }
  144. else
  145. {
  146. fStatus = g_pQueueMap->Initialize();
  147. }
  148. }
  149. if (!fStatus)
  150. {
  151. gp_csContext->Free();
  152. if (g_hMqRt)
  153. {
  154. FreeLibrary(g_hMqRt);
  155. g_hMqRt = 0;
  156. }
  157. if (g_pQueueMap)
  158. {
  159. delete g_pQueueMap;
  160. g_pQueueMap = 0;
  161. }
  162. if (g_pMqApi)
  163. {
  164. I_RpcFree(g_pMqApi);
  165. g_pMqApi = 0;
  166. }
  167. }
  168. return fStatus;
  169. }
  170. //----------------------------------------------------------------
  171. // MQ_MapStatusCode()
  172. //
  173. // Convert a MQ HRESULT status to a RPC_STATUS code.
  174. //----------------------------------------------------------------
  175. RPC_STATUS MQ_MapStatusCode( HRESULT hr, RPC_STATUS defaultStatus )
  176. {
  177. int iSize = sizeof(g_StatusMap)/sizeof(STATUS_MAPPING);
  178. RPC_STATUS status = defaultStatus;
  179. for (int i=0; i<iSize; i++)
  180. {
  181. if (hr == g_StatusMap[i].hr)
  182. {
  183. status = g_StatusMap[i].status;
  184. break;
  185. }
  186. }
  187. return status;
  188. }
  189. //----------------------------------------------------------------
  190. // MQ_InitOptions()
  191. //
  192. // Initialize transport specific binding handle options structure.
  193. //----------------------------------------------------------------
  194. RPC_STATUS RPC_ENTRY MQ_InitOptions( IN void PAPI *pvTransportOptions )
  195. {
  196. RPC_STATUS status = RPC_S_OK;
  197. MQ_OPTIONS *pOpts = (MQ_OPTIONS*)pvTransportOptions;
  198. if (pOpts)
  199. {
  200. memset(pOpts,0,sizeof(MQ_OPTIONS));
  201. pOpts->ulPriority = DEFAULT_PRIORITY;
  202. pOpts->ulTimeToReachQueue = INFINITE;
  203. pOpts->ulTimeToReceive = INFINITE;
  204. }
  205. else
  206. {
  207. status = RPC_S_OUT_OF_MEMORY;
  208. }
  209. return status;
  210. }
  211. //----------------------------------------------------------------
  212. // MQ_SetOption()
  213. //
  214. // Set transport specific binding handle options.
  215. //----------------------------------------------------------------
  216. RPC_STATUS RPC_ENTRY MQ_SetOption( IN void PAPI *pvTransportOptions,
  217. IN unsigned long option,
  218. IN ULONG_PTR optionValue )
  219. {
  220. RPC_STATUS status = RPC_S_OK;
  221. MQ_OPTIONS *pOpts = (MQ_OPTIONS*)pvTransportOptions;
  222. switch (option)
  223. {
  224. case RPC_C_OPT_MQ_DELIVERY:
  225. if ( (optionValue == RPC_C_MQ_EXPRESS)
  226. || (optionValue == RPC_C_MQ_RECOVERABLE) )
  227. pOpts->ulDelivery = (unsigned long) optionValue;
  228. else
  229. status = RPC_S_INVALID_ARG;
  230. break;
  231. case RPC_C_OPT_MQ_PRIORITY:
  232. if (optionValue <= MQ_MAX_PRIORITY)
  233. pOpts->ulPriority = (unsigned long) optionValue;
  234. else
  235. pOpts->ulPriority = MQ_MAX_PRIORITY;
  236. break;
  237. case RPC_C_OPT_MQ_JOURNAL:
  238. if ( (optionValue == RPC_C_MQ_JOURNAL_NONE)
  239. || (optionValue == RPC_C_MQ_JOURNAL_ALWAYS)
  240. || (optionValue == RPC_C_MQ_JOURNAL_DEADLETTER) )
  241. pOpts->ulJournaling = (unsigned long) optionValue;
  242. else
  243. status = RPC_S_INVALID_ARG;
  244. break;
  245. case RPC_C_OPT_MQ_TIME_TO_REACH_QUEUE:
  246. pOpts->ulTimeToReachQueue = (unsigned long) optionValue;
  247. break;
  248. case RPC_C_OPT_MQ_TIME_TO_BE_RECEIVED:
  249. pOpts->ulTimeToReceive = (unsigned long) optionValue;
  250. break;
  251. case RPC_C_OPT_MQ_ACKNOWLEDGE:
  252. pOpts->fAck = (optionValue != FALSE);
  253. break;
  254. case RPC_C_OPT_MQ_AUTHN_SERVICE:
  255. if (optionValue == RPC_C_AUTHN_MQ)
  256. {
  257. status = RPC_S_OK;
  258. }
  259. else if (optionValue == RPC_C_AUTHN_NONE)
  260. {
  261. pOpts->fAuthenticate = FALSE;
  262. pOpts->fEncrypt = FALSE;
  263. status = RPC_S_OK;
  264. }
  265. else
  266. {
  267. status = RPC_S_UNKNOWN_AUTHN_SERVICE;
  268. }
  269. break;
  270. case RPC_C_OPT_MQ_AUTHN_LEVEL:
  271. if (optionValue > RPC_C_AUTHN_LEVEL_NONE)
  272. {
  273. pOpts->fAuthenticate = TRUE;
  274. pOpts->fEncrypt = (optionValue == RPC_C_AUTHN_LEVEL_PKT_PRIVACY)? TRUE : FALSE;
  275. }
  276. else
  277. {
  278. pOpts->fAuthenticate = FALSE;
  279. pOpts->fEncrypt = FALSE;
  280. }
  281. break;
  282. default:
  283. status = RPC_S_CANNOT_SUPPORT;
  284. break;
  285. }
  286. // The following is some code to make sure the RPC_C_xxx
  287. // constants always have the correct values:
  288. #if ( (RPC_C_MQ_EXPRESS != MQMSG_DELIVERY_EXPRESS) \
  289. || (RPC_C_MQ_RECOVERABLE != MQMSG_DELIVERY_RECOVERABLE) )
  290. #error "RPC constants wrong"
  291. #endif
  292. #if ( (RPC_C_MQ_JOURNAL_NONE != MQMSG_JOURNAL_NONE) \
  293. || (RPC_C_MQ_JOURNAL_ALWAYS != MQMSG_JOURNAL) \
  294. || (RPC_C_MQ_JOURNAL_DEADLETTER != MQMSG_DEADLETTER) )
  295. #error "RPC constants wrong"
  296. #endif
  297. return status;
  298. }
  299. //----------------------------------------------------------------
  300. // MQ_InqOption()
  301. //
  302. // Get transport specific binding handle options.
  303. //----------------------------------------------------------------
  304. RPC_STATUS RPC_ENTRY MQ_InqOption( IN void PAPI *pvTransportOptions,
  305. IN unsigned long option,
  306. OUT ULONG_PTR *pOptionValue )
  307. {
  308. RPC_STATUS status = RPC_S_OK;
  309. MQ_OPTIONS *pOpts = (MQ_OPTIONS*)pvTransportOptions;
  310. switch (option)
  311. {
  312. case RPC_C_OPT_MQ_DELIVERY:
  313. *pOptionValue = pOpts->ulDelivery;
  314. break;
  315. case RPC_C_OPT_MQ_PRIORITY:
  316. *pOptionValue = pOpts->ulPriority;
  317. break;
  318. case RPC_C_OPT_MQ_JOURNAL:
  319. *pOptionValue = pOpts->ulJournaling;
  320. break;
  321. case RPC_C_OPT_MQ_TIME_TO_REACH_QUEUE:
  322. *pOptionValue = pOpts->ulTimeToReachQueue;
  323. break;
  324. case RPC_C_OPT_MQ_TIME_TO_BE_RECEIVED:
  325. *pOptionValue = pOpts->ulTimeToReceive;
  326. break;
  327. case RPC_C_OPT_MQ_ACKNOWLEDGE:
  328. *pOptionValue = pOpts->fAck;
  329. break;
  330. default:
  331. status = RPC_S_INVALID_ARG;
  332. *pOptionValue = 0;
  333. break;
  334. }
  335. return status;
  336. }
  337. //----------------------------------------------------------------
  338. // MQ_ImplementOptions()
  339. //
  340. // Apply transport specific binding handle options to the
  341. // specified server.
  342. //----------------------------------------------------------------
  343. RPC_STATUS RPC_ENTRY MQ_ImplementOptions(
  344. IN DG_TRANSPORT_ENDPOINT pvTransEndpoint,
  345. IN void *pvTransportOptions )
  346. {
  347. RPC_STATUS Status = RPC_S_OK;
  348. HRESULT hr;
  349. MQ_OPTIONS *pOpts = (MQ_OPTIONS*)pvTransportOptions;
  350. MQ_DATAGRAM_ENDPOINT *pEndpoint = (MQ_DATAGRAM_ENDPOINT*)pvTransEndpoint;
  351. pEndpoint->fAck = pOpts->fAck;
  352. pEndpoint->ulDelivery = pOpts->ulDelivery;
  353. pEndpoint->ulPriority = pOpts->ulPriority;
  354. pEndpoint->ulJournaling = pOpts->ulJournaling;
  355. pEndpoint->ulTimeToReachQueue = pOpts->ulTimeToReachQueue;
  356. pEndpoint->ulTimeToReceive = pOpts->ulTimeToReceive;
  357. pEndpoint->fAuthenticate = pOpts->fAuthenticate;
  358. pEndpoint->fEncrypt = pOpts->fEncrypt;
  359. //
  360. // If the fAck flag is set, then we want to get an acknowledgement
  361. // for each call (message) as it gets to the destination (server)
  362. // queue. So, setup an admin queue to receive Falcon ACK messages.
  363. //
  364. if ( (pEndpoint->fAck) && (pEndpoint->hAdminQueue == 0) )
  365. {
  366. hr = SetupAdminQueue(pEndpoint);
  367. Status = MQ_MapStatusCode(hr,RPC_S_INTERNAL_ERROR);
  368. if (Status == RPC_S_OK)
  369. {
  370. MQ_RegisterQueueToDelete(pEndpoint->wsAdminQFormat,pEndpoint->wsMachine);
  371. }
  372. }
  373. return Status;
  374. }
  375. //----------------------------------------------------------------
  376. // MQ_BuildAddressVector()
  377. //
  378. //----------------------------------------------------------------
  379. RPC_STATUS MQ_BuildAddressVector( OUT NETWORK_ADDRESS_VECTOR **ppVector )
  380. {
  381. DWORD dwSize;
  382. RPC_CHAR wsMachine[MAX_COMPUTERNAME_LEN];
  383. NETWORK_ADDRESS_VECTOR *pVector;
  384. dwSize = sizeof(wsMachine)/sizeof(RPC_CHAR);
  385. GetComputerName((RPC_SCHAR *)wsMachine,&dwSize);
  386. *ppVector = 0;
  387. pVector = new( sizeof(RPC_CHAR*)
  388. + sizeof(RPC_CHAR)*(1+RpcpStringLength(wsMachine)) )
  389. NETWORK_ADDRESS_VECTOR;
  390. if (!pVector)
  391. {
  392. return RPC_S_OUT_OF_MEMORY;
  393. }
  394. pVector->Count = 1;
  395. pVector->NetworkAddresses[0] = (RPC_CHAR*)(&pVector->NetworkAddresses[1]);
  396. RpcpStringCopy(pVector->NetworkAddresses[0],wsMachine);
  397. *ppVector = pVector;
  398. return RPC_S_OK;
  399. }
  400. //----------------------------------------------------------------
  401. // MQ_FillInAddress()
  402. //
  403. //----------------------------------------------------------------
  404. RPC_STATUS MQ_FillInAddress( MQ_ADDRESS *pAddress,
  405. MQPROPVARIANT *pMsgProps )
  406. {
  407. pAddress->fAuthenticated = pMsgProps[I_AUTHENTICATED].bVal;
  408. pAddress->ulPrivacyLevel = pMsgProps[I_PRIVACY_LEVEL].ulVal;
  409. ParseQueuePathName( (RPC_CHAR *)pMsgProps[I_MESSAGE_LABEL].ptszVal,
  410. pAddress->wsMachine,
  411. pAddress->wsQName );
  412. return RPC_S_OK;
  413. }
  414. #ifdef TRANSPORT_DLL
  415. //----------------------------------------------------------------
  416. // MIDL_user_allocate()
  417. //
  418. // Used by Mq_RegisterQueueToDelete().
  419. //----------------------------------------------------------------
  420. void __RPC_FAR * __RPC_USER MIDL_user_allocate( size_t len )
  421. {
  422. return (I_RpcAllocate(len));
  423. }
  424. //----------------------------------------------------------------
  425. // MIDL_user_free()
  426. //
  427. // Used by Mq_RegisterQueueToDelete().
  428. //----------------------------------------------------------------
  429. void __RPC_USER MIDL_user_free( void __RPC_FAR *ptr )
  430. {
  431. I_RpcFree(ptr);
  432. }
  433. #endif
  434. //----------------------------------------------------------------
  435. // MQ_RegisterQueueToDelete()
  436. //
  437. //----------------------------------------------------------------
  438. RPC_STATUS MQ_RegisterQueueToDelete( RPC_CHAR *pwsQFormat,
  439. RPC_CHAR *pwsMachine )
  440. {
  441. RPC_STATUS Status;
  442. RPC_CHAR *pwsStringBinding;
  443. RPC_BINDING_HANDLE hBinding = 0;
  444. ASSERT(pwsQFormat);
  445. gp_csContext->Request();
  446. // This is a long critical section... but only for the first call.
  447. if (!g_pContext)
  448. {
  449. Status = RpcStringBindingCompose( NULL,
  450. Q_SVC_PROTSEQ,
  451. pwsMachine,
  452. Q_SVC_ENDPOINT,
  453. NULL,
  454. &pwsStringBinding );
  455. if (RPC_S_OK == Status)
  456. {
  457. Status = RpcBindingFromStringBinding( pwsStringBinding,
  458. &hBinding );
  459. if (RPC_S_OK != Status)
  460. {
  461. gp_csContext->Clear();
  462. return Status;
  463. }
  464. RpcStringFree(&pwsStringBinding);
  465. RpcTryExcept
  466. {
  467. Status = MqGetContext(hBinding,&g_pContext);
  468. Status = RpcBindingFree(&hBinding);
  469. }
  470. RpcExcept(I_RpcExceptionFilter(RpcExceptionCode()))
  471. {
  472. Status = RpcExceptionCode();
  473. }
  474. RpcEndExcept
  475. }
  476. }
  477. gp_csContext->Clear();
  478. if (g_pContext)
  479. {
  480. Status = MqRegisterQueue(g_pContext,pwsQFormat);
  481. }
  482. return Status;
  483. }
  484. //----------------------------------------------------------------
  485. // ConstructQueuePathName()
  486. //
  487. // Return Value: TRUE on success.
  488. // FALSE on fail (Path Name buffer too small).
  489. //----------------------------------------------------------------
  490. BOOL ConstructQueuePathName( IN RPC_CHAR *pwsMachine,
  491. IN RPC_CHAR *pwsQName,
  492. OUT RPC_CHAR *pwsPathName,
  493. IN OUT DWORD *pdwSize )
  494. {
  495. BOOL status = TRUE;
  496. DWORD len = sizeof(RPC_CHAR) * (1 + RpcpStringLength(pwsMachine)
  497. + RpcpStringLength(WS_SEPARATOR)
  498. + RpcpStringLength(pwsQName) );
  499. if (*pdwSize < len)
  500. {
  501. status = FALSE;
  502. }
  503. else
  504. {
  505. RpcpStringCopy(pwsPathName,pwsMachine);
  506. RpcpStringCat(pwsPathName,WS_SEPARATOR);
  507. RpcpStringCat(pwsPathName,pwsQName);
  508. }
  509. *pdwSize = len;
  510. return status;
  511. }
  512. //----------------------------------------------------------------
  513. // ConstructPrivateQueuePathName()
  514. //
  515. // Return Value: TRUE on success.
  516. // FALSE on fail (Path Name buffer too small).
  517. //----------------------------------------------------------------
  518. BOOL ConstructPrivateQueuePathName( IN RPC_CHAR *pwsMachine,
  519. IN RPC_CHAR *pwsQName,
  520. OUT RPC_CHAR *pwsPathName,
  521. IN OUT DWORD *pdwSize )
  522. {
  523. BOOL status = TRUE;
  524. DWORD dwSize = sizeof(RPC_CHAR) * (1 + RpcpStringLength(pwsMachine)
  525. + RpcpStringLength(WS_PRIVATE_DOLLAR)
  526. + RpcpStringLength(pwsQName) );
  527. if (*pdwSize < dwSize)
  528. {
  529. status = FALSE;
  530. }
  531. else
  532. {
  533. RpcpStringCopy(pwsPathName,pwsMachine);
  534. RpcpStringCat(pwsPathName,WS_PRIVATE_DOLLAR);
  535. RpcpStringCat(pwsPathName,pwsQName);
  536. }
  537. *pdwSize = dwSize;
  538. return status;
  539. }
  540. #ifdef USE_PRIVATE_QUEUES
  541. //----------------------------------------------------------------
  542. // ConstructPrivateDirectFormat()
  543. //
  544. // Return Value: TRUE on success.
  545. // FALSE on fail (Path Name buffer too small).
  546. //----------------------------------------------------------------
  547. BOOL ConstructPrivateDirectFormat( IN RPC_CHAR *pwsMachine,
  548. IN RPC_CHAR *pwsQName,
  549. OUT RPC_CHAR *pwsPathName,
  550. IN OUT DWORD *pdwSize )
  551. {
  552. BOOL status = TRUE;
  553. DWORD dwSize = sizeof(RPC_CHAR) * (1 + RpcpStringLength(WS_DIRECT),
  554. + RpcpStringLength(pwsMachine)
  555. + RpcpStringLength(WS_PRIVATE_DOLLAR)
  556. + RpcpStringLength(pwsQName) );
  557. if (*pdwSize < dwSize)
  558. {
  559. status = FALSE;
  560. }
  561. else
  562. {
  563. RpcpStringCopy(pwsPathName,WS_DIRECT);
  564. RpcpStringCat(pwsPathName,pwsMachine);
  565. RpcpStringCat(pwsPathName,WS_PRIVATE_DOLLAR);
  566. RpcpStringCat(pwsPathName,pwsQName);
  567. }
  568. *pdwSize = dwSize;
  569. return status;
  570. }
  571. #endif
  572. #if FALSE
  573. //----------------------------------------------------------------
  574. // ConstructDirectFormat()
  575. //
  576. // Return Value: TRUE on success.
  577. // FALSE on fail (Path Name buffer too small).
  578. //----------------------------------------------------------------
  579. BOOL ConstructDirectFormat( IN RPC_CHAR *pwsMachine,
  580. IN RPC_CHAR *pwsQName,
  581. OUT RPC_CHAR *pwsPathName,
  582. IN OUT DWORD *pdwSize )
  583. {
  584. BOOL status = TRUE;
  585. DWORD dwSize = sizeof(RPC_CHAR) * (1 + RpcpStringLength(WS_DIRECT)
  586. + RpcpStringLength(pwsMachine)
  587. + RpcpStringLength(WS_SEPARATOR)
  588. + RpcpStringLength(pwsQName) );
  589. if (*pdwSize < dwSize)
  590. {
  591. status = FALSE;
  592. }
  593. else
  594. {
  595. RpcpStringCopy(pwsPathName,WS_DIRECT);
  596. RpcpStringCat(pwsPathName,pwsMachine);
  597. RpcpStringCat(pwsPathName,WS_SEPARATOR);
  598. RpcpStringCat(pwsPathName,pwsQName);
  599. }
  600. *pdwSize = dwSize;
  601. return status;
  602. }
  603. #endif
  604. //----------------------------------------------------------------
  605. // ParseQueuePathName()
  606. //
  607. // For RPC's use of MQ, a queue path name is of the form:
  608. // "machine_name\queue_name" or "machine\PRIVATE$\queue_name".
  609. // This routine extracts the machine name and queue name from
  610. // a given queue path name.
  611. //
  612. // Return Value: TRUE on success.
  613. // FALSE on fail (Can't find the "\" separator).
  614. //----------------------------------------------------------------
  615. BOOL ParseQueuePathName(
  616. IN RPC_CHAR *pwsPathName,
  617. OUT RPC_CHAR wsMachineName[MAX_COMPUTERNAME_LEN],
  618. OUT RPC_CHAR wsQueueName[MQ_MAX_Q_NAME_LEN] )
  619. {
  620. BOOL status = TRUE;
  621. RPC_CHAR *pSlash;
  622. pSlash = (RPC_CHAR *)RpcpCharacter(pwsPathName,*(WS_SEPARATOR));
  623. if (pSlash)
  624. {
  625. *pSlash = (RPC_CHAR)0;
  626. RpcpStringCopy(wsMachineName,pwsPathName);
  627. *pSlash = *(WS_SEPARATOR);
  628. }
  629. else
  630. status = FALSE;
  631. if (status)
  632. {
  633. pSlash = (RPC_CHAR *)RpcpCharacter(pwsPathName,*(WS_SEPARATOR));
  634. if (pSlash)
  635. {
  636. RpcpStringCopy(wsQueueName,++pSlash);
  637. }
  638. else
  639. status = FALSE;
  640. }
  641. return status;
  642. }
  643. //--------------------------------------------------------------------
  644. // LocateQueueViaQName()
  645. //
  646. // Try to find a MQ queue of type specified by the Queue UUID (the
  647. // queue type) with the specified queue name. The first one that is
  648. // found (if any) is returned.
  649. //--------------------------------------------------------------------
  650. HRESULT LocateQueueViaQName( IN OUT MQ_ADDRESS *pAddress )
  651. {
  652. HRESULT hr = MQ_OK;
  653. HANDLE hEnum;
  654. int iSize;
  655. DWORD dwSize;
  656. DWORD cProps;
  657. DWORD cQueue;
  658. UUID QUuid;
  659. QUEUEPROPID aqPropId[MAX_VAR];
  660. MQPROPVARIANT aPropVar[MAX_VAR];
  661. MQPROPERTYRESTRICTION aPropRestrict[MAX_VAR];
  662. MQRESTRICTION restrict;
  663. MQCOLUMNSET column;
  664. RPC_CHAR wsMachine[MAX_COMPUTERNAME_LEN];
  665. RPC_CHAR wsQName[MQ_MAX_Q_NAME_LEN];
  666. if (RPC_S_OK != UuidFromString(SVR_QTYPE_UUID_STR,&QUuid))
  667. {
  668. return MQ_ERROR;
  669. }
  670. // Set up the restriction properties such that we will
  671. // only find our queue (of type pQUuid):
  672. cProps = 0;
  673. aPropRestrict[cProps].rel = PREQ;
  674. aPropRestrict[cProps].prop = PROPID_Q_TYPE;
  675. aPropRestrict[cProps].prval.vt = VT_CLSID;
  676. aPropRestrict[cProps].prval.puuid = &QUuid;
  677. cProps++;
  678. ASSERT(cProps < MAX_VAR);
  679. restrict.cRes = cProps;
  680. restrict.paPropRes = aPropRestrict;
  681. cProps = 0;
  682. aqPropId[cProps++] = PROPID_Q_INSTANCE;
  683. aqPropId[cProps++] = PROPID_Q_PATHNAME;
  684. ASSERT(cProps < MAX_VAR);
  685. column.cCol = cProps;
  686. column.aCol = aqPropId;
  687. // Ok, do a locate enumeration:
  688. hr = MQLocateBegin(NULL,&restrict,&column,NULL,&hEnum);
  689. if (FAILED(hr))
  690. {
  691. return hr;
  692. }
  693. cQueue = cProps;
  694. while (cQueue > 0)
  695. {
  696. hr = MQLocateNext( hEnum, &cQueue, aPropVar );
  697. if (FAILED(hr))
  698. {
  699. MQLocateEnd(hEnum);
  700. return hr;
  701. }
  702. if (cQueue > 0)
  703. {
  704. // Now extract the queue name from the path name:
  705. if (ParseQueuePathName((RPC_CHAR *)(aPropVar[1].ptszVal),wsMachine,wsQName))
  706. {
  707. if (!RpcpStringCompare(pAddress->wsQName,wsQName))
  708. {
  709. // We have a match! Ok, get the format name,
  710. // cleanup then return...
  711. // Transform the queue instance UUID into a
  712. // format name:
  713. dwSize = sizeof(pAddress->wsQFormat);
  714. hr = MQInstanceToFormatName( aPropVar[0].puuid, pAddress->wsQFormat, &dwSize);
  715. if (FAILED(hr))
  716. {
  717. break;
  718. }
  719. // Free memory allocated by MQLocateNext():
  720. MQFreeMemory(aPropVar[0].puuid); // From: PROPID_Q_INSTANCE
  721. MQFreeStringFromProperty(&aPropVar[1]); // From: PROPID_Q_PATHNAME
  722. // Machine name:
  723. RpcpStringCopy(pAddress->wsMachine,wsMachine);
  724. break;
  725. }
  726. }
  727. // Free memory allocated by MQLocateNext():
  728. MQFreeMemory(aPropVar[0].puuid); // From: PROPID_Q_INSTANCE
  729. MQFreeStringFromProperty(&aPropVar[1]); // From: PROPID_Q_PATHNAME
  730. }
  731. }
  732. MQLocateEnd(hEnum);
  733. if (cQueue == 0)
  734. {
  735. return MQ_ERROR_QUEUE_NOT_FOUND;
  736. }
  737. return hr;
  738. }
  739. //----------------------------------------------------------------
  740. // CreateQueue()
  741. //
  742. // Create a MQ queue of the specified path name.
  743. //
  744. // Return Value: MQ HRESULT value.
  745. //
  746. //----------------------------------------------------------------
  747. HRESULT CreateQueue( IN SECURITY_DESCRIPTOR *pSecurityDescriptor,
  748. IN UUID *pQueueUuid,
  749. IN RPC_CHAR *pwsPathName,
  750. IN RPC_CHAR *pwsQueueLabel,
  751. IN ULONG ulQueueFlags,
  752. OUT RPC_CHAR *pwsFormat,
  753. IN OUT DWORD *pdwFormatSize )
  754. {
  755. HRESULT hr;
  756. DWORD cProps;
  757. DWORD dwSize;
  758. MQQUEUEPROPS qProps;
  759. MQPROPVARIANT aPropVar[MAX_VAR];
  760. QUEUEPROPID aqPropId[MAX_VAR];
  761. //
  762. // Setup properties to create a queue on this machine:
  763. //
  764. cProps = 0;
  765. // Set the PathName:
  766. aqPropId[cProps] = PROPID_Q_PATHNAME;
  767. aPropVar[cProps].vt = VT_LPTSTR;
  768. aPropVar[cProps].ptszVal = (RPC_SCHAR *)pwsPathName;
  769. cProps++;
  770. // Set the type of the queue (this is a UUID).
  771. // This can be used to locate RPC specific queues.
  772. if (pQueueUuid)
  773. {
  774. aqPropId[cProps] = PROPID_Q_TYPE;
  775. aPropVar[cProps].vt = VT_CLSID;
  776. aPropVar[cProps].puuid = pQueueUuid;
  777. cProps++;
  778. }
  779. // Do we want to force authentication of messages
  780. // on the queue?
  781. if (ulQueueFlags & RPC_C_MQ_AUTHN_LEVEL_PKT_INTEGRITY)
  782. {
  783. aqPropId[cProps] = PROPID_Q_AUTHENTICATE;
  784. aPropVar[cProps].vt = VT_UI1;
  785. aPropVar[cProps].bVal = TRUE;
  786. cProps++;
  787. }
  788. // Do we want to force encrypted messages?
  789. if (ulQueueFlags & RPC_C_MQ_AUTHN_LEVEL_PKT_PRIVACY)
  790. {
  791. aqPropId[cProps] = PROPID_Q_PRIV_LEVEL;
  792. aPropVar[cProps].vt = VT_UI4;
  793. aPropVar[cProps].ulVal = MQ_PRIV_LEVEL_BODY;
  794. cProps++;
  795. }
  796. // Put a description to the queue.
  797. // Useful for administration purposes (through the MSMQ admin tools).
  798. aqPropId[cProps] = PROPID_Q_LABEL;
  799. aPropVar[cProps].vt = VT_LPTSTR;
  800. aPropVar[cProps].ptszVal = (RPC_SCHAR *)pwsQueueLabel;
  801. cProps++;
  802. ASSERT(cProps < MAX_VAR);
  803. // Assemble the QUEUEPROPS structure:
  804. qProps.cProp = cProps;
  805. qProps.aPropID = aqPropId;
  806. qProps.aPropVar = aPropVar;
  807. qProps.aStatus = 0;
  808. //-------------------------------------------------------
  809. // Create the queue
  810. hr = MQCreateQueue( pSecurityDescriptor,// Queue permissions.
  811. &qProps, // Queue properties.
  812. pwsFormat, // Format Name [out].
  813. pdwFormatSize ); // Size of Format Name [in,out].
  814. return hr;
  815. }
  816. //----------------------------------------------------------------
  817. // ConnectToServerQueue()
  818. //
  819. //----------------------------------------------------------------
  820. RPC_STATUS ConnectToServerQueue( MQ_ADDRESS *pAddress,
  821. RPC_CHAR *pNetworkAddress,
  822. RPC_CHAR *pEndpoint )
  823. {
  824. HRESULT hr;
  825. DWORD dwSize;
  826. RPC_CHAR wsQPathName[MAX_PATHNAME_LEN];
  827. //
  828. // First, check the end point:
  829. //
  830. if ( (pEndpoint == NULL)
  831. || (*pEndpoint == '\0')
  832. || (RpcpStringLength(pEndpoint) >= MQ_MAX_Q_NAME_LEN) )
  833. {
  834. return RPC_S_INVALID_ENDPOINT_FORMAT;
  835. }
  836. memset(pAddress,0,sizeof(MQ_ADDRESS));
  837. RpcpStringCopy(pAddress->wsQName,pEndpoint);
  838. //
  839. // Now, if the server was specified, then use it as is,
  840. // otherwise use the local machine name:
  841. //
  842. if ( (pNetworkAddress == NULL) || (*pNetworkAddress == '\0') )
  843. {
  844. dwSize = sizeof(pAddress->wsMachine);
  845. GetComputerName((RPC_SCHAR *)pAddress->wsMachine,&dwSize);
  846. }
  847. else if (RpcpStringLength(pNetworkAddress) >= MAX_COMPUTERNAME_LEN)
  848. {
  849. return RPC_S_INVALID_ENDPOINT_FORMAT;
  850. }
  851. else
  852. {
  853. RpcpStringCopy(pAddress->wsMachine,pNetworkAddress);
  854. }
  855. //
  856. // If the server name is a "*", then locate a server (andy) that
  857. // has the specified queue name:
  858. //
  859. if (!RpcpStringCompare(pAddress->wsMachine,WS_ASTRISK))
  860. {
  861. hr = LocateQueueViaQName(pAddress);
  862. if (FAILED(hr))
  863. {
  864. return RPC_S_SERVER_UNAVAILABLE;
  865. }
  866. }
  867. #if FALSE
  868. //
  869. // Try to use a direct format to get to the server queue:
  870. //
  871. dwSize = sizeof(pAddress->wsQFormat);
  872. if (!ConstructDirectFormat( pAddress->wsMachine,
  873. pAddress->wsQName,
  874. pAddress->wsQFormat,
  875. &dwSize))
  876. {
  877. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  878. DPFLTR_WARNING_LEVEL,
  879. RPCTRANS "ConnectToServerQueue(): ConstructDirectFormat() failed.\n"));
  880. return RPC_S_SERVER_UNAVAILABLE;
  881. }
  882. hr = MQOpenQueue( pAddress->wsQFormat,
  883. MQ_SEND_ACCESS, 0, &(pAddress->hQueue) );
  884. if (!FAILED(hr))
  885. {
  886. return RPC_S_OK;
  887. }
  888. #endif
  889. //
  890. // If we get here, then the direct format failed, so try using
  891. // a lookup (MQPathNameToFormatName()):
  892. //
  893. dwSize = sizeof(wsQPathName);
  894. if (!ConstructQueuePathName( pAddress->wsMachine,
  895. pAddress->wsQName,
  896. wsQPathName,
  897. &dwSize ))
  898. {
  899. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  900. DPFLTR_WARNING_LEVEL,
  901. RPCTRANS "ConnectToServerQueue(): ConstructQueuePathName() failed.\n"));
  902. return RPC_S_SERVER_UNAVAILABLE;
  903. }
  904. dwSize = sizeof(pAddress->wsQFormat);
  905. hr = MQPathNameToFormatName( wsQPathName,
  906. pAddress->wsQFormat,
  907. &dwSize );
  908. if (FAILED(hr))
  909. {
  910. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  911. DPFLTR_WARNING_LEVEL, RPCTRANS "ConnectToServerQueue(): MQPathNameToFormatName() failed: 0x%x\n",
  912. hr));
  913. return RPC_S_SERVER_UNAVAILABLE;
  914. }
  915. hr = MQOpenQueue( pAddress->wsQFormat,
  916. MQ_SEND_ACCESS, 0, &(pAddress->hQueue) );
  917. if (FAILED(hr))
  918. {
  919. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  920. DPFLTR_WARNING_LEVEL,
  921. RPCTRANS "ConnectToServerQueue(): MQOpenQueue() failed: 0x%x\n",
  922. hr));
  923. return RPC_S_SERVER_UNAVAILABLE;
  924. }
  925. return RPC_S_OK;
  926. }
  927. //----------------------------------------------------------------
  928. // DisconnectFromServer()
  929. //
  930. //----------------------------------------------------------------
  931. RPC_STATUS DisconnectFromServer( IN OUT MQ_ADDRESS *pAddress )
  932. {
  933. DWORD Status = NO_ERROR;
  934. if ((pAddress) && (pAddress->hQueue))
  935. {
  936. MQCloseQueue(pAddress->hQueue);
  937. pAddress->hQueue = 0;
  938. }
  939. return Status;
  940. }
  941. //----------------------------------------------------------------
  942. // SetQueueProperties()
  943. //
  944. // Set the properties for an already existing queue. Currently
  945. // the only two Falcon queue properties that need to be set are
  946. // for forcing message authentication and encryption.
  947. //
  948. // Return Value: MQ HRESULT value.
  949. //
  950. //----------------------------------------------------------------
  951. HRESULT SetQueueProperties( IN RPC_CHAR *pwsQFormat,
  952. IN ULONG ulQueueFlags )
  953. {
  954. HRESULT hr = MQ_OK;
  955. DWORD cProps = 0;
  956. DWORD dwSize;
  957. MQQUEUEPROPS qProps;
  958. MQPROPVARIANT aPropVar[MAX_VAR];
  959. QUEUEPROPID aqPropId[MAX_VAR];
  960. HRESULT aStatus[MAX_VAR];
  961. // Do we want to force authentication of messages
  962. // on the queue?
  963. if (ulQueueFlags & RPC_C_MQ_AUTHN_LEVEL_PKT_INTEGRITY)
  964. {
  965. aqPropId[cProps] = PROPID_Q_AUTHENTICATE;
  966. aPropVar[cProps].vt = VT_UI1;
  967. aPropVar[cProps].bVal = TRUE;
  968. cProps++;
  969. }
  970. else
  971. {
  972. aqPropId[cProps] = PROPID_Q_AUTHENTICATE;
  973. aPropVar[cProps].vt = VT_UI1;
  974. aPropVar[cProps].bVal = FALSE;
  975. cProps++;
  976. }
  977. // Do we want to force encrypted messages?
  978. if (ulQueueFlags & RPC_C_MQ_AUTHN_LEVEL_PKT_PRIVACY)
  979. {
  980. aqPropId[cProps] = PROPID_Q_PRIV_LEVEL;
  981. aPropVar[cProps].vt = VT_UI4;
  982. aPropVar[cProps].ulVal = MQ_PRIV_LEVEL_BODY;
  983. cProps++;
  984. }
  985. else
  986. {
  987. aqPropId[cProps] = PROPID_Q_PRIV_LEVEL;
  988. aPropVar[cProps].vt = VT_UI4;
  989. aPropVar[cProps].ulVal = MQ_PRIV_LEVEL_OPTIONAL;
  990. cProps++;
  991. }
  992. // Assemble the QUEUEPROPS structure:
  993. qProps.cProp = cProps;
  994. qProps.aPropID = aqPropId;
  995. qProps.aPropVar = aPropVar;
  996. qProps.aStatus = 0;
  997. // Set the new queue properties:
  998. hr = MQSetQueueProperties(pwsQFormat,&qProps);
  999. return hr;
  1000. }
  1001. //--------------------------------------------------------------------
  1002. // ClearQueue()
  1003. //
  1004. // Clear out all waiting messages from the specified queue (if
  1005. // any).
  1006. //
  1007. //--------------------------------------------------------------------
  1008. HRESULT ClearQueue( QUEUEHANDLE hQueue )
  1009. {
  1010. HRESULT hr;
  1011. DWORD cProps = 0;
  1012. MQMSGPROPS msgProps;
  1013. MSGPROPID aMsgPropID[MAX_RECV_VAR];
  1014. MQPROPVARIANT aMsgPropVar[MAX_RECV_VAR];
  1015. RPC_CHAR wsMsgLabel[MQ_MAX_MSG_LABEL_LEN];
  1016. //
  1017. // MQ doesn't seem to let me clear out the queue (by reading
  1018. // messages) unless we have at least one queue property.
  1019. //
  1020. aMsgPropID[cProps] = PROPID_M_LABEL;
  1021. aMsgPropVar[cProps].vt = VT_LPTSTR;
  1022. aMsgPropVar[cProps].ptszVal = (RPC_SCHAR *)wsMsgLabel;
  1023. cProps++;
  1024. aMsgPropID[cProps] = PROPID_M_LABEL_LEN;
  1025. aMsgPropVar[cProps].vt = VT_UI4;
  1026. aMsgPropVar[cProps].ulVal = sizeof(wsMsgLabel);
  1027. cProps++;
  1028. msgProps.cProp = cProps;
  1029. msgProps.aPropID = aMsgPropID;
  1030. msgProps.aPropVar = aMsgPropVar;
  1031. msgProps.aStatus = 0;
  1032. //
  1033. // pull up all pending MQ messages.
  1034. //
  1035. while (TRUE)
  1036. {
  1037. hr = MQReceiveMessage(hQueue,0,MQ_ACTION_RECEIVE,
  1038. &msgProps,NULL,NULL,NULL,NULL);
  1039. if (FAILED(hr))
  1040. break;
  1041. }
  1042. //
  1043. // A timeout means the queue is empty:
  1044. //
  1045. if (hr == MQ_ERROR_IO_TIMEOUT)
  1046. hr = MQ_OK;
  1047. return hr;
  1048. }
  1049. //----------------------------------------------------------------
  1050. // ClientSetupQueue()
  1051. //
  1052. // Called by MQ_CreateEndpoint() to create and initialize the
  1053. // client's message queue (to read server responses).
  1054. //
  1055. // pEP -- The structure that will hold information about
  1056. // the queue being created and setup.
  1057. //
  1058. // pwsMachine -- The machine to create the queue on.
  1059. //
  1060. // pwsEndpoint -- RPC_CHAR string name of the endpoint. This will
  1061. // be used as the queue name.
  1062. //
  1063. //----------------------------------------------------------------
  1064. HRESULT ClientSetupQueue( MQ_DATAGRAM_ENDPOINT *pEndpoint,
  1065. RPC_CHAR *pwsMachine,
  1066. RPC_CHAR *pwsEndpoint )
  1067. {
  1068. HRESULT hr;
  1069. DWORD dwSize;
  1070. // The computer name for the server process:
  1071. if (pEndpoint->wsMachine != pwsMachine)
  1072. {
  1073. RpcpStringCopy(pEndpoint->wsMachine,pwsMachine);
  1074. }
  1075. // The endpoint string (RPC_CHAR) is used as the queue name:
  1076. RpcpStringCopy(pEndpoint->wsQName,pwsEndpoint);
  1077. // Build the path name for the server queue:
  1078. dwSize = sizeof(pEndpoint->wsQPathName);
  1079. ConstructQueuePathName( pEndpoint->wsMachine, // [in]
  1080. pEndpoint->wsQName, // [in]
  1081. pEndpoint->wsQPathName, // [out]
  1082. &dwSize ); // [in,out]
  1083. // Try to create the client process's receive queue (for
  1084. // responses back from the RPC server):
  1085. UuidFromString( CLNT_QTYPE_UUID_STR, &(pEndpoint->uuidQType) );
  1086. dwSize = sizeof(pEndpoint->wsQFormat);
  1087. hr = CreateQueue( NULL, // [in] No security descriptor.
  1088. &(pEndpoint->uuidQType), // [in]
  1089. pEndpoint->wsQPathName, // [in]
  1090. pEndpoint->wsQName, // [in] Use QName as the QLabel.
  1091. 0x00000000, // [in] Flags
  1092. pEndpoint->wsQFormat, // [out]
  1093. &dwSize ); // [in,out]
  1094. if ( (FAILED(hr)) && (hr != MQ_ERROR_QUEUE_EXISTS) )
  1095. {
  1096. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  1097. DPFLTR_WARNING_LEVEL,
  1098. RPCTRANS "ClientSetupQueue(): CreateQueue(): 0x%x\n",
  1099. hr));
  1100. return hr;
  1101. }
  1102. //
  1103. // If the queue already exists, then locate it.
  1104. //
  1105. // NOTE: Currently client queues are temporary, but if cases
  1106. // are added in the future where client queues can be
  1107. // presistent, then this code will be needed.
  1108. //
  1109. if (hr == MQ_ERROR_QUEUE_EXISTS)
  1110. {
  1111. dwSize = sizeof(pEndpoint->wsQFormat);
  1112. hr = MQPathNameToFormatName( pEndpoint->wsQPathName,
  1113. pEndpoint->wsQFormat,
  1114. &dwSize );
  1115. if (FAILED(hr))
  1116. return hr;
  1117. }
  1118. //
  1119. // Ok, open the receive queue:
  1120. //
  1121. hr = MQOpenQueue( pEndpoint->wsQFormat, MQ_RECEIVE_ACCESS, 0, &(pEndpoint->hQueue));
  1122. #if FALSE
  1123. if (!FAILED(hr))
  1124. {
  1125. pEndpoint->fInitialized = TRUE;
  1126. }
  1127. #endif
  1128. #ifdef DBG
  1129. if (FAILED(hr))
  1130. {
  1131. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  1132. DPFLTR_WARNING_LEVEL,
  1133. RPCTRANS "ClientSetupQueue(): MQOpenQueueFailed(): 0x%x\n",
  1134. hr));
  1135. }
  1136. #endif
  1137. return hr;
  1138. }
  1139. //----------------------------------------------------------------
  1140. // ClientCloseQueue()
  1141. //
  1142. //----------------------------------------------------------------
  1143. HRESULT ClientCloseQueue( MQ_DATAGRAM_ENDPOINT *pEndpoint )
  1144. {
  1145. ASSERT(pEndpoint);
  1146. if (pEndpoint->hQueue)
  1147. {
  1148. MQCloseQueue(pEndpoint->hQueue);
  1149. pEndpoint->hQueue = 0;
  1150. }
  1151. g_pQueueMap->Remove(pEndpoint->wsQFormat);
  1152. MQDeleteQueue(pEndpoint->wsQFormat);
  1153. return MQ_OK;
  1154. }
  1155. //----------------------------------------------------------------
  1156. // QueryQM()
  1157. //
  1158. //----------------------------------------------------------------
  1159. HRESULT QueryQM( RPC_CHAR *pwsMachine,
  1160. DWORD *pdwSize )
  1161. {
  1162. DWORD cProps = 0;
  1163. HRESULT hr;
  1164. MQQMPROPS msgProps;
  1165. MSGPROPID aMsgPropID[MAX_RECV_VAR];
  1166. MQPROPVARIANT aMsgPropVar[MAX_RECV_VAR];
  1167. aMsgPropID[cProps] = PROPID_QM_PATHNAME; // 0
  1168. aMsgPropVar[cProps].vt = VT_NULL;
  1169. cProps++;
  1170. ASSERT( cProps < MAX_RECV_VAR );
  1171. msgProps.cProp = cProps;
  1172. msgProps.aPropID = aMsgPropID;
  1173. msgProps.aPropVar = aMsgPropVar;
  1174. msgProps.aStatus = 0;
  1175. // The following receive should always fail, we're just calling
  1176. // it to get the size of the message body:
  1177. hr = MQGetMachineProperties( NULL, NULL, &msgProps );
  1178. if (FAILED(hr))
  1179. {
  1180. return hr;
  1181. }
  1182. RpcpStringCopy(pwsMachine,aMsgPropVar[0].pwszVal);
  1183. MQFreeStringFromProperty(&aMsgPropVar[0]);
  1184. #ifdef MAJOR_DBG
  1185. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  1186. DPFLTR_WARNING_LEVEL,
  1187. RPCTRANS "QueryQM(): wsMachine: %S\n",
  1188. pwsMachine));
  1189. #endif
  1190. return hr;
  1191. }
  1192. //----------------------------------------------------------------
  1193. // ServerSetupQueue()
  1194. //
  1195. // Called by MQ_CreateEndpoint() to create and initialize the
  1196. // server process's message queue (endpoint).
  1197. //
  1198. // pEP -- The struct to hold information about the queue
  1199. // that is being set up.
  1200. //
  1201. // pwsMachine -- The machine to create the queue on. For RPC this
  1202. // is always the machine that the server process is
  1203. // running on.
  1204. //
  1205. // pwsEndpoint - The name of the endpoint. This will be used as
  1206. // the queue name.
  1207. //
  1208. // pSecurityDescriptor -- User may specify a security descriptor
  1209. // to apply to this queue (may be NULL).
  1210. //
  1211. // dwEndpointFlags -- Flags to control queue properties.
  1212. //
  1213. //----------------------------------------------------------------
  1214. HRESULT ServerSetupQueue( MQ_DATAGRAM_ENDPOINT *pEndpoint,
  1215. RPC_CHAR *pwsMachine,
  1216. RPC_CHAR *pwsEndpoint,
  1217. void *pSecurityDescriptor,
  1218. DWORD dwEndpointFlags )
  1219. {
  1220. HRESULT hr;
  1221. DWORD dwSize;
  1222. // The computer name for the server process:
  1223. if (pEndpoint->wsMachine != pwsMachine)
  1224. {
  1225. RpcpStringCopy(pEndpoint->wsMachine,pwsMachine);
  1226. }
  1227. // The endpoint string (RPC_CHAR) is used as the queue name:
  1228. if (pEndpoint->wsQName != pwsEndpoint)
  1229. {
  1230. RpcpStringCopy(pEndpoint->wsQName,pwsEndpoint);
  1231. }
  1232. // Build the path name for the server queue:
  1233. dwSize = sizeof(pEndpoint->wsQPathName);
  1234. if (!ConstructQueuePathName( pEndpoint->wsMachine, // [in]
  1235. pEndpoint->wsQName, // [in]
  1236. pEndpoint->wsQPathName, // [out]
  1237. &dwSize )) // [in,out]
  1238. {
  1239. return MQ_ERROR_ILLEGAL_QUEUE_PATHNAME;
  1240. }
  1241. // Try to create the server process receive queue;
  1242. UuidFromString( SVR_QTYPE_UUID_STR, &(pEndpoint->uuidQType) );
  1243. dwSize = sizeof(pEndpoint->wsQFormat);
  1244. hr = CreateQueue( (SECURITY_DESCRIPTOR*)pSecurityDescriptor,
  1245. &(pEndpoint->uuidQType), // [in]
  1246. pEndpoint->wsQPathName, // [in]
  1247. pEndpoint->wsQName, // [in] Use QName as the QLabel.
  1248. dwEndpointFlags, // [in]
  1249. pEndpoint->wsQFormat, // [out]
  1250. &dwSize ); // [in,out]
  1251. // If the queue already exists, then locate it:
  1252. if (hr == MQ_ERROR_QUEUE_EXISTS)
  1253. {
  1254. dwSize = sizeof(pEndpoint->wsQFormat);
  1255. hr = MQPathNameToFormatName( pEndpoint->wsQPathName,
  1256. pEndpoint->wsQFormat,
  1257. &dwSize );
  1258. if (FAILED(hr))
  1259. {
  1260. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  1261. DPFLTR_WARNING_LEVEL,
  1262. RPCTRANS "ServerSetupQueue(): MQPathNameToFormatName() failed: 0x%x\n",
  1263. hr));
  1264. return hr;
  1265. }
  1266. if ( !(dwEndpointFlags & RPC_C_MQ_USE_EXISTING_SECURITY) )
  1267. {
  1268. hr = SetQueueProperties(pEndpoint->wsQFormat,dwEndpointFlags);
  1269. if (FAILED(hr))
  1270. {
  1271. return hr;
  1272. }
  1273. }
  1274. }
  1275. else if (FAILED(hr))
  1276. {
  1277. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  1278. DPFLTR_WARNING_LEVEL,
  1279. RPCTRANS "ServerSetupQueue(): CreateQueue() failed: 0x%x\n",
  1280. hr));
  1281. return hr;
  1282. }
  1283. //
  1284. // Ok, open the receive queue:
  1285. //
  1286. hr = MQOpenQueue( pEndpoint->wsQFormat, MQ_RECEIVE_ACCESS, 0, &(pEndpoint->hQueue));
  1287. //
  1288. // Does the user want to make sure the queue is empty (in case it
  1289. // was a perminent queue):
  1290. //
  1291. if ( (hr == MQ_OK) && (dwEndpointFlags & RPC_C_MQ_CLEAR_ON_OPEN) )
  1292. {
  1293. hr = ClearQueue(pEndpoint->hQueue);
  1294. }
  1295. #ifdef DBG
  1296. if (FAILED(hr))
  1297. {
  1298. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  1299. DPFLTR_WARNING_LEVEL,
  1300. RPCTRANS "ServerSetupQueue(): MQOpenQueue() failed: 0x%x\n",
  1301. hr));
  1302. }
  1303. #endif
  1304. return hr;
  1305. }
  1306. //----------------------------------------------------------------
  1307. // ServerCloseQueue()
  1308. //
  1309. //----------------------------------------------------------------
  1310. HRESULT ServerCloseQueue( MQ_DATAGRAM_ENDPOINT *pEndpoint )
  1311. {
  1312. ASSERT(pEndpoint);
  1313. if (pEndpoint->hQueue)
  1314. {
  1315. MQCloseQueue(pEndpoint->hQueue);
  1316. pEndpoint->hQueue = 0;
  1317. }
  1318. // MQDeleteQueue(pEndpoint->wsQFormat);
  1319. return MQ_OK;
  1320. }
  1321. //----------------------------------------------------------------
  1322. // AsyncPeekQueue()
  1323. //
  1324. //----------------------------------------------------------------
  1325. HRESULT AsyncPeekQueue( IN MQ_DATAGRAM_ENDPOINT *pEndpoint,
  1326. IN MQ_OVERLAPPED *pOl )
  1327. {
  1328. DWORD cProps = 0;
  1329. HRESULT hr;
  1330. pOl->aMsgPropID[cProps] = PROPID_M_BODY_SIZE;
  1331. pOl->aMsgPropVar[cProps].vt = VT_UI4;
  1332. pOl->aMsgPropVar[cProps].ulVal = 0;
  1333. cProps++;
  1334. ASSERT( cProps < MAX_RECV_VAR );
  1335. pOl->msgProps.cProp = cProps;
  1336. pOl->msgProps.aPropID = pOl->aMsgPropID;
  1337. pOl->msgProps.aPropVar = pOl->aMsgPropVar;
  1338. pOl->msgProps.aStatus = pOl->aStatus;
  1339. hr = MQReceiveMessage( pEndpoint->hQueue,
  1340. INFINITE,
  1341. MQ_ACTION_PEEK_CURRENT,
  1342. &pOl->msgProps,
  1343. &pOl->ol, // Asynchronous.
  1344. NULL, // No callback.
  1345. NULL, // Message filter.
  1346. NULL ); // Transaction object.
  1347. if (FAILED(hr))
  1348. {
  1349. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  1350. DPFLTR_WARNING_LEVEL,
  1351. RPCTRANS "AsyncPeekQueue() failed: 0x%x\n",
  1352. hr));
  1353. }
  1354. return hr;
  1355. }
  1356. //----------------------------------------------------------------
  1357. // AsyncReadQueue()
  1358. //
  1359. //----------------------------------------------------------------
  1360. HRESULT AsyncReadQueue( IN MQ_DATAGRAM_ENDPOINT *pEndpoint,
  1361. IN MQ_OVERLAPPED *pOl,
  1362. OUT MQ_ADDRESS *pAddress,
  1363. OUT UCHAR *pBuffer,
  1364. IN DWORD dwBufferSize )
  1365. {
  1366. DWORD cProps = 0;
  1367. HRESULT hr;
  1368. pOl->aMsgPropID[cProps] = PROPID_M_BODY; // [0]
  1369. pOl->aMsgPropVar[cProps].vt = (VT_UI1 | VT_VECTOR);
  1370. pOl->aMsgPropVar[cProps].caub.cElems = dwBufferSize;
  1371. pOl->aMsgPropVar[cProps].caub.pElems = pBuffer;
  1372. cProps++;
  1373. ASSERT(cProps == I_MESSAGE_SIZE);
  1374. pOl->aMsgPropID[cProps] = PROPID_M_BODY_SIZE; // [1]
  1375. pOl->aMsgPropVar[cProps].vt = VT_UI4;
  1376. cProps++;
  1377. ASSERT(cProps == I_MESSAGE_LABEL);
  1378. pOl->aMsgPropID[cProps] = PROPID_M_LABEL; // [2]
  1379. pOl->aMsgPropVar[cProps].vt = VT_LPWSTR;
  1380. pOl->aMsgPropVar[cProps].pwszVal = (WCHAR *)pAddress->wsMsgLabel;
  1381. cProps++;
  1382. pOl->aMsgPropID[cProps] = PROPID_M_LABEL_LEN; // [3]
  1383. pOl->aMsgPropVar[cProps].vt = VT_UI4;
  1384. pOl->aMsgPropVar[cProps].ulVal = sizeof(pAddress->wsMsgLabel);
  1385. cProps++;
  1386. pOl->aMsgPropID[cProps] = PROPID_M_RESP_QUEUE; // [4]
  1387. pOl->aMsgPropVar[cProps].vt = VT_LPWSTR;
  1388. pOl->aMsgPropVar[cProps].pwszVal = (WCHAR *)pAddress->wsQFormat;
  1389. cProps++;
  1390. pOl->aMsgPropID[cProps] = PROPID_M_RESP_QUEUE_LEN; // [5]
  1391. pOl->aMsgPropVar[cProps].vt = VT_UI4;
  1392. pOl->aMsgPropVar[cProps].ulVal = sizeof(pAddress->wsQFormat);
  1393. cProps++;
  1394. //
  1395. // These message properties are for authentication and privacy:
  1396. //
  1397. ASSERT(cProps == I_AUTHENTICATED);
  1398. pOl->aMsgPropID[cProps] = PROPID_M_AUTHENTICATED; // [6]
  1399. pOl->aMsgPropVar[cProps].vt = VT_UI1;
  1400. pOl->aMsgPropVar[cProps].bVal = 0;
  1401. cProps++;
  1402. ASSERT(cProps == I_PRIVACY_LEVEL);
  1403. pOl->aMsgPropID[cProps] = PROPID_M_PRIV_LEVEL; // [7]
  1404. pOl->aMsgPropVar[cProps].vt = VT_UI4;
  1405. pOl->aMsgPropVar[cProps].ulVal = 0;
  1406. cProps++;
  1407. //
  1408. // WARNING: these always need to be the last two properties
  1409. // in the arrays (see the while loop below):
  1410. //
  1411. pOl->aMsgPropID[cProps] = PROPID_M_SENDERID_TYPE; // [8]
  1412. pOl->aMsgPropVar[cProps].vt = VT_UI4;
  1413. pOl->aMsgPropVar[cProps].ulVal = 0;
  1414. cProps++;
  1415. pOl->aMsgPropID[cProps] = PROPID_M_SENDERID; // [9]
  1416. pOl->aMsgPropVar[cProps].vt = (VT_UI1 | VT_VECTOR);
  1417. pOl->aMsgPropVar[cProps].caub.cElems = sizeof(pAddress->aSidBuffer);
  1418. pOl->aMsgPropVar[cProps].caub.pElems = pAddress->aSidBuffer;
  1419. cProps++;
  1420. ASSERT( cProps < MAX_RECV_VAR );
  1421. pOl->msgProps.cProp = cProps;
  1422. pOl->msgProps.aPropID = pOl->aMsgPropID;
  1423. pOl->msgProps.aPropVar = pOl->aMsgPropVar;
  1424. pOl->msgProps.aStatus = pOl->aStatus;
  1425. hr = MQReceiveMessage( pEndpoint->hQueue,
  1426. INFINITE,
  1427. MQ_ACTION_RECEIVE,
  1428. &pOl->msgProps,
  1429. &pOl->ol, // Asynchronous.
  1430. NULL, // No callback.
  1431. NULL, // Message filter.
  1432. NULL ); // Transaction object.
  1433. #ifdef DBG
  1434. if ( (hr != MQ_OK)
  1435. && (hr != MQ_INFORMATION_OPERATION_PENDING)
  1436. && (hr != MQ_ERROR_BUFFER_OVERFLOW) )
  1437. {
  1438. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  1439. DPFLTR_WARNING_LEVEL,
  1440. RPCTRANS "AsyncReadQueue() failed: 0x%x\n",
  1441. hr));
  1442. }
  1443. #endif
  1444. return hr;
  1445. }
  1446. //----------------------------------------------------------------
  1447. // MQ_SendToQueue()
  1448. //
  1449. // Send a PDU to somebody (specified by pAddress).
  1450. //
  1451. // pEndpoint -- My (endpoint) for responses.
  1452. //
  1453. // pAddress -- The destination queue.
  1454. //
  1455. // pBuffer -- The data PDU to send.
  1456. //
  1457. // dwBufferSize The size of the PDU (bytes).
  1458. //
  1459. //----------------------------------------------------------------
  1460. HRESULT MQ_SendToQueue( IN MQ_DATAGRAM_ENDPOINT *pEndpoint,
  1461. IN MQ_ADDRESS *pAddress,
  1462. IN UCHAR *pBuffer,
  1463. IN DWORD dwBufferSize )
  1464. {
  1465. HRESULT hr;
  1466. DWORD cProps = 0;
  1467. MQMSGPROPS msgProps;
  1468. MSGPROPID aMsgPropID[MAX_SEND_VAR];
  1469. MQPROPVARIANT aMsgPropVar[MAX_SEND_VAR];
  1470. HRESULT aStatus[MAX_SEND_VAR];
  1471. // NOTE: If you add MQ properties to be sent, make sure that
  1472. // MAX_SEND_VAR is large enough...
  1473. // Message body contains the packet being sent:
  1474. aMsgPropID[cProps] = PROPID_M_BODY;
  1475. aMsgPropVar[cProps].vt = (VT_UI1 | VT_VECTOR);
  1476. aMsgPropVar[cProps].caub.cElems = dwBufferSize;
  1477. aMsgPropVar[cProps].caub.pElems = pBuffer;
  1478. cProps++;
  1479. // The size of the packet:
  1480. #if FALSE
  1481. aMsgPropID[cProps] = PROPID_M_BODY_SIZE;
  1482. aMsgPropVar[cProps].vt = VT_UI4;
  1483. aMsgPropVar[cProps].ulVal = dwBufferSize;
  1484. cProps++;
  1485. #endif
  1486. // Pass the sender (me) as the queue label. The queue label
  1487. // holds the my Queue Path Name:
  1488. aMsgPropID[cProps] = PROPID_M_LABEL;
  1489. aMsgPropVar[cProps].vt = VT_LPTSTR;
  1490. aMsgPropVar[cProps].ptszVal = (RPC_SCHAR *)pEndpoint->wsQPathName;
  1491. cProps++;
  1492. // Delivery (express or recoverable):
  1493. aMsgPropID[cProps] = PROPID_M_DELIVERY;
  1494. aMsgPropVar[cProps].vt = VT_UI1;
  1495. aMsgPropVar[cProps].bVal = (unsigned char)(pEndpoint->ulDelivery);
  1496. cProps++;
  1497. // Priority (MQ_MIN_PRIORITY to MQ_MAX_PRIORITY):
  1498. aMsgPropID[cProps] = PROPID_M_PRIORITY;
  1499. aMsgPropVar[cProps].vt = VT_UI1;
  1500. aMsgPropVar[cProps].bVal = (unsigned char)(pEndpoint->ulPriority);
  1501. cProps++;
  1502. // Journaling (none, deadletter or journal):
  1503. aMsgPropID[cProps] = PROPID_M_JOURNAL;
  1504. aMsgPropVar[cProps].vt = VT_UI1;
  1505. aMsgPropVar[cProps].bVal = (unsigned char)(pEndpoint->ulJournaling);
  1506. cProps++;
  1507. // Time limit to reach destination queue (seconds):
  1508. aMsgPropID[cProps] = PROPID_M_TIME_TO_REACH_QUEUE;
  1509. aMsgPropVar[cProps].vt = VT_UI4;
  1510. aMsgPropVar[cProps].ulVal = pEndpoint->ulTimeToReachQueue;
  1511. cProps++;
  1512. // Time limit for call to be received (seconds):
  1513. aMsgPropID[cProps] = PROPID_M_TIME_TO_BE_RECEIVED;
  1514. aMsgPropVar[cProps].vt = VT_UI4;
  1515. aMsgPropVar[cProps].ulVal = pEndpoint->ulTimeToReceive;
  1516. cProps++;
  1517. // Response Queue:
  1518. aMsgPropID[cProps] = PROPID_M_RESP_QUEUE;
  1519. aMsgPropVar[cProps].vt = VT_LPTSTR;
  1520. aMsgPropVar[cProps].ptszVal = (RPC_SCHAR *)pEndpoint->wsQFormat;
  1521. cProps++;
  1522. // Authentication:
  1523. aMsgPropID[cProps] = PROPID_M_AUTH_LEVEL;
  1524. aMsgPropVar[cProps].vt = VT_UI4;
  1525. aMsgPropVar[cProps].ulVal = (pEndpoint->fAuthenticate)? MQMSG_AUTH_LEVEL_ALWAYS : MQMSG_AUTH_LEVEL_NONE;
  1526. cProps++;
  1527. // Encryption:
  1528. aMsgPropID[cProps] = PROPID_M_PRIV_LEVEL;
  1529. aMsgPropVar[cProps].vt = VT_UI4;
  1530. aMsgPropVar[cProps].ulVal = (pEndpoint->fEncrypt)? MQMSG_PRIV_LEVEL_BODY : MQMSG_PRIV_LEVEL_NONE;
  1531. cProps++;
  1532. // Call (message) acknowledgment:
  1533. if (pEndpoint->fAck)
  1534. {
  1535. aMsgPropID[cProps] = PROPID_M_ACKNOWLEDGE;
  1536. aMsgPropVar[cProps].vt = VT_UI1;
  1537. aMsgPropVar[cProps].bVal = MQMSG_ACKNOWLEDGMENT_FULL_REACH_QUEUE;
  1538. cProps++;
  1539. aMsgPropID[cProps] = PROPID_M_ADMIN_QUEUE;
  1540. aMsgPropVar[cProps].vt = VT_LPTSTR;
  1541. aMsgPropVar[cProps].ptszVal = (RPC_SCHAR *)pEndpoint->wsAdminQFormat;
  1542. cProps++;
  1543. }
  1544. ASSERT( cProps < MAX_SEND_VAR );
  1545. msgProps.cProp = cProps;
  1546. msgProps.aPropID = aMsgPropID;
  1547. msgProps.aPropVar = aMsgPropVar;
  1548. msgProps.aStatus = aStatus;
  1549. if ( (!pAddress->hQueue)
  1550. && !(pAddress->hQueue = g_pQueueMap->Lookup(pAddress->wsQFormat)) )
  1551. {
  1552. hr = MQOpenQueue( pAddress->wsQFormat,
  1553. MQ_SEND_ACCESS, 0, &(pAddress->hQueue) );
  1554. if (FAILED(hr))
  1555. {
  1556. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  1557. DPFLTR_WARNING_LEVEL,
  1558. RPCTRANS "MQ_SendToQueue(): MQOpenQueue() failed: 0x%x\n",
  1559. hr));
  1560. return hr;
  1561. }
  1562. if (!g_pQueueMap->Add(pAddress->wsQFormat, pAddress->hQueue))
  1563. {
  1564. return MQ_ERROR_INSUFFICIENT_RESOURCES;
  1565. }
  1566. }
  1567. hr = MQSendMessage( pAddress->hQueue, &msgProps, NULL );
  1568. if ( (!FAILED(hr)) && (pEndpoint->fAck) )
  1569. {
  1570. hr = WaitForAck(pEndpoint);
  1571. if (hr == MQ_ERROR_QUEUE_NOT_FOUND)
  1572. {
  1573. MQCloseQueue(pEndpoint->hQueue);
  1574. pEndpoint->hQueue = 0;
  1575. }
  1576. }
  1577. #ifdef DBG
  1578. if (hr != MQ_OK)
  1579. {
  1580. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  1581. DPFLTR_WARNING_LEVEL,
  1582. RPCTRANS "MQ_SendToQueue(): MQSendMessage() failed: 0x%x\n",
  1583. hr));
  1584. }
  1585. #endif
  1586. return hr;
  1587. }
  1588. //----------------------------------------------------------------
  1589. // ReadQueue()
  1590. //
  1591. // Blocking read of the next message from the queue in pEndpoint.
  1592. // If there is no pending message on the queue, wait around for
  1593. // timeoutMsec.
  1594. //
  1595. // pInfo -- Holds information about the queue that we are
  1596. // doing a read on.
  1597. //
  1598. // timeoutMsec -- How long to wait around if there are no messages
  1599. // pending.
  1600. //
  1601. // pAddress -- Where to replace information about the queue to
  1602. // respond queue (who sent the message).
  1603. //
  1604. // pBuffer -- The MQ message body is returned in the memory
  1605. // pointed to by pBuffer. This is the RPC packet.
  1606. //
  1607. // pdwBufferSize On entry it is passed in as the total size of
  1608. // pBuffer, and it is returned with the actual
  1609. // number of bytes in the message.
  1610. //
  1611. //----------------------------------------------------------------
  1612. HRESULT ReadQueue( IN MQ_DATAGRAM_ENDPOINT *pEndpoint,
  1613. IN DWORD timeoutMsec,
  1614. OUT MQ_ADDRESS *pAddress,
  1615. OUT UCHAR *pBuffer,
  1616. IN OUT DWORD *pdwBufferSize )
  1617. {
  1618. DWORD cProps = 0;
  1619. HRESULT hr;
  1620. MQMSGPROPS msgProps;
  1621. MSGPROPID aMsgPropID[MAX_RECV_VAR];
  1622. MQPROPVARIANT aMsgPropVar[MAX_RECV_VAR];
  1623. HRESULT aStatus[MAX_RECV_VAR];
  1624. aMsgPropID[cProps] = PROPID_M_BODY; // [0]
  1625. aMsgPropVar[cProps].vt = (VT_UI1 | VT_VECTOR);
  1626. aMsgPropVar[cProps].caub.cElems = *pdwBufferSize;
  1627. aMsgPropVar[cProps].caub.pElems = pBuffer;
  1628. cProps++;
  1629. ASSERT(cProps == I_MESSAGE_SIZE);
  1630. aMsgPropID[cProps] = PROPID_M_BODY_SIZE; // [1]
  1631. aMsgPropVar[cProps].vt = VT_UI4;
  1632. cProps++;
  1633. ASSERT(cProps == I_MESSAGE_LABEL);
  1634. aMsgPropID[cProps] = PROPID_M_LABEL; // [2]
  1635. aMsgPropVar[cProps].vt = VT_LPTSTR;
  1636. aMsgPropVar[cProps].ptszVal = (RPC_SCHAR *)pAddress->wsMsgLabel;
  1637. cProps++;
  1638. aMsgPropID[cProps] = PROPID_M_LABEL_LEN; // [3]
  1639. aMsgPropVar[cProps].vt = VT_UI4;
  1640. aMsgPropVar[cProps].ulVal = sizeof(pAddress->wsMsgLabel);
  1641. cProps++;
  1642. aMsgPropID[cProps] = PROPID_M_RESP_QUEUE; // [4]
  1643. aMsgPropVar[cProps].vt = VT_LPTSTR;
  1644. aMsgPropVar[cProps].ptszVal = (RPC_SCHAR *)pAddress->wsQFormat;
  1645. cProps++;
  1646. aMsgPropID[cProps] = PROPID_M_RESP_QUEUE_LEN; // [5]
  1647. aMsgPropVar[cProps].vt = VT_UI4;
  1648. aMsgPropVar[cProps].ulVal = MAX_FORMAT_LEN;
  1649. cProps++;
  1650. //
  1651. // These message properties are for authentication and privacy:
  1652. //
  1653. ASSERT(cProps == I_AUTHENTICATED);
  1654. aMsgPropID[cProps] = PROPID_M_AUTHENTICATED; // [6]
  1655. aMsgPropVar[cProps].vt = VT_UI1;
  1656. aMsgPropVar[cProps].bVal = 0;
  1657. cProps++;
  1658. ASSERT(cProps == I_PRIVACY_LEVEL);
  1659. aMsgPropID[cProps] = PROPID_M_PRIV_LEVEL; // [7]
  1660. aMsgPropVar[cProps].vt = VT_UI4;
  1661. aMsgPropVar[cProps].ulVal = 0;
  1662. cProps++;
  1663. aMsgPropID[cProps] = PROPID_M_SENDERID_TYPE; // [8]
  1664. aMsgPropVar[cProps].vt = VT_UI4;
  1665. aMsgPropVar[cProps].ulVal = 0;
  1666. cProps++;
  1667. aMsgPropID[cProps] = PROPID_M_SENDERID; // [9]
  1668. aMsgPropVar[cProps].vt = (VT_UI1 | VT_VECTOR);
  1669. aMsgPropVar[cProps].caub.cElems = sizeof(pAddress->aSidBuffer);
  1670. aMsgPropVar[cProps].caub.pElems = pAddress->aSidBuffer;
  1671. cProps++;
  1672. ASSERT( cProps < MAX_RECV_VAR );
  1673. msgProps.cProp = cProps;
  1674. msgProps.aPropID = aMsgPropID;
  1675. msgProps.aPropVar = aMsgPropVar;
  1676. msgProps.aStatus = aStatus;
  1677. hr = MQReceiveMessage( pEndpoint->hQueue,
  1678. timeoutMsec,
  1679. MQ_ACTION_RECEIVE,
  1680. &msgProps,
  1681. NULL, // No overlap (synchronous).
  1682. NULL, // No callback.
  1683. NULL, // Message filter.
  1684. NULL ); // Transaction object.
  1685. #ifdef DBG
  1686. if ( (hr != MQ_OK) && (hr != MQ_ERROR_IO_TIMEOUT) )
  1687. {
  1688. DbgPrint("ReadQueue(): ERROR: hr: 0x%x\n",hr);
  1689. }
  1690. #endif
  1691. if (!FAILED(hr))
  1692. {
  1693. pAddress->hQueue = 0;
  1694. *pdwBufferSize = msgProps.aPropVar[I_MESSAGE_SIZE].ulVal;
  1695. }
  1696. return hr;
  1697. }
  1698. //----------------------------------------------------------------
  1699. // PeekQueue()
  1700. //
  1701. // Do a peek on the queue for the specified endpoint in order to
  1702. // find out how big the next message is. If there are no messages
  1703. // in the queue, wait around for dwTimeoutMsec. Return the size
  1704. // of the message in *pdwSize.
  1705. //
  1706. //----------------------------------------------------------------
  1707. HRESULT PeekQueue( IN MQ_DATAGRAM_ENDPOINT *pEndpoint,
  1708. IN DWORD dwTimeoutMsec,
  1709. OUT DWORD *pdwSize )
  1710. {
  1711. DWORD cProps = 0;
  1712. BOOL bSuccess;
  1713. HRESULT hr;
  1714. MQMSGPROPS msgProps;
  1715. MSGPROPID aMsgPropID[MAX_RECV_VAR];
  1716. MQPROPVARIANT aMsgPropVar[MAX_RECV_VAR];
  1717. RPC_CHAR wsMsgLabel[MQ_MAX_MSG_LABEL_LEN];
  1718. aMsgPropID[cProps] = PROPID_M_BODY; // 0
  1719. aMsgPropVar[cProps].vt = (VT_UI1 | VT_VECTOR);
  1720. aMsgPropVar[cProps].caub.cElems = 0;
  1721. aMsgPropVar[cProps].caub.pElems = NULL;
  1722. cProps++;
  1723. aMsgPropID[cProps] = PROPID_M_BODY_SIZE; // 1
  1724. aMsgPropVar[cProps].vt = VT_UI4;
  1725. cProps++;
  1726. aMsgPropID[cProps] = PROPID_M_LABEL; // 2
  1727. aMsgPropVar[cProps].vt = VT_LPTSTR;
  1728. aMsgPropVar[cProps].ptszVal = (RPC_SCHAR *)wsMsgLabel;
  1729. cProps++;
  1730. aMsgPropID[cProps] = PROPID_M_LABEL_LEN; // 3
  1731. aMsgPropVar[cProps].vt = VT_UI4;
  1732. aMsgPropVar[cProps].ulVal = sizeof(wsMsgLabel);
  1733. cProps++;
  1734. ASSERT( cProps < MAX_RECV_VAR );
  1735. msgProps.cProp = cProps;
  1736. msgProps.aPropID = aMsgPropID;
  1737. msgProps.aPropVar = aMsgPropVar;
  1738. msgProps.aStatus = 0;
  1739. // The following receive should always fail, we're just calling
  1740. // it to get the size of the message body:
  1741. hr = MQReceiveMessage( pEndpoint->hQueue,
  1742. dwTimeoutMsec,
  1743. MQ_ACTION_RECEIVE,
  1744. &msgProps,
  1745. NULL, // No overlap (synchronous).
  1746. NULL, // No callback.
  1747. NULL, // Message filter.
  1748. NULL ); // Transaction object.
  1749. if (hr == MQ_ERROR_BUFFER_OVERFLOW)
  1750. {
  1751. *pdwSize = aMsgPropVar[1].ulVal;
  1752. hr = MQ_OK;
  1753. }
  1754. else
  1755. *pdwSize = 0;
  1756. #ifdef DBG
  1757. if ( (hr != MQ_OK) && (hr != MQ_ERROR_IO_TIMEOUT) )
  1758. {
  1759. DbgPrint("ClntPeekQueue(): ERROR: hr: 0x%x (%d)\n",hr,hr);
  1760. }
  1761. #endif
  1762. return hr;
  1763. }
  1764. //----------------------------------------------------------------
  1765. // EvaluateAckMessage()
  1766. //
  1767. //----------------------------------------------------------------
  1768. HRESULT EvaluateAckMessage( IN USHORT msgClass )
  1769. {
  1770. HRESULT hr = msgClass;
  1771. switch (msgClass)
  1772. {
  1773. case MQMSG_CLASS_ACK_REACH_QUEUE:
  1774. case MQMSG_CLASS_ACK_RECEIVE:
  1775. hr = MQ_OK;
  1776. break;
  1777. case MQMSG_CLASS_NACK_BAD_DST_Q:
  1778. hr = MQ_ERROR_QUEUE_NOT_FOUND;
  1779. break;
  1780. // All other cases are handled in MQ_MapStatusCode()...
  1781. }
  1782. return hr;
  1783. }
  1784. //----------------------------------------------------------------
  1785. // ClntWaitForAck()
  1786. //
  1787. // Used by the client side to wait for a MQ acknowledgement when
  1788. // ClntSendToQueue() sends a call. An ACK is sent when the call
  1789. // message reaches the destination (server) queue.
  1790. //
  1791. //----------------------------------------------------------------
  1792. HRESULT WaitForAck( IN MQ_DATAGRAM_ENDPOINT *pEndpoint )
  1793. {
  1794. HRESULT hr;
  1795. DWORD cProps = 0;
  1796. UCHAR msgClass;
  1797. MQMSGPROPS msgProps;
  1798. MSGPROPID aMsgPropID[MAX_RECV_VAR];
  1799. MQPROPVARIANT aMsgPropVar[MAX_RECV_VAR];
  1800. HRESULT aMsgHr[MAX_RECV_VAR];
  1801. RPC_CHAR wsMsgLabel[MQ_MAX_MSG_LABEL_LEN];
  1802. // The message class will tell us the message acknowledgement:
  1803. aMsgPropID[cProps] = PROPID_M_CLASS;
  1804. aMsgPropVar[cProps].vt = VT_UI2;
  1805. aMsgPropVar[cProps].uiVal = 0;
  1806. aMsgHr[cProps] = MQ_OK;
  1807. cProps++;
  1808. ASSERT( cProps < MAX_RECV_VAR );
  1809. msgProps.cProp = cProps;
  1810. msgProps.aPropID = aMsgPropID;
  1811. msgProps.aPropVar = aMsgPropVar;
  1812. msgProps.aStatus = aMsgHr;
  1813. hr = MQReceiveMessage( pEndpoint->hAdminQueue,
  1814. INFINITE,
  1815. MQ_ACTION_RECEIVE,
  1816. &msgProps,
  1817. NULL, NULL, NULL, NULL );
  1818. if (!FAILED(hr))
  1819. {
  1820. hr = EvaluateAckMessage( aMsgPropVar[0].uiVal );
  1821. }
  1822. # ifdef DBG
  1823. else
  1824. {
  1825. DbgPrint("WaitForAck(): FAILED: hr: 0x%x aMsgHr[0]: 0x%x\n", hr, aMsgHr[0] );
  1826. }
  1827. # endif
  1828. return hr;
  1829. }
  1830. //----------------------------------------------------------------
  1831. // SetupAdminQueue()
  1832. //
  1833. //
  1834. //----------------------------------------------------------------
  1835. HRESULT SetupAdminQueue( MQ_DATAGRAM_ENDPOINT *pEndpoint )
  1836. {
  1837. HRESULT hr;
  1838. DWORD dwSize;
  1839. UUID uuidQType;
  1840. RPC_CHAR wsQName[MQ_MAX_Q_NAME_LEN];
  1841. RPC_CHAR wsQPathName[MAX_PATHNAME_LEN];
  1842. RpcpStringCopy(wsQName,TEXT("Admin"));
  1843. RpcpStringCat(wsQName,pEndpoint->wsQName);
  1844. // Build the path name for the admin queue (NOTE: that this
  1845. // is a private queue):
  1846. dwSize = sizeof(pEndpoint->wsQPathName);
  1847. ConstructPrivateQueuePathName( pEndpoint->wsMachine, // [in]
  1848. wsQName, // [in]
  1849. wsQPathName, // [out]
  1850. &dwSize ); // [in,out]
  1851. // Try to create the server process receive queue;
  1852. UuidFromString( CLNT_ADMIN_QTYPE_UUID_STR, &uuidQType );
  1853. dwSize = sizeof(pEndpoint->wsAdminQFormat);
  1854. hr = CreateQueue( NULL, // [in] No security descriptor.
  1855. &uuidQType, // [in]
  1856. wsQPathName, // [in]
  1857. wsQName, // [in] Use QName as the QLabel.
  1858. 0x00000000, // [in] Flags
  1859. pEndpoint->wsAdminQFormat, // [out]
  1860. &dwSize ); // [in,out]
  1861. if ( (FAILED(hr)) && (hr != MQ_ERROR_QUEUE_EXISTS) )
  1862. {
  1863. #ifdef DBG
  1864. DbgPrint("SetupAdminQueue(): %S FAILED: 0x%x (%d)\n", wsQPathName, hr, hr );
  1865. #endif
  1866. return hr;
  1867. }
  1868. //
  1869. // If the queue already exists, then locate it.
  1870. //
  1871. if (hr == MQ_ERROR_QUEUE_EXISTS)
  1872. {
  1873. dwSize = sizeof(pEndpoint->wsQPathName);
  1874. hr = MQPathNameToFormatName( pEndpoint->wsQPathName,
  1875. pEndpoint->wsQFormat,
  1876. &dwSize );
  1877. if (FAILED(hr))
  1878. {
  1879. #ifdef DBG
  1880. DbgPrint("SetupAdminQueue(): %S FAILED: 0x%x (%d)\n", wsQPathName, hr, hr );
  1881. #endif
  1882. return hr;
  1883. }
  1884. }
  1885. //
  1886. // Ok, open the admin queue for receive:
  1887. //
  1888. hr = MQOpenQueue( pEndpoint->wsAdminQFormat,
  1889. MQ_RECEIVE_ACCESS, 0, &(pEndpoint->hAdminQueue));
  1890. #ifdef DBG
  1891. if (FAILED(hr))
  1892. {
  1893. DbgPrint("SetupAdminQueue(): %S FAILED: 0x%x (%d)\n", wsQPathName, hr, hr );
  1894. }
  1895. #endif
  1896. return hr;
  1897. }
  1898. //----------------------------------------------------------------
  1899. // Debug test code -- DG_DbgPrintPacket().
  1900. //----------------------------------------------------------------
  1901. #ifdef MAJOR_DBG
  1902. const static char *packetTypeStrs[] =
  1903. {
  1904. "REQUEST",
  1905. "PING ",
  1906. "RESP ",
  1907. "FAULT ",
  1908. "WORKING",
  1909. "NOCALL ",
  1910. "REJECT ",
  1911. "ACK ",
  1912. "QUIT ",
  1913. "FACK ",
  1914. "QUACK ",
  1915. "Unknown",
  1916. };
  1917. const static char asciiByteChars[] =
  1918. {
  1919. '0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'
  1920. };
  1921. #define HIGH_NIBBLE(uc) ((uc) >> 4)
  1922. #define LOW_NIBBLE(uc) ((uc) & 0x0f)
  1923. //----------------------------------------------------------------
  1924. // DbgPacketType()
  1925. //
  1926. //----------------------------------------------------------------
  1927. static char *DbgPacketType( unsigned char *pPacket )
  1928. {
  1929. if ( (pPacket[1] >= 0) && (pPacket[1] < 11) )
  1930. return packetTypeStrs[pPacket[1]];
  1931. else
  1932. return packetTypeStrs[11];
  1933. }
  1934. //----------------------------------------------------------------
  1935. // DbgUuidToStr()
  1936. //
  1937. //----------------------------------------------------------------
  1938. static char *DbgUuidToStr( unsigned char *pUuidArg, char *pszUuid )
  1939. {
  1940. int i = 0;
  1941. int j;
  1942. GUID uuid;
  1943. unsigned char *pUuid;
  1944. // Work with local copy of the UUID:
  1945. pUuid = (unsigned char*)&uuid;
  1946. CopyMemory(pUuid,pUuidArg,sizeof(GUID));
  1947. // Assume this is intel and byte-swap it...
  1948. uuid.Data1 = RpcpByteSwapLong(uuid.Data1);
  1949. uuid.Data2 = RpcpByteSwapShort(uuid.Data2);
  1950. uuid.Data3 = RpcpByteSwapShort(uuid.Data3);
  1951. for (j=0; j<16; j++)
  1952. {
  1953. pszUuid[i++] = asciiByteChars[HIGH_NIBBLE(pUuid[j])];
  1954. pszUuid[i++] = asciiByteChars[LOW_NIBBLE(pUuid[j])];
  1955. if ( (j==3)||(j==5)||(j==7)||(j==9) )
  1956. pszUuid[i++] = '-';
  1957. }
  1958. pszUuid[i] = '\0';
  1959. return pszUuid;
  1960. }
  1961. //----------------------------------------------------------------
  1962. // DbgPrintPacket()
  1963. //
  1964. //----------------------------------------------------------------
  1965. void DG_DbgPrintPacket( unsigned char *pPacket )
  1966. {
  1967. char szIf[50];
  1968. char szAct[50];
  1969. ULONG ulSequenceNumber;
  1970. if (pPacket)
  1971. {
  1972. ulSequenceNumber = *((unsigned long*)(&(pPacket[56])));
  1973. ulSequenceNumber = RpcpByteSwapLong(ulSequenceNumber);
  1974. DbgPrint(" Type: %s:0x%x:0x%x:0x%x:0x%x\n Intferface: %s\n Activity : %s\n SequenceNumber: %d\n",
  1975. DbgPacketType(pPacket),
  1976. pPacket[0], pPacket[1], pPacket[2], pPacket[3],
  1977. DbgUuidToStr(&(pPacket[24]),szIf),
  1978. DbgUuidToStr(&(pPacket[40]),szAct),
  1979. ulSequenceNumber );
  1980. }
  1981. else
  1982. {
  1983. DbgPrint(" NULL Packet.\n" );
  1984. }
  1985. }
  1986. #endif