Leaked source code of windows server 2003
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1093 lines
32 KiB

  1. /*++
  2. Copyright (C) Microsoft Corporation, 1996 - 1999
  3. Module Name:
  4. Complete.cxx
  5. Abstract:
  6. The place that IO completes
  7. Author:
  8. Mario Goertzel [MarioGo]
  9. Revision History:
  10. MarioGo 3/19/1996 Bits 'n pieces
  11. MarioGo 10/25/1996 Async RPC
  12. --*/
  13. #include <precomp.hxx>
  14. #include <trans.hxx>
  15. #include <cotrans.hxx>
  16. HANDLE RpcCompletionPort = 0;
  17. HANDLE InactiveRpcCompletionPort = 0;
  18. HANDLE *RpcCompletionPorts;
  19. long *CompletionPortHandleLoads;
  20. BASE_ADDRESS *AddressList = 0;
  21. HANDLE g_NotificationHandle = 0;
  22. LONG g_ListeningForPNPNotifications = 0;
  23. LONG g_NotifyRt = 0;
  24. OVERLAPPED g_Overlapped;
  25. CRITICAL_SECTION AddressListLock;
  26. RPC_STATUS
  27. RPC_ENTRY
  28. COMMON_PostNonIoEvent(
  29. RPC_TRANSPORT_EVENT Event,
  30. DWORD Type,
  31. PVOID Context
  32. )
  33. {
  34. BOOL b;
  35. int i = 5;
  36. ASSERT(Event != TRANSPORT_POSTED_KEY);
  37. do
  38. {
  39. // Kick a listening thread
  40. b = PostQueuedCompletionStatus(RpcCompletionPort,
  41. Type,
  42. Event,
  43. (LPOVERLAPPED)Context
  44. );
  45. if (b)
  46. {
  47. break;
  48. }
  49. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  50. DPFLTR_WARNING_LEVEL,
  51. RPCTRANS "PostQueuedCompleitonStatus failed %d\n",
  52. GetLastError()));
  53. Sleep(100);
  54. i--;
  55. }
  56. while(i);
  57. //
  58. // If this has failed we are out of luck unless something else manages
  59. // to wake up the listen thread.
  60. //
  61. // As of 4/19/96 PostQueuedCompletionStatus will only fail if the handle
  62. // is invalid or the kernel is unable to allocate a small bit of non-paged
  63. // pool. Either way we're toast...
  64. //
  65. ASSERT(b);
  66. if (!b)
  67. {
  68. return(RPC_S_OUT_OF_RESOURCES);
  69. }
  70. return(RPC_S_OK);
  71. }
  72. RPC_STATUS
  73. RPC_ENTRY
  74. COMMON_PostRuntimeEvent(
  75. IN DWORD Type,
  76. IN PVOID Context
  77. )
  78. /*++
  79. Routine Description:
  80. Posts an event to the completion port. This will complete
  81. with an event type of RuntimePosted, event status RPC_S_OK
  82. and event context of Context.
  83. Arguments:
  84. Context - Context associated with the event
  85. Return Value:
  86. RPC_S_OK
  87. RPC_S_OUT_OF_RESOURCES
  88. RPC_S_OUT_OF_MEMORY
  89. --*/
  90. {
  91. return(COMMON_PostNonIoEvent(RuntimePosted, Type, Context));
  92. }
  93. void
  94. COMMON_AddressManager(
  95. BASE_ADDRESS *pAddress
  96. )
  97. /*++
  98. Routine Description:
  99. When an address does not have an outstanding connect/accept/recv for some
  100. reason it is added to the AddressList global list of address objects. Listen
  101. threads will try to submit a listen on these as time passed. New addresses
  102. are put onto this list when they are ready to start listening.
  103. Arguments:
  104. pAddress - An address without an outstanding listen.
  105. Return Value:
  106. None
  107. --*/
  108. {
  109. EnterCriticalSection(&AddressListLock);
  110. if (pAddress->InAddressList == NotInList)
  111. {
  112. #if DBG
  113. // The address should not be in the list.
  114. BASE_ADDRESS *pT = AddressList;
  115. while(pT)
  116. {
  117. ASSERT(pT != pAddress);
  118. pT = pT->pNext;
  119. }
  120. #endif
  121. pAddress->pNext = AddressList;
  122. AddressList = pAddress;
  123. pAddress->InAddressList = InTheList;
  124. }
  125. LeaveCriticalSection(&AddressListLock);
  126. }
  127. void RPC_ENTRY
  128. COMMON_ServerCompleteListen(
  129. IN RPC_TRANSPORT_ADDRESS ThisAddress
  130. )
  131. /*++
  132. Routine Description:
  133. Called on an address once the runtime is really ready to start
  134. processing connections on this address.
  135. Arguments:
  136. Address - A fully initalized address which the runtime is
  137. ready to start receiving connection on.
  138. Return Value:
  139. None
  140. --*/
  141. {
  142. BASE_ADDRESS *pList = (BASE_ADDRESS *) ThisAddress;
  143. while(pList)
  144. {
  145. COMMON_AddressManager(pList);
  146. pList = pList->pNextAddress;
  147. }
  148. COMMON_ListenForPNPNotifications();
  149. // The TRANSPORT message indicates that a new
  150. // address has been added to the AddressList.
  151. COMMON_PostNonIoEvent(TRANSPORT, 0, 0);
  152. return;
  153. }
  154. RPC_STATUS RPC_ENTRY
  155. COMMON_PrepareNewHandle(HANDLE hAdd)
  156. /*++
  157. Routine Description:
  158. Generic wrapper used to add a newly create IO handle to
  159. to the IO completion port.
  160. Arguments:
  161. hAdd - The handle to be added to the port.
  162. Return Value:
  163. RPC_S_OK
  164. RPC_S_OUT_OF_MEMORY
  165. --*/
  166. {
  167. HANDLE h = CreateIoCompletionPort(hAdd,
  168. RpcCompletionPort,
  169. TRANSPORT_POSTED_KEY,
  170. 0);
  171. if (h)
  172. {
  173. ASSERT(h == RpcCompletionPort);
  174. return(RPC_S_OK);
  175. }
  176. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  177. DPFLTR_WARNING_LEVEL,
  178. RPCTRANS "CreateIoCompletionPort failed %d\n",
  179. GetLastError()));
  180. ASSERT((GetLastError() == ERROR_NO_SYSTEM_RESOURCES) ||
  181. (GetLastError() == ERROR_NOT_ENOUGH_QUOTA));
  182. return(RPC_S_OUT_OF_MEMORY);
  183. }
  184. void
  185. COMMON_RemoveAddress (
  186. IN BASE_ADDRESS *Address
  187. )
  188. /*++
  189. Function Name:COMMON_RemoveAddress
  190. Parameters:
  191. Description:
  192. This function must be called only when AddressListLock is held
  193. Remove address from the address manager list
  194. Returns:
  195. --*/
  196. {
  197. Address->InAddressList = Inactive;
  198. //
  199. // Close the sockets in the address
  200. //
  201. if (Address->type & DATAGRAM)
  202. {
  203. DG_DeactivateAddress((WS_DATAGRAM_ENDPOINT *) Address);
  204. }
  205. else
  206. {
  207. WS_DeactivateAddress((WS_ADDRESS *) Address);
  208. }
  209. }
  210. VOID
  211. RPC_ENTRY
  212. COMMON_StartPnpNotifications (
  213. )
  214. {
  215. ASSERT(RpcCompletionPort);
  216. g_NotifyRt = TRUE;
  217. COMMON_ListenForPNPNotifications();
  218. }
  219. VOID
  220. RPC_ENTRY
  221. COMMON_ListenForPNPNotifications (
  222. )
  223. /*++
  224. Function Name:COMMON_ListenForPNPNotifications
  225. Parameters:
  226. Description:
  227. Returns:
  228. --*/
  229. {
  230. int retval;
  231. HANDLE h;
  232. if (hWinsock2 == 0)
  233. {
  234. //
  235. // Winsock not loaded, don't need to do any PNP stuff
  236. //
  237. return;
  238. }
  239. if (InterlockedIncrement(&g_ListeningForPNPNotifications) != 1)
  240. {
  241. return;
  242. }
  243. // REVIEW: We may need to provide a mechanism to prevent spinning for lack of
  244. // resources
  245. if (g_NotificationHandle == 0)
  246. {
  247. retval = WSAProviderConfigChange(
  248. &g_NotificationHandle,
  249. 0, 0);
  250. if (retval != 0 || g_NotificationHandle == 0)
  251. {
  252. if (g_NotificationHandle)
  253. CloseHandle(g_NotificationHandle);
  254. goto Cleanup;
  255. }
  256. h = CreateIoCompletionPort(g_NotificationHandle,
  257. RpcCompletionPort,
  258. NewAddress,
  259. 0);
  260. if (h == 0)
  261. {
  262. CloseHandle(g_NotificationHandle);
  263. goto Cleanup;
  264. }
  265. else
  266. {
  267. ASSERT(h == RpcCompletionPort);
  268. }
  269. }
  270. // if the previous request is still there, we don't want to submit another one
  271. if (g_Overlapped.Internal != STATUS_PENDING)
  272. {
  273. g_Overlapped.hEvent = 0;
  274. g_Overlapped.Offset = 0;
  275. g_Overlapped.OffsetHigh = 0;
  276. retval = WSAProviderConfigChange(
  277. &g_NotificationHandle,
  278. &g_Overlapped,
  279. 0);
  280. if (retval != 0)
  281. {
  282. if (GetLastError() != WSA_IO_PENDING)
  283. {
  284. CloseHandle(g_NotificationHandle);
  285. goto Cleanup;
  286. }
  287. }
  288. }
  289. if (!TransportProtocol::ResubmitQueriesIfNecessary())
  290. {
  291. CloseHandle(g_NotificationHandle);
  292. goto Cleanup;
  293. }
  294. g_ListeningForPNPNotifications = 2;
  295. return;
  296. Cleanup:
  297. g_ListeningForPNPNotifications = 0;
  298. g_NotificationHandle = 0;
  299. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  300. DPFLTR_WARNING_LEVEL,
  301. RPCTRANS "COMMON_ListenForPNPNotifications failed\n"));
  302. }
  303. RPC_STATUS
  304. RPC_ENTRY
  305. COMMON_ProcessCalls(
  306. IN INT Timeout,
  307. OUT RPC_TRANSPORT_EVENT *pEvent,
  308. OUT RPC_STATUS *pEventStatus,
  309. IN OUT PVOID *ppEventContext,
  310. OUT UINT *pBufferLength,
  311. OUT BUFFER *pBuffer,
  312. OUT PVOID *ppSourceContext)
  313. /*++
  314. Routine Description:
  315. This routine waits for any async IO to complete for all protocols
  316. within a transport DLL. It maybe called by multiple threads at a
  317. time. A minimum of one thread should always be calling this function
  318. for each DLL.
  319. Note: async clients with no outstanding IO may allow the
  320. last thread to timeout and only call this function again
  321. when a new call is started.
  322. Note: During calls to this API in connection oriented servers
  323. a callback to I_RpcTransServerNewConnection() may occur.
  324. Arguments:
  325. Timeout - -1 - infinite
  326. other - number of milliseconds to wait for IO
  327. pEvent - Set on return to the type of IO event which finished.
  328. pEventStatus - The status of the IO event
  329. ppEventContext - On IN, the handle that the thread should dequeue on.
  330. On output the context of the event
  331. pBufferLength - If the event is successful then the number of
  332. bytes transferred.
  333. pBuffer - If the even is successful then the buffer associated
  334. with the IO.
  335. ppSourceContext - For datagram recvs this is the address
  336. of the sender.
  337. For connection sends this is the SendContext associated
  338. with the IO. For connection recvs it is NULL.
  339. Return Value:
  340. RPC_S_OK - IO completed, see pEventStatus.
  341. RPC_P_TIMEOUT - only if Timeout != INFINITE and is exceeded.
  342. --*/
  343. {
  344. BOOL b;
  345. ULONG_PTR key;
  346. DWORD bytes;
  347. RPC_STATUS status;
  348. LPOVERLAPPED lpOverlapped;
  349. PBASE_OVERLAPPED pBaseOverlapped;
  350. PREQUEST pRequest;
  351. PCONNECTION pConnection;
  352. PADDRESS pAddress;
  353. INT LocalTimeout;
  354. HANDLE hCompletionPortHandle = (HANDLE) *ppEventContext;
  355. DWORD LastError;
  356. ASSERT(RpcCompletionPort);
  357. *pEvent = 0;
  358. *pBuffer = 0;
  359. for(;;)
  360. {
  361. //
  362. // Do general house keeping work here. If it appears that more
  363. // house keeping work will be required in the future make
  364. // sure to reduce the LocalTimeout to something < INFINITE.
  365. //
  366. LocalTimeout = Timeout;
  367. // House keeping - look for any non-listening addresses and see if we
  368. // can make them listen now. Addresses start in this list and are added
  369. // back into the list if they are unable to submit a listen for some reason.
  370. if (AddressList)
  371. {
  372. EnterCriticalSection(&AddressListLock);
  373. if (AddressList)
  374. {
  375. pAddress = (PADDRESS)AddressList;
  376. AddressList = 0;
  377. if (Timeout == INFINITE)
  378. {
  379. // We want to wake up again soon and recheck the AddressList.
  380. LocalTimeout = 7*1000;
  381. }
  382. }
  383. else
  384. {
  385. pAddress = 0;
  386. }
  387. LeaveCriticalSection(&AddressListLock);
  388. while(pAddress)
  389. {
  390. PADDRESS pNext = (PADDRESS)pAddress->pNext;
  391. pAddress->pNext = 0;
  392. if (pAddress->InAddressList == InTheList)
  393. {
  394. pAddress->InAddressList = NotInList;
  395. pAddress->SubmitListen(pAddress);
  396. }
  397. pAddress = pNext;
  398. }
  399. }
  400. if (!g_ListeningForPNPNotifications)
  401. {
  402. COMMON_ListenForPNPNotifications();
  403. }
  404. //
  405. // The good part! Wait for something to happen...
  406. //
  407. b = GetQueuedCompletionStatus(hCompletionPortHandle,
  408. &bytes,
  409. &key,
  410. &lpOverlapped,
  411. LocalTimeout
  412. );
  413. if (!b && !lpOverlapped)
  414. {
  415. // If lpOverlapped is NULL this mean no IO completed.
  416. if ((status = GetLastError()) == STATUS_TIMEOUT)
  417. {
  418. if (Timeout == INFINITE)
  419. {
  420. continue;
  421. }
  422. return(RPC_P_TIMEOUT);
  423. }
  424. else
  425. {
  426. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  427. DPFLTR_WARNING_LEVEL,
  428. RPCTRANS "GetQueuedCompletionStatus failed %d\n",
  429. status));
  430. ASSERT(0);
  431. Sleep(1); // Avoid burning all the CPU in case we are hosed.
  432. continue;
  433. }
  434. }
  435. //PrintToDebugger("A request arrived at the completion port\n");
  436. if (key != TRANSPORT_POSTED_KEY)
  437. {
  438. if (b)
  439. {
  440. // Internal Non-IO posted event
  441. // Key - The type of event
  442. // lpOverlapped - The context associated with the event
  443. ASSERT( key == RuntimePosted
  444. || key == TRANSPORT
  445. || key == NewAddress);
  446. // RuntimePosted events allowed the RPC runtime to wake
  447. // a listening thread with an atbitrary context.
  448. if (key == RuntimePosted)
  449. {
  450. *pEvent = RuntimePosted;
  451. *pEventStatus = RPC_S_OK;
  452. *ppEventContext = lpOverlapped;
  453. *pBufferLength = bytes;
  454. return(RPC_S_OK);
  455. }
  456. //
  457. // A protocol was just loaded or unloaded. Take care of it
  458. //
  459. if (key == NewAddress)
  460. {
  461. if (TransportProtocol::HandlePnPStateChange())
  462. {
  463. g_ListeningForPNPNotifications = 0;
  464. *pEvent = NewAddress;
  465. return(RPC_S_OK);
  466. }
  467. // REVIEW: Not processing notification handling failures
  468. // may create problems where new protocols, or unloading of
  469. // old ones are ignored. This is not very bad, so we keep
  470. // it simple and ignore it.
  471. g_ListeningForPNPNotifications = 0;
  472. continue;
  473. }
  474. // TRANSPORT event is posted when a new address
  475. // has been added to the AddressListen. Simply continue
  476. // around the loop.
  477. ASSERT(bytes == 0);
  478. ASSERT(lpOverlapped == 0);
  479. }
  480. else
  481. {
  482. if (key == NewAddress)
  483. {
  484. g_ListeningForPNPNotifications = 0;
  485. }
  486. }
  487. continue;
  488. }
  489. ASSERT(!b || lpOverlapped);
  490. status = RPC_S_OK;
  491. if (!b)
  492. {
  493. pBaseOverlapped = FindOverlapped(lpOverlapped);
  494. pRequest = FindRequest(lpOverlapped);
  495. LastError = GetLastError();
  496. if (( pRequest->type & ADDRESS)
  497. && (LastError != ERROR_MORE_DATA))
  498. {
  499. VALIDATE(GetLastError())
  500. {
  501. ERROR_NETNAME_DELETED,
  502. ERROR_BAD_NETPATH,
  503. ERROR_NO_SYSTEM_RESOURCES,
  504. ERROR_SEM_TIMEOUT,
  505. ERROR_OPERATION_ABORTED,
  506. ERROR_HOST_UNREACHABLE,
  507. ERROR_NETWORK_UNREACHABLE,
  508. ERROR_UNEXP_NET_ERR,
  509. ERROR_NOT_ENOUGH_QUOTA,
  510. ERROR_BROKEN_PIPE,
  511. ERROR_CONNECTION_ABORTED
  512. } END_VALIDATE;
  513. COMMON_AddressManager((BASE_ADDRESS *)pRequest);
  514. continue;
  515. }
  516. switch (LastError)
  517. {
  518. case ERROR_MORE_DATA:
  519. {
  520. // Normal parital read of a connection request
  521. // or an oversized datagram. This is ok, falls
  522. // into the normal path.
  523. status = RPC_P_OVERSIZE_PACKET;
  524. break;
  525. }
  526. case ERROR_INVALID_HANDLE:
  527. // Named pipes allows a close to reach the server before
  528. // the read. When this happens the server rejects the read
  529. // with an invalid handle error.
  530. ASSERT(pRequest->id == NMP);
  531. ASSERT(pRequest->fAborted);
  532. // Fall into normal close case.
  533. case ERROR_NETNAME_DELETED:
  534. case ERROR_BROKEN_PIPE:
  535. case ERROR_PIPE_NOT_CONNECTED:
  536. case ERROR_NO_DATA:
  537. case ERROR_SEM_TIMEOUT:
  538. case ERROR_GRACEFUL_DISCONNECT:
  539. case WSAECONNRESET:
  540. case WSAESHUTDOWN:
  541. case WSAECONNABORTED:
  542. case WSAEHOSTDOWN:
  543. case ERROR_CONNECTION_ABORTED:
  544. {
  545. bytes = 0;
  546. ASSERT((pRequest->type & PROTO_MASK) == CONNECTION);
  547. // Will be handled as a close
  548. break;
  549. }
  550. case ERROR_NO_SYSTEM_RESOURCES:
  551. {
  552. //
  553. // This is just like the errors above except that both c/o and datagram requests
  554. // can generate it.
  555. //
  556. if ((pRequest->type & PROTO_MASK) == CONNECTION)
  557. {
  558. bytes = 0;
  559. // Will be handled as a close
  560. }
  561. else
  562. {
  563. bytes = 0;
  564. status = ERROR_OPERATION_ABORTED;
  565. }
  566. break;
  567. }
  568. case ERROR_OPERATION_ABORTED:
  569. {
  570. //
  571. // When a thread that issued an I/O dies the operation
  572. // completes with this error.
  573. // There are a couple cases here:
  574. // 1) The IO is datagram in which case we can just
  575. // reissue the I/O on this thread. In an idle
  576. // server eventually all DG I/O will migrate to
  577. // the single listening thread.
  578. // 2) The IO is on a client connection and the
  579. // the client thread has died. In this case
  580. // we need to abort the connection and return
  581. // to the runtime.
  582. // 3) If this happens on an address we have a bug.
  583. // 4) If this happens on a server connection we have a bug.
  584. //
  585. if (pRequest->type & DATAGRAM)
  586. {
  587. // We deal with this in the normal datagram path
  588. ASSERT(bytes == 0);
  589. status = ERROR_OPERATION_ABORTED;
  590. break;
  591. }
  592. ASSERT((pRequest->type & PROTO_MASK) == CONNECTION);
  593. // zero out the bytes just in case. Sometimes network operations
  594. // return positive byte count on operation aborted
  595. bytes = 0;
  596. // We'll treat this as a connection close on the client.
  597. // REVIEW: Maybe do something better.
  598. break;
  599. }
  600. case ERROR_NETWORK_UNREACHABLE:
  601. case ERROR_HOST_UNREACHABLE:
  602. case ERROR_PORT_UNREACHABLE:
  603. //
  604. // errors coming from ICMP packets to our UDP endpoint.
  605. // Winsock does not present this in a way our async architecture
  606. // can use, so ignore them.
  607. //
  608. if ((pRequest->type & PROTO_MASK) == CONNECTION)
  609. {
  610. bytes = 0;
  611. // Will be handled as a close
  612. }
  613. else
  614. {
  615. status = ERROR_OPERATION_ABORTED;
  616. }
  617. break;
  618. default:
  619. {
  620. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  621. DPFLTR_WARNING_LEVEL,
  622. RPCTRANS "IO failed %lX %d\n",
  623. pRequest,
  624. GetLastError()));
  625. ASSERT(0);
  626. status = RPC_S_OUT_OF_RESOURCES;
  627. // treat as a close
  628. bytes = 0;
  629. break;
  630. }
  631. }
  632. }
  633. // here we actually have a completed IO
  634. pBaseOverlapped = FindOverlapped(lpOverlapped);
  635. pRequest = FindRequest(lpOverlapped);
  636. if (pRequest == NULL)
  637. {
  638. ASSERT(0);
  639. return (RPC_S_INTERNAL_ERROR);
  640. }
  641. switch(pRequest->type & PROTO_MASK)
  642. {
  643. case CONNECTION:
  644. //
  645. // Connection IO completed.
  646. //
  647. I_RpcTransUnprotectThread(pBaseOverlapped->thread);
  648. pConnection = (PCONNECTION)pRequest;
  649. // A read or write either completed or failed
  650. *ppEventContext = pConnection;
  651. if (pBaseOverlapped == &pConnection->Read)
  652. {
  653. // Read completed
  654. *ppSourceContext = UlongToPtr(bytes);
  655. if (bytes == 0)
  656. {
  657. *pEvent = pConnection->type | RECEIVE;
  658. pConnection->Abort();
  659. *pEventStatus = RPC_P_CONNECTION_SHUTDOWN;
  660. return(RPC_S_OK);
  661. }
  662. status = pConnection->ProcessRead(bytes,
  663. pBuffer,
  664. pBufferLength);
  665. // N.B. Do not move the reading of the pConnection->type
  666. // before ProcessRead. ProcessRead can change the type based
  667. // on what it reads
  668. *pEvent = pConnection->type | RECEIVE;
  669. if (status != RPC_P_PARTIAL_RECEIVE)
  670. {
  671. ASSERT( status == RPC_P_RECEIVE_FAILED
  672. || status == RPC_S_OK
  673. || status == RPC_P_INITIALIZE_HTTP2_CONNECTION);
  674. *pEventStatus = status;
  675. return(RPC_S_OK);
  676. }
  677. // Message is not complete, submit the next read and continue.
  678. status = CO_SubmitRead(pConnection);
  679. if (status != RPC_S_OK)
  680. {
  681. ASSERT(status == RPC_P_RECEIVE_FAILED);
  682. *pEventStatus = status;
  683. return(RPC_S_OK);
  684. }
  685. }
  686. else
  687. {
  688. // Write completed
  689. CO_SEND_CONTEXT *pSend = (CO_SEND_CONTEXT *)pBaseOverlapped;
  690. ASSERT(pSend->Write.pAsyncObject == pConnection);
  691. *pEvent = pConnection->type | SEND;
  692. *ppSourceContext = pSend;
  693. *pBuffer = pSend->pWriteBuffer;
  694. if (bytes == 0)
  695. {
  696. pConnection->Abort();
  697. *pEventStatus = RPC_P_SEND_FAILED;
  698. *pBufferLength = 0;
  699. }
  700. else
  701. {
  702. status = RPC_S_OK;
  703. *pEventStatus = status;
  704. *pBufferLength = pSend->maxWriteBuffer;
  705. // Netbios client-side writes are sizeof(DWORD) too big since
  706. // they also include the sequence number.
  707. ASSERT( bytes == pSend->maxWriteBuffer
  708. || ( (bytes == pSend->maxWriteBuffer + sizeof(DWORD))
  709. && ((pConnection->type & TYPE_MASK) == CLIENT) ) );
  710. }
  711. return(RPC_S_OK);
  712. }
  713. break;
  714. case ADDRESS:
  715. {
  716. // ASSERT(bytes == 0);
  717. pAddress = (PADDRESS)pRequest;
  718. PCONNECTION pNewConnection = 0;
  719. status = pAddress->NewConnection(pAddress, &pNewConnection);
  720. if (RPC_S_OK == status)
  721. {
  722. // Opened a connection, now try to submit the first recv.
  723. ASSERT(pNewConnection);
  724. RPC_CONNECTION_TRANSPORT *pInfo;
  725. pInfo = (RPC_CONNECTION_TRANSPORT *)TransportTable[pAddress->id].pInfo;
  726. ASSERT(pInfo->Recv);
  727. status = (pInfo->Recv)(pNewConnection);
  728. if (RPC_S_OK != status)
  729. {
  730. ASSERT(status == RPC_P_RECEIVE_FAILED);
  731. *pEvent = pNewConnection->type | RECEIVE;
  732. *ppEventContext = pNewConnection;
  733. *pEventStatus = status;
  734. return(RPC_S_OK);
  735. }
  736. }
  737. // Connection has been established or closed, either
  738. // way we can continue around the loop.
  739. }
  740. break;
  741. case DATAGRAM:
  742. {
  743. BASE_ASYNC_OBJECT *pBase = (BASE_ASYNC_OBJECT*)pRequest;
  744. #ifdef NCADG_MQ_ON
  745. if (pBase->id == MSMQ)
  746. {
  747. // MSMQ (Falcon) datagram path:
  748. MQ_DATAGRAM *pDatagram = (MQ_DATAGRAM*)pRequest;
  749. MQ_DATAGRAM_ENDPOINT *pEndpoint = (MQ_DATAGRAM_ENDPOINT*)pDatagram->pEndpoint;
  750. if (status == RPC_P_OVERSIZE_PACKET)
  751. {
  752. // Data still pending, get it:
  753. status = MQ_ResizePacket( pEndpoint,
  754. (void**)&pDatagram->pAddress,
  755. (unsigned int*)pBufferLength,
  756. pBuffer );
  757. }
  758. if (status == RPC_S_OK)
  759. {
  760. MQ_FillInAddress(pDatagram->pAddress,pDatagram->Read.aMsgPropVar);
  761. *pEvent = pDatagram->type;
  762. *pEventStatus = status;
  763. *ppEventContext = pEndpoint;
  764. // WATCH OUT! MSMQ doesn't return the size in "bytes"
  765. // from GetQueuedCompletionStatus() like everything
  766. // else does! We need to extract the #bytes from the
  767. // message structure.
  768. //
  769. // DON'T: *pBufferLength = bytes;
  770. *pBufferLength = pDatagram->Read.aMsgPropVar[1].ulVal;
  771. *pBuffer = (BUFFER)pDatagram->pPacket;
  772. *ppSourceContext = pDatagram->pAddress;
  773. pDatagram->pPacket = 0;
  774. pDatagram->dwPacketSize = 0;
  775. }
  776. pDatagram->Busy = 0;
  777. LONG c = InterlockedDecrement(&pEndpoint->cPendingIos);
  778. ASSERT(c >= 0);
  779. if (c == 0)
  780. {
  781. // No pending receives, time to post more. This doesn't
  782. // get hit very often, normally additional recieves are
  783. // after sending a packet. (see DG_SendPacket)
  784. MQ_SubmitReceives(pEndpoint);
  785. }
  786. if (status == RPC_S_OK)
  787. {
  788. return RPC_S_OK;
  789. }
  790. }
  791. else
  792. #endif
  793. {
  794. // Normal datagram path:
  795. WS_DATAGRAM *pDatagram = (WS_DATAGRAM *)pRequest;
  796. WS_DATAGRAM_ENDPOINT *pEndpoint = (WS_DATAGRAM_ENDPOINT*)pDatagram->pEndpoint;
  797. if (status == RPC_P_OVERSIZE_PACKET)
  798. {
  799. ASSERT(bytes == pDatagram->Packet.len);
  800. }
  801. if ( status == RPC_S_OK
  802. || status == RPC_P_OVERSIZE_PACKET)
  803. {
  804. // A receive completed
  805. ASSERT(bytes);
  806. *pEvent = pDatagram->type;
  807. *pEventStatus = status;
  808. *ppEventContext = pEndpoint;
  809. *pBufferLength = bytes;
  810. *pBuffer = (BUFFER)pDatagram->Packet.buf;
  811. *ppSourceContext = pDatagram->AddressPair;
  812. ASSERT( pDatagram->Packet.buf );
  813. // Ready the datagram for another IO operation.
  814. pDatagram->Packet.buf = 0;
  815. status = RPC_S_OK;
  816. }
  817. #if DBG
  818. if (status != RPC_S_OK &&
  819. status != ERROR_OPERATION_ABORTED)
  820. {
  821. DbgPrint("RPC: I/O completed with 0x%x\n", status);
  822. ASSERT( 0 );
  823. }
  824. #endif
  825. // Do not touch the datagram after this!
  826. pDatagram->Busy = 0;
  827. LONG c = InterlockedDecrement(&pEndpoint->cPendingIos);
  828. ASSERT(c >= 0);
  829. if (c == 0)
  830. {
  831. // No pending receives, time to post more. This doesn't
  832. // get hit very often, normally additional recieves are
  833. // after sending a packet. (see DG_SendPacket)
  834. DG_SubmitReceives(pEndpoint);
  835. }
  836. if (status == RPC_S_OK)
  837. {
  838. return RPC_S_OK;
  839. }
  840. }
  841. }
  842. // Operation aborted, continue around the loop.
  843. break;
  844. default:
  845. TransDbgPrint((DPFLTR_RPCPROXY_ID,
  846. DPFLTR_WARNING_LEVEL,
  847. RPCTRANS "Invalid request type: 0x%x (%p)\n",
  848. pRequest->type, pRequest));
  849. ASSERT(0);
  850. break;
  851. }
  852. // Loop
  853. }
  854. ASSERT(0);
  855. return(RPC_S_INTERNAL_ERROR);
  856. }