mirror of
https://github.com/zoriya/Kyoo.git
synced 2025-05-31 12:14:46 -04:00
Cleanup next segment preparation
This commit is contained in:
parent
3f9446d46f
commit
4993d34835
@ -93,11 +93,19 @@ func (ts *Stream) run(start int32) error {
|
|||||||
end := min(start+3, int32(len(ts.file.Keyframes)))
|
end := min(start+3, int32(len(ts.file.Keyframes)))
|
||||||
ts.lock.Lock()
|
ts.lock.Lock()
|
||||||
for i := start; i < end; i++ {
|
for i := start; i < end; i++ {
|
||||||
if ts.isSegmentReady(i) {
|
if ts.isSegmentReady(i) || ts.isSegmentTranscoding(i) {
|
||||||
end = i
|
end = i
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if start == end {
|
||||||
|
// this can happens if the start segment was finished between the check
|
||||||
|
// to call run() and the actual call.
|
||||||
|
// since most checks are done in a RLock() instead of a Lock() this can
|
||||||
|
// happens when two goroutines try to make the same segment ready
|
||||||
|
ts.lock.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
encoder_id := len(ts.heads)
|
encoder_id := len(ts.heads)
|
||||||
ts.heads = append(ts.heads, Head{segment: start, end: end, command: nil})
|
ts.heads = append(ts.heads, Head{segment: start, end: end, command: nil})
|
||||||
ts.lock.Unlock()
|
ts.lock.Unlock()
|
||||||
@ -211,6 +219,7 @@ func (ts *Stream) run(start int32) error {
|
|||||||
}
|
}
|
||||||
ts.lock.Lock()
|
ts.lock.Lock()
|
||||||
ts.heads[encoder_id].segment = segment
|
ts.heads[encoder_id].segment = segment
|
||||||
|
log.Printf("Segment %d got ready (%d)", segment, encoder_id)
|
||||||
if ts.isSegmentReady(segment) {
|
if ts.isSegmentReady(segment) {
|
||||||
// the current segment is already marked at done so another process has already gone up to here.
|
// the current segment is already marked at done so another process has already gone up to here.
|
||||||
cmd.Process.Signal(os.Interrupt)
|
cmd.Process.Signal(os.Interrupt)
|
||||||
@ -244,11 +253,11 @@ func (ts *Stream) run(start int32) error {
|
|||||||
go func() {
|
go func() {
|
||||||
err := cmd.Wait()
|
err := cmd.Wait()
|
||||||
if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ExitCode() == 255 {
|
if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ExitCode() == 255 {
|
||||||
log.Println("ffmpeg was killed by us")
|
log.Printf("ffmpeg %d was killed by us", encoder_id)
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
log.Println("ffmpeg occured an error", err, stderr.String())
|
log.Printf("ffmpeg %d occured an error: %s: %s", encoder_id, err, stderr.String())
|
||||||
} else {
|
} else {
|
||||||
log.Println("ffmpeg finished successfully")
|
log.Printf("ffmpeg %d finished successfully", encoder_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
ts.lock.Lock()
|
ts.lock.Lock()
|
||||||
@ -292,19 +301,6 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if !ts.isSegmentTranscoding(segment) {
|
|
||||||
// if the current segment is ready, check next segments are ready and if not start a transcode for them
|
|
||||||
for i := segment + 1; i <= min(segment+10, int32(len(ts.segments)-1)); i++ {
|
|
||||||
if ts.isSegmentTranscoding(i) {
|
|
||||||
// no need to create a new transcoder if there is already one running for this segment
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if !ts.isSegmentReady(i) {
|
|
||||||
log.Printf("Creating new head for future segment (%d)", i)
|
|
||||||
go ts.run(i)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
ts.lock.RUnlock()
|
ts.lock.RUnlock()
|
||||||
|
|
||||||
@ -326,14 +322,33 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
|
|||||||
return "", errors.New("could not retrive the selected segment (timeout)")
|
return "", errors.New("could not retrive the selected segment (timeout)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ts.prerareNextSegements(segment)
|
||||||
return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil
|
return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ts *Stream) prerareNextSegements(segment int32) {
|
||||||
|
ts.lock.RLock()
|
||||||
|
defer ts.lock.RUnlock()
|
||||||
|
for i := segment + 1; i <= min(segment+10, int32(len(ts.segments)-1)); i++ {
|
||||||
|
if ts.isSegmentReady(i) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// only start encode for segments not planned (getMinEncoderDistance returns Inf for them)
|
||||||
|
// or if they are 60s away (asume 5s per segments)
|
||||||
|
if ts.getMinEncoderDistance(i) < 60+(5*float64(i-segment)) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Printf("Creating new head for future segment (%d)", i)
|
||||||
|
go ts.run(i)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (ts *Stream) getMinEncoderDistance(segment int32) float64 {
|
func (ts *Stream) getMinEncoderDistance(segment int32) float64 {
|
||||||
time := ts.file.Keyframes[segment]
|
time := ts.file.Keyframes[segment]
|
||||||
distances := Map(ts.heads, func(head Head, _ int) float64 {
|
distances := Map(ts.heads, func(head Head, _ int) float64 {
|
||||||
// ignore killed heads or heads after the current time
|
// ignore killed heads or heads after the current time
|
||||||
if head.segment < 0 || ts.file.Keyframes[head.segment] > time {
|
if head.segment < 0 || ts.file.Keyframes[head.segment] > time || segment >= head.end {
|
||||||
return math.Inf(1)
|
return math.Inf(1)
|
||||||
}
|
}
|
||||||
return time - ts.file.Keyframes[head.segment]
|
return time - ts.file.Keyframes[head.segment]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user