diff --git a/shell.nix b/shell.nix index 61e36bd8..b14ef34d 100644 --- a/shell.nix +++ b/shell.nix @@ -21,6 +21,8 @@ in rustc pkgconfig openssl + mediainfo + ffmpeg ]; RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}"; diff --git a/transcoder/src/audio.rs b/transcoder/src/audio.rs index f321cd7f..4348e37e 100644 --- a/transcoder/src/audio.rs +++ b/transcoder/src/audio.rs @@ -1,4 +1,4 @@ -use crate::{error::ApiError, paths, state::Transcoder}; +use crate::{error::ApiError, paths, state::Transcoder, transcode::TranscodeError}; use actix_files::NamedFile; use actix_web::{get, web, Result}; @@ -28,10 +28,26 @@ async fn get_audio_transcoded( .await .map_err(|_| ApiError::NotFound)?; - transcoder.transcode_audio(path, audio).await.map_err(|e| { - eprintln!("Error while transcoding audio: {}", e); - ApiError::InternalError - }) + transcoder + .transcode_audio(path, audio) + .await + .map_err(|e| match e { + TranscodeError::ArgumentError(err) => ApiError::BadRequest { error: err }, + TranscodeError::FFmpegError(err) => { + eprintln!( + "Unhandled ffmpeg error occured while transcoding audio: {}", + err + ); + ApiError::InternalError + } + TranscodeError::ReadError(err) => { + eprintln!( + "Unhandled read error occured while transcoding audio: {}", + err + ); + ApiError::InternalError + } + }) } /// Get audio chunk diff --git a/transcoder/src/state.rs b/transcoder/src/state.rs index 546fcf9d..b1115a9e 100644 --- a/transcoder/src/state.rs +++ b/transcoder/src/state.rs @@ -96,7 +96,7 @@ impl Transcoder { path: String, quality: Quality, start_time: u32, - ) -> Result { + ) -> Result { // TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time. // TODO: cache transcoded output for a show/quality and reuse it for every future requests. if let Some(TranscodeInfo { @@ -113,15 +113,15 @@ impl Transcoder { } else { let mut path = get_cache_path_from_uuid(uuid); path.push("stream.m3u8"); - return std::fs::read_to_string(path); + return std::fs::read_to_string(path).map_err(|e| TranscodeError::ReadError(e)); } } - let info = transcode_video(path, quality, start_time).await; + let info = transcode_video(path, quality, start_time).await?; let mut path = get_cache_path(&info); path.push("stream.m3u8"); self.running.write().unwrap().insert(client_id, info); - std::fs::read_to_string(path) + std::fs::read_to_string(path).map_err(|e| TranscodeError::ReadError(e)) } // TODO: Use path/quality instead of client_id @@ -145,7 +145,7 @@ impl Transcoder { &self, path: String, audio: u32, - ) -> Result { + ) -> Result { let mut stream = PathBuf::from(get_audio_path(&path, audio)); stream.push("stream.m3u8"); @@ -159,9 +159,9 @@ impl Transcoder { // initialize the transcode and wait for the second segment while the second will use // the same transcode but not wait and retrieve a potentially invalid playlist file. self.audio_jobs.write().unwrap().push((path.clone(), audio)); - transcode_audio(path, audio).await; + transcode_audio(path, audio).await?; } - std::fs::read_to_string(stream) + std::fs::read_to_string(stream).map_err(|e| TranscodeError::ReadError(e)) } pub async fn get_audio_segment( diff --git a/transcoder/src/transcode.rs b/transcoder/src/transcode.rs index 7db5f866..fc344eae 100644 --- a/transcoder/src/transcode.rs +++ b/transcoder/src/transcode.rs @@ -14,6 +14,12 @@ use tokio::sync::watch; const SEGMENT_TIME: u32 = 10; +pub enum TranscodeError { + ReadError(std::io::Error), + FFmpegError(String), + ArgumentError(String), +} + #[derive(PartialEq, Eq, Serialize, Display, Clone, Copy)] pub enum Quality { #[display(fmt = "240p")] @@ -178,17 +184,30 @@ fn get_transcode_video_quality_args(quality: &Quality, segment_time: u32) -> Vec .collect() } -pub async fn transcode_audio(path: String, audio: u32) { +pub async fn transcode_audio(path: String, audio: u32) -> Result { start_transcode( &path, &get_audio_path(&path, audio), get_transcode_audio_args(audio), 0, ) - .await; + .await + .map_err(|e| { + if let TranscodeError::FFmpegError(message) = e { + if message.contains("matches no streams.") { + return TranscodeError::ArgumentError("Invalid audio index".to_string()); + } + return TranscodeError::FFmpegError(message); + } + e + }) } -pub async fn transcode_video(path: String, quality: Quality, start_time: u32) -> TranscodeInfo { +pub async fn transcode_video( + path: String, + quality: Quality, + start_time: u32, +) -> Result { // TODO: Use the out path below once cached segments can be reused. // let out_dir = format!("/cache/{show_hash}/{quality}"); let uuid: String = thread_rng() @@ -204,12 +223,12 @@ pub async fn transcode_video(path: String, quality: Quality, start_time: u32) -> get_transcode_video_quality_args(&quality, SEGMENT_TIME), start_time, ) - .await; - TranscodeInfo { + .await?; + Ok(TranscodeInfo { show: (path, quality), job: child, uuid, - } + }) } async fn start_transcode( @@ -217,7 +236,7 @@ async fn start_transcode( out_dir: &String, encode_args: Vec, start_time: u32, -) -> Child { +) -> Result { std::fs::create_dir_all(&out_dir).expect("Could not create cache directory"); let mut cmd = Command::new("ffmpeg"); @@ -239,7 +258,8 @@ async fn start_transcode( format!("{out_dir}/segments-%02d.ts"), format!("{out_dir}/stream.m3u8"), ]) - .stdout(Stdio::piped()); + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); println!("Starting a transcode with the command: {:?}", cmd); let mut child = cmd.spawn().expect("ffmpeg failed to start"); @@ -260,13 +280,24 @@ async fn start_transcode( } }); - // Wait for 1.5 * segment time after start_time to be ready. loop { - // TODO: Create a better error handling for here. - rx.changed().await.expect("Invalid audio index."); + // rx.changed() returns an error if the sender is dropped aka if the coroutine 10 lines + // higher has finished aka if the process has finished. + if let Err(_) = rx.changed().await { + let es = child.wait().await.unwrap(); + if es.success() { + return Ok(child); + } + let output = child.wait_with_output().await.unwrap(); + return Err(TranscodeError::FFmpegError( + String::from_utf8(output.stderr).unwrap(), + )); + } + + // Wait for 1.5 * segment time after start_time to be ready. let ready_time = *rx.borrow(); if ready_time >= (1.5 * SEGMENT_TIME as f32) as u32 + start_time { - return child; + return Ok(child); } } } diff --git a/transcoder/src/video.rs b/transcoder/src/video.rs index e088b981..0df8303e 100644 --- a/transcoder/src/video.rs +++ b/transcoder/src/video.rs @@ -1,6 +1,6 @@ use std::str::FromStr; -use crate::{error::ApiError, paths, state::Transcoder, transcode::Quality, utils::get_client_id}; +use crate::{error::ApiError, paths, state::Transcoder, transcode::{Quality, TranscodeError}, utils::get_client_id}; use actix_files::NamedFile; use actix_web::{get, web, HttpRequest, Result}; @@ -41,8 +41,17 @@ async fn get_transcoded( .transcode(client_id, path, quality, 0) .await .map_err(|e| { - eprintln!("Unhandled error occured while transcoding: {}", e); - ApiError::InternalError + match e { + TranscodeError::ArgumentError(err) => ApiError::BadRequest { error: err }, + TranscodeError::FFmpegError(err) => { + eprintln!("Unhandled ffmpeg error occured while transcoding video: {}", err); + ApiError::InternalError + }, + TranscodeError::ReadError(err) => { + eprintln!("Unhandled read error occured while transcoding video: {}", err); + ApiError::InternalError + } + } }) }