mirror of
https://github.com/zoriya/Kyoo.git
synced 2025-05-31 04:04:21 -04:00
Split transcode commands and state
This commit is contained in:
parent
a5fc5b3753
commit
47c7617d24
@ -1,7 +1,7 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
error::ApiError,
|
error::ApiError,
|
||||||
paths,
|
paths,
|
||||||
transcode::Transcoder,
|
state::Transcoder,
|
||||||
};
|
};
|
||||||
use actix_files::NamedFile;
|
use actix_files::NamedFile;
|
||||||
use actix_web::{get, web, Result};
|
use actix_web::{get, web, Result};
|
||||||
|
@ -10,13 +10,14 @@ use utoipa::OpenApi;
|
|||||||
use crate::{
|
use crate::{
|
||||||
audio::*,
|
audio::*,
|
||||||
identify::{identify, Chapter, MediaInfo, Track},
|
identify::{identify, Chapter, MediaInfo, Track},
|
||||||
transcode::Transcoder,
|
state::Transcoder,
|
||||||
video::*,
|
video::*,
|
||||||
};
|
};
|
||||||
mod audio;
|
mod audio;
|
||||||
mod error;
|
mod error;
|
||||||
mod identify;
|
mod identify;
|
||||||
mod paths;
|
mod paths;
|
||||||
|
mod state;
|
||||||
mod transcode;
|
mod transcode;
|
||||||
mod utils;
|
mod utils;
|
||||||
mod video;
|
mod video;
|
||||||
|
99
transcoder/src/state.rs
Normal file
99
transcoder/src/state.rs
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
use crate::transcode::*;
|
||||||
|
use crate::utils::Signalable;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::RwLock;
|
||||||
|
|
||||||
|
pub struct Transcoder {
|
||||||
|
running: RwLock<HashMap<String, TranscodeInfo>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Transcoder {
|
||||||
|
pub fn new() -> Transcoder {
|
||||||
|
Self {
|
||||||
|
running: RwLock::new(HashMap::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn build_master(&self, _resource: String, _slug: String) -> String {
|
||||||
|
let mut master = String::from("#EXTM3U\n");
|
||||||
|
// TODO: Add transmux (original quality) in this master playlist.
|
||||||
|
// Transmux should be the first variant since it's used to test bandwidth
|
||||||
|
// and serve as a hint for preffered variant for clients.
|
||||||
|
|
||||||
|
// TODO: Fetch kyoo to retrieve the max quality and the aspect_ratio
|
||||||
|
let aspect_ratio = 16.0 / 9.0;
|
||||||
|
for quality in Quality::iter() {
|
||||||
|
master.push_str("#EXT-X-STREAM-INF:");
|
||||||
|
master.push_str(format!("AVERAGE-BANDWIDTH={},", quality.average_bitrate()).as_str());
|
||||||
|
master.push_str(format!("BANDWIDTH={},", quality.max_bitrate()).as_str());
|
||||||
|
master.push_str(
|
||||||
|
format!(
|
||||||
|
"RESOLUTION={}x{},",
|
||||||
|
(aspect_ratio * quality.height() as f32).round() as u32,
|
||||||
|
quality.height()
|
||||||
|
)
|
||||||
|
.as_str(),
|
||||||
|
);
|
||||||
|
master.push_str("CODECS=\"avc1.640028\"\n");
|
||||||
|
master.push_str(format!("./{}/index.m3u8\n", quality).as_str());
|
||||||
|
}
|
||||||
|
// TODO: Add audio streams
|
||||||
|
master
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn transcode(
|
||||||
|
&self,
|
||||||
|
client_id: String,
|
||||||
|
path: String,
|
||||||
|
quality: Quality,
|
||||||
|
start_time: u32,
|
||||||
|
) -> Result<String, std::io::Error> {
|
||||||
|
// TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time.
|
||||||
|
// TODO: Clear cache at startup/every X time without use.
|
||||||
|
// TODO: cache transcoded output for a show/quality and reuse it for every future requests.
|
||||||
|
if let Some(TranscodeInfo {
|
||||||
|
show: (old_path, old_qual),
|
||||||
|
job,
|
||||||
|
uuid,
|
||||||
|
..
|
||||||
|
}) = self.running.write().unwrap().get_mut(&client_id)
|
||||||
|
{
|
||||||
|
if path != *old_path || quality != *old_qual {
|
||||||
|
// If the job has already ended, interrupt returns an error but we don't care.
|
||||||
|
_ = job.interrupt();
|
||||||
|
} else {
|
||||||
|
let mut path = get_cache_path_from_uuid(uuid);
|
||||||
|
path.push("stream.m3u8");
|
||||||
|
return std::fs::read_to_string(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let info = transcode_video(path, quality, start_time).await;
|
||||||
|
let mut path = get_cache_path(&info);
|
||||||
|
path.push("stream.m3u8");
|
||||||
|
self.running.write().unwrap().insert(client_id, info);
|
||||||
|
std::fs::read_to_string(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Use path/quality instead of client_id
|
||||||
|
pub async fn get_segment(
|
||||||
|
&self,
|
||||||
|
client_id: String,
|
||||||
|
_path: String,
|
||||||
|
_quality: Quality,
|
||||||
|
chunk: u32,
|
||||||
|
) -> Result<PathBuf, SegmentError> {
|
||||||
|
let hashmap = self.running.read().unwrap();
|
||||||
|
let info = hashmap.get(&client_id).ok_or(SegmentError::NoTranscode)?;
|
||||||
|
|
||||||
|
// TODO: Check if ready_time is far enough for this fragment to exist.
|
||||||
|
let mut path = get_cache_path(&info);
|
||||||
|
path.push(format!("segments-{0:02}.ts", chunk));
|
||||||
|
Ok(path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum SegmentError {
|
||||||
|
NoTranscode,
|
||||||
|
}
|
@ -2,20 +2,16 @@ use derive_more::Display;
|
|||||||
use rand::distributions::Alphanumeric;
|
use rand::distributions::Alphanumeric;
|
||||||
use rand::{thread_rng, Rng};
|
use rand::{thread_rng, Rng};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::collections::hash_map::DefaultHasher;
|
use std::collections::hash_map::DefaultHasher;
|
||||||
use std::hash::{Hasher, Hash};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::process::Stdio;
|
use std::process::Stdio;
|
||||||
use std::slice::Iter;
|
use std::slice::Iter;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::RwLock;
|
|
||||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||||
use tokio::process::{Child, Command};
|
use tokio::process::{Child, Command};
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use crate::utils::Signalable;
|
|
||||||
|
|
||||||
const SEGMENT_TIME: u32 = 10;
|
const SEGMENT_TIME: u32 = 10;
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Serialize, Display)]
|
#[derive(PartialEq, Eq, Serialize, Display)]
|
||||||
@ -41,7 +37,7 @@ pub enum Quality {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Quality {
|
impl Quality {
|
||||||
fn iter() -> Iter<'static, Quality> {
|
pub fn iter() -> Iter<'static, Quality> {
|
||||||
static QUALITIES: [Quality; 8] = [
|
static QUALITIES: [Quality; 8] = [
|
||||||
Quality::P240,
|
Quality::P240,
|
||||||
Quality::P360,
|
Quality::P360,
|
||||||
@ -57,7 +53,7 @@ impl Quality {
|
|||||||
QUALITIES.iter()
|
QUALITIES.iter()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn height(&self) -> u32 {
|
pub fn height(&self) -> u32 {
|
||||||
match self {
|
match self {
|
||||||
Self::P240 => 240,
|
Self::P240 => 240,
|
||||||
Self::P360 => 360,
|
Self::P360 => 360,
|
||||||
@ -72,7 +68,7 @@ impl Quality {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// I'm not entierly sure about the values for bitrates. Double checking would be nice.
|
// I'm not entierly sure about the values for bitrates. Double checking would be nice.
|
||||||
fn average_bitrate(&self) -> u32 {
|
pub fn average_bitrate(&self) -> u32 {
|
||||||
match self {
|
match self {
|
||||||
Self::P240 => 400_000,
|
Self::P240 => 400_000,
|
||||||
Self::P360 => 800_000,
|
Self::P360 => 800_000,
|
||||||
@ -86,7 +82,7 @@ impl Quality {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn max_bitrate(&self) -> u32 {
|
pub fn max_bitrate(&self) -> u32 {
|
||||||
match self {
|
match self {
|
||||||
Self::P240 => 700_000,
|
Self::P240 => 700_000,
|
||||||
Self::P360 => 1400_000,
|
Self::P360 => 1400_000,
|
||||||
@ -172,7 +168,7 @@ fn get_transcode_video_quality_args(quality: &Quality, segment_time: u32) -> Vec
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn transcode_audio(path: String, audio: u32) -> TranscodeInfo {
|
pub async fn transcode_audio(path: String, audio: u32) -> TranscodeInfo {
|
||||||
let mut hasher = DefaultHasher::new();
|
let mut hasher = DefaultHasher::new();
|
||||||
path.hash(&mut hasher);
|
path.hash(&mut hasher);
|
||||||
audio.hash(&mut hasher);
|
audio.hash(&mut hasher);
|
||||||
@ -188,7 +184,7 @@ async fn transcode_audio(path: String, audio: u32) -> TranscodeInfo {
|
|||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn transcode_video(path: String, quality: Quality, start_time: u32) -> TranscodeInfo {
|
pub async fn transcode_video(path: String, quality: Quality, start_time: u32) -> TranscodeInfo {
|
||||||
// TODO: Use the out path below once cached segments can be reused.
|
// TODO: Use the out path below once cached segments can be reused.
|
||||||
// let out_dir = format!("/cache/{show_hash}/{quality}");
|
// let out_dir = format!("/cache/{show_hash}/{quality}");
|
||||||
let uuid: String = thread_rng()
|
let uuid: String = thread_rng()
|
||||||
@ -269,11 +265,11 @@ async fn start_transcode(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_cache_path(info: &TranscodeInfo) -> PathBuf {
|
pub fn get_cache_path(info: &TranscodeInfo) -> PathBuf {
|
||||||
return get_cache_path_from_uuid(&info.uuid);
|
return get_cache_path_from_uuid(&info.uuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_cache_path_from_uuid(uuid: &String) -> PathBuf {
|
pub fn get_cache_path_from_uuid(uuid: &String) -> PathBuf {
|
||||||
return PathBuf::from(format!("/cache/{uuid}/", uuid = &uuid));
|
return PathBuf::from(format!("/cache/{uuid}/", uuid = &uuid));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -282,97 +278,3 @@ pub struct TranscodeInfo {
|
|||||||
job: Child,
|
job: Child,
|
||||||
uuid: String,
|
uuid: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Transcoder {
|
|
||||||
running: RwLock<HashMap<String, TranscodeInfo>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Transcoder {
|
|
||||||
pub fn new() -> Transcoder {
|
|
||||||
Self {
|
|
||||||
running: RwLock::new(HashMap::new()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn build_master(&self, _resource: String, _slug: String) -> String {
|
|
||||||
let mut master = String::from("#EXTM3U\n");
|
|
||||||
// TODO: Add transmux (original quality) in this master playlist.
|
|
||||||
// Transmux should be the first variant since it's used to test bandwidth
|
|
||||||
// and serve as a hint for preffered variant for clients.
|
|
||||||
|
|
||||||
// TODO: Fetch kyoo to retrieve the max quality and the aspect_ratio
|
|
||||||
let aspect_ratio = 16.0 / 9.0;
|
|
||||||
for quality in Quality::iter() {
|
|
||||||
master.push_str("#EXT-X-STREAM-INF:");
|
|
||||||
master.push_str(format!("AVERAGE-BANDWIDTH={},", quality.average_bitrate()).as_str());
|
|
||||||
master.push_str(format!("BANDWIDTH={},", quality.max_bitrate()).as_str());
|
|
||||||
master.push_str(
|
|
||||||
format!(
|
|
||||||
"RESOLUTION={}x{},",
|
|
||||||
(aspect_ratio * quality.height() as f32).round() as u32,
|
|
||||||
quality.height()
|
|
||||||
)
|
|
||||||
.as_str(),
|
|
||||||
);
|
|
||||||
master.push_str("CODECS=\"avc1.640028\"\n");
|
|
||||||
master.push_str(format!("./{}/index.m3u8\n", quality).as_str());
|
|
||||||
}
|
|
||||||
// TODO: Add audio streams
|
|
||||||
master
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn transcode(
|
|
||||||
&self,
|
|
||||||
client_id: String,
|
|
||||||
path: String,
|
|
||||||
quality: Quality,
|
|
||||||
start_time: u32,
|
|
||||||
) -> Result<String, std::io::Error> {
|
|
||||||
// TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time.
|
|
||||||
// TODO: Clear cache at startup/every X time without use.
|
|
||||||
// TODO: cache transcoded output for a show/quality and reuse it for every future requests.
|
|
||||||
if let Some(TranscodeInfo {
|
|
||||||
show: (old_path, old_qual),
|
|
||||||
job,
|
|
||||||
uuid,
|
|
||||||
..
|
|
||||||
}) = self.running.write().unwrap().get_mut(&client_id)
|
|
||||||
{
|
|
||||||
if path != *old_path || quality != *old_qual {
|
|
||||||
// If the job has already ended, interrupt returns an error but we don't care.
|
|
||||||
_ = job.interrupt();
|
|
||||||
} else {
|
|
||||||
let mut path = get_cache_path_from_uuid(uuid);
|
|
||||||
path.push("stream.m3u8");
|
|
||||||
return std::fs::read_to_string(path);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let info = transcode_video(path, quality, start_time).await;
|
|
||||||
let mut path = get_cache_path(&info);
|
|
||||||
path.push("stream.m3u8");
|
|
||||||
self.running.write().unwrap().insert(client_id, info);
|
|
||||||
std::fs::read_to_string(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Use path/quality instead of client_id
|
|
||||||
pub async fn get_segment(
|
|
||||||
&self,
|
|
||||||
client_id: String,
|
|
||||||
_path: String,
|
|
||||||
_quality: Quality,
|
|
||||||
chunk: u32,
|
|
||||||
) -> Result<PathBuf, SegmentError> {
|
|
||||||
let hashmap = self.running.read().unwrap();
|
|
||||||
let info = hashmap.get(&client_id).ok_or(SegmentError::NoTranscode)?;
|
|
||||||
|
|
||||||
// TODO: Check if ready_time is far enough for this fragment to exist.
|
|
||||||
let mut path = get_cache_path(&info);
|
|
||||||
path.push(format!("segments-{0:02}.ts", chunk));
|
|
||||||
Ok(path)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum SegmentError {
|
|
||||||
NoTranscode,
|
|
||||||
}
|
|
||||||
|
@ -1,10 +1,6 @@
|
|||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
use crate::{
|
use crate::{error::ApiError, paths, state::Transcoder, transcode::Quality, utils::get_client_id};
|
||||||
error::ApiError,
|
|
||||||
transcode::{Quality, Transcoder},
|
|
||||||
utils::get_client_id, paths,
|
|
||||||
};
|
|
||||||
use actix_files::NamedFile;
|
use actix_files::NamedFile;
|
||||||
use actix_web::{get, web, HttpRequest, Result};
|
use actix_web::{get, web, HttpRequest, Result};
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user