Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

706 lines
16 KiB

  1. /*++
  2. Copyright (c) 2000 Microsoft Corporation
  3. Module Name:
  4. receive.c
  5. Abstract:
  6. Receive handler and sends reply packets
  7. Author:
  8. Ahmed Mohamed (ahmedm) 12, 01, 2000
  9. Revision History:
  10. --*/
  11. #include "gs.h"
  12. #include "gsp.h"
  13. #include <stdio.h>
  14. #include <assert.h>
  15. void
  16. GspDumpQueue(gs_group_t *gd)
  17. {
  18. gs_msg_t *q;
  19. int i = 0;
  20. for (q = gd->g_recv.r_head; q != NULL; q = q->m_next) {
  21. state_log(("Msg %x: nid %d gid %d type %d mseq %d bnum %d flags %x cnt %d\n",
  22. q, q->m_hdr.h_sid, q->m_hdr.h_gid, q->m_hdr.h_type,
  23. q->m_hdr.h_mseq, q->m_hdr.h_bnum, q->m_hdr.h_flags, q->m_refcnt));
  24. i++;
  25. if (i > 100) {
  26. err_log(("Infinite loop\n"));
  27. halt(1);
  28. }
  29. }
  30. state_log(("Head %x Next %x expecting <%d, %d.\n",
  31. gd->g_recv.r_head,
  32. gd->g_recv.r_next,
  33. gd->g_recv.r_mseq,
  34. gd->g_recv.r_bnum));
  35. }
  36. void
  37. GspRemoveMsg(gs_group_t *gd, gs_msg_t *msg)
  38. {
  39. gs_msg_t **p;
  40. gs_msg_t *q;
  41. gs_log(("Remove gid %d seq %d msg %x\n", gd->g_id,
  42. msg->m_hdr.h_mseq, msg));
  43. GspDumpQueue(gd);
  44. if (msg->m_hdr.h_flags & GS_FLAGS_QUEUED) {
  45. while ((q = gd->g_recv.r_head) != msg) {
  46. if (q == NULL) {
  47. err_log(("Internal error: null head during remove %x\n", msg));
  48. GspDumpQueue(gd);
  49. halt(1);
  50. break;
  51. }
  52. q->m_hdr.h_flags &= ~GS_FLAGS_QUEUED;
  53. gd->g_recv.r_head = q->m_next;
  54. msg_free(q);
  55. }
  56. // delay the freeing of continued messages to simplify recovery
  57. if (!(msg->m_hdr.h_flags & GS_FLAGS_CONTINUED)) {
  58. msg->m_refcnt--;
  59. msg->m_hdr.h_flags &= ~GS_FLAGS_QUEUED;
  60. gd->g_recv.r_head = msg->m_next;
  61. if (&msg->m_next == gd->g_recv.r_next)
  62. gd->g_recv.r_next = &gd->g_recv.r_head;
  63. }
  64. }
  65. msg_free(msg);
  66. GspDumpQueue(gd);
  67. }
  68. void
  69. GspCleanQueue(gs_group_t *gd, gs_sequence_t mseq)
  70. {
  71. gs_msg_t *q, *msg;
  72. gs_log(("Clean gid %d seq %d\n", gd->g_id, mseq));
  73. GspDumpQueue(gd);
  74. while ((q = gd->g_recv.r_head) != NULL && q->m_hdr.h_mseq < mseq) {
  75. if (&q->m_next == gd->g_recv.r_next) {
  76. gd->g_recv.r_next = &gd->g_recv.r_head;
  77. }
  78. q->m_hdr.h_flags &= ~GS_FLAGS_QUEUED;
  79. gd->g_recv.r_head = q->m_next;
  80. msg_free(q);
  81. }
  82. GspDumpQueue(gd);
  83. }
  84. void
  85. GspUOrderInsert(gs_group_t *gd, gs_msg_t *head, gs_msg_t *tail,
  86. gs_sequence_t mseq, gs_sequence_t bnum)
  87. {
  88. gs_msg_t **p;
  89. // insert msg into proper order in receive queue
  90. // this routine needs to check for duplicates
  91. gs_log(("Add ucast gid %d mseq %d,%d head %x tail %x @ next %x\n",
  92. gd->g_id, mseq, bnum,
  93. head, tail, gd->g_recv.r_next));
  94. p = gd->g_recv.r_next;
  95. while (*p) {
  96. if ((*p)->m_hdr.h_mseq > mseq) {
  97. tail->m_next = *p;
  98. *p = head;
  99. return;
  100. }
  101. p = &(*p)->m_next;
  102. }
  103. // add at tail of history queue
  104. tail->m_next = *p;
  105. *p = head;
  106. GspDumpQueue(gd);
  107. }
  108. void
  109. GspOrderInsert(gs_group_t *gd, gs_msg_t *head, gs_msg_t *tail,
  110. gs_sequence_t mseq, gs_sequence_t bnum)
  111. {
  112. gs_msg_t **p;
  113. // check if we have already processed this sequence
  114. if (mseq < gd->g_recv.r_mseq || (mseq == gd->g_recv.r_mseq &&
  115. bnum < gd->g_recv.r_bnum)) {
  116. gs_log(("Droping msg %d,%d @ %d,%d\n", mseq, bnum,
  117. gd->g_recv.r_mseq, gd->g_recv.r_bnum));
  118. msg_free(head);
  119. return;
  120. }
  121. if (head->m_hdr.h_flags & GS_FLAGS_REPLAY) {
  122. p = &gd->g_recv.r_head;
  123. while (p != gd->g_recv.r_next && *p != NULL) {
  124. if ((*p)->m_hdr.h_mseq == mseq && (*p)->m_hdr.h_bnum == bnum) {
  125. gs_log(("duplicate pending type %d mseq %d bnum %d\n",
  126. head->m_hdr.h_type, mseq, bnum));
  127. msg_free(head);
  128. return;
  129. }
  130. }
  131. }
  132. // insert msg into proper order in receive queue
  133. // this routine needs to check for duplicates
  134. gs_log(("Add gid %d mseq %d,%d head %x tail %x @ next %x\n",
  135. gd->g_id, mseq, bnum,
  136. head, tail, gd->g_recv.r_next));
  137. p = gd->g_recv.r_next;
  138. while (*p) {
  139. if ((*p)->m_hdr.h_mseq > mseq ||
  140. ((*p)->m_hdr.h_mseq == mseq && (*p)->m_hdr.h_bnum > bnum)) {
  141. tail->m_next = *p;
  142. *p = head;
  143. return;
  144. } else if ((*p)->m_hdr.h_mseq == mseq && (*p)->m_hdr.h_bnum == bnum) {
  145. assert(head->m_hdr.h_flags & GS_FLAGS_REPLAY);
  146. assert(head == tail);
  147. gs_log(("duplicate type %d mseq %d bnum %d\n", head->m_hdr.h_type,mseq, bnum));
  148. msg_free(head);
  149. return;
  150. }
  151. p = &(*p)->m_next;
  152. }
  153. // add at tail of history queue
  154. tail->m_next = *p;
  155. *p = head;
  156. GspDumpQueue(gd);
  157. }
  158. void
  159. GspReplyMsgHandler(gs_msg_t *msg)
  160. {
  161. gs_msg_hdr_t *hdr;
  162. gs_group_t *gd;
  163. gs_context_t *ctx;
  164. hdr = &msg->m_hdr;
  165. // find group using group internal identifier
  166. gd = GspLookupGroup(hdr->h_gid);
  167. GsLockEnter(gd->g_lock);
  168. // find context in waiting queue
  169. ctx = GspLookupContext(gd, hdr->h_cid);
  170. assert(ctx != NULL);
  171. if (ctx->ctx_msg == NULL) {
  172. err_log(("Internal error gid %d ctx %d mseq %d bnum %d flags %x mask %x\n",
  173. ctx->ctx_gid, ctx->ctx_id, ctx->ctx_mseq, ctx->ctx_bnum,
  174. ctx->ctx_flags, ctx->ctx_mask));
  175. err_log(("Internal error msg sid %d mid %d gid %d ctx %d mseq %d bnum %d flags %x\n",
  176. hdr->h_sid, hdr->h_mid,
  177. hdr->h_gid, hdr->h_cid, hdr->h_mseq, hdr->h_bnum, hdr->h_flags));
  178. halt(1);
  179. }
  180. assert(ctx->ctx_msg != NULL);
  181. if (ctx->ctx_msg->m_hdr.h_mseq != hdr->h_mseq) {
  182. err_log(("Internal error ctx %d %d reply %d mismatch %d\n",
  183. ctx->ctx_id, hdr->h_cid,
  184. hdr->h_mseq,
  185. ctx->ctx_msg->m_hdr.h_mseq));
  186. halt(1);
  187. }
  188. GspProcessReply(gd, ctx, msg->m_hdr.h_sid, msg->m_buf, msg->m_hdr.h_len,
  189. *((NTSTATUS *)msg->m_hdr.h_tag));
  190. GsLockExit(gd->g_lock);
  191. msg_free(msg);
  192. }
  193. void
  194. GspSendAck(gs_group_t *gd, gs_msg_t *msg, NTSTATUS status)
  195. {
  196. gs_msg_hdr_t *hdr;
  197. hdr = &msg->m_hdr;
  198. if (hdr->h_cid == (gs_cookie_t) -1)
  199. return;
  200. gs_log(("Ack nid %d msg %x flags %x\n",hdr->h_sid, msg,
  201. msg->m_hdr.h_flags));
  202. if (hdr->h_sid != gd->g_nid) {
  203. gs_msg_hdr_t rhdr;
  204. memcpy(&rhdr, hdr, sizeof(rhdr));
  205. rhdr.h_sid = (gs_memberid_t) gd->g_nid;
  206. rhdr.h_mid = hdr->h_sid;
  207. rhdr.h_type = GS_MSG_TYPE_ACK;
  208. rhdr.h_len = 0;
  209. *((NTSTATUS *)rhdr.h_tag) = status;
  210. msg_send(hdr->h_sid, &rhdr, NULL, 0);
  211. } else {
  212. gs_context_t *ctx;
  213. ctx = GspLookupContext(gd, hdr->h_cid);
  214. GspProcessReply(gd, ctx, gd->g_nid, NULL, 0, status);
  215. }
  216. }
  217. NTSTATUS
  218. WINAPI
  219. GsSendReply(HANDLE cookie, PVOID buf, int len, NTSTATUS status)
  220. {
  221. gs_group_t *gd;
  222. gs_msg_t *msg = (gs_msg_t *)cookie;
  223. NTSTATUS err = ERROR_SUCCESS;
  224. if (msg == NULL || msg->m_hdr.h_rlen < len)
  225. return ERROR_INVALID_PARAMETER;
  226. // find group
  227. gd = GspLookupGroup(msg->m_hdr.h_gid);
  228. GsLockEnter(gd->g_lock);
  229. if (!(msg->m_hdr.h_flags & GS_FLAGS_REPLY) &&
  230. msg->m_hdr.h_rlen >= len) {
  231. // mark msg state
  232. msg->m_hdr.h_flags |= GS_FLAGS_REPLY;
  233. gs_log(("Reply msg %x flags %x len %x ubuf %x ulen %x\n",msg,
  234. msg->m_hdr.h_flags, msg->m_hdr.h_rlen, buf, len));
  235. // local reply
  236. if (msg->m_hdr.h_sid == gd->g_nid) {
  237. gs_context_t *ctx;
  238. // find context in waiting queue
  239. ctx = GspLookupContext(gd, msg->m_hdr.h_cid);
  240. assert(ctx != NULL);
  241. assert(ctx->ctx_msg->m_hdr.h_mseq == hdr->h_mseq);
  242. GspProcessReply(gd, ctx, msg->m_hdr.h_sid, (char *)buf, len, status);
  243. } else {
  244. gs_msg_hdr_t rhdr;
  245. memcpy(&rhdr, &msg->m_hdr, sizeof(rhdr));
  246. rhdr.h_sid = gd->g_nid;
  247. rhdr.h_mid = msg->m_hdr.h_sid;
  248. rhdr.h_type = GS_MSG_TYPE_REPLY;
  249. rhdr.h_len = (UINT16) len;
  250. *((NTSTATUS *)rhdr.h_tag) = status;
  251. msg_send(rhdr.h_mid, &rhdr, (const char *)buf, len);
  252. }
  253. // release msg
  254. msg_free(msg);
  255. } else {
  256. gs_log(("Reply failed %x: flags %x len %x ubuf %x ulen %x\n",msg,
  257. msg->m_hdr.h_flags, msg->m_hdr.h_rlen, buf, len));
  258. err = ERROR_INVALID_OPERATION;
  259. }
  260. GsLockExit(gd->g_lock);
  261. return err;
  262. }
  263. static gs_eventid_t GsTypeToEventId[] = {
  264. GsEventInvalid,
  265. GsEventInvalid,
  266. GsEventData,
  267. GsEventInvalid,
  268. GsEventSingleData,
  269. GsEventInvalid,
  270. GsEventInvalid,
  271. GsEventInvalid,
  272. GsEventInvalid,
  273. GsEventMemberJoin,
  274. GsEventMemberUp,
  275. GsEventInvalid,
  276. GsEventMemberEvicted,
  277. GsEventInvalid,
  278. GsEventMemberDown
  279. };
  280. #define GsMsgTypeToEventId(x) (x != GS_MSG_TYPE_ABORT ? GsTypeToEventId[x] : GsEventAbort)
  281. void
  282. GspSyncMember(gs_group_t *gd, gs_memberid_t mid, gs_sequence_t mseq)
  283. {
  284. gs_msg_t *p;
  285. // forward all messages that we have sent with higher sequence number
  286. for (p = gd->g_recv.r_head; p != NULL; p = p->m_next) {
  287. if (p->m_hdr.h_sid == gd->g_nid && p->m_hdr.h_mseq > mseq &&
  288. p->m_hdr.h_type != GS_MSG_TYPE_UCAST) {
  289. gs_context_t *ctx = &gd->g_send.s_ctxpool[p->m_hdr.h_cid];
  290. assert(ctx->ctx_msg == p);
  291. if (!(ctx->ctx_mask & (1 << mid))) {
  292. recovery_log(("sync node %d mseq %d\n", mid, p->m_hdr.h_mseq));
  293. ctx->ctx_mask |= (1 << mid);
  294. msg_send(mid, &p->m_hdr, p->m_buf, p->m_hdr.h_len);
  295. }
  296. }
  297. }
  298. }
  299. void
  300. GspDeliverMsg(gs_group_t *gd, gs_msg_t *msg)
  301. {
  302. IO_STATUS_BLOCK ios;
  303. NTSTATUS status;
  304. gs_memberid_t mid;
  305. switch(msg->m_hdr.h_type) {
  306. case GS_MSG_TYPE_UP:
  307. mid = *((gs_memberid_t *)msg->m_hdr.h_tag);
  308. GspAddMember(gd, mid, *(int *)msg->m_buf);
  309. GspSyncMember(gd, mid, msg->m_hdr.h_mseq);
  310. recovery_log(("New membership gid %d view %d,%d sz %d set %x\n",
  311. gd->g_id,
  312. gd->g_startview, gd->g_curview, gd->g_sz, gd->g_mset));
  313. break;
  314. default:
  315. break;
  316. }
  317. // hold msg
  318. msg->m_refcnt++;
  319. GsLockExit(gd->g_lock);
  320. ios.Status = GsMsgTypeToEventId(msg->m_hdr.h_type);
  321. ios.Information = msg->m_hdr.h_len;
  322. status = gd->g_callback((HANDLE)msg, msg->m_hdr.h_tag, msg->m_buf, &ios);
  323. GsLockEnter(gd->g_lock);
  324. if (status == STATUS_PENDING) {
  325. gs_log(("Reply msg pending %x\n", msg));
  326. return;
  327. }
  328. if (!(msg->m_hdr.h_flags & GS_FLAGS_REPLY)) {
  329. msg->m_hdr.h_flags |= GS_FLAGS_REPLY;
  330. // *((NTSTATUS *)msg->m_hdr.h_tag) = status;
  331. // release msg
  332. msg->m_refcnt--;
  333. GspSendAck(gd, msg, status);
  334. }
  335. if (msg->m_hdr.h_type == GS_MSG_TYPE_UCAST) {
  336. msg->m_refcnt++;
  337. msg->m_hdr.h_flags &= ~GS_FLAGS_CONTINUED;
  338. GspRemoveMsg(gd, msg);
  339. }
  340. }
  341. void
  342. GspDispatch(gs_group_t *gd)
  343. {
  344. gs_msg_t *msg;
  345. assert(gd->g_recv.r_next != NULL);
  346. while (gd->g_pending == 0 && (msg = *(gd->g_recv.r_next)) != NULL) {
  347. int hit = FALSE;
  348. int flags;
  349. if (msg->m_hdr.h_type != GS_MSG_TYPE_UCAST) {
  350. // compare sequence numbers
  351. if (gd->g_recv.r_mseq == msg->m_hdr.h_mseq &&
  352. gd->g_recv.r_bnum == msg->m_hdr.h_bnum) {
  353. // got it
  354. hit = TRUE;
  355. }
  356. } else {
  357. // compare sequence numbers
  358. if (gd->g_recv.r_mseq >= msg->m_hdr.h_mseq) {
  359. // got it
  360. hit = TRUE;
  361. }
  362. }
  363. if (hit == FALSE) {
  364. break;
  365. }
  366. gd->g_pending = 1;
  367. msg->m_hdr.h_flags &= ~GS_FLAGS_REPLY;
  368. flags = msg->m_hdr.h_flags;
  369. gs_log(("dispatch seq <%d, %d> flags %x msg %x @ next %x\n",
  370. msg->m_hdr.h_mseq,
  371. msg->m_hdr.h_bnum,
  372. flags, msg, gd->g_recv.r_next));
  373. // advance next msg to deliver
  374. gd->g_recv.r_next = &msg->m_next;
  375. // don't touch msg beyond this point, it may get freed as part of delivery
  376. if (msg->m_hdr.h_type != GS_MSG_TYPE_SKIP) {
  377. GspDeliverMsg(gd, msg);
  378. }
  379. // if a continued msg don't advance mseq/bnum
  380. if (!(flags & GS_FLAGS_CONTINUED)) {
  381. if (flags & GS_FLAGS_LAST) {
  382. gd->g_recv.r_bnum = 0;
  383. gd->g_recv.r_mseq++;
  384. } else if (!(flags & GS_FLAGS_PTP)) {
  385. gd->g_recv.r_bnum += (1 << 16);
  386. }
  387. } else if (!(flags & GS_FLAGS_PTP)) {
  388. gd->g_recv.r_bnum++;
  389. }
  390. gd->g_pending = 0;
  391. }
  392. gs_log(("waiting gid %d expect <%d, %d>\n",
  393. gd->g_id, gd->g_recv.r_mseq, gd->g_recv.r_bnum));
  394. GspDumpQueue(gd);
  395. }
  /*
   * Disabled sketch of a pull-style receive entry point.  Everything between
   * #if 0 and #endif is excluded by the preprocessor and is pseudo-code, not
   * compilable C.
   */
  #if 0
  WINAPI
  GsReceiveRequest(gd, buf, len, ios)
  {
      GsLockEnter(gd->recv_lock);
      m = gd->recv_last;
      // advance receive window
      if (m && m->state == MSG_STATE_DELIVERED) {
          if (m->flags & GS_FLAGS_DELIVERED) {
              msg_send_reply(m->srcid, m->mseq, m->cseq..);
              m->reply = 1;
          }
          m->state = MSG_STATE_DONE;
          // check if this msg can be freed before moving to next one
          m = m->next;
      }
      if (m && m->state == MSG_STATE_READY) {
          m->state = MSG_STATE_DELIVERED;
          GsLockExit(gd->recv_lock);
          memcpy(buf, m->data, m->len);
          Ios->status = m->srcid;
          Ios->information = m->len;
          Return SUCCESS;
      }
      // queue request
      irp->next = gd->recv_pending_queue;
      gd->recv_pending_queue = irp;
      GsLockExit(gd->recv_lock);
      Return PENDING;
  }
  #endif
  427. void
  428. GspMcastMsgHandler(gs_msg_t *msg)
  429. {
  430. gs_msg_hdr_t *hdr;
  431. gs_group_t *gd;
  432. hdr = &msg->m_hdr;
  433. gd = GspLookupGroup(hdr->h_gid);
  434. // accept messages only if in a valid view
  435. if (gd && GspValidateView(gd, msg->m_hdr.h_viewnum)) {
  436. gs_sequence_t lseq = msg->m_hdr.h_lseq;
  437. GsLockEnter(gd->g_lock);
  438. hdr->h_flags |= GS_FLAGS_QUEUED;
  439. // insert msg into dispatch queue at proper order
  440. GspOrderInsert(gd, msg, msg, hdr->h_mseq, hdr->h_bnum);
  441. GspDispatch(gd);
  442. GspCleanQueue(gd, lseq);
  443. GsLockExit(gd->g_lock);
  444. } else {
  445. msg_free(msg);
  446. }
  447. }
  448. void
  449. GspUcastMsgHandler(gs_msg_t *msg)
  450. {
  451. gs_msg_hdr_t *hdr;
  452. gs_group_t *gd;
  453. hdr = &msg->m_hdr;
  454. gd = GspLookupGroup(hdr->h_gid);
  455. if (gd && GspValidateView(gd, msg->m_hdr.h_viewnum)) {
  456. gs_sequence_t lseq = msg->m_hdr.h_lseq;
  457. GsLockEnter(gd->g_lock);
  458. hdr->h_flags |= GS_FLAGS_QUEUED;
  459. // insert msg into dispatch queue at proper order
  460. GspUOrderInsert(gd, msg, msg, hdr->h_mseq, hdr->h_bnum);
  461. GspDispatch(gd);
  462. GspCleanQueue(gd, lseq);
  463. GsLockExit(gd->g_lock);
  464. } else {
  465. gs_log(("Dropping ucast: gid %d nid %d mseq %d view %d\n", hdr->h_gid,
  466. hdr->h_mid, hdr->h_mseq, hdr->h_viewnum));
  467. msg_free(msg);
  468. }
  469. }
  470. void
  471. GspSeqAllocMsgHandler(gs_msg_t *msg)
  472. {
  473. gs_msg_hdr_t *hdr;
  474. gs_seq_info_t info;
  475. gs_group_t *gd;
  476. hdr = &msg->m_hdr;
  477. gd = GspLookupGroup(hdr->h_gid);
  478. if (gd) {
  479. GsLockEnter(gd->g_lock);
  480. info.mseq = gd->g_global_seq++;
  481. info.viewnum = gd->g_curview;
  482. GsLockExit(gd->g_lock);
  483. hdr->h_mid = hdr->h_sid;
  484. hdr->h_sid = gd->g_nid;
  485. hdr->h_type = GS_MSG_TYPE_SEQREPLY;
  486. hdr->h_len = sizeof(info);
  487. gs_log(("SeqAlloc: nid %d mseq %d view %d\n",
  488. hdr->h_mid, info.mseq, info.viewnum));
  489. msg_send(hdr->h_mid, hdr, (char *) &info, sizeof(info));
  490. }
  491. msg_free(msg);
  492. }
  493. void
  494. GspSeqReplyMsgHandler(gs_msg_t *msg)
  495. {
  496. gs_msg_hdr_t *hdr;
  497. gs_group_t *gd;
  498. gs_context_t *ctx;
  499. hdr = &msg->m_hdr;
  500. assert(hdr->h_len == sizeof(gs_seq_info_t));
  501. // find group using group internal identifier
  502. gd = GspLookupGroup(hdr->h_gid);
  503. if (gd != NULL && GspValidateView(gd, hdr->h_viewnum)) {
  504. gs_seq_info_t *info = (gs_seq_info_t *)msg->m_buf;
  505. GsLockEnter(gd->g_lock);
  506. if (GspValidateView(gd, info->viewnum) && hdr->h_sid == gd->g_mid) {
  507. GspProcessWaitQueue(gd, info);
  508. }
  509. GsLockExit(gd->g_lock);
  510. }
  511. msg_free(msg);
  512. }
  513. void
  514. GspJoinRequestMsgHandler(gs_msg_t *msg)
  515. {
  516. gs_msg_hdr_t *hdr;
  517. gs_join_info_t info;
  518. gs_group_t *gd;
  519. hdr = &msg->m_hdr;
  520. gd = GspLookupGroup(hdr->h_gid);
  521. if (gd) {
  522. GsLockEnter(gd->g_lock);
  523. info.mseq = gd->g_global_seq++;
  524. info.viewnum = gd->g_curview;
  525. info.mset = gd->g_mset;
  526. info.sz = gd->g_sz;
  527. GsLockExit(gd->g_lock);
  528. hdr->h_mid = hdr->h_sid;
  529. hdr->h_sid = gd->g_nid;
  530. hdr->h_type = GS_MSG_TYPE_REPLY;
  531. hdr->h_len = sizeof(info);
  532. msg_send(hdr->h_mid, hdr, (char *) &info, sizeof(info));
  533. }
  534. msg_free(msg);
  535. }
  536. void
  537. GspJoinUpMsgHandler(gs_msg_t *msg)
  538. {
  539. gs_group_t *gd;
  540. gs_msg_hdr_t *hdr;
  541. hdr = &msg->m_hdr;
  542. gd = GspLookupGroup(hdr->h_gid);
  543. // accept messages only if in a valid view
  544. if (gd && GspValidateView(gd, msg->m_hdr.h_viewnum)) {
  545. GsLockEnter(gd->g_lock);
  546. hdr->h_flags |= GS_FLAGS_QUEUED;
  547. // insert msg into dispatch queue at proper order
  548. GspOrderInsert(gd, msg, msg, hdr->h_mseq, hdr->h_bnum);
  549. GspDispatch(gd);
  550. GsLockExit(gd->g_lock);
  551. } else {
  552. msg_free(msg);
  553. }
  554. }
  555. void GspInfoMsgHandler(gs_msg_t *);
  556. void GspMmMsgHandler(gs_msg_t *);
  557. void GspRecoveryMsgHandler(gs_msg_t *);
  558. void GspSyncMsgHandler(gs_msg_t *);
  559. gs_msg_handler_t gs_msg_handler[] = {
  560. GspSeqAllocMsgHandler,
  561. GspSeqReplyMsgHandler,
  562. GspMcastMsgHandler,
  563. GspReplyMsgHandler,
  564. GspUcastMsgHandler,
  565. GspReplyMsgHandler,
  566. GspInfoMsgHandler,
  567. GspMmMsgHandler,
  568. GspJoinRequestMsgHandler,
  569. GspJoinUpMsgHandler, // join
  570. GspJoinUpMsgHandler, // up
  571. NULL, // evict request
  572. NULL, // evict
  573. GspRecoveryMsgHandler,
  574. GspSyncMsgHandler
  575. };