diff --git a/transcoder/src/audio.rs b/transcoder/src/audio.rs index 2251a802..a8dcc070 100644 --- a/transcoder/src/audio.rs +++ b/transcoder/src/audio.rs @@ -22,7 +22,7 @@ use actix_web::{get, web, Result}; ("audio" = u32, Path, description = "Specify the audio stream you want. For mappings, refer to the audios fields of the /watch response."), ) )] -#[get("/audio/{resource}/{slug}/{audio}/index.m3u8")] +#[get("/{resource}/{slug}/audio/{audio}/index.m3u8")] async fn get_audio_transcoded( query: web::Path<(String, String, u32)>, transcoder: web::Data, @@ -33,7 +33,7 @@ async fn get_audio_transcoded( .map_err(|_| ApiError::NotFound)?; transcoder - .get_transcoded_audio(path, audio) + .transcode_audio(path, audio) .await .map_err(|_| ApiError::InternalError) } @@ -53,7 +53,7 @@ async fn get_audio_transcoded( ("chunk" = u32, Path, description = "The number of the chunk"), ) )] -#[get("/audio/{resource}/{slug}/{audio}/segments-{chunk}.ts")] +#[get("/{resource}/{slug}/audio/{audio}/segments-{chunk}.ts")] async fn get_audio_chunk( query: web::Path<(String, String, u32, u32)>, transcoder: web::Data, diff --git a/transcoder/src/state.rs b/transcoder/src/state.rs index b6dadb62..ff2d18b9 100644 --- a/transcoder/src/state.rs +++ b/transcoder/src/state.rs @@ -6,12 +6,14 @@ use std::sync::RwLock; pub struct Transcoder { running: RwLock>, + audio_jobs: RwLock>, } impl Transcoder { pub fn new() -> Transcoder { Self { running: RwLock::new(HashMap::new()), + audio_jobs: RwLock::new(Vec::new()), } } @@ -24,6 +26,7 @@ impl Transcoder { // TODO: Fetch kyoo to retrieve the max quality and the aspect_ratio let aspect_ratio = 16.0 / 9.0; for quality in Quality::iter() { + // Doc: https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/creating_a_multivariant_playlist master.push_str("#EXT-X-STREAM-INF:"); master.push_str(format!("AVERAGE-BANDWIDTH={},", quality.average_bitrate()).as_str()); master.push_str(format!("BANDWIDTH={},", quality.max_bitrate()).as_str()); @@ -35,10 +38,26 @@ impl Transcoder { ) .as_str(), ); - master.push_str("CODECS=\"avc1.640028\"\n"); + master.push_str("CODECS=\"avc1.640028\","); + // With multiple audio qualities, maybe switch qualities depending on the video quality. + master.push_str("AUDIO=\"audio\"\n"); master.push_str(format!("./{}/index.m3u8\n", quality).as_str()); } - // TODO: Add audio streams + // TODO: Fetch audio stream list/metadata from kyoo. + for audio in vec![0] { + // Doc: https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/adding_alternate_media_to_a_playlist + master.push_str("#EXT-X-MEDIA:TYPE=AUDIO,"); + // The group-id allows to distinguish multiple qualities from multiple variants. + // We could create another quality set and use group-ids hiqual and lowqual. + master.push_str("GROUP-ID=\"audio\","); + // master.push_str(format!("LANGUAGE=\"{}\",", "eng").as_str()); + master.push_str(format!("NAME=\"{}\",", "Default").as_str()); + // TODO: Support aac5.1 (and specify the number of channel bellow) + // master.push_str(format!("CHANNELS=\"{}\",", 2).as_str()); + master.push_str("DEFAULT=YES,"); + master.push_str(format!("URI=\"./audio/{}/index.m3u8\"\n", audio).as_str()); + } + master } @@ -87,11 +106,45 @@ impl Transcoder { let hashmap = self.running.read().unwrap(); let info = hashmap.get(&client_id).ok_or(SegmentError::NoTranscode)?; - // TODO: Check if ready_time is far enough for this fragment to exist. + // If the segment is in the playlist file, it is available so we don't need to check that. let mut path = get_cache_path(&info); path.push(format!("segments-{0:02}.ts", chunk)); Ok(path) } + + pub async fn transcode_audio( + &self, + path: String, + audio: u32, + ) -> Result { + let mut stream = PathBuf::from(get_audio_path(&path, audio)); + stream.push("stream.m3u8"); + + if !self + .audio_jobs + .read() + .unwrap() + .contains(&(path.clone(), audio)) + { + // TODO: If two concurrent requests for the same audio came, the first one will + // initialize the transcode and wait for the second segment while the second will use + // the same transcode but not wait and retrieve a potentially invalid playlist file. + self.audio_jobs.write().unwrap().push((path.clone(), audio)); + transcode_audio(path, audio).await; + } + std::fs::read_to_string(stream) + } + + pub async fn get_audio_segment( + &self, + path: String, + audio: u32, + chunk: u32, + ) -> Result { + let mut path = PathBuf::from(get_audio_path(&path, audio)); + path.push(format!("segments-{0:02}.ts", chunk)); + Ok(path) + } } pub enum SegmentError { diff --git a/transcoder/src/transcode.rs b/transcoder/src/transcode.rs index 2e692dd1..13fc3532 100644 --- a/transcoder/src/transcode.rs +++ b/transcoder/src/transcode.rs @@ -168,20 +168,14 @@ fn get_transcode_video_quality_args(quality: &Quality, segment_time: u32) -> Vec .collect() } -pub async fn transcode_audio(path: String, audio: u32) -> TranscodeInfo { - let mut hasher = DefaultHasher::new(); - path.hash(&mut hasher); - audio.hash(&mut hasher); - let hash = hasher.finish(); - - let child = start_transcode( +pub async fn transcode_audio(path: String, audio: u32) { + start_transcode( &path, - &format!("/cache/{hash}"), + &get_audio_path(&path, audio), get_transcode_audio_args(audio), 0, ) .await; - todo!() } pub async fn transcode_video(path: String, quality: Quality, start_time: u32) -> TranscodeInfo { @@ -193,7 +187,6 @@ pub async fn transcode_video(path: String, quality: Quality, start_time: u32) -> .map(char::from) .collect(); let out_dir = format!("/cache/{uuid}"); - std::fs::create_dir(&out_dir).expect("Could not create cache directory"); let child = start_transcode( &path, @@ -215,6 +208,8 @@ async fn start_transcode( encode_args: Vec, start_time: u32, ) -> Child { + std::fs::create_dir(&out_dir).expect("Could not create cache directory"); + let mut cmd = Command::new("ffmpeg"); cmd.args(&["-progress", "pipe:1"]) .arg("-nostats") @@ -248,16 +243,16 @@ async fn start_transcode( let value = &value[1..]; // Can't use ms since ms and us are both set to us /shrug if key == "out_time_us" { - tx.send(value.parse::().unwrap() / 1_000_000).unwrap(); + let _ = tx.send(value.parse::().unwrap() / 1_000_000); } - // TODO: maybe store speed too. } } }); // Wait for 1.5 * segment time after start_time to be ready. loop { - rx.changed().await.unwrap(); + // TODO: Create a better error handling for here. + rx.changed().await.expect("Invalid audio index."); let ready_time = *rx.borrow(); if ready_time >= (1.5 * SEGMENT_TIME as f32) as u32 + start_time { return child; @@ -265,6 +260,14 @@ async fn start_transcode( } } +pub fn get_audio_path(path: &String, audio: u32) -> String { + let mut hasher = DefaultHasher::new(); + path.hash(&mut hasher); + audio.hash(&mut hasher); + let hash = hasher.finish(); + format!("/cache/{hash:x}") +} + pub fn get_cache_path(info: &TranscodeInfo) -> PathBuf { return get_cache_path_from_uuid(&info.uuid); } @@ -274,7 +277,7 @@ pub fn get_cache_path_from_uuid(uuid: &String) -> PathBuf { } pub struct TranscodeInfo { - show: (String, Quality), - job: Child, - uuid: String, + pub show: (String, Quality), + pub job: Child, + pub uuid: String, } diff --git a/transcoder/src/utils.rs b/transcoder/src/utils.rs index 9b254bf4..a35a87db 100644 --- a/transcoder/src/utils.rs +++ b/transcoder/src/utils.rs @@ -43,6 +43,7 @@ impl Signalable for Child { } pub fn get_client_id(req: HttpRequest) -> Result { + // return Ok(String::from("1234")); req.headers().get("x-client-id") .ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), }) .map(|x| x.to_str().unwrap().to_string())