Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1015 lines
42 KiB

// !!! PipeCount needs a critical section?
/* Send a list of files across the named pipe as fast as possible
*
* The overall organisation:
*
* Sumserve receives requests over a named pipe. (See sumserve.h)
* The requests can be for details of files or for the files
* themselves. File details involve sending relatively small
* quantities of data and therefore no attempt is made to
* double-buffer or overlap execution.
*
* For a send-files request (SSREQ_FILES), the data is typically large,
* and can be a whole NT build which means sending hundreds
* of megabytes. Such a transfer can take literally days and
* so optimisation to achieve maximum throughput is essential.
*
* To maximise throughput
* 1. The data is packed before sending
* 2. One thread per pipe does almost nothing except send data through
* its pipe with all other work being done on other threads.
*
* Because we have had trouble with bad files being transmitted over
* the network, we checksum each file. Windiff requires that we
* do a scan first before doing a copy, so we already have checksums.
* All we need to do is to check the newly received files.
* LATER: We should not require checksums in advance. The checksuming
* could be done by (yet another) pass, created if need be. An extra
* flag could be added to the request to indicate "send checksums".
*
* The packing is done by a separate program that reads from a file and
* writes to a file. This means that we get three lots of file I/O
* (read; write; read) before the file is sent. For a small
* file the disk cacheing may eliminate this, for a large file we
* probably pay the price. A possible future enhancement is therefore
* to rewrite the packing to do it in-storage so that the file is read
* once only. In the meantime we run threads to overlap the packing
* with the sending of the previous file(s).
*
* The main program sets up a named pipe which a client connects to.
* This is necessary because pipes are only half-duplex. i.e. the
* following hangs:
* client read; server read; client write;
* The write hangs waiting for the client read. We broadly speaking have one
* pipe running in each direction.
*
* To eliminate the overhead of setting up a virtual circuit for each
* file request there is a request code to send a list of files.
* The protocol (for the control pipe) is then as follows:
* 1. Typical session:
* CLIENT SERVER
* ----<SSREQ_FILES------------------>
* <------(SSRESP_PIPENAME,pipename)--
* ----<SSREQ_NEXTFILE,filename>----->
* ----<SSREQ_NEXTFILE,filename>----->
* ...
* --------<SSREQ_ENDFILES>---------->
*
* Meanwhile, asynchronously with this, the data goes back the other way like
*
* CLIENT SERVER
* <-----<SSNEWRESP>----------
* <---<1 or more SSNEWPACK>--
...
* <-----<SSNEWRESP>----------
* <---<1 or more SSNEWPACK>--
* ...
* <-----<End>----------------
*
* Even a zero length file gets 1 SSNEWPACK record.
* An Erroneous file (can't read etc) gets no SSNEWPACKs and a negative lCode
* in its SSNEWRESP.
* A file that goes wrong during read-in gets a packet length code of -1 or -2.
* The end of the sequence of SSNEWPACKs is signalled by a shorter
* than maximum length one. If the file is EXACTLY n buffers long
* then an extra SSNEWPACK with zero bytes of data comes on the end.
*
* The work is broken into the following threads:
* Control thread (ss_sendfiles):
* Receives lists of files to be sent
* Creates pipes for the actual transmission
* Creates queues (see below. Queue parameters must match pipes)
* Puts filenames onto first queue
* Destroys first queue at end.
* Packing thread
* Takes file details from the packing queue
* Packs the file (to create a temporary file)
* Puts the file details (including the temp name) onto the reading queue
* Destroys the reading queue at the end
* Reading thread
* Takes the file details from the reading queue
* Splits the file into a header and a list of data packets
* and enqueues each of these on the Sending thread.
* (Note this means no more than one reading thread to be running).
* Erases the temp file
* Destroys the sending queue at the end
* Sending thread
* Takes the things from the sending thread and sends them
*
* This whole scheme can be running for multiple clients, so we
* need some instance data that defines which pipeline we are
* running. This is held in the instance data of the QUEUEs that
* are created (retrieved by the queue emptiers by Queue_GetInstanceData).
* The instance data at each stage is the handle of the following stage
* i.e. the next QUEUE, or the hpipe of the data pipe for the last stage.
* The current design only allows for one data pipe. If we have
* multiple data pipes then we need to solve the following problems:
* 1. Communication of the number and names of the data pipes
* to the client (presumably across the control thread.
* 2. Error handling
* 3. Balancing the load between the pipes
*
* NORMAL SHUTDOWN:
* After the last element has been Put to the first Queue the main thread
* calls Queue_Destroy to destroy the first queue. This will result in
* the queue being destroyed BUT NOT UNTIL THE LAST ELEMENT HAS BEEN GOT.
* When the last packing thread gets its ENDQUEUE it calls Queue_Destroy
* to destroy the next queue, and so on down the line.
*
* ERROR RECOVERY
* Errors can occur at almost any stage.
* The obvious implementation of having a global BOOL that tells
* whether a disaster has happened won't work because there
* could be multiple clients and only one of them with a disaster.
*
* An error in a single file is propagated forwards to the client end.
* An error in the whole mechanism (net blown away) can mean that the
* whole thing needs to be shut down. In this case the error must
* be propagated backwards. That works as follows:
* The Sending thread Queue_Aborts the SendQueue which it was Getting from.
* This results in Puts to this queue returning FALSE.
* Case 1. There are no more Puts anyway:
* We are on the last file, the filling thread was about to Destroy the
* queue anyway. It does so.
* Case 2. The next Put gets a FALSE return code.
* The thread attempting the Put does a Queue_Destroy on its output
* queue and a Queue_Abort on its input queue.
* This propagates all the way back until either the first queue
* is aborted or it reaches a queue that was being destroyed anyway.
* See Queue.h
* Once the Putting thread has done a Destroy on its output queue,
* the threads Getting from it (which are still running, even if
* they did the Abort) get STOPTHREAD/ENDQUEUE back from a Get. The last Get
* to a queue that has had a Queue_Destroy done on it has a side effect
* of actually deallocating the queue. In our case we only have one
* Getting thread, so what happens is that it Queue_Aborts the queue
* and then does a Queue_Get which WAITs. When the Queue_Destroy comes in
* from the Putting thread, this releases the WAITing Getting thread which
* then actually deallocates the Queue.
*
* You can also get shutdown happening from both ends at once. This happens
* when the control thread's pipe goes down getting names and the sending pipe
* also breaks. (e.g. general net collapse or client aborted).
*/
#include <windows.h>
#include <stdio.h>
#include <string.h>
#include <gutils.h>
#include "sumserve.h"
#include "errlog.h"
#include "server.h"
#include "queue.h"
#if DBG
#define STATIC // allow for debug.
#else
#define STATIC static
#endif
/* SOCKETS / NAMED PIPES macros
*/
#ifdef SOCKETS
#define CLOSEHANDLE( handle ) closesocket( handle )
#define TCPPORT 1024
#else
#define CLOSEHANDLE( handle ) CloseHandle( handle )
#endif
//////////////ULONG ss_checksum_block(PSTR block, int size);
#define PIPEPREFIX "Sdpx" // for making an unlikely pipe name
static PipeCount = 0; // for making pipe names unique.
/* structure for recording all we need to know about a file as it
* progresses along the chain of pipes */
typedef struct {
FILETIME ft_create;
FILETIME ft_lastaccess;
FILETIME ft_lastwrite;
DWORD fileattribs;
DWORD SizeHi; /* Anticipating files larger that 4GB! */
DWORD SizeLo; /* Anticipating files larger that 4GB! */
int ErrorCode;
long Checksum; /* Uunused except for debug. */
char TempName[MAX_PATH]; /* name of packed file at server */
char Path[MAX_PATH]; /* name of file to fetch */
char LocalName[MAX_PATH]; /* name of file at client end */
} FILEDETAILS;
/* forward declarations for procedures */
STATIC int PackFile(QUEUE Queue);
STATIC int ReadInFile(QUEUE Queue);
STATIC int SendData(QUEUE Queue);
STATIC void PurgePackedFiles(PSTR Ptr, int Len);
STATIC BOOL EnqueueName(QUEUE Queue, LPSTR Path, UINT BuffLen);
STATIC BOOL AddFileAttributes(FILEDETAILS * fd);
static void Error(PSTR Title)
{
dprintf1(("Error %d from %s when creating data pipe.\n", GetLastError(), Title));
}
/* ss_sendfiles:
Send a response naming the data pipe, collect further names
from further client messages, all according to the protocol above.
Start the data pipe and arrange that all the files are sent
by getting them all enqueued on the first queue.
Destroy PackQueue at the end. Arrange for the other queues
to be destroyed by the usual Queue mechanism, or destroy them
explicitly if they never get started.
*/
BOOL
ss_sendfiles(HANDLE hPipe, long lVersion)
{ /* Create the queues and set about filling the first one */
QUEUE PackQueue, ReadQueue, SendQueue;
#ifdef SOCKETS
SOCKET hpSend;
static BOOL SocketsInitialized = FALSE;
#else
HANDLE hpSend; /* the data pipe */
#endif /* SOCKETS */
char PipeName[80]; /* The name of the new data pipe */
BOOL Started = FALSE; /* TRUE if something enqueued */
#ifdef SOCKETS
if( !SocketsInitialized )
{
WSADATA WSAData;
if( ( WSAStartup( MAKEWORD( 1, 1 ), &WSAData ) ) == 0 )
{
SocketsInitialized = TRUE;
}
else
{
printf("WSAStartup failed");
}
}
#endif
{
/****************************************
We need security attributes for the pipe to let anyone other than the
current user log on to it.
***************************************/
/* Allocate DWORDs for the ACL to get them aligned. Round up to next DWORD above */
DWORD Acl[(sizeof(ACL)+sizeof(ACCESS_ALLOWED_ACE)+3)/4+4]; // + 4 by experiment!!
SECURITY_DESCRIPTOR sd;
PSECURITY_DESCRIPTOR psd = &sd;
PSID psid;
SID_IDENTIFIER_AUTHORITY SidWorld = SECURITY_WORLD_SID_AUTHORITY;
PACL pacl = (PACL)(&(Acl[0]));
SECURITY_ATTRIBUTES sa;
if (!AllocateAndInitializeSid( &SidWorld, 1, SECURITY_WORLD_RID
, 1, 2, 3, 4, 5, 6, 7
, &psid
)
) {
Error("AllocateAndInitializeSid");
return FALSE;
}
if (!InitializeAcl(pacl, sizeof(Acl), ACL_REVISION)){
Error("InitializeAcl");
return FALSE;
}
if (!AddAccessAllowedAce(pacl, ACL_REVISION, GENERIC_WRITE|GENERIC_READ, psid)){
Error("AddAccessAllowedAce");
return FALSE;
}
if (!InitializeSecurityDescriptor(psd, SECURITY_DESCRIPTOR_REVISION)){
Error("InitializeSecurityDescriptor");
return FALSE;
}
if (!SetSecurityDescriptorDacl(psd, TRUE, pacl, FALSE)){
Error("SetSecurityDescriptorDacl");
return FALSE;
}
sa.nLength = sizeof(sa);
sa.lpSecurityDescriptor = psd;
sa.bInheritHandle = TRUE;
/* We now have a good security descriptor! */
/* Create the (new, unique) name of the pipe and then create the pipe */
/* I am finding it hard to decide whether the following line (++PpipeCount)
actually needs a critical section or not. The worst that could happen
would be that we got an attempt to create a pipe with an existing name.
*/
++PipeCount;
sprintf(PipeName, "\\\\.\\pipe\\%s%d", PIPEPREFIX, PipeCount);
#ifdef SOCKETS
if (!ss_sendnewresp( hPipe, SS_VERSION, SSRESP_PIPENAME
, 0, 0, 0, TCPPORT, "")) {
dprintf1(( "Failed to send response on pipe %x naming new pipe.\n"
, hPipe));
return FALSE; /* Caller will close hPipe */
}
if( !SocketListen( TCPPORT, &hpSend ) )
{
dprintf1(("Could not create socket\n"));
return FALSE;
}
FreeSid(psid);
#else
hpSend = CreateNamedPipe(PipeName, /* pipe name */
PIPE_ACCESS_DUPLEX, /* both read and write */
PIPE_WAIT|PIPE_TYPE_MESSAGE|PIPE_READMODE_MESSAGE,
1, /* at most one instance */
10000, /* sizeof(SSNEWPACK) + some for luck */
0, /* dynamic inbound buffer allocation */
5000, /* def. timeout 5 seconds */
&sa /* security descriptor */
);
FreeSid(psid);
if (hpSend == INVALID_HANDLE_VALUE) {
dprintf1(("Could not create named data pipe\n"));
return FALSE;
}
dprintf1(("Data pipe %x called '%s' created for main pipe %x.\n", hpSend, PipeName, hPipe));
#endif /* SOCKETS */
}
/* Send the response which names the data pipe */
#ifndef SOCKETS
if (!ss_sendnewresp( hPipe, SS_VERSION, SSRESP_PIPENAME
, 0, 0, 0, 0, PipeName)) {
dprintf1(( "Failed to send response on pipe %x naming new pipe.\n"
, hPipe));
CLOSEHANDLE(hpSend);
return FALSE; /* Caller will close hPipe */
}
if (!ConnectNamedPipe(hpSend, NULL)) {
CLOSEHANDLE(hpSend);
return FALSE;
}
#endif /* NOT SOCKETS */
//dprintf1(("Client connected to data pipe -- here we go...\n"));
/* Create all the queues: Allow up to 10K file names to be queued
up to 10 files to be packed in advance and 6 buffers of data to be
read into main storage in advance:
proc MxMT MnQS MxQ Event InstData Name*/
SendQueue = Queue_Create(SendData, 1, 0, 6, NULL, (DWORD)hpSend, "SendQueue");
ReadQueue = Queue_Create(ReadInFile, 1, 0, 10, NULL, (DWORD)SendQueue, "ReadQueue");
PackQueue = Queue_Create(PackFile, 3, 0, 99999, NULL, (DWORD)ReadQueue, "PackQueue");
/* Abort unless it all worked */
if (PackQueue==NULL || ReadQueue==NULL || SendQueue==NULL) {
dprintf1(("Queues for pipe %x failed to Create. Aborting...\n", hPipe));
if (PackQueue) Queue_Destroy(PackQueue);
if (ReadQueue) Queue_Destroy(ReadQueue);
if (SendQueue) Queue_Destroy(SendQueue);
CLOSEHANDLE(hpSend);
return FALSE; /* Caller will close hPipe */
}
/* Collect names from client and enqueue each one */
for (; ; )
{ SSNEWREQ Request; /* message from client */
DWORD ActSize; /* bytes read from (main) pipe */
if (ReadFile(hPipe, &Request, sizeof(Request), &ActSize, NULL)){
if (Request.lVersion>SS_VERSION) {
dprintf1(("Bad version %d in file list request on pipe %x\n"
, Request.lVersion, hPipe));
break;
}
if (Request.lRequest!=LREQUEST) {
dprintf1(("Bad LREQUEST from pipe %x\n", hPipe));
break;
}
if (Request.lCode == -SSREQ_ENDFILES) {
dprintf1(("End of client's files list on pipe %x\n", hPipe));
/* This is the clean way to end */
Queue_Destroy(PackQueue);
if (!Started) {
/* OK - so the clever clogs requested zero files */
Queue_Destroy(ReadQueue);
Queue_Destroy(SendQueue);
/* Send a No More Files response */
#ifdef SOCKETS
{
SSNEWRESP resp;
resp.lVersion = SS_VERSION;
resp.lResponse = LRESPONSE;
resp.lCode = SSRESP_END;
resp.ulSize = 0;
resp.ulSum = 0;
resp.ft_lastwrite.dwLowDateTime = 0;
resp.ft_lastwrite.dwHighDateTime = 0;
send(hpSend, (PSTR) &resp, sizeof(resp), 0);
}
#else
ss_sendnewresp( hpSend, SS_VERSION, SSRESP_END
, 0,0, 0,0, NULL);
#endif /* SOCKETS */
CLOSEHANDLE(hpSend);
}
return TRUE;
}
if (Request.lCode != -SSREQ_NEXTFILE) {
dprintf1(( "Bad code (%d) in files list from pipe %x\n"
, Request.lCode, hPipe));
break;
}
}
else { DWORD errorcode = GetLastError();
switch(errorcode) {
case ERROR_NO_DATA:
case ERROR_BROKEN_PIPE:
/* pipe connection lost - forget it */
dprintf1(("main pipe %x broken on read\n", hPipe));
break;
default:
dprintf1(("read error %d on main pipe %x\n", errorcode, hPipe));
break;
}
break;
}
if (!EnqueueName( PackQueue, Request.szPath
, (UINT)((LPBYTE)(&Request) + ActSize - (LPBYTE)(&Request.szPath))
)
){
break;
}
Started = TRUE;
} /* loop */
/* only exit this way on error */
/* Close the queues down. Allow what's in them to run through */
Queue_Destroy(PackQueue);
if (!Started) {
Queue_Destroy(ReadQueue);
Queue_Destroy(SendQueue);
}
return FALSE;
} /* ss_sendfiles */
/* Attempt to Queue.Put Path onto Queue as a FILEDETAILS
with default values for all other fields.
Return TRUE or FALSE according as it succeeded.
*/
STATIC BOOL EnqueueName(QUEUE Queue, LPSTR Path, UINT BuffLen)
{
FILEDETAILS fd;
/* unpack Path and LocalName from "superstring" */
strcpy(fd.Path, Path);
BuffLen -= (strlen(Path)+1);
if (BuffLen<0) return FALSE; // Uh oh! strlen just looked at garbage.
Path += strlen(Path)+1;
BuffLen -= (strlen(Path)+1);
if (BuffLen<0) return FALSE; // Uh oh! strlen just looked at garbage.
strcpy(fd.LocalName, Path);
/* set defaults for every field */
fd.ErrorCode = 0;
fd.ft_lastwrite.dwLowDateTime = 0;
fd.ft_lastwrite.dwHighDateTime = 0;
fd.ft_create.dwLowDateTime = 0;
fd.ft_create.dwHighDateTime = 0;
fd.ft_lastaccess.dwLowDateTime = 0;
fd.ft_lastaccess.dwHighDateTime = 0;
fd.fileattribs = 0;
fd.SizeHi = 0;
fd.SizeLo = 0;
fd.Checksum = 0;
fd.TempName[0] = '\0';
if(!Queue_Put(Queue, (LPBYTE)&fd, sizeof(fd))){
dprintf1(("Put to pack queue failed\n"));
return FALSE;
}
return TRUE;
} /* EnqueueName */
/* Dequeue elements from Queue, pack them and enqueue them on the next
queue whose queue handle is the InstanceData of Queue.
The ErrorCode in fd when Dequeued must be 0. ??? Incautious?
Destroy the output queue at the end.
On a serious error, Queue_Abort Queue and Queue_Destroy the output queue.
*/
STATIC int PackFile(QUEUE Queue)
{
FILEDETAILS fd; /* the queue element processed */
QUEUE OutQueue;
BOOL Aborting = FALSE; /* TRUE means input has been aborted (probably output is sick) */
DWORD ThreadId;
ThreadId = GetCurrentThreadId();
dprintf1(("File packer %d starting \n", ThreadId)); // can't quote hPipe, don't know it
OutQueue = (QUEUE)Queue_GetInstanceData(Queue);
for (; ; )
{ int rc; /* return code from Queue_Get */
rc = Queue_Get(Queue, (LPBYTE)&fd, sizeof(fd));
if (rc==ENDQUEUE) {
dprintf1(("Packing thread %d ending.\n", ThreadId));
Queue_Destroy(OutQueue);
// dprintf1(("%d has done Queue_Destroy on ReadQueue.\n", ThreadId));
ExitThread(0);
}
if (rc==STOPTHREAD) {
dprintf1(("%d, a packing thread ending.\n", ThreadId));
ExitThread(0);
}
else if (rc<0) {
dprintf1(( "Packing thread %d aborting. Bad return code %d from Get.\n"
, ThreadId, rc));
if (Aborting) break; /* Touch nothing, just quit! */
Queue_Abort(Queue, NULL);
continue; /* Next Queue_Get destroys Queue */
}
/* First add the file attributes to fd */
AddFileAttributes(&fd);
/* no need to look at return code fd.ErrorCode tells all */
/* create temp filename */
if ( 0 != fd.ErrorCode
|| 0==GetTempPath(sizeof(fd.TempName), fd.TempName)
|| 0==GetTempFileName(fd.TempName, "sum", 0, fd.TempName)
)
fd.ErrorCode = SSRESP_NOTEMPPATH;
/* Pack into temp file */
if (fd.ErrorCode==0) {
BOOL bOK = FALSE;
//dprintf1(("%d Compressing file '%s' => '%s'\n", ThreadId, fd.Path, fd.TempName));
/* compress the file into this temporary file
Maybe it will behave badly if there's a large file or
no temp space or something...
*/
try{
if (!ss_compress(fd.Path, fd.TempName)) {
fd.ErrorCode = SSRESP_COMPRESSFAIL;
dprintf1(("Compress failure on %d for %s\n", ThreadId, fd.Path));
}
else bOK = TRUE;
} except(EXCEPTION_EXECUTE_HANDLER) {
if (!bOK){
fd.ErrorCode = SSRESP_COMPRESSEXCEPT;
dprintf1(("Compress failure on %d for %s\n", ThreadId, fd.Path));
#ifdef trace
{ char msg[80];
wsprintf( msg, "Compress failure on %d for %s\n"
, ThreadId, fd.Path);
Trace_File(msg);
}
#endif
}
}
}
//dprintf1(("%d Putting file '%s' onto Read Queue\n", ThreadId, fd.Path));
if (!Queue_Put(OutQueue, (LPBYTE)&fd, sizeof(fd))) {
dprintf1(("%d Put to ReadQueue failed for %s.\n", ThreadId, fd.Path));
Queue_Abort(Queue, NULL);
DeleteFile(fd.TempName);
Aborting = TRUE;
/* bug: If this Queue_Put fails on the very first Put,
then the next queue in the chain after OutQueue will
never come alive and so will never get Destroyed.
Worst it could cause is a memory leak. ???
*/
continue; /* next Queue_Get destroys Queue */
}
}
return 0;
} /* PackFile */
/* Use the file name in *fd and get its attributes (size, time etc)
Add these to fd. If it fails, set the ErrorCode in *fd
to an appropriate non-zero value.
*/
STATIC BOOL AddFileAttributes(FILEDETAILS * fd)
{
HANDLE hFile;
BY_HANDLE_FILE_INFORMATION bhfi;
hFile = CreateFile(fd->Path, GENERIC_READ, FILE_SHARE_READ,
NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);
if (hFile == INVALID_HANDLE_VALUE) {
fd->ErrorCode = SSRESP_CANTOPEN;
return FALSE;
}
/* bug in GetFileInformationByHandle if file not on local
* machine? Avoid it!
*/
bhfi.dwFileAttributes = GetFileAttributes(fd->Path);
if (bhfi.dwFileAttributes == 0xFFFFFFFF) {
fd->ErrorCode = SSRESP_NOATTRIBS;
CloseHandle(hFile);
return FALSE;
}
if (!GetFileTime(hFile, &bhfi.ftCreationTime,
&bhfi.ftLastAccessTime, &bhfi.ftLastWriteTime)){
fd->ErrorCode = SSRESP_NOATTRIBS;
dprintf1(("Can't get file attributes for %s\n"
, (fd->Path?fd->Path : "NULL")));
CloseHandle(hFile);
return FALSE;
}
CloseHandle(hFile);
{
LONG err;
fd->Checksum = checksum_file(fd->Path, &err);
if (err!=0) {
fd->ErrorCode = SSRESP_CANTOPEN;
return FALSE;
}
}
fd->ft_lastwrite = bhfi.ftLastWriteTime;
fd->ft_lastaccess = bhfi.ftLastAccessTime;
fd->ft_create = bhfi.ftCreationTime;
fd->SizeHi = bhfi.nFileSizeHigh;
fd->SizeLo = bhfi.nFileSizeLow;
fd->fileattribs = bhfi.dwFileAttributes;
return TRUE;
} /* AddFileAttributes */
/* Dequeue elements from Queue, Create on the output queue a SSNEWRESP
followed by 1 or more SSNEWPACK structures, the last of which will be
shorter than full length (zero length data if need be) to mark end-of-file.
Files with errors already get zero SSNEWPACKs but bad code in SSNEWRESP.
The output queue is the instance data of Queue.
*/
STATIC int ReadInFile(QUEUE Queue)
{ FILEDETAILS fd; /* The queue element processed */
QUEUE OutQueue;
HANDLE hFile; /* The packed file */
SSNEWPACK Pack; /* output message */
BOOL ShortBlockSent; /* no need to send another SSNEWPACK
Client knows the file has ended */
BOOL Aborting = FALSE; /* Input has been aborted. e.g. because output sick */
dprintf1(("File reader starting \n"));
OutQueue = (QUEUE)Queue_GetInstanceData(Queue);
for (; ; ) /* for each file */
{ int rc; /* return code from Queue_Get */
rc = Queue_Get(Queue, (LPBYTE)&fd, sizeof(fd));
if (rc==STOPTHREAD || rc==ENDQUEUE) {
if (!Aborting) {
/* Enqueue a No More Files response */
SSNEWRESP resp;
resp.lVersion = SS_VERSION;
resp.lResponse = LRESPONSE;
resp.lCode = SSRESP_END;
if (!Queue_Put( OutQueue, (LPBYTE)&resp , RESPHEADSIZE)) {
dprintf1(("Failed to Put SSRESP_END on SendQueue\n"));
}
//// dprintf1(( "Qued SSRESP_END: %x %x %x %x...\n"
//// , resp.lVersion, resp.lResponse, resp.lCode, resp.ulSize));
}
if (rc==ENDQUEUE)
Queue_Destroy(OutQueue);
dprintf1(("File reader ending\n"));
ExitThread(0);
}
else if (rc<0){
dprintf1(("ReadIn aborting. Bad return code %d from Queue_Get.\n", rc));
if (Aborting) break; /* All gone wrong. Just quit! */
Queue_Abort(Queue, PurgePackedFiles);
CloseHandle(hFile);
Aborting = TRUE;
continue; /* next Get gets STOPTHREAD */
}
//dprintf1(( "Reading file '%s' Error code %d\n"
// , (fd.TempName?fd.TempName:"NULL"), fd.ErrorCode
// ));
if (fd.ErrorCode==0) {
/* open temp (compressed) file */
hFile = CreateFile(fd.TempName, GENERIC_READ, 0, NULL,
OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);
if (hFile == INVALID_HANDLE_VALUE) {
/* report that we could not read the file */
fd.ErrorCode = SSRESP_NOREADCOMP;
dprintf1(( "Couldn't open compressed file for %s %s\n"
, fd.Path, fd.TempName));
}
}
if ( fd.ErrorCode==SSRESP_COMPRESSFAIL
|| fd.ErrorCode==SSRESP_NOREADCOMP
|| fd.ErrorCode==SSRESP_NOTEMPPATH
|| fd.ErrorCode==SSRESP_COMPRESSEXCEPT
) {
/* open original uncompressed file */
hFile = CreateFile(fd.Path, GENERIC_READ, 0, NULL,
OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);
if (hFile == INVALID_HANDLE_VALUE) {
/* report that we could not read the file */
fd.ErrorCode = SSRESP_NOREAD;
dprintf1(( "Couldn't open file %s \n", fd.Path));
}
}
/* Put the file name etc on the output queue as a SSNEWRESP */
{ SSNEWRESP resp;
LPSTR LocalName;
resp.lVersion = SS_VERSION;
resp.lResponse = LRESPONSE;
resp.lCode = (fd.ErrorCode ? fd.ErrorCode: SSRESP_FILE);
resp.ulSize = fd.SizeLo; /* file size <= 4GB !!! */
resp.fileattribs = fd.fileattribs;
resp.ft_create = fd.ft_create;
resp.ft_lastwrite = fd.ft_lastwrite;
resp.ft_lastaccess = fd.ft_lastaccess;
resp.ulSum = fd.Checksum;
resp.bSumValid = FALSE;
strcpy(resp.szFile, fd.Path);
LocalName = resp.szFile+strlen(resp.szFile)+1;
strcpy(LocalName, fd.LocalName);
if(!Queue_Put( OutQueue, (LPBYTE)&resp
, RESPHEADSIZE + strlen(resp.szFile)
+strlen(LocalName)+2)
) {
dprintf1(("Put to SendQueue failed.\n"));
Queue_Abort(Queue, PurgePackedFiles);
Aborting = TRUE;
CloseHandle(hFile);
continue; /* next Get gets STOPTHREAD */
}
// dprintf1(( "Qued SSRESP_FILE: %x %x %x %x...\n"
// , resp.lVersion, resp.lResponse, resp.lCode, resp.ulSize));
}
Pack.lSequence = 0;
/* Loop reading blocks of the file and queueing them
Set fd.ErrorCode for failures.
I'm worried about file systems that give me short blocks in the
middles of files!!!
*/
ShortBlockSent = FALSE;
if ( fd.ErrorCode==SSRESP_COMPRESSFAIL
|| fd.ErrorCode==SSRESP_NOREADCOMP
|| fd.ErrorCode==SSRESP_NOTEMPPATH
|| fd.ErrorCode==SSRESP_COMPRESSEXCEPT
|| fd.ErrorCode==0
) {
for(;;) /* for each block */
{
DWORD ActSize; /* bytes read */
if( !ReadFile( hFile, &(Pack.Data), sizeof(Pack.Data)
, &ActSize, NULL) ) {
/* error reading temp file. */
if (ShortBlockSent) {
/* Fine. End reached */
/* Should check error was end of file !!! */
CloseHandle(hFile);
break; /* blocks loop */
}
dprintf1(( "Error reading temp file %s.\n"
, (fd.TempName?fd.TempName:"NULL")));
CloseHandle(hFile);
dprintf1(("deleting bad file: %s\n", fd.TempName));
DeleteFile(fd.TempName);
Pack.ulSize = (ULONG)(-2); /* tell client */
break; /* blocks loop */
}
else if (ActSize > sizeof(Pack.Data)) {
dprintf1(( "!!? Read too long! %d %d\n"
, ActSize, sizeof(Pack.Data)));
Pack.ulSize = (ULONG)(-1); /* tell client */
}
else Pack.ulSize = ActSize;
if (ActSize==0 && ShortBlockSent) {
/* This is normal! */
CloseHandle(hFile);
break;
}
else ++Pack.lSequence;
Pack.lPacket = LPACKET;
Pack.lVersion = SS_VERSION;
Pack.ulSum = 0;
//////////////////// Pack.ulSum = ss_checksum_block(Pack.Data, ActSize); ///////////
if(!Queue_Put( OutQueue, (LPBYTE)&Pack
, PACKHEADSIZE+ActSize)){
dprintf1(("Put to SendQueue failed.\n"));
Queue_Abort(Queue, PurgePackedFiles);
CloseHandle(hFile);
Aborting = TRUE;
break; /* from blocks loop */
}
// dprintf1(( "Qued SSNEWPACK: %x %x %x %x %x...\n"
// , Pack.lVersion, Pack.lPacket, Pack.lSequence, Pack.ulSize
// , Pack.ulSum));
if (ActSize<PACKDATALENGTH) { /* Success. Finished */
ShortBlockSent = TRUE;
}
}
} /* blocks */
/* The data is all in storage now. Delete the temp file
If there was no temp file (due to error) this still should be harmless.
*/
#ifndef LAURIE
DeleteFile(fd.TempName);
#endif // LAURIE
// dprintf1(("deleting file: %s\n", fd.TempName));
} /* files */
return 0;
} /* ReadInFile */
/* Dequeue elements from Queue, send them down the pipe whose
handle is the instance data of Queue.
On error Abort Queue.
*/
STATIC int SendData(QUEUE Queue)
{
SSNEWPACK ssp; /* relies on this being no shorter than a SSRESP */
#ifdef SOCKETS
SOCKET OutPipe;
#else
HANDLE OutPipe;
#endif /* SOCKETS */
BOOL Aborting = FALSE; /* TRUE means input has been aborted */
dprintf1(("File sender starting \n"));
if (!SetThreadPriority(GetCurrentThread(),THREAD_PRIORITY_HIGHEST))
dprintf1(("Failed to set thread priority\n"));
#ifdef SOCKETS
OutPipe = (SOCKET)Queue_GetInstanceData(Queue);
#else
OutPipe = (HANDLE)Queue_GetInstanceData(Queue);
#endif
try{
for (; ; ) {
int rc; /* return code of Queue_Get */
rc = Queue_Get(Queue, (LPBYTE)&ssp, sizeof(ssp));
if (rc==STOPTHREAD || rc==ENDQUEUE)
{
break;
}
else if (rc<0) {
dprintf1(("Send thread aborting. Bad rc %d from Get_Queue.\n", rc));
if (Aborting) break; /* All gone wrong. Just quit! */
Queue_Abort(Queue, NULL);
Aborting = TRUE;
continue; /* next Queue_Get destroys Queue */
}
// // { ULONG Sum;
// // if (ssp.lPacket==LPACKET) {
// // if (ssp.ulSum != (Sum =ss_checksum_block(ssp.Data, ssp.ulSize))) {
// // dprintf1(( "!!Checksum error at send. Was %x should be %x\n"
// // , Sum, ssp.ulSum));
// // }
// // }
// // }
#ifdef SOCKETS
if(SOCKET_ERROR != send(OutPipe, (char far *)&ssp, ssp.ulSize+PACKHEADSIZE, 0) )
#else
if (!ss_sendblock(OutPipe, (PSTR) &ssp, rc))
#endif /* SOCKETS */
{
dprintf1(("Connection on pipe %x lost during send\n", OutPipe));
Queue_Abort(Queue, NULL);
Aborting = TRUE;
continue; /* next Queue_Get destroys Queue */
}
////dprintf1(( "Sent %x %x %x %x %x...\n"
//// , ssp.lVersion, ssp.lPacket, ssp.lSequence, ssp.ulSize, ssp.ulSum));
} /* packets */
}
finally{
/* kill the data pipe cleanly */
#ifndef SOCKETS
FlushFileBuffers(OutPipe);
DisconnectNamedPipe(OutPipe);
#endif /* NOT SOCKETS */
CLOSEHANDLE(OutPipe);
dprintf1(("Data send thread ending.\n"));
}
return 0; /* exit thread */
} /* SendData */
/* This gets called once for every FILEDETAILS on the ReadInQueue
to delete the temp files.
*/
STATIC void PurgePackedFiles(PSTR Ptr, int Len)
{ FILEDETAILS * pfd;
pfd = (FILEDETAILS *)Ptr;
// dprintf1(("purging file: %s\n", pfd->TempName));
DeleteFile(pfd->TempName);
} /* PurgePackedFiles */
#if 0
/* produce a checksum of a block of data.
*
* This is undoubtedly a good checksum algorithm, but it's also compute bound.
* For version 1 we turn it off. If we decide in version 2 to turn it back
* on again then we will use a faster algorithm (e.g. the one used to checksum
* a whole file.
*
* Generate checksum by the formula
* checksum = SUM( rnd(i)*(1+byte[i]) )
* where byte[i] is the i-th byte in the file, counting from 1
* rnd(x) is a pseudo-random number generated from the seed x.
*
* Adding 1 to byte ensures that all null bytes contribute, rather than
* being ignored. Multiplying each such byte by a pseudo-random
* function of its position ensures that "anagrams" of each other come
* to different sums. The pseudorandom function chosen is successive
* powers of 1664525 modulo 2**32. 1664525 is a magic number taken
* from Donald Knuth's "The Art Of Computer Programming"
*/
ULONG
ss_checksum_block(PSTR block, int size)
{
unsigned long lCheckSum = 0; /* grows into the checksum */
const unsigned long lSeed = 1664525; /* seed for random Knuth */
unsigned long lRand = 1; /* seed**n */
unsigned long lIndex = 1; /* byte number in block */
unsigned Byte; /* next byte to process in buffer */
unsigned length; /* unsigned copy of size */
length = size;
for (Byte = 0; Byte < length ;++Byte, ++lIndex) {
lRand = lRand*lSeed;
lCheckSum += lIndex*(1+block[Byte])*lRand;
}
return(lCheckSum);
} /* ss_checksum_block */
#endif