Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1015 lines
42 KiB

  1. // !!! PipeCount needs a critical section?
  2. /* Send a list of files across the named pipe as fast as possible
  3. *
  4. * The overall organisation:
  5. *
  6. * Sumserve receives requests over a named pipe. (See sumserve.h)
  7. * The requests can be for details of files or for the files
  8. * themselves. File details involve sending relatively small
  9. * quantities of data and therefore no attempt is made to
  10. * double-buffer or overlap execution.
  11. *
  12. * For a send-files request (SSREQ_FILES), the data is typically large,
  13. * and can be a whole NT build which means sending hundreds
  14. * of megabytes. Such a transfer can take literally days and
  15. * so optimisation to achieve maximum throughput is essential.
  16. *
  17. * To maximise throughput
  18. * 1. The data is packed before sending
  19. * 2. One thread per pipe does almost nothing except send data through
  20. * its pipe with all other work being done on other threads.
  21. *
  22. * Because we have had trouble with bad files being transmitted over
  23. * the network, we checksum each file. Windiff requires that we
  24. * do a scan first before doing a copy, so we already have checksums.
  25. * All we need to do is to check the newly received files.
  26. * LATER: We should not require checksums in advance. The checksuming
  27. * could be done by (yet another) pass, created if need be. An extra
  28. * flag could be added to the request to indicate "send checksums".
  29. *
  30. * The packing is done by a separate program that reads from a file and
  31. * writes to a file. This means that we get three lots of file I/O
  32. * (read; write; read) before the file is sent. For a small
  33. * file the disk cacheing may eliminate this, for a large file we
  34. * probably pay the price. A possible future enhancement is therefore
  35. * to rewrite the packing to do it in-storage so that the file is read
  36. * once only. In the meantime we run threads to overlap the packing
  37. * with the sending of the previous file(s).
  38. *
  39. * The main program sets up a named pipe which a client connects to.
  40. * This is necessary because pipes are only half-duplex. i.e. the
  41. * following hangs:
  42. * client read; server read; client write;
  43. * The write hangs waiting for the client read. We broadly speaking have one
  44. * pipe running in each direction.
  45. *
  46. * To eliminate the overhead of setting up a virtual circuit for each
  47. * file request there is a request code to send a list of files.
  48. * The protocol (for the control pipe) is then as follows:
  49. * 1. Typical session:
  50. * CLIENT SERVER
  51. * ----<SSREQ_FILES------------------>
  52. * <------(SSRESP_PIPENAME,pipename)--
  53. * ----<SSREQ_NEXTFILE,filename>----->
  54. * ----<SSREQ_NEXTFILE,filename>----->
  55. * ...
  56. * --------<SSREQ_ENDFILES>---------->
  57. *
  58. * Meanwhile, asynchronously with this, the data goes back the other way like
  59. *
  60. * CLIENT SERVER
  61. * <-----<SSNEWRESP>----------
  62. * <---<1 or more SSNEWPACK>--
  63. ...
  64. * <-----<SSNEWRESP>----------
  65. * <---<1 or more SSNEWPACK>--
  66. * ...
  67. * <-----<End>----------------
  68. *
  69. * Even a zero length file gets 1 SSNEWPACK record.
  70. * An Erroneous file (can't read etc) gets no SSNEWPACKs and a negative lCode
  71. * in its SSNEWRESP.
  72. * A file that goes wrong during read-in gets a packet length code of -1 or -2.
  73. * The end of the sequence of SSNEWPACKs is signalled by a shorter
  74. * than maximum length one. If the file is EXACTLY n buffers long
  75. * then an extra SSNEWPACK with zero bytes of data comes on the end.
  76. *
  77. * The work is broken into the following threads:
  78. * Control thread (ss_sendfiles):
  79. * Receives lists of files to be sent
  80. * Creates pipes for the actual transmission
  81. * Creates queues (see below. Queue parameters must match pipes)
  82. * Puts filenames onto first queue
  83. * Destroys first queue at end.
  84. * Packing thread
  85. * Takes file details from the packing queue
  86. * Packs the file (to create a temporary file)
  87. * Puts the file details (including the temp name) onto the reading queue
  88. * Destroys the reading queue at the end
  89. * Reading thread
  90. * Takes the file details from the reading queue
  91. * Splits the file into a header and a list of data packets
  92. * and enqueues each of these on the Sending thread.
  93. * (Note this means no more than one reading thread to be running).
  94. * Erases the temp file
  95. * Destroys the sending queue at the end
  96. * Sending thread
  97. * Takes the things from the sending thread and sends them
  98. *
  99. * This whole scheme can be running for multiple clients, so we
  100. * need some instance data that defines which pipeline we are
  101. * running. This is held in the instance data of the QUEUEs that
  102. * are created (retrieved by the queue emptiers by Queue_GetInstanceData).
  103. * The instance data at each stage is the handle of the following stage
  104. * i.e. the next QUEUE, or the hpipe of the data pipe for the last stage.
  105. * The current design only allows for one data pipe. If we have
  106. * multiple data pipes then we need to solve the following problems:
  107. * 1. Communication of the number and names of the data pipes
  108. * to the client (presumably across the control thread.
  109. * 2. Error handling
  110. * 3. Balancing the load between the pipes
  111. *
  112. * NORMAL SHUTDOWN:
  113. * After the last element has been Put to the first Queue the main thread
  114. * calls Queue_Destroy to destroy the first queue. This will result in
  115. * the queue being destroyed BUT NOT UNTIL THE LAST ELEMENT HAS BEEN GOT.
  116. * When the last packing thread gets its ENDQUEUE it calls Queue_Destroy
  117. * to destroy the next queue, and so on down the line.
  118. *
  119. * ERROR RECOVERY
  120. * Errors can occur at almost any stage.
  121. * The obvious implementation of having a global BOOL that tells
  122. * whether a disaster has happened won't work because there
  123. * could be multiple clients and only one of them with a disaster.
  124. *
  125. * An error in a single file is propagated forwards to the client end.
  126. * An error in the whole mechanism (net blown away) can mean that the
  127. * whole thing needs to be shut down. In this case the error must
  128. * be propagated backwards. That works as follows:
  129. * The Sending thread Queue_Aborts the SendQueue which it was Getting from.
  130. * This results in Puts to this queue returning FALSE.
  131. * Case 1. There are no more Puts anyway:
  132. * We are on the last file, the filling thread was about to Destroy the
  133. * queue anyway. It does so.
  134. * Case 2. The next Put gets a FALSE return code.
  135. * The thread attempting the Put does a Queue_Destroy on its output
  136. * queue and a Queue_Abort on its input queue.
  137. * This propagates all the way back until either the first queue
  138. * is aborted or it reaches a queue that was being destroyed anyway.
  139. * See Queue.h
  140. * Once the Putting thread has done a Destroy on its output queue,
  141. * the threads Getting from it (which are still running, even if
  142. * they did the Abort) get STOPTHREAD/ENDQUEUE back from a Get. The last Get
  143. * to a queue that has had a Queue_Destroy done on it has a side effect
  144. * of actually deallocating the queue. In our case we only have one
  145. * Getting thread, so what happens is that it Queue_Aborts the queue
  146. * and then does a Queue_Get which WAITs. When the Queue_Destroy comes in
  147. * from the Putting thread, this releases the WAITing Getting thread which
  148. * then actually deallocates the Queue.
  149. *
  150. * You can also get shutdown happening from both ends at once. This happens
  151. * when the control thread's pipe goes down getting names and the sending pipe
  152. * also breaks. (e.g. general net collapse or client aborted).
  153. */
  154. #include <windows.h>
  155. #include <stdio.h>
  156. #include <string.h>
  157. #include <gutils.h>
  158. #include "sumserve.h"
  159. #include "errlog.h"
  160. #include "server.h"
  161. #include "queue.h"
  162. #if DBG
  163. #define STATIC // allow for debug.
  164. #else
  165. #define STATIC static
  166. #endif
  167. /* SOCKETS / NAMED PIPES macros
  168. */
  169. #ifdef SOCKETS
  170. #define CLOSEHANDLE( handle ) closesocket( handle )
  171. #define TCPPORT 1024
  172. #else
  173. #define CLOSEHANDLE( handle ) CloseHandle( handle )
  174. #endif
  175. //////////////ULONG ss_checksum_block(PSTR block, int size);
  176. #define PIPEPREFIX "Sdpx" // for making an unlikely pipe name
  177. static PipeCount = 0; // for making pipe names unique.
  178. /* structure for recording all we need to know about a file as it
  179. * progresses along the chain of pipes */
  180. typedef struct {
  181. FILETIME ft_create;
  182. FILETIME ft_lastaccess;
  183. FILETIME ft_lastwrite;
  184. DWORD fileattribs;
  185. DWORD SizeHi; /* Anticipating files larger that 4GB! */
  186. DWORD SizeLo; /* Anticipating files larger that 4GB! */
  187. int ErrorCode;
  188. long Checksum; /* Uunused except for debug. */
  189. char TempName[MAX_PATH]; /* name of packed file at server */
  190. char Path[MAX_PATH]; /* name of file to fetch */
  191. char LocalName[MAX_PATH]; /* name of file at client end */
  192. } FILEDETAILS;
  193. /* forward declarations for procedures */
  194. STATIC int PackFile(QUEUE Queue);
  195. STATIC int ReadInFile(QUEUE Queue);
  196. STATIC int SendData(QUEUE Queue);
  197. STATIC void PurgePackedFiles(PSTR Ptr, int Len);
  198. STATIC BOOL EnqueueName(QUEUE Queue, LPSTR Path, UINT BuffLen);
  199. STATIC BOOL AddFileAttributes(FILEDETAILS * fd);
  200. static void Error(PSTR Title)
  201. {
  202. dprintf1(("Error %d from %s when creating data pipe.\n", GetLastError(), Title));
  203. }
  204. /* ss_sendfiles:
  205. Send a response naming the data pipe, collect further names
  206. from further client messages, all according to the protocol above.
  207. Start the data pipe and arrange that all the files are sent
  208. by getting them all enqueued on the first queue.
  209. Destroy PackQueue at the end. Arrange for the other queues
  210. to be destroyed by the usual Queue mechanism, or destroy them
  211. explicitly if they never get started.
  212. */
  213. BOOL
  214. ss_sendfiles(HANDLE hPipe, long lVersion)
  215. { /* Create the queues and set about filling the first one */
  216. QUEUE PackQueue, ReadQueue, SendQueue;
  217. #ifdef SOCKETS
  218. SOCKET hpSend;
  219. static BOOL SocketsInitialized = FALSE;
  220. #else
  221. HANDLE hpSend; /* the data pipe */
  222. #endif /* SOCKETS */
  223. char PipeName[80]; /* The name of the new data pipe */
  224. BOOL Started = FALSE; /* TRUE if something enqueued */
  225. #ifdef SOCKETS
  226. if( !SocketsInitialized )
  227. {
  228. WSADATA WSAData;
  229. if( ( WSAStartup( MAKEWORD( 1, 1 ), &WSAData ) ) == 0 )
  230. {
  231. SocketsInitialized = TRUE;
  232. }
  233. else
  234. {
  235. printf("WSAStartup failed");
  236. }
  237. }
  238. #endif
  239. {
  240. /****************************************
  241. We need security attributes for the pipe to let anyone other than the
  242. current user log on to it.
  243. ***************************************/
  244. /* Allocate DWORDs for the ACL to get them aligned. Round up to next DWORD above */
  245. DWORD Acl[(sizeof(ACL)+sizeof(ACCESS_ALLOWED_ACE)+3)/4+4]; // + 4 by experiment!!
  246. SECURITY_DESCRIPTOR sd;
  247. PSECURITY_DESCRIPTOR psd = &sd;
  248. PSID psid;
  249. SID_IDENTIFIER_AUTHORITY SidWorld = SECURITY_WORLD_SID_AUTHORITY;
  250. PACL pacl = (PACL)(&(Acl[0]));
  251. SECURITY_ATTRIBUTES sa;
  252. if (!AllocateAndInitializeSid( &SidWorld, 1, SECURITY_WORLD_RID
  253. , 1, 2, 3, 4, 5, 6, 7
  254. , &psid
  255. )
  256. ) {
  257. Error("AllocateAndInitializeSid");
  258. return FALSE;
  259. }
  260. if (!InitializeAcl(pacl, sizeof(Acl), ACL_REVISION)){
  261. Error("InitializeAcl");
  262. return FALSE;
  263. }
  264. if (!AddAccessAllowedAce(pacl, ACL_REVISION, GENERIC_WRITE|GENERIC_READ, psid)){
  265. Error("AddAccessAllowedAce");
  266. return FALSE;
  267. }
  268. if (!InitializeSecurityDescriptor(psd, SECURITY_DESCRIPTOR_REVISION)){
  269. Error("InitializeSecurityDescriptor");
  270. return FALSE;
  271. }
  272. if (!SetSecurityDescriptorDacl(psd, TRUE, pacl, FALSE)){
  273. Error("SetSecurityDescriptorDacl");
  274. return FALSE;
  275. }
  276. sa.nLength = sizeof(sa);
  277. sa.lpSecurityDescriptor = psd;
  278. sa.bInheritHandle = TRUE;
  279. /* We now have a good security descriptor! */
  280. /* Create the (new, unique) name of the pipe and then create the pipe */
  281. /* I am finding it hard to decide whether the following line (++PpipeCount)
  282. actually needs a critical section or not. The worst that could happen
  283. would be that we got an attempt to create a pipe with an existing name.
  284. */
  285. ++PipeCount;
  286. sprintf(PipeName, "\\\\.\\pipe\\%s%d", PIPEPREFIX, PipeCount);
  287. #ifdef SOCKETS
  288. if (!ss_sendnewresp( hPipe, SS_VERSION, SSRESP_PIPENAME
  289. , 0, 0, 0, TCPPORT, "")) {
  290. dprintf1(( "Failed to send response on pipe %x naming new pipe.\n"
  291. , hPipe));
  292. return FALSE; /* Caller will close hPipe */
  293. }
  294. if( !SocketListen( TCPPORT, &hpSend ) )
  295. {
  296. dprintf1(("Could not create socket\n"));
  297. return FALSE;
  298. }
  299. FreeSid(psid);
  300. #else
  301. hpSend = CreateNamedPipe(PipeName, /* pipe name */
  302. PIPE_ACCESS_DUPLEX, /* both read and write */
  303. PIPE_WAIT|PIPE_TYPE_MESSAGE|PIPE_READMODE_MESSAGE,
  304. 1, /* at most one instance */
  305. 10000, /* sizeof(SSNEWPACK) + some for luck */
  306. 0, /* dynamic inbound buffer allocation */
  307. 5000, /* def. timeout 5 seconds */
  308. &sa /* security descriptor */
  309. );
  310. FreeSid(psid);
  311. if (hpSend == INVALID_HANDLE_VALUE) {
  312. dprintf1(("Could not create named data pipe\n"));
  313. return FALSE;
  314. }
  315. dprintf1(("Data pipe %x called '%s' created for main pipe %x.\n", hpSend, PipeName, hPipe));
  316. #endif /* SOCKETS */
  317. }
  318. /* Send the response which names the data pipe */
  319. #ifndef SOCKETS
  320. if (!ss_sendnewresp( hPipe, SS_VERSION, SSRESP_PIPENAME
  321. , 0, 0, 0, 0, PipeName)) {
  322. dprintf1(( "Failed to send response on pipe %x naming new pipe.\n"
  323. , hPipe));
  324. CLOSEHANDLE(hpSend);
  325. return FALSE; /* Caller will close hPipe */
  326. }
  327. if (!ConnectNamedPipe(hpSend, NULL)) {
  328. CLOSEHANDLE(hpSend);
  329. return FALSE;
  330. }
  331. #endif /* NOT SOCKETS */
  332. //dprintf1(("Client connected to data pipe -- here we go...\n"));
  333. /* Create all the queues: Allow up to 10K file names to be queued
  334. up to 10 files to be packed in advance and 6 buffers of data to be
  335. read into main storage in advance:
  336. proc MxMT MnQS MxQ Event InstData Name*/
  337. SendQueue = Queue_Create(SendData, 1, 0, 6, NULL, (DWORD)hpSend, "SendQueue");
  338. ReadQueue = Queue_Create(ReadInFile, 1, 0, 10, NULL, (DWORD)SendQueue, "ReadQueue");
  339. PackQueue = Queue_Create(PackFile, 3, 0, 99999, NULL, (DWORD)ReadQueue, "PackQueue");
  340. /* Abort unless it all worked */
  341. if (PackQueue==NULL || ReadQueue==NULL || SendQueue==NULL) {
  342. dprintf1(("Queues for pipe %x failed to Create. Aborting...\n", hPipe));
  343. if (PackQueue) Queue_Destroy(PackQueue);
  344. if (ReadQueue) Queue_Destroy(ReadQueue);
  345. if (SendQueue) Queue_Destroy(SendQueue);
  346. CLOSEHANDLE(hpSend);
  347. return FALSE; /* Caller will close hPipe */
  348. }
  349. /* Collect names from client and enqueue each one */
  350. for (; ; )
  351. { SSNEWREQ Request; /* message from client */
  352. DWORD ActSize; /* bytes read from (main) pipe */
  353. if (ReadFile(hPipe, &Request, sizeof(Request), &ActSize, NULL)){
  354. if (Request.lVersion>SS_VERSION) {
  355. dprintf1(("Bad version %d in file list request on pipe %x\n"
  356. , Request.lVersion, hPipe));
  357. break;
  358. }
  359. if (Request.lRequest!=LREQUEST) {
  360. dprintf1(("Bad LREQUEST from pipe %x\n", hPipe));
  361. break;
  362. }
  363. if (Request.lCode == -SSREQ_ENDFILES) {
  364. dprintf1(("End of client's files list on pipe %x\n", hPipe));
  365. /* This is the clean way to end */
  366. Queue_Destroy(PackQueue);
  367. if (!Started) {
  368. /* OK - so the clever clogs requested zero files */
  369. Queue_Destroy(ReadQueue);
  370. Queue_Destroy(SendQueue);
  371. /* Send a No More Files response */
  372. #ifdef SOCKETS
  373. {
  374. SSNEWRESP resp;
  375. resp.lVersion = SS_VERSION;
  376. resp.lResponse = LRESPONSE;
  377. resp.lCode = SSRESP_END;
  378. resp.ulSize = 0;
  379. resp.ulSum = 0;
  380. resp.ft_lastwrite.dwLowDateTime = 0;
  381. resp.ft_lastwrite.dwHighDateTime = 0;
  382. send(hpSend, (PSTR) &resp, sizeof(resp), 0);
  383. }
  384. #else
  385. ss_sendnewresp( hpSend, SS_VERSION, SSRESP_END
  386. , 0,0, 0,0, NULL);
  387. #endif /* SOCKETS */
  388. CLOSEHANDLE(hpSend);
  389. }
  390. return TRUE;
  391. }
  392. if (Request.lCode != -SSREQ_NEXTFILE) {
  393. dprintf1(( "Bad code (%d) in files list from pipe %x\n"
  394. , Request.lCode, hPipe));
  395. break;
  396. }
  397. }
  398. else { DWORD errorcode = GetLastError();
  399. switch(errorcode) {
  400. case ERROR_NO_DATA:
  401. case ERROR_BROKEN_PIPE:
  402. /* pipe connection lost - forget it */
  403. dprintf1(("main pipe %x broken on read\n", hPipe));
  404. break;
  405. default:
  406. dprintf1(("read error %d on main pipe %x\n", errorcode, hPipe));
  407. break;
  408. }
  409. break;
  410. }
  411. if (!EnqueueName( PackQueue, Request.szPath
  412. , (UINT)((LPBYTE)(&Request) + ActSize - (LPBYTE)(&Request.szPath))
  413. )
  414. ){
  415. break;
  416. }
  417. Started = TRUE;
  418. } /* loop */
  419. /* only exit this way on error */
  420. /* Close the queues down. Allow what's in them to run through */
  421. Queue_Destroy(PackQueue);
  422. if (!Started) {
  423. Queue_Destroy(ReadQueue);
  424. Queue_Destroy(SendQueue);
  425. }
  426. return FALSE;
  427. } /* ss_sendfiles */
  428. /* Attempt to Queue.Put Path onto Queue as a FILEDETAILS
  429. with default values for all other fields.
  430. Return TRUE or FALSE according as it succeeded.
  431. */
  432. STATIC BOOL EnqueueName(QUEUE Queue, LPSTR Path, UINT BuffLen)
  433. {
  434. FILEDETAILS fd;
  435. /* unpack Path and LocalName from "superstring" */
  436. strcpy(fd.Path, Path);
  437. BuffLen -= (strlen(Path)+1);
  438. if (BuffLen<0) return FALSE; // Uh oh! strlen just looked at garbage.
  439. Path += strlen(Path)+1;
  440. BuffLen -= (strlen(Path)+1);
  441. if (BuffLen<0) return FALSE; // Uh oh! strlen just looked at garbage.
  442. strcpy(fd.LocalName, Path);
  443. /* set defaults for every field */
  444. fd.ErrorCode = 0;
  445. fd.ft_lastwrite.dwLowDateTime = 0;
  446. fd.ft_lastwrite.dwHighDateTime = 0;
  447. fd.ft_create.dwLowDateTime = 0;
  448. fd.ft_create.dwHighDateTime = 0;
  449. fd.ft_lastaccess.dwLowDateTime = 0;
  450. fd.ft_lastaccess.dwHighDateTime = 0;
  451. fd.fileattribs = 0;
  452. fd.SizeHi = 0;
  453. fd.SizeLo = 0;
  454. fd.Checksum = 0;
  455. fd.TempName[0] = '\0';
  456. if(!Queue_Put(Queue, (LPBYTE)&fd, sizeof(fd))){
  457. dprintf1(("Put to pack queue failed\n"));
  458. return FALSE;
  459. }
  460. return TRUE;
  461. } /* EnqueueName */
  462. /* Dequeue elements from Queue, pack them and enqueue them on the next
  463. queue whose queue handle is the InstanceData of Queue.
  464. The ErrorCode in fd when Dequeued must be 0. ??? Incautious?
  465. Destroy the output queue at the end.
  466. On a serious error, Queue_Abort Queue and Queue_Destroy the output queue.
  467. */
  468. STATIC int PackFile(QUEUE Queue)
  469. {
  470. FILEDETAILS fd; /* the queue element processed */
  471. QUEUE OutQueue;
  472. BOOL Aborting = FALSE; /* TRUE means input has been aborted (probably output is sick) */
  473. DWORD ThreadId;
  474. ThreadId = GetCurrentThreadId();
  475. dprintf1(("File packer %d starting \n", ThreadId)); // can't quote hPipe, don't know it
  476. OutQueue = (QUEUE)Queue_GetInstanceData(Queue);
  477. for (; ; )
  478. { int rc; /* return code from Queue_Get */
  479. rc = Queue_Get(Queue, (LPBYTE)&fd, sizeof(fd));
  480. if (rc==ENDQUEUE) {
  481. dprintf1(("Packing thread %d ending.\n", ThreadId));
  482. Queue_Destroy(OutQueue);
  483. // dprintf1(("%d has done Queue_Destroy on ReadQueue.\n", ThreadId));
  484. ExitThread(0);
  485. }
  486. if (rc==STOPTHREAD) {
  487. dprintf1(("%d, a packing thread ending.\n", ThreadId));
  488. ExitThread(0);
  489. }
  490. else if (rc<0) {
  491. dprintf1(( "Packing thread %d aborting. Bad return code %d from Get.\n"
  492. , ThreadId, rc));
  493. if (Aborting) break; /* Touch nothing, just quit! */
  494. Queue_Abort(Queue, NULL);
  495. continue; /* Next Queue_Get destroys Queue */
  496. }
  497. /* First add the file attributes to fd */
  498. AddFileAttributes(&fd);
  499. /* no need to look at return code fd.ErrorCode tells all */
  500. /* create temp filename */
  501. if ( 0 != fd.ErrorCode
  502. || 0==GetTempPath(sizeof(fd.TempName), fd.TempName)
  503. || 0==GetTempFileName(fd.TempName, "sum", 0, fd.TempName)
  504. )
  505. fd.ErrorCode = SSRESP_NOTEMPPATH;
  506. /* Pack into temp file */
  507. if (fd.ErrorCode==0) {
  508. BOOL bOK = FALSE;
  509. //dprintf1(("%d Compressing file '%s' => '%s'\n", ThreadId, fd.Path, fd.TempName));
  510. /* compress the file into this temporary file
  511. Maybe it will behave badly if there's a large file or
  512. no temp space or something...
  513. */
  514. try{
  515. if (!ss_compress(fd.Path, fd.TempName)) {
  516. fd.ErrorCode = SSRESP_COMPRESSFAIL;
  517. dprintf1(("Compress failure on %d for %s\n", ThreadId, fd.Path));
  518. }
  519. else bOK = TRUE;
  520. } except(EXCEPTION_EXECUTE_HANDLER) {
  521. if (!bOK){
  522. fd.ErrorCode = SSRESP_COMPRESSEXCEPT;
  523. dprintf1(("Compress failure on %d for %s\n", ThreadId, fd.Path));
  524. #ifdef trace
  525. { char msg[80];
  526. wsprintf( msg, "Compress failure on %d for %s\n"
  527. , ThreadId, fd.Path);
  528. Trace_File(msg);
  529. }
  530. #endif
  531. }
  532. }
  533. }
  534. //dprintf1(("%d Putting file '%s' onto Read Queue\n", ThreadId, fd.Path));
  535. if (!Queue_Put(OutQueue, (LPBYTE)&fd, sizeof(fd))) {
  536. dprintf1(("%d Put to ReadQueue failed for %s.\n", ThreadId, fd.Path));
  537. Queue_Abort(Queue, NULL);
  538. DeleteFile(fd.TempName);
  539. Aborting = TRUE;
  540. /* bug: If this Queue_Put fails on the very first Put,
  541. then the next queue in the chain after OutQueue will
  542. never come alive and so will never get Destroyed.
  543. Worst it could cause is a memory leak. ???
  544. */
  545. continue; /* next Queue_Get destroys Queue */
  546. }
  547. }
  548. return 0;
  549. } /* PackFile */
  550. /* Use the file name in *fd and get its attributes (size, time etc)
  551. Add these to fd. If it fails, set the ErrorCode in *fd
  552. to an appropriate non-zero value.
  553. */
  554. STATIC BOOL AddFileAttributes(FILEDETAILS * fd)
  555. {
  556. HANDLE hFile;
  557. BY_HANDLE_FILE_INFORMATION bhfi;
  558. hFile = CreateFile(fd->Path, GENERIC_READ, FILE_SHARE_READ,
  559. NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);
  560. if (hFile == INVALID_HANDLE_VALUE) {
  561. fd->ErrorCode = SSRESP_CANTOPEN;
  562. return FALSE;
  563. }
  564. /* bug in GetFileInformationByHandle if file not on local
  565. * machine? Avoid it!
  566. */
  567. bhfi.dwFileAttributes = GetFileAttributes(fd->Path);
  568. if (bhfi.dwFileAttributes == 0xFFFFFFFF) {
  569. fd->ErrorCode = SSRESP_NOATTRIBS;
  570. CloseHandle(hFile);
  571. return FALSE;
  572. }
  573. if (!GetFileTime(hFile, &bhfi.ftCreationTime,
  574. &bhfi.ftLastAccessTime, &bhfi.ftLastWriteTime)){
  575. fd->ErrorCode = SSRESP_NOATTRIBS;
  576. dprintf1(("Can't get file attributes for %s\n"
  577. , (fd->Path?fd->Path : "NULL")));
  578. CloseHandle(hFile);
  579. return FALSE;
  580. }
  581. CloseHandle(hFile);
  582. {
  583. LONG err;
  584. fd->Checksum = checksum_file(fd->Path, &err);
  585. if (err!=0) {
  586. fd->ErrorCode = SSRESP_CANTOPEN;
  587. return FALSE;
  588. }
  589. }
  590. fd->ft_lastwrite = bhfi.ftLastWriteTime;
  591. fd->ft_lastaccess = bhfi.ftLastAccessTime;
  592. fd->ft_create = bhfi.ftCreationTime;
  593. fd->SizeHi = bhfi.nFileSizeHigh;
  594. fd->SizeLo = bhfi.nFileSizeLow;
  595. fd->fileattribs = bhfi.dwFileAttributes;
  596. return TRUE;
  597. } /* AddFileAttributes */
  598. /* Dequeue elements from Queue, Create on the output queue a SSNEWRESP
  599. followed by 1 or more SSNEWPACK structures, the last of which will be
  600. shorter than full length (zero length data if need be) to mark end-of-file.
  601. Files with errors already get zero SSNEWPACKs but bad code in SSNEWRESP.
  602. The output queue is the instance data of Queue.
  603. */
  604. STATIC int ReadInFile(QUEUE Queue)
  605. { FILEDETAILS fd; /* The queue element processed */
  606. QUEUE OutQueue;
  607. HANDLE hFile; /* The packed file */
  608. SSNEWPACK Pack; /* output message */
  609. BOOL ShortBlockSent; /* no need to send another SSNEWPACK
  610. Client knows the file has ended */
  611. BOOL Aborting = FALSE; /* Input has been aborted. e.g. because output sick */
  612. dprintf1(("File reader starting \n"));
  613. OutQueue = (QUEUE)Queue_GetInstanceData(Queue);
  614. for (; ; ) /* for each file */
  615. { int rc; /* return code from Queue_Get */
  616. rc = Queue_Get(Queue, (LPBYTE)&fd, sizeof(fd));
  617. if (rc==STOPTHREAD || rc==ENDQUEUE) {
  618. if (!Aborting) {
  619. /* Enqueue a No More Files response */
  620. SSNEWRESP resp;
  621. resp.lVersion = SS_VERSION;
  622. resp.lResponse = LRESPONSE;
  623. resp.lCode = SSRESP_END;
  624. if (!Queue_Put( OutQueue, (LPBYTE)&resp , RESPHEADSIZE)) {
  625. dprintf1(("Failed to Put SSRESP_END on SendQueue\n"));
  626. }
  627. //// dprintf1(( "Qued SSRESP_END: %x %x %x %x...\n"
  628. //// , resp.lVersion, resp.lResponse, resp.lCode, resp.ulSize));
  629. }
  630. if (rc==ENDQUEUE)
  631. Queue_Destroy(OutQueue);
  632. dprintf1(("File reader ending\n"));
  633. ExitThread(0);
  634. }
  635. else if (rc<0){
  636. dprintf1(("ReadIn aborting. Bad return code %d from Queue_Get.\n", rc));
  637. if (Aborting) break; /* All gone wrong. Just quit! */
  638. Queue_Abort(Queue, PurgePackedFiles);
  639. CloseHandle(hFile);
  640. Aborting = TRUE;
  641. continue; /* next Get gets STOPTHREAD */
  642. }
  643. //dprintf1(( "Reading file '%s' Error code %d\n"
  644. // , (fd.TempName?fd.TempName:"NULL"), fd.ErrorCode
  645. // ));
  646. if (fd.ErrorCode==0) {
  647. /* open temp (compressed) file */
  648. hFile = CreateFile(fd.TempName, GENERIC_READ, 0, NULL,
  649. OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);
  650. if (hFile == INVALID_HANDLE_VALUE) {
  651. /* report that we could not read the file */
  652. fd.ErrorCode = SSRESP_NOREADCOMP;
  653. dprintf1(( "Couldn't open compressed file for %s %s\n"
  654. , fd.Path, fd.TempName));
  655. }
  656. }
  657. if ( fd.ErrorCode==SSRESP_COMPRESSFAIL
  658. || fd.ErrorCode==SSRESP_NOREADCOMP
  659. || fd.ErrorCode==SSRESP_NOTEMPPATH
  660. || fd.ErrorCode==SSRESP_COMPRESSEXCEPT
  661. ) {
  662. /* open original uncompressed file */
  663. hFile = CreateFile(fd.Path, GENERIC_READ, 0, NULL,
  664. OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);
  665. if (hFile == INVALID_HANDLE_VALUE) {
  666. /* report that we could not read the file */
  667. fd.ErrorCode = SSRESP_NOREAD;
  668. dprintf1(( "Couldn't open file %s \n", fd.Path));
  669. }
  670. }
  671. /* Put the file name etc on the output queue as a SSNEWRESP */
  672. { SSNEWRESP resp;
  673. LPSTR LocalName;
  674. resp.lVersion = SS_VERSION;
  675. resp.lResponse = LRESPONSE;
  676. resp.lCode = (fd.ErrorCode ? fd.ErrorCode: SSRESP_FILE);
  677. resp.ulSize = fd.SizeLo; /* file size <= 4GB !!! */
  678. resp.fileattribs = fd.fileattribs;
  679. resp.ft_create = fd.ft_create;
  680. resp.ft_lastwrite = fd.ft_lastwrite;
  681. resp.ft_lastaccess = fd.ft_lastaccess;
  682. resp.ulSum = fd.Checksum;
  683. resp.bSumValid = FALSE;
  684. strcpy(resp.szFile, fd.Path);
  685. LocalName = resp.szFile+strlen(resp.szFile)+1;
  686. strcpy(LocalName, fd.LocalName);
  687. if(!Queue_Put( OutQueue, (LPBYTE)&resp
  688. , RESPHEADSIZE + strlen(resp.szFile)
  689. +strlen(LocalName)+2)
  690. ) {
  691. dprintf1(("Put to SendQueue failed.\n"));
  692. Queue_Abort(Queue, PurgePackedFiles);
  693. Aborting = TRUE;
  694. CloseHandle(hFile);
  695. continue; /* next Get gets STOPTHREAD */
  696. }
  697. // dprintf1(( "Qued SSRESP_FILE: %x %x %x %x...\n"
  698. // , resp.lVersion, resp.lResponse, resp.lCode, resp.ulSize));
  699. }
  700. Pack.lSequence = 0;
  701. /* Loop reading blocks of the file and queueing them
  702. Set fd.ErrorCode for failures.
  703. I'm worried about file systems that give me short blocks in the
  704. middles of files!!!
  705. */
  706. ShortBlockSent = FALSE;
  707. if ( fd.ErrorCode==SSRESP_COMPRESSFAIL
  708. || fd.ErrorCode==SSRESP_NOREADCOMP
  709. || fd.ErrorCode==SSRESP_NOTEMPPATH
  710. || fd.ErrorCode==SSRESP_COMPRESSEXCEPT
  711. || fd.ErrorCode==0
  712. ) {
  713. for(;;) /* for each block */
  714. {
  715. DWORD ActSize; /* bytes read */
  716. if( !ReadFile( hFile, &(Pack.Data), sizeof(Pack.Data)
  717. , &ActSize, NULL) ) {
  718. /* error reading temp file. */
  719. if (ShortBlockSent) {
  720. /* Fine. End reached */
  721. /* Should check error was end of file !!! */
  722. CloseHandle(hFile);
  723. break; /* blocks loop */
  724. }
  725. dprintf1(( "Error reading temp file %s.\n"
  726. , (fd.TempName?fd.TempName:"NULL")));
  727. CloseHandle(hFile);
  728. dprintf1(("deleting bad file: %s\n", fd.TempName));
  729. DeleteFile(fd.TempName);
  730. Pack.ulSize = (ULONG)(-2); /* tell client */
  731. break; /* blocks loop */
  732. }
  733. else if (ActSize > sizeof(Pack.Data)) {
  734. dprintf1(( "!!? Read too long! %d %d\n"
  735. , ActSize, sizeof(Pack.Data)));
  736. Pack.ulSize = (ULONG)(-1); /* tell client */
  737. }
  738. else Pack.ulSize = ActSize;
  739. if (ActSize==0 && ShortBlockSent) {
  740. /* This is normal! */
  741. CloseHandle(hFile);
  742. break;
  743. }
  744. else ++Pack.lSequence;
  745. Pack.lPacket = LPACKET;
  746. Pack.lVersion = SS_VERSION;
  747. Pack.ulSum = 0;
  748. //////////////////// Pack.ulSum = ss_checksum_block(Pack.Data, ActSize); ///////////
  749. if(!Queue_Put( OutQueue, (LPBYTE)&Pack
  750. , PACKHEADSIZE+ActSize)){
  751. dprintf1(("Put to SendQueue failed.\n"));
  752. Queue_Abort(Queue, PurgePackedFiles);
  753. CloseHandle(hFile);
  754. Aborting = TRUE;
  755. break; /* from blocks loop */
  756. }
  757. // dprintf1(( "Qued SSNEWPACK: %x %x %x %x %x...\n"
  758. // , Pack.lVersion, Pack.lPacket, Pack.lSequence, Pack.ulSize
  759. // , Pack.ulSum));
  760. if (ActSize<PACKDATALENGTH) { /* Success. Finished */
  761. ShortBlockSent = TRUE;
  762. }
  763. }
  764. } /* blocks */
  765. /* The data is all in storage now. Delete the temp file
  766. If there was no temp file (due to error) this still should be harmless.
  767. */
  768. #ifndef LAURIE
  769. DeleteFile(fd.TempName);
  770. #endif // LAURIE
  771. // dprintf1(("deleting file: %s\n", fd.TempName));
  772. } /* files */
  773. return 0;
  774. } /* ReadInFile */
  775. /* Dequeue elements from Queue, send them down the pipe whose
  776. handle is the instance data of Queue.
  777. On error Abort Queue.
  778. */
  779. STATIC int SendData(QUEUE Queue)
  780. {
  781. SSNEWPACK ssp; /* relies on this being no shorter than a SSRESP */
  782. #ifdef SOCKETS
  783. SOCKET OutPipe;
  784. #else
  785. HANDLE OutPipe;
  786. #endif /* SOCKETS */
  787. BOOL Aborting = FALSE; /* TRUE means input has been aborted */
  788. dprintf1(("File sender starting \n"));
  789. if (!SetThreadPriority(GetCurrentThread(),THREAD_PRIORITY_HIGHEST))
  790. dprintf1(("Failed to set thread priority\n"));
  791. #ifdef SOCKETS
  792. OutPipe = (SOCKET)Queue_GetInstanceData(Queue);
  793. #else
  794. OutPipe = (HANDLE)Queue_GetInstanceData(Queue);
  795. #endif
  796. try{
  797. for (; ; ) {
  798. int rc; /* return code of Queue_Get */
  799. rc = Queue_Get(Queue, (LPBYTE)&ssp, sizeof(ssp));
  800. if (rc==STOPTHREAD || rc==ENDQUEUE)
  801. {
  802. break;
  803. }
  804. else if (rc<0) {
  805. dprintf1(("Send thread aborting. Bad rc %d from Get_Queue.\n", rc));
  806. if (Aborting) break; /* All gone wrong. Just quit! */
  807. Queue_Abort(Queue, NULL);
  808. Aborting = TRUE;
  809. continue; /* next Queue_Get destroys Queue */
  810. }
  811. // // { ULONG Sum;
  812. // // if (ssp.lPacket==LPACKET) {
  813. // // if (ssp.ulSum != (Sum =ss_checksum_block(ssp.Data, ssp.ulSize))) {
  814. // // dprintf1(( "!!Checksum error at send. Was %x should be %x\n"
  815. // // , Sum, ssp.ulSum));
  816. // // }
  817. // // }
  818. // // }
  819. #ifdef SOCKETS
  820. if(SOCKET_ERROR != send(OutPipe, (char far *)&ssp, ssp.ulSize+PACKHEADSIZE, 0) )
  821. #else
  822. if (!ss_sendblock(OutPipe, (PSTR) &ssp, rc))
  823. #endif /* SOCKETS */
  824. {
  825. dprintf1(("Connection on pipe %x lost during send\n", OutPipe));
  826. Queue_Abort(Queue, NULL);
  827. Aborting = TRUE;
  828. continue; /* next Queue_Get destroys Queue */
  829. }
  830. ////dprintf1(( "Sent %x %x %x %x %x...\n"
  831. //// , ssp.lVersion, ssp.lPacket, ssp.lSequence, ssp.ulSize, ssp.ulSum));
  832. } /* packets */
  833. }
  834. finally{
  835. /* kill the data pipe cleanly */
  836. #ifndef SOCKETS
  837. FlushFileBuffers(OutPipe);
  838. DisconnectNamedPipe(OutPipe);
  839. #endif /* NOT SOCKETS */
  840. CLOSEHANDLE(OutPipe);
  841. dprintf1(("Data send thread ending.\n"));
  842. }
  843. return 0; /* exit thread */
  844. } /* SendData */
  845. /* This gets called once for every FILEDETAILS on the ReadInQueue
  846. to delete the temp files.
  847. */
  848. STATIC void PurgePackedFiles(PSTR Ptr, int Len)
  849. { FILEDETAILS * pfd;
  850. pfd = (FILEDETAILS *)Ptr;
  851. // dprintf1(("purging file: %s\n", pfd->TempName));
  852. DeleteFile(pfd->TempName);
  853. } /* PurgePackedFiles */
  854. #if 0
  855. /* produce a checksum of a block of data.
  856. *
  857. * This is undoubtedly a good checksum algorithm, but it's also compute bound.
  858. * For version 1 we turn it off. If we decide in version 2 to turn it back
  859. * on again then we will use a faster algorithm (e.g. the one used to checksum
  860. * a whole file.
  861. *
  862. * Generate checksum by the formula
  863. * checksum = SUM( rnd(i)*(1+byte[i]) )
  864. * where byte[i] is the i-th byte in the file, counting from 1
  865. * rnd(x) is a pseudo-random number generated from the seed x.
  866. *
  867. * Adding 1 to byte ensures that all null bytes contribute, rather than
  868. * being ignored. Multiplying each such byte by a pseudo-random
  869. * function of its position ensures that "anagrams" of each other come
  870. * to different sums. The pseudorandom function chosen is successive
  871. * powers of 1664525 modulo 2**32. 1664525 is a magic number taken
  872. * from Donald Knuth's "The Art Of Computer Programming"
  873. */
  874. ULONG
  875. ss_checksum_block(PSTR block, int size)
  876. {
  877. unsigned long lCheckSum = 0; /* grows into the checksum */
  878. const unsigned long lSeed = 1664525; /* seed for random Knuth */
  879. unsigned long lRand = 1; /* seed**n */
  880. unsigned long lIndex = 1; /* byte number in block */
  881. unsigned Byte; /* next byte to process in buffer */
  882. unsigned length; /* unsigned copy of size */
  883. length = size;
  884. for (Byte = 0; Byte < length ;++Byte, ++lIndex) {
  885. lRand = lRand*lSeed;
  886. lCheckSum += lIndex*(1+block[Byte])*lRand;
  887. }
  888. return(lCheckSum);
  889. } /* ss_checksum_block */
  890. #endif