Source code of Windows XP (NT5)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1144 lines
23 KiB

  1. /*++
  2. Copyright (c) 2000 Microsoft Corporation
  3. Module Name:
  4. msg.c
  5. Abstract:
  6. Point to point tcp and ip-multicast
  7. Author:
  8. Ahmed Mohamed (ahmedm) 12, 01, 2000
  9. Revision History:
  10. --*/
  11. #include <stdio.h>
  12. #include "msg.h"
  13. #include <stdlib.h>
  14. #include <winsock2.h>
  15. #include <ctype.h>
  16. #include <string.h>
  17. #include <assert.h>
  18. #include <time.h>
  19. #define MSG_ATOMIC 1
  20. int GS_MAX_MSG_SZ = (64 * 1024);
  21. #define PROTOCOL_TYPE SOCK_STREAM
  22. extern gs_node_handler_t gs_node_handler[];
  23. static int max_mcmsg = 0;
  24. static char cl_subnet[16];
  25. #define MAX_NODEID 16
  26. int NodesSize = 0;
  27. static char *nodes[MAX_NODEID] = {0};
  28. static char *ipaddr[MAX_NODEID] = {0};
  29. static int DEFAULT_PORT=6009;
  30. static int mcast_enabled = 0;
  31. static int MSG_POOL_SZ=32;
  32. int MY_NODEID;
  33. static SOCKET prf_handles[MAX_NODEID];
  34. static SOCKET rcv_handles[MAX_NODEID];
  35. static SOCKET send_handles[MAX_NODEID];
  36. static SOCKET tmp_socks[MAX_NODEID];
  37. static CRITICAL_SECTION msglock;
  38. static HANDLE Msg_Event[MAX_NODEID];
  39. void mcast_init();
  40. DWORD WINAPI srv(LPVOID arg);
  41. DWORD WINAPI mcast_srv(LPVOID arg);
  42. DWORD WINAPI srv_io(LPVOID arg);
  43. DWORD WINAPI cmgr(LPVOID arg);
  44. static gs_msg_t *msg_pool = NULL;
  45. static gs_msg_t *msg_hdrpool = NULL;
  46. void
  47. Msg_AllocPool()
  48. {
  49. char *p;
  50. gs_msg_t *prev;
  51. int sz, elmsz;
  52. // allocate msg header pool
  53. sz = sizeof(gs_msg_t) * MSG_POOL_SZ;
  54. prev = (gs_msg_t *) malloc(sz);
  55. if (prev == NULL) {
  56. printf("Unable to allocate message hdr pool\n");
  57. halt(1);
  58. }
  59. msg_hdrpool = NULL;
  60. for (sz = 0; sz < MSG_POOL_SZ; sz++) {
  61. prev->m_refcnt = 0;
  62. prev->m_buflen = 0;
  63. prev->m_buf = NULL;
  64. prev->m_next = msg_hdrpool;
  65. msg_hdrpool = prev;
  66. prev++;
  67. }
  68. // allocate msg pool
  69. sz = sizeof(gs_msg_t) * MSG_POOL_SZ;
  70. // prev = (gs_msg_t *) malloc(sz);
  71. prev = (gs_msg_t *) VirtualAlloc(NULL, sz, MEM_RESERVE|MEM_COMMIT,
  72. PAGE_READWRITE);
  73. if (prev == NULL) {
  74. printf("Unable to allocate message pool\n");
  75. halt(1);
  76. }
  77. // lock region now
  78. if (!VirtualLock(prev, sz)) {
  79. printf("Unable to lock down hdr pages %d\n", GetLastError());
  80. }
  81. sz = MSG_POOL_SZ * GS_MAX_MSG_SZ;
  82. p = VirtualAlloc(NULL, sz, MEM_RESERVE|MEM_COMMIT,
  83. PAGE_READWRITE);
  84. if (p == NULL) {
  85. printf("Unable to allocate message memory pool\n");
  86. halt(1);
  87. }
  88. if (!VirtualLock(p, sz)) {
  89. printf("Unable to lock down pages %d err %d\n", sz, GetLastError());
  90. }
  91. msg_pool = NULL;
  92. for (sz = 0; sz < MSG_POOL_SZ; sz++) {
  93. prev->m_refcnt = 0;
  94. prev->m_buflen = GS_MAX_MSG_SZ - 1;
  95. prev->m_buf = p;
  96. prev->m_next = msg_pool;
  97. msg_pool = prev;
  98. prev++;
  99. *p = 0; // touch it
  100. p += GS_MAX_MSG_SZ;
  101. }
  102. }
  103. gs_msg_t *
  104. msg_hdralloc(const char *buf, int len)
  105. {
  106. PVOID t;
  107. gs_msg_t * p;
  108. #ifdef MSG_ATOMIC
  109. do {
  110. p = msg_hdrpool;
  111. if (p == NULL) {
  112. break;
  113. }
  114. t = InterlockedCompareExchangePointer((PVOID *)&msg_hdrpool,
  115. (PVOID)p->m_next, (PVOID) p);
  116. } while (t != (PVOID) p);
  117. #else
  118. GsLockEnter(msglock);
  119. if (p = msg_hdrpool) {
  120. msg_hdrpool = p->m_next;
  121. p->m_next = NULL;
  122. }
  123. GsLockExit(msglock);
  124. #endif
  125. if (p == NULL) {
  126. printf("Out of message headers!!!\n");
  127. halt(1);
  128. }
  129. // p->m_buflen = 0;
  130. p->m_refcnt = 1;
  131. p->m_buf = (char *)buf;
  132. p->m_type = MSG_TYPE_HDR;
  133. msg_log(("Alloc hdr msg %x len %d pool %x\n", p, p->m_buflen, msg_hdrpool));
  134. return p;
  135. }
  136. gs_msg_t *
  137. msg_alloc(const char *buf, int len)
  138. {
  139. PVOID t;
  140. gs_msg_t * p;
  141. if (len > GS_MAX_MSG_SZ) {
  142. printf("Large msg, can't handle %d\n", len);
  143. halt(1);
  144. }
  145. if (buf != NULL) {
  146. return msg_hdralloc(buf, len);
  147. }
  148. #ifdef MSG_ATOMIC
  149. do {
  150. p = msg_pool;
  151. if (p == NULL) {
  152. break;
  153. }
  154. t = InterlockedCompareExchangePointer((PVOID *)&msg_pool,
  155. (PVOID)p->m_next, (PVOID) p);
  156. } while (t != (PVOID) p);
  157. #else
  158. GsLockEnter(msglock);
  159. if (p = msg_pool) {
  160. msg_pool = p->m_next;
  161. p->m_next = NULL;
  162. msg_log(("Alloc msg %x pool %x\n", p, msg_pool));
  163. }
  164. GsLockExit(msglock);
  165. #endif
  166. if (p == NULL) {
  167. printf("Out of messages!!!\n");
  168. halt(1);
  169. }
  170. // p->m_buflen = len;
  171. p->m_refcnt = 1;
  172. p->m_type = MSG_TYPE_DATA;
  173. if (buf) {
  174. memcpy(p->m_buf, buf, len);
  175. }
  176. msg_log(("Alloc msg %x buf %x len %d\n", p, p->m_buf, p->m_buflen));
  177. return p;
  178. }
  179. void
  180. msg_hdrfree(gs_msg_t *msg)
  181. {
  182. PVOID t, p;
  183. msg_log(("Free hdr msg %x len %d pool %x\n", msg, msg->m_buflen,msg_pool));
  184. #ifdef MSG_ATOMIC
  185. do {
  186. msg->m_next = msg_hdrpool;
  187. t = InterlockedCompareExchangePointer((PVOID *)&msg_hdrpool, (PVOID)msg,
  188. (PVOID)msg->m_next);
  189. } while (t != (PVOID) msg->m_next);
  190. #else
  191. GsLockEnter(msglock);
  192. msg->m_next = msg_hdrpool;
  193. msg_hdrpool = msg;
  194. GsLockExit(msglock);
  195. #endif
  196. }
  197. void
  198. msg_free(gs_msg_t *msg)
  199. {
  200. PVOID t, p;
  201. msg->m_refcnt--;
  202. if (msg->m_refcnt > 0) {
  203. msg_log(("msg %x not freed %d flags %x\n", msg, msg->m_refcnt,
  204. msg->m_hdr.h_flags));
  205. if (msg->m_refcnt > 10) {
  206. halt(0);
  207. }
  208. return;
  209. }
  210. if (msg->m_type == MSG_TYPE_HDR) {
  211. msg_hdrfree(msg);
  212. return;
  213. }
  214. msg_log(("Free msg %x buf %x pool %x\n", msg, msg->m_buf, msg_pool));
  215. #ifdef MSG_ATOMIC
  216. do {
  217. msg->m_next = msg_pool;
  218. t = InterlockedCompareExchangePointer((PVOID *)&msg_pool, (PVOID)msg,
  219. (PVOID)msg->m_next);
  220. } while (t != (PVOID) msg->m_next);
  221. #else
  222. GsLockEnter(msglock);
  223. msg->m_next = msg_pool;
  224. msg_pool = msg;
  225. GsLockExit(msglock);
  226. #endif
  227. }
  /*
   * strsave
   *
   * Returns a freshly malloc()ed copy of the NUL-terminated string `s`.
   * The caller owns the copy.
   *
   *   s == NULL       -> returns NULL (the original dereferenced it).
   *   malloc() fails  -> prints a message and abort()s. The original relied
   *                      on assert(), which compiles away under NDEBUG and
   *                      then let strcpy() write through NULL.
   */
  char *
  strsave(char *s)
  {
      size_t bytes;
      char *copy;

      if (s == NULL) {
          return NULL;
      }

      bytes = strlen(s) + 1;          /* include the terminator */
      copy = (char *) malloc(bytes);
      if (copy == NULL) {
          fprintf(stderr, "strsave: out of memory\n");
          abort();
      }

      memcpy(copy, s, bytes);
      return copy;
  }
  /*
   * Strncasecmp
   *
   * Case-insensitive comparison of at most `len` characters of `s` and `p`.
   * Returns 0 when they match and 1 when they differ (there is no ordering
   * information, unlike strncmp).
   *
   * Changes from the original:
   *   - the scan stops at the first NUL, so a string shorter than `len` is no
   *     longer read past its terminator;
   *   - characters are converted to unsigned char before tolower(), which is
   *     undefined for negative values;
   *   - a NULL argument counts as a mismatch instead of crashing.
   * len <= 0 still returns 0 without looking at either string.
   */
  int
  Strncasecmp(char *s, char *p, int len)
  {
      int i;

      if (len <= 0) {
          return 0;
      }
      if (s == NULL || p == NULL) {
          return 1;
      }

      for (i = 0; i < len; i++) {
          int a = tolower((unsigned char) s[i]);
          int b = tolower((unsigned char) p[i]);

          if (a != b) {
              return 1;
          }
          if (a == '\0') {
              break;                  /* both strings ended together */
          }
      }
      return 0;
  }
  245. /********************************************************************/
  246. int
  247. msg_buildaddr(struct sockaddr_in *sin, char *hostname, char *ipaddr)
  248. {
  249. struct hostent *h;
  250. int i;
  251. char *p;
  252. char *tmp;
  253. h = gethostbyname(hostname);
  254. if (h == NULL) {
  255. fprintf(stderr,"cannot get info for host %s\n", hostname);
  256. return 1;
  257. }
  258. p = (char *) h->h_addr_list[0];
  259. for (i = 0; h->h_addr_list[i]; i++) {
  260. struct in_addr x;
  261. memcpy(&x, p, h->h_length);
  262. tmp = inet_ntoa(x);
  263. if (!strncmp(cl_subnet, tmp, strlen(cl_subnet))) {
  264. break;
  265. }
  266. p += h->h_length;
  267. }
  268. if (h->h_addr_list[i] == NULL) {
  269. printf("Unable to find proper subnet %s host %s\n", cl_subnet, hostname);
  270. if (ipaddr != NULL) {
  271. // use this address
  272. sin->sin_addr.s_addr = inet_addr(ipaddr);
  273. printf("host %s addr %s\n", hostname, ipaddr);
  274. } else {
  275. sin->sin_addr.s_addr = INADDR_ANY;
  276. printf("host %s addr %s\n", hostname, "any");
  277. }
  278. } else {
  279. memcpy(&sin->sin_addr.s_addr, h->h_addr_list[i], h->h_length);
  280. printf("host %s addr %s\n", hostname, tmp);
  281. }
  282. return 0;
  283. }
  284. #ifndef TEST
  285. int
  286. WINAPI
  287. msg_addnode(int id, char *n, char *a)
  288. {
  289. char *s;
  290. s = strchr(n, '.');
  291. if (s) {
  292. *s = '\0';
  293. }
  294. printf("nodeid %d node %s ip %s\n", id, n, a);
  295. nodes[id-1] = strsave(n);
  296. ipaddr[id-1] = strsave(a);
  297. if (id > NodesSize)
  298. NodesSize = id;
  299. return NodesSize;
  300. }
  301. #endif
  302. /********************************************************************/
  303. int
  304. msg_getsize()
  305. {
  306. return NodesSize;
  307. }
  308. void
  309. msg_closenode(int nodeid)
  310. {
  311. GsLockEnter(msglock);
  312. if (rcv_handles[nodeid]) {
  313. closesocket(rcv_handles[nodeid]);
  314. rcv_handles[nodeid] = 0;
  315. }
  316. if (send_handles[nodeid]) {
  317. closesocket(send_handles[nodeid]);
  318. send_handles[nodeid] = 0;
  319. }
  320. prf_handles[nodeid] = 0;
  321. GsLockExit(msglock);
  322. return;
  323. }
  324. int
  325. msg_send(gs_memberid_t nodeid, gs_msg_hdr_t *hdr, const char *buf, int len)
  326. {
  327. int i;
  328. SOCKET s;
  329. WSABUF io[2];
  330. LPWSAOVERLAPPED ov;
  331. ov = NULL;
  332. nodeid--;
  333. if (nodeid >= NodesSize) {
  334. err_log(("send bad node %d\n", nodeid));
  335. return 1;
  336. }
  337. s = prf_handles[nodeid];
  338. if (!s) {
  339. s = send_handles[nodeid];
  340. if (!s) {
  341. s = rcv_handles[nodeid];
  342. if (!s) {
  343. err_log(("Node %d is dead\n", nodeid+1));
  344. return 1;
  345. }
  346. }
  347. }
  348. msg_log(("Send msg nid %d type %d seq %d bnum %d view %d\n",
  349. nodeid+1, hdr->h_type, hdr->h_mseq,
  350. hdr->h_bnum, hdr->h_viewnum));
  351. io[0].len = sizeof(*hdr);
  352. io[0].buf = (char *) hdr;
  353. io[1].len = len;
  354. io[1].buf = (char *) buf;
  355. if (WSASend(s, io, 2, &i, 0, ov, 0)) {
  356. int err = WSAGetLastError();
  357. if (err == WSA_IO_PENDING) {
  358. printf("Async send\n");
  359. return 1;
  360. }
  361. printf("Send nid %d failed %d\n", nodeid+1, err);
  362. msg_closenode(nodeid);
  363. return 1;
  364. }
  365. i -= sizeof(*hdr);
  366. if (i != len) {
  367. printf("Send failed: node %d len %d, %d\n", nodeid+1, len, i);
  368. halt(1);
  369. }
  370. return 0;
  371. }
  372. void
  373. msg_mcast(ULONG mset, gs_msg_hdr_t *hdr, const char *buf, int len)
  374. {
  375. gs_memberid_t i;
  376. void mcast_send(gs_msg_hdr_t *hdr, const char *buf, int len);
  377. mset = mset & ~(1 << MY_NODEID);
  378. if (mset == 0)
  379. return;
  380. // if (mcast_enabled == 0 || len > max_mcmsg) {
  381. if (len > max_mcmsg) {
  382. for (i = 1; i <= NodesSize; i++) {
  383. if (mset & (1 << i)) {
  384. msg_send(i, hdr, buf, len);
  385. }
  386. }
  387. }
  388. else {
  389. mcast_send(hdr, buf, len);
  390. }
  391. }
  392. void
  393. msg_smcast(ULONG mset, gs_msg_hdr_t *hdr, const char *buf, int len)
  394. {
  395. gs_memberid_t i;
  396. mset = mset & ~(1 << MY_NODEID);
  397. if (mset == 0)
  398. return;
  399. for (i = 1; i <= NodesSize; i++) {
  400. if (mset & (1 << i)) {
  401. msg_send(i, hdr, buf, len);
  402. }
  403. }
  404. }
  405. msg_init()
  406. {
  407. int i;
  408. WSADATA wsaData;
  409. char h_name[64];
  410. // set our priority to high class
  411. if (!SetPriorityClass(GetCurrentProcess(),HIGH_PRIORITY_CLASS)) {
  412. printf("Unable to set high priority %d\n", GetLastError());
  413. }
  414. if (WSAStartup(0x202,&wsaData) == SOCKET_ERROR) {
  415. fprintf(stderr,"WSAStartup failed with error %d\n",
  416. WSAGetLastError());
  417. WSACleanup();
  418. return -1;
  419. }
  420. i = gethostname(h_name, 64);
  421. // increase our working set
  422. if (!SetProcessWorkingSetSize(GetCurrentProcess(),
  423. 32*1024*1024, 64*1024*1024)) {
  424. printf("Unable to set working size %d\n", GetLastError());
  425. }
  426. InitializeCriticalSection(&msglock);
  427. Msg_AllocPool();
  428. for (i = 0; i < NodesSize; i++) {
  429. Msg_Event[i] = CreateEvent(NULL, TRUE, FALSE, NULL);
  430. prf_handles[i] = 0;
  431. send_handles[i] = 0;
  432. rcv_handles[i] = 0;
  433. if (!Strncasecmp(h_name, nodes[i], strlen(h_name))) {
  434. MY_NODEID = i+1;
  435. gs_node_handler[MSG_NODE_ID](MY_NODEID);
  436. } else {
  437. LPVOID arg = (LPVOID) ((ULONGLONG) i);
  438. CreateThread(NULL, 2*64*1024, cmgr, arg, 0, NULL);
  439. }
  440. }
  441. cm_log(("Local host %d %s\n", MY_NODEID, h_name));
  442. if (mcast_enabled) {
  443. mcast_init();
  444. }
  445. if (NodesSize > 1) {
  446. LPVOID arg = (LPVOID) ((ULONGLONG) DEFAULT_PORT);
  447. // create srv thread
  448. CreateThread(NULL, 4*1024, srv, arg, 0,NULL);
  449. // create mcast thread
  450. if (mcast_enabled) {
  451. for (i = 0; i < 8; i++)
  452. CreateThread(NULL, 2*64*1024, mcast_srv, 0, 0,NULL);
  453. }
  454. }
  455. return 0;
  456. }
  /*
   * msg_exit
   *
   * Shuts Winsock down with WSACleanup(). Nothing here stops the worker
   * threads first (see the note below).
   */
  void
  msg_exit()
  {
      // xxx: Stop all threads before doing this
      (void) WSACleanup();
  }
  463. void
  464. msg_start(ULONG mask)
  465. {
  466. int i;
  467. mask = mask >> 1;
  468. for (i = 0; i < NodesSize; i++) {
  469. GsLockEnter(msglock);
  470. if (!(mask & (1 << i)) && !send_handles[i]) {
  471. SetEvent(Msg_Event[i]);
  472. }
  473. GsLockExit(msglock);
  474. }
  475. }
  476. DWORD
  477. srv_msg(SOCKET msgsock, int nodeid)
  478. {
  479. gs_msg_t *msg;
  480. int retval;
  481. char *buf;
  482. int len;
  483. while (1) {
  484. extern gs_msg_handler_t gs_msg_handler[];
  485. int type;
  486. msg = msg_alloc(NULL, GS_MAX_MSG_SZ);
  487. // read hdr info first
  488. buf = (char *) &msg->m_hdr;
  489. len = sizeof(msg->m_hdr);
  490. do {
  491. retval = recv(msgsock, buf, len, 0);
  492. if (retval < 0) {
  493. err_log(("recv failed %d, %d\n",
  494. retval,
  495. WSAGetLastError()));
  496. msg_free(msg);
  497. return 0;
  498. }
  499. len -= retval;
  500. buf += retval;
  501. } while (len > 0);
  502. // read rest of message
  503. buf = msg->m_buf;
  504. len = msg->m_hdr.h_len;
  505. while (len > 0) {
  506. retval = recv(msgsock, buf, len, 0);
  507. if (retval < 0) {
  508. err_log(("recv failed %d, %d\n",retval,
  509. WSAGetLastError()));
  510. msg_free(msg);
  511. return 0;
  512. }
  513. len -= retval;
  514. buf += retval;
  515. }
  516. // set preferred socket to use
  517. prf_handles[nodeid] = msgsock;
  518. msg_log(("rec nid %d gid %d type %d seq %d view %d len %d\n",
  519. msg->m_hdr.h_sid,msg->m_hdr.h_gid, type = msg->m_hdr.h_type,
  520. msg->m_hdr.h_mseq, msg->m_hdr.h_viewnum, msg->m_hdr.h_len));
  521. gs_msg_handler[msg->m_hdr.h_type](msg);
  522. msg_log(("Done Type %d\n", type));
  523. }
  524. return 0;
  525. }
  526. DWORD WINAPI
  527. srv_io(LPVOID arg)
  528. {
  529. int retval;
  530. char *buf;
  531. int len;
  532. ULONGLONG tmp = (ULONGLONG) arg;
  533. int nodeid = (int) tmp;
  534. SOCKET msgsock = tmp_socks[nodeid];
  535. GsLockEnter(msglock);
  536. gs_node_handler[MSG_NODE_JOIN](nodeid+1);
  537. rcv_handles[nodeid] = msgsock;
  538. // issue join callback
  539. if (!send_handles[nodeid]) {
  540. gs_node_handler[MSG_NODE_UP](nodeid+1);
  541. SetEvent(Msg_Event[nodeid]);
  542. }
  543. GsLockExit(msglock);
  544. srv_msg(msgsock, nodeid);
  545. cm_log(("Terminating connection with node %d\n", nodeid));
  546. msg_closenode(nodeid);
  547. gs_node_handler[MSG_NODE_DOWN](nodeid+1);
  548. return (0);
  549. }
  550. void
  551. msg_setopt(SOCKET s)
  552. {
  553. // set option keepalive
  554. BOOLEAN val = TRUE;
  555. if (setsockopt(s, IPPROTO_TCP, SO_KEEPALIVE, (char *)&val,
  556. sizeof(val)) == SOCKET_ERROR) {
  557. fprintf(stderr,"Keepalive %d\n", WSAGetLastError());
  558. }
  559. // set option nodelay
  560. val = TRUE;
  561. if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char *)&val,
  562. sizeof(val)) == SOCKET_ERROR) {
  563. fprintf(stderr,"No delay %d\n", WSAGetLastError());
  564. }
  565. // set option nolinger
  566. val = TRUE;
  567. if (setsockopt(s, SOL_SOCKET, SO_DONTLINGER, (char *)&val,
  568. sizeof(val)) == SOCKET_ERROR) {
  569. fprintf(stderr,"No delay %d\n", WSAGetLastError());
  570. }
  571. }
  572. DWORD WINAPI
  573. srv(LPVOID arg)
  574. {
  575. char *nic= NULL;
  576. int fromlen;
  577. int i;
  578. struct sockaddr_in local, from;
  579. SOCKET listen_socket, msgsock;
  580. ULONGLONG tmp = (ULONGLONG) arg;
  581. short port = (short) tmp;
  582. #if 0
  583. nic = ipaddr[MY_NODEID-1];
  584. local.sin_addr.s_addr = (!nic)?INADDR_ANY:inet_addr(nic);
  585. #else
  586. if (msg_buildaddr(&local, nodes[MY_NODEID-1], ipaddr[MY_NODEID-1])) {
  587. fprintf(stderr,"Unable to get my own address\n");
  588. return -1;
  589. }
  590. #endif
  591. local.sin_family = AF_INET;
  592. /*
  593. * Port MUST be in Network Byte Order
  594. */
  595. local.sin_port = htons(port);
  596. // TCP socket
  597. listen_socket = WSASocket(AF_INET, PROTOCOL_TYPE, 0, NULL, 0,
  598. WSA_FLAG_OVERLAPPED);
  599. if (listen_socket == INVALID_SOCKET){
  600. fprintf(stderr,"socket() failed with error %d\n",WSAGetLastError());
  601. return -1;
  602. }
  603. //
  604. // bind() associates a local address and port combination with the
  605. // socket just created. This is most useful when the application is a
  606. // server that has a well-known port that clients know about in advance.
  607. //
  608. if (bind(listen_socket,(struct sockaddr*)&local,sizeof(local) )
  609. == SOCKET_ERROR) {
  610. fprintf(stderr,"bind() failed with error %d\n",WSAGetLastError());
  611. return -1;
  612. }
  613. msg_setopt(listen_socket);
  614. if (listen(listen_socket,5) == SOCKET_ERROR) {
  615. fprintf(stderr,"listen() failed with error %d\n",WSAGetLastError());
  616. return -1;
  617. }
  618. while(1) {
  619. char *name;
  620. struct hostent *p;
  621. cm_log(("Accepting connections\n"));
  622. fromlen =sizeof(from);
  623. msgsock = accept(listen_socket,(struct sockaddr*)&from, &fromlen);
  624. if (msgsock == INVALID_SOCKET) {
  625. fprintf(stderr,"accept() error %d\n",WSAGetLastError());
  626. return -1;
  627. }
  628. name = inet_ntoa(from.sin_addr);
  629. p = gethostbyaddr((char *)&from.sin_addr, 4, AF_INET);
  630. if (p == NULL) {
  631. printf("can't find host name %s %d\n", name, GetLastError());
  632. closesocket(msgsock);
  633. continue;
  634. }
  635. name = p->h_name;
  636. if (strchr(name, '~')) {
  637. name = strchr(name, '~') + 1;
  638. }
  639. // find node id
  640. for (i = 0; i < NodesSize; i++) {
  641. int j;
  642. j = Strncasecmp(nodes[i], name, strlen(name));
  643. if (j == 0)
  644. break;
  645. }
  646. if (i < NodesSize) {
  647. cm_log(("Accepted node : %d\n", i));
  648. msg_setopt(msgsock);
  649. tmp_socks[i] = msgsock;
  650. CreateThread(NULL, 2*64*1024, srv_io,
  651. (LPVOID) ((ULONGLONG)i), 0, NULL);
  652. } else {
  653. printf("bad node name: %d %s\n", i, name);
  654. closesocket(msgsock);
  655. }
  656. }
  657. return (0);
  658. }
  659. DWORD WINAPI
  660. cmgr(LPVOID arg)
  661. {
  662. int retval;
  663. struct sockaddr_in server;
  664. SOCKET conn_socket;
  665. unsigned short port = (unsigned short) DEFAULT_PORT;
  666. int nodeid = (int) ((ULONGLONG)arg);
  667. char *server_name = nodes[nodeid];
  668. if (send_handles[nodeid] != 0 || (nodeid+1 == MY_NODEID))
  669. return 0;
  670. memset(&server,0,sizeof(server));
  671. if (msg_buildaddr(&server, server_name, ipaddr[nodeid])) {
  672. fprintf(stderr,"Client: cann't resolve name %s\n", server_name);
  673. return 0;
  674. }
  675. //
  676. // Copy the resolved information into the sockaddr_in structure
  677. //
  678. server.sin_family = AF_INET; //hp->h_addrtype;
  679. server.sin_port = htons(port);
  680. again:
  681. ResetEvent(Msg_Event[nodeid]);
  682. /* Open a socket */
  683. conn_socket = WSASocket(AF_INET, PROTOCOL_TYPE, 0, NULL, 0,
  684. WSA_FLAG_OVERLAPPED);
  685. if (conn_socket != 0 ) {
  686. cm_log(("Client connecting to: %s\n", nodes[nodeid]));
  687. msg_setopt(conn_socket);
  688. if (connect(conn_socket,(struct sockaddr*)&server,sizeof(server))
  689. != SOCKET_ERROR) {
  690. cm_log(("Client connected to: %s\n", nodes[nodeid]));
  691. GsLockEnter(msglock);
  692. gs_node_handler[MSG_NODE_JOIN](nodeid+1);
  693. send_handles[nodeid] = conn_socket;
  694. if (!rcv_handles[nodeid]) {
  695. gs_node_handler[MSG_NODE_UP](nodeid+1);
  696. }
  697. GsLockExit(msglock);
  698. srv_msg(conn_socket, nodeid);
  699. msg_closenode(nodeid);
  700. gs_node_handler[MSG_NODE_DOWN](nodeid+1);
  701. } else {
  702. int err = WSAGetLastError();
  703. cm_log(("connect() failed: %d\n", err));
  704. closesocket(conn_socket);
  705. }
  706. } else {
  707. int err = WSAGetLastError();
  708. printf("Client: Error Opening socket: Error %d\n", err);
  709. }
  710. cm_log(("Cmgr %d sleeping\n", nodeid));
  711. WaitForSingleObject(Msg_Event[nodeid], INFINITE); //5 * 1000);
  712. cm_log(("Cmgr %d wokeup\n", nodeid));
  713. goto again;
  714. return (0);
  715. }
  /* Multicast defaults read by mcast_init(). */
  static char *MCAST_IPADDR = "224.0.20.65";   /* group address */
  static int MPORT_NUM = 9100;                 /* UDP port; see msg_set_mport() */
  718. int
  719. OpenSocket(SOCKET *rs, struct sockaddr_in *sin, ULONG mipaddr, u_short port)
  720. {
  721. struct hostent *h;
  722. int size, msgsize, len, n;
  723. struct sockaddr_in mysin;
  724. SOCKET s;
  725. char hostname[128];
  726. BOOLEAN bFlag = TRUE;
  727. s = WSASocket(AF_INET, SOCK_DGRAM, 0, (LPWSAPROTOCOL_INFO)NULL, 0,
  728. WSA_FLAG_OVERLAPPED | WSA_FLAG_MULTIPOINT_C_LEAF | WSA_FLAG_MULTIPOINT_D_LEAF);
  729. if (s == INVALID_SOCKET) {
  730. fprintf(stderr, "Unable to create socket %d\n", GetLastError());
  731. return 1;
  732. }
  733. #if 1
  734. if (msg_buildaddr(&mysin, nodes[MY_NODEID-1], ipaddr[MY_NODEID-1])) {
  735. fprintf(stderr, "Unable to get my own address\n");
  736. return 1;
  737. }
  738. #if 0
  739. gethostname(hostname, sizeof(hostname));
  740. h = gethostbyname(hostname);
  741. if (h == NULL) {
  742. fprintf(stderr,"cannot get my own address\n");
  743. return 1;
  744. }
  745. printf("host %s addr cnt = %d\n", hostname, h->h_length);
  746. {
  747. int i;
  748. char *p;
  749. p = (char *) h->h_addr_list[0];
  750. for (i = 0; h->h_addr_list[i]; i++) {
  751. struct in_addr x;
  752. char *tmp;
  753. memcpy(&x, p, h->h_length);
  754. tmp = inet_ntoa(x);
  755. printf("Slot %d ip %s\n", i, tmp);
  756. p += h->h_length;
  757. }
  758. }
  759. memcpy(&mysin.sin_addr.s_addr, h->h_addr_list[0], 4);
  760. #endif
  761. #else
  762. if (ipaddr[MY_NODEID-1] != NULL) {
  763. mysin.sin_addr.s_addr = inet_addr(ipaddr[MY_NODEID-1]);
  764. } else {
  765. mysin.sin_addr.s_addr = INADDR_ANY;
  766. }
  767. #endif
  768. mysin.sin_family = PF_INET;
  769. port = htons (port);
  770. mysin.sin_port = (u_short) port;
  771. if (bind (s, (struct sockaddr *)&mysin, sizeof(mysin)) <0) {
  772. fprintf(stderr, "Bind failed %d\n", GetLastError());
  773. return 1;
  774. }
  775. len = sizeof(max_mcmsg);
  776. /* get max. message size */
  777. if (getsockopt(s, SOL_SOCKET, SO_MAX_MSG_SIZE, (PVOID) &max_mcmsg,
  778. &len)) {
  779. fprintf(stderr,"getsockopt SO_MAX_MSG_SIZE failed %d\n",
  780. WSAGetLastError());
  781. closesocket(s);
  782. return 1;
  783. }
  784. max_mcmsg -= sizeof(gs_msg_hdr_t);
  785. printf("Max mcast message %d\n", max_mcmsg);
  786. /* make sure we can run multiple copies */
  787. if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *) &bFlag, sizeof(bFlag))< 0) {
  788. fprintf(stderr, "setsockopt SO_REUSEADDR failed %d\n", GetLastError());
  789. closesocket(s);
  790. return 1;
  791. }
  792. /* disable loopback on send */
  793. bFlag = FALSE;
  794. if (WSAIoctl(s, SIO_MULTIPOINT_LOOPBACK, (char *) &bFlag, sizeof(bFlag), NULL, 0, &n, NULL, NULL)< 0) {
  795. fprintf(stderr, "ioctl loopback failed %d\n", GetLastError());
  796. closesocket(s);
  797. return 1;
  798. }
  799. sin->sin_family = PF_INET;
  800. sin->sin_port = (u_short) (ntohs(port));
  801. sin->sin_addr.s_addr = mipaddr; //inet_addr(MCAST_IPADDR);
  802. /* join the multicast address */
  803. s = WSAJoinLeaf (s, (struct sockaddr *)sin, sizeof (*sin),
  804. NULL, NULL, NULL, NULL, JL_BOTH);
  805. /* dead in the water */
  806. if (s == INVALID_SOCKET) {
  807. fprintf(stderr, "Join failed %d\n", GetLastError());
  808. return 1;
  809. }
  810. *rs = s;
  811. return 0;
  812. }
  813. SOCKET msock;
  814. struct sockaddr_in msin;
  815. void
  816. mcast_send(gs_msg_hdr_t *hdr, const char *buf, int len)
  817. {
  818. int i;
  819. WSABUF io[2];
  820. msg_log(("Send msg mcast type %d seq %d len %d\n",
  821. hdr->h_type, hdr->h_mseq, len));
  822. io[0].buf = (char *) hdr;
  823. io[0].len = sizeof(*hdr);
  824. io[1].buf = (char *) buf;
  825. io[1].len = len;
  826. if (WSASendTo(msock, io, 2, &i, 0,
  827. (struct sockaddr *)&msin, sizeof(msin), 0, 0)) {
  828. int err = WSAGetLastError();
  829. if (err == WSA_IO_PENDING) {
  830. printf("Async send\n");
  831. return;
  832. }
  833. printf("Send failed %d\n", WSAGetLastError());
  834. halt(1);
  835. }
  836. i -= sizeof(*hdr);
  837. if (i != len) {
  838. printf("Send failed: mcast len %d, %d\n", len, i);
  839. halt(1);
  840. }
  841. msg_log(("Send done mcast type %d seq %d\n",
  842. hdr->h_type, hdr->h_mseq));
  843. return;
  844. }
  845. void
  846. mcast_init()
  847. {
  848. u_short port = (u_short) MPORT_NUM;
  849. ULONG ipaddr = inet_addr(MCAST_IPADDR);
  850. if (OpenSocket(&msock, &msin, ipaddr, port) == 1) {
  851. err_log(("Unable to create mcast socket\n"));
  852. mcast_enabled = 0;
  853. max_mcmsg = 0;
  854. }
  855. printf("Mcast %d\n", mcast_enabled);
  856. }
  857. DWORD WINAPI
  858. mcast_srv(LPVOID arg)
  859. {
  860. SOCKET msgsock;
  861. gs_msg_t *msg;
  862. int retval;
  863. char *buf;
  864. int len, flags;
  865. msgsock = msock;
  866. while (1) {
  867. extern gs_msg_handler_t gs_msg_handler[];
  868. int type;
  869. WSABUF io[2];
  870. msg = msg_alloc(NULL, GS_MAX_MSG_SZ);
  871. assert(msg);
  872. assert(msg->m_buflen != 0);
  873. io[0].buf = (char *)&msg->m_hdr;
  874. io[0].len = sizeof(msg->m_hdr);
  875. io[1].buf = msg->m_buf;
  876. io[1].len = msg->m_buflen;
  877. flags = 0;
  878. retval = WSARecv(msgsock, io, 2, &len, &flags, 0, 0);
  879. if (retval == SOCKET_ERROR) {
  880. err_log(("mcast recv failed %d, %d, len %d\n",
  881. retval,
  882. WSAGetLastError(), msg->m_buflen));
  883. msg_free(msg);
  884. halt(1);
  885. return 0;
  886. }
  887. if (len != (int)(msg->m_hdr.h_len + sizeof(msg->m_hdr))) {
  888. err_log(("Bad mcast recv got %d, expected %d\n", len, msg->m_hdr.h_len));
  889. halt(1);
  890. }
  891. msg_log(("rec mcast nid %d gid %d type %d seq %d view %d len %d\n",
  892. msg->m_hdr.h_sid,msg->m_hdr.h_gid, type = msg->m_hdr.h_type,
  893. msg->m_hdr.h_mseq, msg->m_hdr.h_viewnum, msg->m_hdr.h_len));
  894. gs_msg_handler[msg->m_hdr.h_type](msg);
  895. msg_log(("Done Type %d\n", type));
  896. }
  897. }
  898. void
  899. msg_set_uport(int uport)
  900. {
  901. DEFAULT_PORT = uport;
  902. }
  903. void
  904. msg_set_mport(int mport)
  905. {
  906. MPORT_NUM = mport;
  907. }
  908. void
  909. msg_set_subnet(char *addr)
  910. {
  911. strcpy(cl_subnet, addr);
  912. }
  /*
   * msg_set_mipaddr
   *
   * Currently a no-op: the argument is ignored and MCAST_IPADDR keeps its
   * built-in value.
   * NOTE(review): presumably meant to override the multicast group address -
   * confirm before relying on it.
   */
  void
  msg_set_mipaddr(char *addr)
  {
      (void) addr;
  }
  917. void
  918. msg_set_bufcount(int count)
  919. {
  920. MSG_POOL_SZ = count;
  921. }
  922. void
  923. msg_set_bufsize(int size)
  924. {
  925. if (size > GS_MAX_MSG_SZ) {
  926. fprintf(stderr,"You are exceeding the 64K msg size limit\n");
  927. } else {
  928. GS_MAX_MSG_SZ = size;
  929. }
  930. }
  931. void
  932. msg_set_mode(int mode)
  933. {
  934. mcast_enabled = mode;
  935. }