//------------------------------------------------------------------------------ // // Copyright (c) Microsoft Corporation. All rights reserved. // // The use and distribution terms for this software are covered by the // Microsoft Limited Public License (Ms-LPL) // which can be found in the file MS-LPL.txt at the root of this distribution. // By using this software in any fashion, you are agreeing to be bound by // the terms of this license. // // The software is licensed “as-is.” // // You must not remove this notice, or any other, from this software. // // // // Demux code for Windows CE // //------------------------------------------------------------------------- //====================================================================== // Demux code for Windows CE //====================================================================== /*++ Module Name: pin_in.cpp Abstract: This module contains the input pin framework Revision History: 02-Jul-1999 created --*/ #include "precomp.h" #include // for ksmedia.h #include // for bdamedia.h #include // for KSDATAFORMAT_SUBTYPE_BDA_MPEG2_TRANSPORT #include "mp2demux.h" #include "tsstats.h" #include "mp2seek.h" #include "pin_out.h" #include "filter.h" #include "pin_in.h" static HRESULT GetStrideLengths ( IN CMediaType * pmt, OUT int * piPrePacketStride, OUT int * piPostPacketStride ) { HRESULT hr ; MPEG2_TRANSPORT_STRIDE * pMpeg2TransportStride ; ASSERT (piPrePacketStride) ; ASSERT (piPostPacketStride) ; if (pmt -> majortype == MEDIATYPE_Stream && pmt -> subtype == MEDIASUBTYPE_MPEG2_TRANSPORT_STRIDE) { if (pmt -> cbFormat == 0 || pmt -> cbFormat >= sizeof MPEG2_TRANSPORT_STRIDE) { if (pmt -> cbFormat > 0) { // format block given; extract values pMpeg2TransportStride = reinterpret_cast (pmt -> pbFormat) ; (* piPrePacketStride) = pMpeg2TransportStride -> dwOffset ; (* piPostPacketStride) = pMpeg2TransportStride -> dwStride - TS_PACKET_SIZE - (* piPrePacketStride) ; } else { // no format block given; default to 0 (* piPrePacketStride) = 0 ; (* piPostPacketStride) = 0 ; } // set the retval hr = (((* piPrePacketStride) >= 0 && (* piPostPacketStride) >= 0) ? S_OK : E_INVALIDARG) ; } else { // bogus format block: wrong size hr = E_INVALIDARG ; } } else { // stride is currently defined for 1 media type only hr = E_FAIL ; } return hr ; } CMPEG2DemuxInputPin::CMPEG2DemuxInputPin ( IN TCHAR * pObjectName, IN CMPEG2Demultiplexer * pOwningFilter, IN CRefCountedCritSec * pcrtFilterLock, IN CRefCountedCritSec * pcrtSeekingLock, IN CMpeg2Stats * pStats, OUT HRESULT * pHr ) : CBaseInputPin ( pObjectName, pOwningFilter, pcrtFilterLock, pHr, INPUT_PIN_NAME ), m_pPull (NULL), m_cRefcount (1), m_pStats (pStats), m_pcrtSeekingLock (pcrtSeekingLock) { O_TRACE_ENTER_0 (TEXT("CMPEG2DemuxInputPin::CMPEG2DemuxInputPin ()")) ; TRACE_CONSTRUCTOR (TEXT ("CMPEG2DemuxInputPin")) ; ASSERT (m_pcrtSeekingLock) ; m_pcrtSeekingLock -> AddRef () ; // set this pin's reference ASSERT (pcrtFilterLock) ; pcrtFilterLock -> AddRef () ; ASSERT (m_pStats) ; m_pStats -> AddRef () ; #ifdef DEBUG // need to do this because my pin starts with a refcount of 1, whereas all the // base class assumes starting with 0, and explicitely AddRef'ing to pass out // the first reference m_cRef = 1 ; #endif } CMPEG2DemuxInputPin::~CMPEG2DemuxInputPin ( ) { O_TRACE_ENTER_0 (TEXT("CMPEG2DemuxInputPin::~CMPEG2DemuxInputPin ()")) ; TRACE_DESTRUCTOR (TEXT ("CMPEG2DemuxInputPin")) ; delete m_pPull ; // done with the filter lock ASSERT (m_pLock) ; (reinterpret_cast (m_pLock)) -> Release () ; // done with the seeking lock m_pcrtSeekingLock -> Release () ; ASSERT (m_pStats) ; m_pStats -> Release () ; } STDMETHODIMP_(ULONG) CMPEG2DemuxInputPin::NonDelegatingAddRef ( ) { O_TRACE_ENTER_0 (TEXT("CMPEG2DemuxInputPin::NonDelegatingAddRef ()")) ; #ifdef DEBUG // Update the debug only variable maintained by the base class m_cRef++; ASSERT(m_cRef > 0); #endif return InterlockedIncrement (& m_cRefcount) ; } STDMETHODIMP_(ULONG) CMPEG2DemuxInputPin::NonDelegatingRelease() { O_TRACE_ENTER_0 (TEXT("CMPEG2DemuxInputPin::NonDelegatingRelease ()")) ; #ifdef DEBUG // Update the debug only variable in CBasePin m_cRef--; ASSERT(m_cRef >= 0); #endif if (InterlockedDecrement (& m_cRefcount) == 0) { delete this ; return 0 ; } ASSERT (m_cRefcount > 0) ; return m_cRefcount ; } HRESULT CMPEG2DemuxInputPin::CheckMediaType ( IN const CMediaType * pmt ) /*++ Purpose: Called during the connection process to check if the passed media type is supported on this pin. This method is called from the base classes in the following situations: 1. from CBasePin::ReceiveConnection, which is called by the output pin initiating the connection 2. from CBaseInputPin::Receive when the media sample changes format Parameters: pmt CMediaType to check against Return Values: S_OK if the media type is ok XXXX: MPEG2 TS only for now S_FALSE if the media type is not ok --*/ { HRESULT hr ; CAutoLock ca (m_pLock) ; O_TRACE_ENTER_0 (TEXT("CMPEG2DemuxInputPin::CheckMediaType ()")) ; ASSERT (pmt) ; // transport stream if (pmt -> majortype == MEDIATYPE_Stream && ( (pmt -> subtype == MEDIASUBTYPE_MPEG2_TRANSPORT) || // vanilla transport (pmt -> subtype == KSDATAFORMAT_SUBTYPE_BDA_MPEG2_TRANSPORT) || // BDA-transport (for graph building) (pmt -> subtype == MEDIASUBTYPE_MPEG2_TRANSPORT_STRIDE))) { // "strided" transport // commit to transport hr = GetMpeg2DemuxFilter () -> SetStreamType (MPEG2_STREAM_TRANSPORT) ; } // program stream else if (pmt -> majortype == MEDIATYPE_Stream && pmt -> subtype == MEDIASUBTYPE_MPEG2_PROGRAM) { // commit to program hr = GetMpeg2DemuxFilter () -> SetStreamType (MPEG2_STREAM_PROGRAM) ; } // generic streamer doesn't set subtype else if (pmt -> majortype == MEDIATYPE_Stream && pmt -> subtype == GUID_NULL) { // commit to program todo: do nothing, CompleteConnect can still fail... hr = GetMpeg2DemuxFilter () -> SetStreamType (MPEG2_STREAM_PROGRAM) ; } else { hr = S_FALSE ; } return hr ; } HRESULT CMPEG2DemuxInputPin::CompleteConnect ( IN IPin * pIPin ) { HRESULT hr ; IAsyncReader * pIAsyncReader ; int iPrePacketStride ; int iPostPacketStride ; O_TRACE_ENTER_0 (TEXT("CMPEG2DemuxInputPin::CompleteConnect ()")) ; ASSERT (pIPin) ; hr = S_OK ; // if the output pin implements IAsyncReader, we let the pull pin do the rest if (GetMpeg2DemuxFilter () -> CanOperateInPullMode () && SUCCEEDED (pIPin -> QueryInterface (IID_IAsyncReader, (void **) & pIAsyncReader))) { hr = GetMpeg2DemuxFilter () -> InitPullModeStream (pIAsyncReader) ; pIAsyncReader -> Release () ; if (SUCCEEDED (hr)) { DELETE_RESET (m_pPull) ; m_pPull = new CStreamPull (this) ; if (m_pPull != NULL) { hr = m_pPull -> Connect ( pIPin, m_pAllocator, TRUE ) ; } else { TRACE_ERROR_0 (TEXT ("new CStreamPull (this)")) ; hr = E_OUTOFMEMORY ; } } } if (SUCCEEDED (hr)) { // if the connection is transport and of type stride, set the filter // up appropriately if (m_mt.majortype == MEDIATYPE_Stream && m_mt.subtype == MEDIASUBTYPE_MPEG2_TRANSPORT_STRIDE) { hr = GetStrideLengths ( & m_mt, & iPrePacketStride, & iPostPacketStride ) ; if (SUCCEEDED (hr)) { GetMpeg2DemuxFilter () -> SetStrideLengths ( iPrePacketStride, iPostPacketStride ) ; } } } return hr ; } // called by the filter base classes HRESULT CMPEG2DemuxInputPin::Active ( ) { HRESULT hr ; O_TRACE_ENTER_0 (TEXT("CMPEG2DemuxInputPin::Active ()")) ; hr = CBaseInputPin::Active () ; if (SUCCEEDED (hr) && m_pPull != NULL) { return m_pPull -> Active () ; } return hr ; } HRESULT CMPEG2DemuxInputPin::Inactive ( ) { #ifndef UNDER_CE HRESULT hr ; #endif //UNDER_CE O_TRACE_ENTER_0 (TEXT("CMPEG2DemuxInputPin::Inactive ()")) ; if (m_pPull) { m_pPull -> Inactive () ; } return CBaseInputPin::Inactive () ; } STDMETHODIMP CMPEG2DemuxInputPin::Receive ( IN IMediaSample * pIMediaSample ) { HRESULT hr ; ASSERT (pIMediaSample) ; O_TRACE_ENTER_1 (TEXT("CMPEG2DemuxInputPin::Receive (%08xh)"), pIMediaSample) ; // acquire the receiver lock GetMpeg2DemuxFilter () -> LockReceive () ; ASSERT (m_pStats) ; m_pStats -> Global_MediaSampleIn () ; // for now let the base class worry about media type // updates, state confirmation, etc... hr = CBaseInputPin::Receive (pIMediaSample) ; if (SUCCEEDED (hr)) { hr = GetMpeg2DemuxFilter () -> ProcessMediaSampleLocked (pIMediaSample) ; } // release the receiver lock GetMpeg2DemuxFilter () -> UnlockReceive () ; NE_SPEW (hr, S_OK, TEXT ("CBaseInputPin::Receive (pIMediaSample)")) ; return hr ; } STDMETHODIMP CMPEG2DemuxInputPin::BeginFlush ( ) { HRESULT hr ; O_TRACE_ENTER_0 (TEXT("CMPEG2DemuxInputPin::BeginFlush ()")) ; // grab just the filter lock across this call; we want to get this call // downstream, which is impossible if we block on the receiver lock and // the pull pin thread is blocked getting a media sample because the // graph is paused; the graph manager pauses the graph across a // IMediaSeeking::SetPositions() call, which eventually calls through // here GetMpeg2DemuxFilter () -> LockFilter () ; hr = CBaseInputPin::BeginFlush () ; if (SUCCEEDED (hr)) { ASSERT (m_bFlushing) ; GetMpeg2DemuxFilter () -> BeginFlushLocked () ; } GetMpeg2DemuxFilter () -> UnlockFilter () ; return hr ; } STDMETHODIMP CMPEG2DemuxInputPin::EndFlush ( ) { HRESULT hr ; O_TRACE_ENTER_0 (TEXT("CMPEG2DemuxInputPin::EndFlush ()")) ; GetMpeg2DemuxFilter () -> LockReceive () ; GetMpeg2DemuxFilter () -> LockFilter () ; hr = CBaseInputPin::EndFlush () ; if (SUCCEEDED (hr)) { GetMpeg2DemuxFilter () -> EndFlushLocked () ; } GetMpeg2DemuxFilter () -> UnlockFilter () ; GetMpeg2DemuxFilter () -> UnlockReceive () ; return hr ; } HRESULT CMPEG2DemuxInputPin::Seek ( IN REFERENCE_TIME rtStart, // in pull pin time .. IN REFERENCE_TIME rtStop, // in pull pin time .. IN double dPlaybackRate ) { HRESULT hr ; BOOL fRestart ; O_TRACE_ENTER_2 (TEXT("CMPEG2DemuxInputPin::Seek (%I64d, %I64d)"), rtStart, rtStop) ; ASSERT (m_pPull) ; if (!GetMpeg2DemuxFilter () -> IsStopped ()) { // we're active; flush pause prime BeginFlush () ; fRestart = m_pPull -> StartSeek () ; EndFlush () ; } else { fRestart = FALSE ; } // start & stop times ASSERT (rtStart <= rtStop) ; m_pPull -> SetStart (rtStart) ; m_pPull -> SetStop (rtStop) ; // playback rate ASSERT (dPlaybackRate > 0.0) ; GetMpeg2DemuxFilter () -> SetPlaybackRate (dPlaybackRate) ; // go if (fRestart) { hr = Active () ; } else { hr = S_OK ; } return hr ; } HRESULT CMPEG2DemuxInputPin::BreakConnect ( ) { HRESULT hr ; hr = CBaseInputPin::BreakConnect () ; if (SUCCEEDED (hr)) { GetMpeg2DemuxFilter () -> BreakConnectInput () ; } return hr ; } // ---------------------------------------------------------------------------- // ---------------------------------------------------------------------------- CMp2PullPin::CMp2PullPin() : m_pReader(NULL), m_pAlloc(NULL), m_State(TM_Exit) { } CMp2PullPin::~CMp2PullPin() { Disconnect(); } // returns S_OK if successfully connected to an IAsyncReader interface // from this object // Optional allocator should be proposed as a preferred allocator if // necessary HRESULT CMp2PullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync) { CAutoLock lock(&m_AccessLock); if (m_pReader) { return VFW_E_ALREADY_CONNECTED; } HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader); if (FAILED(hr)) { return(hr); } hr = DecideAllocator(pAlloc, NULL); if (FAILED(hr)) { Disconnect(); return hr; } LONGLONG llTotal, llAvail; hr = m_pReader->Length(&llTotal, &llAvail); if (FAILED(hr)) { Disconnect(); return hr; } // convert from file position to reference time m_tDuration = llTotal * UNITS; m_tStop = m_tDuration; m_tStart = 0; m_bSync = bSync; return S_OK; } // disconnect any connection made in Connect HRESULT CMp2PullPin::Disconnect() { CAutoLock lock(&m_AccessLock); StopThread(); if (m_pReader) { m_pReader->Release(); m_pReader = NULL; } if (m_pAlloc) { m_pAlloc->Release(); m_pAlloc = NULL; } return S_OK; } // agree an allocator using RequestAllocator - optional // props param specifies your requirements (non-zero fields). // returns an error code if fail to match requirements. // optional IMemAllocator interface is offered as a preferred allocator // but no error occurs if it can't be met. HRESULT CMp2PullPin::DecideAllocator( IMemAllocator * pAlloc, ALLOCATOR_PROPERTIES * pProps) { ALLOCATOR_PROPERTIES *pRequest; ALLOCATOR_PROPERTIES Request; if (pProps == NULL) { Request.cBuffers = 3; Request.cbBuffer = 64*1024; Request.cbAlign = 0; Request.cbPrefix = 0; pRequest = &Request; } else { pRequest = pProps; } HRESULT hr = m_pReader->RequestAllocator( pAlloc, pRequest, &m_pAlloc); return hr; } // start pulling data HRESULT CMp2PullPin::Active(void) { //ASSERT(!ThreadExists()); -- invalid assert: during seek, the thread will be paused when we enter here return StartThread(); } // stop pulling data HRESULT CMp2PullPin::Inactive(void) { StopThread() ; return S_OK; } HRESULT CMp2PullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop) { CAutoLock lock(&m_AccessLock); ThreadMsg AtStart = m_State; if (AtStart == TM_Start) { BeginFlush(); PauseThread(); EndFlush(); } m_tStart = tStart; m_tStop = tStop; HRESULT hr = S_OK; if (AtStart == TM_Start) { hr = StartThread(); } return hr; } HRESULT CMp2PullPin::Duration(REFERENCE_TIME* ptDuration) { *ptDuration = m_tDuration; return S_OK; } HRESULT CMp2PullPin::StartThread() { CAutoLock lock(&m_AccessLock); if (!m_pAlloc || !m_pReader) { return E_UNEXPECTED; } HRESULT hr; if (!ThreadExists()) { // commit allocator hr = m_pAlloc->Commit(); if (FAILED(hr)) { return hr; } // start thread if (!Create()) { return E_FAIL; } } m_State = TM_Start; hr = (HRESULT) CallWorker(m_State); return hr; } HRESULT CMp2PullPin::PauseThread() { CAutoLock lock(&m_AccessLock); if (!ThreadExists()) { return E_UNEXPECTED; } // need to flush to ensure the thread is not blocked // in WaitForNext HRESULT hr = m_pReader->BeginFlush(); if (FAILED(hr)) { return hr; } m_State = TM_Pause; hr = CallWorker(TM_Pause); m_pReader->EndFlush(); return hr; } HRESULT CMp2PullPin::StopThread() { CAutoLock lock(&m_AccessLock); if (!ThreadExists()) { return S_FALSE; } // need to flush to ensure the thread is not blocked // in WaitForNext HRESULT hr = m_pReader->BeginFlush(); if (FAILED(hr)) { return hr; } m_State = TM_Exit; hr = CallWorker(TM_Exit); m_pReader->EndFlush(); // wait for thread to completely exit Close(); // decommit allocator if (m_pAlloc) { m_pAlloc->Decommit(); } return S_OK; } DWORD CMp2PullPin::ThreadProc(void) { while(1) { DWORD cmd = GetRequest(); switch(cmd) { case TM_Exit: Reply(S_OK); return 0; case TM_Pause: // we are paused already Reply(S_OK); break; case TM_Start: Reply(S_OK); Process(); break; } // at this point, there should be no outstanding requests on the // upstream filter. // We should force begin/endflush to ensure that this is true. // !!!Note that we may currently be inside a BeginFlush/EndFlush pair // on another thread, but the premature EndFlush will do no harm now // that we are idle. m_pReader->BeginFlush(); CleanupCancelled(); m_pReader->EndFlush(); } } HRESULT CMp2PullPin::QueueSample( REFERENCE_TIME& tCurrent, REFERENCE_TIME tAlignStop, BOOL bDiscontinuity ) { IMediaSample* pSample; HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0); if (FAILED(hr)) { return hr; } LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS); if (tStopThis > tAlignStop) { tStopThis = tAlignStop; } pSample->SetTime(&tCurrent, &tStopThis); tCurrent = tStopThis; pSample->SetDiscontinuity(bDiscontinuity); hr = m_pReader->Request( pSample, 0); if (FAILED(hr)) { pSample->Release(); CleanupCancelled(); OnError(hr); } return hr; } HRESULT CMp2PullPin::CollectAndDeliver( REFERENCE_TIME tStart, REFERENCE_TIME tStop) { IMediaSample* pSample = NULL; // better be sure pSample is set DWORD_PTR dwUnused; HRESULT hr = m_pReader->WaitForNext( INFINITE, &pSample, &dwUnused); if (FAILED(hr)) { if (pSample) { pSample->Release(); } } else { hr = DeliverSample(pSample, tStart, tStop); } if (FAILED(hr)) { CleanupCancelled(); OnError(hr); } return hr; } HRESULT CMp2PullPin::DeliverSample( IMediaSample* pSample, REFERENCE_TIME tStart, REFERENCE_TIME tStop ) { // fix up sample if past actual stop (for sector alignment) REFERENCE_TIME t1, t2; pSample->GetTime(&t1, &t2); if (t2 > tStop) { t2 = tStop; pSample->SetTime(&t1, &t2); } HRESULT hr = Receive(pSample); pSample->Release(); return hr; } void CMp2PullPin::Process(void) { // is there anything to do? if (m_tStop <= m_tStart) { EndOfStream(); return; } BOOL bDiscontinuity = TRUE; // if there is more than one sample at the allocator, // then try to queue 2 at once in order to overlap. // -- get buffer count and required alignment ALLOCATOR_PROPERTIES Actual; HRESULT hr = m_pAlloc->GetProperties(&Actual); // align the start position downwards REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS; REFERENCE_TIME tCurrent = tStart; REFERENCE_TIME tStop = m_tStop; if (tStop > m_tDuration) { tStop = m_tDuration; } // align the stop position - may be past stop, but that // doesn't matter REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS; DWORD dwRequest; if (!m_bSync) { // Break out of the loop either if we get to the end or we're asked // to do something else while (tCurrent < tAlignStop) { // Break out without calling EndOfStream if we're asked to // do something different if (CheckRequest(&dwRequest)) { return; } // queue a first sample if (Actual.cBuffers > 1) { hr = QueueSample(tCurrent, tAlignStop, TRUE); bDiscontinuity = FALSE; if (FAILED(hr)) { return; } } // loop queueing second and waiting for first.. while (tCurrent < tAlignStop) { hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity); bDiscontinuity = FALSE; if (FAILED(hr)) { return; } hr = CollectAndDeliver(tStart, tStop); if (S_OK != hr) { // stop if error, or if downstream filter said // to stop. return; } } if (Actual.cBuffers > 1) { hr = CollectAndDeliver(tStart, tStop); if (FAILED(hr)) { return; } } } } else { // sync version of above loop while (tCurrent < tAlignStop) { // Break out without calling EndOfStream if we're asked to // do something different if (CheckRequest(&dwRequest)) { return; } IMediaSample* pSample; hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0); if (FAILED(hr)) { OnError(hr); return; } LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS); if (tStopThis > tAlignStop) { tStopThis = tAlignStop; } pSample->SetTime(&tCurrent, &tStopThis); tCurrent = tStopThis; if (bDiscontinuity) { pSample->SetDiscontinuity(TRUE); bDiscontinuity = FALSE; } hr = m_pReader->SyncReadAligned(pSample); if (FAILED(hr)) { pSample->Release(); OnError(hr); return; } hr = DeliverSample(pSample, tStart, tStop); if (hr != S_OK) { if (FAILED(hr)) { OnError(hr); } return; } } } EndOfStream(); } // after a flush, cancelled i/o will be waiting for collection // and release void CMp2PullPin::CleanupCancelled(void) { while (1) { IMediaSample * pSample; DWORD_PTR dwUnused; HRESULT hr = m_pReader->WaitForNext( 0, // no wait &pSample, &dwUnused); if(pSample) { pSample->Release(); } else { // no more samples return; } } }