Add error handling in the transcoder

This commit is contained in:
Zoe Roux 2023-07-04 11:14:30 +09:00
parent 28d0fc161b
commit 1734e57d43
5 changed files with 85 additions and 27 deletions

View File

@ -21,6 +21,8 @@ in
rustc
pkgconfig
openssl
mediainfo
ffmpeg
];
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";

View File

@ -1,4 +1,4 @@
use crate::{error::ApiError, paths, state::Transcoder};
use crate::{error::ApiError, paths, state::Transcoder, transcode::TranscodeError};
use actix_files::NamedFile;
use actix_web::{get, web, Result};
@ -28,9 +28,25 @@ async fn get_audio_transcoded(
.await
.map_err(|_| ApiError::NotFound)?;
transcoder.transcode_audio(path, audio).await.map_err(|e| {
eprintln!("Error while transcoding audio: {}", e);
transcoder
.transcode_audio(path, audio)
.await
.map_err(|e| match e {
TranscodeError::ArgumentError(err) => ApiError::BadRequest { error: err },
TranscodeError::FFmpegError(err) => {
eprintln!(
"Unhandled ffmpeg error occured while transcoding audio: {}",
err
);
ApiError::InternalError
}
TranscodeError::ReadError(err) => {
eprintln!(
"Unhandled read error occured while transcoding audio: {}",
err
);
ApiError::InternalError
}
})
}

View File

@ -96,7 +96,7 @@ impl Transcoder {
path: String,
quality: Quality,
start_time: u32,
) -> Result<String, std::io::Error> {
) -> Result<String, TranscodeError> {
// TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time.
// TODO: cache transcoded output for a show/quality and reuse it for every future requests.
if let Some(TranscodeInfo {
@ -113,15 +113,15 @@ impl Transcoder {
} else {
let mut path = get_cache_path_from_uuid(uuid);
path.push("stream.m3u8");
return std::fs::read_to_string(path);
return std::fs::read_to_string(path).map_err(|e| TranscodeError::ReadError(e));
}
}
let info = transcode_video(path, quality, start_time).await;
let info = transcode_video(path, quality, start_time).await?;
let mut path = get_cache_path(&info);
path.push("stream.m3u8");
self.running.write().unwrap().insert(client_id, info);
std::fs::read_to_string(path)
std::fs::read_to_string(path).map_err(|e| TranscodeError::ReadError(e))
}
// TODO: Use path/quality instead of client_id
@ -145,7 +145,7 @@ impl Transcoder {
&self,
path: String,
audio: u32,
) -> Result<String, std::io::Error> {
) -> Result<String, TranscodeError> {
let mut stream = PathBuf::from(get_audio_path(&path, audio));
stream.push("stream.m3u8");
@ -159,9 +159,9 @@ impl Transcoder {
// initialize the transcode and wait for the second segment while the second will use
// the same transcode but not wait and retrieve a potentially invalid playlist file.
self.audio_jobs.write().unwrap().push((path.clone(), audio));
transcode_audio(path, audio).await;
transcode_audio(path, audio).await?;
}
std::fs::read_to_string(stream)
std::fs::read_to_string(stream).map_err(|e| TranscodeError::ReadError(e))
}
pub async fn get_audio_segment(

View File

@ -14,6 +14,12 @@ use tokio::sync::watch;
const SEGMENT_TIME: u32 = 10;
pub enum TranscodeError {
ReadError(std::io::Error),
FFmpegError(String),
ArgumentError(String),
}
#[derive(PartialEq, Eq, Serialize, Display, Clone, Copy)]
pub enum Quality {
#[display(fmt = "240p")]
@ -178,17 +184,30 @@ fn get_transcode_video_quality_args(quality: &Quality, segment_time: u32) -> Vec
.collect()
}
pub async fn transcode_audio(path: String, audio: u32) {
pub async fn transcode_audio(path: String, audio: u32) -> Result<Child, TranscodeError> {
start_transcode(
&path,
&get_audio_path(&path, audio),
get_transcode_audio_args(audio),
0,
)
.await;
.await
.map_err(|e| {
if let TranscodeError::FFmpegError(message) = e {
if message.contains("matches no streams.") {
return TranscodeError::ArgumentError("Invalid audio index".to_string());
}
return TranscodeError::FFmpegError(message);
}
e
})
}
pub async fn transcode_video(path: String, quality: Quality, start_time: u32) -> TranscodeInfo {
pub async fn transcode_video(
path: String,
quality: Quality,
start_time: u32,
) -> Result<TranscodeInfo, TranscodeError> {
// TODO: Use the out path below once cached segments can be reused.
// let out_dir = format!("/cache/{show_hash}/{quality}");
let uuid: String = thread_rng()
@ -204,12 +223,12 @@ pub async fn transcode_video(path: String, quality: Quality, start_time: u32) ->
get_transcode_video_quality_args(&quality, SEGMENT_TIME),
start_time,
)
.await;
TranscodeInfo {
.await?;
Ok(TranscodeInfo {
show: (path, quality),
job: child,
uuid,
}
})
}
async fn start_transcode(
@ -217,7 +236,7 @@ async fn start_transcode(
out_dir: &String,
encode_args: Vec<String>,
start_time: u32,
) -> Child {
) -> Result<Child, TranscodeError> {
std::fs::create_dir_all(&out_dir).expect("Could not create cache directory");
let mut cmd = Command::new("ffmpeg");
@ -239,7 +258,8 @@ async fn start_transcode(
format!("{out_dir}/segments-%02d.ts"),
format!("{out_dir}/stream.m3u8"),
])
.stdout(Stdio::piped());
.stdout(Stdio::piped())
.stderr(Stdio::piped());
println!("Starting a transcode with the command: {:?}", cmd);
let mut child = cmd.spawn().expect("ffmpeg failed to start");
@ -260,13 +280,24 @@ async fn start_transcode(
}
});
// Wait for 1.5 * segment time after start_time to be ready.
loop {
// TODO: Create a better error handling for here.
rx.changed().await.expect("Invalid audio index.");
// rx.changed() returns an error if the sender is dropped aka if the coroutine 10 lines
// higher has finished aka if the process has finished.
if let Err(_) = rx.changed().await {
let es = child.wait().await.unwrap();
if es.success() {
return Ok(child);
}
let output = child.wait_with_output().await.unwrap();
return Err(TranscodeError::FFmpegError(
String::from_utf8(output.stderr).unwrap(),
));
}
// Wait for 1.5 * segment time after start_time to be ready.
let ready_time = *rx.borrow();
if ready_time >= (1.5 * SEGMENT_TIME as f32) as u32 + start_time {
return child;
return Ok(child);
}
}
}

View File

@ -1,6 +1,6 @@
use std::str::FromStr;
use crate::{error::ApiError, paths, state::Transcoder, transcode::Quality, utils::get_client_id};
use crate::{error::ApiError, paths, state::Transcoder, transcode::{Quality, TranscodeError}, utils::get_client_id};
use actix_files::NamedFile;
use actix_web::{get, web, HttpRequest, Result};
@ -41,8 +41,17 @@ async fn get_transcoded(
.transcode(client_id, path, quality, 0)
.await
.map_err(|e| {
eprintln!("Unhandled error occured while transcoding: {}", e);
match e {
TranscodeError::ArgumentError(err) => ApiError::BadRequest { error: err },
TranscodeError::FFmpegError(err) => {
eprintln!("Unhandled ffmpeg error occured while transcoding video: {}", err);
ApiError::InternalError
},
TranscodeError::ReadError(err) => {
eprintln!("Unhandled read error occured while transcoding video: {}", err);
ApiError::InternalError
}
}
})
}