|
|
//------------------------------------------------------------------------------
// File: PullPin.cpp
//
// Desc: DirectShow base classes - implements CPullPin class that pulls data
// from IAsyncReader.
//
// Copyright (c) 1992-2001 Microsoft Corporation. All rights reserved.
//------------------------------------------------------------------------------
#include <streams.h>
#include "pullpin.h"
//@@BEGIN_MSINTERNAL
#ifdef DXMPERF
#include "dxmperf.h"
#endif // DXMPERF
//@@END_MSINTERNAL
// Construct an unconnected pull pin: no reader, no allocator, and the
// worker state set to TM_Exit.
CPullPin::CPullPin()
  : m_pReader(NULL),
    m_pAlloc(NULL),
    m_State(TM_Exit)
{
    // Connect() and Seek() are the only places that normally write these.
    // Give them defined values so that a call such as Duration() made before
    // Connect() does not read indeterminate memory.
    m_tStart = 0;
    m_tStop = 0;
    m_tDuration = 0;
    m_bSync = FALSE;

//@@BEGIN_MSINTERNAL
#ifdef DXMPERF
    PERFLOG_CTOR( L"CPullPin", this );
#endif // DXMPERF
//@@END_MSINTERNAL
}
// Destructor: drop whatever connection is still in place (this also stops
// the worker thread, via Disconnect) before the object is destroyed.
CPullPin::~CPullPin()
{
    this->Disconnect();

//@@BEGIN_MSINTERNAL
#ifdef DXMPERF
    PERFLOG_DTOR( L"CPullPin", this );
#endif // DXMPERF
//@@END_MSINTERNAL
}
// Connect this pin to the IAsyncReader interface exposed by pUnk.
//
//   pUnk   - object that is queried for IID_IAsyncReader
//   pAlloc - optional allocator, handed to DecideAllocator as the preferred
//            allocator
//   bSync  - stored in m_bSync; Process() uses it to choose between
//            synchronous reads and queued requests
//
// Returns S_OK on success, VFW_E_ALREADY_CONNECTED if a reader is already
// held, or the failing HRESULT from QueryInterface, DecideAllocator or
// IAsyncReader::Length. After an allocator or length failure the partial
// connection is undone with Disconnect().
HRESULT CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync)
{
    CAutoLock lock(&m_AccessLock);

    if (m_pReader != NULL) {
        return VFW_E_ALREADY_CONNECTED;
    }

    HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader);
    if (FAILED(hr)) {
//@@BEGIN_MSINTERNAL
#ifdef DXMPERF
        {
            AM_MEDIA_TYPE * pmt = NULL;
            PERFLOG_CONNECT( this, pUnk, hr, pmt );
        }
#endif // DXMPERF
//@@END_MSINTERNAL
        return hr;
    }

    // Agree an allocator first, and only then ask the reader for the stream
    // length; a failure at either step is handled identically below.
    LONGLONG cbTotal = 0;
    LONGLONG cbAvailable = 0;
    hr = DecideAllocator(pAlloc, NULL);
    if (SUCCEEDED(hr)) {
        hr = m_pReader->Length(&cbTotal, &cbAvailable);
    }

    if (FAILED(hr)) {
        Disconnect();
//@@BEGIN_MSINTERNAL
#ifdef DXMPERF
        {
            AM_MEDIA_TYPE * pmt = NULL;
            PERFLOG_CONNECT( this, pUnk, hr, pmt );
        }
#endif // DXMPERF
//@@END_MSINTERNAL
        return hr;
    }

    // convert from file position to reference time; the default range to
    // pull is the whole stream
    m_tStart = 0;
    m_tDuration = cbTotal * UNITS;
    m_tStop = m_tDuration;

    m_bSync = bSync;

//@@BEGIN_MSINTERNAL
#ifdef DXMPERF
    {
        AM_MEDIA_TYPE * pmt = NULL;
        PERFLOG_CONNECT( this, pUnk, S_OK, pmt );
    }
#endif // DXMPERF
//@@END_MSINTERNAL

    return S_OK;
}
// Undo whatever Connect() set up: stop the worker thread, then release the
// reader and the allocator if they are held. Always returns S_OK.
HRESULT CPullPin::Disconnect()
{
    CAutoLock lock(&m_AccessLock);

    // the thread must be gone before the interfaces it uses are released
    StopThread();

//@@BEGIN_MSINTERNAL
#ifdef DXMPERF
    PERFLOG_DISCONNECT( this, m_pReader, S_OK );
#endif // DXMPERF
//@@END_MSINTERNAL

    if (m_pReader != NULL) {
        m_pReader->Release();
        m_pReader = NULL;
    }

    if (m_pAlloc != NULL) {
        m_pAlloc->Release();
        m_pAlloc = NULL;
    }

    return S_OK;
}
// Agree an allocator with the reader via IAsyncReader::RequestAllocator.
//
//   pAlloc - optional allocator offered to the reader as the preferred one
//   pProps - optional requirements (non-zero fields); when NULL a default
//            request of 3 buffers of 64K with no alignment or prefix is made
//
// The agreed allocator is stored in m_pAlloc. Returns the HRESULT from
// RequestAllocator unchanged.
HRESULT CPullPin::DecideAllocator(
    IMemAllocator * pAlloc,
    ALLOCATOR_PROPERTIES * pProps)
{
    // fallback request, used only when the caller supplied no properties
    ALLOCATOR_PROPERTIES Defaults;
    Defaults.cBuffers = 3;
    Defaults.cbBuffer = 64*1024;
    Defaults.cbAlign = 0;
    Defaults.cbPrefix = 0;

    ALLOCATOR_PROPERTIES * pWanted = (pProps != NULL) ? pProps : &Defaults;

    return m_pReader->RequestAllocator(pAlloc, pWanted, &m_pAlloc);
}
// Begin pulling data: asserts that no worker thread exists yet and then
// returns the result of StartThread().
HRESULT CPullPin::Active(void)
{
    ASSERT(!ThreadExists());

    const HRESULT hrStart = StartThread();
    return hrStart;
}
// Stop pulling data. The result of StopThread() is not propagated; this
// always returns S_OK.
HRESULT CPullPin::Inactive(void)
{
    StopThread();

    return S_OK;
}
// Change the range that the worker pulls.
//
//   tStart, tStop - new values for m_tStart / m_tStop
//
// If the worker was in the TM_Start state on entry it is flushed and paused
// before the range is changed and restarted afterwards, in which case the
// result of StartThread() is returned. Otherwise returns S_OK.
HRESULT CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
{
    CAutoLock lock(&m_AccessLock);

    // remember the state on entry; PauseThread() below overwrites m_State
    const BOOL bWasStarted = (m_State == TM_Start);

    if (bWasStarted) {
        BeginFlush();
        PauseThread();
        EndFlush();
    }

    m_tStart = tStart;
    m_tStop = tStop;

    if (!bWasStarted) {
        return S_OK;
    }

    return StartThread();
}
// Report the stream duration that Connect() computed.
//
//   ptDuration - receives m_tDuration
//
// Returns S_OK, or E_POINTER if ptDuration is NULL.
HRESULT CPullPin::Duration(REFERENCE_TIME* ptDuration)
{
    // guard the out-parameter rather than faulting on a NULL pointer
    if (ptDuration == NULL) {
        return E_POINTER;
    }

    *ptDuration = m_tDuration;
    return S_OK;
}
// Put the worker into the TM_Start state, creating it first if necessary.
//
// Returns E_UNEXPECTED if there is no allocator or reader, the failing
// HRESULT from IMemAllocator::Commit, E_FAIL if the thread cannot be
// created, or otherwise the worker's reply to the TM_Start request.
HRESULT CPullPin::StartThread()
{
    CAutoLock lock(&m_AccessLock);

    if (!m_pAlloc || !m_pReader) {
        return E_UNEXPECTED;
    }

    if (!ThreadExists()) {

        // commit allocator
        HRESULT hrCommit = m_pAlloc->Commit();
        if (FAILED(hrCommit)) {
            return hrCommit;
        }

        // start thread
        if (!Create()) {
            // StopThread() returns early when no thread exists, so nothing
            // else would undo the Commit above; do it here so the allocator
            // is not left committed after a failed start.
            m_pAlloc->Decommit();
            return E_FAIL;
        }
    }

    m_State = TM_Start;
    return (HRESULT) CallWorker(m_State);
}
// Ask the worker to go idle (TM_Pause) without exiting.
//
// Returns E_UNEXPECTED if there is no worker thread, the failing HRESULT
// from IAsyncReader::BeginFlush, or otherwise the worker's reply to the
// TM_Pause request.
HRESULT CPullPin::PauseThread()
{
    CAutoLock lock(&m_AccessLock);

    if (!ThreadExists()) {
        return E_UNEXPECTED;
    }

    // the reader is flushed around the request so that the worker cannot be
    // sitting blocked in WaitForNext when the request is posted
    HRESULT hr = m_pReader->BeginFlush();
    if (SUCCEEDED(hr)) {
        m_State = TM_Pause;
        hr = CallWorker(TM_Pause);

        m_pReader->EndFlush();
    }

    return hr;
}
// Ask the worker to exit, wait for it to finish and decommit the allocator.
//
// Returns S_FALSE if there was no worker thread, the failing HRESULT from
// IAsyncReader::BeginFlush (in which case the thread is left running), or
// S_OK otherwise. The worker's reply to TM_Exit is not examined.
HRESULT CPullPin::StopThread()
{
    CAutoLock lock(&m_AccessLock);

    if (!ThreadExists()) {
        return S_FALSE;
    }

    // the reader is flushed around the request so that the worker cannot be
    // sitting blocked in WaitForNext when the request is posted
    const HRESULT hrFlush = m_pReader->BeginFlush();
    if (FAILED(hrFlush)) {
        return hrFlush;
    }

    m_State = TM_Exit;
    CallWorker(TM_Exit);

    m_pReader->EndFlush();

    // wait for thread to completely exit
    Close();

    // decommit allocator
    if (m_pAlloc != NULL) {
        m_pAlloc->Decommit();
    }

    return S_OK;
}
// Worker thread body. Services requests one at a time:
//   TM_Exit  - replies S_OK and ends the thread (returns 0)
//   TM_Pause - replies S_OK; the thread is already idle
//   TM_Start - replies S_OK and then runs Process()
// After every request other than TM_Exit the reader is flushed and any
// cancelled samples are collected.
DWORD CPullPin::ThreadProc(void)
{
    for (;;) {
        const DWORD request = GetRequest();

        if (request == TM_Exit) {
            Reply(S_OK);
            return 0;
        }

        if (request == TM_Pause) {
            // we are paused already
            Reply(S_OK);
        } else if (request == TM_Start) {
            Reply(S_OK);
            Process();
        }

        // at this point, there should be no outstanding requests on the
        // upstream filter.
        // We should force begin/endflush to ensure that this is true.
        // !!!Note that we may currently be inside a BeginFlush/EndFlush pair
        // on another thread, but the premature EndFlush will do no harm now
        // that we are idle.
        m_pReader->BeginFlush();
        CleanupCancelled();
        m_pReader->EndFlush();
    }
}
// Get a buffer from the allocator, stamp it with the next range to read and
// queue it on the reader with IAsyncReader::Request.
//
//   tCurrent       - in: start of the range; out: advanced to its end
//   tAlignStop     - upper limit; the range is clipped so it never passes it
//   bDiscontinuity - value passed to SetDiscontinuity on the sample
//
// Returns the failing HRESULT from GetBuffer, or the result of Request. If
// Request fails the sample is released, cancelled samples are collected and
// OnError is called.
HRESULT CPullPin::QueueSample(
    REFERENCE_TIME& tCurrent,
    REFERENCE_TIME tAlignStop,
    BOOL bDiscontinuity
    )
{
    IMediaSample* pBuffer;

    HRESULT hr = m_pAlloc->GetBuffer(&pBuffer, NULL, NULL, 0);
    if (FAILED(hr)) {
        return hr;
    }

    // one whole buffer's worth, clipped at the aligned stop position
    LONGLONG tRangeEnd = tCurrent + (pBuffer->GetSize() * UNITS);
    if (tRangeEnd > tAlignStop) {
        tRangeEnd = tAlignStop;
    }
    pBuffer->SetTime(&tCurrent, &tRangeEnd);
    tCurrent = tRangeEnd;

    pBuffer->SetDiscontinuity(bDiscontinuity);

    hr = m_pReader->Request(pBuffer, 0);
    if (SUCCEEDED(hr)) {
        return hr;
    }

    // the request was never queued, so the sample is still ours to release
    pBuffer->Release();

    CleanupCancelled();
    OnError(hr);
    return hr;
}
// Wait (without timeout) for the next completed request and pass the sample
// to DeliverSample.
//
//   tStart, tStop - forwarded unchanged to DeliverSample
//
// Returns the result of DeliverSample, or the failing HRESULT from
// WaitForNext. On any failure cancelled samples are collected and OnError
// is called.
HRESULT CPullPin::CollectAndDeliver(
    REFERENCE_TIME tStart,
    REFERENCE_TIME tStop)
{
    IMediaSample* pSample = NULL;   // better be sure pSample is set
    DWORD_PTR dwUnused;

    HRESULT hr = m_pReader->WaitForNext(INFINITE, &pSample, &dwUnused);

    if (SUCCEEDED(hr)) {
        hr = DeliverSample(pSample, tStart, tStop);
    } else if (pSample != NULL) {
        // a sample can be handed back even though the wait failed
        pSample->Release();
    }

    if (FAILED(hr)) {
        CleanupCancelled();
        OnError(hr);
    }

    return hr;
}
// Fix up the time stamps on a completed sample, hand it to Receive and
// release it.
//
//   pSample - sample to deliver; released here whatever Receive returns
//   tStart  - aligned start position; sample times are made relative to it
//   tStop   - actual stop position; the sample's stop time is clipped to it
//
// Returns the result of Receive.
HRESULT CPullPin::DeliverSample(
    IMediaSample* pSample,
    REFERENCE_TIME tStart,
    REFERENCE_TIME tStop
    )
{
    REFERENCE_TIME tSampleStart, tSampleStop;
    pSample->GetTime(&tSampleStart, &tSampleStop);

    // fix up sample if past actual stop (for sector alignment)
    if (tSampleStop > tStop) {
        tSampleStop = tStop;
    }

    // adjust times to be relative to (aligned) start time
    tSampleStart -= tStart;
    tSampleStop -= tStart;
    pSample->SetTime(&tSampleStart, &tSampleStop);

//@@BEGIN_MSINTERNAL
#ifdef DXMPERF
    {
        AM_MEDIA_TYPE * pmt = NULL;
        pSample->GetMediaType( &pmt );
        PERFLOG_RECEIVE( L"CPullPin", m_pReader, this, pSample, pmt );
    }
#endif
//@@END_MSINTERNAL

    const HRESULT hrReceive = Receive(pSample);
    pSample->Release();
    return hrReceive;
}
void CPullPin::Process(void) { // is there anything to do?
if (m_tStop <= m_tStart) { EndOfStream(); return; }
BOOL bDiscontinuity = TRUE;
// if there is more than one sample at the allocator,
// then try to queue 2 at once in order to overlap.
// -- get buffer count and required alignment
ALLOCATOR_PROPERTIES Actual; HRESULT hr = m_pAlloc->GetProperties(&Actual);
// align the start position downwards
REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS; REFERENCE_TIME tCurrent = tStart;
REFERENCE_TIME tStop = m_tStop; if (tStop > m_tDuration) { tStop = m_tDuration; }
// align the stop position - may be past stop, but that
// doesn't matter
REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS;
DWORD dwRequest;
if (!m_bSync) {
// Break out of the loop either if we get to the end or we're asked
// to do something else
while (tCurrent < tAlignStop) {
// Break out without calling EndOfStream if we're asked to
// do something different
if (CheckRequest(&dwRequest)) { return; }
// queue a first sample
if (Actual.cBuffers > 1) {
hr = QueueSample(tCurrent, tAlignStop, TRUE); bDiscontinuity = FALSE;
if (FAILED(hr)) { return; } }
// loop queueing second and waiting for first..
while (tCurrent < tAlignStop) {
hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity); bDiscontinuity = FALSE;
if (FAILED(hr)) { return; }
hr = CollectAndDeliver(tStart, tStop); if (S_OK != hr) {
// stop if error, or if downstream filter said
// to stop.
return; } }
if (Actual.cBuffers > 1) { hr = CollectAndDeliver(tStart, tStop); if (FAILED(hr)) { return; } } } } else {
// sync version of above loop
while (tCurrent < tAlignStop) {
// Break out without calling EndOfStream if we're asked to
// do something different
if (CheckRequest(&dwRequest)) { return; }
IMediaSample* pSample;
hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0); if (FAILED(hr)) { OnError(hr); return; }
LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS); if (tStopThis > tAlignStop) { tStopThis = tAlignStop; } pSample->SetTime(&tCurrent, &tStopThis); tCurrent = tStopThis;
if (bDiscontinuity) { pSample->SetDiscontinuity(TRUE); bDiscontinuity = FALSE; }
hr = m_pReader->SyncReadAligned(pSample);
if (FAILED(hr)) { pSample->Release(); OnError(hr); return; }
hr = DeliverSample(pSample, tStart, tStop); if (hr != S_OK) { if (FAILED(hr)) { OnError(hr); } return; } } }
EndOfStream(); }
// after a flush, cancelled i/o will be waiting for collection
// and release
void CPullPin::CleanupCancelled(void) { while (1) { IMediaSample * pSample; DWORD_PTR dwUnused;
HRESULT hr = m_pReader->WaitForNext( 0, // no wait
&pSample, &dwUnused); if(pSample) { pSample->Release(); } else { // no more samples
return; } } }
|