Add audio transcoding

This commit is contained in:
Zoe Roux 2023-05-02 02:14:08 +09:00
parent 47c7617d24
commit 8ba80e93e3
No known key found for this signature in database
4 changed files with 79 additions and 22 deletions

View File

@ -22,7 +22,7 @@ use actix_web::{get, web, Result};
("audio" = u32, Path, description = "Specify the audio stream you want. For mappings, refer to the audios fields of the /watch response."), ("audio" = u32, Path, description = "Specify the audio stream you want. For mappings, refer to the audios fields of the /watch response."),
) )
)] )]
#[get("/audio/{resource}/{slug}/{audio}/index.m3u8")] #[get("/{resource}/{slug}/audio/{audio}/index.m3u8")]
async fn get_audio_transcoded( async fn get_audio_transcoded(
query: web::Path<(String, String, u32)>, query: web::Path<(String, String, u32)>,
transcoder: web::Data<Transcoder>, transcoder: web::Data<Transcoder>,
@ -33,7 +33,7 @@ async fn get_audio_transcoded(
.map_err(|_| ApiError::NotFound)?; .map_err(|_| ApiError::NotFound)?;
transcoder transcoder
.get_transcoded_audio(path, audio) .transcode_audio(path, audio)
.await .await
.map_err(|_| ApiError::InternalError) .map_err(|_| ApiError::InternalError)
} }
@ -53,7 +53,7 @@ async fn get_audio_transcoded(
("chunk" = u32, Path, description = "The number of the chunk"), ("chunk" = u32, Path, description = "The number of the chunk"),
) )
)] )]
#[get("/audio/{resource}/{slug}/{audio}/segments-{chunk}.ts")] #[get("/{resource}/{slug}/audio/{audio}/segments-{chunk}.ts")]
async fn get_audio_chunk( async fn get_audio_chunk(
query: web::Path<(String, String, u32, u32)>, query: web::Path<(String, String, u32, u32)>,
transcoder: web::Data<Transcoder>, transcoder: web::Data<Transcoder>,

View File

@ -6,12 +6,14 @@ use std::sync::RwLock;
pub struct Transcoder { pub struct Transcoder {
running: RwLock<HashMap<String, TranscodeInfo>>, running: RwLock<HashMap<String, TranscodeInfo>>,
audio_jobs: RwLock<Vec<(String, u32)>>,
} }
impl Transcoder { impl Transcoder {
pub fn new() -> Transcoder { pub fn new() -> Transcoder {
Self { Self {
running: RwLock::new(HashMap::new()), running: RwLock::new(HashMap::new()),
audio_jobs: RwLock::new(Vec::new()),
} }
} }
@ -24,6 +26,7 @@ impl Transcoder {
// TODO: Fetch kyoo to retrieve the max quality and the aspect_ratio // TODO: Fetch kyoo to retrieve the max quality and the aspect_ratio
let aspect_ratio = 16.0 / 9.0; let aspect_ratio = 16.0 / 9.0;
for quality in Quality::iter() { for quality in Quality::iter() {
// Doc: https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/creating_a_multivariant_playlist
master.push_str("#EXT-X-STREAM-INF:"); master.push_str("#EXT-X-STREAM-INF:");
master.push_str(format!("AVERAGE-BANDWIDTH={},", quality.average_bitrate()).as_str()); master.push_str(format!("AVERAGE-BANDWIDTH={},", quality.average_bitrate()).as_str());
master.push_str(format!("BANDWIDTH={},", quality.max_bitrate()).as_str()); master.push_str(format!("BANDWIDTH={},", quality.max_bitrate()).as_str());
@ -35,10 +38,26 @@ impl Transcoder {
) )
.as_str(), .as_str(),
); );
master.push_str("CODECS=\"avc1.640028\"\n"); master.push_str("CODECS=\"avc1.640028\",");
// With multiple audio qualities, maybe switch qualities depending on the video quality.
master.push_str("AUDIO=\"audio\"\n");
master.push_str(format!("./{}/index.m3u8\n", quality).as_str()); master.push_str(format!("./{}/index.m3u8\n", quality).as_str());
} }
// TODO: Add audio streams // TODO: Fetch audio stream list/metadata from kyoo.
for audio in vec![0] {
// Doc: https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/adding_alternate_media_to_a_playlist
master.push_str("#EXT-X-MEDIA:TYPE=AUDIO,");
// The group-id allows to distinguish multiple qualities from multiple variants.
// We could create another quality set and use group-ids hiqual and lowqual.
master.push_str("GROUP-ID=\"audio\",");
// master.push_str(format!("LANGUAGE=\"{}\",", "eng").as_str());
master.push_str(format!("NAME=\"{}\",", "Default").as_str());
// TODO: Support aac5.1 (and specify the number of channel bellow)
// master.push_str(format!("CHANNELS=\"{}\",", 2).as_str());
master.push_str("DEFAULT=YES,");
master.push_str(format!("URI=\"./audio/{}/index.m3u8\"\n", audio).as_str());
}
master master
} }
@ -87,11 +106,45 @@ impl Transcoder {
let hashmap = self.running.read().unwrap(); let hashmap = self.running.read().unwrap();
let info = hashmap.get(&client_id).ok_or(SegmentError::NoTranscode)?; let info = hashmap.get(&client_id).ok_or(SegmentError::NoTranscode)?;
// TODO: Check if ready_time is far enough for this fragment to exist. // If the segment is in the playlist file, it is available so we don't need to check that.
let mut path = get_cache_path(&info); let mut path = get_cache_path(&info);
path.push(format!("segments-{0:02}.ts", chunk)); path.push(format!("segments-{0:02}.ts", chunk));
Ok(path) Ok(path)
} }
pub async fn transcode_audio(
&self,
path: String,
audio: u32,
) -> Result<String, std::io::Error> {
let mut stream = PathBuf::from(get_audio_path(&path, audio));
stream.push("stream.m3u8");
if !self
.audio_jobs
.read()
.unwrap()
.contains(&(path.clone(), audio))
{
// TODO: If two concurrent requests for the same audio came, the first one will
// initialize the transcode and wait for the second segment while the second will use
// the same transcode but not wait and retrieve a potentially invalid playlist file.
self.audio_jobs.write().unwrap().push((path.clone(), audio));
transcode_audio(path, audio).await;
}
std::fs::read_to_string(stream)
}
pub async fn get_audio_segment(
&self,
path: String,
audio: u32,
chunk: u32,
) -> Result<PathBuf, std::io::Error> {
let mut path = PathBuf::from(get_audio_path(&path, audio));
path.push(format!("segments-{0:02}.ts", chunk));
Ok(path)
}
} }
pub enum SegmentError { pub enum SegmentError {

View File

@ -168,20 +168,14 @@ fn get_transcode_video_quality_args(quality: &Quality, segment_time: u32) -> Vec
.collect() .collect()
} }
pub async fn transcode_audio(path: String, audio: u32) -> TranscodeInfo { pub async fn transcode_audio(path: String, audio: u32) {
let mut hasher = DefaultHasher::new(); start_transcode(
path.hash(&mut hasher);
audio.hash(&mut hasher);
let hash = hasher.finish();
let child = start_transcode(
&path, &path,
&format!("/cache/{hash}"), &get_audio_path(&path, audio),
get_transcode_audio_args(audio), get_transcode_audio_args(audio),
0, 0,
) )
.await; .await;
todo!()
} }
pub async fn transcode_video(path: String, quality: Quality, start_time: u32) -> TranscodeInfo { pub async fn transcode_video(path: String, quality: Quality, start_time: u32) -> TranscodeInfo {
@ -193,7 +187,6 @@ pub async fn transcode_video(path: String, quality: Quality, start_time: u32) ->
.map(char::from) .map(char::from)
.collect(); .collect();
let out_dir = format!("/cache/{uuid}"); let out_dir = format!("/cache/{uuid}");
std::fs::create_dir(&out_dir).expect("Could not create cache directory");
let child = start_transcode( let child = start_transcode(
&path, &path,
@ -215,6 +208,8 @@ async fn start_transcode(
encode_args: Vec<String>, encode_args: Vec<String>,
start_time: u32, start_time: u32,
) -> Child { ) -> Child {
std::fs::create_dir(&out_dir).expect("Could not create cache directory");
let mut cmd = Command::new("ffmpeg"); let mut cmd = Command::new("ffmpeg");
cmd.args(&["-progress", "pipe:1"]) cmd.args(&["-progress", "pipe:1"])
.arg("-nostats") .arg("-nostats")
@ -248,16 +243,16 @@ async fn start_transcode(
let value = &value[1..]; let value = &value[1..];
// Can't use ms since ms and us are both set to us /shrug // Can't use ms since ms and us are both set to us /shrug
if key == "out_time_us" { if key == "out_time_us" {
tx.send(value.parse::<u32>().unwrap() / 1_000_000).unwrap(); let _ = tx.send(value.parse::<u32>().unwrap() / 1_000_000);
} }
// TODO: maybe store speed too.
} }
} }
}); });
// Wait for 1.5 * segment time after start_time to be ready. // Wait for 1.5 * segment time after start_time to be ready.
loop { loop {
rx.changed().await.unwrap(); // TODO: Create a better error handling for here.
rx.changed().await.expect("Invalid audio index.");
let ready_time = *rx.borrow(); let ready_time = *rx.borrow();
if ready_time >= (1.5 * SEGMENT_TIME as f32) as u32 + start_time { if ready_time >= (1.5 * SEGMENT_TIME as f32) as u32 + start_time {
return child; return child;
@ -265,6 +260,14 @@ async fn start_transcode(
} }
} }
pub fn get_audio_path(path: &String, audio: u32) -> String {
let mut hasher = DefaultHasher::new();
path.hash(&mut hasher);
audio.hash(&mut hasher);
let hash = hasher.finish();
format!("/cache/{hash:x}")
}
pub fn get_cache_path(info: &TranscodeInfo) -> PathBuf { pub fn get_cache_path(info: &TranscodeInfo) -> PathBuf {
return get_cache_path_from_uuid(&info.uuid); return get_cache_path_from_uuid(&info.uuid);
} }
@ -274,7 +277,7 @@ pub fn get_cache_path_from_uuid(uuid: &String) -> PathBuf {
} }
pub struct TranscodeInfo { pub struct TranscodeInfo {
show: (String, Quality), pub show: (String, Quality),
job: Child, pub job: Child,
uuid: String, pub uuid: String,
} }

View File

@ -43,6 +43,7 @@ impl Signalable for Child {
} }
pub fn get_client_id(req: HttpRequest) -> Result<String, ApiError> { pub fn get_client_id(req: HttpRequest) -> Result<String, ApiError> {
// return Ok(String::from("1234"));
req.headers().get("x-client-id") req.headers().get("x-client-id")
.ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), }) .ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), })
.map(|x| x.to_str().unwrap().to_string()) .map(|x| x.to_str().unwrap().to_string())