Fix transcode wait time on extremely slow devices

Zoe Roux 2024-02-09 16:23:49 +01:00
parent 7aca2b2d6d
commit bb29b7e7f7
2 changed files with 46 additions and 24 deletions

@@ -31,19 +31,29 @@ type Stream struct {
     // You can also wait for it to be ready (non-blocking if already ready) by doing:
     // <-ts.segments[i]
     segments []chan (struct{})
-    heads []int32
-    commands []*exec.Cmd
+    heads []Head
     // the lock used for the segments array and the heads
     lock sync.RWMutex
 }
+
+type Head struct {
+    segment int32
+    end int32
+    command *exec.Cmd
+}
+
+var DeletedHead = Head{
+    segment: -1,
+    end: -1,
+    command: nil,
+}
 
 func NewStream(file *FileStream, handle StreamHandle) Stream {
     ret := Stream{
         handle: handle,
         file: file,
         segments: make([]chan struct{}, len(file.Keyframes)),
-        heads: make([]int32, 0),
-        commands: make([]*exec.Cmd, 0),
+        heads: make([]Head, 0),
     }
     for seg := range ret.segments {
         ret.segments[seg] = make(chan struct{})
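
The two parallel slices (heads of type []int32 plus commands of type []*exec.Cmd) are folded into a single heads []Head slice, so a head's current segment, its scheduled end, and its encoder process are always updated together under the same lock. A minimal standalone sketch of the idea, with toy values rather than the project's code:

    // Standalone sketch: one struct per encoder head instead of parallel slices.
    package main

    import (
        "fmt"
        "os/exec"
    )

    type Head struct {
        segment int32     // segment the encoder is currently producing
        end     int32     // last segment this encoder is scheduled to reach
        command *exec.Cmd // nil until the process has actually been started
    }

    // Sentinel for a slot whose encoder has exited; slots are never removed,
    // so encoder_id indices handed to goroutines stay valid.
    var DeletedHead = Head{segment: -1, end: -1, command: nil}

    func main() {
        heads := make([]Head, 0)
        heads = append(heads, Head{segment: 3, end: 103}) // reserve slot 0, command still nil
        heads[0] = DeletedHead                            // encoder 0 is gone
        // All fields of Head are comparable, so == works as a liveness check.
        fmt.Println(heads[0] == DeletedHead) // true
    }
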
@@ -63,6 +73,15 @@ func (ts *Stream) isSegmentReady(segment int32) bool {
     }
 }
 
+func (ts *Stream) isSegmentTranscoding(segment int32) bool {
+    for _, head := range ts.heads {
+        if head.segment == segment {
+            return true
+        }
+    }
+    return false
+}
+
 func (ts *Stream) run(start int32) error {
     // Start the transcode up to the 100th segment (or less)
     // Stop at the first finished segment
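
Note on isSegmentTranscoding: with heads now holding Head values instead of int32s, slices.Contains(ts.heads, segment) no longer type-checks, so the membership test becomes an explicit loop over head.segment. Purely as an illustration (this is not the commit's code), the same check could be written with slices.ContainsFunc from Go 1.21; both forms are equivalent:

    package main

    import (
        "fmt"
        "slices"
    )

    type Head struct{ segment, end int32 }

    // isSegmentTranscoding reports whether any head is currently on this segment.
    func isSegmentTranscoding(heads []Head, segment int32) bool {
        return slices.ContainsFunc(heads, func(h Head) bool { return h.segment == segment })
    }

    func main() {
        heads := []Head{{segment: 5, end: 105}, {segment: 40, end: 140}}
        fmt.Println(isSegmentTranscoding(heads, 40)) // true
        fmt.Println(isSegmentTranscoding(heads, 41)) // false
    }
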
@@ -75,9 +94,7 @@ func (ts *Stream) run(start int32) error {
         }
     }
     encoder_id := len(ts.heads)
-    ts.heads = append(ts.heads, start)
-    // we set nil while the command has not started, this is just to reserve the index
-    ts.commands = append(ts.commands, nil)
+    ts.heads = append(ts.heads, Head{segment: start, end: end, command: nil})
     ts.lock.Unlock()
 
     log.Printf(
@@ -147,7 +164,7 @@ func (ts *Stream) run(start int32) error {
         return err
     }
     ts.lock.Lock()
-    ts.commands[encoder_id] = cmd
+    ts.heads[encoder_id].command = cmd
     ts.lock.Unlock()
 
     go func() {
@@ -160,7 +177,7 @@ func (ts *Stream) run(start int32) error {
             _, _ = fmt.Sscanf(scanner.Text(), format, &segment)
 
             ts.lock.Lock()
-            ts.heads[encoder_id] = segment
+            ts.heads[encoder_id].segment = segment
             if ts.isSegmentReady(segment) {
                 // the current segment is already marked as done, so another process has already gone up to here.
                 cmd.Process.Signal(os.Interrupt)
@@ -203,8 +220,7 @@ func (ts *Stream) run(start int32) error {
         ts.lock.Lock()
         defer ts.lock.Unlock()
         // we can't delete the head directly because it would invalidate the other encoder_ids
-        ts.heads[encoder_id] = -1
-        ts.commands[encoder_id] = nil
+        ts.heads[encoder_id] = DeletedHead
     }()
 
     return nil
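
The comment above is why the slot is overwritten with DeletedHead rather than removed from the slice: encoder_id values are plain indices into heads, held by running goroutines. A toy illustration (strings instead of real heads) of what removal would break:

    package main

    import "fmt"

    func main() {
        heads := []string{"head-0", "head-1", "head-2"}
        encoderID := 2 // the goroutine driving head-2 holds on to this index
        // Removing head-1 by slicing it out shifts every later element...
        heads = append(heads[:1], heads[2:]...)
        // ...so index 1 now names a different head and index 2 is out of range.
        fmt.Println(heads, len(heads), encoderID) // [head-0 head-2] 2 2
    }
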
@@ -233,12 +249,19 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
     ready := ts.isSegmentReady(segment)
     // we want to calculate the distance in the same lock, otherwise it can be funky
     distance := 0.
+    is_scheduled := false
     if !ready {
         distance = ts.getMinEncoderDistance(segment)
-    } else if !slices.Contains(ts.heads, segment) {
+        for _, head := range ts.heads {
+            if head.segment <= segment && segment < head.end {
+                is_scheduled = true
+                break
+            }
+        }
+    } else if !ts.isSegmentTranscoding(segment) {
         // if the current segment is ready, check if the next segments are ready and, if not, start a transcode for them
         for i := segment + 1; i <= min(segment+10, int32(len(ts.segments)-1)); i++ {
-            if slices.Contains(ts.heads, i) {
+            if ts.isSegmentTranscoding(i) {
                 // no need to create a new transcoder if there is already one running for this segment
                 break
             }
@@ -253,7 +276,7 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
 
     if !ready {
         // Only start a new encode if there is too big a distance between the current encoder and the segment.
-        if distance > 60 {
+        if distance > 60 || !is_scheduled {
             log.Printf("Creating new head for %d since closest head is %fs away", segment, distance)
             err := ts.run(segment)
             if err != nil {
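
This is the core of the fix. Previously a waiting client only triggered a new encoder when the closest head was more than 60 seconds behind; on a very slow device, a head that was nearby in time but would never reach the requested segment (because its end stops short of it) could leave the client waiting far longer than necessary. The new is_scheduled flag covers that case. A standalone sketch of the decision, with made-up numbers and the distance passed in directly:

    package main

    import "fmt"

    type Head struct{ segment, end int32 }

    // spawnNewHead mirrors the updated rule: start another encoder when the
    // closest head is more than 60s behind, or when no head is scheduled to
    // ever produce the requested segment.
    func spawnNewHead(heads []Head, segment int32, distance float64) bool {
        scheduled := false
        for _, h := range heads {
            if h.segment <= segment && segment < h.end {
                scheduled = true
                break
            }
        }
        return distance > 60 || !scheduled
    }

    func main() {
        heads := []Head{{segment: 10, end: 60}}
        // Segment 25 sits inside [10, 60) and is only ~30s away: keep waiting.
        fmt.Println(spawnNewHead(heads, 25, 30)) // false
        // Segment 70 is past end=60, so this head will never produce it; a small
        // distance no longer blocks spawning a new head (the old code waited here).
        fmt.Println(spawnNewHead(heads, 70, 20)) // true
    }
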
@@ -278,12 +301,12 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
 
 func (ts *Stream) getMinEncoderDistance(segment int32) float64 {
     time := ts.file.Keyframes[segment]
-    distances := Map(ts.heads, func(i int32, _ int) float64 {
+    distances := Map(ts.heads, func(head Head, _ int) float64 {
         // ignore killed heads or heads after the current time
-        if i < 0 || ts.file.Keyframes[i] > time {
+        if head.segment < 0 || ts.file.Keyframes[head.segment] > time {
             return math.Inf(1)
         }
-        return time - ts.file.Keyframes[i]
+        return time - ts.file.Keyframes[head.segment]
     })
     if len(distances) == 0 {
         return math.Inf(1)
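
A worked example of getMinEncoderDistance with Head values. Map is assumed to be the project's generic map helper; a local stand-in with the same shape is defined here, and the keyframe times are invented (one every two seconds):

    package main

    import (
        "fmt"
        "math"
        "slices"
    )

    type Head struct{ segment, end int32 }

    // Map is a local stand-in for the project's helper of the same shape.
    func Map[T, U any](s []T, f func(T, int) U) []U {
        out := make([]U, len(s))
        for i, v := range s {
            out[i] = f(v, i)
        }
        return out
    }

    func minEncoderDistance(keyframes []float64, heads []Head, segment int32) float64 {
        time := keyframes[segment]
        distances := Map(heads, func(head Head, _ int) float64 {
            // killed heads (segment == -1) and heads past the target count as unusable
            if head.segment < 0 || keyframes[head.segment] > time {
                return math.Inf(1)
            }
            return time - keyframes[head.segment]
        })
        if len(distances) == 0 {
            return math.Inf(1)
        }
        return slices.Min(distances)
    }

    func main() {
        keyframes := make([]float64, 100)
        for i := range keyframes {
            keyframes[i] = float64(i) * 2
        }
        heads := []Head{{segment: -1, end: -1}, {segment: 10, end: 60}, {segment: 30, end: 80}}
        // closest live head at or before segment 40 is at segment 30: 20 seconds behind
        fmt.Println(minEncoderDistance(keyframes, heads, 40)) // 20
    }
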
@@ -295,17 +318,16 @@ func (ts *Stream) Kill() {
     ts.lock.Lock()
     defer ts.lock.Unlock()
 
-    for id := range ts.commands {
+    for id := range ts.heads {
         ts.KillHead(id)
     }
 }
 
 // Stream is assumed to be locked
 func (ts *Stream) KillHead(encoder_id int) {
-    if ts.commands[encoder_id] == nil {
+    if ts.heads[encoder_id] == DeletedHead {
         return
     }
-    ts.commands[encoder_id].Process.Signal(os.Interrupt)
-    ts.commands[encoder_id] = nil
-    ts.heads[encoder_id] = -1
+    ts.heads[encoder_id].command.Process.Signal(os.Interrupt)
+    ts.heads[encoder_id] = DeletedHead
 }
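
KillHead now guards on the DeletedHead sentinel instead of a nil command, and still stops the encoder with os.Interrupt rather than an outright kill, giving the process a chance to shut down cleanly. A small Unix-only sketch of that interrupt pattern (sleep stands in for the encoder process):

    package main

    import (
        "os"
        "os/exec"
        "time"
    )

    func main() {
        cmd := exec.Command("sleep", "60") // stand-in for an encoder head
        if err := cmd.Start(); err != nil {
            panic(err)
        }
        time.Sleep(100 * time.Millisecond)
        // os.Interrupt (SIGINT) asks the process to stop instead of killing it outright.
        _ = cmd.Process.Signal(os.Interrupt)
        _ = cmd.Wait() // reaps the process; returns "signal: interrupt" here
    }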

@@ -182,9 +182,9 @@ func (t *Tracker) killOrphanedeheads(stream *Stream) {
         if info.head == -1 {
             continue
         }
-        distance = min(Abs(info.head-head), distance)
+        distance = min(Abs(info.head-head.segment), distance)
     }
-    if distance > 100 {
+    if distance > 20 {
         log.Printf("Killing orphaned head %s %d", stream.file.Path, encoder_id)
         stream.KillHead(encoder_id)
     }
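
In the tracker, heads now carry their position explicitly (head.segment) and the orphan threshold drops from 100 to 20 segments, so an encoder that no connected client is anywhere near gets reclaimed much sooner, presumably to free CPU faster on slow devices. A standalone sketch of the orphan check with the tighter threshold (Abs and the client list are stand-ins for the tracker's own state):

    package main

    import "fmt"

    func abs(x int32) int32 {
        if x < 0 {
            return -x
        }
        return x
    }

    // orphaned reports whether an encoder head is more than 20 segments away
    // from every known client position (-1 means the position is unknown).
    func orphaned(encoderHead int32, clientHeads []int32) bool {
        distance := int32(1 << 30)
        for _, h := range clientHeads {
            if h == -1 {
                continue
            }
            distance = min(abs(h-encoderHead), distance)
        }
        return distance > 20
    }

    func main() {
        fmt.Println(orphaned(150, []int32{40, 60})) // true: nobody is close to this head
        fmt.Println(orphaned(55, []int32{40, 60}))  // false: a client is only 5 segments away
    }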