Source code of Windows XP (NT5)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

2510 lines
65 KiB

  1. /*++
  2. Copyright (c) 1996,1997 Microsoft Corporation
  3. Module Name:
  4. SEND.C
  5. Abstract:
  6. Send Handler and Send Thread.
  7. Author:
  8. Aaron Ogus (aarono)
  9. Environment:
  10. Win32/COM
  11. Revision History:
  12. Date Author Description
  13. ====== ====== ============================================================
  14. 12/10/96 aarono Original
  15. 2/18/98 aarono added support for SendEx
  16. 2/18/98 aarono added support for Cancel
  17. 2/20/98 aarono B#18827 not pulling Cancelled sends off queue
  18. 3/09/98 aarono documented workaround for mmTimers on Win95, removed dead code.
  19. 3/29/98 aarono fixed locking for ReliableSend
  20. 3/30/98 aarono make sure erroring sends moved to Done state to avoid reprocess.
  21. 4/14/98 a-peterz B#18340 DPSEND_NOCOPY subsumes DPSEND_NOBUFFERCOPY
  22. 5/18/98 aarono fixed SendEx with scatter gather
  23. 6/6/98 aarono Turn on throttling and windowing
  24. --*/
  25. #include <windows.h>
  26. #include "newdpf.h"
  27. #include <mmsystem.h>
  28. #include <dplay.h>
  29. #include <dplaysp.h>
  30. #include <dplaypr.h>
  31. #include "mydebug.h"
  32. #include "arpd.h"
  33. #include "arpdint.h"
  34. #include "macros.h"
  35. #include "mytimer.h"
  36. BOOL DGCompleteSend(PSEND pSend);
  37. // a-josbor: for debugging purposes only
  38. extern DWORD ExtractProtocolIds(PUCHAR pInBuffer, PUINT pdwIdFrom, PUINT pdwIdTo);
  39. INT AddSendRef(PSEND pSend, UINT count)
  40. {
  41. INT newcount;
  42. Lock(&pSend->SendLock);
  43. Lock(&g_SendTimeoutListLock);
  44. if(pSend->bCleaningUp){
  45. DPF(1,"WARNING: ADDSENDREF tried to add reference to cleaning up send\n");
  46. newcount=0;
  47. goto exit;
  48. }
  49. if(!pSend->RefCount){
  50. // Anyone calling addsend ref requires a reference on the session
  51. Unlock(&g_SendTimeoutListLock);
  52. Unlock(&pSend->SendLock);
  53. Lock(&pSend->pSession->pProtocol->m_SessionLock);
  54. Lock(&pSend->pSession->SessionLock);
  55. Lock(&pSend->SendLock);
  56. Lock(&g_SendTimeoutListLock);
  57. InterlockedIncrement((PLONG)&pSend->pSession->RefCount);
  58. Unlock(&pSend->pSession->SessionLock);
  59. Unlock(&pSend->pSession->pProtocol->m_SessionLock);
  60. }
  61. newcount = pSend->RefCount+count;
  62. pSend->RefCount = newcount;
  63. exit:
  64. Unlock(&g_SendTimeoutListLock);
  65. Unlock(&pSend->SendLock);
  66. return newcount;
  67. }
  68. // Critical Section must not be held when this is called, unless there
  69. // is a reference for holding the critical section (ie. will not hit 0).
  70. INT DecSendRef(PPROTOCOL pProtocol, PSEND pSend)
  71. {
  72. INT count;
  73. PSESSION pSession;
  74. Lock(&pSend->SendLock);
  75. count=InterlockedDecrement((PLONG)&pSend->RefCount);//count is zero if result of dec is zero, otw nonzero but not actual count.
  76. if(!count){
  77. pSession=pSend->pSession;
  78. pSend->bCleaningUp=TRUE;
  79. Unlock(&pSend->SendLock);
  80. // pull the Send off of the global queue and the session queue
  81. Lock(&pProtocol->m_SendQLock);
  82. Lock(&pSession->SessionLock);
  83. Lock(&pSend->SendLock);
  84. Lock(&g_SendTimeoutListLock);
  85. if(!pSend->RefCount){
  86. Delete(&pSend->TimeoutList);
  87. Delete(&pSend->m_GSendQ);
  88. Delete(&pSend->SendQ);
  89. } else {
  90. count=pSend->RefCount;
  91. }
  92. Unlock(&g_SendTimeoutListLock);
  93. Unlock(&pSend->SendLock);
  94. Unlock(&pSession->SessionLock);
  95. Unlock(&pProtocol->m_SendQLock);
  96. if(!count){
  97. DecSessionRef(pSession);
  98. DPF(9,"DecSendRef: pSession %x pSend %x Freeing Send, called from %x\n",pSession, pSend, _ReturnAddress());
  99. FreeHandleTableEntry(&pProtocol->lpHandleTable,&pProtocol->csHandleTable,pSend->dwMsgID);
  100. // Free the message buffer(s) (including memory if WE allocated it).
  101. FreeBufferChainAndMemory(pSend->pMessage);
  102. // BUGBUG:move any Stats we want to keep to the session.
  103. // free the send.(handles the stats for now).
  104. ReleaseSendDesc(pSend);
  105. }
  106. } else {
  107. DPF(9,"DecSendRef: pSession %x pSend %x count %d, called from %x\n",pSend->pSession, pSend, count,_ReturnAddress());
  108. if(count&0x80000000){
  109. DEBUG_BREAK();
  110. }
  111. Unlock(&pSend->SendLock);
  112. }
  113. return count;
  114. }
  115. // SFLAGS_DOUBLEBUFFER - if the send is ASYNCHRONOUS, make a copy of the data
  116. /*=============================================================================
  117. Send - Send a message to a client.
  118. Description:
  119. Used by the client to send a message to another directplay client
  120. or server.
  121. Parameters:
  122. ARPDID idFrom - who is sending this message
  123. ARPDID idTo - target
  124. DWORD dwSendFlags - specifies buffer ownership, priority, reliable
  125. LPVOID pBuffers - Array of buffer and lengths
  126. DWORD dwBufferCount - number of entries in array
  127. PASYNCINFO pAsyncInfo - If specified, call is asynchronous
  128. typedef struct _ASYNCSENDINFO {
  129. UINT Private[4];
  130. HANDLE hEvent;
  131. PSEND_CALLBACK SendCallBack;
  132. PVOID CallBackContext;
  133. UINT Status;
  134. } ASYNCSENDINFO, *PASYNCSENDINFO;
  135. hEvent - event to signal when send completes.
  136. SendCallBack - routine to call when send completes.
  137. CallBackContext - context passed to SendCallBack.
  138. Status - send completion status.
  139. Return Values:
  140. DP_OK - no problem
  141. DPERR_INVALIDPARAMS
  142. -----------------------------------------------------------------------------*/
  143. HRESULT Send(
  144. PPROTOCOL pProtocol,
  145. DPID idFrom,
  146. DPID idTo,
  147. DWORD dwSendFlags,
  148. LPVOID pBuffers,
  149. DWORD dwBufferCount,
  150. DWORD dwSendPri,
  151. DWORD dwTimeOut,
  152. LPVOID lpvUserMsgID,
  153. LPDWORD lpdwMsgID,
  154. BOOL bSendEx,
  155. PASYNCSENDINFO pAsyncInfo
  156. )
  157. {
  158. HRESULT hr=DP_OK;
  159. PSESSION pSession;
  160. PBUFFER pSendBufferChain;
  161. PSEND pSend;
  162. pSession=GetSysSession(pProtocol,idTo);
  163. if(!pSession) {
  164. DPF(4,"NO SESSION for idTo %x, returning SESSIONLOST\n",idTo);
  165. hr=DPERR_CONNECTIONLOST;
  166. goto exit2;
  167. }
  168. pSend=GetSendDesc();
  169. if(!pSend){
  170. ASSERT(0); //TRACE all paths.
  171. hr=DPERR_OUTOFMEMORY;
  172. goto exit;
  173. }
  174. pSend->pProtocol=pProtocol;
  175. // fails by returning 0 in which case cancel won't be available for this send.
  176. pSend->dwMsgID=AllocHandleTableEntry(&pProtocol->lpHandleTable, &pProtocol->csHandleTable, pSend);
  177. if(lpdwMsgID){
  178. *lpdwMsgID=pSend->dwMsgID;
  179. }
  180. pSend->lpvUserMsgID = lpvUserMsgID;
  181. pSend->bSendEx = bSendEx;
  182. // if pAsyncInfo is provided, the call is asynchronous.
  183. // if dwFlags DPSEND_ASYNC is set, the call is async.
  184. // if the call is asynchronous and double buffering is
  185. // required, we must make a copy of the data.
  186. if((pAsyncInfo||(dwSendFlags & DPSEND_ASYNC)) && (!(dwSendFlags & DPSEND_NOCOPY))){
  187. // Need to copy the memory
  188. pSendBufferChain=GetDoubleBufferAndCopy((PMEMDESC)pBuffers,dwBufferCount);
  189. // BUGBUG: if the provider requires contiguous buffers, we should
  190. // break this down into packet allocations, and chain them
  191. // on the send immediately. Using the packet chain to indicate
  192. // to ISend routine that the message is already broken down.
  193. } else {
  194. // Build a send buffer chain for the described buffers.
  195. pSendBufferChain=BuildBufferChain((PMEMDESC)pBuffers,dwBufferCount);
  196. }
  197. if(!pSendBufferChain){
  198. ASSERT(0); //TRACE all paths.
  199. return DPERR_OUTOFMEMORY;
  200. }
  201. pSend->pSession = pSession; //!!! when this is dropped, deref the connection
  202. pSend->pMessage = pSendBufferChain;
  203. pSend->MessageSize = BufferChainTotalSize(pSendBufferChain);
  204. pSend->SendOffset = 0;
  205. pSend->pCurrentBuffer = pSend->pMessage;
  206. pSend->CurrentBufferOffset = 0;
  207. pSend->Priority = dwSendPri;
  208. pSend->dwFlags = dwSendFlags;
  209. if(pAsyncInfo){
  210. pSend->pAsyncInfo = &pSend->AsyncInfo;
  211. pSend->AsyncInfo = *pAsyncInfo; //copy Async info from client.
  212. } else {
  213. pSend->pAsyncInfo = NULL;
  214. if(pSend->dwFlags & DPSEND_ASYNC){
  215. pSend->AsyncInfo.hEvent = 0;
  216. pSend->AsyncInfo.SendCallBack = InternalSendComplete;
  217. pSend->AsyncInfo.CallBackContext= pSend;
  218. pSend->AsyncInfo.pStatus = &pSend->Status;
  219. }
  220. }
  221. pSend->SendState = Start;
  222. pSend->RetryCount = 0;
  223. pSend->PacketSize = pSession->MaxPacketSize;
  224. pSend->fUpdate = FALSE;
  225. pSend->NR = 0;
  226. pSend->NS = 0;
  227. //pSend->SendSEQMSK = // filled in on the fly.
  228. pSend->WindowSize = pSession->WindowSize;
  229. pSend->SAKInterval = (pSend->WindowSize+1)/2;
  230. pSend->SAKCountDown = pSend->SAKInterval;
  231. pSend->uRetryTimer = 0;
  232. pSend->idFrom = idFrom;
  233. pSend->idTo = idTo;
  234. pSend->wIdFrom = GetIndexByDPID(pProtocol, idFrom);
  235. pSend->wIdTo = (WORD)pSession->iSession;
  236. pSend->RefCount = 0; // if provider does async send counts references.
  237. pSend->serial = 0;
  238. pSend->tLastACK = timeGetTime();
  239. pSend->dwSendTime = pSend->tLastACK;
  240. pSend->dwTimeOut = dwTimeOut;
  241. pSend->BytesThisSend = 0;
  242. pSend->messageid = -1; // avoid matching this send in ACK/NACK handlers
  243. pSend->bCleaningUp = FALSE;
  244. hr=ISend(pProtocol,pSession, pSend);
  245. exit:
  246. DecSessionRef(pSession);
  247. exit2:
  248. return hr;
  249. }
  250. /*================================================================================
  251. Send Completion information matrix:
  252. ===================================
  253. (pSend->dwFlags & ASEND_PROTOCOL)
  254. |
  255. Sync Async Internal (Async)
  256. -------------- ----- --------------------
  257. pSend->pAsyncInfo 0 user 0
  258. pSend->AI.SendCallback 0 user InternalSendComplete
  259. pSend->AI.hEvent pSend->hEvent user 0
  260. pSend->AI.pStatus &pSend->Status user &pSend->Status
  261. ---------------------------------------------------------------------------*/
  262. HRESULT ISend(
  263. PPROTOCOL pProtocol,
  264. PSESSION pSession,
  265. PSEND pSend
  266. )
  267. {
  268. HRESULT hr=DP_OK;
  269. DWORD_PTR fAsync;
  270. BOOL fCallDirect=FALSE;
  271. fAsync=(DWORD_PTR)(pSend->pAsyncInfo);
  272. if(!fAsync && !(pSend->dwFlags & (ASEND_PROTOCOL|DPSEND_ASYNC))) {
  273. //Synchronous call, and not a protocol generated packet
  274. pSend->AsyncInfo.SendCallBack=NULL;
  275. //AsyncInfo.CallbackContext=0; //not required.
  276. pSend->AsyncInfo.hEvent=pSend->hEvent;
  277. pSend->AsyncInfo.pStatus=&pSend->Status;
  278. ResetEvent(pSend->hEvent);
  279. }
  280. // don't need to check if ref added here since the send isn't on a list yet.
  281. AddSendRef(pSend,2); // 1 for ISend, 1 for completion.
  282. DPF(9,"ISend: ==>Q\n");
  283. hr=QueueSendOnSession(pProtocol,pSession,pSend);
  284. DPF(9,"ISend: <==Q\n");
  285. if(hr==DP_OK){
  286. if(!fAsync && !(pSend->dwFlags & (ASEND_PROTOCOL|DPSEND_ASYNC))){
  287. // Synchronous call, and not internal, we need
  288. // to wait until the send has completed.
  289. if(!(pSend->dwFlags & DPSEND_GUARANTEED)){
  290. // Non-guaranteed, need to drop dplay lock, in
  291. // guaranteed case, dplay already dropped it for us.
  292. LEAVE_DPLAY();
  293. }
  294. DPF(9,"ISend: Wait==> %x\n",pSend->hEvent);
  295. Wait(pSend->hEvent);
  296. if(!(pSend->dwFlags & DPSEND_GUARANTEED)){
  297. ENTER_DPLAY();
  298. }
  299. DPF(9,"ISend: <== WAIT\n");
  300. hr=pSend->Status;
  301. } else {
  302. hr=DPERR_PENDING;
  303. }
  304. } else {
  305. DecSendRef(pProtocol, pSend); //not going to complete a send that didn't enqueue.
  306. }
  307. DecSendRef(pProtocol,pSend);
  308. return hr;
  309. }
  310. HRESULT QueueSendOnSession(
  311. PPROTOCOL pProtocol, PSESSION pSession, PSEND pSend
  312. )
  313. {
  314. BILINK *pBilink; // walks the links scanning priority
  315. BILINK *pPriQLink; // runs links in the global priority queue.
  316. PSEND pSendWalker; // pointer to send structure
  317. BOOL fFront; // if we put this at the front of the CON SendQ
  318. BOOL fSignalQ=TRUE; // whether to signal the sendQ
  319. // BUGBUG: locking global and connection queues concurrently,
  320. // -> this better be fast!
  321. ASSERT_SIGN(pSend, SEND_SIGN);
  322. Lock(&pProtocol->m_SendQLock);
  323. Lock(&pSession->SessionLock);
  324. Lock(&pSend->SendLock);
  325. if(pSession->eState != Open){
  326. Unlock(&pSend->SendLock);
  327. Unlock(&pSession->SessionLock);
  328. Unlock(&pProtocol->m_SendQLock);
  329. return DPERR_CONNECTIONLOST;
  330. }
  331. if(!(pSend->dwFlags & ASEND_PROTOCOL)){
  332. pProtocol->m_dwBytesPending += pSend->MessageSize;
  333. pProtocol->m_dwMessagesPending += 1;
  334. }
  335. // Put on Connection SendQ
  336. // First Check if we are highest priority.
  337. pBilink = pSession->SendQ.next;
  338. pSendWalker=CONTAINING_RECORD(pBilink, SEND, SendQ);
  339. if(pBilink == &pSession->SendQ || pSendWalker->Priority < pSend->Priority)
  340. {
  341. InsertAfter(&pSend->SendQ,&pSession->SendQ);
  342. fFront=TRUE;
  343. } else {
  344. // Scan backwards through the SendQ until we find a Send with a higher
  345. // or equal priority and insert ourselves afterwards. This is optimized
  346. // for the same pri send case.
  347. pBilink = pSession->SendQ.prev;
  348. while(TRUE /*pBilink != &pSend->SendQ*/){
  349. pSendWalker = CONTAINING_RECORD(pBilink, SEND, SendQ);
  350. ASSERT_SIGN(pSendWalker, SEND_SIGN);
  351. if(pSend->Priority <= pSendWalker->Priority){
  352. InsertAfter(&pSend->SendQ, &pSendWalker->SendQ);
  353. fFront=FALSE;
  354. break;
  355. }
  356. pBilink=pBilink->prev;
  357. }
  358. ASSERT(pBilink != &pSend->SendQ);
  359. }
  360. //
  361. // Put on Global SendQ
  362. //
  363. if(!fFront){
  364. // We queued it not at the front, therefore there are already
  365. // entries in the Global Queue and we need to be inserted
  366. // after the entry that we are behind, so start scanning the
  367. // global queue backwards from the packet ahead of us in the
  368. // Connection Queue until we find a lower priority packet
  369. // get pointer into previous packet in queue.
  370. pBilink=pSend->SendQ.prev;
  371. // get pointer to the PriorityQ record of the previous packet.
  372. pPriQLink = &(CONTAINING_RECORD(pBilink, SEND, SendQ))->m_GSendQ;
  373. while(pPriQLink != &pProtocol->m_GSendQ){
  374. pSendWalker = CONTAINING_RECORD(pPriQLink, SEND, m_GSendQ);
  375. ASSERT_SIGN(pSendWalker, SEND_SIGN);
  376. if(pSendWalker->Priority < pSend->Priority){
  377. InsertBefore(&pSend->m_GSendQ, &pSendWalker->m_GSendQ);
  378. break;
  379. }
  380. pPriQLink=pPriQLink->next;
  381. }
  382. if(pPriQLink==&pProtocol->m_GSendQ){
  383. // put at the end of the list.
  384. InsertBefore(&pSend->m_GSendQ, &pProtocol->m_GSendQ);
  385. }
  386. } else {
  387. // There was no-one in front of us on the connection. So
  388. // we look at the head of the global queue first and then scan
  389. // from the back.
  390. pBilink = pProtocol->m_GSendQ.next;
  391. pSendWalker=CONTAINING_RECORD(pBilink, SEND, m_GSendQ);
  392. if(pBilink == &pProtocol->m_GSendQ || pSend->Priority > pSendWalker->Priority)
  393. {
  394. InsertAfter(&pSend->m_GSendQ,&pProtocol->m_GSendQ);
  395. } else {
  396. // Scan backwards through the m_GSendQ until we find a Send with a higher
  397. // or equal priority and insert ourselves afterwards. This is optimized
  398. // for the same pri send case.
  399. pBilink = pProtocol->m_GSendQ.prev;
  400. while(TRUE){
  401. pSendWalker = CONTAINING_RECORD(pBilink, SEND, m_GSendQ);
  402. ASSERT_SIGN(pSendWalker, SEND_SIGN);
  403. if(pSend->Priority <= pSendWalker->Priority){
  404. InsertAfter(&pSend->m_GSendQ, &pSendWalker->m_GSendQ);
  405. break;
  406. }
  407. pBilink=pBilink->prev;
  408. }
  409. ASSERT(pBilink != &pProtocol->m_GSendQ);
  410. }
  411. }
  412. // Fixup send state if we are blocking other sends on the session.
  413. if(pSend->dwFlags & DPSEND_GUARANTEED){
  414. if(pSession->nWaitingForMessageid){
  415. pSend->SendState=WaitingForId;
  416. pSession->nWaitingForMessageid++;
  417. fSignalQ=FALSE;
  418. }
  419. } else {
  420. if(pSession->nWaitingForDGMessageid){
  421. pSend->SendState=WaitingForId;
  422. pSession->nWaitingForDGMessageid++;
  423. fSignalQ=FALSE;
  424. }
  425. }
  426. #ifdef DEBUG
  427. DPF(9,"SessionQ:");
  428. pBilink=pSession->SendQ.next;
  429. while(pBilink!=&pSession->SendQ){
  430. pSendWalker=CONTAINING_RECORD(pBilink, SEND, SendQ);
  431. ASSERT_SIGN(pSendWalker,SEND_SIGN);
  432. DPF(9,"Send %x pSession %x Pri %x State %d\n",pSendWalker,pSendWalker->pSession,pSendWalker->Priority,pSendWalker->SendState);
  433. pBilink=pBilink->next;
  434. }
  435. DPF(9,"GlobalQ:");
  436. pBilink=pProtocol->m_GSendQ.next;
  437. while(pBilink!=&pProtocol->m_GSendQ){
  438. pSendWalker=CONTAINING_RECORD(pBilink, SEND, m_GSendQ);
  439. ASSERT_SIGN(pSendWalker,SEND_SIGN);
  440. DPF(9,"Send %x pSession %x Pri %x State %d\n",pSendWalker,pSendWalker->pSession,pSendWalker->Priority,pSendWalker->SendState);
  441. pBilink=pBilink->next;
  442. }
  443. #endif
  444. Unlock(&pSend->SendLock);
  445. Unlock(&pSession->SessionLock);
  446. Unlock(&pProtocol->m_SendQLock);
  447. if(fSignalQ){
  448. // tell send thread to process.
  449. SetEvent(pProtocol->m_hSendEvent);
  450. }
  451. return DP_OK;
  452. }
  453. /*=============================================================================
  454. CopyDataToFrame
  455. Description:
  456. Copies data for a frame from the Send to the frame's data area.
  457. Parameters:
  458. pFrameData - pointer to data area
  459. FrameDataSize - Size of the Frame Data area
  460. pSend - send from which to get data
  461. nAhead - number of frames ahead of NR to get data for.
  462. Return Values:
  463. Number of bytes copied.
  464. Notes:
  465. Send must be locked across this call.
  466. -----------------------------------------------------------------------------*/
  467. UINT CopyDataToFrame(
  468. PUCHAR pFrameData,
  469. UINT FrameDataLen,
  470. PSEND pSend,
  471. UINT nAhead)
  472. {
  473. UINT BytesToAdvance, BytesToCopy;
  474. UINT FrameOffset=0;
  475. PUCHAR dest,src;
  476. UINT len;
  477. UINT totlen=0;
  478. UINT SendOffset;
  479. PBUFFER pSrcBuffer;
  480. UINT CurrentBufferOffset;
  481. BytesToAdvance = nAhead*FrameDataLen;
  482. SendOffset = pSend->SendOffset;
  483. pSrcBuffer = pSend->pCurrentBuffer;
  484. CurrentBufferOffset = pSend->CurrentBufferOffset;
  485. //
  486. // Run ahead to the buffer we start getting data from
  487. //
  488. while(BytesToAdvance){
  489. len = pSrcBuffer->len - CurrentBufferOffset;
  490. if(len > BytesToAdvance){
  491. CurrentBufferOffset += BytesToAdvance;
  492. SendOffset+=BytesToAdvance;
  493. BytesToAdvance=0;
  494. } else {
  495. pSrcBuffer=pSrcBuffer->pNext;
  496. CurrentBufferOffset = 0;
  497. BytesToAdvance-=len;
  498. SendOffset+=len;
  499. }
  500. }
  501. //
  502. // Copy the data for the Send into the frame
  503. //
  504. BytesToCopy = pSend->MessageSize - SendOffset;
  505. if(BytesToCopy > FrameDataLen){
  506. BytesToCopy=FrameDataLen;
  507. }
  508. while(BytesToCopy){
  509. ASSERT(pSrcBuffer);
  510. dest= pFrameData + FrameOffset;
  511. src = pSrcBuffer->pData + CurrentBufferOffset;
  512. len = pSrcBuffer->len - CurrentBufferOffset;
  513. if(len > BytesToCopy){
  514. len=BytesToCopy;
  515. CurrentBufferOffset+=len;//BUGBUG: not used after, don't need.
  516. } else {
  517. pSrcBuffer = pSrcBuffer->pNext;
  518. CurrentBufferOffset = 0;
  519. }
  520. BytesToCopy -= len;
  521. FrameOffset += len;
  522. totlen+=len;
  523. memcpy(dest,src,len);
  524. }
  525. return totlen;
  526. }
  527. // NOTE: ONLY 1 SEND THREAD ALLOWED.
  528. ULONG WINAPI SendThread(LPVOID pProt)
  529. {
  530. PPROTOCOL pProtocol=((PPROTOCOL)pProt);
  531. UINT SendRc;
  532. while(TRUE){
  533. WaitForSingleObject(pProtocol->m_hSendEvent, INFINITE);
  534. Lock(&pProtocol->m_ObjLock);
  535. if(pProtocol->m_eState==ShuttingDown){
  536. pProtocol->m_nSendThreads--;
  537. Unlock(&pProtocol->m_ObjLock);
  538. ExitThread(0);
  539. }
  540. Unlock(&pProtocol->m_ObjLock);
  541. do {
  542. SendRc=SendHandler(pProtocol);
  543. } while (SendRc!=DPERR_NOMESSAGES);
  544. }
  545. return TRUE;
  546. }
  547. // Called with SendLock held.
  548. VOID CancelRetryTimer(PSEND pSend)
  549. {
  550. // UINT mmError;
  551. UINT retrycount=0;
  552. UINT_PTR uRetryTimer;
  553. UINT Unique;
  554. if(pSend->uRetryTimer){
  555. DPF(9,"Canceling Timer %x\n",pSend->uRetryTimer);
  556. // Delete it from the list first so we don't deadlock trying to kill it.
  557. Lock(&g_SendTimeoutListLock);
  558. uRetryTimer=pSend->uRetryTimer;
  559. Unique=pSend->TimerUnique;
  560. pSend->uRetryTimer=0;
  561. if(!EMPTY_BILINK(&pSend->TimeoutList)){
  562. Delete(&pSend->TimeoutList);
  563. InitBilink(&pSend->TimeoutList); // avoids DecSendRef having to know state of bilink.
  564. Unlock(&g_SendTimeoutListLock);
  565. CancelMyTimer(uRetryTimer, Unique);
  566. } else {
  567. Unlock(&g_SendTimeoutListLock);
  568. }
  569. } else {
  570. DPF(9,"CancelRetryTimer:No timer to cancel.\n");
  571. }
  572. }
  573. // Workaround for Win95 mmTimers:
  574. // ==============================
  575. //
  576. // We cannot use a reference count for the timeouts as a result of the following Win95 bug:
  577. //
  578. // The cancelling of mmTimers is non-deterministic. That is, when calling cancel, you cannot
  579. // tell from the return code whether the timer ran, was cancelled or is still going to run.
  580. // Since we use the Send as the context for timeout, we cannot dereference it until we make
  581. // sure it is still valid, since code that cancelled the send and timer may have already freed
  582. // the send memory. We place the sends being timed out on a list and scan the list for the
  583. // send before we use it. If we don't find the send on the list, we ignore the timeout.
  584. //
  585. // Also note, this workaround is not very expensive. The linked list is in the order timeouts
  586. // were scheduled, so generally if the links are approximately the same speed, timeouts will
  587. // be similiar so the context being checked should be near the beginning of the list.
  588. CRITICAL_SECTION g_SendTimeoutListLock;
  589. BILINK g_BilinkSendTimeoutList;
  590. void CALLBACK RetryTimerExpiry( UINT_PTR uID, UINT uMsg, DWORD_PTR dwUser, DWORD dw1, DWORD dw2 )
  591. {
  592. PSEND pSend=(PSEND)(dwUser), pSendWalker;
  593. UINT tWaiting;
  594. BILINK *pBilink;
  595. UINT bFound=FALSE;
  596. DPF(9,"RetryTimerExpiry: %x, expecting %x, pSend %x\n",uID, pSend->uRetryTimer, pSend);
  597. tWaiting=timeGetTime();
  598. // Scan the list of waiting sends to see if this one is still waiting for a timeout.
  599. Lock(&g_SendTimeoutListLock);
  600. pBilink=g_BilinkSendTimeoutList.next;
  601. while(pBilink!=&g_BilinkSendTimeoutList){
  602. pSendWalker=CONTAINING_RECORD(pBilink, SEND, TimeoutList);
  603. pBilink=pBilink->next;
  604. if(pSendWalker == pSend){
  605. if(pSend->uRetryTimer==uID){
  606. Delete(&pSend->TimeoutList);
  607. InitBilink(&pSend->TimeoutList); // avoids DecSendRef having to know state of bilink.
  608. Unlock(&g_SendTimeoutListLock);
  609. // it is ok to call AddSendRef here without the sessionlock because
  610. // there is no way we could be adding the session reference. If
  611. // the refcount is 0, it can only mean the send is already cleaning up
  612. // and we won't try to take the session locks so there is no lock
  613. // ordering problem.
  614. bFound=AddSendRef(pSend,1); // note bFound set to Refcount on send
  615. goto skip_unlock;
  616. }
  617. }
  618. }
  619. Unlock(&g_SendTimeoutListLock);
  620. skip_unlock:
  621. if(bFound){
  622. if(pSend->tRetryScheduled - pSend->tScheduled > 500){
  623. DWORD tm=timeGetTime();
  624. if(tm - pSend->tScheduled < 100 ){
  625. DPF(9,"RETRY TIMER EXPIRY IS WAY TOO EARLY, EXPECTED AT %x ACTUALLY AT %x\n",pSend->tRetryScheduled, tm);
  626. DEBUG_BREAK();
  627. }
  628. }
  629. DPF(9,"RetryTimerExpiry: Waiting For Send Lock...\n");
  630. Lock(&pSend->SendLock);
  631. DPF(9,"RetryTimerExpiry: Got SendLock\n");
  632. if(pSend->uRetryTimer==uID){ // check again, may be cancelled.
  633. pSend->uRetryTimer=0;
  634. switch(pSend->SendState)
  635. {
  636. case Start:
  637. case Sending:
  638. ASSERT(0);
  639. case Done:
  640. break;
  641. case WaitingForAck:
  642. pSend->RetryCount++;
  643. tWaiting-=pSend->tLastACK;
  644. #ifdef DEBUG
  645. {
  646. static int retries;
  647. IN_WRITESTATS InWS;
  648. memset((PVOID)&InWS,0xFF,sizeof(IN_WRITESTATS));
  649. InWS.stat_USER1=((retries++)%20)+1;
  650. DbgWriteStats(&InWS);
  651. }
  652. #endif
  653. if(tWaiting > pSend->pSession->MaxDropTime ||
  654. (pSend->RetryCount > pSend->pSession->MaxRetry && tWaiting > pSend->pSession->MinDropTime)
  655. )
  656. {
  657. DPF(8,"Send %x Timed Out, tWaiting: %d RetryCount: %d\n",pSend,tWaiting,pSend->RetryCount);
  658. pSend->SendState=TimedOut;
  659. } else {
  660. DPF(9,"Timer expired, retrying send %x RetryCount= %d\n",pSend,pSend->RetryCount);
  661. //pSend->NACKMask|=(1<<(pSend->NS-pSend->NR))-1;
  662. pSend->NACKMask |= 1; // just retry 1 frame.
  663. ASSERT_NACKMask(pSend);
  664. pSend->SendState=ReadyToSend;
  665. }
  666. SetEvent(pSend->pSession->pProtocol->m_hSendEvent);
  667. break;
  668. case Throttled:
  669. break;
  670. case ReadyToSend:
  671. default:
  672. break;
  673. }
  674. }
  675. Unlock(&pSend->SendLock);
  676. DecSendRef(pSend->pSession->pProtocol, pSend);
  677. }
  678. }
  679. VOID StartRetryTimer(PSEND pSend)
  680. {
  681. UINT FptLatency;
  682. UINT tLatencyLong;
  683. UINT FptDev;
  684. UINT tRetry;
  685. FptLatency=max(pSend->pSession->FpLocalAverageLatency,pSend->pSession->LastLatency);
  686. FptDev=pSend->pSession->FpLocalAvgDeviation;
  687. tRetry=unFp(FptLatency+3*FptDev);//Latency +3 average deviations
  688. tLatencyLong=unFp(pSend->pSession->FpAverageLatency);
  689. // Sometimes stddev of latency gets badly skewed by the serial driver
  690. // taking a long time to complete locally, avoid setting retry time
  691. // too high by limiting to 2x the long latency average.
  692. if(tLatencyLong > 100 && tRetry > 2*max(tLatencyLong,unFp(FptLatency))){
  693. tRetry = 2*tLatencyLong;
  694. }
  695. if(pSend->RetryCount > 3){
  696. if(pSend->pSession->RemoteBytesReceived==0){
  697. // haven't spoken to remote yet, may be waiting for nametable, so back down hard.
  698. tRetry=5000;
  699. } else if (tRetry < 1000){
  700. // taking a lot of retries to get response, back down.
  701. tRetry=1000;
  702. }
  703. }
  704. if(tRetry < 50){
  705. tRetry=50;
  706. }
  707. ASSERT(tRetry);
  708. if(tRetry > 30000){
  709. DPF(0,"RETRY TIMER REQUESTING %d seconds?\n",tRetry);
  710. }
  711. if(!pSend->uRetryTimer){
  712. Lock(&g_SendTimeoutListLock);
  713. DPF(9,"Setting Retry Timer of %d ms\n", tRetry);
  714. pSend->uRetryTimer=SetMyTimer((tRetry)?(tRetry):1,(tRetry>>2)+1,RetryTimerExpiry,(ULONG_PTR) pSend,&pSend->TimerUnique);
  715. if(pSend->uRetryTimer){
  716. pSend->tScheduled = timeGetTime();
  717. pSend->tRetryScheduled = pSend->tScheduled+tRetry;
  718. InsertBefore(&pSend->TimeoutList, &g_BilinkSendTimeoutList);
  719. } else {
  720. DPF(0,"Start Retry Timer failed to schedule a timer with tRetry=%d for pSend %x\n",tRetry,pSend);
  721. DEBUG_BREAK();
  722. }
  723. DPF(9,"Started Retry Timer %x\n",pSend->uRetryTimer);
  724. Unlock(&g_SendTimeoutListLock);
  725. if(!pSend->uRetryTimer){
  726. ASSERT(0);
  727. }
  728. } else {
  729. ASSERT(0);
  730. }
  731. }
  732. // Called with all necessary locks held.
  733. VOID TimeOutSession(PSESSION pSession)
  734. {
  735. PSEND pSend;
  736. BILINK *pBilink;
  737. UINT nSignalsRequired=0;
  738. // Mark Session Timed out.
  739. pSession->eState=Closing;
  740. // Mark all sends Timed out.
  741. pBilink=pSession->SendQ.next;
  742. while(pBilink != &pSession->SendQ){
  743. pSend=CONTAINING_RECORD(pBilink, SEND, SendQ);
  744. pBilink=pBilink->next;
  745. DPF(9,"TimeOutSession: Force Timing Out Send %x, State %d\n",pSend, pSend->SendState);
  746. switch(pSend->SendState){
  747. case Start:
  748. case Throttled:
  749. case ReadyToSend:
  750. DPF(9,"TimeOutSession: Moving to TimedOut, should be safe\n");
  751. pSend->SendState=TimedOut;
  752. nSignalsRequired += 1;
  753. break;
  754. case Sending:
  755. //BUGBUG: can we even get here? If we can
  756. // the send will reset the retry count and tLastACK.
  757. DPF(9,"TimeOutSession: ALLOWING TimeOut to cancel.(could take 15 secs)\n");
  758. pSend->RetryCount=pSession->MaxRetry;
  759. pSend->tLastACK=timeGetTime()-pSession->MinDropTime;
  760. break;
  761. case WaitingForAck:
  762. DPF(9,"TimeOutSession: Canceling timer and making TimedOut\n");
  763. CancelRetryTimer(pSend);
  764. pSend->SendState = TimedOut;
  765. nSignalsRequired += 1;
  766. break;
  767. case WaitingForId:
  768. // Note, this means we can get signals for ids that aren't used.
  769. DPF(9,"TimeOutSession: Timing Out Send Waiting for ID, GetNextMessageToSend may fail, this is OK\n");
  770. pSend->SendState=TimedOut;
  771. if(pSend->dwFlags & DPSEND_GUARANTEED){
  772. InterlockedDecrement(&pSession->nWaitingForMessageid);
  773. } else {
  774. InterlockedDecrement(&pSession->nWaitingForDGMessageid);
  775. }
  776. nSignalsRequired += 1;
  777. break;
  778. case TimedOut:
  779. case Done:
  780. DPF(9,"TimeOutSession: Send already done or timed out, doesn't need our help\n");
  781. break;
  782. default:
  783. DPF(0,"TimeOutSession, pSession %x found Send %x in Wierd State %d\n",pSession,pSend,pSend->SendState);
  784. ASSERT(0);
  785. break;
  786. } /* switch */
  787. } /* while */
  788. // Create enough signals to process timed out sends.
  789. DPF(9,"Signalling SendQ %d items to process\n",nSignalsRequired);
  790. SetEvent(pSession->pProtocol->m_hSendEvent);
  791. }
  792. UINT WrapSend(PPROTOCOL pProtocol, PSEND pSend, PBUFFER pBuffer)
  793. {
  794. PUCHAR pMessage,pMessageStart;
  795. DWORD dwWrapSize=0;
  796. DWORD dwIdTo=0;
  797. DWORD dwIdFrom=0;
  798. pMessageStart = &pBuffer->pData[pProtocol->m_dwSPHeaderSize];
  799. pMessage = pMessageStart;
  800. dwIdFrom = pSend->wIdFrom;
  801. dwIdTo = pSend->wIdTo;
  802. if(dwIdFrom==0x70){ // avoid looking like a system message 'play'
  803. dwIdFrom=0xFFFF;
  804. }
  805. if(dwIdFrom){
  806. while(dwIdFrom){
  807. *pMessage=(UCHAR)(dwIdFrom & 0x7F);
  808. dwIdFrom >>= 7;
  809. if(dwIdFrom){
  810. *pMessage|=0x80;
  811. }
  812. pMessage++;
  813. }
  814. } else {
  815. *(pMessage++)=0;
  816. }
  817. if(dwIdTo){
  818. while(dwIdTo){
  819. *pMessage=(UCHAR)(dwIdTo & 0x7F);
  820. dwIdTo >>= 7;
  821. if(dwIdTo){
  822. *pMessage|=0x80;
  823. }
  824. pMessage++;
  825. }
  826. } else {
  827. *(pMessage++)=0;
  828. }
  829. #if 0 // a-josbor: for debugging only. I left it in in case we ever needed it again
  830. ExtractProtocolIds(pMessageStart, &dwIdFrom, &dwIdTo);
  831. ASSERT(dwIdFrom == pSend->wIdFrom);
  832. ASSERT(dwIdTo == pSend->wIdTo);
  833. #endif
  834. return (UINT)(pMessage-pMessageStart);
  835. }
  // Debug switch: when DROP is nonzero, the pattern table and index below
// are compiled in.
#define DROP 0

#if DROP
// One entry per packet: 1 for send, 0 for drop.
char droparray[] = {
	1,1,1,0,0,0,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,1,1,1,1,0,0,1,1,1,1,1,1,0,0,0,0};

UINT dropindex = 0;
#endif
  843. VOID CALLBACK UnThrottle(UINT_PTR uID, UINT uMsg, DWORD_PTR dwUser, DWORD dw1, DWORD dw2)
  844. {
  845. PSESSION pSession=(PSESSION)dwUser;
  846. UINT tMissedBy; // how long we missed the throttle by.
  847. DWORD tm;
  848. Lock(&pSession->SessionLock);
  849. tm=timeGetTime();
  850. tMissedBy = tm-pSession->tNextSend;
  851. if( (int)tMissedBy > 0){
  852. pSession->FpAvgUnThrottleTime -= pSession->FpAvgUnThrottleTime >> 4;
  853. pSession->FpAvgUnThrottleTime += (Fp(tMissedBy) >> 4);
  854. DPF(9,"Missed by: %d ms Avg Unthrottle Miss %d.%d ms\n", tMissedBy, pSession->FpAvgUnThrottleTime >> 8, (((pSession->FpAvgUnThrottleTime&0xFF)*100)/256) );
  855. }
  856. pSession->uUnThrottle=0;
  857. pSession->dwFlags |= SESSION_UNTHROTTLED;
  858. pSession->pProtocol->m_bRescanQueue=TRUE; // tell send routine to restart scan.
  859. DPF(9,"Unthrottling Session %x at %d\n",pSession, timeGetTime());
  860. Unlock(&pSession->SessionLock);
  861. SetEvent(pSession->pProtocol->m_hSendEvent);
  862. DecSessionRef(pSession);
  863. }
  864. VOID Throttle( PSESSION pSession, DWORD tm )
  865. {
  866. DWORD tmDelta;
  867. Lock(&pSession->SessionLock);
  868. pSession->bhitThrottle=TRUE;
  869. pSession->dwFlags |= SESSION_THROTTLED;
  870. tmDelta = pSession->tNextSend - tm;
  871. if((INT)tmDelta < 0){
  872. tmDelta=1;
  873. }
  874. DPF(9,"Throttling pSession %x for %d ms (until %d)\n",pSession, tmDelta,pSession->tNextSend);
  875. pSession->RefCount++;
  876. pSession->uUnThrottle = SetMyTimer(tmDelta, (tmDelta>>2)?(tmDelta>>2):1, UnThrottle, (DWORD_PTR)pSession, &pSession->UnThrottleUnique);
  877. if(!pSession->uUnThrottle){
  878. DPF(0,"UH OH failed to schedule unthrottle event\n");
  879. DEBUG_BREAK();
  880. }
  881. Unlock(&pSession->SessionLock);
  882. #ifdef DEBUG
  883. {
  884. static int throttlecounter;
  885. IN_WRITESTATS InWS;
  886. memset((PVOID)&InWS,0xFF,sizeof(IN_WRITESTATS));
  887. InWS.stat_USER4=((throttlecounter++)%20)+1;
  888. DbgWriteStats(&InWS);
  889. }
  890. #endif
  891. }
  892. // Given the current time, the bandwidth we are throttling to and the length of the packet we are sending,
  893. // calculate the next time we are allowed to send. Also keep a residue from this calculation so that
  894. // we don't wind up using excessive bandwidth due to rounding, the residue from the last calculation is
  895. // used in this calculation.
  896. // Absolute flag means set the next send time relative to tm regardless
  897. VOID UpdateSendTime(PSESSION pSession, DWORD Len, DWORD tm, BOOL fAbsolute)
  898. {
  899. #define SendRate pSession->SendRateThrottle
  900. #define Residue pSession->tNextSendResidue
  901. #define tNext pSession->tNextSend
  902. DWORD tFrame; // amount of time this frame will take on the wire.
  903. tFrame = (Len+Residue)*1000 / SendRate; // rate is bps, but want to calc bpms, so (Len+Residue)*1000
  904. Residue = (Len+Residue) - (tFrame * SendRate)/1000 ;
  905. ASSERT(!(Residue&0x80000000)); // residue better be +ve
  906. if(fAbsolute || (INT)(tNext - tm) < 0){
  907. // tNext is less than tm, so calc based on tm.
  908. tNext = tm+tFrame;
  909. } else {
  910. // tNext is greater than tm, so add more wait.
  911. tNext = tNext+tFrame;
  912. }
  913. DPF(8,"UpdateSendTime time %d, tFrame %d, Residue %d, tNext %d",tm,tFrame,Residue,tNext);
  914. #undef SendRate
  915. #undef Residue
  916. #undef tNext
  917. }
  918. //CHAR Drop[]={0,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,1,1,0,0};
  919. //DWORD DropSize = sizeof(Drop);
  920. //DWORD iDrop=0;
  921. // AO - added contraint, 1 send thread per session. Since this is not enforced by GetNextMessageToSend
  922. // 5-21-98 we are effectively restricted to 1 send thread for the protocol. We can fix this by adding
  923. // a sending state on the session and having GetNextMessageToSend skip sending sessions.
  924. HRESULT ReliableSend(PPROTOCOL pProtocol, PSEND pSend)
  925. {
  926. #define pBigFrame ((pPacket2)(pFrame))
  927. HRESULT hr;
  928. PBUFFER pBuffer;
  929. pPacket1 pFrame;
  930. PUCHAR pFrameData;
  931. UINT FrameDataLen;
  932. UINT FrameTotalLen;
  933. UINT MaxFrameLen;
  934. UINT FrameHeaderLen;
  935. UINT nFramesOutstanding;
  936. UINT nFramesToSend;
  937. UINT msk;
  938. UINT shift;
  939. UINT WrapSize;
  940. UINT DPWrapSize; // DirectPlay wrapping only. ([[DPLAY 0xFF]|],From,To)
  941. DWORD tm=0; // The time, 0 if we haven't retrieved it yet.
  942. DWORD tmExit=0;
  943. BOOL bExitEarly=FALSE;
  944. DPSP_SENDDATA SendData;
  945. //
  946. // Sending algorithm is designed to handle NACKs only (there
  947. // is no special case for sending data the first time). So
  948. // We send by making it look like the frames we want to send
  949. // have been NACKed. Every frame we send, we clear the NACK
  950. // bit for. If an actual NACK comes in, the bit is set.
  951. // When an ACK comes in, we shift the NACK and ACK masks
  952. // nACK-NR and if applicable, set new NACK bits.
  953. //
  954. Lock(&pSend->SendLock);
  955. if(pSend->SendState == Done){
  956. goto unlock_exit;
  957. }
  958. nFramesOutstanding=(pSend->NS-pSend->NR);
  959. if( nFramesOutstanding < pSend->WindowSize){
  960. // Set NACK bits up to WindowSize (unless over nFrames);
  961. nFramesToSend=pSend->WindowSize-nFramesOutstanding;
  962. if(nFramesToSend > pSend->nFrames-pSend->NS){
  963. nFramesToSend=pSend->nFrames-pSend->NS;
  964. }
  965. pSend->NACKMask |= ((1<<nFramesToSend)-1)<<nFramesOutstanding;
  966. pSend->OpenWindow = nFramesOutstanding + nFramesToSend;
  967. DPF(9,"Send: pSend->NACKMask %x, OpenWindow %d\n",pSend->NACKMask, pSend->OpenWindow);
  968. }
  969. tmExit=timeGetTime()+1000; // always blow out of here in 1 second max.
  970. Reload:
  971. msk=1;
  972. shift=0;
  973. MaxFrameLen=pSend->FrameSize;
  974. while(pSend->NACKMask){
  975. ASSERT_NACKMask(pSend);
  976. tm=timeGetTime(); // Getting the time is relatively expensive, so we do it once here and pass it around.
  977. if(((INT)tm - (INT)tmExit) > 0){
  978. DPF(0,"Breaking Out of Send Loop due to expiry of timer\n");
  979. bExitEarly=TRUE;
  980. break;
  981. }
  982. if((tm+unFp(pSend->pSession->FpAvgUnThrottleTime)-pSend->pSession->tNextSend) & 0x80000000){
  983. // we're still too early to do the next send, so throttled this session.
  984. goto throttle_exit;
  985. }
  986. if(pSend->NACKMask & msk){
  987. pBuffer=GetFrameBuffer(MaxFrameLen+pProtocol->m_dwSPHeaderSize+MAX_SEND_HEADER);
  988. if(!pBuffer){
  989. pSend->SendState=ReadyToSend;
  990. SetEvent(pSend->pSession->pProtocol->m_hSendEvent); // keep the queue rolling.
  991. hr=DPERR_PENDING;
  992. goto exit;
  993. }
  994. WrapSize=pProtocol->m_dwSPHeaderSize; // leave space for SP header.
  995. DPWrapSize=WrapSend(pProtocol, pSend, pBuffer); // fill in out address wrapping
  996. WrapSize+=DPWrapSize;
  997. pFrame=(pPacket1)&pBuffer->pData[WrapSize]; // protocol header after wrapping
  998. if(pSend->fSendSmall){
  999. pFrameData=&pFrame->data[0];
  1000. FrameHeaderLen=(UINT)(pFrameData-(PUCHAR)pFrame);
  1001. } else {
  1002. pFrameData=&pBigFrame->data[0];
  1003. FrameHeaderLen=(UINT)(pFrameData-(PUCHAR)pFrame);
  1004. }
  1005. // For calculating nFrames, we assumed MAX_SEND_HEADER, subtract out the unused portion
  1006. // so we don't put to much data in the frame and mess up the accounting.
  1007. pBuffer->len-=(MAX_SEND_HEADER-(FrameHeaderLen+DPWrapSize));
  1008. FrameHeaderLen += WrapSize; // now include wrapping and SPheader space.
  1009. FrameDataLen=CopyDataToFrame(pFrameData, pBuffer->len-FrameHeaderLen, pSend, shift);
  1010. if(!pSend->FrameDataLen){
  1011. pSend->FrameDataLen=FrameDataLen;
  1012. }
  1013. FrameTotalLen=FrameDataLen+FrameHeaderLen;
  1014. pSend->BytesThisSend=FrameTotalLen-WrapSize; //only counting payload
  1015. // Do that protocol thing
  1016. BuildHeader(pSend,pFrame,shift,tm);
  1017. // we know we don't have to check here since we have a reference
  1018. // from finding the send to work on ON the send queue. So it
  1019. // can't go away til we return from this function.
  1020. hr=AddSendRef(pSend,1);
  1021. ASSERT(hr);
  1022. if(pSend->NR+shift >= pSend->NS){
  1023. pSend->NS = pSend->NR+shift+1;
  1024. }
  1025. pSend->NACKMask &= ~msk;
  1026. DPF(9,"S %2x %2x %2x\n",pBuffer->pData[0], pBuffer->pData[1], pBuffer->pData[2]);
  1027. // Update the next time we are allowed to send.
  1028. UpdateSendTime(pSend->pSession, pSend->BytesThisSend, tm, FALSE);
  1029. Unlock(&pSend->SendLock);
  1030. ASSERT(!(FrameTotalLen &0xFFFF0000));
  1031. // Send this puppy...
  1032. SendData.dwFlags = pSend->dwFlags & ~DPSEND_GUARANTEED;
  1033. SendData.idPlayerTo = pSend->idTo;
  1034. SendData.idPlayerFrom = pSend->idFrom;
  1035. SendData.lpMessage = pBuffer->pData;
  1036. SendData.dwMessageSize = FrameTotalLen;
  1037. SendData.bSystemMessage = 0;
  1038. SendData.lpISP = pProtocol->m_lpISP;
  1039. ENTER_DPLAY();
  1040. Lock(&pProtocol->m_SPLock);
  1041. // if(!(Drop[(iDrop++)%DropSize])){//BUGBUG: DEBUG ONLY!
  1042. hr=CALLSP(pProtocol->m_lpDPlay->pcbSPCallbacks->Send,&SendData);
  1043. // }
  1044. Unlock(&pProtocol->m_SPLock);
  1045. LEAVE_DPLAY();
  1046. if(hr!=DPERR_PENDING){
  1047. if(!DecSendRef(pProtocol, pSend)){
  1048. ASSERT(0);
  1049. hr=DPERR_PENDING;
  1050. goto exit;
  1051. }
  1052. FreeFrameBuffer(pBuffer);
  1053. }
  1054. Lock(&pSend->SendLock);
  1055. } /* endif (pSend->NACKMask & msk) */
  1056. if(pSend->fUpdate){
  1057. pSend->fUpdate=FALSE;
  1058. goto Reload;
  1059. }
  1060. // Check if we are past windowsize, if so roll back the mask
  1061. // Also if there are earlier bits to ACK.
  1062. if((msk<<=1UL) >= (1UL<<pSend->WindowSize)){
  1063. msk=1;
  1064. shift=0;
  1065. } else {
  1066. shift++;
  1067. }
  1068. } /* end while (pSend->NACKMask) */
  1069. if(pSend->SendState != Done){
  1070. if(bExitEarly){
  1071. pSend->SendState=ReadyToSend;
  1072. SetEvent(pSend->pSession->pProtocol->m_hSendEvent); // keep the queue rolling.
  1073. } else {
  1074. pSend->SendState=WaitingForAck;
  1075. StartRetryTimer(pSend);
  1076. }
  1077. }
  1078. unlock_exit:
  1079. Unlock(&pSend->SendLock);
  1080. hr=DPERR_PENDING; // Reliable sends are completed by the ACK.
  1081. exit:
  1082. return hr;
  1083. throttle_exit:
  1084. hr=DPERR_PENDING;
  1085. pSend->SendState=Throttled;
  1086. Unlock(&pSend->SendLock);
  1087. Throttle(pSend->pSession, tm);
  1088. return hr;
  1089. #undef pBigFrame
  1090. }
  1091. // TRUE, didn't reach end, FALSE, no more to send.
  1092. BOOL AdvanceSend(PSEND pSend, UINT AckedLen)
  1093. {
  1094. BOOL rc=TRUE;
  1095. // quick short circuit for small messages.
  1096. if(AckedLen+pSend->SendOffset==pSend->MessageSize){
  1097. rc=FALSE;
  1098. goto exit;
  1099. }
  1100. if(pSend->SendOffset+AckedLen > pSend->MessageSize){
  1101. AckedLen=pSend->MessageSize-pSend->SendOffset;
  1102. }
  1103. pSend->SendOffset+=AckedLen;
  1104. while(AckedLen){
  1105. if(pSend->pCurrentBuffer->len-pSend->CurrentBufferOffset >= AckedLen){
  1106. pSend->CurrentBufferOffset+=AckedLen;
  1107. rc=TRUE;
  1108. break;
  1109. } else {
  1110. AckedLen -= (pSend->pCurrentBuffer->len-pSend->CurrentBufferOffset);
  1111. pSend->pCurrentBuffer=pSend->pCurrentBuffer->pNext;
  1112. pSend->CurrentBufferOffset=0;
  1113. rc=FALSE;
  1114. }
  1115. }
  1116. exit:
  1117. return rc;
  1118. }
  1119. HRESULT DGSend(PPROTOCOL pProtocol, PSEND pSend)
  1120. {
  1121. #define pBigFrame ((pPacket2)(pFrame))
  1122. PBUFFER pBuffer;
  1123. pPacket1 pFrame;
  1124. PUCHAR pFrameData;
  1125. UINT FrameDataLen;
  1126. UINT FrameHeaderLen;
  1127. UINT FrameTotalLen;
  1128. UINT MaxFrameLen;
  1129. UINT nFramesToSend;
  1130. UINT WrapSize;
  1131. UINT DPWrapSize; // DirectPlay wrapping only. ([[DPLAY 0xFF]|],From,To)
  1132. DPSP_SENDDATA SendData;
  1133. DWORD tm;
  1134. HRESULT hr;
  1135. Lock(&pSend->SendLock);
  1136. nFramesToSend=pSend->nFrames-pSend->NR;
  1137. MaxFrameLen=pSend->FrameSize;
  1138. while(nFramesToSend){
  1139. tm=timeGetTime(); // Getting the time is relatively expensive, so we do it once here and pass it around.
  1140. if((tm+unFp(pSend->pSession->FpAvgUnThrottleTime)-pSend->pSession->tNextSend) & 0x80000000){
  1141. // we're still too early to do the next send, so throttled this session.
  1142. goto throttle_exit;
  1143. }
  1144. pBuffer=GetFrameBuffer(MaxFrameLen+pProtocol->m_dwSPHeaderSize+MAX_SEND_HEADER);
  1145. if(!pBuffer){
  1146. hr=DPERR_PENDING;
  1147. goto exit;
  1148. }
  1149. WrapSize=pProtocol->m_dwSPHeaderSize; // leave space for SP header.
  1150. DPWrapSize=WrapSend(pProtocol, pSend, pBuffer); // fill in out address wrapping
  1151. WrapSize+=DPWrapSize;
  1152. pFrame=(pPacket1)&pBuffer->pData[WrapSize]; // protocol header after wrapping
  1153. if(pSend->fSendSmall){
  1154. pFrameData=&pFrame->data[0];
  1155. FrameHeaderLen=(UINT)(pFrameData-(PUCHAR)pFrame);
  1156. } else {
  1157. pFrameData=&pBigFrame->data[0];
  1158. FrameHeaderLen=(UINT)(pFrameData-(PUCHAR)pFrame);
  1159. }
  1160. // For calculating nFrames, we assumed MAX_SEND_HEADER, subtract out the unused portion
  1161. // so we don't put to much data in the frame and mess up the accounting.
  1162. pBuffer->len-=(MAX_SEND_HEADER-(FrameHeaderLen+DPWrapSize));
  1163. FrameHeaderLen += WrapSize; // now include wrapping and SPheader space.
  1164. FrameDataLen=CopyDataToFrame(pFrameData, pBuffer->len-FrameHeaderLen, pSend, 0);
  1165. FrameTotalLen=FrameDataLen+FrameHeaderLen;
  1166. pSend->BytesThisSend=FrameTotalLen-WrapSize; //only counting payload
  1167. // Do that protocol thing
  1168. BuildHeader(pSend,pFrame,0,tm);
  1169. //AddSendRef(pSend,1); //already locked, so just add one.
  1170. ASSERT(pSend->RefCount); //verifies ++ below is ok.
  1171. InterlockedIncrement((PLONG)&pSend->RefCount);
  1172. UpdateSendTime(pSend->pSession,pSend->BytesThisSend,tm,FALSE);
  1173. Unlock(&pSend->SendLock);
  1174. // Send this puppy...
  1175. ASSERT(!(pSend->dwFlags & DPSEND_GUARANTEED));
  1176. SendData.dwFlags = pSend->dwFlags;
  1177. SendData.idPlayerTo = pSend->idTo;
  1178. SendData.idPlayerFrom = pSend->idFrom;
  1179. SendData.lpMessage = pBuffer->pData;
  1180. SendData.dwMessageSize = FrameTotalLen;
  1181. SendData.bSystemMessage = 0;
  1182. SendData.lpISP = pProtocol->m_lpISP;
  1183. ENTER_DPLAY();
  1184. Lock(&pProtocol->m_SPLock);
  1185. hr=CALLSP(pProtocol->m_lpDPlay->pcbSPCallbacks->Send,&SendData);
  1186. Unlock(&pProtocol->m_SPLock);
  1187. LEAVE_DPLAY();
  1188. if(hr!=DPERR_PENDING){
  1189. if(!DecSendRef(pProtocol,pSend)){
  1190. // No async send support in Dplay at lower edge,
  1191. // so we should never get here!
  1192. ASSERT(0);
  1193. }
  1194. FreeFrameBuffer(pBuffer);
  1195. }
  1196. Lock(&pSend->SendLock);
  1197. nFramesToSend--;
  1198. AdvanceSend(pSend,FrameDataLen);
  1199. pSend->NR++;
  1200. pSend->NS++;
  1201. }
  1202. Unlock(&pSend->SendLock);
  1203. DGCompleteSend(pSend);
  1204. hr=DPERR_PENDING; // everything was sent, but already completed by DGCompleteSend
  1205. exit:
  1206. return hr;
  1207. throttle_exit:
  1208. hr=DPERR_PENDING;
  1209. pSend->SendState=Throttled;
  1210. Unlock(&pSend->SendLock);
  1211. Throttle(pSend->pSession, tm);
  1212. return hr;
  1213. #undef pBigFrame
  1214. }
  1215. BOOL DGCompleteSend(PSEND pSend)
  1216. {
  1217. UINT bit;
  1218. UINT MsgMask;
  1219. PSESSION pSession;
  1220. pSend->SendState=Done;
  1221. pSession=pSend->pSession;
  1222. Lock(&pSession->SessionLock);
  1223. if(!pSend->fSendSmall){
  1224. MsgMask = 0xFFFF;
  1225. } else {
  1226. MsgMask =0xFF;
  1227. }
  1228. DPF(9,"CompleteSend\n");
  1229. //
  1230. // Update Session information for completion of this send.
  1231. //
  1232. bit = ((pSend->messageid-pSession->DGFirstMsg) & MsgMask)-1;
  1233. // clear the message mask bit for the completed send.
  1234. if(pSession->DGOutMsgMask & 1<<bit){
  1235. pSession->DGOutMsgMask &= ~(1<<bit);
  1236. } else {
  1237. return FALSE;
  1238. }
  1239. // slide the first message count forward for each low
  1240. // bit clear in Message mask.
  1241. while(pSession->DGLastMsg-pSession->DGFirstMsg){
  1242. if(!(pSession->DGOutMsgMask & 1)){
  1243. pSession->DGFirstMsg=(pSession->DGFirstMsg+1)&MsgMask;
  1244. pSession->DGOutMsgMask >>= 1;
  1245. if(pSession->nWaitingForDGMessageid){
  1246. pSession->pProtocol->m_bRescanQueue=TRUE;
  1247. SetEvent(pSession->pProtocol->m_hSendEvent);
  1248. }
  1249. } else {
  1250. break;
  1251. }
  1252. }
  1253. //
  1254. // Return the Send to the pool and complete the waiting client.
  1255. //
  1256. Unlock(&pSession->SessionLock);
  1257. ASSERT(pSend->RefCount);
  1258. // Send completed, do completion
  1259. DoSendCompletion(pSend, DP_OK);
  1260. DecSendRef(pSession->pProtocol, pSend); // for completion.
  1261. return TRUE;
  1262. }
  1263. // Send a fully formatted System packet (ACK, nACK, etc..)
  1264. HRESULT SystemSend(PPROTOCOL pProtocol, PSEND pSend)
  1265. {
  1266. PBUFFER pBuffer;
  1267. DPSP_SENDDATA SendData;
  1268. HRESULT hr;
  1269. PSESSION pSession;
  1270. pBuffer=pSend->pMessage;
  1271. DPF(9,"System Send pBuffer %x pData %x len %d, idTo %x \n",pBuffer, pBuffer->pData, pBuffer->len, pSend->idTo);
  1272. pSession=GetSysSessionByIndex(pProtocol, pSend->wIdTo); // adds a ref on session.
  1273. // |
  1274. if(!pSession){ // |
  1275. goto exit; // |
  1276. } // |
  1277. // |
  1278. SendData.idPlayerTo = pSession->dpid; // |
  1279. DecSessionRef(pSession); // <----+ frees ref here.
  1280. // Send this puppy...
  1281. SendData.dwFlags = 0;
  1282. SendData.idPlayerFrom = pSend->idFrom;
  1283. SendData.lpMessage = pBuffer->pData;
  1284. SendData.dwMessageSize = pBuffer->len;
  1285. SendData.bSystemMessage = 0;
  1286. SendData.lpISP = pProtocol->m_lpISP;
  1287. ENTER_DPLAY();
  1288. Lock(&pProtocol->m_SPLock);
  1289. hr=CALLSP(pProtocol->m_lpDPlay->pcbSPCallbacks->Send,&SendData);
  1290. Unlock(&pProtocol->m_SPLock);
  1291. LEAVE_DPLAY();
  1292. #ifdef DEBUG
  1293. if(hr!=DP_OK){
  1294. DPF(0,"UNSUCCESSFUL SEND in SYSTEM SEND, hr=%x\n",hr);
  1295. }
  1296. #endif
  1297. exit:
  1298. return hr;
  1299. #undef pBigFrame
  1300. }
  1301. VOID DoSendCompletion(PSEND pSend, INT Status)
  1302. {
  1303. #ifdef DEBUG
  1304. if(Status != DP_OK){
  1305. DPF(8,"Send Error pSend %x, Status %x\n",pSend,Status);
  1306. }
  1307. #endif
  1308. if(!(pSend->dwFlags & ASEND_PROTOCOL)){
  1309. EnterCriticalSection(&pSend->pProtocol->m_SendQLock);
  1310. pSend->pProtocol->m_dwBytesPending -= pSend->MessageSize;
  1311. pSend->pProtocol->m_dwMessagesPending -= 1;
  1312. LeaveCriticalSection(&pSend->pProtocol->m_SendQLock);
  1313. }
  1314. if(pSend->pAsyncInfo){
  1315. // ASYNC_SEND
  1316. if(pSend->AsyncInfo.pStatus){
  1317. (*pSend->AsyncInfo.pStatus)=Status;
  1318. }
  1319. if(pSend->AsyncInfo.SendCallBack){
  1320. (*pSend->AsyncInfo.SendCallBack)(pSend->AsyncInfo.CallBackContext,Status);
  1321. }
  1322. if(pSend->AsyncInfo.hEvent){
  1323. DPF(9,"ASYNC_SENDCOMPLETE: Signalling Event %x\n",pSend->AsyncInfo.hEvent);
  1324. SetEvent(pSend->AsyncInfo.hEvent);
  1325. }
  1326. } else if (!(pSend->dwFlags&(ASEND_PROTOCOL|DPSEND_ASYNC))){
  1327. // SYNC_SEND
  1328. if(pSend->AsyncInfo.pStatus){
  1329. (*pSend->AsyncInfo.pStatus)=Status;
  1330. }
  1331. if(pSend->AsyncInfo.hEvent){
  1332. DPF(9,"SYNC_SENDCOMPLETE: Signalling Event %x\n",pSend->AsyncInfo.hEvent);
  1333. SetEvent(pSend->AsyncInfo.hEvent);
  1334. }
  1335. } else {
  1336. // PROTOCOL INTERNAL ASYNC SEND
  1337. if(pSend->AsyncInfo.pStatus){
  1338. (*pSend->AsyncInfo.pStatus)=Status;
  1339. }
  1340. if(pSend->AsyncInfo.SendCallBack){
  1341. (*pSend->AsyncInfo.SendCallBack)(pSend->AsyncInfo.CallBackContext,Status);
  1342. }
  1343. }
  1344. }
  1345. /*=============================================================================
  1346. SendHandler - Send the next message that needs to send packets.
  1347. Description:
  1348. Finds a message on the send queue that needs to send packets and deserves
  1349. to use some bandwidth, either because it is highest priority or because
  1350. all the higher priority messages are waiting for ACKs. Then sends as many
  1351. packets as possible before hitting the throttling limit.
  1352. Returns when the throttle limit is hit, or all packets for this send have
  1353. been sent.
  1354. Parameters:
  1355. pARPD pObj - pointer to the ARPD object to send packets on.
  1356. Return Values:
  1357. -----------------------------------------------------------------------------*/
  1358. HRESULT SendHandler(PPROTOCOL pProtocol)
  1359. {
  1360. PSEND pSend;
  1361. HRESULT hr=DP_OK;
  1362. PSESSION pSession;
  1363. // adds ref to send and session if found
  1364. pSend=GetNextMessageToSend(pProtocol);
  1365. if(!pSend){
  1366. goto nothing_to_send;
  1367. }
  1368. //DPF(4,"==>Send\n");
  1369. switch(pSend->pSession->eState){
  1370. case Open:
  1371. switch(pSend->SendState){
  1372. case Done: // Send handlers must deal with Done.
  1373. DPF(9,"Calling SendHandler for Done Send--should just return\n");
  1374. case Sending:
  1375. //
  1376. // Send as many frames as we can given the window size.
  1377. //
  1378. // Send handlers dump packets on the wire, if they expect
  1379. // to be completed later, they return PENDING in which case
  1380. // their completion handlers must do the cleanup. If they
  1381. // return OK, it means everything for this send is done and
  1382. // we do the cleanup.
  1383. if(pSend->dwFlags & ASEND_PROTOCOL){
  1384. hr=SystemSend(pProtocol, pSend);
  1385. } else if(pSend->dwFlags & DPSEND_GUARANTEE){
  1386. hr=ReliableSend(pProtocol, pSend);
  1387. } else {
  1388. hr=DGSend(pProtocol, pSend);
  1389. }
  1390. break;
  1391. case TimedOut:
  1392. hr=DPERR_CONNECTIONLOST;
  1393. pSend->SendState=Done;
  1394. break;
  1395. case Cancelled:
  1396. hr=DPERR_USERCANCEL;
  1397. pSend->SendState=Done;
  1398. break;
  1399. case UserTimeOut:
  1400. hr=DPERR_TIMEOUT;
  1401. pSend->SendState=Done;
  1402. break;
  1403. default:
  1404. DPF(0,"SendHandler: Invalid pSend %x SendState: %d\n",pSend,pSend->SendState);
  1405. ASSERT(0);
  1406. }
  1407. break;
  1408. case Closing:
  1409. switch(pSend->SendState){
  1410. case TimedOut:
  1411. DPF(8,"Returning CONNECTIONLOST on timed out message %x\n",DPERR_CONNECTIONLOST);
  1412. hr=DPERR_CONNECTIONLOST;
  1413. break;
  1414. default:
  1415. DPF(8,"Send for session in Closing State, returning %x\n",DPERR_INVALIDPLAYER);
  1416. hr=DPERR_INVALIDPLAYER;
  1417. break;
  1418. }
  1419. pSend->SendState=Done;
  1420. break;
  1421. case Closed:
  1422. DPF(8,"Send for session in Closed State, returning %x",DPERR_INVALIDPLAYER);
  1423. hr=DPERR_INVALIDPLAYER;
  1424. pSend->SendState=Done;
  1425. break;
  1426. }
  1427. //DPF(4,"<==Send Leaving,rc=%x\n",hr);
  1428. if( hr != DPERR_PENDING ){
  1429. Lock(&pSend->SendLock);
  1430. ASSERT(pSend->RefCount);
  1431. //
  1432. // Send completed, do completion
  1433. //
  1434. DoSendCompletion(pSend, hr);
  1435. Unlock(&pSend->SendLock);
  1436. DecSendRef(pProtocol, pSend); // for completion
  1437. }
  1438. pSession=pSend->pSession;
  1439. DecSendRef(pProtocol,pSend); // Balances GetNextMessageToSend
  1440. DecSessionRef(pSession); // Balances GetNextMessageToSend
  1441. return hr;
  1442. nothing_to_send:
  1443. return DPERR_NOMESSAGES;
  1444. }
  1445. /*=============================================================================
  1446. Build Header - fill in the frame header for a packet to be sent.
  1447. Description:
  1448. Enough space is left in the frame to go on the wire (pFrame) to fit the
  1449. message header. One of two types of headers is built, depending on the
  1450. value of the fSendSmall field of the packet. If fSendSmall is TRUE, a compact
  1451. header is built, this lowers overhead on slow media. If fSendSmall is FALSE
  1452. a larger header that can support larger windows is built. The header
  1453. is filled into the front of pFrame.
  1454. Parameters:
  1455. pARPD pObj - pointer to the ARPD object to send packets on.
  1456. Return Values:
  1457. -----------------------------------------------------------------------------*/
  1458. VOID BuildHeader(PSEND pSend,pPacket1 pFrame, UINT shift, DWORD tm)
  1459. {
  1460. #define pBigFrame ((pPacket2)(pFrame))
  1461. PSENDSTAT pStat=NULL;
  1462. UINT seq;
  1463. UINT bitEOM,bitSTA,bitSAK=0;
  1464. DWORD BytesSent;
  1465. DWORD RemoteBytesReceived;
  1466. DWORD tRemoteBytesReceived;
  1467. DWORD bResetBias=FALSE;
  1468. // on first frame of a message, set the start bit (STA).
  1469. if(pSend->NR+shift==0){
  1470. bitSTA=STA;
  1471. } else {
  1472. bitSTA=0;
  1473. }
  1474. // on the last frome of a message set the end of message bit (EOM)
  1475. if(pSend->nFrames==pSend->NR+shift+1){
  1476. bitEOM=EOM;
  1477. } else {
  1478. bitEOM=0;
  1479. }
  1480. // if we haven't set EOM and we haven't requested an ACK in 1/4 the
  1481. // round trip latency, set the SAK bit, to ensure we have at least
  1482. // 2 ACK's in flight for feedback to the send throttle control system.
  1483. // Don't create extra ACKs if round trip is less than 100 ms.
  1484. if(!bitEOM || !(pSend->dwFlags & DPSEND_GUARANTEED)){
  1485. DWORD tmDeltaSAK = tm-pSend->pSession->tLastSAK;
  1486. if(((int)tmDeltaSAK > 50 ) &&
  1487. (tmDeltaSAK > (unFp(pSend->pSession->FpLocalAverageLatency)>>2))
  1488. )
  1489. {
  1490. bitSAK=SAK;
  1491. }
  1492. }
  1493. // If we re-transmitted we need to send a SAK
  1494. // despite the SAK countdown.
  1495. if((!bitSAK) &&
  1496. (pSend->dwFlags & DPSEND_GUARANTEED) &&
  1497. ((pSend->NACKMask & (pSend->NACKMask-1)) == 0) &&
  1498. (bitEOM==0)
  1499. )
  1500. {
  1501. bitSAK=SAK;
  1502. }
  1503. if(!(--pSend->SAKCountDown)){
  1504. bitSAK=SAK;
  1505. }
  1506. if(bitSAK|bitEOM){
  1507. pSend->pSession->tLastSAK = tm;
  1508. pSend->SAKCountDown=pSend->SAKInterval;
  1509. pStat=GetSendStat();
  1510. }
  1511. if(pSend->fSendSmall){
  1512. pFrame->flags=CMD|bitEOM|bitSTA|bitSAK;
  1513. seq=(pSend->NR+shift+1) & pSend->SendSEQMSK;
  1514. pFrame->messageid = (byte)pSend->messageid;
  1515. pFrame->sequence = (byte)seq;
  1516. pFrame->serial = (byte)(pSend->serial++);
  1517. if(pStat){
  1518. pStat->serial=pFrame->serial;
  1519. }
  1520. } else {
  1521. pBigFrame->flags=CMD|BIG|bitEOM|bitSTA|bitSAK;
  1522. seq=((pSend->NR+shift+1) & pSend->SendSEQMSK);
  1523. pBigFrame->messageid = (word)pSend->messageid;
  1524. pBigFrame->sequence = (word)seq;
  1525. pBigFrame->serial = (byte)pSend->serial++;
  1526. if(pStat){
  1527. pStat->serial=pBigFrame->serial;
  1528. }
  1529. }
  1530. if(pSend->dwFlags & DPSEND_GUARANTEE){
  1531. pFrame->flags |= RLY;
  1532. }
  1533. // count the number of bytes we have sent.
  1534. Lock(&pSend->pSession->SessionStatLock);
  1535. pSend->pSession->BytesSent+=pSend->BytesThisSend;
  1536. BytesSent=pSend->pSession->BytesSent;
  1537. RemoteBytesReceived=pSend->pSession->RemoteBytesReceived;
  1538. tRemoteBytesReceived=pSend->pSession->tRemoteBytesReceived;
  1539. if(pStat && pSend->pSession->bResetBias &&
  1540. ((--pSend->pSession->bResetBias) == 0))
  1541. {
  1542. bResetBias=TRUE;
  1543. }
  1544. Unlock(&pSend->pSession->SessionStatLock);
  1545. if(pStat){
  1546. pStat->sequence=seq;
  1547. pStat->messageid=pSend->messageid;
  1548. pStat->tSent=tm;
  1549. pStat->LocalBytesSent=BytesSent;
  1550. pStat->RemoteBytesReceived=RemoteBytesReceived;
  1551. pStat->tRemoteBytesReceived=tRemoteBytesReceived;
  1552. pStat->bResetBias=bResetBias;
  1553. if(pSend->dwFlags & DPSEND_GUARANTEED){
  1554. InsertBefore(&pStat->StatList,&pSend->StatList);
  1555. } else {
  1556. Lock(&pSend->pSession->SessionStatLock);
  1557. InsertBefore(&pStat->StatList,&pSend->pSession->DGStatList);
  1558. Unlock(&pSend->pSession->SessionStatLock);
  1559. }
  1560. }
  1561. #undef pBigFrame
  1562. }
  #if 0
  /*
   * UnWaitSends - release sends waiting for an id.  (Compiled out.)
   *
   * Walks the session's SendQ and moves every send in the WaitingForId state
   * back to Start when its guaranteed-ness matches fReliable, then zeroes
   * the matching waiting counter on the session.
   */
  VOID UnWaitSends(PSESSION pSession, DWORD fReliable)
  {
      BILINK *pLink;
      BILINK *pNextLink;
      PSEND   pWaiter;
      BOOL    fWantReliable = fReliable ? TRUE : FALSE;
      BOOL    fIsReliable;

      for (pLink = pSession->SendQ.next; pLink != &pSession->SendQ; pLink = pNextLink) {

          pWaiter   = CONTAINING_RECORD(pLink,SEND,SendQ);
          pNextLink = pLink->next;

          if (pWaiter->SendState != WaitingForId) {
              continue;
          }

          fIsReliable = (pWaiter->dwFlags & DPSEND_GUARANTEED) ? TRUE : FALSE;
          if (fIsReliable == fWantReliable) {
              pWaiter->SendState = Start;
          }
      }

      if (fWantReliable) {
          pSession->nWaitingForMessageid = 0;
      } else {
          pSession->nWaitingForDGMessageid = 0;
      }
  }
  #endif
  1592. // Check if a datagram send can be started, if it can update teh
  1593. // Session and the Send.
  1594. BOOL StartDatagramSend(PSESSION pSession, PSEND pSend, UINT MsgIdMask)
  1595. {
  1596. BOOL bFoundSend;
  1597. UINT bit;
  1598. // BOOL bTransition=FALSE;
  1599. if((pSession->DGLastMsg-pSession->DGFirstMsg < pSession->MaxCDGSends)){
  1600. bFoundSend=TRUE;
  1601. if(pSend->SendState==WaitingForId){
  1602. InterlockedDecrement(&pSession->nWaitingForDGMessageid);
  1603. }
  1604. bit=(pSession->DGLastMsg-pSession->DGFirstMsg)&MsgIdMask;
  1605. ASSERT(bit<30);
  1606. pSession->DGOutMsgMask |= 1<<bit;
  1607. pSession->DGLastMsg =(pSession->DGLastMsg+1)&MsgIdMask;
  1608. pSend->messageid =pSession->DGLastMsg;
  1609. pSend->FrameSize =pSession->MaxPacketSize-MAX_SEND_HEADER;
  1610. // Calculate number of frames required for this send.
  1611. pSend->nFrames =(pSend->MessageSize/pSend->FrameSize);
  1612. if(pSend->FrameSize*pSend->nFrames < pSend->MessageSize || !pSend->nFrames){
  1613. pSend->nFrames++;
  1614. }
  1615. pSend->NR=0;
  1616. pSend->FrameDataLen=0;//BUGBUG: hack
  1617. pSend->fSendSmall=pSession->fSendSmallDG;
  1618. if(pSend->fSendSmall){
  1619. pSend->SendSEQMSK = 0xFF;
  1620. } else {
  1621. pSend->SendSEQMSK = 0xFFFF;
  1622. }
  1623. } else {
  1624. #if 0
  1625. if(pSession->fSendSmallDG && pSession->DGFirstMsg < 0xFF-MAX_SMALL_CSENDS) {
  1626. // Ran out of IDs, Transition to Large headers.
  1627. DPF(9,"OUT OF IDS, DATAGRAMS GOING TO LARGE FRAMES\n");
  1628. pSession->MaxCDGSends = MAX_LARGE_DG_CSENDS;
  1629. pSession->DGWindowSize = MAX_LARGE_WINDOW;
  1630. pSession->fSendSmallDG = FALSE;
  1631. bTransition=TRUE;
  1632. }
  1633. #endif
  1634. bFoundSend=FALSE;
  1635. if(pSend->SendState==Start){
  1636. InterlockedIncrement(&pSession->nWaitingForDGMessageid);
  1637. DPF(9,"StartDatagramSend: No Id's Avail: nWaitingForDGMessageid %x\n",pSession->nWaitingForDGMessageid);
  1638. pSend->SendState=WaitingForId;
  1639. #if 0
  1640. if(bTransition){
  1641. UnWaitSends(pSession,FALSE);
  1642. SetEvent(pSession->pProtocol->m_hSendEvent);
  1643. }
  1644. #endif
  1645. } else {
  1646. DPF(9,"Couldn't start datagram send on pSend %x State %d pSession %x\n",pSend,pSend->SendState,pSession);
  1647. if(pSend->SendState!=WaitingForId){
  1648. ASSERT(0);
  1649. }
  1650. }
  1651. }
  1652. return bFoundSend;
  1653. }
  1654. BOOL StartReliableSend(PSESSION pSession, PSEND pSend, UINT MsgIdMask)
  1655. {
  1656. BOOL bFoundSend;
  1657. UINT bit;
  1658. // BOOL bTransition=FALSE;
  1659. ASSERT(pSend->dwFlags & DPSEND_GUARANTEED);
  1660. if((pSession->LastMsg-pSession->FirstMsg & MsgIdMask) < pSession->MaxCSends){
  1661. DPF(9,"StartReliableSend: FirstMsg: x%x LastMsg: x%x\n",pSession->FirstMsg, pSession->LastMsg);
  1662. bFoundSend=TRUE;
  1663. if(pSend->SendState==WaitingForId){
  1664. InterlockedDecrement(&pSession->nWaitingForMessageid);
  1665. }
  1666. bit=(pSession->LastMsg-pSession->FirstMsg)&MsgIdMask;
  1667. #ifdef DEBUG
  1668. if(!(bit<pSession->MaxCSends)){
  1669. DEBUG_BREAK();
  1670. }
  1671. #endif
  1672. pSession->OutMsgMask |= 1<<bit;
  1673. pSession->LastMsg =(pSession->LastMsg+1)&MsgIdMask;
  1674. DPF(9,"StartReliableSend: pSend %x assigning id x%x\n",pSend,pSession->LastMsg);
  1675. pSend->messageid =pSession->LastMsg;
  1676. pSend->FrameSize =pSession->MaxPacketSize-MAX_SEND_HEADER;
  1677. // Calculate number of frames required for this send.
  1678. pSend->nFrames =(pSend->MessageSize/pSend->FrameSize);
  1679. if(pSend->FrameSize*pSend->nFrames < pSend->MessageSize || !pSend->nFrames){
  1680. pSend->nFrames++;
  1681. }
  1682. pSend->NR=0;
  1683. pSend->FrameDataLen=0;//BUGBUG: hack
  1684. pSend->fSendSmall=pSession->fSendSmall;
  1685. if(pSend->fSendSmall){
  1686. pSend->SendSEQMSK = 0xFF;
  1687. } else {
  1688. pSend->SendSEQMSK = 0xFFFF;
  1689. }
  1690. } else {
  1691. #if 0
  1692. if (pSession->fSendSmall && pSession->FirstMsg < 0xFF-MAX_SMALL_CSENDS){
  1693. // Ran out of IDs, Transition to Large headers - but only if we aren't going
  1694. // to confuse the wrapping code.
  1695. DPF(8,"OUT OF IDS, RELIABLE SENDS GOING TO LARGE FRAMES\n");
  1696. pSession->MaxCSends = MAX_LARGE_CSENDS;
  1697. pSession->WindowSize = MAX_LARGE_WINDOW;
  1698. pSession->fSendSmall = FALSE;
  1699. bTransition = TRUE;
  1700. }
  1701. #endif
  1702. bFoundSend=FALSE;
  1703. if(pSend->SendState==Start){
  1704. bFoundSend=FALSE;
  1705. // Reliable, waiting for id.
  1706. InterlockedIncrement(&pSession->nWaitingForMessageid);
  1707. pSend->SendState=WaitingForId;
  1708. DPF(9,"StartReliableSend: No Id's Avail: nWaitingForMessageid %x\n",pSession->nWaitingForMessageid);
  1709. #if 0
  1710. if(bTransition){
  1711. UnWaitSends(pSession,TRUE);
  1712. SetEvent(pSession->pProtocol->m_hSendEvent);
  1713. }
  1714. #endif
  1715. } else {
  1716. bFoundSend=FALSE;
  1717. DPF(9,"Couldn't start reliable send on pSend %x State %d pSession %x\n",pSend,pSend->SendState,pSession);
  1718. if(pSend->SendState!=WaitingForId){
  1719. ASSERT(0);
  1720. }
  1721. }
  1722. }
  1723. return bFoundSend;
  1724. }
  1725. BOOL CheckUserTimeOut(PSEND pSend)
  1726. {
  1727. if(pSend->dwTimeOut){
  1728. if((timeGetTime()-pSend->dwSendTime) > pSend->dwTimeOut){
  1729. pSend->SendState=UserTimeOut;
  1730. return TRUE;
  1731. }
  1732. }
  1733. return FALSE;
  1734. }
  1735. /*=============================================================================
  1736. GetNextMessageToSend
  1737. Description:
  1738. Scans the send queue for a message that is the current priority and
  1739. is in the ready to send state or throttled state (we shouldn't even
  1740. get here unless the throttle was removed.) If we find such a message
  1741. we return a pointer to the caller.
  1742. Adds a reference to the Send and the Session.
  1743. Parameters:
  1744. PPROTOCOOL pProtocol - pointer to the PROTOCOL object to send packets on.
  1745. Return Values:
  1746. NULL - no message should be sent.
  1747. PSEND - message to send.
  1748. -----------------------------------------------------------------------------*/
  1749. PSEND GetNextMessageToSend(PPROTOCOL pProtocol)
  1750. {
  1751. PSEND pSend;
  1752. BILINK *pBilink;
  1753. UINT CurrentSendPri;
  1754. BOOL bFoundSend;
  1755. PSESSION pSession;
  1756. UINT MsgIdMask;
  1757. Lock(&pProtocol->m_SendQLock);
  1758. DPF(9,"==>GetNextMessageToSend\n");
  1759. Top:
  1760. bFoundSend = FALSE;
  1761. pProtocol->m_bRescanQueue=FALSE;
  1762. if(EMPTY_BILINK(&pProtocol->m_GSendQ)){
  1763. Unlock(&pProtocol->m_SendQLock);
  1764. DPF(9,"GetNextMessageToSend: called with nothing in queue, heading for the door.\n");
  1765. goto exit;
  1766. }
  1767. pBilink = pProtocol->m_GSendQ.next;
  1768. pSend = CONTAINING_RECORD(pBilink, SEND, m_GSendQ);
  1769. CurrentSendPri = pSend->Priority;
  1770. while(pBilink != &pProtocol->m_GSendQ){
  1771. pSession=pSend->pSession;
  1772. ASSERT_SIGN(pSession, SESSION_SIGN);
  1773. Lock(&pSession->SessionLock);
  1774. if(pProtocol->m_bRescanQueue){
  1775. DPF(9,"RESCAN of QUEUE FORCED IN GETNEXTMESSAGETOSEND\n");
  1776. Unlock(&pSession->SessionLock);
  1777. goto Top;
  1778. }
  1779. if(pSession->dwFlags & SESSION_UNTHROTTLED){
  1780. // unthrottle happened, so rewind.
  1781. DPF(9,"Unthrottling Session %x\n",pSession);
  1782. pSession->dwFlags &= ~(SESSION_THROTTLED|SESSION_UNTHROTTLED);
  1783. }
  1784. Lock(&pSend->SendLock);
  1785. switch(pSession->eState){
  1786. case Open:
  1787. if((pSend->dwFlags & DPSEND_GUARANTEE)?(pSession->fSendSmall):(pSession->fSendSmallDG)){
  1788. MsgIdMask = 0xFF;
  1789. } else {
  1790. MsgIdMask = 0xFFFF;
  1791. }
  1792. if(!(pSend->dwFlags & ASEND_PROTOCOL) && (pSession->dwFlags & SESSION_THROTTLED)){
  1793. // don't do sends on a throttled session, unless they are internal sends.
  1794. break;
  1795. }
  1796. switch(pSend->SendState){
  1797. case Start:
  1798. case WaitingForId:
  1799. DPF(9,"Found Send in State %d, try Going to Sending State\n",pSend->SendState);
  1800. // Just starting, need an id.
  1801. if(!(pSend->dwFlags & ASEND_PROTOCOL) && CheckUserTimeOut(pSend)){
  1802. if(pSend->SendState==WaitingForId){
  1803. if(pSend->dwFlags&DPSEND_GUARANTEED){
  1804. InterlockedDecrement(&pSession->nWaitingForMessageid);
  1805. } else {
  1806. InterlockedDecrement(&pSession->nWaitingForDGMessageid);
  1807. }
  1808. }
  1809. bFoundSend=TRUE;
  1810. break;
  1811. }
  1812. if(pSend->dwFlags&ASEND_PROTOCOL){
  1813. DPF(9,"System Send in Start State, Going to Sending State\n");
  1814. bFoundSend=TRUE;
  1815. pSend->SendState=Sending;
  1816. break;
  1817. } else if(!(pSend->dwFlags&DPSEND_GUARANTEED)) {
  1818. //check_datagram:
  1819. bFoundSend=StartDatagramSend(pSession,pSend, MsgIdMask);
  1820. } else {
  1821. // NOT DataGram, .: reliable...
  1822. //check_reliable:
  1823. bFoundSend=StartReliableSend(pSession,pSend, MsgIdMask);
  1824. #ifdef DEBUG
  1825. if(bFoundSend){
  1826. BILINK *pBiSendWalker=pSend->SendQ.prev;
  1827. PSEND pSendWalker;
  1828. while(pBiSendWalker != &pSession->SendQ){
  1829. pSendWalker=CONTAINING_RECORD(pBiSendWalker,SEND,SendQ);
  1830. pBiSendWalker=pBiSendWalker->prev;
  1831. if((pSendWalker->SendState==Start || pSendWalker->SendState==WaitingForId)&&
  1832. pSendWalker->dwFlags&DPSEND_GUARANTEED &&
  1833. !(pSendWalker->dwFlags&ASEND_PROTOCOL) &&
  1834. pSendWalker->Priority >= pSend->Priority){
  1835. DPF(0,"Send %x got id %x but Send %x still in state %x on Session %x\n",pSend,pSend->messageid,pSendWalker,pSendWalker->SendState,pSession);
  1836. DEBUG_BREAK();
  1837. }
  1838. }
  1839. }
  1840. #endif
  1841. }
  1842. if(bFoundSend){
  1843. if(pSession->dwFlags & SESSION_THROTTLED)
  1844. {
  1845. pSend->SendState=Throttled;
  1846. bFoundSend=FALSE;
  1847. } else {
  1848. pSend->SendState=Sending;
  1849. }
  1850. }
  1851. break;
  1852. case ReadyToSend:
  1853. DPF(9,"Found Send in ReadyToSend State, going to Sending State\n");
  1854. bFoundSend=TRUE;
  1855. if(pSession->dwFlags & SESSION_THROTTLED)
  1856. {
  1857. pSend->SendState=Throttled;
  1858. bFoundSend=FALSE;
  1859. } else {
  1860. pSend->SendState=Sending;
  1861. }
  1862. break;
  1863. case Throttled:
  1864. ASSERT(!(pSession->dwFlags & SESSION_THROTTLED));
  1865. DPF(9,"Found Send in Throttled State, unthrottling going to Sending State\n");
  1866. bFoundSend=TRUE;
  1867. pSend->SendState=Sending;
  1868. if(pSession->dwFlags & SESSION_THROTTLED)
  1869. {
  1870. pSend->SendState=Throttled;
  1871. bFoundSend=FALSE;
  1872. } else {
  1873. pSend->SendState=Sending;
  1874. }
  1875. break;
  1876. case TimedOut:
  1877. DPF(9,"Found TimedOut Send.\n");
  1878. TimeOutSession(pSession);
  1879. bFoundSend=TRUE;
  1880. break;
  1881. case Cancelled:
  1882. bFoundSend=TRUE;
  1883. break;
  1884. default:
  1885. ASSERT(pSend->SendState <= Done);
  1886. break;
  1887. } /* end switch(SendState) */
  1888. break;
  1889. default:
  1890. switch(pSend->SendState){
  1891. case Sending:
  1892. case WaitingForAck:
  1893. case Done:
  1894. DPF(9,"GetNextMessageToSend: Session %x was in state %d ,pSend %x SendState %d, leaving...\n",pSession, pSession->eState, pSend, pSend->SendState);
  1895. //bFoundSend=FALSE;
  1896. break;
  1897. default:
  1898. DPF(9,"GetNextMessageToSend: Session %x was in state %d ,returning pSend %x SendState %d\n",pSession, pSession->eState, pSend, pSend->SendState);
  1899. bFoundSend=TRUE;
  1900. break;
  1901. }
  1902. break;
  1903. } /* end switch pSession->eState */
  1904. if(bFoundSend){
  1905. if(AddSendRef(pSend,1)){
  1906. pSession->RefCount++;
  1907. } else {
  1908. bFoundSend=FALSE;
  1909. }
  1910. }
  1911. Unlock(&pSend->SendLock);
  1912. Unlock(&pSession->SessionLock);
  1913. if(bFoundSend){
  1914. if(pSend->NS==0){
  1915. pSend->tLastACK=timeGetTime();
  1916. }
  1917. break;
  1918. }
  1919. pBilink=pBilink->next;
  1920. pSend=CONTAINING_RECORD(pBilink, SEND, m_GSendQ);
  1921. } /* end while (pBilink != &pProtocol->m_GSendQ) */
  1922. Unlock(&pProtocol->m_SendQLock);
  1923. exit:
  1924. if(bFoundSend){
  1925. DPF(9,"<==GetNextMessageToSend %x\n",pSend);
  1926. return pSend;
  1927. } else {
  1928. DPF(9,"<==GetNextMessageToSend NULL\n");
  1929. return NULL;
  1930. }
  1931. }