Allow the transcoder to be run

This commit is contained in:
Zoe Roux 2023-04-13 18:09:12 +09:00
parent 64adc63920
commit 2939ea0787
No known key found for this signature in database
10 changed files with 94 additions and 31 deletions

View File

@ -1,5 +1,6 @@
# Useful config options # Useful config options
LIBRARY_ROOT=/video LIBRARY_ROOT=/video
CACHE_ROOT=/tmp/kyoo_cache
LIBRARY_LANGUAGES=en LIBRARY_LANGUAGES=en
# The following two values should be set to a random sequence of characters. # The following two values should be set to a random sequence of characters.

View File

@ -60,6 +60,7 @@ services:
volumes: volumes:
- ./transcoder:/app - ./transcoder:/app
- ${LIBRARY_ROOT}:/video - ${LIBRARY_ROOT}:/video
- ${CACHE_ROOT}:/cache
ingress: ingress:
image: nginx image: nginx

View File

@ -39,6 +39,7 @@ services:
restart: on-failure restart: on-failure
volumes: volumes:
- ${LIBRARY_ROOT}:/video - ${LIBRARY_ROOT}:/video
- ${CACHE_ROOT}:/cache
ingress: ingress:
image: nginx image: nginx

View File

@ -39,6 +39,7 @@ services:
restart: on-failure restart: on-failure
volumes: volumes:
- ${LIBRARY_ROOT}:/video - ${LIBRARY_ROOT}:/video
- ${CACHE_ROOT}:/cache
ingress: ingress:
image: nginx image: nginx

View File

@ -10,8 +10,8 @@ RUN rm src/lib.rs
COPY src src COPY src src
RUN cargo install --path . RUN cargo install --path .
FROM debian:bullseye-slim FROM alpine
#RUN apt-get update && apt-get install -y extra-runtime-dependencies && rm -rf /var/lib/apt/lists/* RUN apk add --no-cache ffmpeg
COPY --from=builder /usr/local/cargo/bin/transcoder ./transcoder COPY --from=builder /usr/local/cargo/bin/transcoder ./transcoder
EXPOSE 7666 EXPOSE 7666

View File

@ -1,4 +1,5 @@
FROM rust FROM rust:alpine
RUN apk add --no-cache musl-dev ffmpeg
RUN cargo install cargo-watch RUN cargo install cargo-watch
WORKDIR /app WORKDIR /app

View File

@ -25,8 +25,8 @@ impl error::ResponseError for ApiError {
} }
fn status_code(&self) -> StatusCode { fn status_code(&self) -> StatusCode {
match *self { match self {
ApiError::BadRequest { error } => StatusCode::BAD_REQUEST, ApiError::BadRequest { error: _ } => StatusCode::BAD_REQUEST,
ApiError::InternalError => StatusCode::INTERNAL_SERVER_ERROR, ApiError::InternalError => StatusCode::INTERNAL_SERVER_ERROR,
} }
} }

View File

@ -23,19 +23,24 @@ async fn get_movie_auto(
req: HttpRequest, req: HttpRequest,
query: web::Path<(String, String)>, query: web::Path<(String, String)>,
transcoder: web::Data<Transcoder>, transcoder: web::Data<Transcoder>,
) -> Result<NamedFile, ApiError> { ) -> Result<String, ApiError> {
let (quality, slug) = query.into_inner(); let (quality, slug) = query.into_inner();
let quality = Quality::from_str(quality.as_str()).map_err(|_| ApiError::BadRequest { let quality = Quality::from_str(quality.as_str()).map_err(|_| ApiError::BadRequest {
error: "Invalid quality".to_string(), error: "Invalid quality".to_string(),
})?; })?;
let client_id = req.headers().get("x-client-id") let client_id = req.headers().get("x-client-id")
.ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), })?; .ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), })?
.to_str().unwrap();
let path = paths::get_movie_path(slug); let path = paths::get_movie_path(slug);
// TODO: Handle start_time that is not 0
todo!() transcoder
.transcode(client_id.to_string(), path, quality, 0)
.await
.map_err(|_| ApiError::InternalError)
} }
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let state = web::Data::new(Transcoder::new()); let state = web::Data::new(Transcoder::new());

View File

@ -1,8 +1,12 @@
use rand::distributions::Alphanumeric; use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use std::process::{Child, Command}; use std::process::Stdio;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;
use std::{collections::HashMap, sync::Mutex}; use std::{collections::HashMap, sync::Mutex};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use crate::utils::Signalable; use crate::utils::Signalable;
@ -61,7 +65,7 @@ fn get_transcode_video_quality_args(quality: &Quality) -> Vec<&'static str> {
} }
// TODO: Add audios streams (and transcode them only when necesarry) // TODO: Add audios streams (and transcode them only when necesarry)
async fn start_transcode(path: &str, quality: &Quality, start_time_sec: f32) -> (String, Child) { async fn start_transcode(path: String, quality: Quality, start_time: i32) -> TranscodeInfo {
// TODO: Use the out path below once cached segments can be reused. // TODO: Use the out path below once cached segments can be reused.
// let out_dir = format!("/cache/{show_hash}/{quality}"); // let out_dir = format!("/cache/{show_hash}/{quality}");
let uuid: String = thread_rng() let uuid: String = thread_rng()
@ -70,15 +74,19 @@ async fn start_transcode(path: &str, quality: &Quality, start_time_sec: f32) ->
.map(char::from) .map(char::from)
.collect(); .collect();
let out_dir = format!("/cache/{uuid}"); let out_dir = format!("/cache/{uuid}");
std::fs::create_dir(&out_dir).expect("Could not create cache directory");
let segment_time = "10"; let segment_time = "10";
let child = Command::new("ffmpeg") let mut child = Command::new("ffmpeg")
.args(&["-ss", start_time_sec.to_string().as_str()]) .args(&["-progress", "pipe:1"])
.args(&["-i", path]) .args(&["-ss", start_time.to_string().as_str()])
.args(&["-i", path.as_str()])
.args(&["-f", "segment"]) .args(&["-f", "segment"])
.args(&["-segment_list_type", "m3u8"]) .args(&["-segment_list_type", "m3u8"])
// Disable the .tmp file to serve it instantly to the client.
.args(&["-hls_flags", "temp_files"])
// Keep all segments in the list (else only last X are presents, useful for livestreams) // Keep all segments in the list (else only last X are presents, useful for livestreams)
.args(&["--segment_list_size", "0"]) .args(&["-segment_list_size", "0"])
.args(&["-segment_time", segment_time]) .args(&["-segment_time", segment_time])
// Force segments to be exactly segment_time (only works when transcoding) // Force segments to be exactly segment_time (only works when transcoding)
.args(&[ .args(&[
@ -89,21 +97,53 @@ async fn start_transcode(path: &str, quality: &Quality, start_time_sec: f32) ->
"-segment_time_delta", "-segment_time_delta",
"0.1", "0.1",
]) ])
.args(get_transcode_video_quality_args(quality)) .args(get_transcode_video_quality_args(&quality))
.args(&[ .args(&[
"-segment_list".to_string(), "-segment_list".to_string(),
format!("{out_dir}/stream.m3u8"), format!("{out_dir}/stream.m3u8"),
format!("{out_dir}/segments-%02d.ts"), format!("{out_dir}/segments-%02d.ts"),
]) ])
.stdout(Stdio::piped())
.spawn() .spawn()
.expect("ffmpeg failed to start"); .expect("ffmpeg failed to start");
(uuid, child)
let stdout = child.stdout.take().unwrap();
let info = TranscodeInfo {
show: (path, quality),
job: child,
uuid,
start_time,
ready_time: Arc::new(AtomicI32::new(0)),
};
let ready_time = Arc::clone(&info.ready_time);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Some(line) = reader.next_line().await.unwrap() {
if let Some((key, value)) = line.find(':').map(|i| line.split_at(i)) {
if key == "out_time_ms" {
ready_time.store(
value.parse::<i32>().unwrap() / 1000,
std::sync::atomic::Ordering::Relaxed,
);
}
// TODO: maybe store speed too.
}
}
});
// TODO: Wait for 1.5 * segment time after start_time to be ready.
return info;
} }
struct TranscodeInfo { struct TranscodeInfo {
show: (String, Quality) show: (String, Quality),
job: Child // TODO: Store if the process as ended (probably Option<Child> for the job)
uuid: String job: Child,
uuid: String,
#[allow(dead_code)]
start_time: i32,
ready_time: Arc<AtomicI32>,
} }
pub struct Transcoder { pub struct Transcoder {
@ -118,22 +158,33 @@ impl Transcoder {
} }
pub async fn transcode( pub async fn transcode(
&mut self, &self,
client_id: String, client_id: String,
path: String, path: String,
quality: Quality, quality: Quality,
start_time_sec: f32, start_time: i32,
) { ) -> Result<String, std::io::Error> {
// TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time. // TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time.
// TODO: Clear cache at startup/every X time without use. // TODO: Clear cache at startup/every X time without use.
// TODO: cache transcoded output for a show/quality and reuse it for every future requests. // TODO: cache transcoded output for a show/quality and reuse it for every future requests.
if let Some(TranscodeInfo{show: (old_path, old_qual), job, uuid}) = self.running.lock().unwrap().get_mut(&client_id) { if let Some(TranscodeInfo {
if path == *old_path && quality != *old_qual { show: (old_path, old_qual),
job.interrupt(); job,
uuid,
..
}) = self.running.lock().unwrap().get_mut(&client_id)
{
if path != *old_path || quality != *old_qual {
job.interrupt()?;
} else {
let path = format!("/cache/{uuid}/stream.m3u8", uuid = &uuid);
return std::fs::read_to_string(path);
} }
} }
let (uuid, job) = start_transcode(&path, &quality, start_time_sec).await; let info = start_transcode(path, quality, start_time).await;
self.running.lock().unwrap().insert(client_id, TranscodeInfo { show: (path, quality), job, uuid}); let path = format!("/cache/{uuid}/stream.m3u8", uuid = &info.uuid);
self.running.lock().unwrap().insert(client_id, info);
std::fs::read_to_string(path)
} }
} }

View File

@ -1,4 +1,4 @@
use std::{io, process::Child}; use tokio::{io, process::Child};
extern "C" { extern "C" {
fn kill(pid: i32, sig: i32) -> i32; fn kill(pid: i32, sig: i32) -> i32;
@ -26,13 +26,15 @@ pub trait Signalable {
impl Signalable for Child { impl Signalable for Child {
fn signal(&mut self, signal: i32) -> io::Result<()> { fn signal(&mut self, signal: i32) -> io::Result<()> {
if self.try_wait()?.is_some() { let id = self.id();
if self.try_wait()?.is_some() || id.is_none() {
Err(io::Error::new( Err(io::Error::new(
io::ErrorKind::InvalidInput, io::ErrorKind::InvalidInput,
"invalid argument: can't signal an exited process", "invalid argument: can't signal an exited process",
)) ))
} else { } else {
crate::utils::signal(self.id() as i32, signal) crate::utils::signal(id.unwrap() as i32, signal)
} }
} }
} }