diff --git a/transcoder/src/main.rs b/transcoder/src/main.rs index 818322a1..ea153b21 100644 --- a/transcoder/src/main.rs +++ b/transcoder/src/main.rs @@ -41,7 +41,10 @@ async fn transcode_movie( transcoder .transcode(client_id, path, quality, 0) .await - .map_err(|_| ApiError::InternalError) + .map_err(|e| { + eprintln!("Unhandled error occured while transcoding: {}", e); + ApiError::InternalError + }) } #[get("/movie/{quality}/{slug}/segments-{chunk}.ts")] diff --git a/transcoder/src/transcode.rs b/transcoder/src/transcode.rs index da11a4dd..dca6fe52 100644 --- a/transcoder/src/transcode.rs +++ b/transcoder/src/transcode.rs @@ -4,10 +4,10 @@ use std::collections::HashMap; use std::path::PathBuf; use std::process::Stdio; use std::str::FromStr; -use std::sync::atomic::AtomicI32; -use std::sync::{Arc, RwLock}; +use std::sync::RwLock; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; +use tokio::sync::watch::{self, Receiver}; use crate::utils::Signalable; @@ -66,7 +66,7 @@ fn get_transcode_video_quality_args(quality: &Quality) -> Vec<&'static str> { } // TODO: Add audios streams (and transcode them only when necesarry) -async fn start_transcode(path: String, quality: Quality, start_time: i32) -> TranscodeInfo { +async fn start_transcode(path: String, quality: Quality, start_time: u32) -> TranscodeInfo { // TODO: Use the out path below once cached segments can be reused. // let out_dir = format!("/cache/{show_hash}/{quality}"); let uuid: String = thread_rng() @@ -77,18 +77,19 @@ async fn start_transcode(path: String, quality: Quality, start_time: i32) -> Tra let out_dir = format!("/cache/{uuid}"); std::fs::create_dir(&out_dir).expect("Could not create cache directory"); - let segment_time = "10"; + let segment_time: u32 = 10; let mut child = Command::new("ffmpeg") .args(&["-progress", "pipe:1"]) + .arg("-nostats") .args(&["-ss", start_time.to_string().as_str()]) .args(&["-i", path.as_str()]) .args(&["-f", "segment"]) .args(&["-segment_list_type", "m3u8"]) - // Disable the .tmp file to serve it instantly to the client. + // Use a .tmp file for segments (.ts files) .args(&["-hls_flags", "temp_files"]) // Keep all segments in the list (else only last X are presents, useful for livestreams) .args(&["-segment_list_size", "0"]) - .args(&["-segment_time", segment_time]) + .args(&["-segment_time", segment_time.to_string().as_str()]) // Force segments to be exactly segment_time (only works when transcoding) .args(&[ "-force_key_frames", @@ -104,37 +105,46 @@ async fn start_transcode(path: String, quality: Quality, start_time: i32) -> Tra format!("{out_dir}/stream.m3u8"), format!("{out_dir}/segments-%02d.ts"), ]) + // TODO: Figure out which flag would be the best. + .args(&["-segment_list_flags", "live"]) + // .args(&["-segment_list_flags", "cache"]) .stdout(Stdio::piped()) .spawn() .expect("ffmpeg failed to start"); let stdout = child.stdout.take().unwrap(); - let info = TranscodeInfo { - show: (path, quality), - job: child, - uuid, - start_time, - ready_time: Arc::new(AtomicI32::new(0)), - }; - let ready_time = Arc::clone(&info.ready_time); + let (tx, mut rx) = watch::channel(0u32); tokio::spawn(async move { let mut reader = BufReader::new(stdout).lines(); while let Some(line) = reader.next_line().await.unwrap() { - if let Some((key, value)) = line.find(':').map(|i| line.split_at(i)) { - if key == "out_time_ms" { - ready_time.store( - value.parse::().unwrap() / 1000, - std::sync::atomic::Ordering::Relaxed, - ); + if let Some((key, value)) = line.find('=').map(|i| line.split_at(i)) { + let value = &value[1..]; + // Can't use ms since ms and us are both set to us /shrug + if key == "out_time_us" { + tx.send(value.parse::().unwrap() / 1_000_000).unwrap(); } // TODO: maybe store speed too. } } }); - // TODO: Wait for 1.5 * segment time after start_time to be ready. - return info; + // Wait for 1.5 * segment time after start_time to be ready. + loop { + rx.changed().await.unwrap(); + let ready_time = *rx.borrow(); + if ready_time >= (1.5 * segment_time as f32) as u32 + start_time { + break; + } + } + + TranscodeInfo { + show: (path, quality), + job: child, + uuid, + start_time, + ready_time: rx, + } } fn get_cache_path(info: &TranscodeInfo) -> PathBuf { @@ -151,8 +161,9 @@ struct TranscodeInfo { job: Child, uuid: String, #[allow(dead_code)] - start_time: i32, - ready_time: Arc, + start_time: u32, + #[allow(dead_code)] + ready_time: Receiver, } pub struct Transcoder { @@ -171,7 +182,7 @@ impl Transcoder { client_id: String, path: String, quality: Quality, - start_time: i32, + start_time: u32, ) -> Result { // TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time. // TODO: Clear cache at startup/every X time without use. @@ -184,7 +195,8 @@ impl Transcoder { }) = self.running.write().unwrap().get_mut(&client_id) { if path != *old_path || quality != *old_qual { - job.interrupt()?; + // If the job has already ended, interrupt returns an error but we don't care. + _ = job.interrupt(); } else { let mut path = get_cache_path_from_uuid(uuid); path.push("stream.m3u8");