Source code of Windows XP (NT5)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1345 lines
38 KiB

  1. /*++
  2. Copyright (c) 1997-1999 Microsoft Corporation
  3. Module Name:
  4. frssndcs.c
  5. Abstract:
  6. This command server sends packets over the comm layer.
  7. SndCsInitialize: - Alloc handle table, read reg pars, Create/init SndCs,
  8. Alloc comm queue array, attach comm queues to SndCs
  9. control queue.
  10. SndCsUnInitialize: - Free handle table, delete comm queues.
  11. SndCsShutDown: - Run Down Comm queues, Run down SndCs.
  12. SndCsExit: - Cancel all RPC calls on the FrsThread->Handle.
  13. SndCsAssignCommQueue: - Assign a comm queue to cxtion.
  14. SndCsCreateCxtion: - Create a join Guid for connection, assign comm queue
  15. SndCsDestroyCxtion: - Invalidate cxtion join guid and unbind RPC handles
  16. SndCsUnBindHandles: - Unbind all RPC handles associated with given target server
  17. SndCsCxtionTimeout: - No activity on this connection, request an UNJOIN.
  18. SndCsCheckCxtion: - Check that the join guid is still valid, set the timer if needed.
  19. SndCsDispatchCommError: - Transfer a comm packet to the appropriate command server
  20. for error processing.
  21. SndCsCommTimeout: - Cancel hung RPC send threads and age the RPC handle cache.
  22. SndCsSubmitCommPkt: - Submit comm packet to the Send Cs comm queue for the
  23. target Cxtion.
  24. SndCsSubmitCommPkt2: - Ditto (with arg variation)
  25. SndCsSubmitCmd: - Used for submitting a CMD_JOINING_AFTER_FLUSH to a Send Cs queue.
  26. SndCsMain: - Send command server processing loop. Dispatches requests
  27. off the comm queues.
  28. Author:
  29. Billy J. Fuller 28-May-1997
  30. Environment
  31. User mode winnt
  32. --*/
  33. #include <ntreppch.h>
  34. #pragma hdrstop
  35. #include <frs.h>
  36. #include <perrepsr.h>
  37. //
  38. // Struct for the Send Command Server
  39. // Contains info about the queues and the threads
  40. //
  41. COMMAND_SERVER SndCs;
  42. //
  43. // Comm queues are attached to the SndCs command server above.
  44. // A cxtion is assigned a comm queue when its creates or assigns a join guid
  45. // (session). The cxtion uses that comm queue for as long as the join guid is
  46. // valid. This insures packet order through the comm layer.
  47. //
  48. // Reserve comm queue 0 for join requests to partners whose previous rpc call
  49. // took longer than MinJoinRetry to error off.
  50. //
  51. #define MAX_COMM_QUEUE_NUMBER (32)
  52. FRS_QUEUE CommQueues[MAX_COMM_QUEUE_NUMBER];
  53. DWORD CommQueueRoundRobin = 1;
  54. //
  55. // Cxtion times out if partner response takes too long
  56. //
  57. DWORD CommTimeoutInMilliSeconds; // timeout in msec
  58. ULONGLONG CommTimeoutCheck; // timeout in 100 nsec units
  59. //
  60. // rpc handle cache.
  61. //
  62. // Each entry contains a connection guid and a list of handles protected by
  63. // a lock. Each comm packet sent to a given connection first tries to get
  64. // previously bound handle from the handle cache, creating a new one if necc.
  65. //
  66. // Note: DAO, I don't understand why this is needed, Mario says RPC already
  67. // allows multiple RPC calls on the same binding handle. Ask Billy.
  68. //
  69. PGEN_TABLE GHandleTable;
  70. VOID
  71. CommCompletionRoutine(
  72. PCOMMAND_PACKET,
  73. PVOID
  74. );
  75. VOID
  76. FrsCreateJoinGuid(
  77. OUT GUID *OutGuid
  78. );
  79. VOID
  80. FrsDelCsCompleteSubmit(
  81. IN PCOMMAND_PACKET DelCmd,
  82. IN ULONG Timeout
  83. );
  84. PFRS_THREAD
  85. ThSupEnumThreads(
  86. PFRS_THREAD FrsThread
  87. );
  88. VOID
  89. SndCsUnBindHandles(
  90. IN PGNAME To
  91. )
  92. /*++
  93. Routine Description:
  94. Unbind any handles associated with To in preparation for "join".
  95. Arguments:
  96. To
  97. Return Value:
  98. None.
  99. --*/
  100. {
  101. #undef DEBSUB
  102. #define DEBSUB "SndCsUnBindHandles:"
  103. PGHANDLE GHandle;
  104. PHANDLE_LIST HandleList;
  105. DPRINT1(4, "Unbinding all handles for %ws\n", To->Name);
  106. //
  107. // Find the anchor for all of the bound, rpc handles to the server "To"
  108. //
  109. GHandle = GTabLookup(GHandleTable, To->Guid, NULL);
  110. if (GHandle == NULL) {
  111. return;
  112. }
  113. //
  114. // Unbind the handles
  115. //
  116. EnterCriticalSection(&GHandle->Lock);
  117. while (HandleList = GHandle->HandleList) {
  118. GHandle->HandleList = HandleList->Next;
  119. FrsRpcUnBindFromServer(&HandleList->RpcHandle);
  120. FrsFree(HandleList);
  121. }
  122. LeaveCriticalSection(&GHandle->Lock);
  123. }
  124. DWORD
  125. SndCsAssignCommQueue(
  126. VOID
  127. )
  128. /*++
  129. Routine Description:
  130. A cxtion is assigned a comm queue when its creates or assigns a join guid
  131. (session). The cxtion uses that comm queue for as long as the join guid is
  132. valid. This insures packet order through the comm layer. Old packets have
  133. an invalid join guid and are either not sent or ignored on the receiving side.
  134. Reserve comm queue 0 for join requests to partners whose previous rpc call
  135. took longer than MinJoinRetry to error off.
  136. Arguments:
  137. None.
  138. Return Value:
  139. Comm queue number (1 .. MAX_COMM_QUEUE_NUMBER - 1).
  140. --*/
  141. {
  142. #undef DEBSUB
  143. #define DEBSUB "SndCsAssignCommQueue:"
  144. DWORD CommQueueIndex;
  145. //
  146. // Pseudo round robin. Avoid locks by checking bounds.
  147. //
  148. CommQueueIndex = CommQueueRoundRobin++;
  149. if (CommQueueRoundRobin >= MAX_COMM_QUEUE_NUMBER) {
  150. CommQueueRoundRobin = 1;
  151. }
  152. if (CommQueueIndex >= MAX_COMM_QUEUE_NUMBER) {
  153. CommQueueIndex = 1;
  154. }
  155. DPRINT1(4, "Assigned Comm Queue %d\n", CommQueueIndex);
  156. return CommQueueIndex;
  157. }
  158. VOID
  159. SndCsCreateCxtion(
  160. IN OUT PCXTION Cxtion
  161. )
  162. /*++
  163. Routine Description:
  164. Create a new join guid and comm queue for this cxtion.
  165. Assumes: Caller has CXTION_TABLE lock.
  166. Arguments:
  167. Cxtion
  168. Return Value:
  169. None.
  170. --*/
  171. {
  172. #undef DEBSUB
  173. #define DEBSUB "SndCsCreateCxtion:"
  174. DPRINT1(4, ":X: %ws: Creating join guid.\n", Cxtion->Name->Name);
  175. FrsCreateJoinGuid(&Cxtion->JoinGuid);
  176. SetCxtionFlag(Cxtion, CXTION_FLAGS_JOIN_GUID_VALID |
  177. CXTION_FLAGS_UNJOIN_GUID_VALID);
  178. //
  179. // Assign a comm queue. A cxtion must use the same comm queue for a given
  180. // session (join guid) to maintain packet order. Old packets have an
  181. // invalid join guid and are either not sent or ignored on the receiving side.
  182. //
  183. Cxtion->CommQueueIndex = SndCsAssignCommQueue();
  184. }
  185. VOID
  186. SndCsDestroyCxtion(
  187. IN PCXTION Cxtion,
  188. IN DWORD CxtionFlags
  189. )
  190. /*++
  191. Routine Description:
  192. Destroy a cxtion's join guid and unbind handles.
  193. Assumes: Caller has CXTION_TABLE lock.
  194. Arguments:
  195. Cxtion - Cxtion being destroyed.
  196. CxtionFlags - Caller specifies which state flags are cleared.
  197. Return Value:
  198. None.
  199. --*/
  200. {
  201. #undef DEBSUB
  202. #define DEBSUB "SndCsDestroyCxtion:"
  203. //
  204. // Nothing to do
  205. //
  206. if (Cxtion == NULL) {
  207. return;
  208. }
  209. //
  210. // Invalidate the join guid. Packets to be sent to this connection are
  211. // errored off because of their invalid join guid.
  212. // Packets received are errored off for the same reason.
  213. //
  214. DPRINT2(4, ":X: %ws: Destroying join guid (%08x)\n", Cxtion->Name->Name, CxtionFlags);
  215. ClearCxtionFlag(Cxtion, CxtionFlags |
  216. CXTION_FLAGS_JOIN_GUID_VALID |
  217. CXTION_FLAGS_TIMEOUT_SET);
  218. //
  219. // Unbind the old handles. They aren't very useful without a
  220. // valid join guid. This function is called out of FrsFreeType() when
  221. // freeing a cxtion; hence the partner field may not be filled in. Don't
  222. // unbind handles if there is no partner.
  223. //
  224. if ((Cxtion->Partner != NULL) &&
  225. (Cxtion->Partner->Guid != NULL) &&
  226. !Cxtion->JrnlCxtion) {
  227. SndCsUnBindHandles(Cxtion->Partner);
  228. }
  229. }
  230. VOID
  231. SndCsCxtionTimeout(
  232. IN PCOMMAND_PACKET TimeoutCmd,
  233. IN PVOID Ignore
  234. )
  235. /*++
  236. Routine Description:
  237. The cxtion has not received a reply from its partner for quite
  238. awhile. Unjoin the cxtion.
  239. Arguments:
  240. TimeoutCmd -- Timeout command packet
  241. Ignore
  242. Return Value:
  243. None.
  244. --*/
  245. {
  246. #undef DEBSUB
  247. #define DEBSUB "SndCsCxtionTimeout:"
  248. PREPLICA Replica;
  249. PCXTION Cxtion;
  250. //
  251. // Not a true timeout; just some error condition. Probably
  252. // shutdown. Ignore it.
  253. //
  254. if (!WIN_SUCCESS(TimeoutCmd->ErrorStatus)) {
  255. return;
  256. }
  257. //
  258. // Pull out params from command packet
  259. //
  260. Replica = SRReplica(TimeoutCmd);
  261. Cxtion = SRCxtion(TimeoutCmd);
  262. LOCK_CXTION_TABLE(Replica);
  263. //
  264. // The timeout is associated with a different join guid; ignore it
  265. //
  266. if (!CxtionFlagIs(Cxtion, CXTION_FLAGS_TIMEOUT_SET) ||
  267. !CxtionFlagIs(Cxtion, CXTION_FLAGS_JOIN_GUID_VALID) ||
  268. !GUIDS_EQUAL(&SRJoinGuid(TimeoutCmd), &Cxtion->JoinGuid)) {
  269. ClearCxtionFlag(Cxtion, CXTION_FLAGS_TIMEOUT_SET);
  270. UNLOCK_CXTION_TABLE(Replica);
  271. return;
  272. }
  273. //
  274. // Increment the Communication Timeouts counter for both the
  275. // replica set and the connection.
  276. //
  277. PM_INC_CTR_REPSET(Replica, CommTimeouts, 1);
  278. PM_INC_CTR_CXTION(Cxtion, CommTimeouts, 1);
  279. ClearCxtionFlag(Cxtion, CXTION_FLAGS_TIMEOUT_SET);
  280. UNLOCK_CXTION_TABLE(Replica);
  281. RcsSubmitReplicaCxtion(Replica, Cxtion, CMD_UNJOIN);
  282. return;
  283. }
  284. BOOL
  285. SndCsCheckCxtion(
  286. IN PCOMMAND_PACKET Cmd
  287. )
  288. /*++
  289. Routine Description:
  290. Check that the join guid is still valid, set the timer if needed.
  291. Arguments:
  292. Cmd
  293. Return Value:
  294. None.
  295. --*/
  296. {
  297. #undef DEBSUB
  298. #define DEBSUB "SndCsCheckCxtion:"
  299. PREPLICA Replica;
  300. PCXTION Cxtion;
  301. ULONG WaitTime;
  302. Replica = SRReplica(Cmd);
  303. Cxtion = SRCxtion(Cmd);
  304. //
  305. // Nothing to check
  306. //
  307. if (!SRJoinGuidValid(Cmd) &&
  308. !SRSetTimeout(Cmd) &&
  309. !VOLATILE_OUTBOUND_CXTION(Cxtion)) {
  310. return TRUE;
  311. }
  312. LOCK_CXTION_TABLE(Replica);
  313. //
  314. // Check that our session id (join guid) is still valid
  315. //
  316. if (SRJoinGuidValid(Cmd)) {
  317. if (!CxtionFlagIs(Cxtion, CXTION_FLAGS_JOIN_GUID_VALID) ||
  318. !GUIDS_EQUAL(&SRJoinGuid(Cmd), &Cxtion->JoinGuid)) {
  319. DPRINT1(4, "++ %ws: Join guid is INVALID.\n", Cxtion->Name->Name);
  320. UNLOCK_CXTION_TABLE(Replica);
  321. return FALSE;
  322. }
  323. }
  324. //
  325. // If our partner doesn't respond in time, unjoin the cxtion.
  326. //
  327. // *** NOTE *** Since the following is using state in the Cxtion struct
  328. // to record timeout info, only one fetch request can be active at a time.
  329. // Look at the timeout code to see what it will do.
  330. //
  331. // :SP1: Volatile connection cleanup.
  332. //
  333. // A volatile connection is used to seed sysvols after dcpromo. If there
  334. // is inactivity on a volatile outbound connection for more than
  335. // FRS_VOLATILE_CONNECTION_MAX_IDLE_TIME then this connection is unjoined.
  336. // An unjoin on a volatile outbound connection triggers a delete on that
  337. // connection. This is to prevent the case where staging files are kept
  338. // for ever on the parent for a volatile connection.
  339. //
  340. if (SRSetTimeout(Cmd) || VOLATILE_OUTBOUND_CXTION(Cxtion)) {
  341. if (!CxtionFlagIs(Cxtion, CXTION_FLAGS_TIMEOUT_SET)) {
  342. if (Cxtion->CommTimeoutCmd == NULL) {
  343. Cxtion->CommTimeoutCmd = FrsAllocCommand(NULL, CMD_UNKNOWN);
  344. FrsSetCompletionRoutine(Cxtion->CommTimeoutCmd, SndCsCxtionTimeout, NULL);
  345. SRCxtion(Cxtion->CommTimeoutCmd) = Cxtion;
  346. SRReplica(Cxtion->CommTimeoutCmd) = Replica;
  347. }
  348. //
  349. // Update join guid, cmd packet may be left over from previous join.
  350. //
  351. COPY_GUID(&SRJoinGuid(Cxtion->CommTimeoutCmd), &Cxtion->JoinGuid);
  352. WaitTime = (VOLATILE_OUTBOUND_CXTION(Cxtion) ?
  353. FRS_VOLATILE_CONNECTION_MAX_IDLE_TIME : CommTimeoutInMilliSeconds);
  354. WaitSubmit(Cxtion->CommTimeoutCmd, WaitTime, CMD_DELAYED_COMPLETE);
  355. SetCxtionFlag(Cxtion, CXTION_FLAGS_TIMEOUT_SET);
  356. }
  357. }
  358. UNLOCK_CXTION_TABLE(Replica);
  359. return TRUE;
  360. }
  361. DWORD
  362. SndCsDispatchCommError(
  363. PCOMM_PACKET CommPkt
  364. )
  365. /*++
  366. Routine Description:
  367. Transfering a comm packet to the appropriate command server
  368. for error processing.
  369. Arguments:
  370. CommPkt - comm packet that couldn't be sent
  371. Return Value:
  372. WIN32 Status
  373. --*/
  374. {
  375. #undef DEBSUB
  376. #define DEBSUB "SndCsDispatchCommError:"
  377. DWORD WStatus;
  378. DPRINT1(4, "Comm pkt in error %08x\n", CommPkt);
  379. switch(CommPkt->CsId) {
  380. case CS_RS:
  381. WStatus = RcsSubmitCommPktWithErrorToRcs(CommPkt);
  382. break;
  383. default:
  384. DPRINT1(0, "Unknown command server id %d\n", CommPkt->CsId);
  385. WStatus = ERROR_INVALID_FUNCTION;
  386. }
  387. DPRINT1_WS(0, "Could not process comm pkt with error %08x;", CommPkt, WStatus);
  388. return WStatus;
  389. }
  390. DWORD
  391. SndCsExit(
  392. PFRS_THREAD FrsThread
  393. )
  394. /*++
  395. Routine Description:
  396. Immediate cancel of all outstanding RPC calls for the thread
  397. identified by FrsThread. Set the tombstone to 5 seconds from
  398. now. If this thread does not exit within that time, any calls
  399. to ThSupWaitThread() will return a timeout error.
  400. Arguments:
  401. FrsThread
  402. Return Value:
  403. ERROR_SUCCESS
  404. --*/
  405. {
  406. #undef DEBSUB
  407. #define DEBSUB "SndCsExit:"
  408. if (HANDLE_IS_VALID(FrsThread->Handle)) {
  409. DPRINT1(4, ":X: Canceling RPC requests for thread %ws\n", FrsThread->Name);
  410. RpcCancelThreadEx(FrsThread->Handle, 0);
  411. }
  412. return ThSupExitWithTombstone(FrsThread);
  413. }
  414. DWORD
  415. SndCsMain(
  416. PVOID Arg
  417. )
  418. /*++
  419. Routine Description:
  420. Entry point for a thread serving the Send Command Server.
  421. Arguments:
  422. Arg - thread
  423. Return Value:
  424. None.
  425. --*/
  426. {
  427. #undef DEBSUB
  428. #define DEBSUB "SndCsMain:"
  429. DWORD WStatus;
  430. PFRS_QUEUE IdledQueue;
  431. PCOMMAND_PACKET Cmd;
  432. PGHANDLE GHandle;
  433. PHANDLE_LIST HandleList;
  434. PCXTION Cxtion;
  435. PREPLICA Replica;
  436. ULARGE_INTEGER Now;
  437. PFRS_THREAD FrsThread = (PFRS_THREAD)Arg;
  438. //
  439. // Thread is pointing at the correct command server
  440. //
  441. FRS_ASSERT(FrsThread->Data == &SndCs);
  442. //
  443. // Immediate cancel of outstanding RPC calls during shutdown
  444. //
  445. RpcMgmtSetCancelTimeout(0);
  446. FrsThread->Exit = SndCsExit;
  447. //
  448. // Try-Finally
  449. //
  450. try {
  451. //
  452. // Capture exception.
  453. //
  454. try {
  455. //
  456. // Pull entries off the "send" queue and send them on
  457. //
  458. cant_exit_yet:
  459. while (Cmd = FrsGetCommandServerIdled(&SndCs, &IdledQueue)) {
  460. Cxtion = SRCxtion(Cmd);
  461. Replica = SRReplica(Cmd);
  462. COMMAND_SND_COMM_TRACE(4, Cmd, ERROR_SUCCESS, "SndDeQ");
  463. //
  464. // The RPC interface was initialized at this time but the
  465. // advent of the API RPC interfaces necessitated moving
  466. // the RPC initialization into MainMustInit. The event
  467. // and the CMD_INIT_SUBSYSTEM command are left intact
  468. // until such time as they are deemed completely unneeded.
  469. //
  470. if (Cmd->Command == CMD_INIT_SUBSYSTEM) {
  471. //
  472. // All done
  473. //
  474. FrsCompleteCommand(Cmd, ERROR_SUCCESS);
  475. FrsRtlUnIdledQueue(IdledQueue);
  476. SetEvent(CommEvent);
  477. continue;
  478. }
  479. //
  480. // Send the Cmd to Cs
  481. //
  482. if (Cmd->Command == CMD_SND_CMD) {
  483. FrsSubmitCommandServer(SRCs(Cmd), SRCmd(Cmd));
  484. SRCmd(Cmd) = NULL;
  485. FrsCompleteCommand(Cmd, ERROR_SUCCESS);
  486. FrsRtlUnIdledQueue(IdledQueue);
  487. continue;
  488. }
  489. FRS_ASSERT(SRCommPkt(Cmd));
  490. FRS_ASSERT(SRTo(Cmd));
  491. COMMAND_SND_COMM_TRACE(4, Cmd, ERROR_SUCCESS, "SndStart");
  492. if (FrsIsShuttingDown) {
  493. //
  494. // Complete w/error
  495. //
  496. WStatus = ERROR_PROCESS_ABORTED;
  497. COMMAND_SND_COMM_TRACE(4, Cmd, WStatus, "SndFail - shutting down");
  498. FrsCompleteCommand(Cmd, WStatus);
  499. FrsRtlUnIdledQueue(IdledQueue);
  500. continue;
  501. }
  502. //
  503. // Check that the join guid (if any) is still valid
  504. //
  505. if (!SndCsCheckCxtion(Cmd)) {
  506. COMMAND_SND_COMM_TRACE(4, Cmd, WStatus, "SndFail - stale join guid");
  507. //
  508. // Unjoin the replica\cxtion (if applicable)
  509. //
  510. SndCsDispatchCommError(SRCommPkt(Cmd));
  511. //
  512. // Complete w/error
  513. //
  514. FrsCompleteCommand(Cmd, ERROR_OPERATION_ABORTED);
  515. FrsRtlUnIdledQueue(IdledQueue);
  516. continue;
  517. }
  518. //
  519. // Grab a bound rpc handle for this connection target.
  520. //
  521. GTabLockTable(GHandleTable);
  522. GHandle = GTabLookupNoLock(GHandleTable, SRTo(Cmd)->Guid, NULL);
  523. if (GHandle == NULL) {
  524. GHandle = FrsAllocType(GHANDLE_TYPE);
  525. COPY_GUID(&GHandle->Guid, SRTo(Cmd)->Guid);
  526. GTabInsertEntryNoLock(GHandleTable, GHandle, &GHandle->Guid, NULL);
  527. }
  528. GTabUnLockTable(GHandleTable);
  529. //
  530. // Grab the first handle entry off the list
  531. //
  532. EnterCriticalSection(&GHandle->Lock);
  533. GHandle->Ref = TRUE;
  534. HandleList = GHandle->HandleList;
  535. if (HandleList != NULL) {
  536. GHandle->HandleList = HandleList->Next;
  537. }
  538. LeaveCriticalSection(&GHandle->Lock);
  539. WStatus = ERROR_SUCCESS;
  540. if (HandleList == NULL) {
  541. //
  542. // No free handles bound to the destination server available.
  543. // Allocate a new one.
  544. // Note: Need to add a binding handle throttle.
  545. // Note: Why don't we use the same handle for multiple calls?
  546. //
  547. HandleList = FrsAlloc(sizeof(HANDLE_LIST));
  548. if (FrsIsShuttingDown) {
  549. WStatus = ERROR_PROCESS_ABORTED;
  550. COMMAND_SND_COMM_TRACE(4, Cmd, WStatus, "SndFail - shutting down");
  551. } else {
  552. //
  553. // Rpc call is cancelled if it doesn't return from
  554. // the rpc runtime in time. This is because TCP/IP
  555. // doesn't timeout if the server reboots.
  556. //
  557. GetSystemTimeAsFileTime((FILETIME *)&FrsThread->StartTime);
  558. WStatus = FrsRpcBindToServer(SRTo(Cmd),
  559. SRPrincName(Cmd),
  560. SRAuthLevel(Cmd),
  561. &HandleList->RpcHandle);
  562. //
  563. // Associate a penalty with the cxtion based on the
  564. // time needed to fail the rpc bind call. The penalty
  565. // is applied against joins to prevent a dead machine
  566. // from tying up Snd threads as they wait to timeout
  567. // repeated joins.
  568. //
  569. if (Cxtion != NULL) {
  570. if (!WIN_SUCCESS(WStatus)) {
  571. GetSystemTimeAsFileTime((FILETIME *)&Now);
  572. if (Now.QuadPart > FrsThread->StartTime.QuadPart) {
  573. Cxtion->Penalty += (ULONG)(Now.QuadPart -
  574. FrsThread->StartTime.QuadPart) /
  575. (1000 * 10);
  576. COMMAND_SND_COMM_TRACE(4, Cmd, WStatus, "SndFail - Binding Penalty");
  577. DPRINT1(4, "++ SndFail - Binding Penalty - %d\n", Cxtion->Penalty);
  578. }
  579. }
  580. }
  581. //
  582. // Reset RPC Cancel timeout for thread. No longer in rpc call.
  583. //
  584. FrsThread->StartTime.QuadPart = QUADZERO;
  585. }
  586. if (!WIN_SUCCESS(WStatus)) {
  587. HandleList = FrsFree(HandleList);
  588. COMMAND_SND_COMM_TRACE(0, Cmd, WStatus, "SndFail - binding");
  589. //
  590. // Increment the Bindings in error counter for both the
  591. // replica set and the connection.
  592. //
  593. PM_INC_CTR_REPSET(Replica, BindingsError, 1);
  594. PM_INC_CTR_CXTION(Cxtion, BindingsError, 1);
  595. } else {
  596. //
  597. // Increment the Bindings counter for both the
  598. // replica set and the connection.
  599. //
  600. PM_INC_CTR_REPSET(Replica, Bindings, 1);
  601. PM_INC_CTR_CXTION(Cxtion, Bindings, 1);
  602. }
  603. }
  604. if (WIN_SUCCESS(WStatus)) {
  605. //
  606. // Bind was successful and join guid is okay; send it on
  607. //
  608. try {
  609. //
  610. // Rpc call is cancelled if it doesn't return from
  611. // the rpc runtime in time. This is because TCP/IP
  612. // doesn't timeout if the server reboots.
  613. //
  614. GetSystemTimeAsFileTime((FILETIME *)&FrsThread->StartTime);
  615. if (FrsIsShuttingDown) {
  616. WStatus = ERROR_PROCESS_ABORTED;
  617. COMMAND_SND_COMM_TRACE(4, Cmd, WStatus, "SndFail - shutting down");
  618. } else {
  619. WStatus = FrsRpcSendCommPkt(HandleList->RpcHandle, SRCommPkt(Cmd));
  620. if (!WIN_SUCCESS(WStatus)) {
  621. COMMAND_SND_COMM_TRACE(0, Cmd, WStatus, "SndFail - rpc call");
  622. } else {
  623. COMMAND_SND_COMM_TRACE(4, Cmd, WStatus, "SndSuccess");
  624. }
  625. }
  626. } except (EXCEPTION_EXECUTE_HANDLER) {
  627. GET_EXCEPTION_CODE(WStatus);
  628. COMMAND_SND_COMM_TRACE(0, Cmd, WStatus, "SndFail - rpc exception");
  629. }
  630. //
  631. // Associate a penalty with the cxtion based on the time needed
  632. // to fail the rpc call. The penalty is applied against joins
  633. // to prevent a dead machine from tying up Snd threads as they
  634. // wait to timeout repeated joins.
  635. //
  636. if (Cxtion != NULL) {
  637. if (!WIN_SUCCESS(WStatus)) {
  638. GetSystemTimeAsFileTime((FILETIME *)&Now);
  639. if (Now.QuadPart > FrsThread->StartTime.QuadPart) {
  640. Cxtion->Penalty += (ULONG)(Now.QuadPart -
  641. FrsThread->StartTime.QuadPart) /
  642. (1000 * 10);
  643. COMMAND_SND_COMM_TRACE(0, Cmd, WStatus, "SndFail - Send Penalty");
  644. DPRINT1(4, "++ SndFail - Send Penalty - %d\n", Cxtion->Penalty);
  645. }
  646. } else {
  647. Cxtion->Penalty = 0;
  648. }
  649. }
  650. //
  651. // Reset RPC Cancel timeout for thread. No longer in rpc call.
  652. //
  653. FrsThread->StartTime.QuadPart = QUADZERO;
  654. //
  655. // The binding may be out of date; discard it
  656. //
  657. if (!WIN_SUCCESS(WStatus)) {
  658. //
  659. // Increment the Packets sent with error counter for both the
  660. // replica set and the connection.
  661. //
  662. PM_INC_CTR_REPSET(Replica, PacketsSentError, 1);
  663. PM_INC_CTR_CXTION(Cxtion, PacketsSentError, 1);
  664. if (!FrsIsShuttingDown) {
  665. FrsRpcUnBindFromServer(&HandleList->RpcHandle);
  666. }
  667. HandleList = FrsFree(HandleList);
  668. } else {
  669. //
  670. // Increment the Packets sent and bytes sent counter for both the
  671. // replica set and the connection.
  672. //
  673. PM_INC_CTR_REPSET(Replica, PacketsSent, 1);
  674. PM_INC_CTR_CXTION(Cxtion, PacketsSent, 1);
  675. PM_INC_CTR_REPSET(Replica, PacketsSentBytes, SRCommPkt(Cmd)->PktLen);
  676. PM_INC_CTR_CXTION(Cxtion, PacketsSentBytes, SRCommPkt(Cmd)->PktLen);
  677. }
  678. }
  679. //
  680. // We are done with the rpc handle. Return it to the list for
  681. // another thread to use. This saves rebinding for every call.
  682. //
  683. if (HandleList) {
  684. EnterCriticalSection(&GHandle->Lock);
  685. GHandle->Ref = TRUE;
  686. HandleList->Next = GHandle->HandleList;
  687. GHandle->HandleList = HandleList;
  688. LeaveCriticalSection(&GHandle->Lock);
  689. }
  690. //
  691. // Don't retry. The originator of the command packet is
  692. // responsible for retry. We DO NOT want multiple layers of
  693. // retries because that can lead to frustratingly long timeouts.
  694. //
  695. if (!WIN_SUCCESS(WStatus)) {
  696. //
  697. // Discard future packets that depend on this join
  698. //
  699. LOCK_CXTION_TABLE(Replica);
  700. SndCsDestroyCxtion(Cxtion, 0);
  701. UNLOCK_CXTION_TABLE(Replica);
  702. //
  703. // Unjoin the replica\cxtion (if applicable)
  704. //
  705. SndCsDispatchCommError(SRCommPkt(Cmd));
  706. }
  707. FrsCompleteCommand(Cmd, WStatus);
  708. FrsRtlUnIdledQueue(IdledQueue);
  709. } // end of while()
  710. //
  711. // Exit
  712. //
  713. FrsExitCommandServer(&SndCs, FrsThread);
  714. goto cant_exit_yet;
  715. //
  716. // Get exception status.
  717. //
  718. } except (EXCEPTION_EXECUTE_HANDLER) {
  719. GET_EXCEPTION_CODE(WStatus);
  720. }
  721. } finally {
  722. if (WIN_SUCCESS(WStatus)) {
  723. if (AbnormalTermination()) {
  724. WStatus = ERROR_OPERATION_ABORTED;
  725. }
  726. }
  727. DPRINT_WS(0, "SndCsMain finally.", WStatus);
  728. //
  729. // Trigger FRS shutdown if we terminated abnormally.
  730. //
  731. if (!WIN_SUCCESS(WStatus) && (WStatus != ERROR_PROCESS_ABORTED)) {
  732. DPRINT(0, "SndCsMain terminated abnormally, forcing service shutdown.\n");
  733. FrsIsShuttingDown = TRUE;
  734. SetEvent(ShutDownEvent);
  735. } else {
  736. WStatus = ERROR_SUCCESS;
  737. }
  738. }
  739. return WStatus;
  740. }
  741. VOID
  742. SndCsCommTimeout(
  743. IN PCOMMAND_PACKET Cmd,
  744. IN PVOID Arg
  745. )
  746. /*++
  747. Routine Description:
  748. Age the handle cache and cancel hung rpc requests every so often.
  749. Submit Cmd back onto the Delayed-Command-Server's queue.
  750. Arguments:
  751. Cmd - delayed command packet
  752. Arg - Unused
  753. Return Value:
  754. None. Cmd is submitted to DelCs.
  755. --*/
  756. {
  757. #undef DEBSUB
  758. #define DEBSUB "SndCsCommTimeout:"
  759. DWORD WStatus;
  760. PFRS_THREAD FrsThread;
  761. ULARGE_INTEGER Now;
  762. PVOID Key;
  763. PGHANDLE GHandle;
  764. PHANDLE_LIST HandleList;
  765. extern ULONG RpcAgedBinds;
  766. COMMAND_SND_COMM_TRACE(4, Cmd, ERROR_SUCCESS, "SndChk - Age and Hung");
  767. //
  768. // Age the handle cache
  769. //
  770. GTabLockTable(GHandleTable);
  771. Key = NULL;
  772. while (GHandle = GTabNextDatumNoLock(GHandleTable, &Key)) {
  773. EnterCriticalSection(&GHandle->Lock);
  774. if (!GHandle->Ref) {
  775. while (HandleList = GHandle->HandleList) {
  776. GHandle->HandleList = HandleList->Next;
  777. ++RpcAgedBinds;
  778. FrsRpcUnBindFromServer(&HandleList->RpcHandle);
  779. FrsFree(HandleList);
  780. DPRINT(5, "++ FrsRpcUnBindFromServer\n");
  781. }
  782. }
  783. GHandle->Ref = FALSE;
  784. LeaveCriticalSection(&GHandle->Lock);
  785. }
  786. GTabUnLockTable(GHandleTable);
  787. //
  788. // Cancel hung rpc requests
  789. //
  790. GetSystemTimeAsFileTime((FILETIME *)&Now);
  791. FrsThread = NULL;
  792. while (FrsThread = ThSupEnumThreads(FrsThread)) {
  793. //
  794. // If frs is shutting down; skip it
  795. //
  796. if (FrsIsShuttingDown) {
  797. continue;
  798. }
  799. //
  800. // Some other thread; skip it
  801. //
  802. if (FrsThread->Main != SndCsMain) {
  803. continue;
  804. }
  805. //
  806. // SndCs thread; Is it in an rpc call?
  807. //
  808. if (FrsThread->StartTime.QuadPart == QUADZERO) {
  809. continue;
  810. }
  811. //
  812. // Is the thread running? If not, skip it
  813. //
  814. if (!FrsThread->Running ||
  815. !HANDLE_IS_VALID(FrsThread->Handle)) {
  816. continue;
  817. }
  818. //
  819. // Is it hung in an rpc call?
  820. //
  821. if ((FrsThread->StartTime.QuadPart < Now.QuadPart) &&
  822. ((Now.QuadPart - FrsThread->StartTime.QuadPart) >= CommTimeoutCheck)) {
  823. //
  824. // Yep, cancel the rpc call
  825. //
  826. WStatus = RpcCancelThreadEx(FrsThread->Handle, 0);
  827. DPRINT1_WS(4, "++ RpcCancelThread(%d);", FrsThread->Id, WStatus);
  828. COMMAND_SND_COMM_TRACE(4, Cmd, WStatus, "SndChk - Cancel");
  829. }
  830. }
  831. //
  832. // Re-submit the command packet to the delayed command server
  833. //
  834. if (!FrsIsShuttingDown) {
  835. FrsDelCsCompleteSubmit(Cmd, CommTimeoutInMilliSeconds << 1);
  836. } else {
  837. //
  838. // Send the packet on to the generic completion routine
  839. //
  840. FrsSetCompletionRoutine(Cmd, FrsFreeCommand, NULL);
  841. FrsCompleteCommand(Cmd, Cmd->ErrorStatus);
  842. }
  843. }
  844. VOID
  845. SndCsInitialize(
  846. VOID
  847. )
  848. /*++
  849. Routine Description:
  850. Initialize the send command server subsystem.
  851. Alloc handle table, read reg pars, Create/init SndCs Alloc comm queue
  852. array, attach comm queues to SndCs control queue.
  853. Arguments:
  854. None.
  855. Return Value:
  856. TRUE - command server has been started
  857. FALSE - Not
  858. --*/
  859. {
  860. #undef DEBSUB
  861. #define DEBSUB "SndCsInitialize:"
  862. ULONG Status;
  863. DWORD i;
  864. PCOMMAND_PACKET Cmd;
  865. ULONG MaxThreads;
  866. //
  867. // Get the timeout value and convert to 100 nsec units.
  868. //
  869. CfgRegReadDWord(FKC_COMM_TIMEOUT, NULL, 0, &CommTimeoutInMilliSeconds);
  870. DPRINT1(4, ":S: CommTimeout is %d msec\n", CommTimeoutInMilliSeconds);
  871. CommTimeoutCheck = ((ULONGLONG)CommTimeoutInMilliSeconds) * 1000 * 10;
  872. DPRINT1(4, ":S: CommTimeout is %08x %08x 100-nsec\n",
  873. PRINTQUAD(CommTimeoutCheck));
  874. //
  875. // Binding handle table
  876. //
  877. GHandleTable = GTabAllocTable();
  878. //
  879. // Comm layer command server
  880. //
  881. CfgRegReadDWord(FKC_SNDCS_MAXTHREADS_PAR, NULL, 0, &MaxThreads);
  882. FrsInitializeCommandServer(&SndCs, MaxThreads, L"SndCs", SndCsMain);
  883. //
  884. // A short array of comm queues to increase parallelism. Each Comm queue
  885. // is attached to the Send cmd server control queue. Each cxtion gets
  886. // assigned to a comm queue when it "JOINS" to preserve packet order.
  887. //
  888. for (i = 0; i < MAX_COMM_QUEUE_NUMBER; ++i) {
  889. FrsInitializeQueue(&CommQueues[i], &SndCs.Control);
  890. }
  891. //
  892. // Start the comm layer
  893. //
  894. Cmd = FrsAllocCommand(&SndCs.Queue, CMD_INIT_SUBSYSTEM);
  895. FrsSubmitCommandServer(&SndCs, Cmd);
  896. //
  897. // Age the handle cache and check for hung rpc calls
  898. //
  899. Cmd = FrsAllocCommand(&SndCs.Queue, CMD_VERIFY_SERVICE);
  900. FrsSetCompletionRoutine(Cmd, SndCsCommTimeout, NULL);
  901. FrsDelCsCompleteSubmit(Cmd, CommTimeoutInMilliSeconds << 1);
  902. }
  903. VOID
  904. SndCsUnInitialize(
  905. VOID
  906. )
  907. /*++
  908. Routine Description:
  909. Uninitialize the send command server subsystem.
  910. Arguments:
  911. None.
  912. Return Value:
  913. TRUE - command server has been started
  914. FALSE - Not
  915. --*/
  916. {
  917. #undef DEBSUB
  918. #define DEBSUB "SndCsUnInitialize:"
  919. DWORD i;
  920. GTabFreeTable(GHandleTable, FrsFreeType);
  921. //
  922. // A short array of comm queues to increase parallelism.
  923. //
  924. for (i = 0; i < MAX_COMM_QUEUE_NUMBER; ++i) {
  925. FrsRtlDeleteQueue(&CommQueues[i]);
  926. }
  927. }
  928. VOID
  929. SndCsSubmitCommPkt(
  930. IN PREPLICA Replica,
  931. IN PCXTION Cxtion,
  932. IN PCHANGE_ORDER_ENTRY Coe,
  933. IN GUID *JoinGuid,
  934. IN BOOL SetTimeout,
  935. IN PCOMM_PACKET CommPkt,
  936. IN DWORD CommQueueIndex
  937. )
  938. /*++
  939. Routine Description:
  940. Submit a comm packet to the "send" command server for the target Cxtion.
  941. Arguments:
  942. Replica - Replica struct ptr
  943. Cxtion - Target connection for comm packet.
  944. Coe - Change order entry for related stage file fetch comm packet.
  945. This is used track the change orders that have a fetch request outstanding
  946. on a given inbound connection. Used by Forced Unjoins to send the
  947. CO thru the Retry path.
  948. NOTE: When Coe is supplied then SetTimeout should also be TRUE.
  949. JoinGuid - Current join Guid from Cxtion or null if starting join seq.
  950. SetTimeout - TRUE if caller wants a timeout set on this send request.
  951. It means that the caller is eventually expecting a response
  952. back. E.G. usually set on stage file fetch requests.
  953. CommPkt - Communication packet data to send.
  954. CommQueueIndex - Index of communication queue to use, generally allocated
  955. for each Cxtion struct.
  956. Return Value:
  957. None.
  958. --*/
  959. {
  960. #undef DEBSUB
  961. #define DEBSUB "SndCsSubmitCommPkt:"
  962. PCOMMAND_PACKET Cmd;
  963. FRS_ASSERT(CommQueueIndex < MAX_COMM_QUEUE_NUMBER);
  964. //
  965. // WARN: we assume that this function is called single-threaded per replica.
  966. // Davidor - Would be nice if the friggen comment above said why? I
  967. // currently don't see the reason for this.
  968. // Maybe: its the time out code in SndCsCheckCxtion()?
  969. //
  970. if (Coe != NULL) {
  971. //
  972. // Anytime we are supplying a Coe argument we are expecting a response
  973. // so verify that SetTimeout was requested and put the Coe in the
  974. // Cxtion's Coe table so we can send it through the retry path at
  975. // unjoin (or timeout).
  976. //
  977. FRS_ASSERT(SetTimeout);
  978. LOCK_CXTION_COE_TABLE(Replica, Cxtion);
  979. GTabInsertEntry(Cxtion->CoeTable, Coe, &Coe->Cmd.ChangeOrderGuid, NULL);
  980. UNLOCK_CXTION_COE_TABLE(Replica, Cxtion);
  981. }
  982. Cmd = FrsAllocCommand(&CommQueues[CommQueueIndex], CMD_SND_COMM_PACKET);
  983. SRTo(Cmd) = FrsBuildGName(FrsDupGuid(Cxtion->Partner->Guid),
  984. FrsWcsDup(Cxtion->PartnerDnsName));
  985. SRReplica(Cmd) = Replica;
  986. SRCxtion(Cmd) = Cxtion;
  987. if (JoinGuid) {
  988. COPY_GUID(&SRJoinGuid(Cmd), JoinGuid);
  989. SRJoinGuidValid(Cmd) = TRUE;
  990. }
  991. //
  992. // Partner principal name and Authentication level.
  993. //
  994. SRPrincName(Cmd) = FrsWcsDup(Cxtion->PartnerPrincName);
  995. SRAuthLevel(Cmd) = Cxtion->PartnerAuthLevel;
  996. SRSetTimeout(Cmd) = SetTimeout;
  997. SRCommPkt(Cmd) = CommPkt;
  998. FrsSetCompletionRoutine(Cmd, CommCompletionRoutine, NULL);
  999. //
  1000. // Check Comm packet consistency and put Send cmd on sent CS queue.
  1001. //
  1002. if (CommCheckPkt(CommPkt)) {
  1003. COMMAND_SND_COMM_TRACE(4, Cmd, ERROR_SUCCESS, "SndEnQComm");
  1004. FrsSubmitCommandServer(&SndCs, Cmd);
  1005. } else {
  1006. COMMAND_SND_COMM_TRACE(4, Cmd, ERROR_SUCCESS, "SndEnQERROR");
  1007. FrsCompleteCommand(Cmd, ERROR_INVALID_BLOCK);
  1008. }
  1009. }
  1010. VOID
  1011. SndCsSubmitCommPkt2(
  1012. IN PREPLICA Replica,
  1013. IN PCXTION Cxtion,
  1014. IN PCHANGE_ORDER_ENTRY Coe,
  1015. IN BOOL SetTimeout,
  1016. IN PCOMM_PACKET CommPkt
  1017. )
  1018. /*++
  1019. Routine Description:
  1020. Submit a comm packet to the "send" command server using the assigned
  1021. comm queue for the target Cxtion.
  1022. The Comm queue index and the join Guid come from the cxtion struct.
  1023. Arguments:
  1024. See arg description for SndCsSubmitCommPkt.
  1025. Return Value:
  1026. None.
  1027. --*/
  1028. {
  1029. #undef DEBSUB
  1030. #define DEBSUB "SndCsSubmitCommPkt2:"
  1031. SndCsSubmitCommPkt(Replica,
  1032. Cxtion,
  1033. Coe,
  1034. &Cxtion->JoinGuid,
  1035. SetTimeout,
  1036. CommPkt,
  1037. Cxtion->CommQueueIndex);
  1038. }
  1039. VOID
  1040. SndCsSubmitCmd(
  1041. IN PREPLICA Replica,
  1042. IN PCXTION Cxtion,
  1043. IN PCOMMAND_SERVER FlushCs,
  1044. IN PCOMMAND_PACKET FlushCmd,
  1045. IN DWORD CommQueueIndex
  1046. )
  1047. /*++
  1048. Routine Description:
  1049. Submit the FlushCmd packet to the "send" command server for the
  1050. target Cxtion. The FlushCmd packet will be submitted to
  1051. FlushCs once it bubbles to the top of the queue. I.e., once
  1052. the queue has been flushed.
  1053. Arguments:
  1054. Replica - replica set
  1055. Cxtion - cxtion identifies send queue
  1056. FlushCs - Command server to receive Cmd
  1057. FlushCmd - Command packet to send to Cs
  1058. CommQueueIndex - Identifies the comm queue
  1059. Return Value:
  1060. None.
  1061. --*/
  1062. {
  1063. #undef DEBSUB
  1064. #define DEBSUB "SndCsSubmitCmd:"
  1065. PCOMMAND_PACKET Cmd;
  1066. FRS_ASSERT(CommQueueIndex < MAX_COMM_QUEUE_NUMBER);
  1067. //
  1068. // Alloc the cmd packet and set the target queue and the command.
  1069. //
  1070. Cmd = FrsAllocCommand(&CommQueues[CommQueueIndex], CMD_SND_CMD);
  1071. //
  1072. // Destination Partner Guid / Dns Name
  1073. //
  1074. SRTo(Cmd) = FrsBuildGName(FrsDupGuid(Cxtion->Partner->Guid),
  1075. FrsWcsDup(Cxtion->PartnerDnsName));
  1076. SRReplica(Cmd) = Replica;
  1077. SRCxtion(Cmd) = Cxtion;
  1078. SRCs(Cmd) = FlushCs;
  1079. SRCmd(Cmd) = FlushCmd;
  1080. SRPrincName(Cmd) = FrsWcsDup(Cxtion->PartnerPrincName);
  1081. SRAuthLevel(Cmd) = Cxtion->PartnerAuthLevel;
  1082. FrsSetCompletionRoutine(Cmd, CommCompletionRoutine, NULL);
  1083. COMMAND_SND_COMM_TRACE(4, Cmd, ERROR_SUCCESS, "SndEnQCmd");
  1084. FrsSubmitCommandServer(&SndCs, Cmd);
  1085. }
  1086. VOID
  1087. SndCsShutDown(
  1088. VOID
  1089. )
  1090. /*++
  1091. Routine Description:
  1092. Shutdown the send command server
  1093. Arguments:
  1094. None.
  1095. Return Value:
  1096. None.
  1097. --*/
  1098. {
  1099. #undef DEBSUB
  1100. #define DEBSUB "SndCsShutDown:"
  1101. DWORD i;
  1102. //
  1103. // A short array of comm queues to increase parallelism.
  1104. //
  1105. for (i = 0; i < MAX_COMM_QUEUE_NUMBER; ++i) {
  1106. FrsRunDownCommandServer(&SndCs, &CommQueues[i]);
  1107. }
  1108. FrsRunDownCommandServer(&SndCs, &SndCs.Queue);
  1109. }