diff --git a/transcoder/src/transcode.rs b/transcoder/src/transcode.rs index 9b2c3b38..163acd66 100644 --- a/transcoder/src/transcode.rs +++ b/transcoder/src/transcode.rs @@ -3,6 +3,8 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use serde::Serialize; use std::collections::HashMap; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hasher, Hash}; use std::path::PathBuf; use std::process::Stdio; use std::slice::Iter; @@ -10,10 +12,12 @@ use std::str::FromStr; use std::sync::RwLock; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; -use tokio::sync::watch::{self, Receiver}; +use tokio::sync::watch; use crate::utils::Signalable; +const SEGMENT_TIME: u32 = 10; + #[derive(PartialEq, Eq, Serialize, Display)] pub enum Quality { #[display(fmt = "240p")] @@ -119,6 +123,18 @@ impl FromStr for Quality { } } +fn get_transcode_audio_args(audio_idx: u32) -> Vec { + // TODO: Support multy audio qualities. + return vec![ + "-map".to_string(), + format!("0:a:{}", audio_idx), + "-c:a".to_string(), + "aac".to_string(), + "-b:a".to_string(), + "128k".to_string(), + ]; +} + fn get_transcode_video_quality_args(quality: &Quality, segment_time: u32) -> Vec { if *quality == Quality::Original { return vec!["-map", "0:v:0", "-c:v", "copy"] @@ -156,8 +172,23 @@ fn get_transcode_video_quality_args(quality: &Quality, segment_time: u32) -> Vec .collect() } -// TODO: Add audios streams (and transcode them only when necesarry) -async fn start_transcode(path: String, quality: Quality, start_time: u32) -> TranscodeInfo { +async fn transcode_audio(path: String, audio: u32) -> TranscodeInfo { + let mut hasher = DefaultHasher::new(); + path.hash(&mut hasher); + audio.hash(&mut hasher); + let hash = hasher.finish(); + + let child = start_transcode( + &path, + &format!("/cache/{hash}"), + get_transcode_audio_args(audio), + 0, + ) + .await; + todo!() +} + +async fn transcode_video(path: String, quality: Quality, start_time: u32) -> TranscodeInfo { // TODO: Use the out path below once cached segments can be reused. // let out_dir = format!("/cache/{show_hash}/{quality}"); let uuid: String = thread_rng() @@ -168,7 +199,26 @@ async fn start_transcode(path: String, quality: Quality, start_time: u32) -> Tra let out_dir = format!("/cache/{uuid}"); std::fs::create_dir(&out_dir).expect("Could not create cache directory"); - let segment_time: u32 = 10; + let child = start_transcode( + &path, + &out_dir, + get_transcode_video_quality_args(&quality, SEGMENT_TIME), + start_time, + ) + .await; + TranscodeInfo { + show: (path, quality), + job: child, + uuid, + } +} + +async fn start_transcode( + path: &String, + out_dir: &String, + encode_args: Vec, + start_time: u32, +) -> Child { let mut cmd = Command::new("ffmpeg"); cmd.args(&["-progress", "pipe:1"]) .arg("-nostats") @@ -181,8 +231,8 @@ async fn start_transcode(path: String, quality: Quality, start_time: u32) -> Tra // .args(&["-hls_allow_cache", "1"]) // Keep all segments in the list (else only last X are presents, useful for livestreams) .args(&["-hls_list_size", "0"]) - .args(&["-hls_time", segment_time.to_string().as_str()]) - .args(get_transcode_video_quality_args(&quality, segment_time)) + .args(&["-hls_time", SEGMENT_TIME.to_string().as_str()]) + .args(&encode_args) .args(&[ "-hls_segment_filename".to_string(), format!("{out_dir}/segments-%02d.ts"), @@ -213,18 +263,10 @@ async fn start_transcode(path: String, quality: Quality, start_time: u32) -> Tra loop { rx.changed().await.unwrap(); let ready_time = *rx.borrow(); - if ready_time >= (1.5 * segment_time as f32) as u32 + start_time { - break; + if ready_time >= (1.5 * SEGMENT_TIME as f32) as u32 + start_time { + return child; } } - - TranscodeInfo { - show: (path, quality), - job: child, - uuid, - start_time, - ready_time: rx, - } } fn get_cache_path(info: &TranscodeInfo) -> PathBuf { @@ -235,15 +277,10 @@ fn get_cache_path_from_uuid(uuid: &String) -> PathBuf { return PathBuf::from(format!("/cache/{uuid}/", uuid = &uuid)); } -struct TranscodeInfo { +pub struct TranscodeInfo { show: (String, Quality), - // TODO: Store if the process as ended (probably Option for the job) job: Child, uuid: String, - #[allow(dead_code)] - start_time: u32, - #[allow(dead_code)] - ready_time: Receiver, } pub struct Transcoder { @@ -311,7 +348,7 @@ impl Transcoder { } } - let info = start_transcode(path, quality, start_time).await; + let info = transcode_video(path, quality, start_time).await; let mut path = get_cache_path(&info); path.push("stream.m3u8"); self.running.write().unwrap().insert(client_id, info);