Make a clean transcoder state

This commit is contained in:
Zoe Roux 2023-04-11 22:12:06 +09:00
parent 33d212bd84
commit 64adc63920
No known key found for this signature in database
6 changed files with 156 additions and 26 deletions

1
transcoder/Cargo.lock generated
View File

@ -1072,6 +1072,7 @@ version = "0.1.0"
dependencies = [
"actix-files",
"actix-web",
"derive_more",
"rand",
"serde",
"tokio",

View File

@ -9,3 +9,4 @@ actix-files = "0.6.2"
tokio = { version = "1.27.0", features = ["process"] }
serde = { version = "1.0.159", features = ["derive"] }
rand = "0.8.5"
derive_more = "0.99.17"

33
transcoder/src/error.rs Normal file
View File

@ -0,0 +1,33 @@
use actix_web::{
error,
http::{header::ContentType, StatusCode},
HttpResponse,
};
use derive_more::{Display, Error};
#[derive(Debug, Display, Error)]
pub enum ApiError {
#[display(fmt = "{}", error)]
BadRequest { error: String },
#[display(fmt = "An internal error occurred. Please try again later.")]
InternalError,
}
impl error::ResponseError for ApiError {
fn error_response(&self) -> HttpResponse {
HttpResponse::build(self.status_code())
.insert_header(ContentType::json())
.body(format!(
"{{ \"status\": \"{status}\", \"error\": \"{err}\" }}",
status = self.status_code(),
err = self.to_string()
))
}
fn status_code(&self) -> StatusCode {
match *self {
ApiError::BadRequest { error } => StatusCode::BAD_REQUEST,
ApiError::InternalError => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}

View File

@ -1,9 +1,14 @@
use actix_files::NamedFile;
use actix_web::{get, web, App, HttpServer, Result};
use std::str::FromStr;
use crate::transcode::{Quality, TranscoderState};
use actix_files::NamedFile;
use actix_web::{get, web, App, HttpRequest, HttpServer, Result};
use error::ApiError;
use crate::transcode::{Quality, Transcoder};
mod error;
mod paths;
mod transcode;
mod utils;
#[get("/movie/direct/{slug}")]
async fn get_movie_direct(query: web::Path<String>) -> Result<NamedFile> {
@ -13,20 +18,27 @@ async fn get_movie_direct(query: web::Path<String>) -> Result<NamedFile> {
Ok(NamedFile::open_async(path).await?)
}
#[get("/movie/{quality}/{slug}")]
#[get("/movie/{quality}/{slug}/master.m3u8")]
async fn get_movie_auto(
req: HttpRequest,
query: web::Path<(String, String)>,
state: web::Data<TranscoderState>,
) -> Result<NamedFile> {
transcoder: web::Data<Transcoder>,
) -> Result<NamedFile, ApiError> {
let (quality, slug) = query.into_inner();
let quality = Quality::from_str(quality.as_str()).map_err(|_| ApiError::BadRequest {
error: "Invalid quality".to_string(),
})?;
let client_id = req.headers().get("x-client-id")
.ok_or(ApiError::BadRequest { error: String::from("Missing client id. Please specify the X-CLIENT-ID header to a guid constant for the lifetime of the player (but unique per instance)."), })?;
let path = paths::get_movie_path(slug);
Ok(NamedFile::open_async(path).await?)
todo!()
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let state = web::Data::new(TranscoderState::new());
let state = web::Data::new(Transcoder::new());
HttpServer::new(move || {
App::new()

View File

@ -1,21 +1,12 @@
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};
use std::process::{Command, Child};
use std::process::{Child, Command};
use std::str::FromStr;
use std::{collections::HashMap, sync::Mutex};
pub struct TranscoderState {
running: Mutex<HashMap<u32, Child>>,
}
impl TranscoderState {
pub fn new() -> TranscoderState {
Self {
running: Mutex::new(HashMap::new()),
}
}
}
use crate::utils::Signalable;
#[derive(PartialEq, Eq)]
pub enum Quality {
P240,
P360,
@ -28,9 +19,29 @@ pub enum Quality {
Original,
}
imhttps://sgsot3a.sic.shibaura-it.ac.jp/
#[derive(Debug, PartialEq, Eq)]
pub struct InvalidValueError;
fn get_transcode_video_quality_args(quality: Quality) -> Vec<&'static str> {
impl FromStr for Quality {
type Err = InvalidValueError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"240p" => Ok(Quality::P240),
"360p" => Ok(Quality::P360),
"480p" => Ok(Quality::P480),
"720p" => Ok(Quality::P720),
"1080p" => Ok(Quality::P1080),
"1440p" => Ok(Quality::P1440),
"4k" => Ok(Quality::P4k),
"8k" => Ok(Quality::P8k),
"original" => Ok(Quality::Original),
_ => Err(InvalidValueError),
}
}
}
fn get_transcode_video_quality_args(quality: &Quality) -> Vec<&'static str> {
// superfast or ultrafast would produce a file extremly big so we prever veryfast.
let enc_base: Vec<&str> = vec![
"-map", "0:v:0", "-c:v", "libx264", "-crf", "21", "-preset", "veryfast",
@ -50,7 +61,7 @@ fn get_transcode_video_quality_args(quality: Quality) -> Vec<&'static str> {
}
// TODO: Add audios streams (and transcode them only when necesarry)
async fn start_transcode(path: &str, quality: Quality, start_time_sec: f32) -> (String, Child) {
async fn start_transcode(path: &str, quality: &Quality, start_time_sec: f32) -> (String, Child) {
// TODO: Use the out path below once cached segments can be reused.
// let out_dir = format!("/cache/{show_hash}/{quality}");
let uuid: String = thread_rng()
@ -89,6 +100,40 @@ async fn start_transcode(path: &str, quality: Quality, start_time_sec: f32) -> (
(uuid, child)
}
pub async fn transcode(user_id: u32, path: &str, quality: Quality, start_time_sec: f32) {
struct TranscodeInfo {
show: (String, Quality)
job: Child
uuid: String
}
pub struct Transcoder {
running: Mutex<HashMap<String, TranscodeInfo>>,
}
impl Transcoder {
pub fn new() -> Transcoder {
Self {
running: Mutex::new(HashMap::new()),
}
}
pub async fn transcode(
&mut self,
client_id: String,
path: String,
quality: Quality,
start_time_sec: f32,
) {
// TODO: If the stream is not yet up to start_time (and is far), kill it and restart one at the right time.
// TODO: Clear cache at startup/every X time without use.
// TODO: cache transcoded output for a show/quality and reuse it for every future requests.
if let Some(TranscodeInfo{show: (old_path, old_qual), job, uuid}) = self.running.lock().unwrap().get_mut(&client_id) {
if path == *old_path && quality != *old_qual {
job.interrupt();
}
}
let (uuid, job) = start_transcode(&path, &quality, start_time_sec).await;
self.running.lock().unwrap().insert(client_id, TranscodeInfo { show: (path, quality), job, uuid});
}
}

38
transcoder/src/utils.rs Normal file
View File

@ -0,0 +1,38 @@
use std::{io, process::Child};
extern "C" {
fn kill(pid: i32, sig: i32) -> i32;
}
/// Signal the process `pid`
fn signal(pid: i32, signal: i32) -> io::Result<()> {
let ret = unsafe { kill(pid, signal) };
if ret == 0 {
Ok(())
} else {
Err(io::Error::last_os_error())
}
}
pub trait Signalable {
/// Signal the thing
fn signal(&mut self, signal: i32) -> io::Result<()>;
/// Send SIGINT
fn interrupt(&mut self) -> io::Result<()> {
self.signal(2)
}
}
impl Signalable for Child {
fn signal(&mut self, signal: i32) -> io::Result<()> {
if self.try_wait()?.is_some() {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid argument: can't signal an exited process",
))
} else {
crate::utils::signal(self.id() as i32, signal)
}
}
}