Leaked source code of windows server 2003
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

599 lines
12 KiB

  1. //------------------------------------------------------------------------------
  2. // File: PullPin.cpp
  3. //
  4. // Desc: DirectShow base classes - implements CPullPin class that pulls data
  5. // from IAsyncReader.
  6. //
  7. // Copyright (c) 1992-2001 Microsoft Corporation. All rights reserved.
  8. //------------------------------------------------------------------------------
  9. #include <streams.h>
  10. #include "pullpin.h"
  11. //@@BEGIN_MSINTERNAL
  12. #ifdef DXMPERF
  13. #include "dxmperf.h"
  14. #endif // DXMPERF
  15. //@@END_MSINTERNAL
  16. CPullPin::CPullPin()
  17. : m_pReader(NULL),
  18. m_pAlloc(NULL),
  19. m_State(TM_Exit)
  20. {
  21. //@@BEGIN_MSINTERNAL
  22. #ifdef DXMPERF
  23. PERFLOG_CTOR( L"CPullPin", this );
  24. #endif // DXMPERF
  25. //@@END_MSINTERNAL
  26. }
  27. CPullPin::~CPullPin()
  28. {
  29. Disconnect();
  30. //@@BEGIN_MSINTERNAL
  31. #ifdef DXMPERF
  32. PERFLOG_DTOR( L"CPullPin", this );
  33. #endif // DXMPERF
  34. //@@END_MSINTERNAL
  35. }
  36. // returns S_OK if successfully connected to an IAsyncReader interface
  37. // from this object
  38. // Optional allocator should be proposed as a preferred allocator if
  39. // necessary
  40. HRESULT
  41. CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync)
  42. {
  43. CAutoLock lock(&m_AccessLock);
  44. if (m_pReader) {
  45. return VFW_E_ALREADY_CONNECTED;
  46. }
  47. HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader);
  48. if (FAILED(hr)) {
  49. //@@BEGIN_MSINTERNAL
  50. #ifdef DXMPERF
  51. {
  52. AM_MEDIA_TYPE * pmt = NULL;
  53. PERFLOG_CONNECT( this, pUnk, hr, pmt );
  54. }
  55. #endif // DXMPERF
  56. //@@END_MSINTERNAL
  57. return(hr);
  58. }
  59. hr = DecideAllocator(pAlloc, NULL);
  60. if (FAILED(hr)) {
  61. Disconnect();
  62. //@@BEGIN_MSINTERNAL
  63. #ifdef DXMPERF
  64. {
  65. AM_MEDIA_TYPE * pmt = NULL;
  66. PERFLOG_CONNECT( this, pUnk, hr, pmt );
  67. }
  68. #endif // DXMPERF
  69. //@@END_MSINTERNAL
  70. return hr;
  71. }
  72. LONGLONG llTotal, llAvail;
  73. hr = m_pReader->Length(&llTotal, &llAvail);
  74. if (FAILED(hr)) {
  75. Disconnect();
  76. //@@BEGIN_MSINTERNAL
  77. #ifdef DXMPERF
  78. {
  79. AM_MEDIA_TYPE * pmt = NULL;
  80. PERFLOG_CONNECT( this, pUnk, hr, pmt );
  81. }
  82. #endif
  83. //@@END_MSINTERNAL
  84. return hr;
  85. }
  86. // convert from file position to reference time
  87. m_tDuration = llTotal * UNITS;
  88. m_tStop = m_tDuration;
  89. m_tStart = 0;
  90. m_bSync = bSync;
  91. //@@BEGIN_MSINTERNAL
  92. #ifdef DXMPERF
  93. {
  94. AM_MEDIA_TYPE * pmt = NULL;
  95. PERFLOG_CONNECT( this, pUnk, S_OK, pmt );
  96. }
  97. #endif // DXMPERF
  98. //@@END_MSINTERNAL
  99. return S_OK;
  100. }
  101. // disconnect any connection made in Connect
  102. HRESULT
  103. CPullPin::Disconnect()
  104. {
  105. CAutoLock lock(&m_AccessLock);
  106. StopThread();
  107. //@@BEGIN_MSINTERNAL
  108. #ifdef DXMPERF
  109. PERFLOG_DISCONNECT( this, m_pReader, S_OK );
  110. #endif // DXMPERF
  111. //@@END_MSINTERNAL
  112. if (m_pReader) {
  113. m_pReader->Release();
  114. m_pReader = NULL;
  115. }
  116. if (m_pAlloc) {
  117. m_pAlloc->Release();
  118. m_pAlloc = NULL;
  119. }
  120. return S_OK;
  121. }
  122. // agree an allocator using RequestAllocator - optional
  123. // props param specifies your requirements (non-zero fields).
  124. // returns an error code if fail to match requirements.
  125. // optional IMemAllocator interface is offered as a preferred allocator
  126. // but no error occurs if it can't be met.
  127. HRESULT
  128. CPullPin::DecideAllocator(
  129. IMemAllocator * pAlloc,
  130. ALLOCATOR_PROPERTIES * pProps)
  131. {
  132. ALLOCATOR_PROPERTIES *pRequest;
  133. ALLOCATOR_PROPERTIES Request;
  134. if (pProps == NULL) {
  135. Request.cBuffers = 3;
  136. Request.cbBuffer = 64*1024;
  137. Request.cbAlign = 0;
  138. Request.cbPrefix = 0;
  139. pRequest = &Request;
  140. } else {
  141. pRequest = pProps;
  142. }
  143. HRESULT hr = m_pReader->RequestAllocator(
  144. pAlloc,
  145. pRequest,
  146. &m_pAlloc);
  147. return hr;
  148. }
  149. // start pulling data
  150. HRESULT
  151. CPullPin::Active(void)
  152. {
  153. ASSERT(!ThreadExists());
  154. return StartThread();
  155. }
  156. // stop pulling data
  157. HRESULT
  158. CPullPin::Inactive(void)
  159. {
  160. StopThread();
  161. return S_OK;
  162. }
  163. HRESULT
  164. CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
  165. {
  166. CAutoLock lock(&m_AccessLock);
  167. ThreadMsg AtStart = m_State;
  168. if (AtStart == TM_Start) {
  169. BeginFlush();
  170. PauseThread();
  171. EndFlush();
  172. }
  173. m_tStart = tStart;
  174. m_tStop = tStop;
  175. HRESULT hr = S_OK;
  176. if (AtStart == TM_Start) {
  177. hr = StartThread();
  178. }
  179. return hr;
  180. }
  181. HRESULT
  182. CPullPin::Duration(REFERENCE_TIME* ptDuration)
  183. {
  184. *ptDuration = m_tDuration;
  185. return S_OK;
  186. }
  187. HRESULT
  188. CPullPin::StartThread()
  189. {
  190. CAutoLock lock(&m_AccessLock);
  191. if (!m_pAlloc || !m_pReader) {
  192. return E_UNEXPECTED;
  193. }
  194. HRESULT hr;
  195. if (!ThreadExists()) {
  196. // commit allocator
  197. hr = m_pAlloc->Commit();
  198. if (FAILED(hr)) {
  199. return hr;
  200. }
  201. // start thread
  202. if (!Create()) {
  203. return E_FAIL;
  204. }
  205. }
  206. m_State = TM_Start;
  207. hr = (HRESULT) CallWorker(m_State);
  208. return hr;
  209. }
  210. HRESULT
  211. CPullPin::PauseThread()
  212. {
  213. CAutoLock lock(&m_AccessLock);
  214. if (!ThreadExists()) {
  215. return E_UNEXPECTED;
  216. }
  217. // need to flush to ensure the thread is not blocked
  218. // in WaitForNext
  219. HRESULT hr = m_pReader->BeginFlush();
  220. if (FAILED(hr)) {
  221. return hr;
  222. }
  223. m_State = TM_Pause;
  224. hr = CallWorker(TM_Pause);
  225. m_pReader->EndFlush();
  226. return hr;
  227. }
  228. HRESULT
  229. CPullPin::StopThread()
  230. {
  231. CAutoLock lock(&m_AccessLock);
  232. if (!ThreadExists()) {
  233. return S_FALSE;
  234. }
  235. // need to flush to ensure the thread is not blocked
  236. // in WaitForNext
  237. HRESULT hr = m_pReader->BeginFlush();
  238. if (FAILED(hr)) {
  239. return hr;
  240. }
  241. m_State = TM_Exit;
  242. hr = CallWorker(TM_Exit);
  243. m_pReader->EndFlush();
  244. // wait for thread to completely exit
  245. Close();
  246. // decommit allocator
  247. if (m_pAlloc) {
  248. m_pAlloc->Decommit();
  249. }
  250. return S_OK;
  251. }
  252. DWORD
  253. CPullPin::ThreadProc(void)
  254. {
  255. while(1) {
  256. DWORD cmd = GetRequest();
  257. switch(cmd) {
  258. case TM_Exit:
  259. Reply(S_OK);
  260. return 0;
  261. case TM_Pause:
  262. // we are paused already
  263. Reply(S_OK);
  264. break;
  265. case TM_Start:
  266. Reply(S_OK);
  267. Process();
  268. break;
  269. }
  270. // at this point, there should be no outstanding requests on the
  271. // upstream filter.
  272. // We should force begin/endflush to ensure that this is true.
  273. // !!!Note that we may currently be inside a BeginFlush/EndFlush pair
  274. // on another thread, but the premature EndFlush will do no harm now
  275. // that we are idle.
  276. m_pReader->BeginFlush();
  277. CleanupCancelled();
  278. m_pReader->EndFlush();
  279. }
  280. }
  281. HRESULT
  282. CPullPin::QueueSample(
  283. REFERENCE_TIME& tCurrent,
  284. REFERENCE_TIME tAlignStop,
  285. BOOL bDiscontinuity
  286. )
  287. {
  288. IMediaSample* pSample;
  289. HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
  290. if (FAILED(hr)) {
  291. return hr;
  292. }
  293. LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
  294. if (tStopThis > tAlignStop) {
  295. tStopThis = tAlignStop;
  296. }
  297. pSample->SetTime(&tCurrent, &tStopThis);
  298. tCurrent = tStopThis;
  299. pSample->SetDiscontinuity(bDiscontinuity);
  300. hr = m_pReader->Request(
  301. pSample,
  302. 0);
  303. if (FAILED(hr)) {
  304. pSample->Release();
  305. CleanupCancelled();
  306. OnError(hr);
  307. }
  308. return hr;
  309. }
  310. HRESULT
  311. CPullPin::CollectAndDeliver(
  312. REFERENCE_TIME tStart,
  313. REFERENCE_TIME tStop)
  314. {
  315. IMediaSample* pSample = NULL; // better be sure pSample is set
  316. DWORD_PTR dwUnused;
  317. HRESULT hr = m_pReader->WaitForNext(
  318. INFINITE,
  319. &pSample,
  320. &dwUnused);
  321. if (FAILED(hr)) {
  322. if (pSample) {
  323. pSample->Release();
  324. }
  325. } else {
  326. hr = DeliverSample(pSample, tStart, tStop);
  327. }
  328. if (FAILED(hr)) {
  329. CleanupCancelled();
  330. OnError(hr);
  331. }
  332. return hr;
  333. }
  334. HRESULT
  335. CPullPin::DeliverSample(
  336. IMediaSample* pSample,
  337. REFERENCE_TIME tStart,
  338. REFERENCE_TIME tStop
  339. )
  340. {
  341. // fix up sample if past actual stop (for sector alignment)
  342. REFERENCE_TIME t1, t2;
  343. pSample->GetTime(&t1, &t2);
  344. if (t2 > tStop) {
  345. t2 = tStop;
  346. }
  347. // adjust times to be relative to (aligned) start time
  348. t1 -= tStart;
  349. t2 -= tStart;
  350. pSample->SetTime(&t1, &t2);
  351. //@@BEGIN_MSINTERNAL
  352. #ifdef DXMPERF
  353. {
  354. AM_MEDIA_TYPE * pmt = NULL;
  355. pSample->GetMediaType( &pmt );
  356. PERFLOG_RECEIVE( L"CPullPin", m_pReader, this, pSample, pmt );
  357. }
  358. #endif
  359. //@@END_MSINTERNAL
  360. HRESULT hr = Receive(pSample);
  361. pSample->Release();
  362. return hr;
  363. }
  364. void
  365. CPullPin::Process(void)
  366. {
  367. // is there anything to do?
  368. if (m_tStop <= m_tStart) {
  369. EndOfStream();
  370. return;
  371. }
  372. BOOL bDiscontinuity = TRUE;
  373. // if there is more than one sample at the allocator,
  374. // then try to queue 2 at once in order to overlap.
  375. // -- get buffer count and required alignment
  376. ALLOCATOR_PROPERTIES Actual;
  377. HRESULT hr = m_pAlloc->GetProperties(&Actual);
  378. // align the start position downwards
  379. REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS;
  380. REFERENCE_TIME tCurrent = tStart;
  381. REFERENCE_TIME tStop = m_tStop;
  382. if (tStop > m_tDuration) {
  383. tStop = m_tDuration;
  384. }
  385. // align the stop position - may be past stop, but that
  386. // doesn't matter
  387. REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS;
  388. DWORD dwRequest;
  389. if (!m_bSync) {
  390. // Break out of the loop either if we get to the end or we're asked
  391. // to do something else
  392. while (tCurrent < tAlignStop) {
  393. // Break out without calling EndOfStream if we're asked to
  394. // do something different
  395. if (CheckRequest(&dwRequest)) {
  396. return;
  397. }
  398. // queue a first sample
  399. if (Actual.cBuffers > 1) {
  400. hr = QueueSample(tCurrent, tAlignStop, TRUE);
  401. bDiscontinuity = FALSE;
  402. if (FAILED(hr)) {
  403. return;
  404. }
  405. }
  406. // loop queueing second and waiting for first..
  407. while (tCurrent < tAlignStop) {
  408. hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity);
  409. bDiscontinuity = FALSE;
  410. if (FAILED(hr)) {
  411. return;
  412. }
  413. hr = CollectAndDeliver(tStart, tStop);
  414. if (S_OK != hr) {
  415. // stop if error, or if downstream filter said
  416. // to stop.
  417. return;
  418. }
  419. }
  420. if (Actual.cBuffers > 1) {
  421. hr = CollectAndDeliver(tStart, tStop);
  422. if (FAILED(hr)) {
  423. return;
  424. }
  425. }
  426. }
  427. } else {
  428. // sync version of above loop
  429. while (tCurrent < tAlignStop) {
  430. // Break out without calling EndOfStream if we're asked to
  431. // do something different
  432. if (CheckRequest(&dwRequest)) {
  433. return;
  434. }
  435. IMediaSample* pSample;
  436. hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
  437. if (FAILED(hr)) {
  438. OnError(hr);
  439. return;
  440. }
  441. LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
  442. if (tStopThis > tAlignStop) {
  443. tStopThis = tAlignStop;
  444. }
  445. pSample->SetTime(&tCurrent, &tStopThis);
  446. tCurrent = tStopThis;
  447. if (bDiscontinuity) {
  448. pSample->SetDiscontinuity(TRUE);
  449. bDiscontinuity = FALSE;
  450. }
  451. hr = m_pReader->SyncReadAligned(pSample);
  452. if (FAILED(hr)) {
  453. pSample->Release();
  454. OnError(hr);
  455. return;
  456. }
  457. hr = DeliverSample(pSample, tStart, tStop);
  458. if (hr != S_OK) {
  459. if (FAILED(hr)) {
  460. OnError(hr);
  461. }
  462. return;
  463. }
  464. }
  465. }
  466. EndOfStream();
  467. }
  468. // after a flush, cancelled i/o will be waiting for collection
  469. // and release
  470. void
  471. CPullPin::CleanupCancelled(void)
  472. {
  473. while (1) {
  474. IMediaSample * pSample;
  475. DWORD_PTR dwUnused;
  476. HRESULT hr = m_pReader->WaitForNext(
  477. 0, // no wait
  478. &pSample,
  479. &dwUnused);
  480. if(pSample) {
  481. pSample->Release();
  482. } else {
  483. // no more samples
  484. return;
  485. }
  486. }
  487. }