Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1213 lines
34 KiB

  1. /*++
  2. Copyright (c) 1997-1999 Microsoft Corporation
  3. Module Name:
  4. fetch.c
  5. Abstract:
  6. Staging File Fetcher Command Server.
  7. Author:
  8. Billy J. Fuller 05-Jun-1997
  9. Environment
  10. User mode winnt
  11. --*/
  12. #include <ntreppch.h>
  13. #pragma hdrstop
  14. #undef DEBSUB
  15. #undef DEBSUB
  16. #define DEBSUB "FETCH:"
  17. #include <frs.h>
  18. #include <tablefcn.h>
  19. #include <perrepsr.h>
  20. // #include <md5.h>
  21. //
  22. // Retry times
  23. //
  24. // NOT TOO LONG; we wouldn't want the comm timeout to hit on our
  25. // downstream partner waiting for the fetch to succeed.
  26. //
  27. // Our downstream partner waits FETCHCS_RETRY_WAIT before retrying.
  28. //
  29. #define FETCHCS_RETRY_MIN ( 1 * 1000) // 1 second
  30. #define FETCHCS_RETRY_MAX (10 * 1000) // 10 seconds
  31. #define FETCHCS_RETRY_WAIT ( 5 * 1000) // 5 seconds
  32. //
  33. // Maximume transfer block size in bytes
  34. //
  35. #define FETCHCS_MAX_BLOCK_SIZE (64 * 1024)
  36. //
  37. // Struct for the Staging File Fetcher Command Server
  38. // Contains info about the queues and the threads
  39. //
  40. COMMAND_SERVER FetchCs;
  41. ULONG MaxFetchCsThreads;
  42. //
  43. // Retry fetch after N fetches and reset N to N + 1
  44. //
  45. #if DBG
  46. #define PULL_FETCH_RETRY_TRIGGER(_Coc_, _WStatus_, _Flags_) \
  47. { \
  48. if (DebugInfo.FetchRetryTrigger && --DebugInfo.FetchRetryTrigger <= 0) { \
  49. if (WIN_SUCCESS(_WStatus_)) { \
  50. StageRelease(&_Coc_->ChangeOrderGuid, _Coc_->FileName, _Flags_, NULL, NULL); \
  51. _WStatus_ = ERROR_RETRY; \
  52. } \
  53. DebugInfo.FetchRetryReset += DebugInfo.FetchRetryInc; \
  54. DebugInfo.FetchRetryTrigger = DebugInfo.FetchRetryReset; \
  55. DPRINT2(0, "++ FETCH RETRY TRIGGER FIRED on %ws; reset to %d\n", \
  56. _Coc_->FileName, DebugInfo.FetchRetryTrigger); \
  57. } \
  58. }
  59. #define CHECK_FETCH_RETRY_TRIGGER(_Always_) \
  60. { \
  61. if (DebugInfo.FetchRetryReset && !_Always_) { \
  62. return FALSE; \
  63. } \
  64. }
  65. #else DBG
  66. #define PULL_FETCH_RETRY_TRIGGER(_WStatus_)
  67. #define CHECK_FETCH_RETRY_TRIGGER()
  68. #endif DBG
  69. DWORD
  70. StuGenerateStage(
  71. IN PCHANGE_ORDER_COMMAND Coc,
  72. IN PCHANGE_ORDER_ENTRY Coe,
  73. IN BOOL FromPreExisting,
  74. IN MD5_CTX *Md5,
  75. PULONGLONG GeneratedSize,
  76. OUT GUID *CompressionFormatUsed
  77. );
  78. DWORD
  79. StuGenerateDecompressedStage(
  80. IN PWCHAR StageDir,
  81. IN GUID *CoGuid,
  82. IN GUID *CompressionFormatUsed
  83. );
  84. BOOL
  85. FetchCsDelCsSubmit(
  86. IN PCOMMAND_PACKET Cmd,
  87. IN BOOL Always
  88. )
  89. /*++
  90. Routine Description:
  91. Set the timer and kick off a delayed staging file command
  92. Arguments:
  93. Cmd
  94. Return Value:
  95. None.
  96. --*/
  97. {
  98. #undef DEBSUB
  99. #define DEBSUB "FetchCsDelCsSubmit:"
  100. //
  101. // Don't bother if the fetch retry trigger is set (error injection)
  102. // MAY RETURN!!!
  103. //
  104. CHECK_FETCH_RETRY_TRIGGER(Always);
  105. //
  106. // Extend the retry time (but not too long)
  107. //
  108. RsTimeout(Cmd) <<= 1;
  109. if (RsTimeout(Cmd) > FETCHCS_RETRY_MAX) {
  110. if (Always) {
  111. RsTimeout(Cmd) = FETCHCS_RETRY_MAX;
  112. }
  113. else {
  114. return (FALSE);
  115. }
  116. }
  117. //
  118. // or too short
  119. //
  120. if (RsTimeout(Cmd) < FETCHCS_RETRY_MIN) {
  121. RsTimeout(Cmd) = FETCHCS_RETRY_MIN;
  122. }
  123. //
  124. // This command will come back to us in a bit
  125. //
  126. FrsDelCsSubmitSubmit(&FetchCs, Cmd, RsTimeout(Cmd));
  127. return (TRUE);
  128. }
  129. VOID
  130. FetchCsRetryFetch(
  131. IN PCOMMAND_PACKET Cmd
  132. )
  133. /*++
  134. Routine Description:
  135. Our upstream partner has requested that we retry the
  136. fetch at a later time because the staging file wasn't present
  137. and couldn't be regenerated because of sharing problems or
  138. lack of disk space.
  139. Arguments:
  140. Cmd
  141. Return Value:
  142. None.
  143. --*/
  144. {
  145. #undef DEBSUB
  146. #define DEBSUB "FetchCsRetryFetch:"
  147. DWORD WStatus;
  148. DWORD Flags;
  149. GUID *CoGuid;
  150. PWCHAR FileName;
  151. PCHANGE_ORDER_COMMAND Coc = RsCoc(Cmd);
  152. //
  153. // Already waited for a bit; retry
  154. //
  155. if (RsTimeout(Cmd)) {
  156. CHANGE_ORDER_COMMAND_TRACE(3, Coc, "Fetch Retry Initiated");
  157. RcsSubmitTransferToRcs(Cmd, CMD_RECEIVED_STAGE);
  158. return;
  159. }
  160. CoGuid = &Coc->ChangeOrderGuid;
  161. FileName = Coc->FileName;
  162. //
  163. // Free the data block
  164. //
  165. RsBlock(Cmd) = FrsFree(RsBlock(Cmd));
  166. RsBlockSize(Cmd) = QUADZERO;
  167. //
  168. // Delete the current staging file if we are starting over
  169. //
  170. if (RsFileOffset(Cmd).QuadPart == QUADZERO) {
  171. //
  172. // Acquire access to the staging file
  173. //
  174. Flags = STAGE_FLAG_RESERVE | STAGE_FLAG_EXCLUSIVE;
  175. if (CoCmdIsDirectory(Coc)) {
  176. SetFlag(Flags, STAGE_FLAG_FORCERESERVE);
  177. }
  178. WStatus = StageAcquire(CoGuid, FileName, Coc->FileSize, &Flags, NULL);
  179. if (WIN_SUCCESS(WStatus)) {
  180. StageDeleteFile(Coc, FALSE);
  181. StageRelease(CoGuid, FileName, STAGE_FLAG_UNRESERVE, NULL, NULL);
  182. }
  183. }
  184. RsTimeout(Cmd) = FETCHCS_RETRY_WAIT;
  185. FrsDelCsSubmitSubmit(&FetchCs, Cmd, RsTimeout(Cmd));
  186. }
  187. VOID
  188. FetchCsAbortFetch(
  189. IN PCOMMAND_PACKET Cmd,
  190. IN DWORD WStatus
  191. )
  192. /*++
  193. Routine Description:
  194. Out inbound partner has requested that we abort the fetch.
  195. The inbound partner sends this response when it is unable to generate
  196. or deliver the staging file due to a non-recoverable error. Currently
  197. this means any error NOT in the following list: (WIN_RETRY_FETCH() Macro)
  198. ERROR_SHARING_VIOLATION
  199. ERROR_DISK_FULL
  200. ERROR_HANDLE_DISK_FULL
  201. ERROR_DIR_NOT_EMPTY
  202. ERROR_OPLOCK_NOT_GRANTED
  203. ERROR_RETRY
  204. Typically we get an abort if the upstream partner has deleted the underlying
  205. file and the staging file associated with this change order has been
  206. cleaned up (e.g. the upstream partner has been stopped and restarted).
  207. Arguments:
  208. Cmd
  209. WStatus - Win32 status code.
  210. Return Value:
  211. None.
  212. --*/
  213. {
  214. #undef DEBSUB
  215. #define DEBSUB "FetchCsAbortFetch:"
  216. SET_COE_FLAG(RsCoe(Cmd), COE_FLAG_STAGE_ABORTED | COE_FLAG_STAGE_DELETED);
  217. ChgOrdInboundRetired(RsCoe(Cmd));
  218. RsCoe(Cmd) = NULL;
  219. FrsCompleteCommand(Cmd, WStatus);
  220. }
  221. VOID
  222. FetchCsReceivingStage(
  223. IN PCOMMAND_PACKET Cmd
  224. )
  225. /*++
  226. Routine Description:
  227. Put this data into the staging file
  228. TODO -- If the MD5 checksum was updated by the upstream member as a part of
  229. demand fetch stage file generation (see FetchCsSendStage()) then we need to
  230. propagate RsMd5Digest(Cmd) into the change order command so it can be
  231. updated in the IDTable when this Co retires. Need to decide the correct
  232. conditions under which this should happen.
  233. Arguments:
  234. Cmd
  235. Return Value:
  236. None.
  237. --*/
  238. {
  239. #undef DEBSUB
  240. #define DEBSUB "FetchCsReceivingStage:"
  241. DWORD WStatus;
  242. ULONG Flags;
  243. PWCHAR StagePath = NULL;
  244. PWCHAR FinalPath = NULL;
  245. HANDLE Handle = INVALID_HANDLE_VALUE;
  246. WIN32_FILE_ATTRIBUTE_DATA Attrs;
  247. STAGE_HEADER Header;
  248. PREPLICA Replica = NULL;
  249. CHANGE_ORDER_TRACE(3, RsCoe(Cmd), "Fetch Receiving");
  250. DPRINT1(4, "++ RsFileSize(Cmd).QuadPart: %08x %08x\n",
  251. PRINTQUAD(RsFileSize(Cmd).QuadPart));
  252. DPRINT1(4, "++ RsFileOffset(Cmd).QuadPart: %08x %08x\n",
  253. PRINTQUAD(RsFileOffset(Cmd).QuadPart));
  254. DPRINT1(4, "++ RsBlockSize(Cmd) : %08x %08x\n",
  255. PRINTQUAD(RsBlockSize(Cmd)));
  256. //
  257. // Acquire access to the staging file
  258. //
  259. Flags = STAGE_FLAG_RESERVE | STAGE_FLAG_EXCLUSIVE;
  260. if (CoCmdIsDirectory(RsCoc(Cmd))) {
  261. SetFlag(Flags, STAGE_FLAG_FORCERESERVE);
  262. }
  263. WStatus = StageAcquire(&RsCoc(Cmd)->ChangeOrderGuid,
  264. RsCoc(Cmd)->FileName,
  265. RsCoc(Cmd)->FileSize,
  266. &Flags,
  267. NULL);
  268. //
  269. // Retriable problem; discard
  270. //
  271. if (WIN_RETRY_FETCH(WStatus)) {
  272. CHANGE_ORDER_TRACEW(3, RsCoe(Cmd), "Fetch Receiving Retry", WStatus);
  273. FrsFetchCsSubmitTransfer(Cmd, CMD_RETRY_FETCH);
  274. return;
  275. }
  276. //
  277. // Unrecoverable error; abort (see FetchCsAbortFetch() for description.)
  278. //
  279. if (!WIN_SUCCESS(WStatus)) {
  280. CHANGE_ORDER_TRACEW(0, RsCoe(Cmd), "fetch Receiving Abort", WStatus);
  281. FetchCsAbortFetch(Cmd, WStatus);
  282. return;
  283. }
  284. if (RsFileOffset(Cmd).QuadPart == QUADZERO) {
  285. //
  286. // This is the first block of file data. It will have the stage header.
  287. // Read the header and get the compression guid for this stage file from
  288. // it. Block size is 64K max. 1st block will atleast have the complete header.
  289. // Check it just to make sure.
  290. //
  291. if (RsBlockSize(Cmd) >= sizeof(STAGE_HEADER)) {
  292. ZeroMemory(&Header, sizeof(STAGE_HEADER));
  293. CopyMemory(&Header, RsBlock(Cmd), sizeof(STAGE_HEADER));
  294. }
  295. if (!IS_GUID_ZERO(&Header.CompressionGuid)) {
  296. SET_COC_FLAG(RsCoc(Cmd), CO_FLAG_COMPRESSED_STAGE);
  297. } else {
  298. CLEAR_COC_FLAG(RsCoc(Cmd), CO_FLAG_COMPRESSED_STAGE);
  299. }
  300. }
  301. //
  302. // Get a handle to the staging file. Use a different prefix depending
  303. // on whether the stage file being sent is compressed or uncompressed.
  304. //
  305. if (COC_FLAG_ON(RsCoc(Cmd), CO_FLAG_COMPRESSED_STAGE)) {
  306. StagePath = StuCreStgPath(RsReplica(Cmd)->Stage, RsCoGuid(Cmd), STAGE_GENERATE_COMPRESSED_PREFIX);
  307. SetFlag(Flags, STAGE_FLAG_COMPRESSED);
  308. } else {
  309. StagePath = StuCreStgPath(RsReplica(Cmd)->Stage, RsCoGuid(Cmd), STAGE_GENERATE_PREFIX);
  310. }
  311. if ((Flags & STAGE_FLAG_DATA_PRESENT) ||
  312. (RsFileOffset(Cmd).QuadPart >= RsFileSize(Cmd).QuadPart)) {
  313. //
  314. // Data has arrived. Go complete the stage file final rename.
  315. //
  316. goto RESTART;
  317. }
  318. if (Flags & STAGE_FLAG_CREATING) {
  319. //
  320. // Make sure to truncate the staging file when our upstream
  321. // partner is sending (or resending) the first block of the
  322. // staging file.
  323. //
  324. // Without the truncation, BackupWrite() can AV if NtFrs
  325. // passes in garbage at the end of a too-large
  326. // staging file. A staging file may be too-large if the
  327. // preexisting file used to generate the local staging
  328. // file is smaller than the version of the same file our
  329. // partner wants to send.
  330. //
  331. // Alternatively, I could have truncated the staging file
  332. // after receiving the last block but this code change is less
  333. // risk and is just as effective.
  334. //
  335. if (RsFileOffset(Cmd).QuadPart == QUADZERO) {
  336. ClearFlag(Flags, STAGE_FLAG_CREATING | STAGE_FLAG_CREATED | STAGE_FLAG_DATA_PRESENT);
  337. } else {
  338. //
  339. // See if the staging file exists. If not, set the flags
  340. // to create it.
  341. //
  342. StuOpenFile(StagePath, GENERIC_READ | GENERIC_WRITE, &Handle);
  343. if (!HANDLE_IS_VALID(Handle)) {
  344. ClearFlag(Flags, STAGE_FLAG_CREATING | STAGE_FLAG_CREATED | STAGE_FLAG_DATA_PRESENT);
  345. }
  346. }
  347. }
  348. if (!(Flags & STAGE_FLAG_CREATING)) {
  349. CHANGE_ORDER_TRACE(3, RsCoe(Cmd), "Fetch Receiving Generate Stage");
  350. //
  351. // No longer have a staging file; digest invalid
  352. //
  353. RsMd5Digest(Cmd) = FrsFree(RsMd5Digest(Cmd));
  354. //
  355. // Create and allocate disk space
  356. //
  357. WStatus = StuCreateFile(StagePath, &Handle);
  358. if (!HANDLE_IS_VALID(Handle) || !WIN_SUCCESS(WStatus)) {
  359. goto ERROUT;
  360. }
  361. WStatus = FrsSetFilePointer(StagePath, Handle, RsFileSize(Cmd).HighPart,
  362. RsFileSize(Cmd).LowPart);
  363. CLEANUP1_WS(0, "++ SetFilePointer failed on %ws;", StagePath, WStatus, ERROUT);
  364. WStatus = FrsSetEndOfFile(StagePath, Handle);
  365. CLEANUP1_WS(0, "++ SetEndOfFile failed on %ws;", StagePath, WStatus, ERROUT);
  366. //
  367. // File was deleted during the fetch; start over
  368. //
  369. if (RsFileOffset(Cmd).QuadPart != QUADZERO) {
  370. CHANGE_ORDER_TRACE(3, RsCoe(Cmd), "Fetch Receiving Restart");
  371. RsFileOffset(Cmd).QuadPart = QUADZERO;
  372. RsBlock(Cmd) = FrsFree(RsBlock(Cmd));
  373. RsBlockSize(Cmd) = QUADZERO;
  374. goto RESTART;
  375. }
  376. }
  377. //
  378. // Seek to the offset for this block
  379. //
  380. WStatus = FrsSetFilePointer(StagePath, Handle, RsFileOffset(Cmd).HighPart,
  381. RsFileOffset(Cmd).LowPart);
  382. CLEANUP1_WS(0, "++ SetFilePointer failed on %ws;", StagePath, WStatus, ERROUT);
  383. //
  384. // write the file and update the offset for the next block
  385. //
  386. WStatus = StuWriteFile(StagePath, Handle, RsBlock(Cmd), (ULONG)RsBlockSize(Cmd));
  387. CLEANUP1_WS(0, "++ WriteFile failed on %ws;", StagePath, WStatus, ERROUT);
  388. //
  389. // Increment the counter Bytes of staging Fetched
  390. //
  391. Replica = RsCoe(Cmd)->NewReplica;
  392. PM_INC_CTR_REPSET(Replica, SFFetchedB, RsBlockSize(Cmd));
  393. RESTART:
  394. FrsFlushFile(StagePath, Handle);
  395. FRS_CLOSE(Handle);
  396. if ((RsFileOffset(Cmd).QuadPart + RsBlockSize(Cmd)) >= RsFileSize(Cmd).QuadPart) {
  397. //
  398. // All the stage file data is here. Do the final rename.
  399. //
  400. SetFlag(Flags, STAGE_FLAG_DATA_PRESENT | STAGE_FLAG_RERESERVE);
  401. if (COC_FLAG_ON(RsCoc(Cmd), CO_FLAG_COMPRESSED_STAGE)) {
  402. FinalPath = StuCreStgPath(RsReplica(Cmd)->Stage, RsCoGuid(Cmd), STAGE_FINAL_COMPRESSED_PREFIX);
  403. } else {
  404. FinalPath = StuCreStgPath(RsReplica(Cmd)->Stage, RsCoGuid(Cmd), STAGE_FINAL_PREFIX);
  405. }
  406. if (!MoveFileEx(StagePath,
  407. FinalPath,
  408. MOVEFILE_WRITE_THROUGH | MOVEFILE_REPLACE_EXISTING)) {
  409. WStatus = GetLastError();
  410. } else {
  411. WStatus = ERROR_SUCCESS;
  412. }
  413. if (!WIN_SUCCESS(WStatus)) {
  414. CHANGE_ORDER_TRACEW(3, RsCoe(Cmd), "Fetch Receiving Rename fail", WStatus);
  415. DPRINT2_WS(0, "++ Can't move fetched %ws to %ws;",
  416. StagePath, FinalPath, WStatus);
  417. FinalPath = FrsFree(FinalPath);
  418. goto ERROUT;
  419. }
  420. //
  421. // Stage file with final name is in place and ready to install
  422. // and/or deliver to our downstream partners.
  423. //
  424. SetFlag(Flags, STAGE_FLAG_CREATED | STAGE_FLAG_INSTALLING);
  425. }
  426. //
  427. // The last block isn't officially "written" into the staging file
  428. // until the above rename finishes. That is because the write of
  429. // the last byte of the staging file signifies "all done" to the
  430. // replica command server (replica.c).
  431. //
  432. RsFileOffset(Cmd).QuadPart += RsBlockSize(Cmd);
  433. //
  434. // This block has been successfully transferred; free the buffer now
  435. //
  436. FrsFree(StagePath);
  437. FrsFree(FinalPath);
  438. RsBlock(Cmd) = FrsFree(RsBlock(Cmd));
  439. RsBlockSize(Cmd) = QUADZERO;
  440. //
  441. // Release staging resources
  442. //
  443. SetFlag(Flags, STAGE_FLAG_CREATING);
  444. if (!IS_GUID_ZERO(&Header.CompressionGuid)) {
  445. StageRelease(&RsCoc(Cmd)->ChangeOrderGuid,
  446. RsCoc(Cmd)->FileName,
  447. Flags | STAGE_FLAG_COMPRESSED |
  448. STAGE_FLAG_COMPRESSION_FORMAT_KNOWN,
  449. &(RsFileOffset(Cmd).QuadPart),
  450. &Header.CompressionGuid);
  451. } else {
  452. StageRelease(&RsCoc(Cmd)->ChangeOrderGuid,
  453. RsCoc(Cmd)->FileName,
  454. Flags,
  455. &(RsFileOffset(Cmd).QuadPart),
  456. NULL);
  457. }
  458. RcsSubmitTransferToRcs(Cmd, CMD_RECEIVED_STAGE);
  459. return;
  460. ERROUT:
  461. //
  462. // Discard local state
  463. //
  464. FRS_CLOSE(Handle);
  465. FrsFree(StagePath);
  466. if (!IS_GUID_ZERO(&Header.CompressionGuid)) {
  467. StageRelease(&RsCoc(Cmd)->ChangeOrderGuid,
  468. RsCoc(Cmd)->FileName,
  469. Flags | STAGE_FLAG_COMPRESSED |
  470. STAGE_FLAG_COMPRESSION_FORMAT_KNOWN,
  471. NULL,
  472. &Header.CompressionGuid);
  473. } else {
  474. StageRelease(&RsCoc(Cmd)->ChangeOrderGuid, RsCoc(Cmd)->FileName, Flags, NULL, NULL);
  475. }
  476. //
  477. // Pretend it is retriable
  478. //
  479. CHANGE_ORDER_TRACE(3, RsCoe(Cmd), "Fetch Receiving Retry on Error");
  480. FrsFetchCsSubmitTransfer(Cmd, CMD_RETRY_FETCH);
  481. }
  482. VOID
  483. FetchCsSendStage(
  484. IN PCOMMAND_PACKET Cmd
  485. )
  486. /*++
  487. Routine Description:
  488. Send the local staging file to the requesting outbound partner.
  489. Arguments:
  490. Cmd
  491. Return Value:
  492. None.
  493. --*/
  494. {
  495. #undef DEBSUB
  496. #define DEBSUB "FetchCsSendStage:"
  497. ULONGLONG GeneratedSize = 0;
  498. FILE_NETWORK_OPEN_INFORMATION Attrs;
  499. PCHANGE_ORDER_COMMAND Coc = RsPartnerCoc(Cmd);
  500. GUID *CoGuid;
  501. PWCHAR FileName;
  502. ULONG Flags;
  503. DWORD WStatus;
  504. DWORD BytesRead;
  505. USN Usn = 0;
  506. PWCHAR StagePath = NULL;
  507. HANDLE Handle = INVALID_HANDLE_VALUE;
  508. BOOL Md5Valid = FALSE;
  509. MD5_CTX Md5;
  510. GUID CompressionFormatUsed;
  511. PREPLICA Replica = RsReplica(Cmd);
  512. PCXTION OutCxtion;
  513. STAGE_HEADER Header;
  514. CHANGE_ORDER_COMMAND_TRACE(3, Coc, "Fetch Send");
  515. ZeroMemory(&CompressionFormatUsed, sizeof(GUID));
  516. //
  517. // Even if the file is 0 bytes in length, the staging file will
  518. // always have at least the header. There are some retry paths
  519. // that will incorrectly think the staging file has been fetched
  520. // if RsFileSize(Cmd) is 0. So make sure it isn't.
  521. //
  522. if (RsFileSize(Cmd).QuadPart == QUADZERO) {
  523. RsFileSize(Cmd).QuadPart = Coc->FileSize;
  524. if (RsFileSize(Cmd).QuadPart == QUADZERO) {
  525. RsFileSize(Cmd).QuadPart = sizeof(STAGE_HEADER);
  526. }
  527. }
  528. CoGuid = &Coc->ChangeOrderGuid;
  529. FileName = Coc->FileName;
  530. //
  531. // Acquire shared access to the staging file
  532. //
  533. Flags = 0;
  534. WStatus = StageAcquire(CoGuid, FileName, RsFileSize(Cmd).QuadPart,
  535. &Flags, &CompressionFormatUsed);
  536. if (!WIN_SUCCESS(WStatus) || !(Flags & STAGE_FLAG_CREATED)) {
  537. //
  538. // Acquire exclusive access to the file
  539. //
  540. if (WIN_SUCCESS(WStatus)) {
  541. StageRelease(CoGuid, FileName, Flags, NULL, NULL);
  542. }
  543. Flags = STAGE_FLAG_RESERVE | STAGE_FLAG_EXCLUSIVE;
  544. if (CoCmdIsDirectory(Coc)) {
  545. SetFlag(Flags, STAGE_FLAG_FORCERESERVE);
  546. }
  547. WStatus = StageAcquire(CoGuid, FileName, RsFileSize(Cmd).QuadPart,
  548. &Flags, &CompressionFormatUsed);
  549. }
  550. //
  551. // Retry fetch when fetch retry trigger hits
  552. //
  553. PULL_FETCH_RETRY_TRIGGER(Coc, WStatus, Flags);
  554. //
  555. // Retriable problem; do so
  556. //
  557. if (WIN_RETRY_FETCH(WStatus)) {
  558. CHANGE_ORDER_COMMAND_TRACEW(3, Coc, "Fetch Send Retry Cmd", WStatus);
  559. if (FetchCsDelCsSubmit(Cmd, FALSE)) {
  560. return;
  561. }
  562. CHANGE_ORDER_COMMAND_TRACEW(3, Coc, "Fetch Send Retry Co", WStatus);
  563. RcsSubmitTransferToRcs(Cmd, CMD_SEND_RETRY_FETCH);
  564. return;
  565. }
  566. //
  567. // Unretriable problem; abort
  568. //
  569. if (!WIN_SUCCESS(WStatus)) {
  570. CHANGE_ORDER_COMMAND_TRACEW(3, Coc, "Fetch Send Abort", WStatus);
  571. RcsSubmitTransferToRcs(Cmd, CMD_SEND_ABORT_FETCH);
  572. return;
  573. }
  574. //
  575. // Create the staging file, if needed
  576. //
  577. if (!(Flags & STAGE_FLAG_CREATED)) {
  578. CHANGE_ORDER_COMMAND_TRACE(3, Coc, "Fetch Send Gen Stage");
  579. //
  580. // Make sure we start at the beginning of the staging file
  581. //
  582. RsFileOffset(Cmd).QuadPart = QUADZERO;
  583. //
  584. // Create the staging file.
  585. //
  586. if (RsMd5Digest(Cmd)) {
  587. //
  588. // The requesting downstream partner had a pre-exisitng file
  589. // and included an Md5 digest in the fetch request. So calc
  590. // the MD5 digest as we generate the staging file.
  591. //
  592. WStatus = StuGenerateStage(Coc, NULL, FALSE, &Md5, &GeneratedSize,
  593. &CompressionFormatUsed);
  594. Md5Valid = TRUE;
  595. } else {
  596. WStatus = StuGenerateStage(Coc, NULL, FALSE, NULL, &GeneratedSize,
  597. &CompressionFormatUsed);
  598. }
  599. //
  600. // Release staging resources if error
  601. //
  602. if (!WIN_SUCCESS(WStatus)) {
  603. StageDeleteFile(Coc, FALSE);
  604. StageRelease(CoGuid, FileName, STAGE_FLAG_UNRESERVE, NULL, NULL);
  605. } else {
  606. //
  607. // Increment the staging files regenerated counter
  608. //
  609. PREPLICA NewReplica = ReplicaIdToAddr(Coc->NewReplicaNum);
  610. PM_INC_CTR_REPSET(NewReplica, SFReGenerated, 1);
  611. }
  612. //
  613. // Retriable problem; do so
  614. //
  615. if (WIN_RETRY_FETCH(WStatus)) {
  616. CHANGE_ORDER_COMMAND_TRACE(3, Coc, "Fetch Send Gen Stage Retry Cmd");
  617. if (FetchCsDelCsSubmit(Cmd, FALSE)) {
  618. return;
  619. }
  620. CHANGE_ORDER_COMMAND_TRACEW(3, Coc, "Fetch Send Gen Stage Retry Co", WStatus);
  621. RcsSubmitTransferToRcs(Cmd, CMD_SEND_RETRY_FETCH);
  622. return;
  623. }
  624. //
  625. // Unretriable problem; abort
  626. //
  627. if (!WIN_SUCCESS(WStatus)) {
  628. CHANGE_ORDER_COMMAND_TRACEW(3, Coc, "Fetch Send Gen Stage Abort", WStatus);
  629. RcsSubmitTransferToRcs(Cmd, CMD_SEND_ABORT_FETCH);
  630. return;
  631. }
  632. if (!IS_GUID_ZERO(&CompressionFormatUsed)) {
  633. SetFlag(Flags, (STAGE_FLAG_DATA_PRESENT |
  634. STAGE_FLAG_CREATED | STAGE_FLAG_INSTALLING |
  635. STAGE_FLAG_INSTALLED | STAGE_FLAG_RERESERVE |
  636. STAGE_FLAG_COMPRESSED | STAGE_FLAG_COMPRESSION_FORMAT_KNOWN));
  637. } else {
  638. SetFlag(Flags, (STAGE_FLAG_DATA_PRESENT |
  639. STAGE_FLAG_CREATED | STAGE_FLAG_INSTALLING |
  640. STAGE_FLAG_INSTALLED | STAGE_FLAG_RERESERVE));
  641. }
  642. }
  643. //
  644. // ERROUT is now valid
  645. //
  646. //
  647. // Open the file
  648. //
  649. if (COC_FLAG_ON(Coc, CO_FLAG_COMPRESSED_STAGE) && (Flags & STAGE_FLAG_COMPRESSED) ) {
  650. StagePath = StuCreStgPath(RsReplica(Cmd)->Stage, RsCoGuid(Cmd), STAGE_FINAL_COMPRESSED_PREFIX);
  651. if (!(Flags & STAGE_FLAG_COMPRESSION_FORMAT_KNOWN)) {
  652. //
  653. // Compression format is not known and should be zero. Read from stage header.
  654. //
  655. FRS_ASSERT(IS_GUID_ZERO(&CompressionFormatUsed));
  656. StuOpenFile(StagePath, GENERIC_READ, &Handle);
  657. if (!HANDLE_IS_VALID(Handle)) {
  658. goto ERROUT;
  659. }
  660. if (!StuReadBlockFile(StagePath, Handle, &Header, sizeof(STAGE_HEADER))) {
  661. goto ERROUT;
  662. }
  663. COPY_GUID(&CompressionFormatUsed, &Header.CompressionGuid);
  664. SetFlag(Flags, STAGE_FLAG_COMPRESSED);
  665. SetFlag(Flags, STAGE_FLAG_COMPRESSION_FORMAT_KNOWN);
  666. }
  667. //
  668. // There is a compressed staging file for this change order. Check if the
  669. // outbound partner understands this compression format.
  670. //
  671. LOCK_CXTION_TABLE(Replica);
  672. OutCxtion = GTabLookupNoLock(Replica->Cxtions, RsCxtion(Cmd)->Guid, NULL);
  673. //
  674. // This connection does not exist any more.
  675. //
  676. if (OutCxtion == NULL) {
  677. UNLOCK_CXTION_TABLE(Replica);
  678. goto ERROUT;
  679. }
  680. if (!GTabIsEntryPresent(OutCxtion->CompressionTable, &CompressionFormatUsed, NULL)) {
  681. //
  682. // The outbound partner does not understand this compression format.
  683. //
  684. //
  685. // Unlock the cxtion table here so we do not hold the lock while generating
  686. // the staging file.
  687. //
  688. UNLOCK_CXTION_TABLE(Replica);
  689. StagePath = FrsFree(StagePath);
  690. FRS_CLOSE(Handle);
  691. StagePath = StuCreStgPath(RsReplica(Cmd)->Stage, RsCoGuid(Cmd), STAGE_FINAL_PREFIX);
  692. if (!(Flags & STAGE_FLAG_DECOMPRESSED)) {
  693. //
  694. // The the file is not decompressed yet. Create decompressed staging file.
  695. // Acquire exclusive access to the file if we didn't get it above.
  696. // Case is Stage file exists as compressed so we don't get exclusive
  697. // access above.
  698. //
  699. if (!BooleanFlagOn(Flags, STAGE_FLAG_EXCLUSIVE)) {
  700. StageRelease(CoGuid, FileName, Flags, NULL, &CompressionFormatUsed);
  701. Flags = STAGE_FLAG_RESERVE | STAGE_FLAG_EXCLUSIVE;
  702. if (CoCmdIsDirectory(Coc)) {
  703. SetFlag(Flags, STAGE_FLAG_FORCERESERVE);
  704. }
  705. WStatus = StageAcquire(CoGuid, FileName, RsFileSize(Cmd).QuadPart,
  706. &Flags, NULL);
  707. CLEANUP_WS(0,"Error acquiring exclusive access for creating a decompressed staging file.",
  708. WStatus, ERROUT_NOACQUIRE);
  709. }
  710. CHANGE_ORDER_COMMAND_TRACE(3, Coc, "Decompressing stage for downlevel partner");
  711. WStatus = StuGenerateDecompressedStage(RsReplica(Cmd)->Stage, RsCoGuid(Cmd), &CompressionFormatUsed);
  712. CLEANUP_WS(0,"Error generating decompressed staging file.", WStatus, ERROUT);
  713. SetFlag(Flags, STAGE_FLAG_DECOMPRESSED);
  714. CLEAR_COC_FLAG(Coc, CO_FLAG_COMPRESSED_STAGE);
  715. }
  716. } else {
  717. UNLOCK_CXTION_TABLE(Replica);
  718. }
  719. } else {
  720. StagePath = StuCreStgPath(RsReplica(Cmd)->Stage, RsCoGuid(Cmd), STAGE_FINAL_PREFIX);
  721. }
  722. if (!HANDLE_IS_VALID(Handle)) {
  723. StuOpenFile(StagePath, GENERIC_READ, &Handle);
  724. }
  725. if (!HANDLE_IS_VALID(Handle)) {
  726. goto ERROUT;
  727. }
  728. if (RsFileOffset(Cmd).QuadPart == QUADZERO) {
  729. //
  730. // This is the first request for this file; Fill in the file size
  731. //
  732. if (!FrsGetFileInfoByHandle(StagePath, Handle, &Attrs)) {
  733. goto ERROUT;
  734. }
  735. RsFileSize(Cmd) = Attrs.EndOfFile;
  736. }
  737. if (Md5Valid) {
  738. if (MD5_EQUAL(Md5.digest, RsMd5Digest(Cmd))) {
  739. //
  740. // MD5 digest matches so downstream partner's file is good.
  741. // Set the offset to the size of the stage file so we don't send
  742. // any data.
  743. //
  744. RsFileOffset(Cmd).QuadPart = RsFileSize(Cmd).QuadPart;
  745. CHANGE_ORDER_COMMAND_TRACE(3, Coc, "Fetch Send Md5 matches, do not send");
  746. } else {
  747. CHANGE_ORDER_COMMAND_TRACE(3, Coc, "Fetch Send Md5 mismatch, send");
  748. //
  749. // Update the MD5 checksum in the cmd so we can send it downstream.
  750. //
  751. CopyMemory(RsMd5Digest(Cmd), Md5.digest, MD5DIGESTLEN);
  752. }
  753. }
  754. //
  755. // Calculate the block size of the next data chunk to deliver.
  756. //
  757. RsBlockSize(Cmd) = QUADZERO;
  758. if (RsFileOffset(Cmd).QuadPart < RsFileSize(Cmd).QuadPart) {
  759. //
  760. // Calc bytes left in file.
  761. //
  762. RsBlockSize(Cmd) = RsFileSize(Cmd).QuadPart - RsFileOffset(Cmd).QuadPart;
  763. //
  764. // But not more than max block size.
  765. //
  766. if (RsBlockSize(Cmd) > FETCHCS_MAX_BLOCK_SIZE) {
  767. RsBlockSize(Cmd) = FETCHCS_MAX_BLOCK_SIZE;
  768. }
  769. }
  770. //
  771. // If data left to deliver, allocate a buffer, seek to the block offset in
  772. // the file and read the data.
  773. //
  774. RsBlock(Cmd) = NULL;
  775. if (RsBlockSize(Cmd) > QUADZERO) {
  776. RsBlock(Cmd) = FrsAlloc((ULONG)RsBlockSize(Cmd));
  777. WStatus = FrsSetFilePointer(StagePath, Handle, RsFileOffset(Cmd).HighPart,
  778. RsFileOffset(Cmd).LowPart);
  779. CLEANUP1_WS(0, "++ SetFilePointer failed on %ws;", StagePath, WStatus, ERROUT);
  780. if (!StuReadBlockFile(StagePath, Handle, RsBlock(Cmd), (ULONG)RsBlockSize(Cmd))) {
  781. goto ERROUT;
  782. }
  783. }
  784. //
  785. // Done, transfer to the replica set command server
  786. //
  787. FRS_CLOSE(Handle);
  788. FrsFree(StagePath);
  789. if (!IS_GUID_ZERO(&CompressionFormatUsed)) {
  790. StageRelease(CoGuid, FileName, Flags, &GeneratedSize, &CompressionFormatUsed);
  791. } else {
  792. StageRelease(CoGuid, FileName, Flags, &GeneratedSize, NULL);
  793. }
  794. RcsSubmitTransferToRcs(Cmd, CMD_SENDING_STAGE);
  795. return;
  796. ERROUT:
  797. //
  798. // Delete the staging file, if possible. Don't delete a staging
  799. // file that has not been installed (it cannot be regenerated!).
  800. //
  801. if (Flags & STAGE_FLAG_INSTALLED) {
  802. //
  803. // Get exclusive access
  804. //
  805. WStatus = ERROR_SUCCESS;
  806. if (!(Flags & STAGE_FLAG_EXCLUSIVE)) {
  807. StageRelease(CoGuid, FileName, Flags, &GeneratedSize, NULL);
  808. Flags = STAGE_FLAG_RESERVE | STAGE_FLAG_EXCLUSIVE;
  809. if (CoCmdIsDirectory(Coc)) {
  810. SetFlag(Flags, STAGE_FLAG_FORCERESERVE);
  811. }
  812. WStatus = StageAcquire(CoGuid, FileName, Coc->FileSize, &Flags, NULL);
  813. }
  814. if (WIN_SUCCESS(WStatus)) {
  815. //
  816. // Discard the current staging file
  817. //
  818. StageDeleteFile(Coc, FALSE);
  819. StageRelease(CoGuid, FileName, STAGE_FLAG_UNRESERVE, NULL, NULL);
  820. //
  821. // Make sure we start over at the beginning of the staging file
  822. //
  823. RsFileOffset(Cmd).QuadPart = QUADZERO;
  824. }
  825. } else {
  826. StageRelease(CoGuid, FileName, Flags, &GeneratedSize, NULL);
  827. }
  828. ERROUT_NOACQUIRE:
  829. FRS_CLOSE(Handle);
  830. if (StagePath) {
  831. FrsFree(StagePath);
  832. }
  833. RsBlock(Cmd) = FrsFree(RsBlock(Cmd));
  834. RsBlockSize(Cmd) = QUADZERO;
  835. CHANGE_ORDER_COMMAND_TRACE(3, Coc, "Fetch Send Retry on Error");
  836. if (FetchCsDelCsSubmit(Cmd, FALSE)) {
  837. return;
  838. }
  839. CHANGE_ORDER_COMMAND_TRACE(3, Coc, "Fetch Send Retry on Error");
  840. RcsSubmitTransferToRcs(Cmd, CMD_SEND_RETRY_FETCH);
  841. }
  842. DWORD
  843. MainFetchCs(
  844. PVOID Arg
  845. )
  846. /*++
  847. Routine Description:
  848. Entry point for a thread serving the Staging area Command Server.
  849. Arguments:
  850. Arg - thread
  851. Return Value:
  852. None.
  853. --*/
  854. {
  855. #undef DEBSUB
  856. #define DEBSUB "MainFetchCs:"
  857. DWORD WStatus = ERROR_SUCCESS;
  858. PCOMMAND_PACKET Cmd;
  859. PFRS_THREAD FrsThread = (PFRS_THREAD)Arg;
  860. //
  861. // Thread is pointing at the correct command server
  862. //
  863. FRS_ASSERT(FrsThread->Data == &FetchCs);
  864. FrsThread->Exit = ThSupExitWithTombstone;
  865. //
  866. // Try-Finally
  867. //
  868. try {
  869. //
  870. // Capture exception.
  871. //
  872. try {
  873. //
  874. // Pull entries off the queue and process them
  875. //
  876. cant_exit_yet:
  877. while (Cmd = FrsGetCommandServer(&FetchCs)) {
  878. switch (Cmd->Command) {
  879. case CMD_SEND_STAGE:
  880. DPRINT1(5, "Fetch: command send stage %08x\n", Cmd);
  881. FetchCsSendStage(Cmd);
  882. break;
  883. case CMD_RECEIVING_STAGE:
  884. DPRINT1(5, "Fetch: command receiving stage %08x\n", Cmd);
  885. FetchCsReceivingStage(Cmd);
  886. break;
  887. case CMD_RETRY_FETCH:
  888. DPRINT1(5, "Fetch: command retry fetch %08x\n", Cmd);
  889. FetchCsRetryFetch(Cmd);
  890. break;
  891. case CMD_ABORT_FETCH:
  892. DPRINT1(5, "Fetch: command abort fetch %08x\n", Cmd);
  893. CHANGE_ORDER_TRACEW(0, RsCoe(Cmd), "Aborting fetch", ERROR_SUCCESS);
  894. FetchCsAbortFetch(Cmd, ERROR_SUCCESS);
  895. break;
  896. default:
  897. DPRINT1(0, "Staging File Fetch: unknown command 0x%x\n", Cmd->Command);
  898. FrsCompleteCommand(Cmd, ERROR_INVALID_FUNCTION);
  899. break;
  900. }
  901. }
  902. //
  903. // Exit
  904. //
  905. FrsExitCommandServer(&FetchCs, FrsThread);
  906. goto cant_exit_yet;
  907. //
  908. // Get exception status.
  909. //
  910. } except (EXCEPTION_EXECUTE_HANDLER) {
  911. GET_EXCEPTION_CODE(WStatus);
  912. }
  913. } finally {
  914. if (WIN_SUCCESS(WStatus)) {
  915. if (AbnormalTermination()) {
  916. WStatus = ERROR_OPERATION_ABORTED;
  917. }
  918. }
  919. DPRINT_WS(0, "MainFetchCs finally.", WStatus);
  920. //
  921. // Trigger FRS shutdown if we terminated abnormally.
  922. //
  923. if (!WIN_SUCCESS(WStatus)) {
  924. DPRINT(0, "MainFetchCs terminated abnormally, forcing service shutdown.\n");
  925. FrsIsShuttingDown = TRUE;
  926. SetEvent(ShutDownEvent);
  927. }
  928. }
  929. return (WStatus);
  930. }
  931. VOID
  932. FrsFetchCsInitialize(
  933. VOID
  934. )
  935. /*++
  936. Routine Description:
  937. Initialize the staging file fetcher
  938. Arguments:
  939. None.
  940. Return Value:
  941. None.
  942. --*/
  943. {
  944. #undef DEBSUB
  945. #define DEBSUB "FetchCsInitialize:"
  946. //
  947. // Initialize the command server
  948. //
  949. CfgRegReadDWord(FKC_MAX_STAGE_FETCHCS_THREADS, NULL, 0, &MaxFetchCsThreads);
  950. FrsInitializeCommandServer(&FetchCs, MaxFetchCsThreads, L"FetchCs", MainFetchCs);
  951. }
  952. VOID
  953. ShutDownFetchCs(
  954. VOID
  955. )
  956. /*++
  957. Routine Description:
  958. Shutdown the staging file fetcher command server.
  959. Arguments:
  960. None.
  961. Return Value:
  962. None.
  963. --*/
  964. {
  965. #undef DEBSUB
  966. #define DEBSUB "ShutDownFetchCs:"
  967. FrsRunDownCommandServer(&FetchCs, &FetchCs.Queue);
  968. }
  969. VOID
  970. FrsFetchCsSubmitTransfer(
  971. IN PCOMMAND_PACKET Cmd,
  972. IN USHORT Command
  973. )
  974. /*++
  975. Routine Description:
  976. Transfer a request to the staging file generator
  977. Arguments:
  978. Cmd
  979. Return Value:
  980. None.
  981. --*/
  982. {
  983. #undef DEBSUB
  984. #define DEBSUB "FrsFetchCsSubmitTransfer:"
  985. //
  986. // Submit a request to allocate staging area
  987. //
  988. Cmd->TargetQueue = &FetchCs.Queue;
  989. Cmd->Command = Command;
  990. RsTimeout(Cmd) = 0;
  991. DPRINT1(5, "Fetch: submit 0x%x\n", Cmd);
  992. FrsSubmitCommandServer(&FetchCs, Cmd);
  993. }