diff --git a/transcoder/src/audio.rs b/transcoder/src/audio.rs index 4a772759..2251a802 100644 --- a/transcoder/src/audio.rs +++ b/transcoder/src/audio.rs @@ -1,7 +1,7 @@ use crate::{ error::ApiError, paths, - transcode::Transcoder, + state::Transcoder, }; use actix_files::NamedFile; use actix_web::{get, web, Result}; diff --git a/transcoder/src/main.rs b/transcoder/src/main.rs index b2b92dd9..e841d629 100644 --- a/transcoder/src/main.rs +++ b/transcoder/src/main.rs @@ -10,13 +10,14 @@ use utoipa::OpenApi; use crate::{ audio::*, identify::{identify, Chapter, MediaInfo, Track}, - transcode::Transcoder, + state::Transcoder, video::*, }; mod audio; mod error; mod identify; mod paths; +mod state; mod transcode; mod utils; mod video; diff --git a/transcoder/src/state.rs b/transcoder/src/state.rs new file mode 100644 index 00000000..b6dadb62 --- /dev/null +++ b/transcoder/src/state.rs @@ -0,0 +1,99 @@ +use crate::transcode::*; +use crate::utils::Signalable; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::RwLock; + +pub struct Transcoder { + running: RwLock>, +} + +impl Transcoder { + pub fn new() -> Transcoder { + Self { + running: RwLock::new(HashMap::new()), + } + } + + pub async fn build_master(&self, _resource: String, _slug: String) -> String { + let mut master = String::from("#EXTM3U\n"); + // TODO: Add transmux (original quality) in this master playlist. + // Transmux should be the first variant since it's used to test bandwidth + // and serve as a hint for preffered variant for clients. + + // TODO: Fetch kyoo to retrieve the max quality and the aspect_ratio + let aspect_ratio = 16.0 / 9.0; + for quality in Quality::iter() { + master.push_str("#EXT-X-STREAM-INF:"); + master.push_str(format!("AVERAGE-BANDWIDTH={},", quality.average_bitrate()).as_str()); + master.push_str(format!("BANDWIDTH={},", quality.max_bitrate()).as_str()); + master.push_str( + format!( + "RESOLUTION={}x{},", + (aspect_ratio * quality.height() as f32).round() as u32, + quality.height() + ) + .as_str(), + ); + master.push_str("CODECS=\"avc1.640028\"\n"); + master.push_str(format!("./{}/index.m3u8\n", quality).as_str()); + } + // TODO: Add audio streams + master + } + + pub async fn transcode( + &self, + client_id: String, + path: String, + quality: Quality, + start_time: u32, + ) -> Result { + // TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time. + // TODO: Clear cache at startup/every X time without use. + // TODO: cache transcoded output for a show/quality and reuse it for every future requests. + if let Some(TranscodeInfo { + show: (old_path, old_qual), + job, + uuid, + .. + }) = self.running.write().unwrap().get_mut(&client_id) + { + if path != *old_path || quality != *old_qual { + // If the job has already ended, interrupt returns an error but we don't care. + _ = job.interrupt(); + } else { + let mut path = get_cache_path_from_uuid(uuid); + path.push("stream.m3u8"); + return std::fs::read_to_string(path); + } + } + + let info = transcode_video(path, quality, start_time).await; + let mut path = get_cache_path(&info); + path.push("stream.m3u8"); + self.running.write().unwrap().insert(client_id, info); + std::fs::read_to_string(path) + } + + // TODO: Use path/quality instead of client_id + pub async fn get_segment( + &self, + client_id: String, + _path: String, + _quality: Quality, + chunk: u32, + ) -> Result { + let hashmap = self.running.read().unwrap(); + let info = hashmap.get(&client_id).ok_or(SegmentError::NoTranscode)?; + + // TODO: Check if ready_time is far enough for this fragment to exist. + let mut path = get_cache_path(&info); + path.push(format!("segments-{0:02}.ts", chunk)); + Ok(path) + } +} + +pub enum SegmentError { + NoTranscode, +} diff --git a/transcoder/src/transcode.rs b/transcoder/src/transcode.rs index 163acd66..2e692dd1 100644 --- a/transcoder/src/transcode.rs +++ b/transcoder/src/transcode.rs @@ -2,20 +2,16 @@ use derive_more::Display; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use serde::Serialize; -use std::collections::HashMap; use std::collections::hash_map::DefaultHasher; -use std::hash::{Hasher, Hash}; +use std::hash::{Hash, Hasher}; use std::path::PathBuf; use std::process::Stdio; use std::slice::Iter; use std::str::FromStr; -use std::sync::RwLock; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::sync::watch; -use crate::utils::Signalable; - const SEGMENT_TIME: u32 = 10; #[derive(PartialEq, Eq, Serialize, Display)] @@ -41,7 +37,7 @@ pub enum Quality { } impl Quality { - fn iter() -> Iter<'static, Quality> { + pub fn iter() -> Iter<'static, Quality> { static QUALITIES: [Quality; 8] = [ Quality::P240, Quality::P360, @@ -57,7 +53,7 @@ impl Quality { QUALITIES.iter() } - fn height(&self) -> u32 { + pub fn height(&self) -> u32 { match self { Self::P240 => 240, Self::P360 => 360, @@ -72,7 +68,7 @@ impl Quality { } // I'm not entierly sure about the values for bitrates. Double checking would be nice. - fn average_bitrate(&self) -> u32 { + pub fn average_bitrate(&self) -> u32 { match self { Self::P240 => 400_000, Self::P360 => 800_000, @@ -86,7 +82,7 @@ impl Quality { } } - fn max_bitrate(&self) -> u32 { + pub fn max_bitrate(&self) -> u32 { match self { Self::P240 => 700_000, Self::P360 => 1400_000, @@ -172,7 +168,7 @@ fn get_transcode_video_quality_args(quality: &Quality, segment_time: u32) -> Vec .collect() } -async fn transcode_audio(path: String, audio: u32) -> TranscodeInfo { +pub async fn transcode_audio(path: String, audio: u32) -> TranscodeInfo { let mut hasher = DefaultHasher::new(); path.hash(&mut hasher); audio.hash(&mut hasher); @@ -188,7 +184,7 @@ async fn transcode_audio(path: String, audio: u32) -> TranscodeInfo { todo!() } -async fn transcode_video(path: String, quality: Quality, start_time: u32) -> TranscodeInfo { +pub async fn transcode_video(path: String, quality: Quality, start_time: u32) -> TranscodeInfo { // TODO: Use the out path below once cached segments can be reused. // let out_dir = format!("/cache/{show_hash}/{quality}"); let uuid: String = thread_rng() @@ -269,11 +265,11 @@ async fn start_transcode( } } -fn get_cache_path(info: &TranscodeInfo) -> PathBuf { +pub fn get_cache_path(info: &TranscodeInfo) -> PathBuf { return get_cache_path_from_uuid(&info.uuid); } -fn get_cache_path_from_uuid(uuid: &String) -> PathBuf { +pub fn get_cache_path_from_uuid(uuid: &String) -> PathBuf { return PathBuf::from(format!("/cache/{uuid}/", uuid = &uuid)); } @@ -282,97 +278,3 @@ pub struct TranscodeInfo { job: Child, uuid: String, } - -pub struct Transcoder { - running: RwLock>, -} - -impl Transcoder { - pub fn new() -> Transcoder { - Self { - running: RwLock::new(HashMap::new()), - } - } - - pub async fn build_master(&self, _resource: String, _slug: String) -> String { - let mut master = String::from("#EXTM3U\n"); - // TODO: Add transmux (original quality) in this master playlist. - // Transmux should be the first variant since it's used to test bandwidth - // and serve as a hint for preffered variant for clients. - - // TODO: Fetch kyoo to retrieve the max quality and the aspect_ratio - let aspect_ratio = 16.0 / 9.0; - for quality in Quality::iter() { - master.push_str("#EXT-X-STREAM-INF:"); - master.push_str(format!("AVERAGE-BANDWIDTH={},", quality.average_bitrate()).as_str()); - master.push_str(format!("BANDWIDTH={},", quality.max_bitrate()).as_str()); - master.push_str( - format!( - "RESOLUTION={}x{},", - (aspect_ratio * quality.height() as f32).round() as u32, - quality.height() - ) - .as_str(), - ); - master.push_str("CODECS=\"avc1.640028\"\n"); - master.push_str(format!("./{}/index.m3u8\n", quality).as_str()); - } - // TODO: Add audio streams - master - } - - pub async fn transcode( - &self, - client_id: String, - path: String, - quality: Quality, - start_time: u32, - ) -> Result { - // TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time. - // TODO: Clear cache at startup/every X time without use. - // TODO: cache transcoded output for a show/quality and reuse it for every future requests. - if let Some(TranscodeInfo { - show: (old_path, old_qual), - job, - uuid, - .. - }) = self.running.write().unwrap().get_mut(&client_id) - { - if path != *old_path || quality != *old_qual { - // If the job has already ended, interrupt returns an error but we don't care. - _ = job.interrupt(); - } else { - let mut path = get_cache_path_from_uuid(uuid); - path.push("stream.m3u8"); - return std::fs::read_to_string(path); - } - } - - let info = transcode_video(path, quality, start_time).await; - let mut path = get_cache_path(&info); - path.push("stream.m3u8"); - self.running.write().unwrap().insert(client_id, info); - std::fs::read_to_string(path) - } - - // TODO: Use path/quality instead of client_id - pub async fn get_segment( - &self, - client_id: String, - _path: String, - _quality: Quality, - chunk: u32, - ) -> Result { - let hashmap = self.running.read().unwrap(); - let info = hashmap.get(&client_id).ok_or(SegmentError::NoTranscode)?; - - // TODO: Check if ready_time is far enough for this fragment to exist. - let mut path = get_cache_path(&info); - path.push(format!("segments-{0:02}.ts", chunk)); - Ok(path) - } -} - -pub enum SegmentError { - NoTranscode, -} diff --git a/transcoder/src/video.rs b/transcoder/src/video.rs index f6a1c970..e088b981 100644 --- a/transcoder/src/video.rs +++ b/transcoder/src/video.rs @@ -1,10 +1,6 @@ use std::str::FromStr; -use crate::{ - error::ApiError, - transcode::{Quality, Transcoder}, - utils::get_client_id, paths, -}; +use crate::{error::ApiError, paths, state::Transcoder, transcode::Quality, utils::get_client_id}; use actix_files::NamedFile; use actix_web::{get, web, HttpRequest, Result};