mirror of
https://github.com/zoriya/Kyoo.git
synced 2026-05-13 10:52:28 -04:00
Switch transcoder to pgx
This commit is contained in:
@@ -27,7 +27,7 @@ func (s *MetadataService) ExtractSubs(ctx context.Context, info *MediaInfo) (any
|
||||
log.Printf("Couldn't extract subs: %v", err)
|
||||
return set(nil, err)
|
||||
}
|
||||
_, err = s.database.Exec(`update gocoder.info set ver_extract = $2 where sha = $1`, info.Sha, ExtractVersion)
|
||||
_, err = s.database.Exec(ctx, `update gocoder.info set ver_extract = $2 where sha = $1`, info.Sha, ExtractVersion)
|
||||
return set(nil, err)
|
||||
}
|
||||
|
||||
|
||||
+59
-51
@@ -20,125 +20,133 @@ import (
|
||||
const InfoVersion = 3
|
||||
|
||||
type Versions struct {
|
||||
Info int32 `json:"info"`
|
||||
Extract int32 `json:"extract"`
|
||||
Thumbs int32 `json:"thumbs"`
|
||||
Keyframes int32 `json:"keyframes"`
|
||||
Info int32 `json:"info" db:"ver_info"`
|
||||
Extract int32 `json:"extract" db:"ver_extract"`
|
||||
Thumbs int32 `json:"thumbs" db:"ver_thumbs"`
|
||||
Keyframes int32 `json:"keyframes" db:"ver_keyframes"`
|
||||
}
|
||||
|
||||
type MediaInfo struct {
|
||||
// The sha1 of the video file.
|
||||
Sha string `json:"sha"`
|
||||
Sha string `json:"sha" db:"sha"`
|
||||
/// The internal path of the video file.
|
||||
Path string `json:"path"`
|
||||
Path string `json:"path" db:"path"`
|
||||
/// The extension currently used to store this video file
|
||||
Extension string `json:"extension"`
|
||||
/// The whole mimetype (defined as the RFC 6381). ex: `video/mp4; codecs="avc1.640028, mp4a.40.2"`
|
||||
MimeCodec *string `json:"mimeCodec"`
|
||||
Extension string `json:"extension" db:"extension"`
|
||||
/// The whole mimetype (defined as the RFC 6381). ex: `video/mp4; codecs=\"avc1.640028, mp4a.40.2\"`
|
||||
MimeCodec *string `json:"mimeCodec" db:"mime_codec"`
|
||||
/// The file size of the video file.
|
||||
Size int64 `json:"size"`
|
||||
Size int64 `json:"size" db:"size"`
|
||||
/// The length of the media in seconds.
|
||||
Duration float64 `json:"duration"`
|
||||
Duration float64 `json:"duration" db:"duration"`
|
||||
/// The container of the video file of this episode.
|
||||
Container *string `json:"container"`
|
||||
Container *string `json:"container" db:"container"`
|
||||
/// Version of the metadata. This can be used to invalidate older metadata from db if the extraction code has changed.
|
||||
Versions Versions `json:"versions"`
|
||||
Versions Versions `json:"versions" db:"versions"`
|
||||
|
||||
/// The list of videos if there are multiples.
|
||||
Videos []Video `json:"videos"`
|
||||
Videos []Video `json:"videos" db:"-"`
|
||||
/// The list of audio tracks.
|
||||
Audios []Audio `json:"audios"`
|
||||
Audios []Audio `json:"audios" db:"-"`
|
||||
/// The list of subtitles tracks.
|
||||
Subtitles []Subtitle `json:"subtitles"`
|
||||
Subtitles []Subtitle `json:"subtitles" db:"-"`
|
||||
/// The list of fonts that can be used to display subtitles.
|
||||
Fonts []string `json:"fonts"`
|
||||
Fonts []string `json:"fonts" db:"fonts"`
|
||||
/// The list of chapters. See Chapter for more information.
|
||||
Chapters []Chapter `json:"chapters"`
|
||||
Chapters []Chapter `json:"chapters" db:"-"`
|
||||
|
||||
/// lock used to read/set keyframes of video/audio
|
||||
lock sync.Mutex
|
||||
lock sync.Mutex `json:"-" db:"-"`
|
||||
}
|
||||
|
||||
type Video struct {
|
||||
Sha string `json:"-" db:"sha"`
|
||||
|
||||
/// The index of this track on the media.
|
||||
Index uint32 `json:"index"`
|
||||
Index uint32 `json:"index" db:"idx"`
|
||||
/// The title of the stream.
|
||||
Title *string `json:"title"`
|
||||
Title *string `json:"title" db:"title"`
|
||||
/// The language of this stream (as a ISO-639-2 language code)
|
||||
Language *string `json:"language"`
|
||||
Language *string `json:"language" db:"language"`
|
||||
/// The human readable codec name.
|
||||
Codec string `json:"codec"`
|
||||
Codec string `json:"codec" db:"codec"`
|
||||
/// The codec of this stream (defined as the RFC 6381).
|
||||
MimeCodec *string `json:"mimeCodec"`
|
||||
MimeCodec *string `json:"mimeCodec" db:"mime_codec"`
|
||||
/// The width of the video stream
|
||||
Width uint32 `json:"width"`
|
||||
Width uint32 `json:"width" db:"width"`
|
||||
/// The height of the video stream
|
||||
Height uint32 `json:"height"`
|
||||
Height uint32 `json:"height" db:"height"`
|
||||
/// The average bitrate of the video in bytes/s
|
||||
Bitrate uint32 `json:"bitrate"`
|
||||
Bitrate uint32 `json:"bitrate" db:"bitrate"`
|
||||
/// Is this stream the default one of it's type?
|
||||
IsDefault bool `json:"isDefault"`
|
||||
IsDefault bool `json:"isDefault" db:"is_default"`
|
||||
|
||||
/// Keyframes of this video
|
||||
Keyframes *Keyframe `json:"-"`
|
||||
}
|
||||
|
||||
type Audio struct {
|
||||
Sha string `json:"-" db:"sha"`
|
||||
|
||||
/// The index of this track on the media.
|
||||
Index uint32 `json:"index"`
|
||||
Index uint32 `json:"index" db:"idx"`
|
||||
/// The title of the stream.
|
||||
Title *string `json:"title"`
|
||||
Title *string `json:"title" db:"title"`
|
||||
/// The language of this stream (as a IETF-BCP-47 language code)
|
||||
Language *string `json:"language"`
|
||||
Language *string `json:"language" db:"language"`
|
||||
/// The human readable codec name.
|
||||
Codec string `json:"codec"`
|
||||
Codec string `json:"codec" db:"codec"`
|
||||
/// The codec of this stream (defined as the RFC 6381).
|
||||
MimeCodec *string `json:"mimeCodec"`
|
||||
MimeCodec *string `json:"mimeCodec" db:"mime_codec"`
|
||||
/// The average bitrate of the audio in bytes/s
|
||||
Bitrate uint32 `json:"bitrate"`
|
||||
Bitrate uint32 `json:"bitrate" db:"bitrate"`
|
||||
/// Is this stream the default one of it's type?
|
||||
IsDefault bool `json:"isDefault"`
|
||||
IsDefault bool `json:"isDefault" db:"is_default"`
|
||||
|
||||
/// Keyframes of this video
|
||||
Keyframes *Keyframe `json:"-"`
|
||||
}
|
||||
|
||||
type Subtitle struct {
|
||||
Sha string `json:"-" db:"sha"`
|
||||
|
||||
/// The index of this track on the media.
|
||||
Index *uint32 `json:"index"`
|
||||
Index *uint32 `json:"index" db:"idx"`
|
||||
/// The title of the stream.
|
||||
Title *string `json:"title"`
|
||||
Title *string `json:"title" db:"title"`
|
||||
/// The language of this stream (as a IETF-BCP-47 language code)
|
||||
Language *string `json:"language"`
|
||||
Language *string `json:"language" db:"language"`
|
||||
/// The codec of this stream.
|
||||
Codec string `json:"codec"`
|
||||
Codec string `json:"codec" db:"codec"`
|
||||
/// The codec of this stream (defined as the RFC 6381).
|
||||
MimeCodec *string `json:"mimeCodec"`
|
||||
MimeCodec *string `json:"mimeCodec" db:"mime_codec"`
|
||||
/// The extension for the codec.
|
||||
Extension *string `json:"extension"`
|
||||
Extension *string `json:"extension" db:"extension"`
|
||||
/// Is this stream the default one of it's type?
|
||||
IsDefault bool `json:"isDefault"`
|
||||
IsDefault bool `json:"isDefault" db:"is_default"`
|
||||
/// Is this stream tagged as forced?
|
||||
IsForced bool `json:"isForced"`
|
||||
IsForced bool `json:"isForced" db:"is_forced"`
|
||||
/// Is this stream tagged as hearing impaired?
|
||||
IsHearingImpaired bool `json:"isHearingImpaired"`
|
||||
IsHearingImpaired bool `json:"isHearingImpaired" db:"is_hearing_impaired"`
|
||||
/// Is this an external subtitle (as in stored in a different file)
|
||||
IsExternal bool `json:"isExternal"`
|
||||
IsExternal bool `json:"isExternal" db:"-"`
|
||||
/// Where the subtitle is stored (null if stored inside the video)
|
||||
Path *string `json:"path"`
|
||||
Path *string `json:"path" db:"-"`
|
||||
/// The link to access this subtitle.
|
||||
Link *string `json:"link"`
|
||||
Link *string `json:"link" db:"-"`
|
||||
}
|
||||
|
||||
type Chapter struct {
|
||||
Sha string `json:"-" db:"sha"`
|
||||
|
||||
/// The start time of the chapter (in second from the start of the episode).
|
||||
StartTime float32 `json:"startTime"`
|
||||
StartTime float32 `json:"startTime" db:"start_time"`
|
||||
/// The end time of the chapter (in second from the start of the episode).
|
||||
EndTime float32 `json:"endTime"`
|
||||
EndTime float32 `json:"endTime" db:"end_time"`
|
||||
/// The name of this chapter. This should be a human-readable name that could be presented to the user.
|
||||
Name string `json:"name"`
|
||||
Name string `json:"name" db:"name"`
|
||||
/// The type value is used to mark special chapters (openning/credits...)
|
||||
Type ChapterType `json:"type"`
|
||||
Type ChapterType `json:"type" db:"type"`
|
||||
}
|
||||
|
||||
type ChapterType string
|
||||
|
||||
@@ -2,6 +2,7 @@ package src
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
@@ -10,7 +11,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/zoriya/kyoo/transcoder/src/utils"
|
||||
)
|
||||
|
||||
@@ -75,12 +76,20 @@ func (kf *Keyframe) AddListener(callback func(keyframes []float64)) {
|
||||
kf.info.listeners = append(kf.info.listeners, callback)
|
||||
}
|
||||
|
||||
func (kf *Keyframe) Scan(src interface{}) error {
|
||||
var arr pq.Float64Array
|
||||
err := arr.Scan(src)
|
||||
func (kf *Keyframe) Scan(src any) error {
|
||||
var arr []float64
|
||||
|
||||
m := pgtype.NewMap()
|
||||
t, ok := m.TypeForValue(&arr)
|
||||
|
||||
if !ok {
|
||||
return errors.New("failed to parse keyframes")
|
||||
}
|
||||
err := m.Scan(t.OID, pgtype.BinaryFormatCode, src.([]byte), &arr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kf.Keyframes = arr
|
||||
kf.IsDone = true
|
||||
kf.info = &KeyframeInfo{}
|
||||
@@ -131,6 +140,7 @@ func (s *MetadataService) GetKeyframes(info *MediaInfo, isVideo bool, idx uint32
|
||||
info.lock.Unlock()
|
||||
|
||||
go func() {
|
||||
ctx := context.Background()
|
||||
var table string
|
||||
var err error
|
||||
if isVideo {
|
||||
@@ -147,15 +157,16 @@ func (s *MetadataService) GetKeyframes(info *MediaInfo, isVideo bool, idx uint32
|
||||
}
|
||||
|
||||
kf.info.ready.Wait()
|
||||
tx, _ := s.database.Begin()
|
||||
tx, _ := s.database.Begin(ctx)
|
||||
tx.Exec(
|
||||
ctx,
|
||||
fmt.Sprintf(`update %s set keyframes = $3 where sha = $1 and idx = $2`, table),
|
||||
info.Sha,
|
||||
idx,
|
||||
pq.Array(kf.Keyframes),
|
||||
kf.Keyframes,
|
||||
)
|
||||
tx.Exec(`update gocoder.info set ver_keyframes = $2 where sha = $1`, info.Sha, KeyframeVersion)
|
||||
err = tx.Commit()
|
||||
tx.Exec(ctx, `update gocoder.info set ver_keyframes = $2 where sha = $1`, info.Sha, KeyframeVersion)
|
||||
err = tx.Commit(ctx)
|
||||
if err != nil {
|
||||
log.Printf("Couldn't store keyframes on database: %v", err)
|
||||
}
|
||||
|
||||
+109
-106
@@ -2,24 +2,26 @@ package src
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/user"
|
||||
"strings"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
"github.com/golang-migrate/migrate/v4/database/postgres"
|
||||
pgxd "github.com/golang-migrate/migrate/v4/database/pgx/v5"
|
||||
_ "github.com/golang-migrate/migrate/v4/source/file"
|
||||
"github.com/lib/pq"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/jackc/pgx/v5/stdlib"
|
||||
"github.com/zoriya/kyoo/transcoder/src/storage"
|
||||
)
|
||||
|
||||
type MetadataService struct {
|
||||
database *sql.DB
|
||||
database *pgxpool.Pool
|
||||
lock RunLock[string, *MediaInfo]
|
||||
thumbLock RunLock[string, any]
|
||||
extractLock RunLock[string, any]
|
||||
@@ -53,51 +55,69 @@ func NewMetadataService() (*MetadataService, error) {
|
||||
}
|
||||
|
||||
func (s *MetadataService) Close() error {
|
||||
cleanupErrs := make([]error, 0, 2)
|
||||
if s.database != nil {
|
||||
err := s.database.Close()
|
||||
if err != nil {
|
||||
cleanupErrs = append(cleanupErrs, fmt.Errorf("failed to close database: %w", err))
|
||||
}
|
||||
s.database.Close()
|
||||
}
|
||||
|
||||
if s.storage != nil {
|
||||
if storageCloser, ok := s.storage.(storage.StorageBackendCloser); ok {
|
||||
err := storageCloser.Close()
|
||||
if err != nil {
|
||||
cleanupErrs = append(cleanupErrs, fmt.Errorf("failed to close storage: %w", err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := errors.Join(cleanupErrs...); err != nil {
|
||||
return fmt.Errorf("failed to cleanup resources: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MetadataService) setupDb() (*sql.DB, error) {
|
||||
func (s *MetadataService) setupDb() (*pgxpool.Pool, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
connectionString := os.Getenv("POSTGRES_URL")
|
||||
if connectionString == "" {
|
||||
connectionString = fmt.Sprintf(
|
||||
"postgresql://%v:%v@%v:%v/%v?application_name=gocoder&sslmode=%s",
|
||||
url.QueryEscape(os.Getenv("POSTGRES_USER")),
|
||||
url.QueryEscape(os.Getenv("POSTGRES_PASSWORD")),
|
||||
url.QueryEscape(os.Getenv("POSTGRES_SERVER")),
|
||||
url.QueryEscape(os.Getenv("POSTGRES_PORT")),
|
||||
url.QueryEscape(os.Getenv("POSTGRES_DB")),
|
||||
url.QueryEscape(GetEnvOr("POSTGRES_SSLMODE", "disable")),
|
||||
)
|
||||
config, err := pgxpool.ParseConfig(connectionString)
|
||||
if err != nil {
|
||||
return nil, errors.New("failed to create postgres config from environment variables")
|
||||
}
|
||||
|
||||
db, err := sql.Open("postgres", connectionString)
|
||||
// Set default values
|
||||
if config.ConnConfig.Host == "/tmp" {
|
||||
config.ConnConfig.Host = "postgres"
|
||||
}
|
||||
if config.ConnConfig.Database == "" {
|
||||
config.ConnConfig.Database = "kyoo"
|
||||
}
|
||||
// The pgx library will set the username to the name of the current user if not provided via
|
||||
// environment variable or connection string. Make a best-effort attempt to see if the user
|
||||
// was explicitly specified, without implementing full connection string parsing. If not, set
|
||||
// the username to the default value of "kyoo".
|
||||
if os.Getenv("PGUSER") == "" {
|
||||
currentUserName, _ := user.Current()
|
||||
// If the username matches the current user and it's not in the connection string, then it was set
|
||||
// by the pgx library. This doesn't cover the case where the system username happens to be in some other part
|
||||
// of the connection string, but this cannot be checked without full connection string parsing.
|
||||
if currentUserName.Username == config.ConnConfig.User && !strings.Contains(connectionString, currentUserName.Username) {
|
||||
config.ConnConfig.User = "kyoo"
|
||||
}
|
||||
}
|
||||
if _, ok := config.ConnConfig.RuntimeParams["application_name"]; !ok {
|
||||
config.ConnConfig.RuntimeParams["application_name"] = "gocoder"
|
||||
}
|
||||
|
||||
db, err := pgxpool.NewWithConfig(ctx, config)
|
||||
if err != nil {
|
||||
fmt.Printf("Could not connect to database, check your env variables!")
|
||||
fmt.Printf("Could not connect to database, check your env variables!\n")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
driver, err := postgres.WithInstance(db, &postgres.Config{})
|
||||
fmt.Println("Migrating database")
|
||||
dbi := stdlib.OpenDBFromPool(db)
|
||||
defer dbi.Close()
|
||||
|
||||
dbi.Exec("create schema if not exists gocoder")
|
||||
driver, err := pgxd.WithInstance(dbi, &pgxd.Config{
|
||||
SchemaName: "gocoder",
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -106,6 +126,7 @@ func (s *MetadataService) setupDb() (*sql.DB, error) {
|
||||
return nil, err
|
||||
}
|
||||
m.Up()
|
||||
fmt.Println("Migrating finished")
|
||||
|
||||
return db, nil
|
||||
}
|
||||
@@ -135,7 +156,7 @@ func (s *MetadataService) setupStorage(ctx context.Context) (storage.StorageBack
|
||||
}
|
||||
|
||||
func (s *MetadataService) GetMetadata(ctx context.Context, path string, sha string) (*MediaInfo, error) {
|
||||
ret, err := s.getMetadata(path, sha)
|
||||
ret, err := s.getMetadata(ctx, path, sha)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -153,14 +174,14 @@ func (s *MetadataService) GetMetadata(ctx context.Context, path string, sha stri
|
||||
for _, audio := range ret.Audios {
|
||||
audio.Keyframes = nil
|
||||
}
|
||||
tx, err := s.database.Begin()
|
||||
tx, err := s.database.Begin(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tx.Exec(`update gocoder.videos set keyframes = null where sha = $1`, sha)
|
||||
tx.Exec(`update gocoder.audios set keyframes = null where sha = $1`, sha)
|
||||
tx.Exec(`update gocoder.info set ver_keyframes = 0 where sha = $1`, sha)
|
||||
err = tx.Commit()
|
||||
tx.Exec(ctx, `update gocoder.videos set keyframes = null where sha = $1`, sha)
|
||||
tx.Exec(ctx, `update gocoder.audios set keyframes = null where sha = $1`, sha)
|
||||
tx.Exec(ctx, `update gocoder.info set ver_keyframes = 0 where sha = $1`, sha)
|
||||
err = tx.Commit(ctx)
|
||||
if err != nil {
|
||||
fmt.Printf("error deleting old keyframes from database: %v", err)
|
||||
}
|
||||
@@ -169,79 +190,60 @@ func (s *MetadataService) GetMetadata(ctx context.Context, path string, sha stri
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (s *MetadataService) getMetadata(path string, sha string) (*MediaInfo, error) {
|
||||
var ret MediaInfo
|
||||
var fonts pq.StringArray
|
||||
err := s.database.QueryRow(
|
||||
`select i.sha, i.path, i.extension, i.mime_codec, i.size, i.duration, i.container,
|
||||
i.fonts, i.ver_info, i.ver_extract, i.ver_thumbs, i.ver_keyframes
|
||||
from gocoder.info as i where i.sha=$1`,
|
||||
func (s *MetadataService) getMetadata(ctx context.Context, path string, sha string) (*MediaInfo, error) {
|
||||
rows, _ := s.database.Query(
|
||||
ctx,
|
||||
`select
|
||||
i.sha, i.path, i.extension, i.mime_codec, i.size, i.duration, i.container, i.fonts,
|
||||
jsonb_build_object(
|
||||
'info', i.ver_info,
|
||||
'extract', i.ver_extract,
|
||||
'thumbs', i.ver_thumbs,
|
||||
'keyframes', i.ver_keyframes
|
||||
) as versions
|
||||
from gocoder.info as i
|
||||
where i.sha=$1 limit 1`,
|
||||
sha,
|
||||
).Scan(
|
||||
&ret.Sha, &ret.Path, &ret.Extension, &ret.MimeCodec, &ret.Size, &ret.Duration, &ret.Container,
|
||||
&fonts, &ret.Versions.Info, &ret.Versions.Extract, &ret.Versions.Thumbs, &ret.Versions.Keyframes,
|
||||
)
|
||||
ret.Fonts = fonts
|
||||
ret.Videos = make([]Video, 0)
|
||||
ret.Audios = make([]Audio, 0)
|
||||
ret.Subtitles = make([]Subtitle, 0)
|
||||
ret.Chapters = make([]Chapter, 0)
|
||||
ret, err := pgx.CollectOneRow(rows, pgx.RowToStructByName[MediaInfo])
|
||||
|
||||
if err == sql.ErrNoRows || (ret.Versions.Info < InfoVersion && ret.Versions.Info != 0) {
|
||||
return s.storeFreshMetadata(path, sha)
|
||||
if errors.Is(err, pgx.ErrNoRows) || (ret.Versions.Info < InfoVersion && ret.Versions.Info != 0) {
|
||||
return s.storeFreshMetadata(ctx, path, sha)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := s.database.Query(
|
||||
`select v.idx, v.title, v.language, v.codec, v.mime_codec, v.width, v.height, v.bitrate, v.is_default, v.keyframes
|
||||
from videos as v where v.sha=$1`,
|
||||
rows, _ = s.database.Query(
|
||||
ctx,
|
||||
`select * from gocoder.videos as v where v.sha=$1`,
|
||||
sha,
|
||||
)
|
||||
ret.Videos, err = pgx.CollectRows(rows, pgx.RowToStructByName[Video])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for rows.Next() {
|
||||
var v Video
|
||||
err := rows.Scan(&v.Index, &v.Title, &v.Language, &v.Codec, &v.MimeCodec, &v.Width, &v.Height, &v.Bitrate, &v.IsDefault, &v.Keyframes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret.Videos = append(ret.Videos, v)
|
||||
}
|
||||
|
||||
rows, err = s.database.Query(
|
||||
`select a.idx, a.title, a.language, a.codec, a.mime_codec, a.bitrate, a.is_default, a.keyframes
|
||||
from audios as a where a.sha=$1`,
|
||||
rows, _ = s.database.Query(
|
||||
ctx,
|
||||
`select * from gocoder.audios as a where a.sha=$1`,
|
||||
sha,
|
||||
)
|
||||
ret.Audios, err = pgx.CollectRows(rows, pgx.RowToStructByName[Audio])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for rows.Next() {
|
||||
var a Audio
|
||||
err := rows.Scan(&a.Index, &a.Title, &a.Language, &a.Codec, &a.MimeCodec, &a.Bitrate, &a.IsDefault, &a.Keyframes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret.Audios = append(ret.Audios, a)
|
||||
}
|
||||
|
||||
rows, err = s.database.Query(
|
||||
`select s.idx, s.title, s.language, s.codec, s.mime_codec, s.extension, s.is_default, s.is_forced, s.is_hearing_impaired
|
||||
from subtitles as s where s.sha=$1`,
|
||||
rows, _ = s.database.Query(
|
||||
ctx,
|
||||
`select * from gocoder.subtitles as s where s.sha=$1`,
|
||||
sha,
|
||||
)
|
||||
ret.Subtitles, err = pgx.CollectRows(rows, pgx.RowToStructByName[Subtitle])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for rows.Next() {
|
||||
var s Subtitle
|
||||
err := rows.Scan(&s.Index, &s.Title, &s.Language, &s.Codec, &s.MimeCodec, &s.Extension, &s.IsDefault, &s.IsForced, &s.IsHearingImpaired)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i, s := range ret.Subtitles {
|
||||
if s.Extension != nil {
|
||||
link := fmt.Sprintf(
|
||||
"/video/%s/subtitle/%d.%s",
|
||||
@@ -249,35 +251,27 @@ func (s *MetadataService) getMetadata(path string, sha string) (*MediaInfo, erro
|
||||
*s.Index,
|
||||
*s.Extension,
|
||||
)
|
||||
s.Link = &link
|
||||
ret.Subtitles[i].Link = &link
|
||||
}
|
||||
ret.Subtitles = append(ret.Subtitles, s)
|
||||
}
|
||||
err = ret.SearchExternalSubtitles()
|
||||
if err != nil {
|
||||
fmt.Printf("Couldn't find external subtitles: %v", err)
|
||||
}
|
||||
|
||||
rows, err = s.database.Query(
|
||||
`select c.start_time, c.end_time, c.name, c.type
|
||||
from chapters as c where c.sha=$1`,
|
||||
rows, _ = s.database.Query(
|
||||
ctx,
|
||||
`select * from gocoder.chapters as c where c.sha=$1`,
|
||||
sha,
|
||||
)
|
||||
ret.Chapters, err = pgx.CollectRows(rows, pgx.RowToStructByName[Chapter])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for rows.Next() {
|
||||
var c Chapter
|
||||
err := rows.Scan(&c.StartTime, &c.EndTime, &c.Name, &c.Type)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret.Chapters = append(ret.Chapters, c)
|
||||
}
|
||||
return &ret, nil
|
||||
}
|
||||
|
||||
func (s *MetadataService) storeFreshMetadata(path string, sha string) (*MediaInfo, error) {
|
||||
func (s *MetadataService) storeFreshMetadata(ctx context.Context, path string, sha string) (*MediaInfo, error) {
|
||||
get_running, set := s.lock.Start(sha)
|
||||
if get_running != nil {
|
||||
return get_running()
|
||||
@@ -288,25 +282,28 @@ func (s *MetadataService) storeFreshMetadata(path string, sha string) (*MediaInf
|
||||
return set(nil, err)
|
||||
}
|
||||
|
||||
tx, err := s.database.Begin()
|
||||
tx, err := s.database.Begin(ctx)
|
||||
if err != nil {
|
||||
return set(ret, err)
|
||||
}
|
||||
|
||||
// it needs to be a delete instead of a on conflict do update because we want to trigger delete casquade for
|
||||
// videos/audios & co.
|
||||
tx.Exec(`delete from gocoder.info where path = $1`, path)
|
||||
tx.Exec(`
|
||||
tx.Exec(ctx, `delete from gocoder.info where path = $1`, path)
|
||||
tx.Exec(ctx,
|
||||
`
|
||||
insert into gocoder.info(sha, path, extension, mime_codec, size, duration, container,
|
||||
fonts, ver_info, ver_extract, ver_thumbs, ver_keyframes)
|
||||
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
|
||||
`,
|
||||
// on conflict do not update versions of extract/thumbs/keyframes
|
||||
ret.Sha, ret.Path, ret.Extension, ret.MimeCodec, ret.Size, ret.Duration, ret.Container,
|
||||
pq.Array(ret.Fonts), ret.Versions.Info, ret.Versions.Extract, ret.Versions.Thumbs, ret.Versions.Keyframes,
|
||||
ret.Fonts, ret.Versions.Info, ret.Versions.Extract, ret.Versions.Thumbs, ret.Versions.Keyframes,
|
||||
)
|
||||
for _, v := range ret.Videos {
|
||||
tx.Exec(`
|
||||
tx.Exec(
|
||||
ctx,
|
||||
`
|
||||
insert into gocoder.videos(sha, idx, title, language, codec, mime_codec, width, height, is_default, bitrate)
|
||||
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
on conflict (sha, idx) do update set
|
||||
@@ -325,7 +322,9 @@ func (s *MetadataService) storeFreshMetadata(path string, sha string) (*MediaInf
|
||||
)
|
||||
}
|
||||
for _, a := range ret.Audios {
|
||||
tx.Exec(`
|
||||
tx.Exec(
|
||||
ctx,
|
||||
`
|
||||
insert into gocoder.audios(sha, idx, title, language, codec, mime_codec, is_default, bitrate)
|
||||
values ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
on conflict (sha, idx) do update set
|
||||
@@ -342,7 +341,9 @@ func (s *MetadataService) storeFreshMetadata(path string, sha string) (*MediaInf
|
||||
)
|
||||
}
|
||||
for _, s := range ret.Subtitles {
|
||||
tx.Exec(`
|
||||
tx.Exec(
|
||||
ctx,
|
||||
`
|
||||
insert into gocoder.subtitles(sha, idx, title, language, codec, mime_codec, extension, is_default, is_forced, is_hearing_impaired)
|
||||
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
on conflict (sha, idx) do update set
|
||||
@@ -361,7 +362,9 @@ func (s *MetadataService) storeFreshMetadata(path string, sha string) (*MediaInf
|
||||
)
|
||||
}
|
||||
for _, c := range ret.Chapters {
|
||||
tx.Exec(`
|
||||
tx.Exec(
|
||||
ctx,
|
||||
`
|
||||
insert into gocoder.chapters(sha, start_time, end_time, name, type)
|
||||
values ($1, $2, $3, $4, $5)
|
||||
on conflict (sha, start_time) do update set
|
||||
@@ -374,7 +377,7 @@ func (s *MetadataService) storeFreshMetadata(path string, sha string) (*MediaInf
|
||||
ret.Sha, c.StartTime, c.EndTime, c.Name, c.Type,
|
||||
)
|
||||
}
|
||||
err = tx.Commit()
|
||||
err = tx.Commit(ctx)
|
||||
if err != nil {
|
||||
return set(ret, err)
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ func (s *MetadataService) ExtractThumbs(ctx context.Context, path string, sha st
|
||||
if err != nil {
|
||||
return set(nil, err)
|
||||
}
|
||||
_, err = s.database.Exec(`update gocoder.info set ver_thumbs = $2 where sha = $1`, sha, ThumbsVersion)
|
||||
_, err = s.database.Exec(ctx, `update gocoder.info set ver_thumbs = $2 where sha = $1`, sha, ThumbsVersion)
|
||||
return set(nil, err)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user