Added support for storing transcoder metadata in S3

Signed-off-by: Fred Heinecke <fred.heinecke@yahoo.com>
This commit is contained in:
solidDoWant 2025-04-28 06:30:40 +00:00 committed by Zoe Roux
parent cf7bc456e8
commit 265386f289
18 changed files with 1041 additions and 138 deletions

View File

@ -4,8 +4,6 @@
# where to store temporary transcoded files
GOCODER_CACHE_ROOT="/cache"
# where to store extracted data (subtitles/attachments/comptuted thumbnails for scrubbing)
GOCODER_METADATA_ROOT="/metadata"
# path prefix needed to reach the http endpoint
GOCODER_PREFIX=""
# base absolute path that contains video files (everything in this directory can be served)
@ -40,3 +38,26 @@ POSTGRES_SSLMODE="disable"
# If this is not "disabled", the schema will be created (if it does not exists) and
# the search_path of the user will be ignored (only the schema specified will be used).
POSTGRES_SCHEMA=gocoder
# Storage backend
# There are two currently supported backends: local filesystem and s3.
# S3 must be used when running multiple instances of the service. The local filesystem is fine
# for a single instance.
# Local filesystem
GOCODER_METADATA_ROOT="/metadata"
# S3
# Setting this configures the transcoder to use S3 as a backend.
# S3_BUCKET_NAME=my-transcoder-bucket
# All environment variables supported by the AWS SDK for Go (v2) are supported:
# https://docs.aws.amazon.com/sdkref/latest/guide/settings-reference.html#EVarSettings
# AWS_ACCESS_KEY_ID=abc123
# AWS_SECRET_ACCESS_KEY=def456
# AWS_ENDPOINT_URL_S3=https://s3.my-ceph-rgw-deployment.example
# Unless you're running on an actual EC2 instance, you should set this to true.
# This will disable the SDK from trying to use the EC2 metadata service to get credentials,
# reducing startup time.
# If you are actually using an IAM role profile for authentication, this should be set to false
# or be left unset.
AWS_EC2_METADATA_DISABLED="true"

View File

@ -1,8 +1,6 @@
module github.com/zoriya/kyoo/transcoder
go 1.23.0
toolchain go1.24.2
go 1.24.2
require (
github.com/disintegration/imaging v1.6.2
@ -14,6 +12,31 @@ require (
)
require (
github.com/aws/aws-sdk-go-v2 v1.36.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3
golang.org/x/sync v0.13.0
)
require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect
github.com/aws/smithy-go v1.22.2 // indirect
)
require (
github.com/aws/aws-sdk-go-v2/config v1.29.14
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/labstack/gommon v0.4.2 // indirect

View File

@ -2,6 +2,42 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14=
github.com/aws/aws-sdk-go-v2/config v1.29.14 h1:f+eEi/2cKCg9pqKBoAIwRGzVb70MRKqWX4dg1BDcSJM=
github.com/aws/aws-sdk-go-v2/config v1.29.14/go.mod h1:wVPHWcIFv3WO89w0rE10gzf17ZYy+UVS1Geq8Iei34g=
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 h1:9KxtdcIA/5xPNQyZRgUSpYOE6j9Bc4+D7nZua0KGYOM=
github.com/aws/aws-sdk-go-v2/credentials v1.17.67/go.mod h1:p3C44m+cfnbv763s52gCqrjaqyPikj9Sg47kUVaNZQQ=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mlnXuFrO4cOd3HLBroh1paFw=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 h1:ZNTqv4nIdE/DiBfUUfXcLZ/Spcuz+RjeziUtNJackkM=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34/go.mod h1:zf7Vcd1ViW7cPqYWEHLHJkS50X0JS2IKz9Cgaj6ugrs=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 h1:4nm2G6A4pV9rdlWzGMPv4BNtQp22v1hg3yrtkYpeLl8=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1/go.mod h1:iu6FSzgt+M2/x3Dk8zhycdIcHjEFb36IS8HVUVFoMg0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 h1:moLQUoVq91LiqT1nbvzDukyqAlCv89ZmwaHw/ZFlFZg=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15/go.mod h1:ZH34PJUc8ApjBIfgQCFvkWcUDBtl/WTD+uiYHjd8igA=
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 h1:BRXS0U76Z8wfF+bnkilA2QwpIch6URlm++yPUt9QPmQ=
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3/go.mod h1:bNXKFFyaiVvWuR6O16h/I1724+aXe/tAkA9/QS01t5k=
github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 h1:1Gw+9ajCV1jogloEv1RRnvfRFia2cL6c9cuKV2Ps+G8=
github.com/aws/aws-sdk-go-v2/service/sso v1.25.3/go.mod h1:qs4a9T5EMLl/Cajiw2TcbNt2UNo/Hqlyp+GiuG4CFDI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 h1:hXmVKytPfTy5axZ+fYbR5d0cFmC3JvwLm5kM83luako=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1/go.mod h1:MlYRNmYu/fGPoxBQVvBYr9nyr948aY/WLUvwBMBJubs=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 h1:1XuUZ8mYJw9B6lzAkXhqHlJd/XvaX32evhproijJEZY=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19/go.mod h1:cQnB8CUnxbMU82JvlqjKR2HBOm3fe9pWorWBza6MBJ4=
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dhui/dktest v0.4.3 h1:wquqUxAFdcUgabAVLvSCOKOlag5cIZuaOjYIBOWdsR0=
@ -81,6 +117,8 @@ golang.org/x/image v0.23.0 h1:HseQ7c2OpPKTPVzNjG5fwJsOTCiiwS4QdsYi5XU6H68=
golang.org/x/image v0.23.0/go.mod h1:wJJBTdLfCCf3tiHa1fNxpZmUI4mmoZvwMCPP0ddoNKY=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=

View File

@ -2,11 +2,15 @@ package main
import (
"fmt"
"io"
"mime"
"net/http"
"path/filepath"
"strconv"
"github.com/zoriya/kyoo/transcoder/src"
"github.com/zoriya/kyoo/transcoder/src/api"
"github.com/zoriya/kyoo/transcoder/src/utils"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
@ -43,7 +47,7 @@ func (h *Handler) GetMaster(c echo.Context) error {
return err
}
ret, err := h.transcoder.GetMaster(path, client, sha)
ret, err := h.transcoder.GetMaster(c.Request().Context(), path, client, sha)
if err != nil {
return err
}
@ -75,7 +79,7 @@ func (h *Handler) GetVideoIndex(c echo.Context) error {
return err
}
ret, err := h.transcoder.GetVideoIndex(path, uint32(video), quality, client, sha)
ret, err := h.transcoder.GetVideoIndex(c.Request().Context(), path, uint32(video), quality, client, sha)
if err != nil {
return err
}
@ -103,7 +107,7 @@ func (h *Handler) GetAudioIndex(c echo.Context) error {
return err
}
ret, err := h.transcoder.GetAudioIndex(path, uint32(audio), client, sha)
ret, err := h.transcoder.GetAudioIndex(c.Request().Context(), path, uint32(audio), client, sha)
if err != nil {
return err
}
@ -138,6 +142,7 @@ func (h *Handler) GetVideoSegment(c echo.Context) error {
}
ret, err := h.transcoder.GetVideoSegment(
c.Request().Context(),
path,
uint32(video),
quality,
@ -174,7 +179,7 @@ func (h *Handler) GetAudioSegment(c echo.Context) error {
return err
}
ret, err := h.transcoder.GetAudioSegment(path, uint32(audio), segment, client, sha)
ret, err := h.transcoder.GetAudioSegment(c.Request().Context(), path, uint32(audio), segment, client, sha)
if err != nil {
return err
}
@ -192,7 +197,7 @@ func (h *Handler) GetInfo(c echo.Context) error {
return err
}
ret, err := h.metadata.GetMetadata(path, sha)
ret, err := h.metadata.GetMetadata(c.Request().Context(), path, sha)
if err != nil {
return err
}
@ -208,7 +213,7 @@ func (h *Handler) GetInfo(c echo.Context) error {
// Get a specific attachment.
//
// Path: /:path/attachment/:name
func (h *Handler) GetAttachment(c echo.Context) error {
func (h *Handler) GetAttachment(c echo.Context) (err error) {
_, sha, err := GetPath(c)
if err != nil {
return err
@ -218,11 +223,18 @@ func (h *Handler) GetAttachment(c echo.Context) error {
return err
}
ret, err := h.metadata.GetAttachmentPath(sha, false, name)
attachementStream, err := h.metadata.GetAttachment(c.Request().Context(), sha, name)
if err != nil {
return err
}
return c.File(ret)
defer utils.CleanupWithErr(&err, attachementStream.Close, "failed to close attachment reader")
mimeType, err := guessMimeType(name, attachementStream)
if err != nil {
return fmt.Errorf("failed to guess mime type: %w", err)
}
return c.Stream(200, mimeType, attachementStream)
}
// Get subtitle
@ -230,7 +242,7 @@ func (h *Handler) GetAttachment(c echo.Context) error {
// Get a specific subtitle.
//
// Path: /:path/subtitle/:name
func (h *Handler) GetSubtitle(c echo.Context) error {
func (h *Handler) GetSubtitle(c echo.Context) (err error) {
_, sha, err := GetPath(c)
if err != nil {
return err
@ -240,11 +252,23 @@ func (h *Handler) GetSubtitle(c echo.Context) error {
return err
}
ret, err := h.metadata.GetAttachmentPath(sha, true, name)
subtitleStream, err := h.metadata.GetSubtitle(c.Request().Context(), sha, name)
if err != nil {
return err
}
return c.File(ret)
defer utils.CleanupWithErr(&err, subtitleStream.Close, "failed to close subtitle reader")
mimeType, err := guessMimeType(name, subtitleStream)
if err != nil {
return fmt.Errorf("failed to guess mime type: %w", err)
}
// Default the mime type to text/plain if it is not recognized
if mimeType == "" {
mimeType = "text/plain"
}
return c.Stream(200, mimeType, subtitleStream)
}
// Get thumbnail sprite
@ -252,17 +276,19 @@ func (h *Handler) GetSubtitle(c echo.Context) error {
// Get a sprite file containing all the thumbnails of the show.
//
// Path: /:path/thumbnails.png
func (h *Handler) GetThumbnails(c echo.Context) error {
func (h *Handler) GetThumbnails(c echo.Context) (err error) {
path, sha, err := GetPath(c)
if err != nil {
return err
}
sprite, _, err := h.metadata.GetThumb(path, sha)
sprite, err := h.metadata.GetThumbSprite(c.Request().Context(), path, sha)
if err != nil {
return err
}
defer utils.CleanupWithErr(&err, sprite.Close, "failed to close thumbnail sprite reader")
return c.File(sprite)
return c.Stream(200, "image/png", sprite)
}
// Get thumbnail vtt
@ -271,17 +297,19 @@ func (h *Handler) GetThumbnails(c echo.Context) error {
// https://developer.bitmovin.com/playback/docs/webvtt-based-thumbnails for more info.
//
// Path: /:path/:resource/:slug/thumbnails.vtt
func (h *Handler) GetThumbnailsVtt(c echo.Context) error {
func (h *Handler) GetThumbnailsVtt(c echo.Context) (err error) {
path, sha, err := GetPath(c)
if err != nil {
return err
}
_, vtt, err := h.metadata.GetThumb(path, sha)
vtt, err := h.metadata.GetThumbVtt(c.Request().Context(), path, sha)
if err != nil {
return err
}
defer utils.CleanupWithErr(&err, vtt.Close, "failed to close thumbnail vtt reader")
return c.File(vtt)
return c.Stream(200, "text/vtt", vtt)
}
type Handler struct {
@ -289,21 +317,67 @@ type Handler struct {
metadata *src.MetadataService
}
// Try to guess the mime type of a file based on its extension.
// If the extension is not recognized, return an empty string.
// If path is provided, it should contain a file extension (i.e. ".mp4").
// If content is provided, it should be of type io.ReadSeeker. Instances of other types are ignored.
// This implementation is based upon http.ServeContent.
func guessMimeType(path string, content any) (string, error) {
// This does not match a large number of different types that are likely in use.
// TODO add telemetry to see what file extensions are used, then add logic
// to detect the type based on the file extension.
mimeType := ""
// First check the file extension, if there is one.
ext := filepath.Ext(path)
if ext != "" {
if mimeType = mime.TypeByExtension(ext); mimeType != "" {
return mimeType, nil
}
}
// Try reading the first few bytes of the file to guess the mime type.
// Only do this if seeking is supported
if reader, ok := content.(io.ReadSeeker); ok {
// 512 bytes is the most that DetectContentType will consider, so no
// need to read more than that.
var buf [512]byte
n, _ := io.ReadFull(reader, buf[:])
mimeType = http.DetectContentType(buf[:n])
// Reset the reader to the beginning of the file
_, err := reader.Seek(0, io.SeekStart)
if err != nil {
return "", fmt.Errorf("mime type guesser failed to seek to beginning of file: %w", err)
}
}
return mimeType, nil
}
func main() {
e := echo.New()
if err := run(e); err != nil {
e.Logger.Fatal(err)
}
}
func run(e *echo.Echo) (err error) {
e.Use(middleware.Logger())
e.HTTPErrorHandler = ErrorHandler
metadata, err := src.NewMetadataService()
if err != nil {
e.Logger.Fatal(err)
return
return fmt.Errorf("failed to create metadata service: %w", err)
}
defer utils.CleanupWithErr(&err, metadata.Close, "failed to close metadata service")
transcoder, err := src.NewTranscoder(metadata)
if err != nil {
e.Logger.Fatal(err)
return
return fmt.Errorf("failed to create transcoder: %w", err)
}
h := Handler{
transcoder: transcoder,
metadata: metadata,
@ -325,5 +399,8 @@ func main() {
api.RegisterPProfHandlers(e)
e.Logger.Fatal(e.Start(":7666"))
if err := e.Start(":7666"); err != nil {
return fmt.Errorf("failed to start server: %w", err)
}
return nil
}

View File

@ -1,21 +1,28 @@
package src
import (
"context"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"github.com/zoriya/kyoo/transcoder/src/storage"
"github.com/zoriya/kyoo/transcoder/src/utils"
"golang.org/x/sync/errgroup"
)
const ExtractVersion = 1
func (s *MetadataService) ExtractSubs(info *MediaInfo) (interface{}, error) {
func (s *MetadataService) ExtractSubs(ctx context.Context, info *MediaInfo) (interface{}, error) {
get_running, set := s.extractLock.Start(info.Sha)
if get_running != nil {
return get_running()
}
err := extractSubs(info)
err := s.extractSubs(ctx, info)
if err != nil {
return set(nil, err)
}
@ -23,32 +30,70 @@ func (s *MetadataService) ExtractSubs(info *MediaInfo) (interface{}, error) {
return set(nil, err)
}
func (s *MetadataService) GetAttachmentPath(sha string, is_sub bool, name string) (string, error) {
func (s *MetadataService) GetAttachment(ctx context.Context, sha string, name string) (io.ReadCloser, error) {
_, err := s.extractLock.WaitFor(sha)
if err != nil {
return "", err
return nil, err
}
dir := "att"
if is_sub {
dir = "sub"
itemPath := fmt.Sprintf("%s/%s/%s", sha, "att", name)
item, err := s.storage.GetItem(ctx, itemPath)
if err != nil {
return nil, fmt.Errorf("failed to get attachment with path %q: %w", itemPath, err)
}
return fmt.Sprintf("%s/%s/%s/%s", Settings.Metadata, sha, dir, name), nil
return item, nil
}
func extractSubs(info *MediaInfo) error {
defer printExecTime("extraction of %s", info.Path)()
func (s *MetadataService) GetSubtitle(ctx context.Context, sha string, name string) (io.ReadCloser, error) {
_, err := s.extractLock.WaitFor(sha)
if err != nil {
return nil, err
}
attachment_path := fmt.Sprintf("%s/%s/att", Settings.Metadata, info.Sha)
subs_path := fmt.Sprintf("%s/%s/sub", Settings.Metadata, info.Sha)
itemPath := fmt.Sprintf("%s/%s/%s", sha, "sub", name)
os.MkdirAll(attachment_path, 0o755)
os.MkdirAll(subs_path, 0o755)
item, err := s.storage.GetItem(ctx, itemPath)
if err != nil {
return nil, fmt.Errorf("failed to get subtitle with path %q: %w", itemPath, err)
}
return item, nil
}
func (s *MetadataService) extractSubs(ctx context.Context, info *MediaInfo) (err error) {
defer utils.PrintExecTime("extraction of %s", info.Path)()
// If there is no subtitles, there is nothing to extract (also fonts would be useless).
if len(info.Subtitles) == 0 {
return nil
}
// Create a temporary directory for writing attachments
// TODO if the transcoder ever uses the ffmpeg library directly, remove this
// and write directly to storage via stream instead
tempWorkingDirectory := filepath.Join(os.TempDir(), info.Sha)
if err := os.MkdirAll(tempWorkingDirectory, 0660); err != nil {
return fmt.Errorf("failed to create temporary directory: %w", err)
}
attachmentWorkingDirectory := filepath.Join(tempWorkingDirectory, "att")
if err := os.MkdirAll(attachmentWorkingDirectory, 0660); err != nil {
return fmt.Errorf("failed to create attachment directory: %w", err)
}
subtitlesWorkingDirectory := filepath.Join(tempWorkingDirectory, "sub")
if err := os.MkdirAll(subtitlesWorkingDirectory, 0660); err != nil {
return fmt.Errorf("failed to create subtitles directory: %w", err)
}
defer utils.CleanupWithErr(
&err, func() error {
return os.RemoveAll(attachmentWorkingDirectory)
},
"failed to remove attachment working directory",
)
// Dump the attachments and subtitles
cmd := exec.Command(
"ffmpeg",
"-dump_attachment:t", "",
@ -56,7 +101,7 @@ func extractSubs(info *MediaInfo) error {
"-y",
"-i", info.Path,
)
cmd.Dir = attachment_path
cmd.Dir = attachmentWorkingDirectory
for _, sub := range info.Subtitles {
if ext := sub.Extension; ext != nil {
@ -68,16 +113,39 @@ func extractSubs(info *MediaInfo) error {
cmd.Args,
"-map", fmt.Sprintf("0:s:%d", *sub.Index),
"-c:s", "copy",
fmt.Sprintf("%s/%d.%s", subs_path, *sub.Index, *ext),
fmt.Sprintf("%s/%d.%s", subtitlesWorkingDirectory, *sub.Index, *ext),
)
}
}
log.Printf("Starting extraction with the command: %s", cmd)
cmd.Stdout = nil
err := cmd.Run()
if err != nil {
if err := cmd.Run(); err != nil {
fmt.Println("Error starting ffmpeg extract:", err)
return err
}
// Move attachments and subtitles to the storage backend
var saveGroup errgroup.Group
saveGroup.Go(func() error {
err := storage.SaveFilesToBackend(ctx, s.storage, attachmentWorkingDirectory, filepath.Join(info.Sha, "att"))
if err != nil {
return fmt.Errorf("failed to save attachments to backend: %w", err)
}
return nil
})
saveGroup.Go(func() error {
err := storage.SaveFilesToBackend(ctx, s.storage, subtitlesWorkingDirectory, filepath.Join(info.Sha, "sub"))
if err != nil {
return fmt.Errorf("failed to save subtitles to backend: %w", err)
}
return nil
})
if err := saveGroup.Wait(); err != nil {
return fmt.Errorf("failed while saving files to backend: %w", err)
}
return nil
}

View File

@ -1,12 +1,15 @@
package src
import (
"context"
"fmt"
"log"
"math"
"os"
"strings"
"sync"
"github.com/zoriya/kyoo/transcoder/src/utils"
)
type FileStream struct {
@ -24,7 +27,7 @@ type VideoKey struct {
quality Quality
}
func (t *Transcoder) newFileStream(path string, sha string) *FileStream {
func (t *Transcoder) newFileStream(ctx context.Context, path string, sha string) *FileStream {
ret := &FileStream{
transcoder: t,
Out: fmt.Sprintf("%s/%s", Settings.Outpath, sha),
@ -35,7 +38,7 @@ func (t *Transcoder) newFileStream(path string, sha string) *FileStream {
ret.ready.Add(1)
go func() {
defer ret.ready.Done()
info, err := t.metadataService.GetMetadata(path, sha)
info, err := t.metadataService.GetMetadata(ctx, path, sha)
ret.Info = info
if err != nil {
ret.err = err
@ -107,7 +110,7 @@ func (fs *FileStream) GetMaster() string {
}
if def_video != nil {
qualities := Filter(Qualities, func(quality Quality) bool {
qualities := utils.Filter(Qualities, func(quality Quality) bool {
return quality.Height() < def_video.Height
})
transcode_count := len(qualities)

View File

@ -12,6 +12,7 @@ import (
"sync"
"time"
"github.com/zoriya/kyoo/transcoder/src/utils"
"golang.org/x/text/language"
"gopkg.in/vansante/go-ffprobe.v2"
)
@ -228,7 +229,7 @@ var SubtitleExtensions = map[string]string{
}
func RetriveMediaInfo(path string, sha string) (*MediaInfo, error) {
defer printExecTime("mediainfo for %s", path)()
defer utils.PrintExecTime("mediainfo for %s", path)()
ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFn()

View File

@ -11,6 +11,7 @@ import (
"sync"
"github.com/lib/pq"
"github.com/zoriya/kyoo/transcoder/src/utils"
)
const KeyframeVersion = 1
@ -162,7 +163,7 @@ func (s *MetadataService) GetKeyframes(info *MediaInfo, isVideo bool, idx uint32
// Returns when all key frames are retrived (or an error occurs)
// info.ready.Done() is called when more than 100 are retrived (or extraction is done)
func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error {
defer printExecTime("ffprobe keyframe analysis for %s video n%d", path, video_idx)()
defer utils.PrintExecTime("ffprobe keyframe analysis for %s video n%d", path, video_idx)()
// run ffprobe to return all IFrames, IFrames are points where we can split the video in segments.
// We ask ffprobe to return the time of each frame and it's flags
// We could ask it to return only i-frames (keyframes) with the -skip_frame nokey but using it is extremly slow
@ -254,7 +255,7 @@ const DummyKeyframeDuration = float64(4)
// we can pretty much cut audio at any point so no need to get specific frames, just cut every 4s
func getAudioKeyframes(info *MediaInfo, audio_idx uint32, kf *Keyframe) error {
defer printExecTime("ffprobe keyframe analysis for %s audio n%d", info.Path, audio_idx)()
defer utils.PrintExecTime("ffprobe keyframe analysis for %s audio n%d", info.Path, audio_idx)()
// Format's duration CAN be different than audio's duration. To make sure we do not
// miss a segment or make one more, we need to check the audio's duration.
//

View File

@ -1,17 +1,21 @@
package src
import (
"context"
"database/sql"
"encoding/base64"
"errors"
"fmt"
"net/url"
"os"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/lib/pq"
_ "github.com/lib/pq"
"github.com/zoriya/kyoo/transcoder/src/storage"
)
type MetadataService struct {
@ -20,9 +24,60 @@ type MetadataService struct {
thumbLock RunLock[string, interface{}]
extractLock RunLock[string, interface{}]
keyframeLock RunLock[KeyframeKey, *Keyframe]
storage storage.StorageBackend
}
func NewMetadataService() (*MetadataService, error) {
ctx := context.TODO()
s := &MetadataService{
lock: NewRunLock[string, *MediaInfo](),
thumbLock: NewRunLock[string, interface{}](),
extractLock: NewRunLock[string, interface{}](),
keyframeLock: NewRunLock[KeyframeKey, *Keyframe](),
}
db, err := s.setupDb()
if err != nil {
return nil, fmt.Errorf("failed to setup database: %w", err)
}
s.database = db
storage, err := s.setupStorage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to setup storage: %w", err)
}
s.storage = storage
return s, nil
}
func (s *MetadataService) Close() error {
cleanupErrs := make([]error, 0, 2)
if s.database != nil {
err := s.database.Close()
if err != nil {
cleanupErrs = append(cleanupErrs, fmt.Errorf("failed to close database: %w", err))
}
}
if s.storage != nil {
if storageCloser, ok := s.storage.(storage.StorageBackendCloser); ok {
err := storageCloser.Close()
if err != nil {
cleanupErrs = append(cleanupErrs, fmt.Errorf("failed to close storage: %w", err))
}
}
}
if err := errors.Join(cleanupErrs...); err != nil {
return fmt.Errorf("failed to cleanup resources: %w", err)
}
return nil
}
func (s *MetadataService) setupDb() (*sql.DB, error) {
schema := GetEnvOr("POSTGRES_SCHEMA", "gocoder")
connectionString := os.Getenv("POSTGRES_URL")
@ -64,26 +119,44 @@ func NewMetadataService() (*MetadataService, error) {
}
m.Up()
return &MetadataService{
database: db,
lock: NewRunLock[string, *MediaInfo](),
thumbLock: NewRunLock[string, interface{}](),
extractLock: NewRunLock[string, interface{}](),
keyframeLock: NewRunLock[KeyframeKey, *Keyframe](),
}, nil
return db, nil
}
func (s *MetadataService) GetMetadata(path string, sha string) (*MediaInfo, error) {
func (s *MetadataService) setupStorage(ctx context.Context) (storage.StorageBackend, error) {
s3BucketName := os.Getenv("S3_BUCKET_NAME")
if s3BucketName != "" {
// Use S3 storage
// Create the client (use all standard AWS config sources like env vars, config files, etc.)
awsConfig, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load AWS config: %w", err)
}
s3Client := s3.NewFromConfig(awsConfig)
return storage.NewS3StorageBackend(s3Client, s3BucketName), nil
}
// Use local file storage
storageRoot := GetEnvOr("GOCODER_METADATA_ROOT", "/metadata")
localStorage, err := storage.NewFileStorageBackend(storageRoot)
if err != nil {
return nil, fmt.Errorf("failed to create local storage backend: %w", err)
}
return localStorage, nil
}
func (s *MetadataService) GetMetadata(ctx context.Context, path string, sha string) (*MediaInfo, error) {
ret, err := s.getMetadata(path, sha)
if err != nil {
return nil, err
}
if ret.Versions.Thumbs < ThumbsVersion {
go s.ExtractThumbs(path, sha)
go s.ExtractThumbs(ctx, path, sha)
}
if ret.Versions.Extract < ExtractVersion {
go s.ExtractSubs(ret)
go s.ExtractSubs(ctx, ret)
}
if ret.Versions.Keyframes < KeyframeVersion && ret.Versions.Keyframes != 0 {
for _, video := range ret.Videos {
@ -229,6 +302,10 @@ func (s *MetadataService) storeFreshMetadata(path string, sha string) (*MediaInf
}
tx, err := s.database.Begin()
if err != nil {
return set(ret, err)
}
// it needs to be a delete instead of a on conflict do update because we want to trigger delete casquade for
// videos/audios & co.
tx.Exec(`delete from info where path = $1`, path)

View File

@ -15,7 +15,6 @@ func GetEnvOr(env string, def string) string {
type SettingsT struct {
Outpath string
Metadata string
RoutePrefix string
SafePath string
HwAccel HwAccelT
@ -32,7 +31,6 @@ type HwAccelT struct {
var Settings = SettingsT{
// we manually add a folder to make sure we do not delete user data.
Outpath: path.Join(GetEnvOr("GOCODER_CACHE_ROOT", "/cache"), "kyoo_cache"),
Metadata: GetEnvOr("GOCODER_METADATA_ROOT", "/metadata"),
RoutePrefix: GetEnvOr("GOCODER_PREFIX", ""),
SafePath: GetEnvOr("GOCODER_SAFE_PATH", "/video"),
HwAccel: DetectHardwareAccel(),

View File

@ -0,0 +1,201 @@
package storage
import (
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
"github.com/zoriya/kyoo/transcoder/src/utils"
)
type FileStorageBackend struct {
// Base directory for the storage backend
baseDirectory string
baseDirectoryRoot *os.Root
}
// NewFileStorageBackend creates a new FileStorageBackend with the specified base directory.
func NewFileStorageBackend(baseDirectory string) (*FileStorageBackend, error) {
// Attempt to create the directory if it doesn't exist
// This should be the only filesystem call in this file that does not use os.Root.
// This is to prevent directory traversal attacks when the provided input is untrusted.
if err := os.MkdirAll(baseDirectory, 0770); err != nil {
return nil, fmt.Errorf("failed to create storage base directory: %w", err)
}
root, err := os.OpenRoot(baseDirectory)
if err != nil {
return nil, fmt.Errorf("failed to open storage base directory %q: %w", baseDirectory, err)
}
return &FileStorageBackend{
baseDirectory: baseDirectory,
baseDirectoryRoot: root,
}, nil
}
func (fsb *FileStorageBackend) Close() error {
if fsb.baseDirectoryRoot != nil {
return fsb.baseDirectoryRoot.Close()
}
return nil
}
// DoesItemExist checks if an item exists in the file storage backend.
func (fsb *FileStorageBackend) DoesItemExist(_ context.Context, path string) (bool, error) {
_, err := fsb.baseDirectoryRoot.Stat(path)
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, fmt.Errorf("failed to check if item %q exists: %w", path, err)
}
return true, nil
}
// ListItemsWithPrefix returns a list of items in the storage backend that match the given prefix.
func (fsb *FileStorageBackend) ListItemsWithPrefix(_ context.Context, pathPrefix string) ([]string, error) {
var items []string
rootFS := fsb.baseDirectoryRoot.FS()
// This is split so that a smaller subset of files are checked, rather than literally everything under the base directory.
// All matching files for the path prefix will also have the prefixDirPath as their parent directory.
prefixDirPath := filepath.Dir(pathPrefix)
err := fs.WalkDir(rootFS, prefixDirPath, func(path string, d fs.DirEntry, err error) error {
if err != nil {
// This can happen if prefixDirPath does not exist. The walk function will handle
// checking this.
if os.IsNotExist(err) {
return fs.SkipDir
}
return fmt.Errorf("failed on %q while walking directory %q: %w", path, pathPrefix, err)
}
// If the path does not start with the prefix, skip it.
if !strings.HasPrefix(path, pathPrefix) {
if d.IsDir() {
// Skip directories that do not match the prefix
return fs.SkipDir
}
return nil
}
// Collect matching non-directory items
if !d.IsDir() {
items = append(items, path)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to walk directory %q: %w", pathPrefix, err)
}
return items, nil
}
// DeleteItem deletes an item from the storage backend. If the item does not exist, it returns nil.
func (fsb *FileStorageBackend) DeleteItem(_ context.Context, path string) error {
err := fsb.baseDirectoryRoot.Remove(path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to delete item %q: %w", path, err)
}
return nil
}
// DeleteItemsWithPrefix deletes all items in the storage backend that match the given prefix.
// Deletion should be "syncronous" (i.e. the function should block until the write is complete).
func (fsb *FileStorageBackend) DeleteItemsWithPrefix(ctx context.Context, pathPrefix string) error {
// Unfortunately this implementation is needed until https://go-review.googlesource.com/c/go/+/661595 is released.
// The new os.Root type does not yet have a RemoveAll method. The next Go release will have this.
// Once RemoveAll is available, this function can be reduced to a single ReadDir call, a filter, and a RemoveAll call.
// Get all items with the prefix
items, err := fsb.ListItemsWithPrefix(ctx, pathPrefix)
if err != nil {
return fmt.Errorf("failed to list items with prefix %q: %w", pathPrefix, err)
}
// Delete all items. This will leave behind empty directories, but that shouldn't really matter. A future
// implementation that uses os.Root.RemoveAll will handle this.
for _, item := range items {
err = fsb.DeleteItem(ctx, item)
if err != nil {
return fmt.Errorf("failed to delete item %q: %w", item, err)
}
}
return nil
}
// SaveItemWithCallback saves an item to the storage backend. If the item already exists, it overwrites it.
// The writeContents function is called with a writer to write the contents of the item.
func (fsb *FileStorageBackend) SaveItemWithCallback(ctx context.Context, path string, writeContents ContentsWriterCallback) (err error) {
// Open the file for writing
file, err := fsb.openFileForWriting(path)
if err != nil {
return fmt.Errorf("failed to open %q for writing: %w", path, err)
}
defer utils.CleanupWithErr(&err, file.Close, "failed to close file %q", path)
// Write the contents using the provided callback
if err := writeContents(ctx, file); err != nil {
return fmt.Errorf("failed to write contents to file %q: %w", path, err)
}
return nil
}
// SaveItem saves an item to the storage backend. If the item already exists, it overwrites it.
func (fsb *FileStorageBackend) SaveItem(ctx context.Context, path string, contents io.Reader) (err error) {
// Open the file for writing
file, err := fsb.openFileForWriting(path)
if err != nil {
return fmt.Errorf("failed to open %q for writing: %w", path, err)
}
defer utils.CleanupWithErr(&err, file.Close, "failed to close file %q", path)
// Copy the contents to the file
if _, err := io.Copy(file, contents); err != nil {
return fmt.Errorf("failed to copy contents to file %q: %w", path, err)
}
return nil
}
// openFileForWriting opens a file for writing. If the file already exists, it overwrites it.
// The parent directory is created if it doesn't exist.
// This function is used internally to create files in the storage backend.
// The returned file should be closed by the caller.
func (fsb *FileStorageBackend) openFileForWriting(path string) (*os.File, error) {
// Create the parent directory if it doesn't exist
dir := filepath.Dir(path)
if err := fsb.baseDirectoryRoot.Mkdir(dir, 0770); err != nil {
return nil, fmt.Errorf("failed to create directory %q: %w", dir, err)
}
// Open the file for writing
file, err := fsb.baseDirectoryRoot.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0660)
if err != nil {
return nil, fmt.Errorf("failed to create file %q: %w", path, err)
}
return file, nil
}
// GetItem retrieves an item from the storage backend.
func (fsb *FileStorageBackend) GetItem(_ context.Context, path string) (io.ReadCloser, error) {
file, err := fsb.baseDirectoryRoot.Open(path)
if err != nil {
return nil, fmt.Errorf("failed to open file %q: %w", path, err)
}
return file, nil
}

View File

@ -0,0 +1,226 @@
package storage
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"github.com/aws/aws-sdk-go-v2/aws"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/zoriya/kyoo/transcoder/src/utils"
"golang.org/x/sync/errgroup"
)
var noSuchKeyError = &s3types.NoSuchKey{}
// S3StorageBackend is a struct that implements the StorageBackend interface,
// backed by S3-compatible object storage.
// It is up to the user to ensure that the object storage service supports
// consistent reads, writes, and deletes. This known to be supported by:
// - AWS S3
// - MinIO
// - Ceph RGW
// This is known to _not_ be supported by:
// - SeaweedFS
type S3StorageBackend struct {
s3Client *s3.Client
bucket string
}
// NewS3StorageBackend creates a new S3StorageBackend with the specified bucket name and S3 client.
func NewS3StorageBackend(s3Client *s3.Client, bucket string) *S3StorageBackend {
return &S3StorageBackend{
s3Client: s3Client,
bucket: bucket,
}
}
// DoesItemExist checks if an item exists in the file storage backend.
func (ssb *S3StorageBackend) DoesItemExist(ctx context.Context, path string) (bool, error) {
_, err := ssb.s3Client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &ssb.bucket,
Key: &path,
})
if err != nil {
var responseError *awshttp.ResponseError
if errors.As(err, &responseError) && responseError.ResponseError.HTTPStatusCode() == http.StatusNotFound {
return false, nil
}
return false, fmt.Errorf("failed to check if item %q exists in bucket %q: %w", path, ssb.bucket, err)
}
return true, nil
}
// ListItemsWithPrefix returns a list of items in the storage backend that match the given prefix.
func (ssb *S3StorageBackend) ListItemsWithPrefix(ctx context.Context, pathPrefix string) ([]string, error) {
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: &ssb.bucket,
Prefix: &pathPrefix,
}
paginator := s3.NewListObjectsV2Paginator(ssb.s3Client, listObjectsInput)
var items []string
for paginator.HasMorePages() {
resp, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list items with prefix %q in bucket %q: %w", pathPrefix, ssb.bucket, err)
}
for _, item := range resp.Contents {
if item.Key != nil {
items = append(items, *item.Key)
}
}
}
return items, nil
}
// DeleteItem deletes an item from the storage backend. If the item does not exist, it returns nil.
func (ssb *S3StorageBackend) DeleteItem(ctx context.Context, path string) error {
_, err := ssb.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: &ssb.bucket,
Key: &path,
})
if err != nil {
if errors.Is(err, noSuchKeyError) {
return nil
}
return fmt.Errorf("failed to delete item %q from bucket %q: %w", path, ssb.bucket, err)
}
return nil
}
// DeleteItemsWithPrefix deletes all items in the storage backend that match the given prefix.
// Deletion should be "syncronous" (i.e. the function should block until the write is complete).
func (ssb *S3StorageBackend) DeleteItemsWithPrefix(ctx context.Context, pathPrefix string) error {
// Get all items with the prefix
items, err := ssb.ListItemsWithPrefix(ctx, pathPrefix)
if err != nil {
return fmt.Errorf("failed to list items with prefix %q: %w", pathPrefix, err)
}
// Fast path: if there are no items, return early.
if len(items) == 0 {
return nil
}
// Delete all items. This uses the DeleteObjects API call, which is more efficient
// than deleting each item individually.
// The API call supports up to 1000 items at a time, so chunk the items if needed.
chunkSize := min(len(items), 1000)
chunkItems := make([]s3types.ObjectIdentifier, chunkSize)
var deletionRequests errgroup.Group
for i := range items {
item := items[i]
chunkIndex := i % chunkSize
chunkItems[chunkIndex] = s3types.ObjectIdentifier{
Key: &item,
}
// If the chunk is full, delete the objects in this chunk.
if chunkIndex == chunkSize-1 {
deletionRequests.Go(func() error {
_, err := ssb.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
Bucket: &ssb.bucket,
Delete: &s3types.Delete{
Objects: chunkItems,
// Only include keys in the response that encountered an error.
Quiet: aws.Bool(true),
},
})
if err != nil {
chunkNumber := 1 + i/chunkSize // Int division in Go rounds down.
// TODO if the error doesn't include sufficient information, the below line
// will need to pull in error details from the response.
return fmt.Errorf("failed to delete items in chunk %d with prefix %q: %w", chunkNumber, pathPrefix, err)
}
return nil
})
}
}
err = deletionRequests.Wait()
if err != nil {
return fmt.Errorf("failed to delete one or more items with prefix %q: %w", pathPrefix, err)
}
return nil
}
// SaveItemWithCallback saves an item to the storage backend. If the item already exists, it overwrites it.
// The writeContents function is called with a writer to write the contents of the item.
func (ssb *S3StorageBackend) SaveItemWithCallback(ctx context.Context, path string, writeContents ContentsWriterCallback) (err error) {
// Create a pipe to connect the writer and reader.
pr, pw := io.Pipe()
defer utils.CleanupWithErr(&err, pr.Close, "failed to close pipe reader")
// Start a goroutine to write to the pipe.
// Writing and reading must occur concurrently to avoid deadlocks.
// Use a separate context for the writer to allow cancellation if the upload fails. This is important to avoid
// a hung goroutine leak if the upload fails.
writeCtx, cancel := context.WithCancel(ctx)
defer cancel()
var writerGroup errgroup.Group
writerGroup.Go(func() (err error) {
defer utils.CleanupWithErr(&err, pw.Close, "failed to close pipe writer")
return writeContents(writeCtx, pw)
})
// Wait for the write to complete and check for errors.
// This should always happen even if saving fails, to prevent a goroutine leak.
defer utils.CleanupWithErr(&err, writerGroup.Wait, "writer callback failed")
// Upload the object to S3 using the pipe as the body.
if err := ssb.SaveItem(ctx, path, pr); err != nil {
// Ensure that the writer context is cancelled prior to awaiting for the writer to finish.
// This is important to avoid a hung goroutine leak if the upload fails.
cancel()
return err
}
return nil
}
// SaveItem saves an item to the storage backend. If the item already exists, it overwrites it.
func (ssb *S3StorageBackend) SaveItem(ctx context.Context, path string, contents io.Reader) error {
// Upload the object to S3 using the provided reader as the body.
_, err := ssb.s3Client.PutObject(ctx, &s3.PutObjectInput{
Bucket: &ssb.bucket,
Key: &path,
Body: contents,
})
if err != nil {
return fmt.Errorf("failed to save item %q to bucket %q: %w", path, ssb.bucket, err)
}
return nil
}
// GetItem retrieves an item from the storage backend.
func (ssb *S3StorageBackend) GetItem(ctx context.Context, path string) (io.ReadCloser, error) {
resp, err := ssb.s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &ssb.bucket,
Key: &path,
})
if err != nil {
return nil, fmt.Errorf("failed to get item %q from bucket %q: %w", path, ssb.bucket, err)
}
return resp.Body, nil
}

View File

@ -0,0 +1,95 @@
package storage
import (
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"github.com/zoriya/kyoo/transcoder/src/utils"
"golang.org/x/sync/errgroup"
)
type ContentsWriterCallback func(ctx context.Context, writer io.Writer) error
// Handles storage and retrieval of items in a storage backend.
// "Items" are pieces of data that can be stored and retrieved.
// Paths with ".." are not allowed, and may result in unexpected behavior.
type StorageBackend interface {
// DoesItemExist checks if an item exists in the storage backend.
DoesItemExist(ctx context.Context, path string) (bool, error)
// ListItemsWithPrefix returns a list of items in the storage backend that match the given prefix.
// Note: returned items may have a "/" in them, e.g. "foo/bar/baz".
ListItemsWithPrefix(ctx context.Context, pathPrefix string) ([]string, error)
// DeleteItem deletes an item from the storage backend. If the item does not exist, it returns nil.
// Deletion should be "synchronous" (i.e. the function should block until the write is complete).
DeleteItem(ctx context.Context, path string) error
// DeleteItemsWithPrefix deletes all items in the storage backend that match the given prefix.
// Deletion should be "synchronous" (i.e. the function should block until the write is complete).
DeleteItemsWithPrefix(ctx context.Context, pathPrefix string) error
// SaveItemWithCallback saves an item to the storage backend. If the item already exists, it overwrites it.
// The writeContents function is called with a writer to write the contents of the item.
// Writes should be "synchronous" (i.e. the function should block until the write is complete).
SaveItemWithCallback(ctx context.Context, path string, writeContents ContentsWriterCallback) error
// SaveItem saves an item to the storage backend. If the item already exists, it overwrites it.
SaveItem(ctx context.Context, path string, contents io.Reader) error
// GetItem retrieves an item from the storage backend.
GetItem(ctx context.Context, path string) (io.ReadCloser, error)
}
type StorageBackendCloser interface {
StorageBackend
// Close closes the storage backend. This should be called when the backend is no longer needed.
Close() error
}
// SaveFilesToBackend is a helper function that saves files from the source directory to the backend storage.
func SaveFilesToBackend(ctx context.Context, backend StorageBackend, sourceDirectory, destinationBasePath string) error {
// Allow for save in parallel. Running in series can be slow with many files.
var saveGroup errgroup.Group
err := filepath.WalkDir(sourceDirectory, func(sourcePath string, d fs.DirEntry, err error) error {
if err != nil {
return fmt.Errorf("error walking the path %q: %v", sourcePath, err)
}
if d.IsDir() {
return nil
}
// Path relative to the source directory
relativePath, err := filepath.Rel(sourceDirectory, sourcePath)
if err != nil {
return err
}
// Destination path in the backend storage
destinationPath := filepath.Join(destinationBasePath, relativePath)
saveGroup.Go(func() (err error) {
file, err := os.Open(sourcePath)
if err != nil {
return fmt.Errorf("failed to open file %q: %w", sourcePath, err)
}
utils.CleanupWithErr(&err, file.Close, "failed to close file %q", sourcePath)
if err := backend.SaveItem(ctx, destinationPath, file); err != nil {
return fmt.Errorf("failed to save file %q to backend: %w", sourcePath, err)
}
return nil
})
return nil
})
if err != nil {
return fmt.Errorf("failed while walking over files to save to the backend: %w", err)
}
if err := saveGroup.Wait(); err != nil {
return fmt.Errorf("failed to save files to backend: %w", err)
}
return nil
}

View File

@ -8,6 +8,7 @@ import (
"regexp"
"strings"
"github.com/zoriya/kyoo/transcoder/src/utils"
"golang.org/x/text/language"
)
@ -72,7 +73,7 @@ outer:
// "hi" by itself means a language code, but when combined with other lang flags it means Hearing Impaired.
// In case Hindi was not detected before, but "hi" is present, assume it is Hindi.
if sub.Language == nil {
hiCount := Count(flags, "hi")
hiCount := utils.Count(flags, "hi")
if hiCount > 0 {
languageStr := language.Hindi.String()
sub.Language = &languageStr

View File

@ -1,17 +1,19 @@
package src
import (
"context"
"encoding/base64"
"fmt"
"image"
"image/color"
"io"
"log"
"math"
"os"
"path/filepath"
"strings"
"sync"
"github.com/disintegration/imaging"
"github.com/zoriya/kyoo/transcoder/src/utils"
"gitlab.com/opennota/screengen"
)
@ -29,33 +31,54 @@ type Thumbnail struct {
const ThumbsVersion = 1
func getThumbGlob(sha string) string {
return fmt.Sprintf("%s/%s/thumbs-v*.*", Settings.Metadata, sha)
// getThumbGlob returns the path prefix for all thumbnail files for a given sha.
func getThumbPrefix(sha string) string {
return sha
}
func getThumbPath(sha string) string {
return fmt.Sprintf("%s/%s/thumbs-v%d.png", Settings.Metadata, sha, ThumbsVersion)
return fmt.Sprintf("%s/thumbs-v%d.png", sha, ThumbsVersion)
}
func getThumbVttPath(sha string) string {
return fmt.Sprintf("%s/%s/thumbs-v%d.vtt", Settings.Metadata, sha, ThumbsVersion)
return fmt.Sprintf("%s/thumbs-v%d.vtt", sha, ThumbsVersion)
}
func (s *MetadataService) GetThumb(path string, sha string) (string, string, error) {
_, err := s.ExtractThumbs(path, sha)
func (s *MetadataService) GetThumbVtt(ctx context.Context, path string, sha string) (io.ReadCloser, error) {
_, err := s.ExtractThumbs(ctx, path, sha)
if err != nil {
return "", "", err
return nil, err
}
return getThumbPath(sha), getThumbVttPath(sha), nil
vttPath := getThumbVttPath(sha)
vtt, err := s.storage.GetItem(ctx, vttPath)
if err != nil {
return nil, fmt.Errorf("failed to get thumbnail vtt with path %q: %w", vttPath, err)
}
return vtt, nil
}
func (s *MetadataService) ExtractThumbs(path string, sha string) (interface{}, error) {
func (s *MetadataService) GetThumbSprite(ctx context.Context, path string, sha string) (io.ReadCloser, error) {
_, err := s.ExtractThumbs(ctx, path, sha)
if err != nil {
return nil, err
}
spritePath := getThumbPath(sha)
sprite, err := s.storage.GetItem(ctx, spritePath)
if err != nil {
return nil, fmt.Errorf("failed to get thumbnail sprite with path %q: %w", spritePath, err)
}
return sprite, nil
}
func (s *MetadataService) ExtractThumbs(ctx context.Context, path string, sha string) (interface{}, error) {
get_running, set := s.thumbLock.Start(sha)
if get_running != nil {
return get_running()
}
err := extractThumbnail(path, sha)
err := s.extractThumbnail(ctx, path, sha)
if err != nil {
return set(nil, err)
}
@ -63,12 +86,18 @@ func (s *MetadataService) ExtractThumbs(path string, sha string) (interface{}, e
return set(nil, err)
}
func extractThumbnail(path string, sha string) error {
defer printExecTime("extracting thumbnails for %s", path)()
func (s *MetadataService) extractThumbnail(ctx context.Context, path string, sha string) (err error) {
defer utils.PrintExecTime("extracting thumbnails for %s", path)()
os.MkdirAll(fmt.Sprintf("%s/%s", Settings.Metadata, sha), 0o755)
vttPath := getThumbVttPath(sha)
spritePath := getThumbPath(sha)
newItemPaths := []string{spritePath, vttPath}
if _, err := os.Stat(getThumbPath(sha)); err == nil {
doAllExist, err := s.doAllThumbnailFilesExist(ctx, newItemPaths)
if err != nil {
return fmt.Errorf("failed to check if thumbnail files exist: %w", err)
}
if doAllExist {
return nil
}
@ -77,7 +106,7 @@ func extractThumbnail(path string, sha string) error {
log.Printf("Error reading video file: %v", err)
return err
}
defer gen.Close()
defer utils.CleanupWithErr(&err, gen.Close, "failed to close screengen generator")
gen.Fast = true
@ -128,26 +157,49 @@ func extractThumbnail(path string, sha string) error {
)
}
// Cleanup old versions of thumbnails
files, err := filepath.Glob(getThumbGlob(sha))
if err == nil {
for _, f := range files {
// ignore errors
os.Remove(f)
}
// Cleanup old thumbnails
if err := s.storage.DeleteItemsWithPrefix(ctx, getThumbPrefix(sha)); err != nil {
return fmt.Errorf("failed to delete old thumbnails: %w", err)
}
err = os.WriteFile(getThumbVttPath(sha), []byte(vtt), 0o644)
// Store the new items
// Thumbnail vtt
if err = s.storage.SaveItem(ctx, vttPath, strings.NewReader(vtt)); err != nil {
return fmt.Errorf("failed to save thumbnail vtt with path %q: %w", vttPath, err)
}
// Thumbnail sprite
spriteFormat, err := imaging.FormatFromFilename(spritePath)
if err != nil {
return err
}
err = imaging.Save(sprite, getThumbPath(sha))
err = s.storage.SaveItemWithCallback(ctx, spritePath, func(_ context.Context, writer io.Writer) error {
if err := imaging.Encode(writer, sprite, spriteFormat); err != nil {
return fmt.Errorf("failed to encode thumbnail sprite: %w", err)
}
return nil
})
if err != nil {
return err
return fmt.Errorf("failed to save thumbnail sprite with path %q: %w", spritePath, err)
}
return nil
}
func (s *MetadataService) doAllThumbnailFilesExist(ctx context.Context, filePaths []string) (bool, error) {
for _, filePath := range filePaths {
doesExist, err := s.storage.DoesItemExist(ctx, filePath)
if err != nil {
return false, fmt.Errorf("failed to check if thumbnail file %q exists: %w", filePath, err)
}
if !doesExist {
return false, nil
}
}
return true, nil
}
func tsToVttTime(ts int) string {
return fmt.Sprintf("%02d:%02d:%02d.000", ts/3600, (ts/60)%60, ts%60)
}

View File

@ -1,6 +1,7 @@
package src
import (
"context"
"os"
"path"
)
@ -36,9 +37,9 @@ func NewTranscoder(metadata *MetadataService) (*Transcoder, error) {
return ret, nil
}
func (t *Transcoder) getFileStream(path string, sha string) (*FileStream, error) {
func (t *Transcoder) getFileStream(ctx context.Context, path string, sha string) (*FileStream, error) {
ret, _ := t.streams.GetOrCreate(sha, func() *FileStream {
return t.newFileStream(path, sha)
return t.newFileStream(ctx, path, sha)
})
ret.ready.Wait()
if ret.err != nil {
@ -48,8 +49,8 @@ func (t *Transcoder) getFileStream(path string, sha string) (*FileStream, error)
return ret, nil
}
func (t *Transcoder) GetMaster(path string, client string, sha string) (string, error) {
stream, err := t.getFileStream(path, sha)
func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, sha string) (string, error) {
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
@ -66,13 +67,14 @@ func (t *Transcoder) GetMaster(path string, client string, sha string) (string,
}
func (t *Transcoder) GetVideoIndex(
ctx context.Context,
path string,
video uint32,
quality Quality,
client string,
sha string,
) (string, error) {
stream, err := t.getFileStream(path, sha)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
@ -89,12 +91,13 @@ func (t *Transcoder) GetVideoIndex(
}
func (t *Transcoder) GetAudioIndex(
ctx context.Context,
path string,
audio uint32,
client string,
sha string,
) (string, error) {
stream, err := t.getFileStream(path, sha)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
@ -110,6 +113,7 @@ func (t *Transcoder) GetAudioIndex(
}
func (t *Transcoder) GetVideoSegment(
ctx context.Context,
path string,
video uint32,
quality Quality,
@ -117,7 +121,7 @@ func (t *Transcoder) GetVideoSegment(
client string,
sha string,
) (string, error) {
stream, err := t.getFileStream(path, sha)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
@ -134,13 +138,14 @@ func (t *Transcoder) GetVideoSegment(
}
func (t *Transcoder) GetAudioSegment(
ctx context.Context,
path string,
audio uint32,
segment int32,
client string,
sha string,
) (string, error) {
stream, err := t.getFileStream(path, sha)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}

View File

@ -1,38 +0,0 @@
package src
import (
"fmt"
"log"
"time"
)
func printExecTime(message string, args ...any) func() {
msg := fmt.Sprintf(message, args...)
start := time.Now()
log.Printf("Running %s", msg)
return func() {
log.Printf("%s finished in %s", msg, time.Since(start))
}
}
func Filter[E any](s []E, f func(E) bool) []E {
s2 := make([]E, 0, len(s))
for _, e := range s {
if f(e) {
s2 = append(s2, e)
}
}
return s2
}
// Count returns the number of elements in s that are equal to e.
func Count[S []E, E comparable](s S, e E) int {
var n int
for _, v := range s {
if v == e {
n++
}
}
return n
}

View File

@ -0,0 +1,54 @@
package utils
import (
"errors"
"fmt"
"log"
"time"
)
func PrintExecTime(message string, args ...any) func() {
msg := fmt.Sprintf(message, args...)
start := time.Now()
log.Printf("Running %s", msg)
return func() {
log.Printf("%s finished in %s", msg, time.Since(start))
}
}
func Filter[E any](s []E, f func(E) bool) []E {
s2 := make([]E, 0, len(s))
for _, e := range s {
if f(e) {
s2 = append(s2, e)
}
}
return s2
}
// Count returns the number of elements in s that are equal to e.
func Count[S []E, E comparable](s S, e E) int {
var n int
for _, v := range s {
if v == e {
n++
}
}
return n
}
// CleanupWithErr runs a cleanup function and checks if it returns an error.
// If the cleanup function returns an error, it is joined with the original error
// and assigned to the original error pointer.
func CleanupWithErr(err *error, fn func() error, msg string, args ...any) {
cleanupErr := fn()
if err == nil {
return
}
if cleanupErr != nil {
*err = fmt.Errorf("%s: %w", fmt.Sprintf(msg, args...), cleanupErr)
}
*err = errors.Join(*err, cleanupErr)
}