diff --git a/transcoder/.env.example b/transcoder/.env.example index d33d1b9d..ba297618 100644 --- a/transcoder/.env.example +++ b/transcoder/.env.example @@ -4,8 +4,6 @@ # where to store temporary transcoded files GOCODER_CACHE_ROOT="/cache" -# where to store extracted data (subtitles/attachments/comptuted thumbnails for scrubbing) -GOCODER_METADATA_ROOT="/metadata" # path prefix needed to reach the http endpoint GOCODER_PREFIX="" # base absolute path that contains video files (everything in this directory can be served) @@ -40,3 +38,26 @@ POSTGRES_SSLMODE="disable" # If this is not "disabled", the schema will be created (if it does not exists) and # the search_path of the user will be ignored (only the schema specified will be used). POSTGRES_SCHEMA=gocoder + +# Storage backend +# There are two currently supported backends: local filesystem and s3. +# S3 must be used when running multiple instances of the service. The local filesystem is fine +# for a single instance. + +# Local filesystem +GOCODER_METADATA_ROOT="/metadata" + +# S3 +# Setting this configures the transcoder to use S3 as a backend. +# S3_BUCKET_NAME=my-transcoder-bucket +# All environment variables supported by the AWS SDK for Go (v2) are supported: +# https://docs.aws.amazon.com/sdkref/latest/guide/settings-reference.html#EVarSettings +# AWS_ACCESS_KEY_ID=abc123 +# AWS_SECRET_ACCESS_KEY=def456 +# AWS_ENDPOINT_URL_S3=https://s3.my-ceph-rgw-deployment.example +# Unless you're running on an actual EC2 instance, you should set this to true. +# This will disable the SDK from trying to use the EC2 metadata service to get credentials, +# reducing startup time. +# If you are actually using an IAM role profile for authentication, this should be set to false +# or be left unset. +AWS_EC2_METADATA_DISABLED="true" diff --git a/transcoder/go.mod b/transcoder/go.mod index 6f473112..d5ba6fb6 100644 --- a/transcoder/go.mod +++ b/transcoder/go.mod @@ -1,8 +1,6 @@ module github.com/zoriya/kyoo/transcoder -go 1.23.0 - -toolchain go1.24.2 +go 1.24.2 require ( github.com/disintegration/imaging v1.6.2 @@ -14,6 +12,31 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2 v1.36.3 + github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 + golang.org/x/sync v0.13.0 +) + +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect + github.com/aws/smithy-go v1.22.2 // indirect +) + +require ( + github.com/aws/aws-sdk-go-v2/config v1.29.14 github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/labstack/gommon v0.4.2 // indirect diff --git a/transcoder/go.sum 
b/transcoder/go.sum index f04ffba1..a16e1e60 100644 --- a/transcoder/go.sum +++ b/transcoder/go.sum @@ -2,6 +2,42 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25 github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= +github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14= +github.com/aws/aws-sdk-go-v2/config v1.29.14 h1:f+eEi/2cKCg9pqKBoAIwRGzVb70MRKqWX4dg1BDcSJM= +github.com/aws/aws-sdk-go-v2/config v1.29.14/go.mod h1:wVPHWcIFv3WO89w0rE10gzf17ZYy+UVS1Geq8Iei34g= +github.com/aws/aws-sdk-go-v2/credentials v1.17.67 h1:9KxtdcIA/5xPNQyZRgUSpYOE6j9Bc4+D7nZua0KGYOM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.67/go.mod h1:p3C44m+cfnbv763s52gCqrjaqyPikj9Sg47kUVaNZQQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mlnXuFrO4cOd3HLBroh1paFw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 h1:ZNTqv4nIdE/DiBfUUfXcLZ/Spcuz+RjeziUtNJackkM= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34/go.mod h1:zf7Vcd1ViW7cPqYWEHLHJkS50X0JS2IKz9Cgaj6ugrs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 h1:4nm2G6A4pV9rdlWzGMPv4BNtQp22v1hg3yrtkYpeLl8= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1/go.mod h1:iu6FSzgt+M2/x3Dk8zhycdIcHjEFb36IS8HVUVFoMg0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 h1:moLQUoVq91LiqT1nbvzDukyqAlCv89ZmwaHw/ZFlFZg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15/go.mod h1:ZH34PJUc8ApjBIfgQCFvkWcUDBtl/WTD+uiYHjd8igA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 h1:BRXS0U76Z8wfF+bnkilA2QwpIch6URlm++yPUt9QPmQ= +github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3/go.mod h1:bNXKFFyaiVvWuR6O16h/I1724+aXe/tAkA9/QS01t5k= 
+github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 h1:1Gw+9ajCV1jogloEv1RRnvfRFia2cL6c9cuKV2Ps+G8= +github.com/aws/aws-sdk-go-v2/service/sso v1.25.3/go.mod h1:qs4a9T5EMLl/Cajiw2TcbNt2UNo/Hqlyp+GiuG4CFDI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 h1:hXmVKytPfTy5axZ+fYbR5d0cFmC3JvwLm5kM83luako= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1/go.mod h1:MlYRNmYu/fGPoxBQVvBYr9nyr948aY/WLUvwBMBJubs= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 h1:1XuUZ8mYJw9B6lzAkXhqHlJd/XvaX32evhproijJEZY= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.19/go.mod h1:cQnB8CUnxbMU82JvlqjKR2HBOm3fe9pWorWBza6MBJ4= +github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= +github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dhui/dktest v0.4.3 h1:wquqUxAFdcUgabAVLvSCOKOlag5cIZuaOjYIBOWdsR0= @@ -81,6 +117,8 @@ golang.org/x/image v0.23.0 h1:HseQ7c2OpPKTPVzNjG5fwJsOTCiiwS4QdsYi5XU6H68= golang.org/x/image v0.23.0/go.mod h1:wJJBTdLfCCf3tiHa1fNxpZmUI4mmoZvwMCPP0ddoNKY= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= diff --git a/transcoder/main.go b/transcoder/main.go index acd8844a..e58326f7 100644 --- a/transcoder/main.go +++ b/transcoder/main.go @@ -2,11 +2,15 @@ package main import ( "fmt" + "io" + "mime" "net/http" + "path/filepath" "strconv" "github.com/zoriya/kyoo/transcoder/src" "github.com/zoriya/kyoo/transcoder/src/api" + "github.com/zoriya/kyoo/transcoder/src/utils" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" @@ -43,7 +47,7 @@ func (h *Handler) GetMaster(c echo.Context) error { return err } - ret, err := h.transcoder.GetMaster(path, client, sha) + ret, err := h.transcoder.GetMaster(c.Request().Context(), path, client, sha) if err != nil { return err } @@ -75,7 +79,7 @@ func (h *Handler) GetVideoIndex(c echo.Context) error { return err } - ret, err := h.transcoder.GetVideoIndex(path, uint32(video), quality, client, sha) + ret, err := h.transcoder.GetVideoIndex(c.Request().Context(), path, uint32(video), quality, client, sha) if err != nil { return err } @@ -103,7 +107,7 @@ func (h *Handler) GetAudioIndex(c echo.Context) error { return err } - ret, err := h.transcoder.GetAudioIndex(path, uint32(audio), client, sha) + ret, err := h.transcoder.GetAudioIndex(c.Request().Context(), path, uint32(audio), client, sha) if err != nil { return err } @@ -138,6 +142,7 @@ func (h *Handler) GetVideoSegment(c echo.Context) error { } ret, err := h.transcoder.GetVideoSegment( + c.Request().Context(), path, uint32(video), quality, @@ -174,7 +179,7 @@ func (h *Handler) GetAudioSegment(c echo.Context) error { return err } - ret, err := h.transcoder.GetAudioSegment(path, uint32(audio), segment, client, sha) + ret, err := h.transcoder.GetAudioSegment(c.Request().Context(), path, uint32(audio), segment, client, sha) if err != nil { return 
err } @@ -192,7 +197,7 @@ func (h *Handler) GetInfo(c echo.Context) error { return err } - ret, err := h.metadata.GetMetadata(path, sha) + ret, err := h.metadata.GetMetadata(c.Request().Context(), path, sha) if err != nil { return err } @@ -208,7 +213,7 @@ func (h *Handler) GetInfo(c echo.Context) error { // Get a specific attachment. // // Path: /:path/attachment/:name -func (h *Handler) GetAttachment(c echo.Context) error { +func (h *Handler) GetAttachment(c echo.Context) (err error) { _, sha, err := GetPath(c) if err != nil { return err @@ -218,11 +223,18 @@ func (h *Handler) GetAttachment(c echo.Context) error { return err } - ret, err := h.metadata.GetAttachmentPath(sha, false, name) + attachementStream, err := h.metadata.GetAttachment(c.Request().Context(), sha, name) if err != nil { return err } - return c.File(ret) + defer utils.CleanupWithErr(&err, attachementStream.Close, "failed to close attachment reader") + + mimeType, err := guessMimeType(name, attachementStream) + if err != nil { + return fmt.Errorf("failed to guess mime type: %w", err) + } + + return c.Stream(200, mimeType, attachementStream) } // Get subtitle @@ -230,7 +242,7 @@ func (h *Handler) GetAttachment(c echo.Context) error { // Get a specific subtitle. // // Path: /:path/subtitle/:name -func (h *Handler) GetSubtitle(c echo.Context) error { +func (h *Handler) GetSubtitle(c echo.Context) (err error) { _, sha, err := GetPath(c) if err != nil { return err @@ -240,11 +252,23 @@ func (h *Handler) GetSubtitle(c echo.Context) error { return err } - ret, err := h.metadata.GetAttachmentPath(sha, true, name) + subtitleStream, err := h.metadata.GetSubtitle(c.Request().Context(), sha, name) if err != nil { return err } - return c.File(ret) + defer utils.CleanupWithErr(&err, subtitleStream.Close, "failed to close subtitle reader") + + mimeType, err := guessMimeType(name, subtitleStream) + if err != nil { + return fmt.Errorf("failed to guess mime type: %w", err) + } + + // Default the mime type to text/plain if it is not recognized + if mimeType == "" { + mimeType = "text/plain" + } + + return c.Stream(200, mimeType, subtitleStream) } // Get thumbnail sprite @@ -252,17 +276,19 @@ func (h *Handler) GetSubtitle(c echo.Context) error { // Get a sprite file containing all the thumbnails of the show. // // Path: /:path/thumbnails.png -func (h *Handler) GetThumbnails(c echo.Context) error { +func (h *Handler) GetThumbnails(c echo.Context) (err error) { path, sha, err := GetPath(c) if err != nil { return err } - sprite, _, err := h.metadata.GetThumb(path, sha) + + sprite, err := h.metadata.GetThumbSprite(c.Request().Context(), path, sha) if err != nil { return err } + defer utils.CleanupWithErr(&err, sprite.Close, "failed to close thumbnail sprite reader") - return c.File(sprite) + return c.Stream(200, "image/png", sprite) } // Get thumbnail vtt @@ -271,17 +297,19 @@ func (h *Handler) GetThumbnails(c echo.Context) error { // https://developer.bitmovin.com/playback/docs/webvtt-based-thumbnails for more info. 
// // Path: /:path/:resource/:slug/thumbnails.vtt -func (h *Handler) GetThumbnailsVtt(c echo.Context) error { +func (h *Handler) GetThumbnailsVtt(c echo.Context) (err error) { path, sha, err := GetPath(c) if err != nil { return err } - _, vtt, err := h.metadata.GetThumb(path, sha) + + vtt, err := h.metadata.GetThumbVtt(c.Request().Context(), path, sha) if err != nil { return err } + defer utils.CleanupWithErr(&err, vtt.Close, "failed to close thumbnail vtt reader") - return c.File(vtt) + return c.Stream(200, "text/vtt", vtt) } type Handler struct { @@ -289,21 +317,67 @@ type Handler struct { metadata *src.MetadataService } +// Try to guess the mime type of a file based on its extension. +// If the extension is not recognized, return an empty string. +// If path is provided, it should contain a file extension (i.e. ".mp4"). +// If content is provided, it should be of type io.ReadSeeker. Instances of other types are ignored. +// This implementation is based upon http.ServeContent. +func guessMimeType(path string, content any) (string, error) { + // This does not match a large number of different types that are likely in use. + // TODO add telemetry to see what file extensions are used, then add logic + // to detect the type based on the file extension. + mimeType := "" + + // First check the file extension, if there is one. + ext := filepath.Ext(path) + if ext != "" { + if mimeType = mime.TypeByExtension(ext); mimeType != "" { + return mimeType, nil + } + } + + // Try reading the first few bytes of the file to guess the mime type. + // Only do this if seeking is supported + if reader, ok := content.(io.ReadSeeker); ok { + // 512 bytes is the most that DetectContentType will consider, so no + // need to read more than that. + var buf [512]byte + n, _ := io.ReadFull(reader, buf[:]) + mimeType = http.DetectContentType(buf[:n]) + + // Reset the reader to the beginning of the file + _, err := reader.Seek(0, io.SeekStart) + if err != nil { + return "", fmt.Errorf("mime type guesser failed to seek to beginning of file: %w", err) + } + } + + return mimeType, nil +} + func main() { e := echo.New() + + if err := run(e); err != nil { + e.Logger.Fatal(err) + } +} + +func run(e *echo.Echo) (err error) { e.Use(middleware.Logger()) e.HTTPErrorHandler = ErrorHandler metadata, err := src.NewMetadataService() if err != nil { - e.Logger.Fatal(err) - return + return fmt.Errorf("failed to create metadata service: %w", err) } + defer utils.CleanupWithErr(&err, metadata.Close, "failed to close metadata service") + transcoder, err := src.NewTranscoder(metadata) if err != nil { - e.Logger.Fatal(err) - return + return fmt.Errorf("failed to create transcoder: %w", err) } + h := Handler{ transcoder: transcoder, metadata: metadata, @@ -325,5 +399,8 @@ func main() { api.RegisterPProfHandlers(e) - e.Logger.Fatal(e.Start(":7666")) + if err := e.Start(":7666"); err != nil { + return fmt.Errorf("failed to start server: %w", err) + } + return nil } diff --git a/transcoder/src/extract.go b/transcoder/src/extract.go index 0e72e7db..2d225fb5 100644 --- a/transcoder/src/extract.go +++ b/transcoder/src/extract.go @@ -1,21 +1,28 @@ package src import ( + "context" "fmt" + "io" "log" "os" "os/exec" + "path/filepath" + + "github.com/zoriya/kyoo/transcoder/src/storage" + "github.com/zoriya/kyoo/transcoder/src/utils" + "golang.org/x/sync/errgroup" ) const ExtractVersion = 1 -func (s *MetadataService) ExtractSubs(info *MediaInfo) (interface{}, error) { +func (s *MetadataService) ExtractSubs(ctx context.Context, info *MediaInfo) 
(interface{}, error) { get_running, set := s.extractLock.Start(info.Sha) if get_running != nil { return get_running() } - err := extractSubs(info) + err := s.extractSubs(ctx, info) if err != nil { return set(nil, err) } @@ -23,32 +30,70 @@ func (s *MetadataService) ExtractSubs(info *MediaInfo) (interface{}, error) { return set(nil, err) } -func (s *MetadataService) GetAttachmentPath(sha string, is_sub bool, name string) (string, error) { +func (s *MetadataService) GetAttachment(ctx context.Context, sha string, name string) (io.ReadCloser, error) { _, err := s.extractLock.WaitFor(sha) if err != nil { - return "", err + return nil, err } - dir := "att" - if is_sub { - dir = "sub" + + itemPath := fmt.Sprintf("%s/%s/%s", sha, "att", name) + + item, err := s.storage.GetItem(ctx, itemPath) + if err != nil { + return nil, fmt.Errorf("failed to get attachment with path %q: %w", itemPath, err) } - return fmt.Sprintf("%s/%s/%s/%s", Settings.Metadata, sha, dir, name), nil + return item, nil } -func extractSubs(info *MediaInfo) error { - defer printExecTime("extraction of %s", info.Path)() +func (s *MetadataService) GetSubtitle(ctx context.Context, sha string, name string) (io.ReadCloser, error) { + _, err := s.extractLock.WaitFor(sha) + if err != nil { + return nil, err + } - attachment_path := fmt.Sprintf("%s/%s/att", Settings.Metadata, info.Sha) - subs_path := fmt.Sprintf("%s/%s/sub", Settings.Metadata, info.Sha) + itemPath := fmt.Sprintf("%s/%s/%s", sha, "sub", name) - os.MkdirAll(attachment_path, 0o755) - os.MkdirAll(subs_path, 0o755) + item, err := s.storage.GetItem(ctx, itemPath) + if err != nil { + return nil, fmt.Errorf("failed to get subtitle with path %q: %w", itemPath, err) + } + return item, nil +} + +func (s *MetadataService) extractSubs(ctx context.Context, info *MediaInfo) (err error) { + defer utils.PrintExecTime("extraction of %s", info.Path)() // If there is no subtitles, there is nothing to extract (also fonts would be useless). 
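A note on the two getters above: GetAttachment and GetSubtitle first wait on the extract lock for this sha, then stream the item straight out of the storage backend using the <sha>/att/<name> and <sha>/sub/<name> key layout. A minimal caller-side sketch, written as if from outside package src (dumpSubtitle is hypothetical; GetSubtitle and utils.CleanupWithErr are the pieces added in this diff):

func dumpSubtitle(ctx context.Context, ms *src.MetadataService, sha, name string, dst io.Writer) (err error) {
	sub, err := ms.GetSubtitle(ctx, sha, name)
	if err != nil {
		return err
	}
	// Any Close failure is merged into the returned error via the named return.
	defer utils.CleanupWithErr(&err, sub.Close, "failed to close subtitle %q", name)

	_, err = io.Copy(dst, sub)
	return err
}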
if len(info.Subtitles) == 0 { return nil } + // Create a temporary directory for writing attachments + // TODO if the transcoder ever uses the ffmpeg library directly, remove this + // and write directly to storage via stream instead + tempWorkingDirectory := filepath.Join(os.TempDir(), info.Sha) + if err := os.MkdirAll(tempWorkingDirectory, 0660); err != nil { + return fmt.Errorf("failed to create temporary directory: %w", err) + } + + attachmentWorkingDirectory := filepath.Join(tempWorkingDirectory, "att") + if err := os.MkdirAll(attachmentWorkingDirectory, 0660); err != nil { + return fmt.Errorf("failed to create attachment directory: %w", err) + } + + subtitlesWorkingDirectory := filepath.Join(tempWorkingDirectory, "sub") + if err := os.MkdirAll(subtitlesWorkingDirectory, 0660); err != nil { + return fmt.Errorf("failed to create subtitles directory: %w", err) + } + + defer utils.CleanupWithErr( + &err, func() error { + return os.RemoveAll(attachmentWorkingDirectory) + }, + "failed to remove attachment working directory", + ) + + // Dump the attachments and subtitles cmd := exec.Command( "ffmpeg", "-dump_attachment:t", "", @@ -56,7 +101,7 @@ func extractSubs(info *MediaInfo) error { "-y", "-i", info.Path, ) - cmd.Dir = attachment_path + cmd.Dir = attachmentWorkingDirectory for _, sub := range info.Subtitles { if ext := sub.Extension; ext != nil { @@ -68,16 +113,39 @@ func extractSubs(info *MediaInfo) error { cmd.Args, "-map", fmt.Sprintf("0:s:%d", *sub.Index), "-c:s", "copy", - fmt.Sprintf("%s/%d.%s", subs_path, *sub.Index, *ext), + fmt.Sprintf("%s/%d.%s", subtitlesWorkingDirectory, *sub.Index, *ext), ) } } log.Printf("Starting extraction with the command: %s", cmd) cmd.Stdout = nil - err := cmd.Run() - if err != nil { + + if err := cmd.Run(); err != nil { fmt.Println("Error starting ffmpeg extract:", err) return err } + + // Move attachments and subtitles to the storage backend + var saveGroup errgroup.Group + saveGroup.Go(func() error { + err := storage.SaveFilesToBackend(ctx, s.storage, attachmentWorkingDirectory, filepath.Join(info.Sha, "att")) + if err != nil { + return fmt.Errorf("failed to save attachments to backend: %w", err) + } + return nil + }) + + saveGroup.Go(func() error { + err := storage.SaveFilesToBackend(ctx, s.storage, subtitlesWorkingDirectory, filepath.Join(info.Sha, "sub")) + if err != nil { + return fmt.Errorf("failed to save subtitles to backend: %w", err) + } + return nil + }) + + if err := saveGroup.Wait(); err != nil { + return fmt.Errorf("failed while saving files to backend: %w", err) + } + return nil } diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index 6be157af..c88ff75c 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -1,12 +1,15 @@ package src import ( + "context" "fmt" "log" "math" "os" "strings" "sync" + + "github.com/zoriya/kyoo/transcoder/src/utils" ) type FileStream struct { @@ -24,7 +27,7 @@ type VideoKey struct { quality Quality } -func (t *Transcoder) newFileStream(path string, sha string) *FileStream { +func (t *Transcoder) newFileStream(ctx context.Context, path string, sha string) *FileStream { ret := &FileStream{ transcoder: t, Out: fmt.Sprintf("%s/%s", Settings.Outpath, sha), @@ -35,7 +38,7 @@ func (t *Transcoder) newFileStream(path string, sha string) *FileStream { ret.ready.Add(1) go func() { defer ret.ready.Done() - info, err := t.metadataService.GetMetadata(path, sha) + info, err := t.metadataService.GetMetadata(ctx, path, sha) ret.Info = info if err != nil { ret.err = err @@ 
-107,7 +110,7 @@ func (fs *FileStream) GetMaster() string { } if def_video != nil { - qualities := Filter(Qualities, func(quality Quality) bool { + qualities := utils.Filter(Qualities, func(quality Quality) bool { return quality.Height() < def_video.Height }) transcode_count := len(qualities) diff --git a/transcoder/src/info.go b/transcoder/src/info.go index f480c160..60f04a3c 100644 --- a/transcoder/src/info.go +++ b/transcoder/src/info.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/zoriya/kyoo/transcoder/src/utils" "golang.org/x/text/language" "gopkg.in/vansante/go-ffprobe.v2" ) @@ -228,7 +229,7 @@ var SubtitleExtensions = map[string]string{ } func RetriveMediaInfo(path string, sha string) (*MediaInfo, error) { - defer printExecTime("mediainfo for %s", path)() + defer utils.PrintExecTime("mediainfo for %s", path)() ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second) defer cancelFn() diff --git a/transcoder/src/keyframes.go b/transcoder/src/keyframes.go index c13e1c15..b6858b59 100644 --- a/transcoder/src/keyframes.go +++ b/transcoder/src/keyframes.go @@ -11,6 +11,7 @@ import ( "sync" "github.com/lib/pq" + "github.com/zoriya/kyoo/transcoder/src/utils" ) const KeyframeVersion = 1 @@ -162,7 +163,7 @@ func (s *MetadataService) GetKeyframes(info *MediaInfo, isVideo bool, idx uint32 // Returns when all key frames are retrived (or an error occurs) // info.ready.Done() is called when more than 100 are retrived (or extraction is done) func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error { - defer printExecTime("ffprobe keyframe analysis for %s video n%d", path, video_idx)() + defer utils.PrintExecTime("ffprobe keyframe analysis for %s video n%d", path, video_idx)() // run ffprobe to return all IFrames, IFrames are points where we can split the video in segments. // We ask ffprobe to return the time of each frame and it's flags // We could ask it to return only i-frames (keyframes) with the -skip_frame nokey but using it is extremly slow @@ -254,7 +255,7 @@ const DummyKeyframeDuration = float64(4) // we can pretty much cut audio at any point so no need to get specific frames, just cut every 4s func getAudioKeyframes(info *MediaInfo, audio_idx uint32, kf *Keyframe) error { - defer printExecTime("ffprobe keyframe analysis for %s audio n%d", info.Path, audio_idx)() + defer utils.PrintExecTime("ffprobe keyframe analysis for %s audio n%d", info.Path, audio_idx)() // Format's duration CAN be different than audio's duration. To make sure we do not // miss a segment or make one more, we need to check the audio's duration. 
// diff --git a/transcoder/src/metadata.go b/transcoder/src/metadata.go index 6d7ffbc1..4c7f9801 100644 --- a/transcoder/src/metadata.go +++ b/transcoder/src/metadata.go @@ -1,17 +1,21 @@ package src import ( + "context" "database/sql" "encoding/base64" + "errors" "fmt" "net/url" "os" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/golang-migrate/migrate/v4" "github.com/golang-migrate/migrate/v4/database/postgres" _ "github.com/golang-migrate/migrate/v4/source/file" "github.com/lib/pq" - _ "github.com/lib/pq" + "github.com/zoriya/kyoo/transcoder/src/storage" ) type MetadataService struct { @@ -20,9 +24,60 @@ type MetadataService struct { thumbLock RunLock[string, interface{}] extractLock RunLock[string, interface{}] keyframeLock RunLock[KeyframeKey, *Keyframe] + storage storage.StorageBackend } func NewMetadataService() (*MetadataService, error) { + ctx := context.TODO() + + s := &MetadataService{ + lock: NewRunLock[string, *MediaInfo](), + thumbLock: NewRunLock[string, interface{}](), + extractLock: NewRunLock[string, interface{}](), + keyframeLock: NewRunLock[KeyframeKey, *Keyframe](), + } + + db, err := s.setupDb() + if err != nil { + return nil, fmt.Errorf("failed to setup database: %w", err) + } + s.database = db + + storage, err := s.setupStorage(ctx) + if err != nil { + return nil, fmt.Errorf("failed to setup storage: %w", err) + } + s.storage = storage + + return s, nil +} + +func (s *MetadataService) Close() error { + cleanupErrs := make([]error, 0, 2) + if s.database != nil { + err := s.database.Close() + if err != nil { + cleanupErrs = append(cleanupErrs, fmt.Errorf("failed to close database: %w", err)) + } + } + + if s.storage != nil { + if storageCloser, ok := s.storage.(storage.StorageBackendCloser); ok { + err := storageCloser.Close() + if err != nil { + cleanupErrs = append(cleanupErrs, fmt.Errorf("failed to close storage: %w", err)) + } + } + } + + if err := errors.Join(cleanupErrs...); err != nil { + return fmt.Errorf("failed to cleanup resources: %w", err) + } + + return nil +} + +func (s *MetadataService) setupDb() (*sql.DB, error) { schema := GetEnvOr("POSTGRES_SCHEMA", "gocoder") connectionString := os.Getenv("POSTGRES_URL") @@ -64,26 +119,44 @@ func NewMetadataService() (*MetadataService, error) { } m.Up() - return &MetadataService{ - database: db, - lock: NewRunLock[string, *MediaInfo](), - thumbLock: NewRunLock[string, interface{}](), - extractLock: NewRunLock[string, interface{}](), - keyframeLock: NewRunLock[KeyframeKey, *Keyframe](), - }, nil + return db, nil } -func (s *MetadataService) GetMetadata(path string, sha string) (*MediaInfo, error) { +func (s *MetadataService) setupStorage(ctx context.Context) (storage.StorageBackend, error) { + s3BucketName := os.Getenv("S3_BUCKET_NAME") + if s3BucketName != "" { + // Use S3 storage + // Create the client (use all standard AWS config sources like env vars, config files, etc.) 
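Aside: the client below is built purely from the default AWS config chain, which is what lets the AWS_* variables documented in .env.example (including AWS_ENDPOINT_URL_S3) take effect. If an S3-compatible endpoint only supports path-style addressing (some MinIO or Ceph RGW deployments), the same construction could take an option override; this is only a sketch and is not part of the change:

awsConfig, err := config.LoadDefaultConfig(ctx)
if err != nil {
	return nil, fmt.Errorf("failed to load AWS config: %w", err)
}
s3Client := s3.NewFromConfig(awsConfig, func(o *s3.Options) {
	// Force path-style URLs (https://endpoint/bucket/key) instead of
	// virtual-hosted-style bucket subdomains.
	o.UsePathStyle = true
})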
+ awsConfig, err := config.LoadDefaultConfig(ctx) + if err != nil { + return nil, fmt.Errorf("failed to load AWS config: %w", err) + } + s3Client := s3.NewFromConfig(awsConfig) + + return storage.NewS3StorageBackend(s3Client, s3BucketName), nil + } + + // Use local file storage + storageRoot := GetEnvOr("GOCODER_METADATA_ROOT", "/metadata") + + localStorage, err := storage.NewFileStorageBackend(storageRoot) + if err != nil { + return nil, fmt.Errorf("failed to create local storage backend: %w", err) + } + return localStorage, nil +} + +func (s *MetadataService) GetMetadata(ctx context.Context, path string, sha string) (*MediaInfo, error) { ret, err := s.getMetadata(path, sha) if err != nil { return nil, err } if ret.Versions.Thumbs < ThumbsVersion { - go s.ExtractThumbs(path, sha) + go s.ExtractThumbs(ctx, path, sha) } if ret.Versions.Extract < ExtractVersion { - go s.ExtractSubs(ret) + go s.ExtractSubs(ctx, ret) } if ret.Versions.Keyframes < KeyframeVersion && ret.Versions.Keyframes != 0 { for _, video := range ret.Videos { @@ -229,6 +302,10 @@ func (s *MetadataService) storeFreshMetadata(path string, sha string) (*MediaInf } tx, err := s.database.Begin() + if err != nil { + return set(ret, err) + } + // it needs to be a delete instead of a on conflict do update because we want to trigger delete casquade for // videos/audios & co. tx.Exec(`delete from info where path = $1`, path) diff --git a/transcoder/src/settings.go b/transcoder/src/settings.go index 474e7eb0..e5062ea6 100644 --- a/transcoder/src/settings.go +++ b/transcoder/src/settings.go @@ -15,7 +15,6 @@ func GetEnvOr(env string, def string) string { type SettingsT struct { Outpath string - Metadata string RoutePrefix string SafePath string HwAccel HwAccelT @@ -32,7 +31,6 @@ type HwAccelT struct { var Settings = SettingsT{ // we manually add a folder to make sure we do not delete user data. Outpath: path.Join(GetEnvOr("GOCODER_CACHE_ROOT", "/cache"), "kyoo_cache"), - Metadata: GetEnvOr("GOCODER_METADATA_ROOT", "/metadata"), RoutePrefix: GetEnvOr("GOCODER_PREFIX", ""), SafePath: GetEnvOr("GOCODER_SAFE_PATH", "/video"), HwAccel: DetectHardwareAccel(), diff --git a/transcoder/src/storage/filestorage.go b/transcoder/src/storage/filestorage.go new file mode 100644 index 00000000..b8617e40 --- /dev/null +++ b/transcoder/src/storage/filestorage.go @@ -0,0 +1,201 @@ +package storage + +import ( + "context" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + + "github.com/zoriya/kyoo/transcoder/src/utils" +) + +type FileStorageBackend struct { + // Base directory for the storage backend + baseDirectory string + baseDirectoryRoot *os.Root +} + +// NewFileStorageBackend creates a new FileStorageBackend with the specified base directory. +func NewFileStorageBackend(baseDirectory string) (*FileStorageBackend, error) { + // Attempt to create the directory if it doesn't exist + // This should be the only filesystem call in this file that does not use os.Root. + // This is to prevent directory traversal attacks when the provided input is untrusted. 
+	if err := os.MkdirAll(baseDirectory, 0770); err != nil {
+		return nil, fmt.Errorf("failed to create storage base directory: %w", err)
+	}
+
+	root, err := os.OpenRoot(baseDirectory)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open storage base directory %q: %w", baseDirectory, err)
+	}
+
+	return &FileStorageBackend{
+		baseDirectory:     baseDirectory,
+		baseDirectoryRoot: root,
+	}, nil
+}
+
+func (fsb *FileStorageBackend) Close() error {
+	if fsb.baseDirectoryRoot != nil {
+		return fsb.baseDirectoryRoot.Close()
+	}
+	return nil
+}
+
+// DoesItemExist checks if an item exists in the file storage backend.
+func (fsb *FileStorageBackend) DoesItemExist(_ context.Context, path string) (bool, error) {
+	_, err := fsb.baseDirectoryRoot.Stat(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return false, nil
+		}
+
+		return false, fmt.Errorf("failed to check if item %q exists: %w", path, err)
+	}
+
+	return true, nil
+}
+
+// ListItemsWithPrefix returns a list of items in the storage backend that match the given prefix.
+func (fsb *FileStorageBackend) ListItemsWithPrefix(_ context.Context, pathPrefix string) ([]string, error) {
+	var items []string
+	rootFS := fsb.baseDirectoryRoot.FS()
+	// This is split so that a smaller subset of files are checked, rather than literally everything under the base directory.
+	// All matching files for the path prefix will also have the prefixDirPath as their parent directory.
+	prefixDirPath := filepath.Dir(pathPrefix)
+
+	err := fs.WalkDir(rootFS, prefixDirPath, func(path string, d fs.DirEntry, err error) error {
+		if err != nil {
+			// This can happen if prefixDirPath does not exist. The walk function will handle
+			// checking this.
+			if os.IsNotExist(err) {
+				return fs.SkipDir
+			}
+			return fmt.Errorf("failed on %q while walking directory %q: %w", path, pathPrefix, err)
+		}
+
+		// If the path does not start with the prefix, skip it.
+		if !strings.HasPrefix(path, pathPrefix) {
+			if d.IsDir() && !strings.HasPrefix(pathPrefix, path) && path != "." {
+				// Prune directories that cannot contain items matching the prefix
+				return fs.SkipDir
+			}
+			return nil
+		}
+
+		// Collect matching non-directory items
+		if !d.IsDir() {
+			items = append(items, path)
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return nil, fmt.Errorf("failed to walk directory %q: %w", pathPrefix, err)
+	}
+
+	return items, nil
+}
+
+// DeleteItem deletes an item from the storage backend. If the item does not exist, it returns nil.
+func (fsb *FileStorageBackend) DeleteItem(_ context.Context, path string) error {
+	err := fsb.baseDirectoryRoot.Remove(path)
+	if err != nil && !os.IsNotExist(err) {
+		return fmt.Errorf("failed to delete item %q: %w", path, err)
+	}
+	return nil
+}
+
+// DeleteItemsWithPrefix deletes all items in the storage backend that match the given prefix.
+// Deletion should be "synchronous" (i.e. the function should block until the write is complete).
+func (fsb *FileStorageBackend) DeleteItemsWithPrefix(ctx context.Context, pathPrefix string) error {
+	// Unfortunately this implementation is needed until https://go-review.googlesource.com/c/go/+/661595 is released.
+	// The new os.Root type does not yet have a RemoveAll method. The next Go release will have this.
+	// Once RemoveAll is available, this function can be reduced to a single ReadDir call, a filter, and a RemoveAll call.
+
+	// Get all items with the prefix
+	items, err := fsb.ListItemsWithPrefix(ctx, pathPrefix)
+	if err != nil {
+		return fmt.Errorf("failed to list items with prefix %q: %w", pathPrefix, err)
+	}
+
+	// Delete all items.
This will leave behind empty directories, but that shouldn't really matter. A future + // implementation that uses os.Root.RemoveAll will handle this. + for _, item := range items { + err = fsb.DeleteItem(ctx, item) + if err != nil { + return fmt.Errorf("failed to delete item %q: %w", item, err) + } + } + + return nil +} + +// SaveItemWithCallback saves an item to the storage backend. If the item already exists, it overwrites it. +// The writeContents function is called with a writer to write the contents of the item. +func (fsb *FileStorageBackend) SaveItemWithCallback(ctx context.Context, path string, writeContents ContentsWriterCallback) (err error) { + // Open the file for writing + file, err := fsb.openFileForWriting(path) + if err != nil { + return fmt.Errorf("failed to open %q for writing: %w", path, err) + } + defer utils.CleanupWithErr(&err, file.Close, "failed to close file %q", path) + + // Write the contents using the provided callback + if err := writeContents(ctx, file); err != nil { + return fmt.Errorf("failed to write contents to file %q: %w", path, err) + } + + return nil +} + +// SaveItem saves an item to the storage backend. If the item already exists, it overwrites it. +func (fsb *FileStorageBackend) SaveItem(ctx context.Context, path string, contents io.Reader) (err error) { + // Open the file for writing + file, err := fsb.openFileForWriting(path) + if err != nil { + return fmt.Errorf("failed to open %q for writing: %w", path, err) + } + defer utils.CleanupWithErr(&err, file.Close, "failed to close file %q", path) + + // Copy the contents to the file + if _, err := io.Copy(file, contents); err != nil { + return fmt.Errorf("failed to copy contents to file %q: %w", path, err) + } + + return nil +} + +// openFileForWriting opens a file for writing. If the file already exists, it overwrites it. +// The parent directory is created if it doesn't exist. +// This function is used internally to create files in the storage backend. +// The returned file should be closed by the caller. +func (fsb *FileStorageBackend) openFileForWriting(path string) (*os.File, error) { + // Create the parent directory if it doesn't exist + dir := filepath.Dir(path) + if err := fsb.baseDirectoryRoot.Mkdir(dir, 0770); err != nil { + return nil, fmt.Errorf("failed to create directory %q: %w", dir, err) + } + + // Open the file for writing + file, err := fsb.baseDirectoryRoot.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0660) + if err != nil { + return nil, fmt.Errorf("failed to create file %q: %w", path, err) + } + + return file, nil +} + +// GetItem retrieves an item from the storage backend. 
+func (fsb *FileStorageBackend) GetItem(_ context.Context, path string) (io.ReadCloser, error) { + file, err := fsb.baseDirectoryRoot.Open(path) + if err != nil { + return nil, fmt.Errorf("failed to open file %q: %w", path, err) + } + + return file, nil +} diff --git a/transcoder/src/storage/s3storage.go b/transcoder/src/storage/s3storage.go new file mode 100644 index 00000000..f441fea4 --- /dev/null +++ b/transcoder/src/storage/s3storage.go @@ -0,0 +1,226 @@ +package storage + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + + "github.com/aws/aws-sdk-go-v2/aws" + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/zoriya/kyoo/transcoder/src/utils" + "golang.org/x/sync/errgroup" +) + +var noSuchKeyError = &s3types.NoSuchKey{} + +// S3StorageBackend is a struct that implements the StorageBackend interface, +// backed by S3-compatible object storage. +// It is up to the user to ensure that the object storage service supports +// consistent reads, writes, and deletes. This known to be supported by: +// - AWS S3 +// - MinIO +// - Ceph RGW +// This is known to _not_ be supported by: +// - SeaweedFS +type S3StorageBackend struct { + s3Client *s3.Client + bucket string +} + +// NewS3StorageBackend creates a new S3StorageBackend with the specified bucket name and S3 client. +func NewS3StorageBackend(s3Client *s3.Client, bucket string) *S3StorageBackend { + return &S3StorageBackend{ + s3Client: s3Client, + bucket: bucket, + } +} + +// DoesItemExist checks if an item exists in the file storage backend. +func (ssb *S3StorageBackend) DoesItemExist(ctx context.Context, path string) (bool, error) { + _, err := ssb.s3Client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &ssb.bucket, + Key: &path, + }) + if err != nil { + var responseError *awshttp.ResponseError + if errors.As(err, &responseError) && responseError.ResponseError.HTTPStatusCode() == http.StatusNotFound { + return false, nil + } + + return false, fmt.Errorf("failed to check if item %q exists in bucket %q: %w", path, ssb.bucket, err) + } + + return true, nil +} + +// ListItemsWithPrefix returns a list of items in the storage backend that match the given prefix. +func (ssb *S3StorageBackend) ListItemsWithPrefix(ctx context.Context, pathPrefix string) ([]string, error) { + listObjectsInput := &s3.ListObjectsV2Input{ + Bucket: &ssb.bucket, + Prefix: &pathPrefix, + } + + paginator := s3.NewListObjectsV2Paginator(ssb.s3Client, listObjectsInput) + + var items []string + for paginator.HasMorePages() { + resp, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list items with prefix %q in bucket %q: %w", pathPrefix, ssb.bucket, err) + } + + for _, item := range resp.Contents { + if item.Key != nil { + items = append(items, *item.Key) + } + } + } + + return items, nil +} + +// DeleteItem deletes an item from the storage backend. If the item does not exist, it returns nil. +func (ssb *S3StorageBackend) DeleteItem(ctx context.Context, path string) error { + _, err := ssb.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: &ssb.bucket, + Key: &path, + }) + if err != nil { + if errors.Is(err, noSuchKeyError) { + return nil + } + return fmt.Errorf("failed to delete item %q from bucket %q: %w", path, ssb.bucket, err) + } + + return nil +} + +// DeleteItemsWithPrefix deletes all items in the storage backend that match the given prefix. +// Deletion should be "syncronous" (i.e. 
the function should block until the write is complete). +func (ssb *S3StorageBackend) DeleteItemsWithPrefix(ctx context.Context, pathPrefix string) error { + // Get all items with the prefix + items, err := ssb.ListItemsWithPrefix(ctx, pathPrefix) + if err != nil { + return fmt.Errorf("failed to list items with prefix %q: %w", pathPrefix, err) + } + + // Fast path: if there are no items, return early. + if len(items) == 0 { + return nil + } + + // Delete all items. This uses the DeleteObjects API call, which is more efficient + // than deleting each item individually. + + // The API call supports up to 1000 items at a time, so chunk the items if needed. + chunkSize := min(len(items), 1000) + chunkItems := make([]s3types.ObjectIdentifier, chunkSize) + var deletionRequests errgroup.Group + + for i := range items { + item := items[i] + chunkIndex := i % chunkSize + + chunkItems[chunkIndex] = s3types.ObjectIdentifier{ + Key: &item, + } + + // If the chunk is full, delete the objects in this chunk. + if chunkIndex == chunkSize-1 { + deletionRequests.Go(func() error { + _, err := ssb.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ + Bucket: &ssb.bucket, + Delete: &s3types.Delete{ + Objects: chunkItems, + // Only include keys in the response that encountered an error. + Quiet: aws.Bool(true), + }, + }) + + if err != nil { + chunkNumber := 1 + i/chunkSize // Int division in Go rounds down. + // TODO if the error doesn't include sufficient information, the below line + // will need to pull in error details from the response. + return fmt.Errorf("failed to delete items in chunk %d with prefix %q: %w", chunkNumber, pathPrefix, err) + } + + return nil + }) + } + } + + err = deletionRequests.Wait() + if err != nil { + return fmt.Errorf("failed to delete one or more items with prefix %q: %w", pathPrefix, err) + } + + return nil +} + +// SaveItemWithCallback saves an item to the storage backend. If the item already exists, it overwrites it. +// The writeContents function is called with a writer to write the contents of the item. +func (ssb *S3StorageBackend) SaveItemWithCallback(ctx context.Context, path string, writeContents ContentsWriterCallback) (err error) { + // Create a pipe to connect the writer and reader. + pr, pw := io.Pipe() + defer utils.CleanupWithErr(&err, pr.Close, "failed to close pipe reader") + + // Start a goroutine to write to the pipe. + // Writing and reading must occur concurrently to avoid deadlocks. + + // Use a separate context for the writer to allow cancellation if the upload fails. This is important to avoid + // a hung goroutine leak if the upload fails. + writeCtx, cancel := context.WithCancel(ctx) + defer cancel() + + var writerGroup errgroup.Group + writerGroup.Go(func() (err error) { + defer utils.CleanupWithErr(&err, pw.Close, "failed to close pipe writer") + return writeContents(writeCtx, pw) + }) + + // Wait for the write to complete and check for errors. + // This should always happen even if saving fails, to prevent a goroutine leak. + defer utils.CleanupWithErr(&err, writerGroup.Wait, "writer callback failed") + + // Upload the object to S3 using the pipe as the body. + if err := ssb.SaveItem(ctx, path, pr); err != nil { + // Ensure that the writer context is cancelled prior to awaiting for the writer to finish. + // This is important to avoid a hung goroutine leak if the upload fails. + cancel() + return err + } + + return nil +} + +// SaveItem saves an item to the storage backend. If the item already exists, it overwrites it. 
+func (ssb *S3StorageBackend) SaveItem(ctx context.Context, path string, contents io.Reader) error { + // Upload the object to S3 using the provided reader as the body. + _, err := ssb.s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: &ssb.bucket, + Key: &path, + Body: contents, + }) + if err != nil { + return fmt.Errorf("failed to save item %q to bucket %q: %w", path, ssb.bucket, err) + } + + return nil +} + +// GetItem retrieves an item from the storage backend. +func (ssb *S3StorageBackend) GetItem(ctx context.Context, path string) (io.ReadCloser, error) { + resp, err := ssb.s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &ssb.bucket, + Key: &path, + }) + if err != nil { + return nil, fmt.Errorf("failed to get item %q from bucket %q: %w", path, ssb.bucket, err) + } + + return resp.Body, nil +} diff --git a/transcoder/src/storage/storage.go b/transcoder/src/storage/storage.go new file mode 100644 index 00000000..661ff5d9 --- /dev/null +++ b/transcoder/src/storage/storage.go @@ -0,0 +1,95 @@ +package storage + +import ( + "context" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + + "github.com/zoriya/kyoo/transcoder/src/utils" + "golang.org/x/sync/errgroup" +) + +type ContentsWriterCallback func(ctx context.Context, writer io.Writer) error + +// Handles storage and retrieval of items in a storage backend. +// "Items" are pieces of data that can be stored and retrieved. +// Paths with ".." are not allowed, and may result in unexpected behavior. +type StorageBackend interface { + // DoesItemExist checks if an item exists in the storage backend. + DoesItemExist(ctx context.Context, path string) (bool, error) + // ListItemsWithPrefix returns a list of items in the storage backend that match the given prefix. + // Note: returned items may have a "/" in them, e.g. "foo/bar/baz". + ListItemsWithPrefix(ctx context.Context, pathPrefix string) ([]string, error) + // DeleteItem deletes an item from the storage backend. If the item does not exist, it returns nil. + // Deletion should be "synchronous" (i.e. the function should block until the write is complete). + DeleteItem(ctx context.Context, path string) error + // DeleteItemsWithPrefix deletes all items in the storage backend that match the given prefix. + // Deletion should be "synchronous" (i.e. the function should block until the write is complete). + DeleteItemsWithPrefix(ctx context.Context, pathPrefix string) error + // SaveItemWithCallback saves an item to the storage backend. If the item already exists, it overwrites it. + // The writeContents function is called with a writer to write the contents of the item. + // Writes should be "synchronous" (i.e. the function should block until the write is complete). + SaveItemWithCallback(ctx context.Context, path string, writeContents ContentsWriterCallback) error + // SaveItem saves an item to the storage backend. If the item already exists, it overwrites it. + SaveItem(ctx context.Context, path string, contents io.Reader) error + // GetItem retrieves an item from the storage backend. + GetItem(ctx context.Context, path string) (io.ReadCloser, error) +} + +type StorageBackendCloser interface { + StorageBackend + // Close closes the storage backend. This should be called when the backend is no longer needed. + Close() error +} + +// SaveFilesToBackend is a helper function that saves files from the source directory to the backend storage. 
+func SaveFilesToBackend(ctx context.Context, backend StorageBackend, sourceDirectory, destinationBasePath string) error {
+	// Allow for save in parallel. Running in series can be slow with many files.
+	var saveGroup errgroup.Group
+
+	err := filepath.WalkDir(sourceDirectory, func(sourcePath string, d fs.DirEntry, err error) error {
+		if err != nil {
+			return fmt.Errorf("error walking the path %q: %v", sourcePath, err)
+		}
+
+		if d.IsDir() {
+			return nil
+		}
+
+		// Path relative to the source directory
+		relativePath, err := filepath.Rel(sourceDirectory, sourcePath)
+		if err != nil {
+			return err
+		}
+
+		// Destination path in the backend storage
+		destinationPath := filepath.Join(destinationBasePath, relativePath)
+		saveGroup.Go(func() (err error) {
+			file, err := os.Open(sourcePath)
+			if err != nil {
+				return fmt.Errorf("failed to open file %q: %w", sourcePath, err)
+			}
+			defer utils.CleanupWithErr(&err, file.Close, "failed to close file %q", sourcePath)
+
+			if err := backend.SaveItem(ctx, destinationPath, file); err != nil {
+				return fmt.Errorf("failed to save file %q to backend: %w", sourcePath, err)
+			}
+
+			return nil
+		})
+
+		return nil
+	})
+	if err != nil {
+		return fmt.Errorf("failed while walking over files to save to the backend: %w", err)
+	}
+
+	if err := saveGroup.Wait(); err != nil {
+		return fmt.Errorf("failed to save files to backend: %w", err)
+	}
+
+	return nil
+}
diff --git a/transcoder/src/subtitles.go b/transcoder/src/subtitles.go
index 11569299..e753be5b 100644
--- a/transcoder/src/subtitles.go
+++ b/transcoder/src/subtitles.go
@@ -8,6 +8,7 @@ import (
 	"regexp"
 	"strings"
 
+	"github.com/zoriya/kyoo/transcoder/src/utils"
 	"golang.org/x/text/language"
 )
 
@@ -72,7 +73,7 @@ outer:
 		// "hi" by itself means a language code, but when combined with other lang flags it means Hearing Impaired.
 		// In case Hindi was not detected before, but "hi" is present, assume it is Hindi.
 		if sub.Language == nil {
-			hiCount := Count(flags, "hi")
+			hiCount := utils.Count(flags, "hi")
 			if hiCount > 0 {
 				languageStr := language.Hindi.String()
 				sub.Language = &languageStr
diff --git a/transcoder/src/thumbnails.go b/transcoder/src/thumbnails.go
index c7184f83..05642f45 100644
--- a/transcoder/src/thumbnails.go
+++ b/transcoder/src/thumbnails.go
@@ -1,17 +1,19 @@
 package src
 
 import (
+	"context"
 	"encoding/base64"
 	"fmt"
 	"image"
 	"image/color"
+	"io"
 	"log"
 	"math"
-	"os"
-	"path/filepath"
+	"strings"
 	"sync"
 
 	"github.com/disintegration/imaging"
+	"github.com/zoriya/kyoo/transcoder/src/utils"
 	"gitlab.com/opennota/screengen"
 )
 
@@ -29,33 +31,54 @@ type Thumbnail struct {
 
 const ThumbsVersion = 1
 
-func getThumbGlob(sha string) string {
-	return fmt.Sprintf("%s/%s/thumbs-v*.*", Settings.Metadata, sha)
+// getThumbPrefix returns the path prefix for all thumbnail files for a given sha.
+func getThumbPrefix(sha string) string { + return sha } func getThumbPath(sha string) string { - return fmt.Sprintf("%s/%s/thumbs-v%d.png", Settings.Metadata, sha, ThumbsVersion) + return fmt.Sprintf("%s/thumbs-v%d.png", sha, ThumbsVersion) } func getThumbVttPath(sha string) string { - return fmt.Sprintf("%s/%s/thumbs-v%d.vtt", Settings.Metadata, sha, ThumbsVersion) + return fmt.Sprintf("%s/thumbs-v%d.vtt", sha, ThumbsVersion) } -func (s *MetadataService) GetThumb(path string, sha string) (string, string, error) { - _, err := s.ExtractThumbs(path, sha) +func (s *MetadataService) GetThumbVtt(ctx context.Context, path string, sha string) (io.ReadCloser, error) { + _, err := s.ExtractThumbs(ctx, path, sha) if err != nil { - return "", "", err + return nil, err } - return getThumbPath(sha), getThumbVttPath(sha), nil + + vttPath := getThumbVttPath(sha) + vtt, err := s.storage.GetItem(ctx, vttPath) + if err != nil { + return nil, fmt.Errorf("failed to get thumbnail vtt with path %q: %w", vttPath, err) + } + return vtt, nil } -func (s *MetadataService) ExtractThumbs(path string, sha string) (interface{}, error) { +func (s *MetadataService) GetThumbSprite(ctx context.Context, path string, sha string) (io.ReadCloser, error) { + _, err := s.ExtractThumbs(ctx, path, sha) + if err != nil { + return nil, err + } + + spritePath := getThumbPath(sha) + sprite, err := s.storage.GetItem(ctx, spritePath) + if err != nil { + return nil, fmt.Errorf("failed to get thumbnail sprite with path %q: %w", spritePath, err) + } + return sprite, nil +} + +func (s *MetadataService) ExtractThumbs(ctx context.Context, path string, sha string) (interface{}, error) { get_running, set := s.thumbLock.Start(sha) if get_running != nil { return get_running() } - err := extractThumbnail(path, sha) + err := s.extractThumbnail(ctx, path, sha) if err != nil { return set(nil, err) } @@ -63,12 +86,18 @@ func (s *MetadataService) ExtractThumbs(path string, sha string) (interface{}, e return set(nil, err) } -func extractThumbnail(path string, sha string) error { - defer printExecTime("extracting thumbnails for %s", path)() +func (s *MetadataService) extractThumbnail(ctx context.Context, path string, sha string) (err error) { + defer utils.PrintExecTime("extracting thumbnails for %s", path)() - os.MkdirAll(fmt.Sprintf("%s/%s", Settings.Metadata, sha), 0o755) + vttPath := getThumbVttPath(sha) + spritePath := getThumbPath(sha) + newItemPaths := []string{spritePath, vttPath} - if _, err := os.Stat(getThumbPath(sha)); err == nil { + doAllExist, err := s.doAllThumbnailFilesExist(ctx, newItemPaths) + if err != nil { + return fmt.Errorf("failed to check if thumbnail files exist: %w", err) + } + if doAllExist { return nil } @@ -77,7 +106,7 @@ func extractThumbnail(path string, sha string) error { log.Printf("Error reading video file: %v", err) return err } - defer gen.Close() + defer utils.CleanupWithErr(&err, gen.Close, "failed to close screengen generator") gen.Fast = true @@ -128,26 +157,49 @@ func extractThumbnail(path string, sha string) error { ) } - // Cleanup old versions of thumbnails - files, err := filepath.Glob(getThumbGlob(sha)) - if err == nil { - for _, f := range files { - // ignore errors - os.Remove(f) - } + // Cleanup old thumbnails + if err := s.storage.DeleteItemsWithPrefix(ctx, getThumbPrefix(sha)); err != nil { + return fmt.Errorf("failed to delete old thumbnails: %w", err) } - err = os.WriteFile(getThumbVttPath(sha), []byte(vtt), 0o644) + // Store the new items + // Thumbnail vtt + if err = s.storage.SaveItem(ctx, 
vttPath, strings.NewReader(vtt)); err != nil { + return fmt.Errorf("failed to save thumbnail vtt with path %q: %w", vttPath, err) + } + + // Thumbnail sprite + spriteFormat, err := imaging.FormatFromFilename(spritePath) if err != nil { return err } - err = imaging.Save(sprite, getThumbPath(sha)) + + err = s.storage.SaveItemWithCallback(ctx, spritePath, func(_ context.Context, writer io.Writer) error { + if err := imaging.Encode(writer, sprite, spriteFormat); err != nil { + return fmt.Errorf("failed to encode thumbnail sprite: %w", err) + } + return nil + }) if err != nil { - return err + return fmt.Errorf("failed to save thumbnail sprite with path %q: %w", spritePath, err) } + return nil } +func (s *MetadataService) doAllThumbnailFilesExist(ctx context.Context, filePaths []string) (bool, error) { + for _, filePath := range filePaths { + doesExist, err := s.storage.DoesItemExist(ctx, filePath) + if err != nil { + return false, fmt.Errorf("failed to check if thumbnail file %q exists: %w", filePath, err) + } + if !doesExist { + return false, nil + } + } + return true, nil +} + func tsToVttTime(ts int) string { return fmt.Sprintf("%02d:%02d:%02d.000", ts/3600, (ts/60)%60, ts%60) } diff --git a/transcoder/src/transcoder.go b/transcoder/src/transcoder.go index 7dc0e989..4ba0ccca 100644 --- a/transcoder/src/transcoder.go +++ b/transcoder/src/transcoder.go @@ -1,6 +1,7 @@ package src import ( + "context" "os" "path" ) @@ -36,9 +37,9 @@ func NewTranscoder(metadata *MetadataService) (*Transcoder, error) { return ret, nil } -func (t *Transcoder) getFileStream(path string, sha string) (*FileStream, error) { +func (t *Transcoder) getFileStream(ctx context.Context, path string, sha string) (*FileStream, error) { ret, _ := t.streams.GetOrCreate(sha, func() *FileStream { - return t.newFileStream(path, sha) + return t.newFileStream(ctx, path, sha) }) ret.ready.Wait() if ret.err != nil { @@ -48,8 +49,8 @@ func (t *Transcoder) getFileStream(path string, sha string) (*FileStream, error) return ret, nil } -func (t *Transcoder) GetMaster(path string, client string, sha string) (string, error) { - stream, err := t.getFileStream(path, sha) +func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, sha string) (string, error) { + stream, err := t.getFileStream(ctx, path, sha) if err != nil { return "", err } @@ -66,13 +67,14 @@ func (t *Transcoder) GetMaster(path string, client string, sha string) (string, } func (t *Transcoder) GetVideoIndex( + ctx context.Context, path string, video uint32, quality Quality, client string, sha string, ) (string, error) { - stream, err := t.getFileStream(path, sha) + stream, err := t.getFileStream(ctx, path, sha) if err != nil { return "", err } @@ -89,12 +91,13 @@ func (t *Transcoder) GetVideoIndex( } func (t *Transcoder) GetAudioIndex( + ctx context.Context, path string, audio uint32, client string, sha string, ) (string, error) { - stream, err := t.getFileStream(path, sha) + stream, err := t.getFileStream(ctx, path, sha) if err != nil { return "", err } @@ -110,6 +113,7 @@ func (t *Transcoder) GetAudioIndex( } func (t *Transcoder) GetVideoSegment( + ctx context.Context, path string, video uint32, quality Quality, @@ -117,7 +121,7 @@ func (t *Transcoder) GetVideoSegment( client string, sha string, ) (string, error) { - stream, err := t.getFileStream(path, sha) + stream, err := t.getFileStream(ctx, path, sha) if err != nil { return "", err } @@ -134,13 +138,14 @@ func (t *Transcoder) GetVideoSegment( } func (t *Transcoder) GetAudioSegment( + ctx 
context.Context,
 	path string,
 	audio uint32,
 	segment int32,
 	client string,
 	sha string,
 ) (string, error) {
-	stream, err := t.getFileStream(path, sha)
+	stream, err := t.getFileStream(ctx, path, sha)
 	if err != nil {
 		return "", err
 	}
diff --git a/transcoder/src/utils.go b/transcoder/src/utils.go
deleted file mode 100644
index 2e9d4749..00000000
--- a/transcoder/src/utils.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package src
-
-import (
-	"fmt"
-	"log"
-	"time"
-)
-
-func printExecTime(message string, args ...any) func() {
-	msg := fmt.Sprintf(message, args...)
-	start := time.Now()
-	log.Printf("Running %s", msg)
-
-	return func() {
-		log.Printf("%s finished in %s", msg, time.Since(start))
-	}
-}
-
-func Filter[E any](s []E, f func(E) bool) []E {
-	s2 := make([]E, 0, len(s))
-	for _, e := range s {
-		if f(e) {
-			s2 = append(s2, e)
-		}
-	}
-	return s2
-}
-
-// Count returns the number of elements in s that are equal to e.
-func Count[S []E, E comparable](s S, e E) int {
-	var n int
-	for _, v := range s {
-		if v == e {
-			n++
-		}
-	}
-	return n
-}
diff --git a/transcoder/src/utils/utils.go b/transcoder/src/utils/utils.go
new file mode 100644
index 00000000..ad089230
--- /dev/null
+++ b/transcoder/src/utils/utils.go
@@ -0,0 +1,54 @@
+package utils
+
+import (
+	"errors"
+	"fmt"
+	"log"
+	"time"
+)
+
+func PrintExecTime(message string, args ...any) func() {
+	msg := fmt.Sprintf(message, args...)
+	start := time.Now()
+	log.Printf("Running %s", msg)
+
+	return func() {
+		log.Printf("%s finished in %s", msg, time.Since(start))
+	}
+}
+
+func Filter[E any](s []E, f func(E) bool) []E {
+	s2 := make([]E, 0, len(s))
+	for _, e := range s {
+		if f(e) {
+			s2 = append(s2, e)
+		}
+	}
+	return s2
+}
+
+// Count returns the number of elements in s that are equal to e.
+func Count[S []E, E comparable](s S, e E) int {
+	var n int
+	for _, v := range s {
+		if v == e {
+			n++
+		}
+	}
+	return n
+}
+
+// CleanupWithErr runs a cleanup function and checks if it returns an error.
+// If the cleanup function returns an error, it is joined with the original error
+// and assigned to the original error pointer.
+func CleanupWithErr(err *error, fn func() error, msg string, args ...any) {
+	cleanupErr := fn()
+	if err == nil {
+		return
+	}
+
+	if cleanupErr != nil {
+		cleanupErr = fmt.Errorf("%s: %w", fmt.Sprintf(msg, args...), cleanupErr)
+	}
+	*err = errors.Join(*err, cleanupErr)
+}
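Reading the new CleanupWithErr helper: it only pays off when the caller uses a named error return, so the deferred cleanup can fold a failed Close into whatever error is already being returned. A hedged usage sketch (copyItem is hypothetical and not part of this change):

func copyItem(dst io.Writer, path string) (err error) {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	// If io.Copy already failed, a Close failure is joined onto that error;
	// if only Close fails, the wrapped Close error becomes the return value.
	defer CleanupWithErr(&err, f.Close, "failed to close %q", path)

	_, err = io.Copy(dst, f)
	return err
}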