From 2939ea0787a3bdd2cc09bca06c1baaa4a76d570b Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Thu, 13 Apr 2023 18:09:12 +0900 Subject: [PATCH] Allow the transcoder to be run --- .env.example | 1 + docker-compose.dev.yml | 1 + docker-compose.prod.yml | 1 + docker-compose.yml | 1 + transcoder/Dockerfile | 4 +- transcoder/Dockerfile.dev | 3 +- transcoder/src/error.rs | 4 +- transcoder/src/main.rs | 13 ++++-- transcoder/src/transcode.rs | 89 +++++++++++++++++++++++++++++-------- transcoder/src/utils.rs | 8 ++-- 10 files changed, 94 insertions(+), 31 deletions(-) diff --git a/.env.example b/.env.example index 123a0388..7dc4f780 100644 --- a/.env.example +++ b/.env.example @@ -1,5 +1,6 @@ # Useful config options LIBRARY_ROOT=/video +CACHE_ROOT=/tmp/kyoo_cache LIBRARY_LANGUAGES=en # The following two values should be set to a random sequence of characters. diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 80777bb7..e27b7d65 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -60,6 +60,7 @@ services: volumes: - ./transcoder:/app - ${LIBRARY_ROOT}:/video + - ${CACHE_ROOT}:/cache ingress: image: nginx diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 0bd3589e..74b42dc8 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -39,6 +39,7 @@ services: restart: on-failure volumes: - ${LIBRARY_ROOT}:/video + - ${CACHE_ROOT}:/cache ingress: image: nginx diff --git a/docker-compose.yml b/docker-compose.yml index 87998ea3..b5d9fafa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -39,6 +39,7 @@ services: restart: on-failure volumes: - ${LIBRARY_ROOT}:/video + - ${CACHE_ROOT}:/cache ingress: image: nginx diff --git a/transcoder/Dockerfile b/transcoder/Dockerfile index 4ca2d641..eba3c847 100644 --- a/transcoder/Dockerfile +++ b/transcoder/Dockerfile @@ -10,8 +10,8 @@ RUN rm src/lib.rs COPY src src RUN cargo install --path . -FROM debian:bullseye-slim -#RUN apt-get update && apt-get install -y extra-runtime-dependencies && rm -rf /var/lib/apt/lists/* +FROM alpine +RUN apk add --no-cache ffmpeg COPY --from=builder /usr/local/cargo/bin/transcoder ./transcoder EXPOSE 7666 diff --git a/transcoder/Dockerfile.dev b/transcoder/Dockerfile.dev index b33f646a..1d77ee35 100644 --- a/transcoder/Dockerfile.dev +++ b/transcoder/Dockerfile.dev @@ -1,4 +1,5 @@ -FROM rust +FROM rust:alpine +RUN apk add --no-cache musl-dev ffmpeg RUN cargo install cargo-watch WORKDIR /app diff --git a/transcoder/src/error.rs b/transcoder/src/error.rs index 94b67137..6025dd3f 100644 --- a/transcoder/src/error.rs +++ b/transcoder/src/error.rs @@ -25,8 +25,8 @@ impl error::ResponseError for ApiError { } fn status_code(&self) -> StatusCode { - match *self { - ApiError::BadRequest { error } => StatusCode::BAD_REQUEST, + match self { + ApiError::BadRequest { error: _ } => StatusCode::BAD_REQUEST, ApiError::InternalError => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/transcoder/src/main.rs b/transcoder/src/main.rs index c6afb090..9813d530 100644 --- a/transcoder/src/main.rs +++ b/transcoder/src/main.rs @@ -23,19 +23,24 @@ async fn get_movie_auto( req: HttpRequest, query: web::Path<(String, String)>, transcoder: web::Data, -) -> Result { +) -> Result { let (quality, slug) = query.into_inner(); let quality = Quality::from_str(quality.as_str()).map_err(|_| ApiError::BadRequest { error: "Invalid quality".to_string(), })?; let client_id = req.headers().get("x-client-id") - .ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), })?; + .ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), })? + .to_str().unwrap(); let path = paths::get_movie_path(slug); - - todo!() + // TODO: Handle start_time that is not 0 + transcoder + .transcode(client_id.to_string(), path, quality, 0) + .await + .map_err(|_| ApiError::InternalError) } + #[actix_web::main] async fn main() -> std::io::Result<()> { let state = web::Data::new(Transcoder::new()); diff --git a/transcoder/src/transcode.rs b/transcoder/src/transcode.rs index 453d58a2..e326de8c 100644 --- a/transcoder/src/transcode.rs +++ b/transcoder/src/transcode.rs @@ -1,8 +1,12 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; -use std::process::{Child, Command}; +use std::process::Stdio; use std::str::FromStr; +use std::sync::atomic::AtomicI32; +use std::sync::Arc; use std::{collections::HashMap, sync::Mutex}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::{Child, Command}; use crate::utils::Signalable; @@ -61,7 +65,7 @@ fn get_transcode_video_quality_args(quality: &Quality) -> Vec<&'static str> { } // TODO: Add audios streams (and transcode them only when necesarry) -async fn start_transcode(path: &str, quality: &Quality, start_time_sec: f32) -> (String, Child) { +async fn start_transcode(path: String, quality: Quality, start_time: i32) -> TranscodeInfo { // TODO: Use the out path below once cached segments can be reused. // let out_dir = format!("/cache/{show_hash}/{quality}"); let uuid: String = thread_rng() @@ -70,15 +74,19 @@ async fn start_transcode(path: &str, quality: &Quality, start_time_sec: f32) -> .map(char::from) .collect(); let out_dir = format!("/cache/{uuid}"); + std::fs::create_dir(&out_dir).expect("Could not create cache directory"); let segment_time = "10"; - let child = Command::new("ffmpeg") - .args(&["-ss", start_time_sec.to_string().as_str()]) - .args(&["-i", path]) + let mut child = Command::new("ffmpeg") + .args(&["-progress", "pipe:1"]) + .args(&["-ss", start_time.to_string().as_str()]) + .args(&["-i", path.as_str()]) .args(&["-f", "segment"]) .args(&["-segment_list_type", "m3u8"]) + // Disable the .tmp file to serve it instantly to the client. + .args(&["-hls_flags", "temp_files"]) // Keep all segments in the list (else only last X are presents, useful for livestreams) - .args(&["--segment_list_size", "0"]) + .args(&["-segment_list_size", "0"]) .args(&["-segment_time", segment_time]) // Force segments to be exactly segment_time (only works when transcoding) .args(&[ @@ -89,21 +97,53 @@ async fn start_transcode(path: &str, quality: &Quality, start_time_sec: f32) -> "-segment_time_delta", "0.1", ]) - .args(get_transcode_video_quality_args(quality)) + .args(get_transcode_video_quality_args(&quality)) .args(&[ "-segment_list".to_string(), format!("{out_dir}/stream.m3u8"), format!("{out_dir}/segments-%02d.ts"), ]) + .stdout(Stdio::piped()) .spawn() .expect("ffmpeg failed to start"); - (uuid, child) + + let stdout = child.stdout.take().unwrap(); + let info = TranscodeInfo { + show: (path, quality), + job: child, + uuid, + start_time, + ready_time: Arc::new(AtomicI32::new(0)), + }; + let ready_time = Arc::clone(&info.ready_time); + + tokio::spawn(async move { + let mut reader = BufReader::new(stdout).lines(); + while let Some(line) = reader.next_line().await.unwrap() { + if let Some((key, value)) = line.find(':').map(|i| line.split_at(i)) { + if key == "out_time_ms" { + ready_time.store( + value.parse::().unwrap() / 1000, + std::sync::atomic::Ordering::Relaxed, + ); + } + // TODO: maybe store speed too. + } + } + }); + + // TODO: Wait for 1.5 * segment time after start_time to be ready. + return info; } struct TranscodeInfo { - show: (String, Quality) - job: Child - uuid: String + show: (String, Quality), + // TODO: Store if the process as ended (probably Option for the job) + job: Child, + uuid: String, + #[allow(dead_code)] + start_time: i32, + ready_time: Arc, } pub struct Transcoder { @@ -118,22 +158,33 @@ impl Transcoder { } pub async fn transcode( - &mut self, + &self, client_id: String, path: String, quality: Quality, - start_time_sec: f32, - ) { + start_time: i32, + ) -> Result { // TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time. // TODO: Clear cache at startup/every X time without use. // TODO: cache transcoded output for a show/quality and reuse it for every future requests. - if let Some(TranscodeInfo{show: (old_path, old_qual), job, uuid}) = self.running.lock().unwrap().get_mut(&client_id) { - if path == *old_path && quality != *old_qual { - job.interrupt(); + if let Some(TranscodeInfo { + show: (old_path, old_qual), + job, + uuid, + .. + }) = self.running.lock().unwrap().get_mut(&client_id) + { + if path != *old_path || quality != *old_qual { + job.interrupt()?; + } else { + let path = format!("/cache/{uuid}/stream.m3u8", uuid = &uuid); + return std::fs::read_to_string(path); } } - let (uuid, job) = start_transcode(&path, &quality, start_time_sec).await; - self.running.lock().unwrap().insert(client_id, TranscodeInfo { show: (path, quality), job, uuid}); + let info = start_transcode(path, quality, start_time).await; + let path = format!("/cache/{uuid}/stream.m3u8", uuid = &info.uuid); + self.running.lock().unwrap().insert(client_id, info); + std::fs::read_to_string(path) } } diff --git a/transcoder/src/utils.rs b/transcoder/src/utils.rs index 5414e693..c883ed58 100644 --- a/transcoder/src/utils.rs +++ b/transcoder/src/utils.rs @@ -1,4 +1,4 @@ -use std::{io, process::Child}; +use tokio::{io, process::Child}; extern "C" { fn kill(pid: i32, sig: i32) -> i32; @@ -26,13 +26,15 @@ pub trait Signalable { impl Signalable for Child { fn signal(&mut self, signal: i32) -> io::Result<()> { - if self.try_wait()?.is_some() { + let id = self.id(); + + if self.try_wait()?.is_some() || id.is_none() { Err(io::Error::new( io::ErrorKind::InvalidInput, "invalid argument: can't signal an exited process", )) } else { - crate::utils::signal(self.id() as i32, signal) + crate::utils::signal(id.unwrap() as i32, signal) } } }