Wait for the first segment to exist before sending the m3u8

This commit is contained in:
Zoe Roux 2023-04-18 01:27:22 +09:00
parent e7ace4d497
commit 67e4764a72
No known key found for this signature in database
2 changed files with 42 additions and 27 deletions

View File

@ -41,7 +41,10 @@ async fn transcode_movie(
transcoder transcoder
.transcode(client_id, path, quality, 0) .transcode(client_id, path, quality, 0)
.await .await
.map_err(|_| ApiError::InternalError) .map_err(|e| {
eprintln!("Unhandled error occured while transcoding: {}", e);
ApiError::InternalError
})
} }
#[get("/movie/{quality}/{slug}/segments-{chunk}.ts")] #[get("/movie/{quality}/{slug}/segments-{chunk}.ts")]

View File

@ -4,10 +4,10 @@ use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::process::Stdio; use std::process::Stdio;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::AtomicI32; use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command}; use tokio::process::{Child, Command};
use tokio::sync::watch::{self, Receiver};
use crate::utils::Signalable; use crate::utils::Signalable;
@ -66,7 +66,7 @@ fn get_transcode_video_quality_args(quality: &Quality) -> Vec<&'static str> {
} }
// TODO: Add audios streams (and transcode them only when necesarry) // TODO: Add audios streams (and transcode them only when necesarry)
async fn start_transcode(path: String, quality: Quality, start_time: i32) -> TranscodeInfo { async fn start_transcode(path: String, quality: Quality, start_time: u32) -> TranscodeInfo {
// TODO: Use the out path below once cached segments can be reused. // TODO: Use the out path below once cached segments can be reused.
// let out_dir = format!("/cache/{show_hash}/{quality}"); // let out_dir = format!("/cache/{show_hash}/{quality}");
let uuid: String = thread_rng() let uuid: String = thread_rng()
@ -77,18 +77,19 @@ async fn start_transcode(path: String, quality: Quality, start_time: i32) -> Tra
let out_dir = format!("/cache/{uuid}"); let out_dir = format!("/cache/{uuid}");
std::fs::create_dir(&out_dir).expect("Could not create cache directory"); std::fs::create_dir(&out_dir).expect("Could not create cache directory");
let segment_time = "10"; let segment_time: u32 = 10;
let mut child = Command::new("ffmpeg") let mut child = Command::new("ffmpeg")
.args(&["-progress", "pipe:1"]) .args(&["-progress", "pipe:1"])
.arg("-nostats")
.args(&["-ss", start_time.to_string().as_str()]) .args(&["-ss", start_time.to_string().as_str()])
.args(&["-i", path.as_str()]) .args(&["-i", path.as_str()])
.args(&["-f", "segment"]) .args(&["-f", "segment"])
.args(&["-segment_list_type", "m3u8"]) .args(&["-segment_list_type", "m3u8"])
// Disable the .tmp file to serve it instantly to the client. // Use a .tmp file for segments (.ts files)
.args(&["-hls_flags", "temp_files"]) .args(&["-hls_flags", "temp_files"])
// Keep all segments in the list (else only last X are presents, useful for livestreams) // Keep all segments in the list (else only last X are presents, useful for livestreams)
.args(&["-segment_list_size", "0"]) .args(&["-segment_list_size", "0"])
.args(&["-segment_time", segment_time]) .args(&["-segment_time", segment_time.to_string().as_str()])
// Force segments to be exactly segment_time (only works when transcoding) // Force segments to be exactly segment_time (only works when transcoding)
.args(&[ .args(&[
"-force_key_frames", "-force_key_frames",
@ -104,37 +105,46 @@ async fn start_transcode(path: String, quality: Quality, start_time: i32) -> Tra
format!("{out_dir}/stream.m3u8"), format!("{out_dir}/stream.m3u8"),
format!("{out_dir}/segments-%02d.ts"), format!("{out_dir}/segments-%02d.ts"),
]) ])
// TODO: Figure out which flag would be the best.
.args(&["-segment_list_flags", "live"])
// .args(&["-segment_list_flags", "cache"])
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.spawn() .spawn()
.expect("ffmpeg failed to start"); .expect("ffmpeg failed to start");
let stdout = child.stdout.take().unwrap(); let stdout = child.stdout.take().unwrap();
let info = TranscodeInfo { let (tx, mut rx) = watch::channel(0u32);
show: (path, quality),
job: child,
uuid,
start_time,
ready_time: Arc::new(AtomicI32::new(0)),
};
let ready_time = Arc::clone(&info.ready_time);
tokio::spawn(async move { tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines(); let mut reader = BufReader::new(stdout).lines();
while let Some(line) = reader.next_line().await.unwrap() { while let Some(line) = reader.next_line().await.unwrap() {
if let Some((key, value)) = line.find(':').map(|i| line.split_at(i)) { if let Some((key, value)) = line.find('=').map(|i| line.split_at(i)) {
if key == "out_time_ms" { let value = &value[1..];
ready_time.store( // Can't use ms since ms and us are both set to us /shrug
value.parse::<i32>().unwrap() / 1000, if key == "out_time_us" {
std::sync::atomic::Ordering::Relaxed, tx.send(value.parse::<u32>().unwrap() / 1_000_000).unwrap();
);
} }
// TODO: maybe store speed too. // TODO: maybe store speed too.
} }
} }
}); });
// TODO: Wait for 1.5 * segment time after start_time to be ready. // Wait for 1.5 * segment time after start_time to be ready.
return info; loop {
rx.changed().await.unwrap();
let ready_time = *rx.borrow();
if ready_time >= (1.5 * segment_time as f32) as u32 + start_time {
break;
}
}
TranscodeInfo {
show: (path, quality),
job: child,
uuid,
start_time,
ready_time: rx,
}
} }
fn get_cache_path(info: &TranscodeInfo) -> PathBuf { fn get_cache_path(info: &TranscodeInfo) -> PathBuf {
@ -151,8 +161,9 @@ struct TranscodeInfo {
job: Child, job: Child,
uuid: String, uuid: String,
#[allow(dead_code)] #[allow(dead_code)]
start_time: i32, start_time: u32,
ready_time: Arc<AtomicI32>, #[allow(dead_code)]
ready_time: Receiver<u32>,
} }
pub struct Transcoder { pub struct Transcoder {
@ -171,7 +182,7 @@ impl Transcoder {
client_id: String, client_id: String,
path: String, path: String,
quality: Quality, quality: Quality,
start_time: i32, start_time: u32,
) -> Result<String, std::io::Error> { ) -> Result<String, std::io::Error> {
// TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time. // TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time.
// TODO: Clear cache at startup/every X time without use. // TODO: Clear cache at startup/every X time without use.
@ -184,7 +195,8 @@ impl Transcoder {
}) = self.running.write().unwrap().get_mut(&client_id) }) = self.running.write().unwrap().get_mut(&client_id)
{ {
if path != *old_path || quality != *old_qual { if path != *old_path || quality != *old_qual {
job.interrupt()?; // If the job has already ended, interrupt returns an error but we don't care.
_ = job.interrupt();
} else { } else {
let mut path = get_cache_path_from_uuid(uuid); let mut path = get_cache_path_from_uuid(uuid);
path.push("stream.m3u8"); path.push("stream.m3u8");