Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1086 lines
31 KiB

  1. /*++
  2. Copyright (C) Microsoft Corporation, 1996 - 1999
  3. Module Name:
  4. Complete.cxx
  5. Abstract:
  6. The place that IO completes
  7. Author:
  8. Mario Goertzel [MarioGo]
  9. Revision History:
  10. MarioGo 3/19/1996 Bits 'n pieces
  11. MarioGo 10/25/1996 Async RPC
  12. --*/
  13. #include <precomp.hxx>
  14. #include <trans.hxx>
  15. #include <cotrans.hxx>
  16. HANDLE RpcCompletionPort = 0;
  17. HANDLE InactiveRpcCompletionPort = 0;
  18. HANDLE *RpcCompletionPorts;
  19. long *CompletionPortHandleLoads;
  20. BASE_ADDRESS *AddressList = 0;
  21. HANDLE g_NotificationHandle = 0;
  22. LONG g_ListeningForPNPNotifications = 0;
  23. LONG g_NotifyRt = 0;
  24. OVERLAPPED g_Overlapped;
  25. CRITICAL_SECTION AddressListLock;
  26. RPC_STATUS
  27. RPC_ENTRY
  28. COMMON_PostNonIoEvent(
  29. RPC_TRANSPORT_EVENT Event,
  30. DWORD Type,
  31. PVOID Context
  32. )
  33. {
  34. BOOL b;
  35. int i = 5;
  36. ASSERT(Event != TRANSPORT_POSTED_KEY);
  37. do
  38. {
  39. // Kick a listening thread
  40. b = PostQueuedCompletionStatus(RpcCompletionPort,
  41. Type,
  42. Event,
  43. (LPOVERLAPPED)Context
  44. );
  45. if (b)
  46. {
  47. break;
  48. }
  49. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  50. DPFLTR_WARNING_LEVEL,
  51. RPCTRANS "PostQueuedCompleitonStatus failed %d\n",
  52. GetLastError()));
  53. Sleep(100);
  54. i--;
  55. }
  56. while(i);
  57. //
  58. // If this has failed we are out of luck unless something else manages
  59. // to wake up the listen thread.
  60. //
  61. // As of 4/19/96 PostQueuedCompletionStatus will only fail if the handle
  62. // is invalid or the kernel is unable to allocate a small bit of non-paged
  63. // pool. Either way we're toast...
  64. //
  65. ASSERT(b);
  66. if (!b)
  67. {
  68. return(RPC_S_OUT_OF_RESOURCES);
  69. }
  70. return(RPC_S_OK);
  71. }
  72. RPC_STATUS
  73. RPC_ENTRY
  74. COMMON_PostRuntimeEvent(
  75. IN DWORD Type,
  76. IN PVOID Context
  77. )
  78. /*++
  79. Routine Description:
  80. Posts an event to the completion port. This will complete
  81. with an event type of RuntimePosted, event status RPC_S_OK
  82. and event context of Context.
  83. Arguments:
  84. Context - Context associated with the event
  85. Return Value:
  86. RPC_S_OK
  87. RPC_S_OUT_OF_RESOURCES
  88. RPC_S_OUT_OF_MEMORY
  89. --*/
  90. {
  91. return(COMMON_PostNonIoEvent(RuntimePosted, Type, Context));
  92. }
  93. void
  94. COMMON_AddressManager(
  95. BASE_ADDRESS *pAddress
  96. )
  97. /*++
  98. Routine Description:
  99. When an address does not have an outstanding connect/accept/recv for some
  100. reason it is added to the AddressList global list of address objects. Listen
  101. threads will try to submit a listen on these as time passed. New addresses
  102. are put onto this list when they are ready to start listening.
  103. Arguments:
  104. pAddress - An address without an outstanding listen.
  105. Return Value:
  106. None
  107. --*/
  108. {
  109. EnterCriticalSection(&AddressListLock);
  110. if (pAddress->InAddressList == NotInList)
  111. {
  112. #if DBG
  113. // The address should not be in the list.
  114. BASE_ADDRESS *pT = AddressList;
  115. while(pT)
  116. {
  117. ASSERT(pT != pAddress);
  118. pT = pT->pNext;
  119. }
  120. #endif
  121. pAddress->pNext = AddressList;
  122. AddressList = pAddress;
  123. pAddress->InAddressList = InTheList;
  124. }
  125. LeaveCriticalSection(&AddressListLock);
  126. }
  127. void RPC_ENTRY
  128. COMMON_ServerCompleteListen(
  129. IN RPC_TRANSPORT_ADDRESS ThisAddress
  130. )
  131. /*++
  132. Routine Description:
  133. Called on an address once the runtime is really ready to start
  134. processing connections on this address.
  135. Arguments:
  136. Address - A fully initalized address which the runtime is
  137. ready to start receiving connection on.
  138. Return Value:
  139. None
  140. --*/
  141. {
  142. BASE_ADDRESS *pList = (BASE_ADDRESS *) ThisAddress;
  143. while(pList)
  144. {
  145. COMMON_AddressManager(pList);
  146. pList = pList->pNextAddress;
  147. }
  148. COMMON_ListenForPNPNotifications();
  149. // The TRANSPORT message indicates that a new
  150. // address has been added to the AddressList.
  151. COMMON_PostNonIoEvent(TRANSPORT, 0, 0);
  152. return;
  153. }
  154. RPC_STATUS RPC_ENTRY
  155. COMMON_PrepareNewHandle(HANDLE hAdd)
  156. /*++
  157. Routine Description:
  158. Generic wrapper used to add a newly create IO handle to
  159. to the IO completion port.
  160. Arguments:
  161. hAdd - The handle to be added to the port.
  162. Return Value:
  163. RPC_S_OK
  164. RPC_S_OUT_OF_MEMORY
  165. --*/
  166. {
  167. HANDLE h = CreateIoCompletionPort(hAdd,
  168. RpcCompletionPort,
  169. TRANSPORT_POSTED_KEY,
  170. 0);
  171. if (h)
  172. {
  173. ASSERT(h == RpcCompletionPort);
  174. return(RPC_S_OK);
  175. }
  176. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  177. DPFLTR_WARNING_LEVEL,
  178. RPCTRANS "CreateIoCompletionPort failed %d\n",
  179. GetLastError()));
  180. ASSERT(GetLastError() == ERROR_NO_SYSTEM_RESOURCES);
  181. return(RPC_S_OUT_OF_MEMORY);
  182. }
  183. void
  184. COMMON_RemoveAddress (
  185. IN BASE_ADDRESS *Address
  186. )
  187. /*++
  188. Function Name:COMMON_RemoveAddress
  189. Parameters:
  190. Description:
  191. This function must be called only when AddressListLock is held
  192. Remove address from the address manager list
  193. Returns:
  194. --*/
  195. {
  196. Address->InAddressList = Inactive;
  197. //
  198. // Close the sockets in the address
  199. //
  200. if (Address->type & DATAGRAM)
  201. {
  202. DG_DeactivateAddress((WS_DATAGRAM_ENDPOINT *) Address);
  203. }
  204. else
  205. {
  206. WS_DeactivateAddress((WS_ADDRESS *) Address);
  207. }
  208. }
  209. VOID
  210. RPC_ENTRY
  211. COMMON_StartPnpNotifications (
  212. )
  213. {
  214. ASSERT(RpcCompletionPort);
  215. g_NotifyRt = TRUE;
  216. COMMON_ListenForPNPNotifications();
  217. }
  218. VOID
  219. RPC_ENTRY
  220. COMMON_ListenForPNPNotifications (
  221. )
  222. /*++
  223. Function Name:COMMON_ListenForPNPNotifications
  224. Parameters:
  225. Description:
  226. Returns:
  227. --*/
  228. {
  229. int retval;
  230. HANDLE h;
  231. if (hWinsock2 == 0)
  232. {
  233. //
  234. // Winsock not loaded, don't need to do any PNP stuff
  235. //
  236. return;
  237. }
  238. if (InterlockedIncrement(&g_ListeningForPNPNotifications) != 1)
  239. {
  240. return;
  241. }
  242. // REVIEW: We may need to provide a mechanism to prevent spinning for lack of
  243. // resources
  244. if (g_NotificationHandle == 0)
  245. {
  246. retval = WSAProviderConfigChange(
  247. &g_NotificationHandle,
  248. 0, 0);
  249. if (retval != 0 || g_NotificationHandle == 0)
  250. {
  251. if (g_NotificationHandle)
  252. CloseHandle(g_NotificationHandle);
  253. goto Cleanup;
  254. }
  255. h = CreateIoCompletionPort(g_NotificationHandle,
  256. RpcCompletionPort,
  257. NewAddress,
  258. 0);
  259. if (h == 0)
  260. {
  261. CloseHandle(g_NotificationHandle);
  262. goto Cleanup;
  263. }
  264. else
  265. {
  266. ASSERT(h == RpcCompletionPort);
  267. }
  268. }
  269. // if the previous request is still there, we don't want to submit another one
  270. if (g_Overlapped.Internal != STATUS_PENDING)
  271. {
  272. g_Overlapped.hEvent = 0;
  273. g_Overlapped.Offset = 0;
  274. g_Overlapped.OffsetHigh = 0;
  275. retval = WSAProviderConfigChange(
  276. &g_NotificationHandle,
  277. &g_Overlapped,
  278. 0);
  279. if (retval != 0)
  280. {
  281. if (GetLastError() != WSA_IO_PENDING)
  282. {
  283. CloseHandle(g_NotificationHandle);
  284. goto Cleanup;
  285. }
  286. }
  287. }
  288. if (!TransportProtocol::ResubmitQueriesIfNecessary())
  289. {
  290. CloseHandle(g_NotificationHandle);
  291. goto Cleanup;
  292. }
  293. g_ListeningForPNPNotifications = 2;
  294. return;
  295. Cleanup:
  296. g_ListeningForPNPNotifications = 0;
  297. g_NotificationHandle = 0;
  298. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  299. DPFLTR_WARNING_LEVEL,
  300. RPCTRANS "COMMON_ListenForPNPNotifications failed\n"));
  301. }
  302. RPC_STATUS
  303. RPC_ENTRY
  304. COMMON_ProcessCalls(
  305. IN INT Timeout,
  306. OUT RPC_TRANSPORT_EVENT *pEvent,
  307. OUT RPC_STATUS *pEventStatus,
  308. IN OUT PVOID *ppEventContext,
  309. OUT UINT *pBufferLength,
  310. OUT BUFFER *pBuffer,
  311. OUT PVOID *ppSourceContext)
  312. /*++
  313. Routine Description:
  314. This routine waits for any async IO to complete for all protocols
  315. within a transport DLL. It maybe called by multiple threads at a
  316. time. A minimum of one thread should always be calling this function
  317. for each DLL.
  318. Note: async clients with no outstanding IO may allow the
  319. last thread to timeout and only call this function again
  320. when a new call is started.
  321. Note: During calls to this API in connection oriented servers
  322. a callback to I_RpcTransServerNewConnection() may occur.
  323. Arguments:
  324. Timeout - -1 - infinite
  325. other - number of milliseconds to wait for IO
  326. pEvent - Set on return to the type of IO event which finished.
  327. pEventStatus - The status of the IO event
  328. ppEventContext - On IN, the handle that the thread should dequeue on.
  329. On output the context of the event
  330. pBufferLength - If the event is successful then the number of
  331. bytes transferred.
  332. pBuffer - If the even is successful then the buffer associated
  333. with the IO.
  334. ppSourceContext - For datagram recvs this is the address
  335. of the sender.
  336. For connection sends this is the SendContext associated
  337. with the IO. For connection recvs it is NULL.
  338. Return Value:
  339. RPC_S_OK - IO completed, see pEventStatus.
  340. RPC_P_TIMEOUT - only if Timeout != INFINITE and is exceeded.
  341. --*/
  342. {
  343. BOOL b;
  344. ULONG_PTR key;
  345. DWORD bytes;
  346. RPC_STATUS status;
  347. LPOVERLAPPED lpOverlapped;
  348. PBASE_OVERLAPPED pBaseOverlapped;
  349. PREQUEST pRequest;
  350. PCONNECTION pConnection;
  351. PADDRESS pAddress;
  352. INT LocalTimeout;
  353. HANDLE hCompletionPortHandle = (HANDLE) *ppEventContext;
  354. DWORD LastError;
  355. ASSERT(RpcCompletionPort);
  356. *pEvent = 0;
  357. *pBuffer = 0;
  358. for(;;)
  359. {
  360. //
  361. // Do general house keeping work here. If it appears that more
  362. // house keeping work will be required in the future make
  363. // sure to reduce the LocalTimeout to something < INFINITE.
  364. //
  365. LocalTimeout = Timeout;
  366. // House keeping - look for any non-listening addresses and see if we
  367. // can make them listen now. Addresses start in this list and are added
  368. // back into the list if they are unable to submit a listen for some reason.
  369. if (AddressList)
  370. {
  371. EnterCriticalSection(&AddressListLock);
  372. if (AddressList)
  373. {
  374. pAddress = (PADDRESS)AddressList;
  375. AddressList = 0;
  376. if (Timeout == INFINITE)
  377. {
  378. // We want to wake up again soon and recheck the AddressList.
  379. LocalTimeout = 7*1000;
  380. }
  381. }
  382. else
  383. {
  384. pAddress = 0;
  385. }
  386. LeaveCriticalSection(&AddressListLock);
  387. while(pAddress)
  388. {
  389. PADDRESS pNext = (PADDRESS)pAddress->pNext;
  390. pAddress->pNext = 0;
  391. if (pAddress->InAddressList == InTheList)
  392. {
  393. pAddress->InAddressList = NotInList;
  394. pAddress->SubmitListen(pAddress);
  395. }
  396. pAddress = pNext;
  397. }
  398. }
  399. if (!g_ListeningForPNPNotifications)
  400. {
  401. COMMON_ListenForPNPNotifications();
  402. }
  403. //
  404. // The good part! Wait for something to happen...
  405. //
  406. b = GetQueuedCompletionStatus(hCompletionPortHandle,
  407. &bytes,
  408. &key,
  409. &lpOverlapped,
  410. LocalTimeout
  411. );
  412. if (!b && !lpOverlapped)
  413. {
  414. // If lpOverlapped is NULL this mean no IO completed.
  415. if ((status = GetLastError()) == STATUS_TIMEOUT)
  416. {
  417. if (Timeout == INFINITE)
  418. {
  419. continue;
  420. }
  421. return(RPC_P_TIMEOUT);
  422. }
  423. else
  424. {
  425. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  426. DPFLTR_WARNING_LEVEL,
  427. RPCTRANS "GetQueuedCompletionStatus failed %d\n",
  428. status));
  429. ASSERT(0);
  430. Sleep(1); // Avoid burning all the CPU in case we are hosed.
  431. continue;
  432. }
  433. }
  434. //PrintToDebugger("A request arrived at the completion port\n");
  435. if (key != TRANSPORT_POSTED_KEY)
  436. {
  437. if (b)
  438. {
  439. // Internal Non-IO posted event
  440. // Key - The type of event
  441. // lpOverlapped - The context associated with the event
  442. ASSERT( key == RuntimePosted
  443. || key == TRANSPORT
  444. || key == NewAddress);
  445. // RuntimePosted events allowed the RPC runtime to wake
  446. // a listening thread with an atbitrary context.
  447. if (key == RuntimePosted)
  448. {
  449. *pEvent = RuntimePosted;
  450. *pEventStatus = RPC_S_OK;
  451. *ppEventContext = lpOverlapped;
  452. *pBufferLength = bytes;
  453. return(RPC_S_OK);
  454. }
  455. //
  456. // A protocol was just loaded or unloaded. Take care of it
  457. //
  458. if (key == NewAddress)
  459. {
  460. if (TransportProtocol::HandlePnPStateChange())
  461. {
  462. g_ListeningForPNPNotifications = 0;
  463. *pEvent = NewAddress;
  464. return(RPC_S_OK);
  465. }
  466. // REVIEW: Not processing notification handling failures
  467. // may create problems where new protocols, or unloading of
  468. // old ones are ignored. This is not very bad, so we keep
  469. // it simple and ignore it.
  470. g_ListeningForPNPNotifications = 0;
  471. continue;
  472. }
  473. // TRANSPORT event is posted when a new address
  474. // has been added to the AddressListen. Simply continue
  475. // around the loop.
  476. ASSERT(bytes == 0);
  477. ASSERT(lpOverlapped == 0);
  478. }
  479. else
  480. {
  481. if (key == NewAddress)
  482. {
  483. g_ListeningForPNPNotifications = 0;
  484. }
  485. }
  486. continue;
  487. }
  488. ASSERT(!b || lpOverlapped);
  489. status = RPC_S_OK;
  490. if (!b)
  491. {
  492. pBaseOverlapped = FindOverlapped(lpOverlapped);
  493. pRequest = FindRequest(lpOverlapped);
  494. LastError = GetLastError();
  495. if (( pRequest->type & ADDRESS)
  496. && (LastError != ERROR_MORE_DATA))
  497. {
  498. VALIDATE(GetLastError())
  499. {
  500. ERROR_NETNAME_DELETED,
  501. ERROR_BAD_NETPATH,
  502. ERROR_NO_SYSTEM_RESOURCES,
  503. ERROR_SEM_TIMEOUT,
  504. ERROR_OPERATION_ABORTED,
  505. ERROR_HOST_UNREACHABLE,
  506. ERROR_NETWORK_UNREACHABLE,
  507. ERROR_UNEXP_NET_ERR,
  508. ERROR_NOT_ENOUGH_QUOTA,
  509. ERROR_BROKEN_PIPE,
  510. ERROR_CONNECTION_ABORTED
  511. } END_VALIDATE;
  512. COMMON_AddressManager((BASE_ADDRESS *)pRequest);
  513. continue;
  514. }
  515. switch (LastError)
  516. {
  517. case ERROR_MORE_DATA:
  518. {
  519. // Normal parital read of a connection request
  520. // or an oversized datagram. This is ok, falls
  521. // into the normal path.
  522. status = RPC_P_OVERSIZE_PACKET;
  523. break;
  524. }
  525. case ERROR_INVALID_HANDLE:
  526. // Named pipes allows a close to reach the server before
  527. // the read. When this happens the server rejects the read
  528. // with an invalid handle error.
  529. ASSERT(pRequest->id == NMP);
  530. ASSERT(pRequest->fAborted);
  531. // Fall into normal close case.
  532. case ERROR_NETNAME_DELETED:
  533. case ERROR_BROKEN_PIPE:
  534. case ERROR_PIPE_NOT_CONNECTED:
  535. case ERROR_NO_DATA:
  536. case ERROR_SEM_TIMEOUT:
  537. case ERROR_GRACEFUL_DISCONNECT:
  538. case WSAECONNRESET:
  539. case WSAESHUTDOWN:
  540. case WSAECONNABORTED:
  541. case WSAEHOSTDOWN:
  542. case ERROR_CONNECTION_ABORTED:
  543. {
  544. bytes = 0;
  545. ASSERT((pRequest->type & PROTO_MASK) == CONNECTION);
  546. // Will be handled as a close
  547. break;
  548. }
  549. case ERROR_NO_SYSTEM_RESOURCES:
  550. {
  551. //
  552. // This is just like the errors above except that both c/o and datagram requests
  553. // can generate it.
  554. //
  555. if ((pRequest->type & PROTO_MASK) == CONNECTION)
  556. {
  557. bytes = 0;
  558. // Will be handled as a close
  559. }
  560. else
  561. {
  562. bytes = 0;
  563. status = ERROR_OPERATION_ABORTED;
  564. }
  565. break;
  566. }
  567. case ERROR_OPERATION_ABORTED:
  568. {
  569. //
  570. // When a thread that issued an I/O dies the operation
  571. // completes with this error.
  572. // There are a couple cases here:
  573. // 1) The IO is datagram in which case we can just
  574. // reissue the I/O on this thread. In an idle
  575. // server eventually all DG I/O will migrate to
  576. // the single listening thread.
  577. // 2) The IO is on a client connection and the
  578. // the client thread has died. In this case
  579. // we need to abort the connection and return
  580. // to the runtime.
  581. // 3) If this happens on an address we have a bug.
  582. // 4) If this happens on a server connection we have a bug.
  583. //
  584. if (pRequest->type & DATAGRAM)
  585. {
  586. // We deal with this in the normal datagram path
  587. ASSERT(bytes == 0);
  588. status = ERROR_OPERATION_ABORTED;
  589. break;
  590. }
  591. ASSERT((pRequest->type & PROTO_MASK) == CONNECTION);
  592. // zero out the bytes just in case. Sometimes network operations
  593. // return positive byte count on operation aborted
  594. bytes = 0;
  595. // We'll treat this as a connection close on the client.
  596. // REVIEW: Maybe do something better.
  597. break;
  598. }
  599. case ERROR_NETWORK_UNREACHABLE:
  600. case ERROR_HOST_UNREACHABLE:
  601. case ERROR_PORT_UNREACHABLE:
  602. //
  603. // errors coming from ICMP packets to our UDP endpoint.
  604. // Winsock does not present this in a way our async architecture
  605. // can use, so ignore them.
  606. //
  607. if ((pRequest->type & PROTO_MASK) == CONNECTION)
  608. {
  609. bytes = 0;
  610. // Will be handled as a close
  611. }
  612. else
  613. {
  614. status = ERROR_OPERATION_ABORTED;
  615. }
  616. break;
  617. default:
  618. {
  619. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  620. DPFLTR_WARNING_LEVEL,
  621. RPCTRANS "IO failed %lX %d\n",
  622. pRequest,
  623. GetLastError()));
  624. ASSERT(0);
  625. status = RPC_S_OUT_OF_RESOURCES;
  626. // treat as a close
  627. bytes = 0;
  628. break;
  629. }
  630. }
  631. }
  632. // here we actually have a completed IO
  633. pBaseOverlapped = FindOverlapped(lpOverlapped);
  634. pRequest = FindRequest(lpOverlapped);
  635. switch(pRequest->type & PROTO_MASK)
  636. {
  637. case CONNECTION:
  638. //
  639. // Connection IO completed.
  640. //
  641. I_RpcTransUnprotectThread(pBaseOverlapped->thread);
  642. pConnection = (PCONNECTION)pRequest;
  643. // A read or write either completed or failed
  644. *ppEventContext = pConnection;
  645. if (pBaseOverlapped == &pConnection->Read)
  646. {
  647. // Read completed
  648. *ppSourceContext = UlongToPtr(bytes);
  649. if (bytes == 0)
  650. {
  651. *pEvent = pConnection->type | RECEIVE;
  652. pConnection->Abort();
  653. *pEventStatus = RPC_P_CONNECTION_SHUTDOWN;
  654. return(RPC_S_OK);
  655. }
  656. status = pConnection->ProcessRead(bytes,
  657. pBuffer,
  658. pBufferLength);
  659. // N.B. Do not move the reading of the pConnection->type
  660. // before ProcessRead. ProcessRead can change the type based
  661. // on what it reads
  662. *pEvent = pConnection->type | RECEIVE;
  663. if (status != RPC_P_PARTIAL_RECEIVE)
  664. {
  665. ASSERT( status == RPC_P_RECEIVE_FAILED
  666. || status == RPC_S_OK
  667. || status == RPC_P_PACKET_CONSUMED);
  668. *pEventStatus = status;
  669. return(RPC_S_OK);
  670. }
  671. // Message is not complete, submit the next read and continue.
  672. status = CO_SubmitRead(pConnection);
  673. if (status != RPC_S_OK)
  674. {
  675. ASSERT(status == RPC_P_RECEIVE_FAILED);
  676. *pEventStatus = status;
  677. return(RPC_S_OK);
  678. }
  679. }
  680. else
  681. {
  682. // Write completed
  683. CO_SEND_CONTEXT *pSend = (CO_SEND_CONTEXT *)pBaseOverlapped;
  684. ASSERT(pSend->Write.pAsyncObject == pConnection);
  685. *pEvent = pConnection->type | SEND;
  686. *ppSourceContext = pSend;
  687. *pBuffer = pSend->pWriteBuffer;
  688. if (bytes == 0)
  689. {
  690. pConnection->Abort();
  691. *pEventStatus = RPC_P_SEND_FAILED;
  692. *pBufferLength = 0;
  693. }
  694. else
  695. {
  696. status = RPC_S_OK;
  697. *pEventStatus = status;
  698. *pBufferLength = pSend->maxWriteBuffer;
  699. // Netbios client-side writes are sizeof(DWORD) too big since
  700. // they also include the sequence number.
  701. ASSERT( bytes == pSend->maxWriteBuffer
  702. || ( (bytes == pSend->maxWriteBuffer + sizeof(DWORD))
  703. && ((pConnection->type & TYPE_MASK) == CLIENT) ) );
  704. }
  705. return(RPC_S_OK);
  706. }
  707. break;
  708. case ADDRESS:
  709. {
  710. // ASSERT(bytes == 0);
  711. pAddress = (PADDRESS)pRequest;
  712. PCONNECTION pNewConnection = 0;
  713. status = pAddress->NewConnection(pAddress, &pNewConnection);
  714. if (RPC_S_OK == status)
  715. {
  716. // Opened a connection, now try to submit the first recv.
  717. ASSERT(pNewConnection);
  718. RPC_CONNECTION_TRANSPORT *pInfo;
  719. pInfo = (RPC_CONNECTION_TRANSPORT *)TransportTable[pAddress->id].pInfo;
  720. ASSERT(pInfo->Recv);
  721. status = (pInfo->Recv)(pNewConnection);
  722. if (RPC_S_OK != status)
  723. {
  724. ASSERT(status == RPC_P_RECEIVE_FAILED);
  725. *pEvent = pNewConnection->type | RECEIVE;
  726. *ppEventContext = pNewConnection;
  727. *pEventStatus = status;
  728. return(RPC_S_OK);
  729. }
  730. }
  731. // Connection has been established or closed, either
  732. // way we can continue around the loop.
  733. }
  734. break;
  735. case DATAGRAM:
  736. {
  737. BASE_ASYNC_OBJECT *pBase = (BASE_ASYNC_OBJECT*)pRequest;
  738. #ifdef NCADG_MQ_ON
  739. if (pBase->id == MSMQ)
  740. {
  741. // MSMQ (Falcon) datagram path:
  742. MQ_DATAGRAM *pDatagram = (MQ_DATAGRAM*)pRequest;
  743. MQ_DATAGRAM_ENDPOINT *pEndpoint = (MQ_DATAGRAM_ENDPOINT*)pDatagram->pEndpoint;
  744. if (status == RPC_P_OVERSIZE_PACKET)
  745. {
  746. // Data still pending, get it:
  747. status = MQ_ResizePacket( pEndpoint,
  748. (void**)&pDatagram->pAddress,
  749. (unsigned int*)pBufferLength,
  750. pBuffer );
  751. }
  752. if (status == RPC_S_OK)
  753. {
  754. MQ_FillInAddress(pDatagram->pAddress,pDatagram->Read.aMsgPropVar);
  755. *pEvent = pDatagram->type;
  756. *pEventStatus = status;
  757. *ppEventContext = pEndpoint;
  758. // WATCH OUT! MSMQ doesn't return the size in "bytes"
  759. // from GetQueuedCompletionStatus() like everything
  760. // else does! We need to extract the #bytes from the
  761. // message structure.
  762. //
  763. // DON'T: *pBufferLength = bytes;
  764. *pBufferLength = pDatagram->Read.aMsgPropVar[1].ulVal;
  765. *pBuffer = (BUFFER)pDatagram->pPacket;
  766. *ppSourceContext = pDatagram->pAddress;
  767. pDatagram->pPacket = 0;
  768. pDatagram->dwPacketSize = 0;
  769. }
  770. pDatagram->Busy = 0;
  771. LONG c = InterlockedDecrement(&pEndpoint->cPendingIos);
  772. ASSERT(c >= 0);
  773. if (c == 0)
  774. {
  775. // No pending receives, time to post more. This doesn't
  776. // get hit very often, normally additional recieves are
  777. // after sending a packet. (see DG_SendPacket)
  778. MQ_SubmitReceives(pEndpoint);
  779. }
  780. if (status == RPC_S_OK)
  781. {
  782. return RPC_S_OK;
  783. }
  784. }
  785. else
  786. #endif
  787. {
  788. // Normal datagram path:
  789. WS_DATAGRAM *pDatagram = (WS_DATAGRAM *)pRequest;
  790. WS_DATAGRAM_ENDPOINT *pEndpoint = (WS_DATAGRAM_ENDPOINT*)pDatagram->pEndpoint;
  791. if (status == RPC_P_OVERSIZE_PACKET)
  792. {
  793. ASSERT(bytes == pDatagram->Packet.len);
  794. }
  795. if ( status == RPC_S_OK
  796. || status == RPC_P_OVERSIZE_PACKET)
  797. {
  798. // A receive completed
  799. ASSERT(bytes);
  800. *pEvent = pDatagram->type;
  801. *pEventStatus = status;
  802. *ppEventContext = pEndpoint;
  803. *pBufferLength = bytes;
  804. *pBuffer = (BUFFER)pDatagram->Packet.buf;
  805. *ppSourceContext = pDatagram->AddressPair;
  806. ASSERT( pDatagram->Packet.buf );
  807. // Ready the datagram for another IO operation.
  808. pDatagram->Packet.buf = 0;
  809. status = RPC_S_OK;
  810. }
  811. #if DBG
  812. if (status != RPC_S_OK &&
  813. status != ERROR_OPERATION_ABORTED)
  814. {
  815. DbgPrint("RPC: I/O completed with 0x%x\n", status);
  816. ASSERT( 0 );
  817. }
  818. #endif
  819. // Do not touch the datagram after this!
  820. pDatagram->Busy = 0;
  821. LONG c = InterlockedDecrement(&pEndpoint->cPendingIos);
  822. ASSERT(c >= 0);
  823. if (c == 0)
  824. {
  825. // No pending receives, time to post more. This doesn't
  826. // get hit very often, normally additional recieves are
  827. // after sending a packet. (see DG_SendPacket)
  828. DG_SubmitReceives(pEndpoint);
  829. }
  830. if (status == RPC_S_OK)
  831. {
  832. return RPC_S_OK;
  833. }
  834. }
  835. }
  836. // Operation aborted, continue around the loop.
  837. break;
  838. default:
  839. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  840. DPFLTR_WARNING_LEVEL,
  841. RPCTRANS "Invalid request type: 0x%x (%p)\n",
  842. pRequest->type, pRequest));
  843. ASSERT(0);
  844. break;
  845. }
  846. // Loop
  847. }
  848. ASSERT(0);
  849. return(RPC_S_INTERNAL_ERROR);
  850. }