Source code of Windows XP (NT5)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

489 lines
11 KiB

  1. /*++
  2. Copyright (c) 2000 Microsoft Corporation
  3. Module Name:
  4. recovery.c
  5. Abstract:
  6. Handles node down events
  7. Author:
  8. Ahmed Mohamed (ahmedm) 12, 01, 2000
  9. Revision History:
  10. --*/
  11. #include "gs.h"
  12. #include "gsp.h"
  13. #include <stdio.h>
  14. extern gs_nid_t GsLocalNodeId;
  15. extern int GsMaxNodeId;
  16. extern int GsMinNodeId;
  17. extern gs_group_t GsGroupTable[];
  18. // Node down event
  19. void
  20. GspRsFree(gs_recovery_state_t *rs)
  21. {
  22. // free recovery state
  23. gs_rblk_t *p;
  24. while (p = rs->rs_list) {
  25. rs->rs_list = p->next;
  26. free((char *)p);
  27. }
  28. GsEventFree(rs->rs_event);
  29. free((char *)rs);
  30. }
  31. void
  32. GspPhase1NodeDown(ULONG set)
  33. {
  34. gs_group_t *gd;
  35. int i, j;
  36. for (i = 0; i < GsGroupTableSize; i++) {
  37. gd = &GsGroupTable[i];
  38. if (gd->g_state == GS_GROUP_STATE_FREE) {
  39. continue;
  40. }
  41. GsLockEnter(gd->g_lock);
  42. if (gd->g_mset & set) {
  43. gd->g_mset &= ~set;
  44. gd->g_curview++;
  45. gd->g_state |= GS_GROUP_FLAGS_RECOVERY;
  46. gd->g_sz = 0;
  47. for (j = gd->g_mset; j > 0; j = j >> 1) {
  48. if (j & 0x1)
  49. gd->g_sz++;
  50. }
  51. if (set & (1 << gd->g_mid)) {
  52. for (j = GsMinNodeId; j != GsMaxNodeId; j++) {
  53. if (gd->g_mset & (1 << j)) {
  54. break;
  55. }
  56. }
  57. // elect a new master
  58. gd->g_mid = (gs_memberid_t) j;
  59. gd->g_state |= GS_GROUP_FLAGS_NEWMASTER;
  60. }
  61. recovery_log(("Phase1 mask %x gid %d mid %d mset %x sz %d\n",
  62. set, gd->g_id, gd->g_mid, gd->g_mset,
  63. gd->g_sz));
  64. if (gd->g_rs != NULL) {
  65. set |= gd->g_rs->rs_dset;
  66. GsEventSignal(gd->g_rs->rs_event);
  67. GspRsFree(gd->g_rs);
  68. }
  69. gd->g_rs = (gs_recovery_state_t *) malloc(sizeof(*gd->g_rs));
  70. assert(gd->g_rs != NULL);
  71. GsManualEventInit(gd->g_rs->rs_event);
  72. gd->g_rs->rs_sz = 0;
  73. gd->g_rs->rs_list = NULL;
  74. gd->g_rs->rs_epoch = gd->g_curview;
  75. gd->g_rs->rs_dset = set;
  76. gd->g_rs->rs_mset = gd->g_mset;
  77. if (gd->g_mid != gd->g_nid) {
  78. // we are not master, reset our mset to self and master only
  79. gd->g_rs->rs_mset = (1 << gd->g_nid) | (1 << gd->g_mid);
  80. }
  81. } else if (gd->g_mset == 0 && (set & (1 << gd->g_mid))) {
  82. // no one is participating in this group and the sole owner dead
  83. // remove the group and free it
  84. GsCloseGroup(gd);
  85. }
  86. GsLockExit(gd->g_lock);
  87. }
  88. }
  89. void
  90. GspRsAddSequence(gs_recovery_state_t *rs, gs_sequence_t mseq, int delta)
  91. {
  92. gs_rblk_t *p, **q;
  93. for (p = rs->rs_list; p != NULL; p = p->next) {
  94. if (p->mseq == mseq) {
  95. p->have += delta;
  96. recovery_log(("Found seq %d cnt %d\n", mseq, p->have));
  97. return;
  98. }
  99. }
  100. // if we get here that means the sequence is missing
  101. p = (gs_rblk_t *) malloc(sizeof(*p));
  102. if (p == NULL) {
  103. err_log(("GspRsAddSeq: unable to allocate memory!\n"));
  104. exit(1);
  105. }
  106. p->mseq = mseq;
  107. p->have = delta;
  108. recovery_log(("Add seq %d cnt %d\n", mseq, p->have));
  109. rs->rs_sz++;
  110. q = &rs->rs_list;
  111. while (*q != NULL) {
  112. if ((*q)->mseq > mseq) {
  113. p->next = *q;
  114. *q = p;
  115. return;
  116. }
  117. q = &(*q)->next;
  118. }
  119. p->next = *q;
  120. *q = p;
  121. }
  122. void
  123. GspPhase2NodeDown(ULONG set)
  124. {
  125. gs_group_t *gd;
  126. int i, j;
  127. for (i = 0; i < GsGroupTableSize; i++) {
  128. gd = &GsGroupTable[i];
  129. if (!(gd->g_state & GS_GROUP_FLAGS_RECOVERY)) {
  130. continue;
  131. }
  132. GsLockEnter(gd->g_lock);
  133. if (gd->g_state & GS_GROUP_FLAGS_RECOVERY) {
  134. gs_msg_t *p;
  135. extern void GspDumpQueue(gs_group_t*);
  136. recovery_log(("Phase2 queue\n"));
  137. GspDumpQueue(gd);
  138. recovery_log(("Expect gid %d <%d, %d>\n",
  139. gd->g_id, gd->g_recv.r_mseq, gd->g_recv.r_bnum));
  140. // walk recv queue and replay messages from dead members
  141. for (p = gd->g_recv.r_head; p != NULL; p = p->m_next) {
  142. if (set & (1 << p->m_hdr.h_sid)) {
  143. // tag message as if we got a reply
  144. p->m_hdr.h_flags |= GS_FLAGS_REPLY;
  145. if (p->m_hdr.h_type != GS_MSG_TYPE_UCAST){
  146. p->m_hdr.h_flags |= GS_FLAGS_REPLAY;
  147. msg_mcast(gd->g_mset, &p->m_hdr,
  148. p->m_buf, p->m_hdr.h_len);
  149. }
  150. // check of unclosed continued sends
  151. if (p->m_hdr.h_flags & GS_FLAGS_CONTINUED) {
  152. gs_msg_t *q;
  153. q = p->m_next;
  154. if (q == NULL ||
  155. q->m_hdr.h_mseq != p->m_hdr.h_mseq ||
  156. q->m_hdr.h_bnum != p->m_hdr.h_bnum+1) {
  157. q = msg_alloc(NULL, 0);
  158. if (q == NULL) {
  159. err_log(("Unable to allocate memory!\n"));
  160. halt(1);
  161. }
  162. memcpy(&q->m_hdr, &p->m_hdr, sizeof(p->m_hdr));
  163. q->m_hdr.h_type = GS_MSG_TYPE_ABORT;
  164. q->m_hdr.h_len = 0;
  165. q->m_hdr.h_bnum++;
  166. q->m_hdr.h_flags = GS_FLAGS_LAST;
  167. // insert abort msg
  168. q->m_next = p->m_next;
  169. p->m_next = q;
  170. }
  171. }
  172. }
  173. }
  174. // walk recv queue and build msg of sequences we have
  175. for (p = gd->g_recv.r_head; p != NULL; p = p->m_next) {
  176. if (p->m_hdr.h_mseq != GS_MSG_TYPE_UCAST)
  177. GspRsAddSequence(gd->g_rs, p->m_hdr.h_mseq, 1);
  178. }
  179. // send msg of sequences to master
  180. if (gd->g_mid != gd->g_nid) {
  181. gs_rblk_t *p;
  182. gs_sequence_t *list;
  183. int k;
  184. gs_msg_hdr_t hdr;
  185. recovery_log(("Sending sequence state to master %d\n", gd->g_mid));
  186. list = (gs_sequence_t *) malloc(sizeof(*list) * gd->g_rs->rs_sz);
  187. if (list == NULL) {
  188. err_log(("Unable to allocate memory during recovery\n"));
  189. exit(1);
  190. }
  191. k = 0;
  192. for (p = gd->g_rs->rs_list; p != NULL; p = p->next) {
  193. list[k] = p->mseq;
  194. k++;
  195. }
  196. assert(k == gd->g_rs->rs_sz);
  197. k = k * sizeof(*list);
  198. hdr.h_len = (UINT16) k;
  199. hdr.h_type = GS_MSG_TYPE_RECOVERY;
  200. hdr.h_sid = (gs_memberid_t)gd->g_nid;
  201. hdr.h_mid = (gs_memberid_t) gd->g_mid;
  202. hdr.h_gid = gd->g_id;
  203. hdr.h_viewnum = gd->g_curview;
  204. hdr.h_mseq = gd->g_recv.r_mseq;
  205. hdr.h_lseq = gd->g_send.s_mseq;
  206. msg_send(gd->g_mid, &hdr, (const char *) list, k);
  207. free((char *)list);
  208. } else {
  209. // add current sequence to dispatch
  210. GspRsAddSequence(gd->g_rs, gd->g_recv.r_mseq, 0);
  211. }
  212. // handle send path
  213. for (j = 0; j < gd->g_send.s_wsz; j++) {
  214. gs_context_t *ctx = &gd->g_send.s_ctxpool[j];
  215. if (ctx->ctx_id != GS_CONTEXT_INVALID_ID && ctx->ctx_msg != NULL){
  216. recovery_log(("phase2 gid %d ctx %d mask %x\n",
  217. gd->g_id, ctx->ctx_id, ctx->ctx_mask));
  218. if (set & ctx->ctx_mask) {
  219. int k, n;
  220. recovery_log(("phase2 complete gid %d ctx %d\n",
  221. gd->g_id, ctx->ctx_id));
  222. for (n = 0, k = set; k != 0; k = k >> 1, n++) {
  223. if (k & 0x1) {
  224. GspProcessReply(gd, ctx, n, NULL, 0,
  225. STATUS_HOST_UNREACHABLE);
  226. }
  227. }
  228. }
  229. }
  230. }
  231. // clear this node bit
  232. gd->g_rs->rs_mset &= ~(1 << gd->g_nid);
  233. if (gd->g_rs->rs_mset == 0) {
  234. void GspComputeState(gs_group_t *gd);
  235. GspComputeState(gd);
  236. }
  237. }
  238. GsLockExit(gd->g_lock);
  239. }
  240. }
  241. void GspSyncState(gs_group_t *gd, gs_msg_t *msg, gs_sequence_t *list, int sz);
  242. void
  243. GspComputeState(gs_group_t *gd)
  244. {
  245. int k;
  246. gs_sequence_t *list;
  247. gs_rblk_t *p, *last = NULL;
  248. gs_msg_t *msg;
  249. recovery_log(("Compute missing sequences gid %d\n", gd->g_id));
  250. // compute missing sequences
  251. list = (gs_sequence_t *) malloc(sizeof(*list) * gd->g_rs->rs_sz);
  252. if (list == NULL) {
  253. err_log(("Unable to allocate memory during computestate\n"));
  254. exit(1);
  255. }
  256. k = 0;
  257. for (p = gd->g_rs->rs_list; p != NULL; p = p->next) {
  258. recovery_log(("rs list sequence %d\n", p->mseq));
  259. if (p->have == 0) {
  260. recovery_log(("Skip sequence %d\n", p->mseq));
  261. list[k] = p->mseq;
  262. k++;
  263. }
  264. last = p;
  265. }
  266. // compute next starting mseq
  267. gd->g_global_seq = last != NULL ? last->mseq+1 : gd->g_recv.r_mseq;
  268. k = k * sizeof(*list);
  269. msg = msg_alloc((char *)list, k);
  270. assert(msg != NULL);
  271. msg->m_hdr.h_len = (UINT16) k;
  272. msg->m_hdr.h_type = GS_MSG_TYPE_SYNC;
  273. msg->m_hdr.h_flags = GS_FLAGS_LAST;
  274. msg->m_hdr.h_sid = (gs_memberid_t) gd->g_nid;
  275. msg->m_hdr.h_mid = (gs_memberid_t) gd->g_mid;
  276. msg->m_hdr.h_cid = (gs_cookie_t) -1;
  277. msg->m_hdr.h_gid = gd->g_id;
  278. msg->m_hdr.h_viewnum = gd->g_curview;
  279. msg->m_hdr.h_mseq = gd->g_global_seq++;
  280. msg->m_hdr.h_lseq = gd->g_send.s_lseq;
  281. msg->m_hdr.h_bnum = 0;
  282. *((ULONG *)msg->m_hdr.h_tag) = gd->g_rs->rs_dset;
  283. // send missing sequence list to other nodes
  284. msg_mcast(gd->g_mset, &msg->m_hdr, (const char *) list, k);
  285. recovery_log(("Next starting sequence is %d\n", gd->g_global_seq));
  286. // handle self
  287. GspSyncState(gd, msg, list, k / sizeof(*list));
  288. free((char *)list);
  289. }
  290. void
  291. GspRecoveryMsgHandler(gs_msg_t *rmsg)
  292. {
  293. gs_msg_hdr_t *hdr;
  294. gs_group_t *gd;
  295. hdr = &rmsg->m_hdr;
  296. gd = GspLookupGroup(hdr->h_gid);
  297. // accept messages only if in a valid view
  298. if (gd && rmsg->m_hdr.h_viewnum == gd->g_curview) {
  299. gs_sequence_t *list;
  300. int sz, k;
  301. list = (gs_sequence_t *) rmsg->m_buf;
  302. sz = rmsg->m_hdr.h_len / sizeof(*list);
  303. GsLockEnter(gd->g_lock);
  304. // make sure group is in recovery mode
  305. assert(gd->g_state & GS_GROUP_FLAGS_RECOVERY);
  306. assert(gd->g_mid == gd->g_nid);
  307. // add current sequence to dispatch
  308. GspRsAddSequence(gd->g_rs, hdr->h_mseq, 0);
  309. // insert sequences into have list
  310. for (k = 0; k < sz; k++) {
  311. GspRsAddSequence(gd->g_rs, list[k], 1);
  312. }
  313. // clear this node bit
  314. gd->g_rs->rs_mset &= ~(1 << hdr->h_sid);
  315. if (gd->g_rs->rs_mset == 0) {
  316. GspComputeState(gd);
  317. }
  318. GsLockExit(gd->g_lock);
  319. }
  320. msg_free(rmsg);
  321. }
  322. void
  323. GspSyncState(gs_group_t *gd, gs_msg_t *msg, gs_sequence_t *list, int sz)
  324. {
  325. int k;
  326. // make sure group is in recovery mode
  327. assert(gd->g_state & GS_GROUP_FLAGS_RECOVERY);
  328. assert(gd->g_mid != gd->g_nid);
  329. assert(gd->g_mid == hdr->h_sid);
  330. // mark missing sequences
  331. for (k = 0; k < sz; k++) {
  332. gs_msg_t *p;
  333. recovery_log(("Missing sequence %d\n", list[k]));
  334. p = msg_alloc(NULL, 0);
  335. if (p == NULL) {
  336. err_log(("Unable to allocate memory during syncstate!\n"));
  337. halt(1);
  338. }
  339. p->m_hdr.h_sid = gd->g_nid;
  340. p->m_hdr.h_gid = gd->g_id;
  341. p->m_hdr.h_cid = (gs_cookie_t) -1;
  342. p->m_hdr.h_type = GS_MSG_TYPE_SKIP;
  343. p->m_hdr.h_mseq = list[k];
  344. p->m_hdr.h_lseq = gd->g_send.s_lseq;
  345. p->m_hdr.h_bnum = 0;
  346. p->m_hdr.h_flags = GS_FLAGS_LAST;
  347. GspOrderInsert(gd, p, p, p->m_hdr.h_mseq, 0);
  348. }
  349. // set startview to curview
  350. gd->g_startview = gd->g_curview;
  351. // clear recovery state
  352. gd->g_state &= ~GS_GROUP_FLAGS_RECOVERY;
  353. // free recovery state
  354. GsEventSignal(gd->g_rs->rs_event);
  355. GspRsFree(gd->g_rs);
  356. gd->g_rs = NULL;
  357. // insert msg into dispatch queue at proper order
  358. GspOrderInsert(gd, msg, msg, msg->m_hdr.h_mseq, 0);
  359. GspDispatch(gd);
  360. #if 0
  361. // xxx: need to understand this again
  362. if (gd->g_recv.r_last != NULL) {
  363. GspCleanQueue(gd, last_mseq);
  364. }
  365. #endif
  366. // restart any pending sends
  367. if (gd->g_send.s_waitqueue != NULL && (gd->g_state & GS_GROUP_FLAGS_NEWMASTER)) {
  368. recovery_log(("resend: gs %x s %x\n", gd, gd->g_send.s_waitqueue));
  369. GspAllocateSequence(gd);
  370. }
  371. gd->g_state &= ~GS_GROUP_FLAGS_NEWMASTER;
  372. }
  373. void
  374. GspSyncMsgHandler(gs_msg_t *msg)
  375. {
  376. gs_msg_hdr_t *hdr;
  377. gs_group_t *gd;
  378. hdr = &msg->m_hdr;
  379. gd = GspLookupGroup(hdr->h_gid);
  380. // accept messages only if in a valid view
  381. if (gd && msg->m_hdr.h_viewnum == gd->g_curview) {
  382. gs_sequence_t *list;
  383. int sz;
  384. list = (gs_sequence_t *) msg->m_buf;
  385. sz = msg->m_hdr.h_len / sizeof(*list);
  386. GsLockEnter(gd->g_lock);
  387. // clear this node bit
  388. gd->g_rs->rs_mset &= ~(1 << hdr->h_sid);
  389. assert(gd->g_rs->rs_mset == 0);
  390. GspSyncState(gd, msg, list, sz);
  391. GsLockExit(gd->g_lock);
  392. } else {
  393. msg_free(msg);
  394. }
  395. }