Leaked source code of Windows Server 2003
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1184 lines
36 KiB

  1. /*++
  2. Copyright (c) 2001 Microsoft Corporation
  3. Module Name:
  4. crs.c
  5. Abstract:
  6. Implements Consistency Replica Set Algorithm
  7. Author:
  8. Ahmed Mohamed (ahmedm) 1-Jan-2001
  9. Revision History:
  10. --*/
  11. #include <nt.h>
  12. #include <ntdef.h>
  13. #include <ntrtl.h>
  14. #include <nturtl.h>
  15. #include <windows.h>
  16. #include <stdio.h>
  17. #include <assert.h>
  18. #define QFS_DBG
  19. #include "crs.h"
  20. #include "fsutil.h"
/*
 * Page-granular allocator used for the in-memory copy of a log file.
 * NOTE(review): MEM_COMMIT without MEM_RESERVE is only valid because
 * lpAddress is NULL -- confirm before reusing with a fixed address.
 */
#define xmalloc(size) VirtualAlloc(NULL, size, MEM_COMMIT, PAGE_READWRITE)
#define xfree(buffer) VirtualFree(buffer, 0, MEM_RELEASE)
/*
 * Two records compare equal when seq, epoch and state all match;
 * the tag and the record payload are not compared.
 */
#define CrspEqual(r1,r2) ((r1)->hdr.seq == (r2)->hdr.seq && \
                          (r1)->hdr.epoch == (r2)->hdr.epoch && \
                          (r1)->hdr.state == (r2)->hdr.state)
/*
 * Quorum size override, set through CrsSetForcedQuorumSize(). It is not read
 * in this file; presumably consumed by CRS_QUORUM in crs.h, with 0xffff
 * meaning "no override" -- TODO confirm.
 */
DWORD CrsForcedQuorumSize = 0xffff;
  27. void
  28. WINAPI
  29. CrsSetForcedQuorumSize(DWORD size)
  30. {
  31. CrsForcedQuorumSize = size;
  32. }
  33. VOID
  34. CrsForceClose(CrsInfo_t *p)
  35. /*
  36. This should be called only on emergency terminations. This would unlock the crs.log
  37. file and close the handle. This does not hold any lock.
  38. */
  39. {
  40. if (p == NULL) {
  41. CrsLog(("CrsForceClose: Exiting...\n"));
  42. return;
  43. }
  44. CrsLog(("CrsForceClose: fh 0x%x, nid %d\n", p->fh, p->lid));
  45. if (p->fh != INVALID_HANDLE_VALUE) {
  46. if(!UnlockFile(p->fh, 0, 0, (DWORD)-1, (DWORD)-1)) {
  47. CrsLog(("CrsForceClose: UnlockFile(0x%x) returns %d\n", p->fh, GetLastError()));
  48. }
  49. if(!CloseHandle(p->fh)) {
  50. CrsLog(("CrsForceClose: CloseHandle(0x%x) returns %d\n", p->fh, GetLastError()));
  51. }
  52. p->fh = INVALID_HANDLE_VALUE;
  53. }
  54. }
  55. DWORD
  56. CrspFindLast(CrsInfo_t *p, DWORD logsz)
  57. {
  58. CrsRecord_t *rec, *last_rec;
  59. BOOL err;
  60. DWORD n, i;
  61. if (p->fh == INVALID_HANDLE_VALUE) {
  62. CrsLog(("CrspFindLast: Invalid file handle. Exiting...\n"));
  63. return ERROR_INVALID_HANDLE;
  64. }
  65. n = SetFilePointer(p->fh, 0, NULL, FILE_BEGIN);
  66. if (n == INVALID_SET_FILE_POINTER) {
  67. return GetLastError();
  68. }
  69. err = ReadFile(p->fh, p->buf, logsz, &n, NULL);
  70. if (!err)
  71. return GetLastError();
  72. if (n != logsz) {
  73. CrsLog(("Crs%d: failed to load complete file, read %d expected %d\n",
  74. p->lid,
  75. n, logsz));
  76. return ERROR_BAD_LENGTH;
  77. }
  78. // Not needed.
  79. // ASSERT(p->max_records * CRS_RECORD_SZ == (int)n);
  80. // if(p->max_records * CRS_RECORD_SZ != (int)n) {
  81. // CrsLog(("Crs%d: unable to load log file %d bytes, got %d bytes\n",
  82. // p->lid, n, logsz));
  83. // return ERROR_BAD_LENGTH;
  84. // }
  85. CrsLog(("Crs%d: loaded %d bytes, %d records\n", p->lid,
  86. n, p->max_records));
  87. last_rec = NULL;
  88. rec = p->buf;
  89. for (i = 0; i < logsz; i += CRS_RECORD_SZ, rec++) {
  90. if (rec->hdr.tag != CRS_TAG) {
  91. CrsLog(("crs%d: Bad record %d, got %x expected %x\n",
  92. p->lid,
  93. i/CRS_RECORD_SZ, rec->hdr.tag, CRS_TAG));
  94. return ERROR_BAD_FORMAT;
  95. }
  96. if (!last_rec ||
  97. rec->hdr.epoch > last_rec->hdr.epoch ||
  98. (rec->hdr.epoch == last_rec->hdr.epoch &&
  99. (rec->hdr.seq > last_rec->hdr.seq))) {
  100. last_rec = rec;
  101. }
  102. }
  103. ASSERT(last_rec);
  104. // make sure only the last record is not committed or aborted
  105. rec = p->buf;
  106. for (i = 0; i < logsz; i += CRS_RECORD_SZ, rec++) {
  107. if (!(rec->hdr.state & (CRS_COMMIT | CRS_ABORT))) {
  108. if (rec != last_rec) {
  109. CrsLog(("crs:%d Bad record %d state %x expected commit|abort\n",
  110. p->lid, i/CRS_RECORD_SZ, rec->hdr.state));
  111. return ERROR_INTERNAL_ERROR;
  112. }
  113. }
  114. }
  115. p->last_record = (int) (last_rec - p->buf);
  116. p->seq = last_rec->hdr.seq;
  117. p->epoch = last_rec->hdr.epoch;
  118. return ERROR_SUCCESS;
  119. }
  120. #define CrspFlush(p,offset) CrspWrite(p,offset, CRS_SECTOR_SZ)
  121. static
  122. DWORD
  123. CrspWrite(CrsInfo_t *p, int offset, DWORD length)
  124. {
  125. DWORD n;
  126. if (p->fh == INVALID_HANDLE_VALUE) {
  127. CrsLog(("CrspWrite: Invalid file handle. Exiting...\n"));
  128. return ERROR_INVALID_HANDLE;
  129. }
  130. p->pending = FALSE;
  131. n = (DWORD) offset;
  132. // write out last sector, assumes lock is held
  133. ASSERT(offset < p->max_records);
  134. offset = offset / CRS_RECORDS_PER_SECTOR;
  135. CrsLog(("Crs%d: flush %d bytes record %d -> %d,%d\n", p->lid,
  136. length, n,
  137. offset, offset*CRS_SECTOR_SZ));
  138. n = SetFilePointer(p->fh, offset * CRS_SECTOR_SZ, NULL, FILE_BEGIN);
  139. if (n == INVALID_SET_FILE_POINTER) {
  140. return GetLastError();
  141. }
  142. n = 0;
  143. if (WriteFile(p->fh, (PVOID) &p->buf[offset*CRS_RECORDS_PER_SECTOR], length, &n, NULL)) {
  144. if (n != length) {
  145. CrsLog(("Write count mismatch, wrote %d, expected %d\n", n, length));
  146. return ERROR_BAD_LENGTH;
  147. }
  148. return ERROR_SUCCESS;
  149. }
  150. n = GetLastError();
  151. CrsLog(("Crs%d: flush record %d failed err %d\n", p->lid, offset, n));
  152. if (n == ERROR_UNEXP_NET_ERR) {
  153. // repeat the write one more time
  154. p->pending = TRUE;
  155. }
  156. return n;
  157. }
  158. static
  159. DWORD
  160. CrspAppendRecord(CrsInfo_t *p, CrsRecord_t *rr, CrsRecord_t **rec)
  161. {
  162. CrsRecord_t *q;
  163. DWORD err;
  164. // tag record
  165. rr->hdr.tag = CRS_TAG;
  166. // assumes lock is held
  167. if ((p->last_record & CRS_SECTOR_MASK) == CRS_SECTOR_MASK) {
  168. // flush current sector
  169. err = CrspFlush(p, p->last_record);
  170. if (err != ERROR_SUCCESS)
  171. return err;
  172. }
  173. // advance last record
  174. p->last_record++;
  175. if (p->last_record == p->max_records)
  176. p->last_record = 0;
  177. CrsLog(("Crs%d: append record %d epoch %I64d seq %I64d state %x\n",
  178. p->lid, p->last_record,
  179. rr->hdr.epoch, rr->hdr.seq, rr->hdr.state));
  180. // copy record
  181. q = &p->buf[p->last_record];
  182. memcpy((PVOID)q, (PVOID) rr, CRS_RECORD_SZ);
  183. // flush it out now
  184. err = CrspFlush(p, p->last_record);
  185. if (err == ERROR_SUCCESS) {
  186. if (rec) *rec = q;
  187. } else {
  188. if (p->last_record == 0)
  189. p->last_record = p->max_records;
  190. p->last_record--;
  191. }
  192. return err;
  193. }
// CrspNextLogRecord - look a record up in info's in-memory log and copy it
// into lrec.
//
// NextRecord:
// if seq is null, fill in last record and return SUCCESS
// if seq is not found, return NOT_FOUND
// if seq is last record, return EOF
// otherwise return next record after seq in lrec and SUCCESS
//
// Records are matched on <epoch, seq> only. The search walks the whole
// buffer and uses the first (lowest-index) matching slot.
//
// this_flag: when TRUE, copy the record that matches seq itself (this
// includes the last record, which then returns SUCCESS instead of EOF).
// When FALSE, copy the record after it, wrapping at max_records.
//
// Takes and releases info->lock. Win32 critical sections are recursive, so
// this also works when the calling thread already holds info->lock (as
// CrspReplay does when called from CrsStart).
//
// Returns ERROR_INVALID_PARAMETER for a NULL info/lrec, ERROR_NOT_FOUND,
// ERROR_HANDLE_EOF, or ERROR_SUCCESS. A successful copy whose epoch is 0 is
// turned into ERROR_HANDLE_EOF.
DWORD
CrspNextLogRecord(CrsInfo_t *info, CrsRecord_t *seq,
                  CrsRecord_t *lrec, BOOLEAN this_flag)
{
    CrsRecord_t *last, *p;
    DWORD err = ERROR_SUCCESS;
    if (lrec == NULL || info == NULL) {
        return ERROR_INVALID_PARAMETER;
    }
    // read record
    EnterCriticalSection(&info->lock);
    last = &info->buf[info->last_record];
    if (seq == NULL) {
        CrsLog(("Crs%d: last record %d %I64d %I64d\n",
                info->lid, info->last_record, last->hdr.epoch, last->hdr.seq));
        // read last record
        memcpy(lrec, last, CRS_RECORD_SZ);
    } else if (seq->hdr.epoch != last->hdr.epoch ||
               seq->hdr.seq != last->hdr.seq) {
        int i;
        CrsLog(("Crs%d: last record %d %I64d %I64d search %I64d %I64d\n",
                info->lid, info->last_record,
                last->hdr.epoch, last->hdr.seq,
                seq->hdr.epoch, seq->hdr.seq));
        // assume we don't have it
        p = seq;
        seq = NULL;
        // do a search instead of index, so that
        // seq can be reset as epoch increments
        for (i = 0; i < info->max_records; i++) {
            last = &info->buf[i];
            if (p->hdr.epoch == last->hdr.epoch &&
                p->hdr.seq == last->hdr.seq) {
                seq = last;
                break;
            }
        }
        if (seq != NULL) {
            if (this_flag == FALSE) {
                // return record after this one (the log is circular)
                i++;
                if (i >= info->max_records)
                    i = 0;
                seq = &info->buf[i];
            }
            // NOTE(review): seq - info->buf is a ptrdiff_t printed with %d.
            CrsLog(("Crs%d: search found %d %I64d, %I64d\n", info->lid,
                    seq - info->buf, seq->hdr.epoch, seq->hdr.seq));
            memcpy(lrec, seq, CRS_RECORD_SZ);
        } else {
            err = ERROR_NOT_FOUND;
        }
    } else {
        // seq names our current last record
        CrsLog(("Crs%d: reached last record %d %I64d %I64d, %I64d %I64d\n",
                info->lid, info->last_record,
                last->hdr.epoch, last->hdr.seq,
                seq->hdr.epoch, seq->hdr.seq));
        if (this_flag == TRUE) {
            // we are trying to read the last record
            memcpy(lrec, last, CRS_RECORD_SZ);
            err = ERROR_SUCCESS;
        } else {
            err = ERROR_HANDLE_EOF;
        }
    }
    LeaveCriticalSection(&info->lock);
    if (err == ERROR_SUCCESS && lrec->hdr.epoch == 0) {
        // invalid rec, log is empty
        err = ERROR_HANDLE_EOF;
    }
    return err;
}
// Call into fs with <undo, replay, query, disable, enable, done>
// undo: pass replica in recovery due to a conflict
// replay: replica is missing change, if replay fails with abort, we
// do a full copy; otherwise we issue a skip record
// query: ask replica if record was completed or not
// done: signal end of recovery and pass in new wset, rset
// we silently handle <abort(skip) and epoch records>
// abort: add a skip record
// epoch records: just log it as is
//
// CrspReplay - bring replica rr->info in line with master replica rr->minfo.
//
// rec is a CrsRecoveryBlk_t * carrying info, minfo, nid (this replica's id,
// passed to the callback) and mid (the master's id).
//
// Phase 1 (undo): while our last record is missing from the master, or its
//   state differs from the master's copy, roll it back. Data records found
//   in the master are undone through CRS_ACTION_UNDO. Epoch records are
//   dropped without a callback. A data record missing from the master
//   cannot be undone and forces a full copy.
// Phase 2 (roll forward): walk the master log from our last record until the
//   master's EOF.
//   - Committed records are replayed through CRS_ACTION_REPLAY.
//   - Epoch and aborted records are appended to our log directly.
//   - A still-prepared record stops the walk.
// Phase 3 (full copy): if either phase gives up, request CRS_ACTION_COPY and
//   overwrite our whole log with the master's.
//
// The loops run only while info->state == CRS_STATE_RECOVERY. When called
// from CrsStart, both info->lock and minfo->lock are already held.
// Returns 0 (STATUS_SUCCESS/ERROR_SUCCESS) on success, else the failing status.
DWORD
CrspReplay(LPVOID rec)
{
    CrsRecoveryBlk_t *rr;
    CrsInfo_t *info, *minfo;
    CrsRecord_t *p, *q;
    CrsRecord_t lrec, mlrec;
    DWORD err;
    rr = (CrsRecoveryBlk_t *) rec;
    info = rr->info;
    minfo = rr->minfo;
    CrsLog(("CrsReplay%d mid %d, lid %d leader_id %d\n",
            rr->nid, rr->mid, info->lid, info->leader_id));
    // for now force a full copy. It seems sometimes I get into a bad state, when we
    // get the time, we can reenable this and find out exactly the corner cases that
    // cause us to be out of sync.
    // NOTE(review): despite the comment above, the "#if 1" branch below keeps
    // incremental undo/replay enabled; only the "#else" branch forces a full copy.
#if 1
    // Phase 1: undo conflicting tail records
    do {
        p = NULL;
        // read last record
        err = CrspNextLogRecord(info, NULL, &lrec, FALSE);
        if (err != ERROR_SUCCESS) {
            CrsLog(("CrsReplay%d: unable to read last record %d\n",
                    info->lid, err));
            break;
        }
        // find our last record in master replica
        q = &lrec;
        p = &mlrec;
        err = CrspNextLogRecord(minfo, q, p, TRUE);
        // if found and consistent with master, no undo
        if (err == ERROR_SUCCESS && p->hdr.state == q->hdr.state) {
            CrsLog(("CrsReplay%d: last record %I64d, %I64d consistent %x %x\n",
                    info->lid, q->hdr.epoch, q->hdr.seq,
                    p->hdr.state, q->hdr.state));
            break;
        }
        if (err != ERROR_SUCCESS) {
            CrsLog(("CrsReplay%d: missing lrec %I64d, %I64d in disk %d, err %d\n",
                    info->lid, q->hdr.epoch, q->hdr.seq, minfo->lid, err));
        } else {
            CrsLog(("CrsReplay%d: undo last record %I64d, %I64d %x needs %x\n",
                    info->lid, q->hdr.epoch, q->hdr.seq,
                    q->hdr.state, p->hdr.state));
            ASSERT(p->hdr.state & (CRS_COMMIT|CRS_ABORT));
        }
        // last record is in conflict, we must undo it first
        if (!(q->hdr.state & CRS_EPOCH)) {
            // if we found this record in master and a conflict is detected,
            // we undo it. Otherwise, we need to do a full copy
            // (err stays non-zero, which ends phase 1 with p = NULL below)
            if (err == ERROR_SUCCESS) {
                ASSERT(p->hdr.state & (CRS_COMMIT|CRS_ABORT));
                ASSERT(q->hdr.state & CRS_PREPARE);
                err = info->callback(info->callback_arg,
                                     rr->nid, q,
                                     CRS_ACTION_UNDO, rr->mid);
            }
        } else {
            // A missing epoch record doesn't mean we are old. A regroup
            // could have happened but no new data records got added. We
            // undo it, and continue;
            err = STATUS_SUCCESS;
        }
        if (err == STATUS_SUCCESS) {
            // update current record, sequence, epoch
            // NOTE(review): the cleared slot is only changed in memory here;
            // nothing in this branch writes it to disk.
            info->buf[info->last_record].hdr.state = 0;
            info->buf[info->last_record].hdr.epoch = 0;
            info->buf[info->last_record].hdr.seq = 0;
            if (info->last_record == 0) {
                info->last_record = info->max_records;
            }
            info->last_record--;
            info->seq = info->buf[info->last_record].hdr.seq;
            info->epoch = info->buf[info->last_record].hdr.epoch;
            CrsLog(("CrsReplay%d: new last record %d %I64d, %I64d\n",
                    info->lid, info->last_record, info->epoch, info->seq));
        } else {
            // can't undo it, do full copy and readjust our log
            CrsLog(("CrsReplay%d: Unable to undo record %I64d, %I64d\n",
                    info->lid, q->hdr.epoch, q->hdr.seq));
            p = NULL;
        }
    } while (err == STATUS_SUCCESS && info->state == CRS_STATE_RECOVERY);
    // Phase 2: roll forward from the master, starting after record p
    while (p != NULL && info->state == CRS_STATE_RECOVERY) {
        // read master copy
        err = CrspNextLogRecord(minfo, p, &mlrec, FALSE);
        if (err != ERROR_SUCCESS) {
            if (err == ERROR_HANDLE_EOF) {
                CrsLog(("CrsReplay%d: last record %I64d, %I64d in disk %d\n",
                        info->lid, q->hdr.epoch, q->hdr.seq, minfo->lid));
                // the last record is where we are at
                info->seq = info->buf[info->last_record].hdr.seq;
                info->epoch = info->buf[info->last_record].hdr.epoch;
                // This would be performed later in CrsStart().
#if 0
                // we reached the end, signal end of recovery
                err = info->callback(info->callback_arg,
                                     rr->nid, p,
                                     CRS_ACTION_DONE, rr->mid);
#else
                err = STATUS_SUCCESS;
#endif
                goto exit;
            }
            // any other lookup failure falls through to the full copy
            break;
        }
        p = &mlrec;
        if ((p->hdr.state & CRS_EPOCH) || (p->hdr.state & CRS_ABORT)) {
            CrsLog(("CrsReplay%d: skip record %I64d, %I64d %x\n",
                    info->lid, p->hdr.epoch, p->hdr.seq, p->hdr.state));
            // non-zero err routes the record to the direct append below
            err = !STATUS_SUCCESS;
        } else if (p->hdr.state & CRS_COMMIT) {
            // On success the callback is expected to have logged the record
            // itself (checked in the "info->seq == p->hdr.seq" branch). Any
            // other non-success status except TRANSACTION_ABORTED falls
            // through to the direct append below.
            err = info->callback(info->callback_arg,
                                 rr->nid, p,
                                 CRS_ACTION_REPLAY, rr->mid);
            if (err == STATUS_TRANSACTION_ABORTED) {
                CrsLog(("CrsReplay: failed nid %d seq %I64d err %x\n",
                        rr->nid, p->hdr.seq, err));
                break;
            }
        } else {
            ASSERT(p->hdr.state & CRS_PREPARE);
            // what if the record is prepared but not yet committed or
            // aborted; in transit record.
            // stop now
            CrsLog(("CrsReplay%d: bad record seq %I64d state %x\n",
                    rr->nid, p->hdr.seq, p->hdr.state));
            break;
        }
        if (err != STATUS_SUCCESS) {
            // add record
            err = CrspAppendRecord(info, p, NULL);
            if (err != ERROR_SUCCESS) {
                CrsLog(("CrsReplay%d: failed append seq %I64d err %x\n",
                        rr->nid, p->hdr.seq, err));
                break;
            }
            if (p->hdr.state & CRS_EPOCH) {
                ; //ASSERT(info->epoch+1 == p->hdr.epoch);
            } else {
                ASSERT(info->epoch == p->hdr.epoch);
                ASSERT(info->seq+1 == p->hdr.seq);
            }
            info->seq = p->hdr.seq;
            info->epoch = p->hdr.epoch;
        } else if (info->seq == p->hdr.seq) {
            // make sure we have added it
            ASSERT(info->seq == p->hdr.seq);
            ASSERT(info->epoch == p->hdr.epoch);
            ASSERT(info->buf[info->last_record].hdr.seq == p->hdr.seq);
            ASSERT(info->buf[info->last_record].hdr.epoch == p->hdr.epoch);
            // Propagate dubious bit
            if (p->hdr.state & CRS_DUBIOUS) {
                info->buf[info->last_record].hdr.state |= CRS_DUBIOUS;
            }
            ASSERT(info->buf[info->last_record].hdr.state == p->hdr.state);
        } else {
            // force a full copy
            err = !STATUS_SUCCESS;
            break;
        }
    }
#else
    p = NULL;
#endif
    // Phase 3: full copy when undo/roll-forward could not finish
    if (p == NULL || err != STATUS_SUCCESS) {
        CrsLog(("CrsReplay%d: Full copy from disk %d\n",
                info->lid, minfo->lid));
        // we are out of date or need full recovery, do a full copy
        err = info->callback(info->callback_arg,
                             rr->nid, NULL,
                             CRS_ACTION_COPY, rr->mid);
        if (err == STATUS_SUCCESS) {
            DWORD len;
            // we now copy our master log and flush it
            // NOTE(review): only an ASSERT guards equal log sizes; in release
            // builds a larger minfo log would overflow info->buf here.
            ASSERT(minfo->max_records == info->max_records);
            len = info->max_records * CRS_RECORD_SZ;
            memcpy(info->buf, minfo->buf, len);
            err = CrspWrite(info, 0, len);
            if (err == ERROR_SUCCESS) {
                // adjust our state
                info->last_record = minfo->last_record;
                info->seq = info->buf[info->last_record].hdr.seq;
                info->epoch = info->buf[info->last_record].hdr.epoch;
                // The action below would be performed later in CrsStart().
#if 0
                // we reached the end, signal end of recovery
                err = info->callback(info->callback_arg,
                                     rr->nid, p,
                                     CRS_ACTION_DONE, rr->mid);
#endif
            }
        }
    }
exit:
    CrsLog(("CrsReplay%d mid %d status 0x%x\n", rr->nid, rr->mid, err));
    return err;
}
  477. /////////////////////// Public Functions //////////////////////
  478. DWORD
  479. WINAPI
  480. CrsOpen(crs_callback_t callback, PVOID callback_arg, USHORT lid,
  481. WCHAR *log_name, int max_logsectors, HANDLE *outhdl)
  482. {
  483. // Open the log file
  484. // If the file in newly create, set the proper size
  485. // If the file size is not the same size, we need to either
  486. // expand or truncate the file. (truncate needs copy)
  487. // Scan file to locate last sector and record
  488. // If last record hasn't been commited, issue a query.
  489. // If query succeeded then, mark it as committed.
  490. // Set epoch,seq
  491. DWORD status;
  492. HANDLE maph;
  493. CrsInfo_t *p;
  494. int logsz;
  495. ULONG disp=FILE_OPEN_IF;
  496. if (outhdl == NULL) {
  497. return ERROR_INVALID_PARAMETER;
  498. }
  499. *outhdl = NULL;
  500. p = (CrsInfo_t *) malloc(sizeof(*p));
  501. if (p == NULL) {
  502. return ERROR_NOT_ENOUGH_MEMORY;
  503. }
  504. memset((PVOID) p, 0, sizeof(*p));
  505. // CrsLog(("Crs%d file '%S'\n", lid, log_name));
  506. p->lid = lid;
  507. p->callback = callback;
  508. p->callback_arg = callback_arg;
  509. p->pending = FALSE;
  510. #if 0
  511. // Create log file, and set size of newly created
  512. p->fh = CreateFileW(log_name,
  513. GENERIC_READ | GENERIC_WRITE,
  514. FILE_SHARE_READ|FILE_SHARE_WRITE,
  515. NULL,
  516. OPEN_ALWAYS,
  517. FILE_FLAG_WRITE_THROUGH,
  518. NULL);
  519. #else
  520. p->fh = INVALID_HANDLE_VALUE;
  521. status = xFsCreate(&p->fh,
  522. NULL,
  523. log_name,
  524. wcslen(log_name),
  525. FILE_WRITE_THROUGH|FILE_SYNCHRONOUS_IO_ALERT,
  526. 0,
  527. FILE_SHARE_READ|FILE_SHARE_WRITE,
  528. &disp,
  529. GENERIC_READ | GENERIC_WRITE | FILE_WRITE_EA,
  530. NULL,
  531. 0
  532. );
  533. if ((status == STATUS_SUCCESS)&&(disp == FILE_OPENED)) {
  534. status = ERROR_ALREADY_EXISTS;
  535. }
  536. #endif
  537. // status = GetLastError();
  538. if(p->fh == INVALID_HANDLE_VALUE){
  539. free((char *) p);
  540. return status;
  541. }
  542. // acquire an exclusive lock on the whole file
  543. if (!LockFile(p->fh, 0, 0, (DWORD)-1, (DWORD)-1)) {
  544. FILE_FULL_EA_INFORMATION ea[2] = {0};
  545. IO_STATUS_BLOCK ios;
  546. NTSTATUS err;
  547. // get status
  548. status = GetLastError();
  549. // change the ea to cause a notification to happen
  550. ea[0].NextEntryOffset = 0;
  551. ea[0].Flags = 0;
  552. ea[0].EaNameLength = 1;
  553. ea[0].EaValueLength = 1;
  554. ea[0].EaName[0] = 'X';
  555. // Increment size by 1, due to value.
  556. err = NtSetEaFile(p->fh, &ios, (PVOID) ea, sizeof(ea));
  557. CrsLog(("Crs%d Setting EA err=0x%x status=0x%x\n", lid, err, status));
  558. goto error;
  559. }
  560. if (status == ERROR_ALREADY_EXISTS) {
  561. // todo: compare current file size to new size and adjust file
  562. // size accordingly. For now, just use old size
  563. logsz = GetFileSize(p->fh, NULL);
  564. CrsLog(("Crs%d: (Open) Filesz %d max_sec %d\n", lid, logsz, max_logsectors));
  565. ASSERT(logsz == max_logsectors * CRS_SECTOR_SZ);
  566. } else {
  567. //extend the file pointer to max size
  568. logsz = max_logsectors * CRS_SECTOR_SZ;
  569. SetFilePointer(p->fh, logsz, NULL, FILE_BEGIN);
  570. SetEndOfFile(p->fh);
  571. CrsLog(("Crs%d: (Create) Set Filesz %d max_sec %d\n", lid, logsz, max_logsectors));
  572. }
  573. // allocate file copy in memory
  574. p->buf = xmalloc(logsz);
  575. if (p->buf == NULL) {
  576. status = ERROR_NOT_ENOUGH_MEMORY;
  577. goto error;
  578. }
  579. // set max record
  580. p->max_records = logsz / CRS_RECORD_SZ;
  581. if (status == ERROR_ALREADY_EXISTS) {
  582. // load file and compute last epoch/seq
  583. status = CrspFindLast(p, logsz);
  584. } else {
  585. status = !ERROR_SUCCESS;
  586. }
  587. // init the file, when we detect a read failure or first time
  588. if (status != ERROR_SUCCESS) {
  589. CrsRecord_t *r;
  590. int i;
  591. // initialize file
  592. p->seq = 0;
  593. p->epoch = 0;
  594. p->last_record = 0;
  595. r = p->buf;
  596. for (i = 0; i < logsz; i+= CRS_RECORD_SZ, r++) {
  597. r->hdr.epoch = p->epoch;
  598. r->hdr.seq = p->seq;
  599. r->hdr.tag = CRS_TAG;
  600. r->hdr.state = CRS_COMMIT | CRS_PREPARE | CRS_EPOCH;
  601. }
  602. status = CrspWrite(p, 0, logsz);
  603. }
  604. if (status != ERROR_SUCCESS) {
  605. goto error;
  606. }
  607. CrsLog(("Crs%d: %x Last record %d max %d epoch %I64d seq %I64d\n", p->lid,
  608. p->fh,
  609. p->last_record, p->max_records, p->epoch, p->seq));
  610. // initialize rest of state
  611. p->state = CRS_STATE_INIT;
  612. p->refcnt = 1;
  613. p->leader_id = 0;
  614. InitializeCriticalSection(&p->lock);
  615. *outhdl = p;
  616. return ERROR_SUCCESS;
  617. error:
  618. CloseHandle(p->fh);
  619. if (p->buf) {
  620. xfree(p->buf);
  621. }
  622. free((PVOID) p);
  623. return status;
  624. }
  625. //
  626. DWORD
  627. WINAPI
  628. CrsStart(PVOID *hdls, ULONG alive_set, int cluster_sz,
  629. ULONG *write_set, ULONG *read_set, ULONG *evict_set)
  630. {
  631. DWORD status;
  632. CrsInfo_t **info = (CrsInfo_t **) hdls;
  633. int i, active_sz, mid;
  634. ULONG mask, active_set, fail_set;
  635. CrsInfo_t *p;
  636. CrsRecord_t *q, *mlrec;
  637. if (write_set) *write_set = 0;
  638. if (read_set) *read_set = 0;
  639. if (evict_set) *evict_set = 0;
  640. // no alive node
  641. if (cluster_sz == 0 || alive_set == 0) {
  642. // nothing to do
  643. return ERROR_WRITE_PROTECT;
  644. }
  645. // scan each hdl and make sure it is initialized and lock all hdls
  646. mask = alive_set;
  647. for (i = 0; mask != 0; i++, mask = mask >> 1) {
  648. if (!(mask & 0x1)) {
  649. continue;
  650. }
  651. p = info[i];
  652. if (p == NULL) {
  653. continue;
  654. }
  655. EnterCriticalSection(&p->lock);
  656. // check the state of the last record
  657. p = info[i];
  658. q = &p->buf[p->last_record];
  659. CrsLog(("Crs%d last record %d epoch %I64d seq %I64d state %x\n",
  660. p->lid, p->last_record,
  661. q->hdr.epoch, q->hdr.seq, q->hdr.state));
  662. }
  663. mid = 0;
  664. mlrec = NULL;
  665. // select master replica
  666. for (i = 0, mask = alive_set; mask != 0; i++, mask = mask >> 1) {
  667. if (!(mask & 0x1)) {
  668. continue;
  669. }
  670. p = info[i];
  671. if (p == NULL)
  672. continue;
  673. q = &p->buf[p->last_record];
  674. if (!mlrec ||
  675. mlrec->hdr.epoch < q->hdr.epoch ||
  676. (mlrec->hdr.epoch == q->hdr.epoch && mlrec->hdr.seq < q->hdr.seq) ||
  677. (mlrec->hdr.epoch == q->hdr.epoch && mlrec->hdr.seq == q->hdr.seq &&
  678. mlrec->hdr.state != q->hdr.state && (q->hdr.state & CRS_COMMIT))) {
  679. mid = i;
  680. mlrec = q;
  681. }
  682. }
  683. ASSERT(mid != 0);
  684. // if master last record is in doubt, query filesystem. If the filesystem
  685. // is certain that the operation has occured, it returns STATUS_SUCCESS for
  686. // COMMIT, STATUS_CANCELLED for ABORT, and STATUS_NOT_FOUND for can't tell.
  687. // All undetermined IO must be undone and redone in all non-master replicas
  688. // to ensure all replicas reach consistency. This statement is true even
  689. // for replicas that are currently absent from our set. We tag such records
  690. // we both COMMIT and ABORT, so that the replay thread issues replay for
  691. // new records and undo,replay for last records
  692. p = info[mid];
  693. p->leader_id = (USHORT) mid;
  694. ASSERT(mlrec != NULL);
  695. if (!(mlrec->hdr.state & (CRS_COMMIT | CRS_ABORT))) {
  696. ASSERT(mlrec->hdr.state & CRS_PREPARE);
  697. status = p->callback(p->callback_arg, p->lid,
  698. mlrec, CRS_ACTION_QUERY,
  699. p->lid);
  700. if (status == STATUS_SUCCESS) {
  701. mlrec->hdr.state |= CRS_COMMIT;
  702. } else if (status == STATUS_CANCELLED) {
  703. mlrec->hdr.state |= CRS_ABORT;
  704. } else if (status == STATUS_NOT_FOUND) {
  705. // assume it is committed, but mark it for undo during recovery
  706. mlrec->hdr.state |= (CRS_COMMIT | CRS_DUBIOUS);
  707. }
  708. // todo: if status == TRANSACTION_ABORTED, we need to bail out since
  709. // must master is dead
  710. // no need to flush, I think!
  711. // CrspFlush(p, p->last_record);
  712. // todo: what if the flush fails here, I am assuming that
  713. // an append will equally fail.
  714. }
  715. ASSERT(mlrec->hdr.state & (CRS_COMMIT | CRS_ABORT));
  716. // compute sync and recovery masks
  717. fail_set = 0;
  718. active_set = 0;
  719. active_sz = 0;
  720. for (i = 0, mask = alive_set; mask != 0; i++, mask = mask >> 1) {
  721. if (!(mask & 0x1)) {
  722. continue;
  723. }
  724. p = info[i];
  725. if (p == NULL) {
  726. continue;
  727. }
  728. // set leader id
  729. p->leader_id = (USHORT) mid;
  730. q = &p->buf[p->last_record];
  731. if (CrspEqual(mlrec, q)) {
  732. ASSERT(q->hdr.state & (CRS_COMMIT | CRS_ABORT));
  733. p->state = CRS_STATE_READ;
  734. active_set |= (1 << i);
  735. active_sz++;
  736. } else if (p->state != CRS_STATE_RECOVERY) {
  737. CrsRecoveryBlk_t rrbuf;
  738. CrsRecoveryBlk_t *rr = &rrbuf;
  739. // recover replica
  740. rr->nid = i;
  741. rr->mid = mid;
  742. rr->info = p;
  743. rr->minfo = info[mid];
  744. // set recovery state
  745. p->state = CRS_STATE_RECOVERY;
  746. status = CrspReplay((LPVOID) rr);
  747. // if we fail, evict this replica
  748. if (status != ERROR_SUCCESS) {
  749. fail_set |= (1 << i);
  750. } else {
  751. // repeat this replica again
  752. i--;
  753. mask = mask << 1;
  754. }
  755. }
  756. }
  757. // Now recreate the open file state. This needs to be done for all replicas.
  758. // Removed this operation from CrspReplay() since now it needs to be performed on
  759. // all replicas, even master.
  760. //
  761. for (i=0, mask=active_set; mask != 0;i++, mask = mask >>1) {
  762. if (!(mask & 0x1)) {
  763. continue;
  764. }
  765. status = info[i]->callback(info[i]->callback_arg, i, NULL, CRS_ACTION_DONE, mid);
  766. if (status != STATUS_SUCCESS) {
  767. active_set &= (~(1<<i));
  768. active_sz--;
  769. fail_set |= (1<<i);
  770. }
  771. }
  772. // assume success
  773. status = ERROR_SUCCESS;
  774. // set read sets
  775. if (read_set) *read_set = active_set;
  776. if (!CRS_QUORUM(active_sz, cluster_sz)) {
  777. CrsLog(("No quorum active %d cluster %d\n", active_sz, cluster_sz));
  778. mid = 0;
  779. status = ERROR_WRITE_PROTECT;
  780. } else {
  781. int pass_cnt = 0;
  782. ULONG pass_set = 0;
  783. // Enable writes on all active replicas
  784. for (i = 0, mask = active_set; mask != 0; i++, mask = mask >> 1) {
  785. CrsRecord_t rec;
  786. if (!(mask & 0x1)) {
  787. continue;
  788. }
  789. p = info[i];
  790. if (p == NULL)
  791. continue;
  792. p->state = CRS_STATE_WRITE;
  793. // we now generate a new epoch and flush it to the disk
  794. p->epoch++;
  795. if (p->epoch == 0)
  796. p->epoch = 1;
  797. // reset seq to zero
  798. p->seq = 0;
  799. // write new epoch now, if not a majority replicas succeeded in writing
  800. // the new <epoch, seq> we fail
  801. rec.hdr.epoch = p->epoch;
  802. rec.hdr.seq = p->seq;
  803. rec.hdr.state = CRS_PREPARE | CRS_COMMIT | CRS_EPOCH;
  804. memset(rec.data, 0, sizeof(rec.data));
  805. if (CrspAppendRecord(p, &rec, NULL) == ERROR_SUCCESS) {
  806. pass_cnt++;
  807. pass_set |= (1 << i);
  808. } else {
  809. fail_set |= (1 << i);
  810. }
  811. }
  812. // Recheck to make sure all replicas have advanced epoch
  813. if (!CRS_QUORUM(pass_cnt, cluster_sz)) {
  814. CrsLog(("No quorum due to error pass %d cluster %d\n", pass_cnt, cluster_sz));
  815. mid = 0;
  816. pass_set = 0;
  817. pass_cnt = 0;
  818. status = ERROR_WRITE_PROTECT;
  819. }
  820. if (pass_cnt != active_sz) {
  821. // some replicas have died
  822. for (i = 0, mask = pass_set; mask != 0; i++, mask = mask >> 1) {
  823. if ((alive_set & (1 << i)) && ((~mask) & (1 << i))) {
  824. p = info[i];
  825. ASSERT(p != NULL);
  826. p->state = CRS_STATE_READ;
  827. }
  828. }
  829. }
  830. // set write set
  831. if (write_set) *write_set = pass_set;
  832. }
  833. if (evict_set) *evict_set = fail_set;
  834. // unlock all hdls and set new master if any
  835. for (i = 0, mask = alive_set; mask != 0; i++, mask = mask >> 1) {
  836. if (!(mask & 0x1)) {
  837. continue;
  838. }
  839. p = info[i];
  840. if (p == NULL)
  841. continue;
  842. p->leader_id = (USHORT) mid;
  843. LeaveCriticalSection(&p->lock);
  844. }
  845. return status;
  846. }
  847. void
  848. WINAPI
  849. CrsClose(PVOID hd)
  850. {
  851. DWORD err=ERROR_SUCCESS;
  852. CrsInfo_t *info = (CrsInfo_t *) hd;
  853. // If we any recovery threads running, make sure we terminate them first
  854. // before close and free all of this stuff
  855. if (info == NULL) {
  856. CrsLog(("CrsClose: try to close a null handle!\n"));
  857. return;
  858. }
  859. // Flush everything out and close the file
  860. EnterCriticalSection(&info->lock);
  861. // flush
  862. CrspFlush(info, info->last_record);
  863. LeaveCriticalSection(&info->lock);
  864. DeleteCriticalSection(&info->lock);
  865. if (info->fh != INVALID_HANDLE_VALUE) {
  866. UnlockFile(info->fh, 0, 0, (DWORD)-1, (DWORD)-1);
  867. err = CloseHandle(info->fh);
  868. info->fh = INVALID_HANDLE_VALUE;
  869. }
  870. CrsLog(("Crs%d: %x Closed %d\n", info->fh, info->lid, err));
  871. xfree(info->buf);
  872. free((char *) info);
  873. }
  874. void
  875. WINAPI
  876. CrsFlush(PVOID hd)
  877. {
  878. CrsInfo_t *info = (CrsInfo_t *) hd;
  879. // if we have a commit or abort that isn't flushed yet, flush it now
  880. EnterCriticalSection(&info->lock);
  881. if (info->pending == TRUE) {
  882. CrspFlush(info, info->last_record);
  883. }
  884. LeaveCriticalSection(&info->lock);
  885. }
  886. PVOID
  887. WINAPI
  888. CrsPrepareRecord(PVOID hd, PVOID lrec, crs_id_t id, ULONG *retVal)
  889. {
  890. CrsRecord_t *p = (CrsRecord_t *)lrec;
  891. CrsInfo_t *info = (CrsInfo_t *) hd;
  892. DWORD err;
  893. // move to correct slot in this sector. If we need a new sector,
  894. // read it from the file. Make sure we flush any pending commits on
  895. // current sector before we over write our in memory sector buffer.
  896. // prepare record, if seq none 0 then we are skipping the next sequence
  897. *retVal = STATUS_MEDIA_WRITE_PROTECTED;
  898. EnterCriticalSection(&info->lock);
  899. if (info->state == CRS_STATE_WRITE ||
  900. (info->state == CRS_STATE_RECOVERY && id != NULL && id[0] != 0)) {
  901. if (id != NULL && id[0] != 0) {
  902. CrsHdr_t *tmp = (CrsHdr_t *) id;
  903. assert(id[0] == info->seq+1);
  904. p->hdr.seq = tmp->seq;
  905. p->hdr.epoch = tmp->epoch;
  906. } else {
  907. p->hdr.seq = info->seq+1;
  908. p->hdr.epoch = info->epoch;
  909. }
  910. p->hdr.state = CRS_PREPARE;
  911. err = CrspAppendRecord(info, p, &p);
  912. *retVal = err;
  913. if (err == ERROR_SUCCESS) {
  914. // we return with the lock held, gets release on commitorabort
  915. CrsLog(("Crs%d prepare %x seq %I64d\n",info->lid, p, p->hdr.seq));
  916. return p;
  917. }
  918. CrsLog(("Crs%d: Append failed seq %I64%d\n", info->lid, p->hdr.seq));
  919. } else {
  920. CrsLog(("Crs%d: Prepare bad state %d id %x\n", info->lid, info->state, id));
  921. }
  922. LeaveCriticalSection(&info->lock);
  923. return NULL;
  924. }
  925. int
  926. WINAPI
  927. CrsCommitOrAbort(PVOID hd, PVOID lrec, int commit)
  928. {
  929. CrsRecord_t *p = (CrsRecord_t *)lrec;
  930. CrsInfo_t *info = (CrsInfo_t *) hd;
  931. if (p == NULL || info == NULL) {
  932. return ERROR_INVALID_PARAMETER;
  933. }
  934. // update state of record
  935. if (p->hdr.seq != info->seq+1) {
  936. CrsLog(("Crs: sequence mis-match on commit|abort %I64d %I64d\n",
  937. p->hdr.seq, info->seq));
  938. assert(0);
  939. return ERROR_INVALID_PARAMETER;
  940. }
  941. assert(!(p->hdr.state & (CRS_COMMIT | CRS_ABORT)));
  942. // todo: this is wrong, what if one replica succeeds
  943. // and others abort. Now, the others will reuse the
  944. // same seq for a different update and when the
  945. // succeeded replica rejoins it can't tell that the
  946. // sequence got reused.
  947. if (commit == TRUE) {
  948. p->hdr.state |= CRS_COMMIT;
  949. // advance the sequence
  950. info->seq++;
  951. CrsLog(("Crs%d: commit last %d leader %d seq %I64d\n", info->lid,
  952. info->last_record,
  953. info->leader_id, p->hdr.seq));
  954. } else {
  955. p->hdr.state |= CRS_ABORT;
  956. // we need to re-adjust our last record
  957. if (info->last_record == 0) {
  958. info->last_record = info->max_records;
  959. }
  960. info->last_record--;
  961. CrsLog(("Crs%d: abort last %d leader %d seq %I64d\n", info->lid,
  962. info->last_record,
  963. info->leader_id, p->hdr.seq));
  964. }
  965. info->pending = TRUE;
  966. LeaveCriticalSection(&info->lock);
  967. return ERROR_SUCCESS;
  968. }
  969. int
  970. WINAPI
  971. CrsCanWrite(PVOID hd)
  972. {
  973. CrsInfo_t *info = (CrsInfo_t *) hd;
  974. int err;
  975. // do we have a quorm or not
  976. EnterCriticalSection(&info->lock);
  977. err = (info->state == CRS_STATE_WRITE);
  978. LeaveCriticalSection(&info->lock);
  979. return err;
  980. }
  981. crs_epoch_t
  982. CrsGetEpoch(PVOID hd)
  983. {
  984. CrsInfo_t *info=(CrsInfo_t *)hd;
  985. crs_epoch_t epoch;
  986. EnterCriticalSection(&info->lock);
  987. epoch = info->epoch;
  988. LeaveCriticalSection(&info->lock);
  989. return epoch;
  990. }