Init go rewrite

This commit is contained in:
Zoe Roux 2024-01-11 15:16:13 +01:00
parent 439407ea59
commit b2a0dfa702
18 changed files with 21 additions and 3318 deletions

View File

@ -61,8 +61,5 @@ jobs:
steps:
- uses: actions/checkout@v1
- uses: dtolnay/rust-toolchain@stable
- name: Run cargo fmt
run: |
cargo fmt --check
- name: Run go fmt
run: if [ "$(gofmt -s -l . | wc -l)" -gt 0 ]; then exit 1; fi

View File

@ -18,12 +18,8 @@ in
python
python312Packages.setuptools
python312Packages.pip
cargo
cargo-watch
rustfmt
rustc
pkgconfig
openssl
go
wgo
mediainfo
ffmpeg
postgresql_15
@ -32,7 +28,6 @@ in
pgformatter
];
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
DOTNET_ROOT = "${dotnet}";
shellHook = ''

1873
transcoder/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,16 +0,0 @@
[package]
name = "transcoder"
version = "0.1.0"
edition = "2021"
[dependencies]
actix-web = "4"
actix-files = "0.6.2"
tokio = { version = "1.27.0", features = ["process"] }
serde = { version = "1.0.159", features = ["derive"] }
rand = "0.8.5"
derive_more = "0.99.17"
reqwest = { version = "0.11.16", default_features = false, features = ["json", "rustls-tls"] }
utoipa = { version = "3", features = ["actix_extras"] }
json = "0.12.4"
cached = { version = "0.46.1", features = ["async"] }

View File

@ -1,20 +1,9 @@
FROM rust:alpine as builder
RUN apk add --no-cache musl-dev
FROM golang:1.20-alpine
WORKDIR /app
# FIX: see https://github.com/rust-lang/cargo/issues/2644
RUN mkdir src/ && touch src/lib.rs
COPY Cargo.toml Cargo.lock ./
ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse
RUN cargo build
RUN rm src/lib.rs
COPY src src
RUN cargo install --path .
FROM alpine
RUN apk add --no-cache ffmpeg mediainfo musl-dev
COPY --from=builder /usr/local/cargo/bin/transcoder ./transcoder
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o ./transcoder
EXPOSE 7666
ENTRYPOINT ["./transcoder"]
CMD ./transcoder

View File

@ -1,13 +1,6 @@
FROM rust:alpine
RUN apk add --no-cache musl-dev ffmpeg mediainfo
RUN cargo install cargo-watch
FROM golang:1.20-alpine
RUN go install github.com/bokwoon95/wgo@latest
WORKDIR /app
# FIX: see https://github.com/rust-lang/cargo/issues/2644
RUN mkdir src/ && touch src/lib.rs
COPY Cargo.toml Cargo.lock ./
RUN cargo build
RUN rm src/lib.rs
EXPOSE 7666
ENTRYPOINT ["cargo", "watch", "-x", "run"]
CMD wgo run .

3
transcoder/go.mod Normal file
View File

@ -0,0 +1,3 @@
module github.com/zoriya/kyoo/transcoder
go 1.20

5
transcoder/main.go Normal file
View File

@ -0,0 +1,5 @@
package main
func main() {
print("")
}

View File

@ -1 +0,0 @@
hard_tabs = true

View File

@ -1,89 +0,0 @@
use crate::{error::ApiError, paths, state::Transcoder, transcode::TranscodeError};
use actix_files::NamedFile;
use actix_web::{get, web, Result};
/// Transcode audio
///
/// Get the selected audio
/// This route can take a few seconds to respond since it will way for at least one segment to be
/// available.
#[utoipa::path(
responses(
(status = 200, description = "Get the m3u8 playlist."),
(status = NOT_FOUND, description = "Invalid slug.")
),
params(
("resource" = String, Path, description = "Episode or movie"),
("slug" = String, Path, description = "The slug of the movie/episode."),
("audio" = u32, Path, description = "Specify the audio stream you want. For mappings, refer to the audios fields of the /watch response."),
)
)]
#[get("/{resource}/{slug}/audio/{audio}/index.m3u8")]
async fn get_audio_transcoded(
query: web::Path<(String, String, u32)>,
transcoder: web::Data<Transcoder>,
) -> Result<String, ApiError> {
let (resource, slug, audio) = query.into_inner();
let path = paths::get_path(resource, slug)
.await
.map_err(|_| ApiError::NotFound)?;
transcoder
.transcode_audio(path, audio)
.await
.map_err(|e| match e {
TranscodeError::ArgumentError(err) => ApiError::BadRequest { error: err },
TranscodeError::FFmpegError(err) => {
eprintln!(
"Unhandled ffmpeg error occured while transcoding audio: {}",
err
);
ApiError::InternalError
}
TranscodeError::ReadError(err) => {
eprintln!(
"Unhandled read error occured while transcoding audio: {}",
err
);
ApiError::InternalError
}
})
}
/// Get audio chunk
///
/// Retrieve a chunk of a transcoded audio.
#[utoipa::path(
responses(
(status = 200, description = "Get a hls chunk."),
(status = NOT_FOUND, description = "Invalid slug.")
),
params(
("resource" = String, Path, description = "Episode or movie"),
("slug" = String, Path, description = "The slug of the movie/episode."),
("audio" = u32, Path, description = "Specify the audio you want"),
("chunk" = u32, Path, description = "The number of the chunk"),
)
)]
#[get("/{resource}/{slug}/audio/{audio}/segments-{chunk}.ts")]
async fn get_audio_chunk(
query: web::Path<(String, String, u32, u32)>,
transcoder: web::Data<Transcoder>,
) -> Result<NamedFile, ApiError> {
let (resource, slug, audio, chunk) = query.into_inner();
let path = paths::get_path(resource, slug)
.await
.map_err(|_| ApiError::NotFound)?;
transcoder
.get_audio_segment(path, audio, chunk)
.await
.map_err(|_| ApiError::BadRequest {
error: "No transcode started for the selected show/audio.".to_string(),
})
.and_then(|path| {
NamedFile::open(path).map_err(|_| ApiError::BadRequest {
error: "Invalid segment number.".to_string(),
})
})
}

View File

@ -1,36 +0,0 @@
use actix_web::{
error,
http::{header::ContentType, StatusCode},
HttpResponse,
};
use derive_more::{Display, Error};
#[derive(Debug, Display, Error)]
pub enum ApiError {
#[display(fmt = "{}", error)]
BadRequest { error: String },
#[display(fmt = "Resource not found.")]
NotFound,
#[display(fmt = "An internal error occurred. Please try again later.")]
InternalError,
}
impl error::ResponseError for ApiError {
fn error_response(&self) -> HttpResponse {
HttpResponse::build(self.status_code())
.insert_header(ContentType::json())
.body(format!(
"{{ \"status\": \"{status}\", \"errors\": [\"{err}\"] }}",
status = self.status_code(),
err = self.to_string()
))
}
fn status_code(&self) -> StatusCode {
match self {
ApiError::BadRequest { error: _ } => StatusCode::BAD_REQUEST,
ApiError::NotFound => StatusCode::NOT_FOUND,
ApiError::InternalError => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}

View File

@ -1,281 +0,0 @@
use cached::proc_macro::cached;
use json::JsonValue;
use serde::Serialize;
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
path::{Path, PathBuf},
process::Stdio,
str::{self, FromStr},
};
use tokio::process::Command;
use utoipa::ToSchema;
use crate::transcode::Quality;
#[derive(Serialize, ToSchema, Clone)]
#[serde(rename_all = "camelCase")]
pub struct MediaInfo {
/// The sha1 of the video file.
pub sha: String,
/// The internal path of the video file.
pub path: String,
/// The extension currently used to store this video file
pub extension: String,
/// The length of the media in seconds.
pub length: f32,
/// The container of the video file of this episode.
pub container: String,
/// The video codec and infromations.
pub video: Video,
/// The list of audio tracks.
pub audios: Vec<Audio>,
/// The list of subtitles tracks.
pub subtitles: Vec<Subtitle>,
/// The list of fonts that can be used to display subtitles.
pub fonts: Vec<String>,
/// The list of chapters. See Chapter for more information.
pub chapters: Vec<Chapter>,
}
#[derive(Serialize, ToSchema, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Video {
/// The codec of this stream (defined as the RFC 6381).
pub codec: String,
/// The language of this stream (as a ISO-639-2 language code)
pub language: Option<String>,
/// The max quality of this video track.
pub quality: Quality,
/// The width of the video stream
pub width: u32,
/// The height of the video stream
pub height: u32,
/// The average bitrate of the video in bytes/s
pub bitrate: u32,
}
#[derive(Serialize, ToSchema, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Audio {
/// The index of this track on the media.
pub index: u32,
/// The title of the stream.
pub title: Option<String>,
/// The language of this stream (as a ISO-639-2 language code)
pub language: Option<String>,
/// The codec of this stream.
pub codec: String,
/// Is this stream the default one of it's type?
pub is_default: bool,
/// Is this stream tagged as forced? (useful only for subtitles)
pub is_forced: bool,
}
#[derive(Serialize, ToSchema, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Subtitle {
/// The index of this track on the media.
pub index: u32,
/// The title of the stream.
pub title: Option<String>,
/// The language of this stream (as a ISO-639-2 language code)
pub language: Option<String>,
/// The codec of this stream.
pub codec: String,
/// The extension for the codec.
pub extension: Option<String>,
/// Is this stream the default one of it's type?
pub is_default: bool,
/// Is this stream tagged as forced? (useful only for subtitles)
pub is_forced: bool,
/// The link to access this subtitle.
pub link: Option<String>,
}
#[derive(Serialize, ToSchema, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Chapter {
/// The start time of the chapter (in second from the start of the episode).
pub start_time: f32,
/// The end time of the chapter (in second from the start of the episode).
pub end_time: f32,
/// The name of this chapter. This should be a human-readable name that could be presented to the user.
pub name: String, // TODO: add a type field for Opening, Credits...
}
async fn extract(path: String, sha: &String, subs: &Vec<Subtitle>) {
println!("Extract subs and fonts for {}", path);
let mut cmd = Command::new("ffmpeg");
cmd.current_dir(format!("/metadata/{sha}/att/"))
.args(&["-dump_attachment:t", ""])
.args(&["-i", path.as_str()]);
for sub in subs {
if let Some(ext) = sub.extension.clone() {
cmd.args(&[
"-map",
format!("0:s:{idx}", idx = sub.index).as_str(),
"-c:s",
"copy",
format!("/metadata/{sha}/sub/{idx}.{ext}", idx = sub.index,).as_str(),
]);
}
}
println!("Starting extraction with the command: {:?}", cmd);
cmd.stdout(Stdio::null())
.spawn()
.expect("Error starting ffmpeg extract")
.wait()
.await
.expect("Error running ffmpeg extract");
}
#[cached(sync_writes = true, time = 30)]
pub async fn identify(path: String) -> Option<MediaInfo> {
let extension_table: HashMap<&str, &str> =
HashMap::from([("subrip", "srt"), ("ass", "ass"), ("vtt", "vtt")]);
let mediainfo = Command::new("mediainfo")
.arg("--Output=JSON")
.arg("--Language=raw")
.arg(path.clone())
.output()
.await
.expect("Error running the mediainfo command");
if !mediainfo.status.success() {
return None;
}
let output = json::parse(str::from_utf8(mediainfo.stdout.as_slice()).unwrap()).unwrap();
let general = output["media"]["track"]
.members()
.find(|x| x["@type"] == "General")
.unwrap();
let sha = general["UniqueID"]
.as_str()
.and_then(|x| {
// Remove dummy values that some tools use.
if x.len() < 5 {
return None;
}
Some(x.to_string())
})
.unwrap_or_else(|| {
let mut hasher = DefaultHasher::new();
path.hash(&mut hasher);
general["File_Modified_Date"].to_string().hash(&mut hasher);
format!("{hash:x}", hash = hasher.finish())
});
let subs: Vec<Subtitle> = output["media"]["track"]
.members()
.filter(|x| x["@type"] == "Text")
.map(|x| {
let index = parse::<u32>(&x["@typeorder"]).unwrap_or(1) - 1;
let mut codec = x["Format"].as_str().unwrap().to_string().to_lowercase();
if codec == "utf-8" {
codec = "subrip".to_string();
}
let extension = extension_table.get(codec.as_str()).map(|x| x.to_string());
Subtitle {
link: extension
.as_ref()
.map(|ext| format!("/video/{sha}/subtitle/{index}.{ext}")),
index,
title: x["Title"].as_str().map(|x| x.to_string()),
language: x["Language"].as_str().map(|x| x.to_string()),
codec,
extension,
is_default: x["Default"] == "Yes",
is_forced: x["Forced"] == "No",
}
})
.collect();
if !PathBuf::from(format!("/metadata/{sha}")).exists() {
std::fs::create_dir_all(format!("/metadata/{sha}/att")).unwrap();
std::fs::create_dir_all(format!("/metadata/{sha}/sub")).unwrap();
extract(path.clone(), &sha, &subs).await;
}
fn parse<F: FromStr>(v: &JsonValue) -> Option<F> {
v.as_str().and_then(|x| x.parse::<F>().ok())
}
Some(MediaInfo {
length: parse::<f32>(&general["Duration"])?,
extension: Path::new(&path)
.extension()
.map(|x| x.to_os_string().into_string().unwrap())
.unwrap_or(String::from("mkv")),
container: general["Format"].as_str().unwrap().to_string(),
video: {
let v = output["media"]["track"]
.members()
.find(|x| x["@type"] == "Video")
.expect("File without video found. This is not supported");
Video {
// This codec is not in the right format (does not include bitdepth...).
codec: v["Format"].as_str().unwrap().to_string(),
language: v["Language"].as_str().map(|x| x.to_string()),
quality: Quality::from_height(parse::<u32>(&v["Height"]).unwrap()),
width: parse::<u32>(&v["Width"]).unwrap(),
height: parse::<u32>(&v["Height"]).unwrap(),
bitrate: parse::<u32>(&v["BitRate"])
.unwrap_or(parse(&general["OverallBitRate"]).unwrap()),
}
},
audios: output["media"]["track"]
.members()
.filter(|x| x["@type"] == "Audio")
.map(|a| Audio {
index: parse::<u32>(&a["@typeorder"]).unwrap_or(1) - 1,
title: a["Title"].as_str().map(|x| x.to_string()),
language: a["Language"].as_str().map(|x| x.to_string()),
// TODO: format is invalid. Channels count missing...
codec: a["Format"].as_str().unwrap().to_string(),
is_default: a["Default"] == "Yes",
is_forced: a["Forced"] == "No",
})
.collect(),
subtitles: subs,
fonts: general["extra"]["Attachments"]
.as_str()
.map_or(Vec::new(), |x| {
x.to_string()
.split(" / ")
.map(|x| format!("/video/{sha}/attachment/{x}"))
.collect()
}),
chapters: output["media"]["track"]
.members()
.find(|x| x["@type"] == "Menu")
.map(|x| {
std::iter::zip(x["extra"].entries(), x["extra"].entries().skip(1))
.map(|((start, name), (end, _))| Chapter {
start_time: time_to_seconds(start),
end_time: time_to_seconds(end),
name: name.as_str().unwrap().to_string(),
})
.collect()
})
.unwrap_or(vec![]),
sha,
path,
})
}
fn time_to_seconds(time: &str) -> f32 {
let splited: Vec<f32> = time
.split('_')
.skip(1)
.map(|x| x.parse().unwrap())
.collect();
let hours = splited[0];
let minutes = splited[1];
let seconds = splited[2];
let ms = splited[3];
(hours * 60. + minutes) * 60. + seconds + ms / 1000.
}

View File

@ -1,215 +0,0 @@
use std::path::PathBuf;
use actix_files::NamedFile;
use actix_web::{
get,
web::{self, Json},
App, HttpServer, Result,
};
use error::ApiError;
use utoipa::OpenApi;
use crate::{
audio::*,
identify::{identify, Audio, Chapter, MediaInfo, Subtitle, Video},
state::Transcoder,
transcode::Quality,
video::*,
};
mod audio;
mod error;
mod identify;
mod paths;
mod state;
mod transcode;
mod utils;
mod video;
/// Direct video
///
/// Retrieve the raw video stream, in the same container as the one on the server. No transcoding or
/// transmuxing is done.
#[utoipa::path(
responses(
(status = 200, description = "The item is returned"),
(status = NOT_FOUND, description = "Invalid slug.")
),
params(
("resource" = String, Path, description = "Episode or movie"),
("slug" = String, Path, description = "The slug of the movie/episode."),
)
)]
#[get("/{resource}/{slug}/direct")]
async fn get_direct(query: web::Path<(String, String)>) -> Result<NamedFile, ApiError> {
let (resource, slug) = query.into_inner();
let path = paths::get_path(resource, slug).await.map_err(|e| {
eprintln!("Unhandled error occured while getting the path: {}", e);
ApiError::NotFound
})?;
NamedFile::open_async(path).await.map_err(|e| {
eprintln!(
"Unhandled error occured while openning the direct stream: {}",
e
);
ApiError::InternalError
})
}
/// Get master playlist
///
/// Get a master playlist containing all possible video qualities and audios available for this resource.
/// Note that the direct stream is missing (since the direct is not an hls stream) and
/// subtitles/fonts are not included to support more codecs than just webvtt.
#[utoipa::path(
responses(
(status = 200, description = "Get the m3u8 master playlist."),
(status = NOT_FOUND, description = "Invalid slug.")
),
params(
("resource" = String, Path, description = "Episode or movie"),
("slug" = String, Path, description = "The slug of the movie/episode."),
)
)]
#[get("/{resource}/{slug}/master.m3u8")]
async fn get_master(
query: web::Path<(String, String)>,
transcoder: web::Data<Transcoder>,
) -> Result<String, ApiError> {
let (resource, slug) = query.into_inner();
transcoder
.build_master(resource, slug)
.await
.ok_or(ApiError::InternalError)
}
/// Identify
///
/// Identify metadata about a file
#[utoipa::path(
responses(
(status = 200, description = "Ok", body = MediaInfo),
(status = NOT_FOUND, description = "Invalid slug.")
),
params(
("resource" = String, Path, description = "Episode or movie"),
("slug" = String, Path, description = "The slug of the movie/episode."),
)
)]
#[get("/{resource}/{slug}/info")]
async fn identify_resource(
query: web::Path<(String, String)>,
) -> Result<Json<MediaInfo>, ApiError> {
let (resource, slug) = query.into_inner();
let path = paths::get_path(resource, slug)
.await
.map_err(|_| ApiError::NotFound)?;
identify(path)
.await
.map(|info| Json(info))
.ok_or_else(|| ApiError::BadRequest {
error: "Invalid video file. Could not parse informations.".to_string(),
})
}
/// Get attachments
///
/// Get a specific attachment
#[utoipa::path(
responses(
(status = 200, description = "Ok", body = MediaInfo),
(status = NOT_FOUND, description = "Invalid slug.")
),
params(
("sha" = String, Path, description = "The sha1 of the file"),
("name" = String, Path, description = "The name of the attachment."),
)
)]
#[get("/{sha}/attachment/{name}")]
async fn get_attachment(query: web::Path<(String, String)>) -> Result<NamedFile, ApiError> {
let (sha, name) = query.into_inner();
let mut attpath = PathBuf::from("/metadata");
attpath.push(sha);
attpath.push("att");
attpath.push(name);
NamedFile::open_async(attpath)
.await
.map_err(|_| ApiError::NotFound)
}
/// Get subtitle
///
/// Get a specific subtitle
#[utoipa::path(
responses(
(status = 200, description = "Ok", body = MediaInfo),
(status = NOT_FOUND, description = "Invalid slug.")
),
params(
("sha" = String, Path, description = "The sha1 of the file"),
("name" = String, Path, description = "The name of the subtitle."),
)
)]
#[get("/{sha}/subtitle/{name}")]
async fn get_subtitle(query: web::Path<(String, String)>) -> Result<NamedFile, ApiError> {
let (sha, name) = query.into_inner();
let mut subpath = PathBuf::from("/metadata");
subpath.push(sha);
subpath.push("sub");
subpath.push(name);
NamedFile::open_async(subpath)
.await
.map_err(|_| ApiError::NotFound)
}
#[get("/openapi.json")]
async fn get_swagger() -> String {
#[derive(OpenApi)]
#[openapi(
info(description = "Transcoder's open api."),
paths(
get_direct,
get_master,
get_transcoded,
get_chunk,
get_audio_transcoded,
get_audio_chunk,
identify_resource,
get_attachment,
get_subtitle,
),
components(schemas(MediaInfo, Video, Audio, Subtitle, Chapter, Quality))
)]
struct ApiDoc;
ApiDoc::openapi().to_pretty_json().unwrap()
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let state = web::Data::new(Transcoder::new());
// Clear the cache
for entry in std::fs::read_dir("/cache")? {
_ = std::fs::remove_dir_all(entry?.path());
}
HttpServer::new(move || {
App::new()
.app_data(state.clone())
.service(get_direct)
.service(get_master)
.service(get_transcoded)
.service(get_chunk)
.service(get_audio_transcoded)
.service(get_audio_chunk)
.service(identify_resource)
.service(get_swagger)
.service(get_attachment)
.service(get_subtitle)
})
.bind(("0.0.0.0", 7666))?
.run()
.await
}

View File

@ -1,28 +0,0 @@
use serde::Deserialize;
#[derive(Deserialize)]
struct Item {
path: String,
}
pub async fn get_path(resource: String, slug: String) -> Result<String, reqwest::Error> {
let api_url = std::env::var("API_URL").unwrap_or("http://back:5000".to_string());
let api_key = std::env::var("KYOO_APIKEYS")
.expect("Missing api keys.")
.split(',')
.next()
.unwrap()
.to_string();
// TODO: Store the client somewhere gobal
let client = reqwest::Client::new();
client
.get(format!("{api_url}/{resource}/{slug}"))
.header("X-API-KEY", api_key)
.send()
.await?
.error_for_status()?
.json::<Item>()
.await
.map(|x| x.path)
}

View File

@ -1,244 +0,0 @@
use crate::identify::identify;
use crate::paths::get_path;
use crate::transcode::*;
use crate::utils::Signalable;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::RwLock;
use std::time::{Duration, SystemTime};
pub struct Transcoder {
running: RwLock<HashMap<String, TranscodeInfo>>,
audio_jobs: RwLock<HashMap<(String, u32), AudioInfo>>,
}
impl Transcoder {
pub fn new() -> Transcoder {
Self {
running: RwLock::new(HashMap::new()),
audio_jobs: RwLock::new(HashMap::new()),
}
}
fn clean_old_transcode(&self) {
self.running.write().unwrap().retain(|_, info| {
if SystemTime::now()
.duration_since(*info.last_used.read().unwrap())
.is_ok_and(|d| d > Duration::new(4 * 60 * 60, 0))
{
_ = info.job.interrupt();
_ = std::fs::remove_dir_all(get_cache_path_from_uuid(&info.uuid));
return false;
}
return true;
});
}
fn clean_old_audio_transcode(&self) {
self.audio_jobs
.write()
.unwrap()
.retain(|(path, idx), info| {
if SystemTime::now()
.duration_since(*info.last_used.read().unwrap())
.is_ok_and(|d| d > Duration::new(4 * 60 * 60, 0))
{
_ = info.job.interrupt();
_ = std::fs::remove_dir_all(get_audio_path(path, *idx));
return false;
}
return true;
});
}
pub async fn build_master(&self, resource: String, slug: String) -> Option<String> {
let mut master = String::from("#EXTM3U\n");
let path = get_path(resource, slug).await.ok()?;
let info = identify(path).await?;
// TODO: Only add this if transmuxing is possible.
if true {
// Doc: https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/creating_a_multivariant_playlist
master.push_str("#EXT-X-STREAM-INF:");
master.push_str(format!("AVERAGE-BANDWIDTH={},", info.video.bitrate).as_str());
// Approximate a bit more because we can't know the maximum bandwidth.
master.push_str(
format!("BANDWIDTH={},", (info.video.bitrate as f32 * 1.2) as u32).as_str(),
);
master.push_str(
format!("RESOLUTION={}x{},", info.video.width, info.video.height).as_str(),
);
// TODO: Find codecs in the RFC 6381 format.
// master.push_str("CODECS=\"avc1.640028\",");
// TODO: With multiple audio qualities, maybe switch qualities depending on the video quality.
master.push_str("AUDIO=\"audio\",");
// Not adding this attribute result in some players (like android's exoplayer) to
// assume a non existant closed caption exist
master.push_str("CLOSED-CAPTIONS=NONE\n");
master.push_str(format!("./{}/index.m3u8\n", Quality::Original).as_str());
}
let aspect_ratio = info.video.width as f32 / info.video.height as f32;
// Do not include a quality with the same height as the original (simpler for automatic
// selection on the client side.)
for quality in Quality::iter().filter(|x| {
x.height() < info.video.quality.height() && x.average_bitrate() < info.video.bitrate
}) {
// Doc: https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/creating_a_multivariant_playlist
master.push_str("#EXT-X-STREAM-INF:");
master.push_str(format!("AVERAGE-BANDWIDTH={},", quality.average_bitrate()).as_str());
master.push_str(format!("BANDWIDTH={},", quality.max_bitrate()).as_str());
master.push_str(
format!(
"RESOLUTION={}x{},",
(aspect_ratio * quality.height() as f32).round() as u32,
quality.height()
)
.as_str(),
);
master.push_str("CODECS=\"avc1.640028\",");
// TODO: With multiple audio qualities, maybe switch qualities depending on the video quality.
master.push_str("AUDIO=\"audio\",");
// Not adding this attribute result in some players (like android's exoplayer) to
// assume a non existant closed caption exist
master.push_str("CLOSED-CAPTIONS=NONE\n");
master.push_str(format!("./{}/index.m3u8\n", quality).as_str());
}
for audio in info.audios {
// Doc: https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/adding_alternate_media_to_a_playlist
master.push_str("#EXT-X-MEDIA:TYPE=AUDIO,");
// The group-id allows to distinguish multiple qualities from multiple variants.
// We could create another quality set and use group-ids hiqual and lowqual.
master.push_str("GROUP-ID=\"audio\",");
if let Some(language) = audio.language.clone() {
master.push_str(format!("LANGUAGE=\"{}\",", language).as_str());
}
// Exoplayer throws if audio tracks dont have names so we always add one.
if let Some(title) = audio.title {
master.push_str(format!("NAME=\"{}\",", title).as_str());
} else if let Some(language) = audio.language {
master.push_str(format!("NAME=\"{}\",", language).as_str());
} else {
master.push_str(format!("NAME=\"Audio {}\",", audio.index).as_str());
}
// TODO: Support aac5.1 (and specify the number of channel bellow)
// master.push_str(format!("CHANNELS=\"{}\",", 2).as_str());
master.push_str("DEFAULT=YES,");
master.push_str(format!("URI=\"./audio/{}/index.m3u8\"\n", audio.index).as_str());
}
Some(master)
}
pub async fn transcode(
&self,
client_id: String,
path: String,
quality: Quality,
start_time: u32,
) -> Result<String, TranscodeError> {
// TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time.
// TODO: cache transcoded output for a show/quality and reuse it for every future requests.
if let Some(TranscodeInfo {
show: (old_path, old_qual),
job,
uuid,
..
}) = self.running.write().unwrap().get_mut(&client_id)
{
if path != *old_path || quality != *old_qual {
// If the job has already ended, interrupt returns an error but we don't care.
_ = job.interrupt();
_ = std::fs::remove_dir_all(get_cache_path_from_uuid(uuid));
} else {
let mut path = get_cache_path_from_uuid(uuid);
path.push("stream.m3u8");
return std::fs::read_to_string(path).map_err(|e| TranscodeError::ReadError(e));
}
}
self.clean_old_transcode();
let info = transcode_video(path, quality, start_time).await?;
let mut path = get_cache_path(&info);
path.push("stream.m3u8");
self.running.write().unwrap().insert(client_id, info);
std::fs::read_to_string(path).map_err(|e| TranscodeError::ReadError(e))
}
// TODO: Use path/quality instead of client_id
pub async fn get_segment(
&self,
client_id: String,
_path: String,
_quality: Quality,
chunk: u32,
) -> Result<PathBuf, SegmentError> {
let hashmap = self.running.read().unwrap();
let info = hashmap.get(&client_id).ok_or(SegmentError::NoTranscode)?;
if let Ok(mut last) = info.last_used.try_write() {
*last = SystemTime::now();
}
// If the segment is in the playlist file, it is available so we don't need to check that.
let mut path = get_cache_path(&info);
path.push(format!("segments-{0:02}.ts", chunk));
Ok(path)
}
pub async fn transcode_audio(
&self,
path: String,
audio: u32,
) -> Result<String, TranscodeError> {
let mut stream = PathBuf::from(get_audio_path(&path, audio));
stream.push("stream.m3u8");
if !self
.audio_jobs
.read()
.unwrap()
.contains_key(&(path.clone(), audio))
{
self.clean_old_audio_transcode();
// TODO: If two concurrent requests for the same audio came, the first one will
// initialize the transcode and wait for the second segment while the second will use
// the same transcode but not wait and retrieve a potentially invalid playlist file.
self.audio_jobs.write().unwrap().insert(
(path.clone(), audio),
AudioInfo {
job: transcode_audio(path, audio).await?,
last_used: RwLock::new(SystemTime::now()),
},
);
}
std::fs::read_to_string(stream).map_err(|e| TranscodeError::ReadError(e))
}
pub async fn get_audio_segment(
&self,
path: String,
audio: u32,
chunk: u32,
) -> Result<PathBuf, std::io::Error> {
if let Some(mut last) = self
.audio_jobs
.read()
.unwrap()
.get(&(path.clone(), audio))
.and_then(|info| info.last_used.try_write().ok())
{
*last = SystemTime::now();
}
let mut path = PathBuf::from(get_audio_path(&path, audio));
path.push(format!("segments-{0:02}.ts", chunk));
Ok(path)
}
}
pub enum SegmentError {
NoTranscode,
}

View File

@ -1,335 +0,0 @@
use derive_more::Display;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use serde::Serialize;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use std::process::Stdio;
use std::slice::Iter;
use std::str::FromStr;
use std::sync::RwLock;
use std::time::SystemTime;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::watch;
use utoipa::ToSchema;
const SEGMENT_TIME: u32 = 10;
pub enum TranscodeError {
ReadError(std::io::Error),
FFmpegError(String),
ArgumentError(String),
}
#[derive(PartialEq, Eq, Serialize, Display, Clone, Copy, ToSchema)]
pub enum Quality {
#[display(fmt = "240p")]
P240,
#[display(fmt = "360p")]
P360,
#[display(fmt = "480p")]
P480,
#[display(fmt = "720p")]
P720,
#[display(fmt = "1080p")]
P1080,
#[display(fmt = "1440p")]
P1440,
#[display(fmt = "4k")]
P4k,
#[display(fmt = "8k")]
P8k,
#[display(fmt = "original")]
Original,
}
impl Quality {
pub fn iter() -> Iter<'static, Quality> {
static QUALITIES: [Quality; 8] = [
Quality::P240,
Quality::P360,
Quality::P480,
Quality::P720,
Quality::P1080,
Quality::P1440,
Quality::P4k,
Quality::P8k,
// Purposfully removing Original from this list (since it require special treatments
// anyways)
];
QUALITIES.iter()
}
pub fn height(&self) -> u32 {
match self {
Self::P240 => 240,
Self::P360 => 360,
Self::P480 => 480,
Self::P720 => 720,
Self::P1080 => 1080,
Self::P1440 => 1440,
Self::P4k => 2160,
Self::P8k => 4320,
Self::Original => panic!("Original quality must be handled specially"),
}
}
// I'm not entierly sure about the values for bitrates. Double checking would be nice.
pub fn average_bitrate(&self) -> u32 {
match self {
Self::P240 => 400_000,
Self::P360 => 800_000,
Self::P480 => 1200_000,
Self::P720 => 2400_000,
Self::P1080 => 4800_000,
Self::P1440 => 9600_000,
Self::P4k => 16_000_000,
Self::P8k => 28_000_000,
Self::Original => panic!("Original quality must be handled specially"),
}
}
pub fn max_bitrate(&self) -> u32 {
match self {
Self::P240 => 700_000,
Self::P360 => 1400_000,
Self::P480 => 2100_000,
Self::P720 => 4000_000,
Self::P1080 => 8000_000,
Self::P1440 => 12_000_000,
Self::P4k => 28_000_000,
Self::P8k => 40_000_000,
Self::Original => panic!("Original quality must be handled specially"),
}
}
pub fn from_height(height: u32) -> Self {
Self::iter()
.find(|x| x.height() >= height)
.unwrap_or(&Quality::P240)
.clone()
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct InvalidValueError;
impl FromStr for Quality {
type Err = InvalidValueError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"240p" => Ok(Quality::P240),
"360p" => Ok(Quality::P360),
"480p" => Ok(Quality::P480),
"720p" => Ok(Quality::P720),
"1080p" => Ok(Quality::P1080),
"1440p" => Ok(Quality::P1440),
"4k" => Ok(Quality::P4k),
"8k" => Ok(Quality::P8k),
"original" => Ok(Quality::Original),
_ => Err(InvalidValueError),
}
}
}
fn get_transcode_audio_args(audio_idx: u32) -> Vec<String> {
// TODO: Support multi audio qualities.
return vec![
"-map".to_string(),
format!("0:a:{}", audio_idx),
"-c:a".to_string(),
"aac".to_string(),
// TODO: Support 5.1 audio streams.
"-ac".to_string(),
"2".to_string(),
"-b:a".to_string(),
"128k".to_string(),
];
}
fn get_transcode_video_quality_args(quality: &Quality, segment_time: u32) -> Vec<String> {
if *quality == Quality::Original {
return vec!["-map", "0:V:0", "-c:v", "copy"]
.iter()
.map(|a| a.to_string())
.collect();
}
vec![
// superfast or ultrafast would produce a file extremly big so we prever veryfast.
vec![
"-map", "0:V:0", "-c:v", "libx264", "-crf", "21", "-preset", "veryfast",
],
vec![
"-vf",
format!("scale=-2:'min({height},ih)'", height = quality.height()).as_str(),
],
// Even less sure but bufsize are 5x the avergae bitrate since the average bitrate is only
// useful for hls segments.
vec!["-bufsize", (quality.max_bitrate() * 5).to_string().as_str()],
vec!["-b:v", quality.average_bitrate().to_string().as_str()],
vec!["-maxrate", quality.max_bitrate().to_string().as_str()],
// Force segments to be exactly segment_time (only works when transcoding)
vec![
"-force_key_frames",
format!("expr:gte(t,n_forced*{segment_time})").as_str(),
"-strict",
"-2",
"-segment_time_delta",
"0.1",
],
]
.concat()
.iter()
.map(|arg| arg.to_string())
.collect()
}
pub async fn transcode_audio(path: String, audio: u32) -> Result<Child, TranscodeError> {
start_transcode(
&path,
&get_audio_path(&path, audio),
get_transcode_audio_args(audio),
0,
)
.await
.map_err(|e| {
if let TranscodeError::FFmpegError(message) = e {
if message.contains("matches no streams.") {
return TranscodeError::ArgumentError("Invalid audio index".to_string());
}
return TranscodeError::FFmpegError(message);
}
e
})
}
pub async fn transcode_video(
path: String,
quality: Quality,
start_time: u32,
) -> Result<TranscodeInfo, TranscodeError> {
// TODO: Use the out path below once cached segments can be reused.
// let out_dir = format!("/cache/{show_hash}/{quality}");
let uuid: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect();
let out_dir = format!("/cache/{uuid}");
let child = start_transcode(
&path,
&out_dir,
get_transcode_video_quality_args(&quality, SEGMENT_TIME),
start_time,
)
.await?;
Ok(TranscodeInfo {
show: (path, quality),
job: child,
uuid,
last_used: RwLock::new(SystemTime::now()),
})
}
async fn start_transcode(
path: &String,
out_dir: &String,
encode_args: Vec<String>,
start_time: u32,
) -> Result<Child, TranscodeError> {
std::fs::create_dir_all(&out_dir).expect("Could not create cache directory");
let mut cmd = Command::new("ffmpeg");
cmd.args(&["-progress", "pipe:1"])
.args(&["-nostats", "-hide_banner", "-loglevel", "warning"])
.args(&["-ss", start_time.to_string().as_str()])
.args(&["-i", path.as_str()])
.args(&["-f", "hls"])
// Use a .tmp file for segments (.ts files)
.args(&["-hls_flags", "temp_file"])
// Cache can't be allowed since switching quality means starting a new encode for now.
.args(&["-hls_allow_cache", "1"])
// Keep all segments in the list (else only last X are presents, useful for livestreams)
.args(&["-hls_list_size", "0"])
.args(&["-hls_time", SEGMENT_TIME.to_string().as_str()])
.args(&encode_args)
.args(&[
"-hls_segment_filename".to_string(),
format!("{out_dir}/segments-%02d.ts"),
format!("{out_dir}/stream.m3u8"),
])
.stdout(Stdio::piped())
.stderr(Stdio::piped());
println!("Starting a transcode with the command: {:?}", cmd);
let mut child = cmd.spawn().expect("ffmpeg failed to start");
let stdout = child.stdout.take().unwrap();
let (tx, mut rx) = watch::channel(0u32);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Some(line) = reader.next_line().await.unwrap() {
if let Some((key, value)) = line.find('=').map(|i| line.split_at(i)) {
let value = &value[1..];
// Can't use ms since ms and us are both set to us /shrug
if key == "out_time_us" {
// Sometimes, the value is invalid (or negative), default to 0 in those cases
let _ = tx.send(value.parse::<u32>().unwrap_or(0) / 1_000_000);
}
}
}
});
loop {
// rx.changed() returns an error if the sender is dropped aka if the coroutine 10 lines
// higher has finished aka if the process has finished.
if let Err(_) = rx.changed().await {
let es = child.wait().await.unwrap();
if es.success() {
return Ok(child);
}
let output = child.wait_with_output().await.unwrap();
return Err(TranscodeError::FFmpegError(
String::from_utf8(output.stderr).unwrap(),
));
}
// Wait for 1.5 * segment time after start_time to be ready.
let ready_time = *rx.borrow();
if ready_time >= (1.5 * SEGMENT_TIME as f32) as u32 + start_time {
return Ok(child);
}
}
}
pub fn get_audio_path(path: &String, audio: u32) -> String {
let mut hasher = DefaultHasher::new();
path.hash(&mut hasher);
audio.hash(&mut hasher);
let hash = hasher.finish();
format!("/cache/{hash:x}")
}
pub fn get_cache_path(info: &TranscodeInfo) -> PathBuf {
return get_cache_path_from_uuid(&info.uuid);
}
pub fn get_cache_path_from_uuid(uuid: &String) -> PathBuf {
return PathBuf::from(format!("/cache/{uuid}/", uuid = &uuid));
}
pub struct TranscodeInfo {
pub show: (String, Quality),
pub job: Child,
pub uuid: String,
pub last_used: RwLock<SystemTime>,
}
pub struct AudioInfo {
pub job: Child,
pub last_used: RwLock<SystemTime>,
}

View File

@ -1,50 +0,0 @@
use actix_web::HttpRequest;
use tokio::{io, process::Child};
use crate::error::ApiError;
extern "C" {
fn kill(pid: i32, sig: i32) -> i32;
}
/// Signal the process `pid`
fn signal(pid: i32, signal: i32) -> io::Result<()> {
let ret = unsafe { kill(pid, signal) };
if ret == 0 {
Ok(())
} else {
Err(io::Error::last_os_error())
}
}
pub trait Signalable {
/// Signal the thing
fn signal(&mut self, signal: i32) -> io::Result<()>;
/// Send SIGINT
fn interrupt(&mut self) -> io::Result<()> {
self.signal(2)
}
}
impl Signalable for Child {
fn signal(&mut self, signal: i32) -> io::Result<()> {
let id = self.id();
if self.try_wait()?.is_some() || id.is_none() {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid argument: can't signal an exited process",
))
} else {
crate::utils::signal(id.unwrap() as i32, signal)
}
}
}
pub fn get_client_id(req: HttpRequest) -> Result<String, ApiError> {
// return Ok(String::from("1234"));
req.headers().get("x-client-id")
.ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), })
.map(|x| x.to_str().unwrap().to_string())
}

View File

@ -1,111 +0,0 @@
use std::str::FromStr;
use crate::{
error::ApiError,
paths,
state::Transcoder,
transcode::{Quality, TranscodeError},
utils::get_client_id,
};
use actix_files::NamedFile;
use actix_web::{get, web, HttpRequest, Result};
/// Transcode video
///
/// Transcode the video to the selected quality.
/// This route can take a few seconds to respond since it will way for at least one segment to be
/// available.
#[utoipa::path(
responses(
(status = 200, description = "Get the m3u8 playlist."),
(status = NOT_FOUND, description = "Invalid slug.")
),
params(
("resource" = String, Path, description = "Episode or movie"),
("slug" = String, Path, description = "The slug of the movie/episode."),
("quality" = Quality, Path, description = "Specify the quality you want"),
("x-client-id" = String, Header, description = "A unique identify for a player's instance. Used to cancel unused transcode"),
)
)]
#[get("/{resource}/{slug}/{quality}/index.m3u8")]
async fn get_transcoded(
req: HttpRequest,
query: web::Path<(String, String, String)>,
transcoder: web::Data<Transcoder>,
) -> Result<String, ApiError> {
let (resource, slug, quality) = query.into_inner();
let quality = Quality::from_str(quality.as_str()).map_err(|_| ApiError::BadRequest {
error: "Invalid quality".to_string(),
})?;
let client_id = get_client_id(req)?;
let path = paths::get_path(resource, slug)
.await
.map_err(|_| ApiError::NotFound)?;
// TODO: Handle start_time that is not 0
transcoder
.transcode(client_id, path, quality, 0)
.await
.map_err(|e| match e {
TranscodeError::ArgumentError(err) => ApiError::BadRequest { error: err },
TranscodeError::FFmpegError(err) => {
eprintln!(
"Unhandled ffmpeg error occured while transcoding video: {}",
err
);
ApiError::InternalError
}
TranscodeError::ReadError(err) => {
eprintln!(
"Unhandled read error occured while transcoding video: {}",
err
);
ApiError::InternalError
}
})
}
/// Get transmuxed chunk
///
/// Retrieve a chunk of a transmuxed video.
#[utoipa::path(
responses(
(status = 200, description = "Get a hls chunk."),
(status = NOT_FOUND, description = "Invalid slug.")
),
params(
("resource" = String, Path, description = "Episode or movie"),
("slug" = String, Path, description = "The slug of the movie/episode."),
("quality" = Quality, Path, description = "Specify the quality you want"),
("chunk" = u32, Path, description = "The number of the chunk"),
("x-client-id" = String, Header, description = "A unique identify for a player's instance. Used to cancel unused transcode"),
)
)]
#[get("/{resource}/{slug}/{quality}/segments-{chunk}.ts")]
async fn get_chunk(
req: HttpRequest,
query: web::Path<(String, String, String, u32)>,
transcoder: web::Data<Transcoder>,
) -> Result<NamedFile, ApiError> {
let (resource, slug, quality, chunk) = query.into_inner();
let quality = Quality::from_str(quality.as_str()).map_err(|_| ApiError::BadRequest {
error: "Invalid quality".to_string(),
})?;
let client_id = get_client_id(req)?;
let path = paths::get_path(resource, slug)
.await
.map_err(|_| ApiError::NotFound)?;
// TODO: Handle start_time that is not 0
transcoder
.get_segment(client_id, path, quality, chunk)
.await
.map_err(|_| ApiError::BadRequest {
error: "No transcode started for the selected show/quality.".to_string(),
})
.and_then(|path| {
NamedFile::open(path).map_err(|_| ApiError::BadRequest {
error: "Invalid segment number.".to_string(),
})
})
}