diff --git a/backends/gstreamer/player.rs b/backends/gstreamer/player.rs index 95101005..64548bc5 100644 --- a/backends/gstreamer/player.rs +++ b/backends/gstreamer/player.rs @@ -116,28 +116,44 @@ enum PlayerSource { Stream(ServoMediaStreamSrc), } +struct SharedState { + category: gst::DebugCategory, + // Track `play` state to send expected `paused` state change event. + // TODO: + play_state: gst_play::PlayState, + pending_input_size: Option, + // The playback rate will not be passed to the pipeline if the current + // GstPlay state is less than GST_STATE_PAUSED. + pending_playback_rate: Option, + metadata: Option, +} + struct PlayerInner { player: gst_play::Play, _signal_adapter: gst_play::PlaySignalAdapter, source: Option, video_sink: gst_app::AppSink, - input_size: u64, - play_state: gst_play::PlayState, + input_size: Cell, paused: Cell, playback_rate: Cell, muted: Cell, volume: Cell, stream_type: StreamType, - last_metadata: Option, cat: gst::DebugCategory, enough_data: Arc, + shared_state: Arc>, } impl PlayerInner { pub fn set_input_size(&mut self, size: u64) -> Result<(), PlayerError> { - // Set input_size to proxy its value, since it - // could be set by the user before calling .setup(). - self.input_size = size; + // Set input_size to proxy its value, since it could be set by the user + // before calling .setup(). + if self.input_size.get() == size { + return Ok(()); + } + + self.input_size.set(size); + match self.source { // The input size is only useful for seekable streams. Some(ref mut source) => { @@ -149,7 +165,9 @@ impl PlayerInner { }); } }, - _ => (), + _ => { + self.shared_state.lock().unwrap().pending_input_size = Some(size); + }, } Ok(()) } @@ -183,9 +201,13 @@ impl PlayerInner { // current GstPlay state is less than GST_STATE_PAUSED, which will be // set immediately before the initial GST_PLAY_MESSAGE_MEDIA_INFO_UPDATED // message is posted to bus. - if self.last_metadata.is_some() { + let mut shared_state = self.shared_state.lock().unwrap(); + if shared_state.metadata.is_some() { self.player.set_rate(playback_rate); + } else { + shared_state.pending_playback_rate = Some(playback_rate); } + Ok(()) } @@ -206,8 +228,12 @@ impl PlayerInner { pub fn stop(&mut self) -> Result<(), PlayerError> { self.player.stop(); self.paused.set(true); - self.last_metadata = None; - self.source = None; + self.shared_state.lock().unwrap().metadata = None; + if let Some(source) = self.source.take() { + if let PlayerSource::Seekable(source) = source { + source.set_callbacks(gst_app::AppSrcCallbacks::builder().build()); + } + } Ok(()) } @@ -245,12 +271,19 @@ impl PlayerInner { if self.stream_type != StreamType::Seekable { return Err(PlayerError::NonSeekableStream); } - if let Some(ref metadata) = self.last_metadata { - if let Some(ref duration) = metadata.duration { - if duration < &time::Duration::new(time as u64, 0) { - gst::warning!(self.cat, obj = &self.player, "Trying to seek out of range"); - return Err(PlayerError::SeekOutOfRange); - } + + if let Some(ref duration) = self + .shared_state + .lock() + .unwrap() + .metadata + .as_ref() + .map(|metadata| metadata.duration) + .flatten() + { + if duration < &time::Duration::new(time as u64, 0) { + gst::warning!(self.cat, obj = &self.player, "Trying to seek out of range"); + return Err(PlayerError::SeekOutOfRange); } } @@ -296,7 +329,10 @@ impl PlayerInner { let mut buffered_ranges = vec![]; let Some(duration) = self - .last_metadata + .shared_state + .lock() + .unwrap() + .metadata .as_ref() .map(|metadata| metadata.duration) .flatten() @@ -330,14 +366,19 @@ impl PlayerInner { pub fn seekable(&self) -> Vec> { // if the servosrc is seekable, we should return the duration of the media - if let Some(metadata) = self.last_metadata.as_ref() { - if metadata.is_seekable { - if let Some(duration) = metadata.duration { - return vec![Range { - start: 0.0, - end: duration.as_secs_f64(), - }]; - } + if let Some(ref metadata) = self + .shared_state + .lock() + .unwrap() + .metadata + .as_ref() + .filter(|metadata| metadata.is_seekable) + { + if let Some(duration) = metadata.duration { + return vec![Range { + start: 0.0, + end: duration.as_secs_f64(), + }]; } } @@ -610,137 +651,181 @@ impl GStreamerPlayer { player.set_video_track_enabled(false); } + let shared_state = Arc::new(Mutex::new(SharedState { + category: gst::DebugCategory::get("servoplayer").unwrap(), + play_state: gst_play::PlayState::Stopped, + pending_input_size: None, + pending_playback_rate: None, + metadata: None, + })); + *self.inner.borrow_mut() = Some(Arc::new(Mutex::new(PlayerInner { player, _signal_adapter: signal_adapter.clone(), source: None, video_sink, - input_size: 0, - play_state: gst_play::PlayState::Stopped, + input_size: Cell::new(0), paused: Cell::new(DEFAULT_PAUSED), playback_rate: Cell::new(DEFAULT_PLAYBACK_RATE), muted: Cell::new(DEFAULT_MUTED), volume: Cell::new(DEFAULT_VOLUME), stream_type: self.stream_type, - last_metadata: None, cat: gst::DebugCategory::get("servoplayer").unwrap(), enough_data: Arc::new(AtomicBool::new(false)), + shared_state: shared_state.clone(), }))); let inner = self.inner.borrow(); let inner = inner.as_ref().unwrap(); - let observer = self.observer.clone(); + // Handle `end-of-stream` signal. - signal_adapter.connect_end_of_stream(move |_| { - let _ = notify!(observer, PlayerEvent::EndOfStream); - }); - - let observer = self.observer.clone(); - // Handle `error` signal - signal_adapter.connect_error(move |_self, error, _details| { - let _ = notify!(observer, PlayerEvent::Error(error.to_string())); - }); - - let inner_clone = inner.clone(); - let observer = self.observer.clone(); + signal_adapter.connect_end_of_stream(glib::clone!( + #[strong(rename_to = observer)] + self.observer, + move |_| { + let _ = notify!(observer, PlayerEvent::EndOfStream); + } + )); + + // Handle `error` signal. + signal_adapter.connect_error(glib::clone!( + #[strong(rename_to = observer)] + self.observer, + move |_, error, _| { + let _ = notify!(observer, PlayerEvent::Error(error.to_string())); + } + )); + // Handle `state-changed` signal. - signal_adapter.connect_state_changed(move |_, play_state| { - inner_clone.lock().unwrap().play_state = play_state; - - let state = match play_state { - gst_play::PlayState::Buffering => Some(PlaybackState::Buffering), - gst_play::PlayState::Stopped => Some(PlaybackState::Stopped), - gst_play::PlayState::Paused => Some(PlaybackState::Paused), - gst_play::PlayState::Playing => Some(PlaybackState::Playing), - _ => None, - }; - if let Some(v) = state { - let _ = notify!(observer, PlayerEvent::StateChanged(v)); + signal_adapter.connect_state_changed(glib::clone!( + #[strong(rename_to = observer)] + self.observer, + #[strong] + shared_state, + move |_, play_state| { + shared_state.lock().unwrap().play_state = play_state; + + let state = match play_state { + gst_play::PlayState::Buffering => Some(PlaybackState::Buffering), + gst_play::PlayState::Stopped => Some(PlaybackState::Stopped), + gst_play::PlayState::Paused => Some(PlaybackState::Paused), + gst_play::PlayState::Playing => Some(PlaybackState::Playing), + _ => None, + }; + if let Some(v) = state { + let _ = notify!(observer, PlayerEvent::StateChanged(v)); + } } - }); + )); - let observer = self.observer.clone(); // Handle `position-update` signal. - signal_adapter.connect_position_updated(move |_, position| { - if let Some(seconds) = position.map(|p| p.seconds_f64()) { - let _ = notify!(observer, PlayerEvent::PositionChanged(seconds)); + signal_adapter.connect_position_updated(glib::clone!( + #[strong(rename_to = observer)] + self.observer, + move |_, position| { + if let Some(seconds) = position.map(|p| p.seconds_f64()) { + let _ = notify!(observer, PlayerEvent::PositionChanged(seconds)); + } } - }); + )); - let observer = self.observer.clone(); // Handle `seek-done` signal. - signal_adapter.connect_seek_done(move |_, position| { - let _ = notify!(observer, PlayerEvent::SeekDone(position.seconds_f64())); - }); + signal_adapter.connect_seek_done(glib::clone!( + #[strong(rename_to = observer)] + self.observer, + move |_, position| { + let _ = notify!(observer, PlayerEvent::SeekDone(position.seconds_f64())); + } + )); // Handle `media-info-updated` signal. - let inner_clone = inner.clone(); - let observer = self.observer.clone(); - signal_adapter.connect_media_info_updated(move |_, info| { - let Ok(metadata) = metadata_from_media_info(info) else { - return; - }; + signal_adapter.connect_media_info_updated(glib::clone!( + #[strong(rename_to = observer)] + self.observer, + #[strong] + shared_state, + move |signal_adapter, info| { + let Ok(metadata) = metadata_from_media_info(info) else { + return; + }; - let mut inner = inner_clone.lock().unwrap(); + let mut shared_state = shared_state.lock().unwrap(); - if inner.last_metadata.as_ref() == Some(&metadata) { - return; - } + if shared_state.metadata.as_ref() == Some(&metadata) { + return; + } - // TODO: Workaround to generate expected `paused` state change event. - // - let mut send_pause_event = false; - - if inner.last_metadata.is_none() && metadata.is_seekable { - if inner.playback_rate.get() != DEFAULT_PLAYBACK_RATE { - // The `paused` state change event will be fired after the - // seek initiated by the playback rate change has - // completed. - inner.player.set_rate(inner.playback_rate.get()); - } else if inner.play_state == gst_play::PlayState::Paused { - send_pause_event = true; + // TODO: Workaround to generate expected `paused` state change event. + // + let mut send_pause_event = false; + + if shared_state.metadata.is_none() && metadata.is_seekable { + if shared_state + .pending_playback_rate + .is_some_and(|playback_rate| playback_rate != DEFAULT_PLAYBACK_RATE) + { + // The `paused` state change event will be fired after the + // seek initiated by the playback rate change has + // completed. + signal_adapter + .play() + .set_rate(shared_state.pending_playback_rate.take().unwrap()); + } else if shared_state.play_state == gst_play::PlayState::Paused { + send_pause_event = true; + } } - } - inner.last_metadata = Some(metadata.clone()); - gst::info!( - inner.cat, - obj = &inner.player, - "Metadata updated: {:?}", - metadata - ); - let _ = notify!(observer, PlayerEvent::MetadataUpdated(metadata)); + shared_state.metadata = Some(metadata.clone()); + + gst::info!( + shared_state.category, + obj = &signal_adapter.play(), + "New metadata: {metadata:?}", + ); + + let _ = notify!(observer, PlayerEvent::MetadataUpdated(metadata)); - if send_pause_event { - let _ = notify!(observer, PlayerEvent::StateChanged(PlaybackState::Paused)); + if send_pause_event { + let _ = notify!(observer, PlayerEvent::StateChanged(PlaybackState::Paused)); + } } - }); + )); // Handle `duration-changed` signal. - let inner_clone = inner.clone(); - let observer = self.observer.clone(); - signal_adapter.connect_duration_changed(move |_, duration| { - let duration = duration.map(|duration| { - time::Duration::new( - duration.seconds(), - (duration.nseconds() % 1_000_000_000) as u32, - ) - }); + signal_adapter.connect_duration_changed(glib::clone!( + #[strong(rename_to = observer)] + self.observer, + #[strong] + shared_state, + move |signal_adapter, duration| { + let duration = duration.map(|duration| { + time::Duration::new( + duration.seconds(), + (duration.nseconds() % 1_000_000_000) as u32, + ) + }); - let mut inner = inner_clone.lock().unwrap(); - if let Some(ref mut metadata) = inner.last_metadata { - if metadata.duration != duration { - metadata.duration = duration; - gst::info!( - inner.cat, - obj = &inner.player, - "Duration changed: {:?}", - duration - ); - let _ = notify!(observer, PlayerEvent::DurationChanged(duration)); - } + let mut shared_state = shared_state.lock().unwrap(); + + let Some(metadata) = shared_state + .metadata + .as_mut() + .filter(|metadata| metadata.duration != duration) + else { + return; + }; + + metadata.duration = duration; + + gst::info!( + shared_state.category, + obj = &signal_adapter.play(), + "New duration: {duration:?}", + ); + + let _ = notify!(observer, PlayerEvent::DurationChanged(duration)); } - }); + )); if let Some(video_renderer) = self.video_renderer.clone() { // Creates a closure that renders a frame using the video_renderer @@ -786,115 +871,154 @@ impl GStreamerPlayer { ); }; - let (receiver, error_handler_id) = { + let (done_receiver, error_handler_id) = { let inner_clone = inner.clone(); let inner = inner.lock().unwrap(); - let pipeline = inner.player.pipeline(); - - let (sender, receiver) = mpsc::channel(); - - let sender = Arc::new(Mutex::new(sender)); - let sender_clone = sender.clone(); - let is_ready_clone = self.is_ready.clone(); - let observer = self.observer.clone(); - pipeline.connect("source-setup", false, move |args| { - let source = args[1].get::().unwrap(); - - let mut inner = inner_clone.lock().unwrap(); - let source = match inner.stream_type { - StreamType::Seekable => { - let servosrc = source - .dynamic_cast::() - .expect("Source element is expected to be a ServoSrc!"); - - if inner.input_size > 0 { - servosrc.set_size(inner.input_size as i64); - } - - let sender_clone = sender.clone(); - let is_ready = is_ready_clone.clone(); - let observer_ = observer.clone(); - let observer__ = observer.clone(); - let observer___ = observer.clone(); - let servosrc_ = servosrc.clone(); - let enough_data_ = inner.enough_data.clone(); - let enough_data__ = inner.enough_data.clone(); - let seek_channel = Arc::new(Mutex::new(SeekChannel::new())); - servosrc.set_callbacks( - gst_app::AppSrcCallbacks::builder() - .need_data(move |_, _| { - // We block the caller of the setup method until we get - // the first need-data signal, so we ensure that we - // don't miss any data between the moment the client - // calls setup and the player is actually ready to - // get any data. - is_ready.call_once(|| { - let _ = sender_clone.lock().unwrap().send(Ok(())); - }); - - enough_data_.store(false, Ordering::Relaxed); - let _ = notify!(observer_, PlayerEvent::NeedData); - }) - .enough_data(move |_| { - enough_data__.store(true, Ordering::Relaxed); - let _ = notify!(observer__, PlayerEvent::EnoughData); - }) - .seek_data(move |_, offset| { - let (ret, ack_channel) = if servosrc_.set_seek_offset(offset) { - let _ = notify!( - observer___, - PlayerEvent::SeekData( - offset, - seek_channel.lock().unwrap().sender() - ) - ); - let (ret, ack_channel) = - seek_channel.lock().unwrap()._await(); - (ret, Some(ack_channel)) - } else { - (true, None) - }; - - servosrc_.set_seek_done(); - if let Some(ack_channel) = ack_channel { - ack_channel.send(()).unwrap(); - } - ret - }) - .build(), - ); - - PlayerSource::Seekable(servosrc) - }, - StreamType::Stream => { - let media_stream_src = source - .dynamic_cast::() - .expect("Source element is expected to be a ServoMediaStreamSrc!"); - let sender_clone = sender.clone(); - is_ready_clone.call_once(|| { - let _ = notify!(sender_clone, Ok(())); - }); - PlayerSource::Stream(media_stream_src) - }, - }; - inner.set_src(source); + let (done_sender, done_receiver) = mpsc::channel(); + let done_sender = Arc::new(Mutex::new(done_sender)); + + inner.player.pipeline().connect_closure( + "source-setup", + false, + glib::closure!( + #[strong(rename_to = observer)] + self.observer, + #[strong] + done_sender, + #[strong] + shared_state, + #[strong(rename_to = enough_data)] + inner.enough_data, + #[strong(rename_to = is_ready)] + self.is_ready, + #[strong(rename_to = stream_type)] + self.stream_type, + #[weak(rename_to = inner)] + inner_clone, + move |_pipeline: &gst::Element, source: &gst::Element| { + let source = match stream_type { + StreamType::Seekable => { + let servosrc = source + .clone() + .dynamic_cast::() + .expect("Source element is expected to be a ServoSrc!"); + + let mut shared_state = shared_state.lock().unwrap(); + + if shared_state.pending_input_size.is_some_and(|size| size > 0) { + servosrc.set_size( + shared_state.pending_input_size.take().unwrap() as i64, + ); + } + + servosrc.set_callbacks( + gst_app::AppSrcCallbacks::builder() + .need_data(glib::clone!( + #[strong] + observer, + #[strong] + done_sender, + #[strong] + enough_data, + #[strong] + is_ready, + move |_, _| { + // We block the caller of the setup method until we get + // the first need-data signal, so we ensure that we + // don't miss any data between the moment the client + // calls setup and the player is actually ready to + // get any data. + is_ready.call_once(|| { + let _ = + done_sender.lock().unwrap().send(Ok(())); + }); + enough_data.store(false, Ordering::Relaxed); + let _ = notify!(observer, PlayerEvent::NeedData); + } + )) + .enough_data(glib::clone!( + #[strong] + observer, + #[strong] + enough_data, + move |_| { + enough_data.store(true, Ordering::Relaxed); + let _ = notify!(observer, PlayerEvent::EnoughData); + } + )) + .seek_data(glib::clone!( + #[strong] + observer, + #[weak] + servosrc, + #[upgrade_or] + false, + move |_, offset| { + let (ret, ack_channel) = if servosrc + .set_seek_offset(offset) + { + let seek_channel = + Arc::new(Mutex::new(SeekChannel::new())); + + let _ = notify!( + observer, + PlayerEvent::SeekData( + offset, + seek_channel.lock().unwrap().sender() + ) + ); + let (ret, ack_channel) = + seek_channel.lock().unwrap()._await(); + (ret, Some(ack_channel)) + } else { + (true, None) + }; + + servosrc.set_seek_done(); + if let Some(ack_channel) = ack_channel { + ack_channel.send(()).unwrap(); + } + ret + } + )) + .build(), + ); + + PlayerSource::Seekable(servosrc) + }, + StreamType::Stream => { + let media_stream_src = + source.clone().dynamic_cast::().expect( + "Source element is expected to be a ServoMediaStreamSrc!", + ); + is_ready.call_once(|| { + let _ = notify!(done_sender, Ok(())); + }); + PlayerSource::Stream(media_stream_src) + }, + }; - None - }); + inner.lock().unwrap().set_src(source); + }, + ), + ); - let error_handler_id = - signal_adapter.connect_error(move |signal_adapter, error, _details| { - let _ = notify!(sender_clone, Err(PlayerError::Backend(error.to_string()))); + let error_handler_id = signal_adapter.connect_error(glib::clone!( + #[strong] + done_sender, + move |signal_adapter, error, _| { + let _ = notify!(done_sender, Err(PlayerError::Backend(error.to_string()))); signal_adapter.play().stop(); - }); + } + )); let _ = inner.player.pause(); - (receiver, error_handler_id) + (done_receiver, error_handler_id) }; - let result = receiver.recv().unwrap(); + let result = done_receiver.recv().unwrap(); glib::signal::signal_handler_disconnect(&inner.lock().unwrap().player, error_handler_id); result }