diff --git a/transcoder/Cargo.lock b/transcoder/Cargo.lock index bd751f79..f7e404a1 100644 --- a/transcoder/Cargo.lock +++ b/transcoder/Cargo.lock @@ -1072,6 +1072,7 @@ version = "0.1.0" dependencies = [ "actix-files", "actix-web", + "derive_more", "rand", "serde", "tokio", diff --git a/transcoder/Cargo.toml b/transcoder/Cargo.toml index 271a71aa..531d8c76 100644 --- a/transcoder/Cargo.toml +++ b/transcoder/Cargo.toml @@ -9,3 +9,4 @@ actix-files = "0.6.2" tokio = { version = "1.27.0", features = ["process"] } serde = { version = "1.0.159", features = ["derive"] } rand = "0.8.5" +derive_more = "0.99.17" diff --git a/transcoder/src/error.rs b/transcoder/src/error.rs new file mode 100644 index 00000000..94b67137 --- /dev/null +++ b/transcoder/src/error.rs @@ -0,0 +1,33 @@ +use actix_web::{ + error, + http::{header::ContentType, StatusCode}, + HttpResponse, +}; +use derive_more::{Display, Error}; + +#[derive(Debug, Display, Error)] +pub enum ApiError { + #[display(fmt = "{}", error)] + BadRequest { error: String }, + #[display(fmt = "An internal error occurred. Please try again later.")] + InternalError, +} + +impl error::ResponseError for ApiError { + fn error_response(&self) -> HttpResponse { + HttpResponse::build(self.status_code()) + .insert_header(ContentType::json()) + .body(format!( + "{{ \"status\": \"{status}\", \"error\": \"{err}\" }}", + status = self.status_code(), + err = self.to_string() + )) + } + + fn status_code(&self) -> StatusCode { + match *self { + ApiError::BadRequest { error } => StatusCode::BAD_REQUEST, + ApiError::InternalError => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} diff --git a/transcoder/src/main.rs b/transcoder/src/main.rs index 1c9540b8..c6afb090 100644 --- a/transcoder/src/main.rs +++ b/transcoder/src/main.rs @@ -1,9 +1,14 @@ -use actix_files::NamedFile; -use actix_web::{get, web, App, HttpServer, Result}; +use std::str::FromStr; -use crate::transcode::{Quality, TranscoderState}; +use actix_files::NamedFile; +use actix_web::{get, web, App, HttpRequest, HttpServer, Result}; +use error::ApiError; + +use crate::transcode::{Quality, Transcoder}; +mod error; mod paths; mod transcode; +mod utils; #[get("/movie/direct/{slug}")] async fn get_movie_direct(query: web::Path) -> Result { @@ -13,20 +18,27 @@ async fn get_movie_direct(query: web::Path) -> Result { Ok(NamedFile::open_async(path).await?) } -#[get("/movie/{quality}/{slug}")] +#[get("/movie/{quality}/{slug}/master.m3u8")] async fn get_movie_auto( + req: HttpRequest, query: web::Path<(String, String)>, - state: web::Data, -) -> Result { + transcoder: web::Data, +) -> Result { let (quality, slug) = query.into_inner(); + let quality = Quality::from_str(quality.as_str()).map_err(|_| ApiError::BadRequest { + error: "Invalid quality".to_string(), + })?; + let client_id = req.headers().get("x-client-id") + .ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), })?; + let path = paths::get_movie_path(slug); - Ok(NamedFile::open_async(path).await?) + todo!() } #[actix_web::main] async fn main() -> std::io::Result<()> { - let state = web::Data::new(TranscoderState::new()); + let state = web::Data::new(Transcoder::new()); HttpServer::new(move || { App::new() diff --git a/transcoder/src/transcode.rs b/transcoder/src/transcode.rs index 3b3c07bd..453d58a2 100644 --- a/transcoder/src/transcode.rs +++ b/transcoder/src/transcode.rs @@ -1,21 +1,12 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; -use serde::{Deserialize, Serialize}; -use std::process::{Command, Child}; +use std::process::{Child, Command}; +use std::str::FromStr; use std::{collections::HashMap, sync::Mutex}; -pub struct TranscoderState { - running: Mutex>, -} - -impl TranscoderState { - pub fn new() -> TranscoderState { - Self { - running: Mutex::new(HashMap::new()), - } - } -} +use crate::utils::Signalable; +#[derive(PartialEq, Eq)] pub enum Quality { P240, P360, @@ -28,9 +19,29 @@ pub enum Quality { Original, } -imhttps://sgsot3a.sic.shibaura-it.ac.jp/ +#[derive(Debug, PartialEq, Eq)] +pub struct InvalidValueError; -fn get_transcode_video_quality_args(quality: Quality) -> Vec<&'static str> { +impl FromStr for Quality { + type Err = InvalidValueError; + + fn from_str(s: &str) -> Result { + match s { + "240p" => Ok(Quality::P240), + "360p" => Ok(Quality::P360), + "480p" => Ok(Quality::P480), + "720p" => Ok(Quality::P720), + "1080p" => Ok(Quality::P1080), + "1440p" => Ok(Quality::P1440), + "4k" => Ok(Quality::P4k), + "8k" => Ok(Quality::P8k), + "original" => Ok(Quality::Original), + _ => Err(InvalidValueError), + } + } +} + +fn get_transcode_video_quality_args(quality: &Quality) -> Vec<&'static str> { // superfast or ultrafast would produce a file extremly big so we prever veryfast. let enc_base: Vec<&str> = vec![ "-map", "0:v:0", "-c:v", "libx264", "-crf", "21", "-preset", "veryfast", @@ -50,7 +61,7 @@ fn get_transcode_video_quality_args(quality: Quality) -> Vec<&'static str> { } // TODO: Add audios streams (and transcode them only when necesarry) -async fn start_transcode(path: &str, quality: Quality, start_time_sec: f32) -> (String, Child) { +async fn start_transcode(path: &str, quality: &Quality, start_time_sec: f32) -> (String, Child) { // TODO: Use the out path below once cached segments can be reused. // let out_dir = format!("/cache/{show_hash}/{quality}"); let uuid: String = thread_rng() @@ -89,6 +100,40 @@ async fn start_transcode(path: &str, quality: Quality, start_time_sec: f32) -> ( (uuid, child) } -pub async fn transcode(user_id: u32, path: &str, quality: Quality, start_time_sec: f32) { - +struct TranscodeInfo { + show: (String, Quality) + job: Child + uuid: String +} + +pub struct Transcoder { + running: Mutex>, +} + +impl Transcoder { + pub fn new() -> Transcoder { + Self { + running: Mutex::new(HashMap::new()), + } + } + + pub async fn transcode( + &mut self, + client_id: String, + path: String, + quality: Quality, + start_time_sec: f32, + ) { + // TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time. + // TODO: Clear cache at startup/every X time without use. + // TODO: cache transcoded output for a show/quality and reuse it for every future requests. + if let Some(TranscodeInfo{show: (old_path, old_qual), job, uuid}) = self.running.lock().unwrap().get_mut(&client_id) { + if path == *old_path && quality != *old_qual { + job.interrupt(); + } + } + + let (uuid, job) = start_transcode(&path, &quality, start_time_sec).await; + self.running.lock().unwrap().insert(client_id, TranscodeInfo { show: (path, quality), job, uuid}); + } } diff --git a/transcoder/src/utils.rs b/transcoder/src/utils.rs new file mode 100644 index 00000000..5414e693 --- /dev/null +++ b/transcoder/src/utils.rs @@ -0,0 +1,38 @@ +use std::{io, process::Child}; + +extern "C" { + fn kill(pid: i32, sig: i32) -> i32; +} + +/// Signal the process `pid` +fn signal(pid: i32, signal: i32) -> io::Result<()> { + let ret = unsafe { kill(pid, signal) }; + if ret == 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } +} + +pub trait Signalable { + /// Signal the thing + fn signal(&mut self, signal: i32) -> io::Result<()>; + + /// Send SIGINT + fn interrupt(&mut self) -> io::Result<()> { + self.signal(2) + } +} + +impl Signalable for Child { + fn signal(&mut self, signal: i32) -> io::Result<()> { + if self.try_wait()?.is_some() { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid argument: can't signal an exited process", + )) + } else { + crate::utils::signal(self.id() as i32, signal) + } + } +}