Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

636 lines
16 KiB

  /*==========================================================================
   *
   *  Copyright (C) 1995-1997 Microsoft Corporation.  All Rights Reserved.
   *
   *  File:       reliable.c
   *  Content:    stream communication related routines
   *  History:
   *   Date       By        Reason
   *   ====       ==        ======
   *   01-29-98   sohailm   initial implementation
   *   02-15-98   a-peterz  Remove unused SetMessageHeader
   *
   ***************************************************************************/
  14. #include "dphelp.h"
  15. /*
  16. * Globals
  17. */
  18. FDS gReadfds; // fd set to receive data
  19. RECEIVELIST gReceiveList; // list of connections + listener
  20. /*
  21. * Externs
  22. */
  23. extern SOCKET gsStreamListener; // we listen for tcp connections on this socket
  24. extern gbReceiveShutdown; // receive thread will exit when TRUE
  25. extern LPSPNODE gNodeList;
  26. #undef DPF_MODNAME
  27. #define DPF_MODNAME "MakeBufferSpace"
  28. // make sure the buffer is big enough to fit the message size
  29. HRESULT MakeBufferSpace(LPBYTE * ppBuffer,LPDWORD pdwBufferSize,DWORD dwMessageSize)
  30. {
  31. HRESULT hr = DP_OK;
  32. ASSERT(ppBuffer);
  33. ASSERT(pdwBufferSize);
  34. ENTER_DPLAYSVR();
  35. if (!*ppBuffer)
  36. {
  37. DPF(9, "Allocating space for message of size %d", dwMessageSize);
  38. // need to alloc receive buffer?
  39. *ppBuffer = MemAlloc(dwMessageSize);
  40. if (!*ppBuffer)
  41. {
  42. DPF_ERR("could not alloc stream receive buffer - out of memory");
  43. hr = E_OUTOFMEMORY;
  44. goto CLEANUP_EXIT;
  45. }
  46. *pdwBufferSize = dwMessageSize;
  47. }
  48. // make sure receive buffer can hold data
  49. else if (dwMessageSize > *pdwBufferSize)
  50. {
  51. LPVOID pvTemp;
  52. DPF(9, "ReAllocating space for message of size %d", dwMessageSize);
  53. // realloc buffer to hold data
  54. pvTemp = MemReAlloc(*ppBuffer,dwMessageSize);
  55. if (!pvTemp)
  56. {
  57. DPF_ERR("could not realloc stream receive buffer - out of memory");
  58. hr = E_OUTOFMEMORY;
  59. goto CLEANUP_EXIT;
  60. }
  61. *ppBuffer = pvTemp;
  62. *pdwBufferSize = dwMessageSize;
  63. }
  64. // fall through
  65. CLEANUP_EXIT:
  66. LEAVE_DPLAYSVR();
  67. return hr;
  68. } // MakeBufferSpace
  69. #undef DPF_MODNAME
  70. #define DPF_MODNAME "AddSocketToReceiveList"
  71. HRESULT AddSocketToReceiveList(SOCKET sSocket)
  72. {
  73. UINT i = 0;
  74. UINT err, iNewSlot;
  75. BOOL bFoundSlot = FALSE;
  76. HRESULT hr = DP_OK;
  77. INT addrlen=sizeof(SOCKADDR);
  78. LPCONNECTION pNewConnection;
  79. ENTER_DPLAYSVR();
  80. // look for an empty slot
  81. while ( (i < gReceiveList.nConnections) && !bFoundSlot)
  82. {
  83. if (INVALID_SOCKET == gReceiveList.pConnection[i].socket)
  84. {
  85. bFoundSlot = TRUE;
  86. iNewSlot = i;
  87. }
  88. else
  89. {
  90. i++;
  91. }
  92. }
  93. if (!bFoundSlot)
  94. {
  95. DWORD dwCurrentSize,dwNewSize;
  96. // allocate space for list of connections
  97. dwCurrentSize = gReceiveList.nConnections * sizeof(CONNECTION);
  98. dwNewSize = dwCurrentSize + INITIAL_RECEIVELIST_SIZE * sizeof(CONNECTION);
  99. hr = MakeBufferSpace((LPBYTE *)&(gReceiveList.pConnection),&dwCurrentSize,dwNewSize);
  100. if (FAILED(hr))
  101. {
  102. ASSERT(FALSE);
  103. goto CLEANUP_EXIT;
  104. }
  105. ASSERT(dwCurrentSize == dwNewSize);
  106. // set all the new entries to INVALID
  107. for (i = gReceiveList.nConnections + 1;
  108. i < gReceiveList.nConnections + INITIAL_RECEIVELIST_SIZE; i++ )
  109. {
  110. gReceiveList.pConnection[i].socket = INVALID_SOCKET;
  111. }
  112. // store the new socket in the 1st new spot
  113. iNewSlot = gReceiveList.nConnections;
  114. // allocate space for an fd set (fd_count + fd_array)
  115. if (gReceiveList.nConnections)
  116. {
  117. dwCurrentSize = sizeof(u_int) + gReceiveList.nConnections * sizeof(SOCKET);
  118. dwNewSize = dwCurrentSize + INITIAL_RECEIVELIST_SIZE * sizeof(SOCKET);
  119. }
  120. else
  121. {
  122. dwCurrentSize = 0;
  123. dwNewSize = sizeof(u_int) + INITIAL_RECEIVELIST_SIZE * sizeof(SOCKET);
  124. }
  125. hr = MakeBufferSpace((LPBYTE *)&(gReadfds.pfdbigset),&dwCurrentSize,dwNewSize);
  126. if (FAILED(hr))
  127. {
  128. ASSERT(FALSE);
  129. goto CLEANUP_EXIT;
  130. }
  131. ASSERT(dwCurrentSize == dwNewSize);
  132. // update the # of connections
  133. gReceiveList.nConnections += INITIAL_RECEIVELIST_SIZE;
  134. // update the fd_array buffer size
  135. gReadfds.dwArraySize = gReceiveList.nConnections;
  136. } // !bFoundSlot
  137. // Initialize new connection
  138. pNewConnection = &(gReceiveList.pConnection[iNewSlot]);
  139. pNewConnection->socket = sSocket;
  140. // allocate a default receive buffer
  141. pNewConnection->pDefaultBuffer = MemAlloc(DEFAULT_RECEIVE_BUFFERSIZE);
  142. if (NULL == pNewConnection->pDefaultBuffer)
  143. {
  144. DPF_ERR("could not alloc default receive buffer - out of memory");
  145. hr = E_OUTOFMEMORY;
  146. goto CLEANUP_EXIT;
  147. }
  148. // receive buffer initially points to our default buffer
  149. pNewConnection->pBuffer = pNewConnection->pDefaultBuffer;
  150. // remember the address we are connected to
  151. err = g_getpeername(pNewConnection->socket, &(pNewConnection->sockAddr), &addrlen);
  152. if (SOCKET_ERROR == err)
  153. {
  154. err = g_WSAGetLastError();
  155. DPF(1,"could not getpeername err = %d\n",err);
  156. }
  157. DPF(9, "Added new socket at index %d", iNewSlot);
  158. CLEANUP_EXIT:
  159. LEAVE_DPLAYSVR();
  160. return hr;
  161. } // AddSocketToReceiveList
  162. #undef DPF_MODNAME
  163. #define DPF_MODNAME "KillSocket"
  164. HRESULT KillSocket(SOCKET sSocket,BOOL fStream,BOOL fHard)
  165. {
  166. UINT err;
  167. if (INVALID_SOCKET == sSocket)
  168. {
  169. return E_FAIL;
  170. }
  171. if (!fStream)
  172. {
  173. if (SOCKET_ERROR == g_closesocket(sSocket))
  174. {
  175. err = g_WSAGetLastError();
  176. DPF(0,"killsocket - dgram close err = %d\n",err);
  177. return E_FAIL;
  178. }
  179. }
  180. else
  181. {
  182. LINGER Linger;
  183. if (fHard)
  184. {
  185. Linger.l_onoff=TRUE; // turn linger on
  186. Linger.l_linger=0; // nice small time out
  187. if( SOCKET_ERROR == g_setsockopt( sSocket,SOL_SOCKET,SO_LINGER,(char FAR *)&Linger,
  188. sizeof(Linger) ) )
  189. {
  190. err = g_WSAGetLastError();
  191. DPF(0,"killsocket - stream setopt err = %d\n",err);
  192. }
  193. }
  194. if (SOCKET_ERROR == g_shutdown(sSocket,2))
  195. {
  196. // this may well fail, if e.g. no one is using this socket right now...
  197. // the error would be wsaenotconn
  198. err = g_WSAGetLastError();
  199. DPF(5,"killsocket - stream shutdown err = %d\n",err);
  200. }
  201. if (SOCKET_ERROR == g_closesocket(sSocket))
  202. {
  203. err = g_WSAGetLastError();
  204. DPF(0,"killsocket - stream close err = %d\n",err);
  205. return E_FAIL;
  206. }
  207. }
  208. return DP_OK;
  209. }// KillSocket
  210. void FreeConnection(LPCONNECTION pConnection)
  211. {
  212. DEBUGPRINTSOCK(5,"Freeing connection - ",&pConnection->socket);
  213. KillSocket(pConnection->socket,TRUE,FALSE);
  214. if (pConnection->pBuffer && (pConnection->pBuffer != pConnection->pDefaultBuffer))
  215. {
  216. MemFree(pConnection->pBuffer);
  217. pConnection->pBuffer = NULL;
  218. }
  219. if (pConnection->pDefaultBuffer)
  220. {
  221. MemFree(pConnection->pDefaultBuffer);
  222. pConnection->pDefaultBuffer = NULL;
  223. }
  224. // initialize connection
  225. pConnection->socket = INVALID_SOCKET; // this tells us if connection is valid
  226. pConnection->dwCurMessageSize = 0;
  227. pConnection->dwTotalMessageSize = 0;
  228. }
  229. #undef DPF_MODNAME
  230. #define DPF_MODNAME "RemoveSocketFromList"
  231. void RemoveSocketFromList(SOCKET socket)
  232. {
  233. UINT i = 0;
  234. BOOL bFound = FALSE;
  235. ENTER_DPLAYSVR();
  236. // look for the corresponding connection
  237. while ( (i < gReceiveList.nConnections) && !bFound)
  238. {
  239. if (gReceiveList.pConnection[i].socket == socket)
  240. {
  241. bFound = TRUE;
  242. FreeConnection(&gReceiveList.pConnection[i]);
  243. }
  244. else
  245. {
  246. i++;
  247. }
  248. } // while
  249. LEAVE_DPLAYSVR();
  250. return ;
  251. }
  252. #undef DPF_MODNAME
  253. #define DPF_MODNAME "EmptyConnectionList"
  254. void EmptyConnectionList(void)
  255. {
  256. UINT i;
  257. DPF(5, "Emptying connection list");
  258. ENTER_DPLAYSVR();
  259. for (i=0;i<gReceiveList.nConnections ;i++ )
  260. {
  261. if (INVALID_SOCKET != gReceiveList.pConnection[i].socket)
  262. {
  263. FreeConnection(&(gReceiveList.pConnection[i]));
  264. }
  265. }
  266. LEAVE_DPLAYSVR();
  267. return ;
  268. } // EmptyConnectionList
  269. #undef DPF_MODNAME
  270. #define DPF_MODNAME "StreamReceive"
  271. /*
  272. ** StreamReceive
  273. *
  274. * CALLED BY: StreamReceiveThreadProc
  275. *
  276. * PARAMETERS:
  277. * sSocket - socket to receive on
  278. * ppBuffer - buffer to receive into - alloc'ed / realloc'ed as necessary
  279. * pdwBuffersize - size of pBuffer
  280. *
  281. * DESCRIPTION:
  282. * take the bytes out of sSocket until no more bytes
  283. *
  284. * RETURNS: E_FAIL on sockerr, or DP_OK.
  285. *
  286. */
  287. HRESULT StreamReceive(LPCONNECTION pConnection)
  288. {
  289. HRESULT hr = DP_OK;
  290. UINT err;
  291. DWORD dwBytesReceived=0;
  292. DWORD dwMessageSize;
  293. LPBYTE pReceiveBuffer=NULL;
  294. DWORD dwReceiveBufferSize;
  295. // is it a new message ?
  296. if (pConnection->dwCurMessageSize == 0)
  297. {
  298. // receive the header first
  299. pConnection->dwTotalMessageSize = SPMESSAGEHEADERLEN;
  300. }
  301. // continue receiving message
  302. pReceiveBuffer = pConnection->pBuffer + pConnection->dwCurMessageSize;
  303. dwReceiveBufferSize = pConnection->dwTotalMessageSize - pConnection->dwCurMessageSize;
  304. DPF(9,"Attempting to receive %d bytes", dwReceiveBufferSize);
  305. DEBUGPRINTSOCK(9,">>> receiving data on socket - ",&pConnection->socket);
  306. // receive data from socket
  307. // note - make exactly one call to recv after select otherwise we'll hang
  308. dwBytesReceived = g_recv(pConnection->socket, (LPBYTE)pReceiveBuffer, dwReceiveBufferSize, 0);
  309. DEBUGPRINTSOCK(9,"<<< received data on socket - ",&pConnection->socket);
  310. DPF(5, "received %d bytes", dwBytesReceived);
  311. if (0 == dwBytesReceived)
  312. {
  313. // remote side has shutdown connection gracefully
  314. hr = DP_OK;
  315. DPF(5,"Remote side has shutdown connection gracefully");
  316. goto CLEANUP_EXIT;
  317. }
  318. else if (SOCKET_ERROR == dwBytesReceived)
  319. {
  320. err = g_WSAGetLastError();
  321. DPF(0,"STREAMRECEIVEE: receive error - err = %d",err);
  322. hr = E_UNEXPECTED;
  323. goto CLEANUP_EXIT;
  324. }
  325. // we have received this much message so far
  326. pConnection->dwCurMessageSize += dwBytesReceived;
  327. if (pConnection->dwCurMessageSize == SPMESSAGEHEADERLEN)
  328. {
  329. // we just completed receiving message header
  330. if (VALID_DPLAYSVR_MESSAGE(pConnection->pDefaultBuffer))
  331. {
  332. dwMessageSize = SP_MESSAGE_SIZE(pConnection->pDefaultBuffer); // total message size
  333. }
  334. else
  335. {
  336. DPF(2,"got invalid message");
  337. ASSERT(FALSE);
  338. hr = E_UNEXPECTED;
  339. goto CLEANUP_EXIT;
  340. }
  341. // prepare to receive the rest of the message (after token)
  342. if (dwMessageSize)
  343. {
  344. pConnection->dwTotalMessageSize = dwMessageSize;
  345. // which buffer to receive message in ?
  346. if (dwMessageSize > DEFAULT_RECEIVE_BUFFERSIZE)
  347. {
  348. ASSERT(pConnection->pBuffer == pConnection->pDefaultBuffer);
  349. // get a new buffer to fit the message
  350. pConnection->pBuffer = MemAlloc(dwMessageSize);
  351. if (!pConnection->pBuffer)
  352. {
  353. DPF(0,"Failed to allocate receive buffer for message - out of memory");
  354. goto CLEANUP_EXIT;
  355. }
  356. // copy header into new message buffer
  357. memcpy(pConnection->pBuffer, pConnection->pDefaultBuffer, SPMESSAGEHEADERLEN);
  358. }
  359. }
  360. }
  361. // did we receive a complete message ?
  362. if (pConnection->dwCurMessageSize == pConnection->dwTotalMessageSize)
  363. {
  364. // received a complete message - process it
  365. if (TOKEN == SP_MESSAGE_TOKEN(pConnection->pBuffer))
  366. {
  367. DEBUGPRINTADDR(9,"dplay helper :: received reliable enum request from ",(SOCKADDR *)&pConnection->sockAddr);
  368. // take the dplay lock so no one messes w/ our list of registered serves while we're
  369. // trying to send to them...
  370. ENTER_DPLAYSVR();
  371. HandleIncomingMessage(pConnection->pBuffer, pConnection->dwTotalMessageSize,
  372. (SOCKADDR_IN *)&pConnection->sockAddr);
  373. // give up the lock
  374. LEAVE_DPLAYSVR();
  375. }
  376. // cleanup up new receive buffer if any
  377. if (pConnection->dwTotalMessageSize > DEFAULT_RECEIVE_BUFFERSIZE)
  378. {
  379. DPF(9, "Releasing receive buffer of size %d", pConnection->dwTotalMessageSize);
  380. if (pConnection->pBuffer) MemFree(pConnection->pBuffer);
  381. }
  382. // initialize message information
  383. pConnection->dwCurMessageSize = 0;
  384. pConnection->dwTotalMessageSize = 0;
  385. pConnection->pBuffer = pConnection->pDefaultBuffer;
  386. }
  387. // all done
  388. return DP_OK;
  389. CLEANUP_EXIT:
  390. RemoveSocketFromList(pConnection->socket);
  391. return hr;
  392. } // StreamReceive
  393. #undef DPF_MODNAME
  394. #define DPF_MODNAME "StreamReceiveThreadProc"
  395. // watch our list of sockets, waiting for one to have data to be received, or to be closed
  396. DWORD WINAPI StreamReceiveThreadProc(LPVOID pvCast)
  397. {
  398. HRESULT hr;
  399. INT_PTR rval;
  400. UINT i = 0;
  401. UINT err;
  402. DWORD dwBufferSize = 0;
  403. UINT nSelected;
  404. SOCKADDR sockaddr; // socket we receive from
  405. INT addrlen=sizeof(sockaddr);
  406. SOCKET sSocket;
  407. // add listener socket to receive list
  408. // listener socket should be the first socket in the receive list
  409. hr = AddSocketToReceiveList(gsStreamListener);
  410. if (FAILED(hr))
  411. {
  412. DPF(0, "Failed to add TCP listener to receive list");
  413. return hr;
  414. }
  415. while (1)
  416. {
  417. ENTER_DPLAYSVR();
  418. ASSERT(gReadfds.pfdbigset);
  419. // add all sockets in our recv list to readfds
  420. FD_ZERO(gReadfds.pfdbigset);
  421. nSelected = 0;
  422. for (i=0;i < gReceiveList.nConnections ; i++)
  423. {
  424. if (INVALID_SOCKET != gReceiveList.pConnection[i].socket)
  425. {
  426. FD_BIG_SET(gReceiveList.pConnection[i].socket,&gReadfds);
  427. nSelected++;
  428. }
  429. }
  430. LEAVE_DPLAYSVR();
  431. if (0 == nSelected)
  432. {
  433. if (gbReceiveShutdown)
  434. {
  435. DPF(2,"stream receive thread proc detected shutdown - bailing");
  436. goto CLEANUP_EXIT;
  437. }
  438. // we should have at least one?
  439. DPF_ERR("No sockets in receive list - missing listener socket? bailing!");
  440. ASSERT(FALSE);
  441. goto CLEANUP_EXIT;
  442. }
  443. // now, we wait for something to happen w/ our socket set
  444. rval = g_select(0,(fd_set *)(gReadfds.pfdbigset),NULL,NULL,NULL);
  445. if (SOCKET_ERROR == rval)
  446. {
  447. err = g_WSAGetLastError();
  448. if (WSAEINTR != err)
  449. {
  450. // WSAEINTR is what winsock uses to break a blocking socket out of
  451. // its wait. it means someone killed this socket.
  452. // if it's not that, then it's a real error.
  453. DPF(0,"\n select error = %d socket - trying again",err);
  454. }
  455. else
  456. {
  457. DPF(9,"\n select error = %d socket - trying again",err);
  458. }
  459. rval = 0;
  460. }
  461. // shut 'em down?
  462. if (gbReceiveShutdown)
  463. {
  464. DPF(2,"receive thread proc detected bShutdown - bailing");
  465. goto CLEANUP_EXIT;
  466. }
  467. DPF(5,"receive thread proc - events on %d sockets",rval);
  468. i = 0;
  469. ENTER_DPLAYSVR();
  470. while (rval>0)
  471. {
  472. // walk the receive list, dealing w/ all new sockets
  473. if (i >= gReceiveList.nConnections)
  474. {
  475. ASSERT(FALSE); // should never happen
  476. rval = 0; // just to be safe, reset
  477. }
  478. if (gReceiveList.pConnection[i].socket != INVALID_SOCKET)
  479. {
  480. // see if it's in the set
  481. if (g_WSAFDIsSet(gReceiveList.pConnection[i].socket,(fd_set *)gReadfds.pfdbigset))
  482. {
  483. if (0==i)
  484. // we got a new connection
  485. {
  486. // accept any incoming connection
  487. sSocket = g_accept(gReceiveList.pConnection[i].socket,&sockaddr,&addrlen);
  488. if (INVALID_SOCKET == sSocket)
  489. {
  490. err = g_WSAGetLastError();
  491. DPF(0,"\n stream accept error - err = %d socket = %d BAILING",err,(DWORD)sSocket);
  492. DPF(0, "\n !!! stream accept thread is going away - won't get reliable enum sessions anymore !!!");
  493. ASSERT(FALSE);
  494. LEAVE_DPLAYSVR();
  495. goto CLEANUP_EXIT;
  496. }
  497. DEBUGPRINTADDR(5,"stream - accepted connection from",&sockaddr);
  498. // add the new socket to our receive list
  499. hr = AddSocketToReceiveList(sSocket);
  500. if (FAILED(hr))
  501. {
  502. ASSERT(FALSE);
  503. }
  504. }
  505. else
  506. // socket has new data
  507. {
  508. DPF(9, "Receiving on socket %d from ReceiveList", i);
  509. // got one! this socket has something going on...
  510. hr = StreamReceive(&(gReceiveList.pConnection[i]));
  511. if (FAILED(hr))
  512. {
  513. DPF(1,"Stream Receive failed - hr = 0x%08lx\n",hr);
  514. }
  515. }
  516. rval--; // one less to hunt for
  517. } // IS_SET
  518. } // != INVALID_SOCKET
  519. i++;
  520. } // while rval
  521. LEAVE_DPLAYSVR();
  522. } // while 1
  523. CLEANUP_EXIT:
  524. EmptyConnectionList();
  525. DPF(5, "Stream receive thread exiting");
  526. return 0;
  527. } // ReceiveThreadProc
  528.