diff --git a/transcoder/src/main.rs b/transcoder/src/main.rs index 9813d530..24c7ab39 100644 --- a/transcoder/src/main.rs +++ b/transcoder/src/main.rs @@ -18,8 +18,8 @@ async fn get_movie_direct(query: web::Path) -> Result { Ok(NamedFile::open_async(path).await?) } -#[get("/movie/{quality}/{slug}/master.m3u8")] -async fn get_movie_auto( +#[get("/movie/{quality}/{slug}/index.m3u8")] +async fn transcode_movie( req: HttpRequest, query: web::Path<(String, String)>, transcoder: web::Data, @@ -40,6 +40,32 @@ async fn get_movie_auto( .map_err(|_| ApiError::InternalError) } +#[get("/movie/{quality}/{slug}/segments/{chunk}")] +async fn get_movie_chunk( + req: HttpRequest, + query: web::Path<(String, String, u32)>, + transcoder: web::Data, +) -> Result { + let (quality, slug, chunk) = query.into_inner(); + let quality = Quality::from_str(quality.as_str()).map_err(|_| ApiError::BadRequest { + error: "Invalid quality".to_string(), + })?; + let client_id = req.headers().get("x-client-id") + .ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), })? + .to_str().unwrap(); + + let path = paths::get_movie_path(slug); + // TODO: Handle start_time that is not 0 + transcoder + .get_segment(client_id.to_string(), path, quality, chunk) + .await + .map_err(|_| ApiError::InternalError) + .and_then(|path| { + NamedFile::open(path).map_err(|_| ApiError::BadRequest { + error: "Invalid segment number.".to_string(), + }) + }) +} #[actix_web::main] async fn main() -> std::io::Result<()> { @@ -49,7 +75,8 @@ async fn main() -> std::io::Result<()> { App::new() .app_data(state.clone()) .service(get_movie_direct) - .service(get_movie_auto) + .service(transcode_movie) + .service(get_movie_chunk) }) .bind(("0.0.0.0", 7666))? .run() diff --git a/transcoder/src/transcode.rs b/transcoder/src/transcode.rs index e326de8c..e40a3a9d 100644 --- a/transcoder/src/transcode.rs +++ b/transcoder/src/transcode.rs @@ -1,10 +1,11 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; +use std::path::PathBuf; use std::process::Stdio; use std::str::FromStr; use std::sync::atomic::AtomicI32; -use std::sync::Arc; -use std::{collections::HashMap, sync::Mutex}; +use std::sync::{Arc, RwLock}; +use std::collections::HashMap; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; @@ -136,6 +137,14 @@ async fn start_transcode(path: String, quality: Quality, start_time: i32) -> Tra return info; } +fn get_cache_path(info: &TranscodeInfo) -> PathBuf { + return get_cache_path_from_uuid(&info.uuid); +} + +fn get_cache_path_from_uuid(uuid: &String) -> PathBuf { + return PathBuf::from(format!("/cache/{uuid}/stream.m3u8", uuid = &uuid)); +} + struct TranscodeInfo { show: (String, Quality), // TODO: Store if the process as ended (probably Option for the job) @@ -147,13 +156,13 @@ struct TranscodeInfo { } pub struct Transcoder { - running: Mutex>, + running: RwLock>, } impl Transcoder { pub fn new() -> Transcoder { Self { - running: Mutex::new(HashMap::new()), + running: RwLock::new(HashMap::new()), } } @@ -172,19 +181,38 @@ impl Transcoder { job, uuid, .. - }) = self.running.lock().unwrap().get_mut(&client_id) + }) = self.running.write().unwrap().get_mut(&client_id) { if path != *old_path || quality != *old_qual { job.interrupt()?; } else { - let path = format!("/cache/{uuid}/stream.m3u8", uuid = &uuid); - return std::fs::read_to_string(path); + return std::fs::read_to_string(get_cache_path_from_uuid(uuid)); } } let info = start_transcode(path, quality, start_time).await; - let path = format!("/cache/{uuid}/stream.m3u8", uuid = &info.uuid); - self.running.lock().unwrap().insert(client_id, info); + let path = get_cache_path(&info); + self.running.write().unwrap().insert(client_id, info); std::fs::read_to_string(path) } + + pub async fn get_segment( + &self, + client_id: String, + _path: String, + _quality: Quality, + chunk: u32, + ) -> Result { + let hashmap = self.running.read().unwrap(); + let info = hashmap.get(&client_id).ok_or(SegmentError::NoTranscode)?; + + // TODO: Check if ready_time is far enough for this fragment to exist. + let mut path = get_cache_path(&info); + path.push(format!("segments-{0:02}.ts", chunk)); + Ok(path) + } +} + +pub enum SegmentError { + NoTranscode, }