Source code of Windows XP (NT5)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1062 lines
27 KiB

  1. /*++
  2. Copyright (c) 2001 Microsoft Corporation
  3. Module Name:
  4. crs.c
  5. Abstract:
  6. Implements Consistency Replica Set Algorithm
  7. Author:
  8. Ahmed Mohamed (ahmedm) 1-Jan-2001
  9. Revision History:
  10. --*/
  11. #include <nt.h>
  12. #include <ntdef.h>
  13. #include <ntrtl.h>
  14. #include <nturtl.h>
  15. #include <windows.h>
  16. #include <stdio.h>
  17. #include <assert.h>
  18. #include "crs.h"
  19. #define xmalloc(size) VirtualAlloc(NULL, size,MEM_RESERVE|MEM_COMMIT,PAGE_READWRITE)
  20. #define xfree(buffer) VirtualFree(buffer, 0, MEM_RELEASE|MEM_DECOMMIT)
  21. #define CrspEqual(r1,r2) ((r1)->hdr.seq == (r2)->hdr.seq && \
  22. (r1)->hdr.epoch == (r2)->hdr.epoch && \
  23. (r1)->hdr.state == (r2)->hdr.state)
  24. DWORD CrsForcedQuorumSize = 0xffff;
  25. void
  26. WINAPI
  27. CrsSetForcedQuorumSize(DWORD size)
  28. {
  29. CrsForcedQuorumSize = size;
  30. }
  31. DWORD
  32. CrspFindLast(CrsInfo_t *p, DWORD logsz)
  33. {
  34. CrsRecord_t *rec, *last_rec;
  35. BOOL err;
  36. DWORD n, i;
  37. n = SetFilePointer(p->fh, 0, NULL, FILE_BEGIN);
  38. if (n == INVALID_SET_FILE_POINTER) {
  39. return GetLastError();
  40. }
  41. err = ReadFile(p->fh, p->buf, logsz, &n, NULL);
  42. if (!err)
  43. return GetLastError();
  44. if (n != logsz) {
  45. CrsLog(("Crs%d: failed to load complete file, read %d expected %d\n",
  46. p->lid,
  47. n, logsz));
  48. return ERROR_BAD_LENGTH;
  49. }
  50. ASSERT(p->max_records * CRS_RECORD_SZ == (int)n);
  51. if(p->max_records * CRS_RECORD_SZ != (int)n) {
  52. CrsLog(("Crs%d: unable to load log file %d bytes, got %d bytes\n",
  53. p->lid, n, logsz));
  54. return ERROR_BAD_LENGTH;
  55. }
  56. CrsLog(("Crs%d: loaded %d bytes, %d records\n", p->lid,
  57. n, p->max_records));
  58. last_rec = NULL;
  59. rec = p->buf;
  60. for (i = 0; i < logsz; i += CRS_RECORD_SZ, rec++) {
  61. if (rec->hdr.tag != CRS_TAG) {
  62. CrsLog(("crs%d: Bad record %d, got %x expected %x\n",
  63. p->lid,
  64. i/CRS_RECORD_SZ, rec->hdr.tag, CRS_TAG));
  65. return ERROR_BAD_FORMAT;
  66. }
  67. if (!last_rec ||
  68. rec->hdr.epoch > last_rec->hdr.epoch ||
  69. (rec->hdr.epoch == last_rec->hdr.epoch &&
  70. (rec->hdr.seq > last_rec->hdr.seq))) {
  71. last_rec = rec;
  72. }
  73. }
  74. ASSERT(last_rec);
  75. // make sure only the last record is not committed or aborted
  76. rec = p->buf;
  77. for (i = 0; i < logsz; i += CRS_RECORD_SZ, rec++) {
  78. if (!(rec->hdr.state & (CRS_COMMIT | CRS_ABORT))) {
  79. if (rec != last_rec) {
  80. CrsLog(("crs:%d Bad record %d state %x expected commit|abort\n",
  81. p->lid, i/CRS_RECORD_SZ, rec->hdr.state));
  82. return ERROR_INTERNAL_ERROR;
  83. }
  84. }
  85. }
  86. p->last_record = (int) (last_rec - p->buf);
  87. p->seq = last_rec->hdr.seq;
  88. p->epoch = last_rec->hdr.epoch;
  89. return ERROR_SUCCESS;
  90. }
  91. #define CrspFlush(p,offset) CrspWrite(p,offset, CRS_SECTOR_SZ)
  92. static
  93. DWORD
  94. CrspWrite(CrsInfo_t *p, int offset, DWORD length)
  95. {
  96. DWORD n;
  97. p->pending = FALSE;
  98. n = (DWORD) offset;
  99. // write out last sector, assumes lock is held
  100. ASSERT(offset < p->max_records);
  101. offset = offset / CRS_RECORDS_PER_SECTOR;
  102. CrsLog(("Crs%d: flush %d bytes record %d -> %d,%d\n", p->lid,
  103. length, n,
  104. offset, offset*CRS_SECTOR_SZ));
  105. n = SetFilePointer(p->fh, offset * CRS_SECTOR_SZ, NULL, FILE_BEGIN);
  106. if (n == INVALID_SET_FILE_POINTER) {
  107. return GetLastError();
  108. }
  109. n = 0;
  110. if (WriteFile(p->fh, (PVOID) &p->buf[offset*CRS_RECORDS_PER_SECTOR], length, &n, NULL)) {
  111. if (n != length) {
  112. CrsLog(("Write count mismatch, wrote %d, expected %d\n", n, length));
  113. return ERROR_BAD_LENGTH;
  114. }
  115. return ERROR_SUCCESS;
  116. }
  117. n = GetLastError();
  118. CrsLog(("Crs%d: flush record %d failed err %d\n", p->lid, offset, n));
  119. if (n == ERROR_UNEXP_NET_ERR) {
  120. // repeat the write one more time
  121. p->pending = TRUE;
  122. }
  123. return n;
  124. }
  125. static
  126. DWORD
  127. CrspAppendRecord(CrsInfo_t *p, CrsRecord_t *rr, CrsRecord_t **rec)
  128. {
  129. CrsRecord_t *q;
  130. DWORD err;
  131. // tag record
  132. rr->hdr.tag = CRS_TAG;
  133. // assumes lock is held
  134. if ((p->last_record & CRS_SECTOR_MASK) == CRS_SECTOR_MASK) {
  135. // flush current sector
  136. err = CrspFlush(p, p->last_record);
  137. if (err != ERROR_SUCCESS)
  138. return err;
  139. }
  140. // advance last record
  141. p->last_record++;
  142. if (p->last_record == p->max_records)
  143. p->last_record = 0;
  144. CrsLog(("Crs%d: append record %d epoch %I64d seq %I64d state %x\n",
  145. p->lid, p->last_record,
  146. rr->hdr.epoch, rr->hdr.seq, rr->hdr.state));
  147. // copy record
  148. q = &p->buf[p->last_record];
  149. memcpy((PVOID)q, (PVOID) rr, CRS_RECORD_SZ);
  150. // flush it out now
  151. err = CrspFlush(p, p->last_record);
  152. if (err == ERROR_SUCCESS) {
  153. if (rec) *rec = q;
  154. } else {
  155. if (p->last_record == 0)
  156. p->last_record = p->max_records;
  157. p->last_record--;
  158. }
  159. return err;
  160. }
  161. // NextRecord:
  162. // if seq is null, fill in last record and return SUCCESS
  163. // if seq is not found, return NOT_FOUND
  164. // if seq is last record, return EOF
  165. // otherwise return next record after seq in lrec and SUCCESS
  166. DWORD
  167. CrspNextLogRecord(CrsInfo_t *info, CrsRecord_t *seq,
  168. CrsRecord_t *lrec, BOOLEAN this_flag)
  169. {
  170. CrsRecord_t *last, *p;
  171. DWORD err = ERROR_SUCCESS;
  172. if (lrec == NULL || info == NULL) {
  173. return ERROR_INVALID_PARAMETER;
  174. }
  175. // read record
  176. EnterCriticalSection(&info->lock);
  177. last = &info->buf[info->last_record];
  178. if (seq == NULL) {
  179. CrsLog(("Crs%d: last record %d %I64d %I64d\n",
  180. info->lid, info->last_record, last->hdr.epoch, last->hdr.seq));
  181. // read last record
  182. memcpy(lrec, last, CRS_RECORD_SZ);
  183. } else if (seq->hdr.epoch != last->hdr.epoch ||
  184. seq->hdr.seq != last->hdr.seq) {
  185. int i;
  186. CrsLog(("Crs%d: last record %d %I64d %I64d search %I64d %I64d\n",
  187. info->lid, info->last_record,
  188. last->hdr.epoch, last->hdr.seq,
  189. seq->hdr.epoch, seq->hdr.seq));
  190. // assume we don't have it
  191. p = seq;
  192. seq = NULL;
  193. // do a search instead of index, so that
  194. // seq can be reset as epoch increments
  195. for (i = 0; i < info->max_records; i++) {
  196. last = &info->buf[i];
  197. if (p->hdr.epoch == last->hdr.epoch &&
  198. p->hdr.seq == last->hdr.seq) {
  199. seq = last;
  200. break;
  201. }
  202. }
  203. if (seq != NULL) {
  204. if (this_flag == FALSE) {
  205. // return record after this one
  206. i++;
  207. if (i >= info->max_records)
  208. i = 0;
  209. seq = &info->buf[i];
  210. }
  211. CrsLog(("Crs%d: search found %d %I64d, %I64d\n", info->lid,
  212. seq - info->buf, seq->hdr.epoch, seq->hdr.seq));
  213. memcpy(lrec, seq, CRS_RECORD_SZ);
  214. } else {
  215. err = ERROR_NOT_FOUND;
  216. }
  217. } else {
  218. CrsLog(("Crs%d: reached last record %d %I64d %I64d, %I64d %I64d\n",
  219. info->lid, info->last_record,
  220. last->hdr.epoch, last->hdr.seq,
  221. seq->hdr.epoch, seq->hdr.seq));
  222. if (this_flag == TRUE) {
  223. // we are trying to read the last record
  224. memcpy(lrec, last, CRS_RECORD_SZ);
  225. err = ERROR_SUCCESS;
  226. } else {
  227. err = ERROR_HANDLE_EOF;
  228. }
  229. }
  230. LeaveCriticalSection(&info->lock);
  231. if (err == ERROR_SUCCESS && lrec->hdr.epoch == 0) {
  232. // invalid rec, log is empty
  233. err = ERROR_HANDLE_EOF;
  234. }
  235. return err;
  236. }
  237. // Call into fs with <undo, replay, query, disable, enable, done>
  238. // undo: pass replica in recovery due to a conflict
  239. // replay: replica is missing change, if replay fails with abort, we
  240. // do a full copy; otherwise we issue a skip record
  241. // query: ask replica if record was completed or not
  242. // done: signal end of recovery and pass in new wset, rset
  243. // we silently handle <abort(skip) and epoch records>
  244. // abort: add a skip record
  245. // epoch records: just log it as is
  246. DWORD
  247. CrspReplay(LPVOID rec)
  248. {
  249. CrsRecoveryBlk_t *rr;
  250. CrsInfo_t *info, *minfo;
  251. CrsRecord_t *p, *q;
  252. CrsRecord_t lrec, mlrec;
  253. DWORD err;
  254. rr = (CrsRecoveryBlk_t *) rec;
  255. info = rr->info;
  256. minfo = rr->minfo;
  257. CrsLog(("CrsReplay%d mid %d, lid %d leader_id %d\n",
  258. rr->nid, rr->mid, info->lid, info->leader_id));
  259. do {
  260. p = NULL;
  261. // read last record
  262. err = CrspNextLogRecord(info, NULL, &lrec, FALSE);
  263. if (err != ERROR_SUCCESS) {
  264. CrsLog(("CrsReplay%d: unable to read last record %d\n",
  265. info->lid, err));
  266. break;
  267. }
  268. // find our last record in master replica
  269. q = &lrec;
  270. p = &mlrec;
  271. err = CrspNextLogRecord(minfo, q, p, TRUE);
  272. // if found and consistent with master, no undo
  273. if (err == ERROR_SUCCESS && p->hdr.state == q->hdr.state) {
  274. CrsLog(("CrsReplay%d: last record %I64d, %I64d consistent %x %x\n",
  275. info->lid, q->hdr.epoch, q->hdr.seq,
  276. p->hdr.state, q->hdr.state));
  277. break;
  278. }
  279. if (err != ERROR_SUCCESS) {
  280. CrsLog(("CrsReplay%d: missing lrec %I64d, %I64d in disk %d, err %d\n",
  281. info->lid, q->hdr.epoch, q->hdr.seq, minfo->lid, err));
  282. } else {
  283. CrsLog(("CrsReplay%d: undo last record %I64d, %I64d %x needs %x\n",
  284. info->lid, q->hdr.epoch, q->hdr.seq,
  285. q->hdr.state, p->hdr.state));
  286. ASSERT(p->hdr.state & (CRS_COMMIT|CRS_ABORT));
  287. }
  288. // last record is in conflict, we must undo it first
  289. if (!(q->hdr.state & CRS_EPOCH)) {
  290. // if we found this record in master and a conflict is detected,
  291. // we undo it. Otherwise, we need to do a full copy
  292. if (err == ERROR_SUCCESS) {
  293. ASSERT(p->hdr.state & (CRS_COMMIT|CRS_ABORT));
  294. ASSERT(q->hdr.state & CRS_PREPARE);
  295. err = info->callback(info->callback_arg,
  296. rr->nid, q,
  297. CRS_ACTION_UNDO, rr->mid);
  298. }
  299. } else {
  300. // A missing epoch record doesn't mean we are old. A regroup
  301. // could have happened but no new data records got added. We
  302. // undo it, and continue;
  303. err = STATUS_SUCCESS;
  304. }
  305. if (err == STATUS_SUCCESS) {
  306. // update current record, sequence, epoch
  307. info->buf[info->last_record].hdr.state = 0;
  308. info->buf[info->last_record].hdr.epoch = 0;
  309. info->buf[info->last_record].hdr.seq = 0;
  310. if (info->last_record == 0) {
  311. info->last_record = info->max_records;
  312. }
  313. info->last_record--;
  314. info->seq = info->buf[info->last_record].hdr.seq;
  315. info->epoch = info->buf[info->last_record].hdr.epoch;
  316. CrsLog(("CrsReplay%d: new last record %d %I64d, %I64d\n",
  317. info->lid, info->last_record, info->epoch, info->seq));
  318. } else {
  319. // can't undo it, do full copy and readjust our log
  320. CrsLog(("CrsReplay%d: Unable to undo record %I64d, %I64d\n",
  321. info->lid, q->hdr.epoch, q->hdr.seq));
  322. p = NULL;
  323. }
  324. } while (err == STATUS_SUCCESS && info->state == CRS_STATE_RECOVERY);
  325. while (p != NULL && info->state == CRS_STATE_RECOVERY) {
  326. // read master copy
  327. err = CrspNextLogRecord(minfo, p, &mlrec, FALSE);
  328. if (err != ERROR_SUCCESS) {
  329. if (err == ERROR_HANDLE_EOF) {
  330. CrsLog(("CrsReplay%d: last record %I64d, %I64d in disk %d\n",
  331. info->lid, q->hdr.epoch, q->hdr.seq, minfo->lid));
  332. // the last record is where we are at
  333. info->seq = info->buf[info->last_record].hdr.seq;
  334. info->epoch = info->buf[info->last_record].hdr.epoch;
  335. // we reached the end, signal end of recovery
  336. err = info->callback(info->callback_arg,
  337. rr->nid, p,
  338. CRS_ACTION_DONE, rr->mid);
  339. goto exit;
  340. }
  341. break;
  342. }
  343. p = &mlrec;
  344. if ((p->hdr.state & CRS_EPOCH) || (p->hdr.state & CRS_ABORT)) {
  345. CrsLog(("CrsReplay%d: skip record %I64d, %I64d %x\n",
  346. info->lid, p->hdr.epoch, p->hdr.seq, p->hdr.state));
  347. err = !STATUS_SUCCESS;
  348. } else if (p->hdr.state & CRS_COMMIT) {
  349. err = info->callback(info->callback_arg,
  350. rr->nid, p,
  351. CRS_ACTION_REPLAY, rr->mid);
  352. if (err == STATUS_TRANSACTION_ABORTED) {
  353. CrsLog(("CrsReplay: failed nid %d seq %I64d err %d\n",
  354. rr->nid, p->hdr.seq, err));
  355. break;
  356. }
  357. } else {
  358. ASSERT(p->hdr.state & CRS_PREPARE);
  359. // what if the record is prepared but not yet committed or
  360. // aborted; in transit record.
  361. // stop now
  362. CrsLog(("CrsReplay%d: bad record seq %I64d state %x\n",
  363. rr->nid, p->hdr.seq, p->hdr.state));
  364. break;
  365. }
  366. if (err != STATUS_SUCCESS) {
  367. // add record
  368. err = CrspAppendRecord(info, p, NULL);
  369. if (err != ERROR_SUCCESS) {
  370. CrsLog(("CrsReplay%d: failed append seq %I64d err %d\n",
  371. rr->nid, p->hdr.seq, err));
  372. break;
  373. }
  374. if (p->hdr.state & CRS_EPOCH) {
  375. ; //ASSERT(info->epoch+1 == p->hdr.epoch);
  376. } else {
  377. ASSERT(info->epoch == p->hdr.epoch);
  378. ASSERT(info->seq+1 == p->hdr.seq);
  379. }
  380. info->seq = p->hdr.seq;
  381. info->epoch = p->hdr.epoch;
  382. } else {
  383. // make sure we have added it
  384. ASSERT(info->seq == p->hdr.seq);
  385. ASSERT(info->epoch == p->hdr.epoch);
  386. ASSERT(info->buf[info->last_record].hdr.seq == p->hdr.seq);
  387. ASSERT(info->buf[info->last_record].hdr.epoch == p->hdr.epoch);
  388. // Propagate dubious bit
  389. if (p->hdr.state & CRS_DUBIOUS) {
  390. info->buf[info->last_record].hdr.state |= CRS_DUBIOUS;
  391. }
  392. ASSERT(info->buf[info->last_record].hdr.state == p->hdr.state);
  393. }
  394. }
  395. if (p == NULL || err != STATUS_SUCCESS) {
  396. CrsLog(("CrsReplay%d: Full copy from disk %d\n",
  397. info->lid, minfo->lid));
  398. // we are out of date or need full recovery, do a full copy
  399. err = info->callback(info->callback_arg,
  400. rr->nid, NULL,
  401. CRS_ACTION_COPY, rr->mid);
  402. if (err == STATUS_SUCCESS) {
  403. DWORD len;
  404. // we now copy our master log and flush it
  405. ASSERT(minfo->max_records == info->max_records);
  406. len = info->max_records * CRS_RECORD_SZ;
  407. memcpy(info->buf, minfo->buf, len);
  408. err = CrspWrite(info, 0, len);
  409. if (err == ERROR_SUCCESS) {
  410. // adjust our state
  411. info->last_record = minfo->last_record;
  412. info->seq = info->buf[info->last_record].hdr.seq;
  413. info->epoch = info->buf[info->last_record].hdr.epoch;
  414. // we reached the end, signal end of recovery
  415. err = info->callback(info->callback_arg,
  416. rr->nid, p,
  417. CRS_ACTION_DONE, rr->mid);
  418. }
  419. }
  420. }
  421. exit:
  422. CrsLog(("CrsReplay%d mid %d status 0x%x\n", rr->nid, rr->mid, err));
  423. return err;
  424. }
  425. /////////////////////// Public Functions //////////////////////
  426. DWORD
  427. WINAPI
  428. CrsOpen(crs_callback_t callback, PVOID callback_arg, USHORT lid,
  429. WCHAR *log_name, int max_logsectors, HANDLE *outhdl)
  430. {
  431. // Open the log file
  432. // If the file in newly create, set the proper size
  433. // If the file size is not the same size, we need to either
  434. // expand or truncate the file. (truncate needs copy)
  435. // Scan file to locate last sector and record
  436. // If last record hasn't been commited, issue a query.
  437. // If query succeeded then, mark it as committed.
  438. // Set epoch,seq
  439. DWORD status;
  440. HANDLE maph;
  441. CrsInfo_t *p;
  442. int logsz;
  443. if (outhdl == NULL) {
  444. return ERROR_INVALID_PARAMETER;
  445. }
  446. *outhdl = NULL;
  447. p = (CrsInfo_t *) malloc(sizeof(*p));
  448. if (p == NULL) {
  449. return ERROR_NOT_ENOUGH_MEMORY;
  450. }
  451. memset((PVOID) p, 0, sizeof(*p));
  452. CrsLog(("Crs%d file '%S'\n", lid, log_name));
  453. p->lid = lid;
  454. p->callback = callback;
  455. p->callback_arg = callback_arg;
  456. p->pending = FALSE;
  457. // Create log file, and set size of newly created
  458. p->fh = CreateFileW(log_name,
  459. GENERIC_READ | GENERIC_WRITE,
  460. FILE_SHARE_READ|FILE_SHARE_WRITE,
  461. NULL,
  462. OPEN_ALWAYS,
  463. FILE_FLAG_WRITE_THROUGH,
  464. NULL);
  465. status = GetLastError();
  466. if(p->fh == INVALID_HANDLE_VALUE){
  467. free((char *) p);
  468. return status;
  469. }
  470. // acquire an exclusive lock on the whole file
  471. if (!LockFile(p->fh, 0, 0, (DWORD)-1, (DWORD)-1)) {
  472. FILE_FULL_EA_INFORMATION ea[2] = {0};
  473. IO_STATUS_BLOCK ios;
  474. NTSTATUS err;
  475. // get status
  476. status = GetLastError();
  477. // change the ea to cause a notification to happen
  478. ea[0].NextEntryOffset = 0;
  479. ea[0].Flags = 0;
  480. ea[0].EaNameLength = 1;
  481. ea[0].EaValueLength = 1;
  482. ea[0].EaName[0] = 'X';
  483. err = NtSetEaFile(p->fh, &ios, (PVOID) ea, sizeof(ea));
  484. CrsLog(("Crs%d Setting EA %x\n", lid, err));
  485. goto error;
  486. }
  487. if (status == ERROR_ALREADY_EXISTS) {
  488. // todo: compare current file size to new size and adjust file
  489. // size accordingly. For now, just use old size
  490. logsz = GetFileSize(p->fh, NULL);
  491. CrsLog(("Crs%d: Filesz %d max_sec %d\n", lid, logsz, max_logsectors));
  492. ASSERT(logsz == max_logsectors * CRS_SECTOR_SZ);
  493. } else {
  494. //extend the file pointer to max size
  495. logsz = max_logsectors * CRS_SECTOR_SZ;
  496. SetFilePointer(p->fh, logsz, NULL, FILE_BEGIN);
  497. SetEndOfFile(p->fh);
  498. CrsLog(("Crs%d: Set Filesz %d max_sec %d\n", lid, logsz, max_logsectors));
  499. }
  500. // allocate file copy in memory
  501. p->buf = xmalloc(logsz);
  502. if (p->buf == NULL) {
  503. status = ERROR_NOT_ENOUGH_MEMORY;
  504. goto error;
  505. }
  506. // set max record
  507. p->max_records = logsz / CRS_RECORD_SZ;
  508. if (status == ERROR_ALREADY_EXISTS) {
  509. // load file and compute last epoch/seq
  510. status = CrspFindLast(p, logsz);
  511. } else {
  512. status = !ERROR_SUCCESS;
  513. }
  514. // init the file, when we detect a read failure or first time
  515. if (status != ERROR_SUCCESS) {
  516. CrsRecord_t *r;
  517. int i;
  518. // initialize file
  519. p->seq = 0;
  520. p->epoch = 0;
  521. p->last_record = 0;
  522. r = p->buf;
  523. for (i = 0; i < logsz; i+= CRS_RECORD_SZ, r++) {
  524. r->hdr.epoch = p->epoch;
  525. r->hdr.seq = p->seq;
  526. r->hdr.tag = CRS_TAG;
  527. r->hdr.state = CRS_COMMIT | CRS_PREPARE | CRS_EPOCH;
  528. }
  529. status = CrspWrite(p, 0, logsz);
  530. }
  531. if (status != ERROR_SUCCESS) {
  532. goto error;
  533. }
  534. CrsLog(("Crs%d: %x Last record %d max %d epoch %I64d seq %I64d\n", p->lid,
  535. p->fh,
  536. p->last_record, p->max_records, p->epoch, p->seq));
  537. // initialize rest of state
  538. p->state = CRS_STATE_INIT;
  539. p->refcnt = 1;
  540. p->leader_id = 0;
  541. InitializeCriticalSection(&p->lock);
  542. *outhdl = p;
  543. return ERROR_SUCCESS;
  544. error:
  545. CloseHandle(p->fh);
  546. if (p->buf) {
  547. xfree(p->buf);
  548. }
  549. free((PVOID) p);
  550. return status;
  551. }
  552. //
  553. DWORD
  554. WINAPI
  555. CrsStart(PVOID *hdls, ULONG alive_set, int cluster_sz,
  556. ULONG *write_set, ULONG *read_set, ULONG *evict_set)
  557. {
  558. DWORD status;
  559. CrsInfo_t **info = (CrsInfo_t **) hdls;
  560. int i, active_sz, mid;
  561. ULONG mask, active_set, fail_set;
  562. CrsInfo_t *p;
  563. CrsRecord_t *q, *mlrec;
  564. if (write_set) *write_set = 0;
  565. if (read_set) *read_set = 0;
  566. if (evict_set) *evict_set = 0;
  567. // no alive node
  568. if (cluster_sz == 0 || alive_set == 0) {
  569. // nothing to do
  570. return ERROR_WRITE_PROTECT;
  571. }
  572. // scan each hdl and make sure it is initialized and lock all hdls
  573. mask = alive_set;
  574. for (i = 0; mask != 0; i++, mask = mask >> 1) {
  575. if (!(mask & 0x1)) {
  576. continue;
  577. }
  578. p = info[i];
  579. if (p == NULL) {
  580. continue;
  581. }
  582. EnterCriticalSection(&p->lock);
  583. // check the state of the last record
  584. p = info[i];
  585. q = &p->buf[p->last_record];
  586. CrsLog(("Crs%d last record %d epoch %I64d seq %I64d state %x\n",
  587. p->lid, p->last_record,
  588. q->hdr.epoch, q->hdr.seq, q->hdr.state));
  589. }
  590. mid = 0;
  591. mlrec = NULL;
  592. // select master replica
  593. for (i = 0, mask = alive_set; mask != 0; i++, mask = mask >> 1) {
  594. if (!(mask & 0x1)) {
  595. continue;
  596. }
  597. p = info[i];
  598. if (p == NULL)
  599. continue;
  600. q = &p->buf[p->last_record];
  601. if (!mlrec ||
  602. mlrec->hdr.epoch < q->hdr.epoch ||
  603. (mlrec->hdr.epoch == q->hdr.epoch && mlrec->hdr.seq < q->hdr.seq) ||
  604. (mlrec->hdr.epoch == q->hdr.epoch && mlrec->hdr.seq == q->hdr.seq &&
  605. mlrec->hdr.state != q->hdr.state && (q->hdr.state & CRS_COMMIT))) {
  606. mid = i;
  607. mlrec = q;
  608. }
  609. }
  610. ASSERT(mid != 0);
  611. // if master last record is in doubt, query filesystem. If the filesystem
  612. // is certain that the operation has occured, it returns STATUS_SUCCESS for
  613. // COMMIT, STATUS_CANCELLED for ABORT, and STATUS_NOT_FOUND for can't tell.
  614. // All undetermined IO must be undone and redone in all non-master replicas
  615. // to ensure all replicas reach consistency. This statement is true even
  616. // for replicas that are currently absent from our set. We tag such records
  617. // we both COMMIT and ABORT, so that the replay thread issues replay for
  618. // new records and undo,replay for last records
  619. p = info[mid];
  620. p->leader_id = (USHORT) mid;
  621. ASSERT(mlrec != NULL);
  622. if (!(mlrec->hdr.state & (CRS_COMMIT | CRS_ABORT))) {
  623. ASSERT(mlrec->hdr.state & CRS_PREPARE);
  624. status = p->callback(p->callback_arg, p->lid,
  625. mlrec, CRS_ACTION_QUERY,
  626. p->lid);
  627. if (status == STATUS_SUCCESS) {
  628. mlrec->hdr.state |= CRS_COMMIT;
  629. } else if (status == STATUS_CANCELLED) {
  630. mlrec->hdr.state |= CRS_ABORT;
  631. } else if (status == STATUS_NOT_FOUND) {
  632. // assume it is committed, but mark it for undo during recovery
  633. mlrec->hdr.state |= (CRS_COMMIT | CRS_DUBIOUS);
  634. }
  635. // todo: if status == TRANSACTION_ABORTED, we need to bail out since
  636. // must master is dead
  637. // no need to flush, I think!
  638. // CrspFlush(p, p->last_record);
  639. // todo: what if the flush fails here, I am assuming that
  640. // an append will equally fail.
  641. }
  642. ASSERT(mlrec->hdr.state & (CRS_COMMIT | CRS_ABORT));
  643. // compute sync and recovery masks
  644. fail_set = 0;
  645. active_set = 0;
  646. active_sz = 0;
  647. for (i = 0, mask = alive_set; mask != 0; i++, mask = mask >> 1) {
  648. if (!(mask & 0x1)) {
  649. continue;
  650. }
  651. p = info[i];
  652. if (p == NULL) {
  653. continue;
  654. }
  655. // set leader id
  656. p->leader_id = (USHORT) mid;
  657. q = &p->buf[p->last_record];
  658. if (CrspEqual(mlrec, q)) {
  659. ASSERT(q->hdr.state & (CRS_COMMIT | CRS_ABORT));
  660. p->state = CRS_STATE_READ;
  661. active_set |= (1 << i);
  662. active_sz++;
  663. } else if (p->state != CRS_STATE_RECOVERY) {
  664. CrsRecoveryBlk_t rrbuf;
  665. CrsRecoveryBlk_t *rr = &rrbuf;
  666. // recover replica
  667. rr->nid = i;
  668. rr->mid = mid;
  669. rr->info = p;
  670. rr->minfo = info[mid];
  671. // set recovery state
  672. p->state = CRS_STATE_RECOVERY;
  673. status = CrspReplay((LPVOID) rr);
  674. // if we fail, evict this replica
  675. if (status != ERROR_SUCCESS) {
  676. fail_set |= (1 << i);
  677. } else {
  678. // repeat this replica again
  679. i--;
  680. mask = mask << 1;
  681. }
  682. }
  683. }
  684. // assume success
  685. status = ERROR_SUCCESS;
  686. // set read sets
  687. if (read_set) *read_set = active_set;
  688. if (!CRS_QUORUM(active_sz, cluster_sz)) {
  689. CrsLog(("No quorum active %d cluster %d\n", active_sz, cluster_sz));
  690. mid = 0;
  691. status = ERROR_WRITE_PROTECT;
  692. } else {
  693. int pass_cnt = 0;
  694. ULONG pass_set = 0;
  695. // Enable writes on all active replicas
  696. for (i = 0, mask = active_set; mask != 0; i++, mask = mask >> 1) {
  697. CrsRecord_t rec;
  698. if (!(mask & 0x1)) {
  699. continue;
  700. }
  701. p = info[i];
  702. if (p == NULL)
  703. continue;
  704. p->state = CRS_STATE_WRITE;
  705. // we now generate a new epoch and flush it to the disk
  706. p->epoch++;
  707. if (p->epoch == 0)
  708. p->epoch = 1;
  709. // reset seq to zero
  710. p->seq = 0;
  711. // write new epoch now, if not a majority replicas succeeded in writing
  712. // the new <epoch, seq> we fail
  713. rec.hdr.epoch = p->epoch;
  714. rec.hdr.seq = p->seq;
  715. rec.hdr.state = CRS_PREPARE | CRS_COMMIT | CRS_EPOCH;
  716. memset(rec.data, 0, sizeof(rec.data));
  717. if (CrspAppendRecord(p, &rec, NULL) == ERROR_SUCCESS) {
  718. pass_cnt++;
  719. pass_set |= (1 << i);
  720. } else {
  721. fail_set |= (1 << i);
  722. }
  723. }
  724. // Recheck to make sure all replicas have advanced epoch
  725. if (!CRS_QUORUM(pass_cnt, cluster_sz)) {
  726. CrsLog(("No quorum due to error pass %d cluster %d\n", pass_cnt, cluster_sz));
  727. mid = 0;
  728. pass_set = 0;
  729. pass_cnt = 0;
  730. status = ERROR_WRITE_PROTECT;
  731. }
  732. if (pass_cnt != active_sz) {
  733. // some replicas have died
  734. for (i = 0, mask = pass_set; mask != 0; i++, mask = mask >> 1) {
  735. if ((alive_set & (1 << i)) && (!mask & (1 << i))) {
  736. p = info[i];
  737. ASSERT(p != NULL);
  738. p->state = CRS_STATE_READ;
  739. }
  740. }
  741. }
  742. // set write set
  743. if (write_set) *write_set = pass_set;
  744. }
  745. if (evict_set) *evict_set = fail_set;
  746. // unlock all hdls and set new master if any
  747. for (i = 0, mask = alive_set; mask != 0; i++, mask = mask >> 1) {
  748. if (!(mask & 0x1)) {
  749. continue;
  750. }
  751. p = info[i];
  752. if (p == NULL)
  753. continue;
  754. p->leader_id = (USHORT) mid;
  755. LeaveCriticalSection(&p->lock);
  756. }
  757. return status;
  758. }
  759. void
  760. WINAPI
  761. CrsClose(PVOID hd)
  762. {
  763. DWORD err;
  764. CrsInfo_t *info = (CrsInfo_t *) hd;
  765. // If we any recovery threads running, make sure we terminate them first
  766. // before close and free all of this stuff
  767. if (info == NULL) {
  768. CrsLog(("CrsClose: try to close a null handle!\n"));
  769. return;
  770. }
  771. // Flush everything out and close the file
  772. EnterCriticalSection(&info->lock);
  773. // flush
  774. CrspFlush(info, info->last_record);
  775. LeaveCriticalSection(&info->lock);
  776. DeleteCriticalSection(&info->lock);
  777. err = CloseHandle(info->fh);
  778. CrsLog(("Crs%d: %x Closed %d\n", info->fh, info->lid, err));
  779. xfree(info->buf);
  780. free((char *) info);
  781. }
  782. void
  783. WINAPI
  784. CrsFlush(PVOID hd)
  785. {
  786. CrsInfo_t *info = (CrsInfo_t *) hd;
  787. // if we have a commit or abort that isn't flushed yet, flush it now
  788. EnterCriticalSection(&info->lock);
  789. if (info->pending == TRUE) {
  790. CrspFlush(info, info->last_record);
  791. }
  792. LeaveCriticalSection(&info->lock);
  793. }
  794. PVOID
  795. WINAPI
  796. CrsPrepareRecord(PVOID hd, PVOID lrec, crs_id_t id)
  797. {
  798. CrsRecord_t *p = (CrsRecord_t *)lrec;
  799. CrsInfo_t *info = (CrsInfo_t *) hd;
  800. DWORD err;
  801. // move to correct slot in this sector. If we need a new sector,
  802. // read it from the file. Make sure we flush any pending commits on
  803. // current sector before we over write our in memory sector buffer.
  804. // prepare record, if seq none 0 then we are skipping the next sequence
  805. EnterCriticalSection(&info->lock);
  806. if (info->state == CRS_STATE_WRITE ||
  807. (info->state == CRS_STATE_RECOVERY && id != NULL && id[0] != 0)) {
  808. if (id != NULL && id[0] != 0) {
  809. CrsHdr_t *tmp = (CrsHdr_t *) id;
  810. assert(id[0] == info->seq+1);
  811. p->hdr.seq = tmp->seq;
  812. p->hdr.epoch = tmp->epoch;
  813. } else {
  814. p->hdr.seq = info->seq+1;
  815. p->hdr.epoch = info->epoch;
  816. }
  817. p->hdr.state = CRS_PREPARE;
  818. err = CrspAppendRecord(info, p, &p);
  819. if (err == ERROR_SUCCESS) {
  820. // we return with the lock held, gets release on commitorabort
  821. CrsLog(("Crs%d prepare %x seq %I64d\n",info->lid, p, p->hdr.seq));
  822. return p;
  823. }
  824. CrsLog(("Crs%d: Append failed seq %I64%d\n", info->lid, p->hdr.seq));
  825. } else {
  826. CrsLog(("Crs%d: Prepare bad state %d id %x\n", info->lid, info->state, id));
  827. }
  828. LeaveCriticalSection(&info->lock);
  829. return NULL;
  830. }
  831. int
  832. WINAPI
  833. CrsCommitOrAbort(PVOID hd, PVOID lrec, int commit)
  834. {
  835. CrsRecord_t *p = (CrsRecord_t *)lrec;
  836. CrsInfo_t *info = (CrsInfo_t *) hd;
  837. if (p == NULL || info == NULL) {
  838. return ERROR_INVALID_PARAMETER;
  839. }
  840. // update state of record
  841. if (p->hdr.seq != info->seq+1) {
  842. CrsLog(("Crs: sequence mis-match on commit|abort %I64d %I64d\n",
  843. p->hdr.seq, info->seq));
  844. assert(0);
  845. return ERROR_INVALID_PARAMETER;
  846. }
  847. assert(!(p->hdr.state & (CRS_COMMIT | CRS_ABORT)));
  848. // todo: this is wrong, what if one replica succeeds
  849. // and others abort. Now, the others will reuse the
  850. // same seq for a different update and when the
  851. // succeeded replica rejoins it can't tell that the
  852. // sequence got reused.
  853. if (commit == TRUE) {
  854. p->hdr.state |= CRS_COMMIT;
  855. // advance the sequence
  856. info->seq++;
  857. CrsLog(("Crs%d: commit last %d leader %d seq %I64d\n", info->lid,
  858. info->last_record,
  859. info->leader_id, p->hdr.seq));
  860. } else {
  861. p->hdr.state |= CRS_ABORT;
  862. // we need to re-adjust our last record
  863. if (info->last_record == 0) {
  864. info->last_record = info->max_records;
  865. }
  866. info->last_record--;
  867. CrsLog(("Crs%d: abort last %d leader %d seq %I64d\n", info->lid,
  868. info->last_record,
  869. info->leader_id, p->hdr.seq));
  870. }
  871. info->pending = TRUE;
  872. LeaveCriticalSection(&info->lock);
  873. return ERROR_SUCCESS;
  874. }
  875. int
  876. WINAPI
  877. CrsCanWrite(PVOID hd)
  878. {
  879. CrsInfo_t *info = (CrsInfo_t *) hd;
  880. int err;
  881. // do we have a quorm or not
  882. EnterCriticalSection(&info->lock);
  883. err = (info->state == CRS_STATE_WRITE);
  884. LeaveCriticalSection(&info->lock);
  885. return err;
  886. }