mirror of
https://github.com/zoriya/Kyoo.git
synced 2025-05-24 02:02:36 -04:00
Fix transcoder S3 deadlock when writer does not respect context cancellation
Signed-off-by: Fred Heinecke <fred.heinecke@yahoo.com>
This commit is contained in:
parent
949a367d0d
commit
a16d09d692
@ -166,7 +166,6 @@ func (ssb *S3StorageBackend) DeleteItemsWithPrefix(ctx context.Context, pathPref
|
|||||||
func (ssb *S3StorageBackend) SaveItemWithCallback(ctx context.Context, path string, writeContents ContentsWriterCallback) (err error) {
|
func (ssb *S3StorageBackend) SaveItemWithCallback(ctx context.Context, path string, writeContents ContentsWriterCallback) (err error) {
|
||||||
// Create a pipe to connect the writer and reader.
|
// Create a pipe to connect the writer and reader.
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
defer utils.CleanupWithErr(&err, pr.Close, "failed to close pipe reader")
|
|
||||||
|
|
||||||
// Start a goroutine to write to the pipe.
|
// Start a goroutine to write to the pipe.
|
||||||
// Writing and reading must occur concurrently to avoid deadlocks.
|
// Writing and reading must occur concurrently to avoid deadlocks.
|
||||||
@ -174,7 +173,6 @@ func (ssb *S3StorageBackend) SaveItemWithCallback(ctx context.Context, path stri
|
|||||||
// Use a separate context for the writer to allow cancellation if the upload fails. This is important to avoid
|
// Use a separate context for the writer to allow cancellation if the upload fails. This is important to avoid
|
||||||
// a hung goroutine leak if the upload fails.
|
// a hung goroutine leak if the upload fails.
|
||||||
writeCtx, cancel := context.WithCancel(ctx)
|
writeCtx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
var writerGroup errgroup.Group
|
var writerGroup errgroup.Group
|
||||||
writerGroup.Go(func() (err error) {
|
writerGroup.Go(func() (err error) {
|
||||||
@ -182,16 +180,20 @@ func (ssb *S3StorageBackend) SaveItemWithCallback(ctx context.Context, path stri
|
|||||||
return writeContents(writeCtx, pw)
|
return writeContents(writeCtx, pw)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Handle cleanup and avoid a goroutines leak.
|
||||||
|
// Order is critical here. If the context is not cancelled, or the pipe is not closed
|
||||||
|
// before waiting for the writer to finish, there can be a deadlock. This happens when
|
||||||
|
// a writer without context support is waiting for written bytes to be read.
|
||||||
|
// Remember: the last deferred function is executed first.
|
||||||
// Wait for the write to complete and check for errors.
|
// Wait for the write to complete and check for errors.
|
||||||
// This should always happen even if saving fails, to prevent a goroutine leak.
|
// This should always happen even if saving fails, to prevent a goroutine leak.
|
||||||
defer utils.CleanupWithErr(&err, writerGroup.Wait, "writer callback failed")
|
defer utils.CleanupWithErr(&err, writerGroup.Wait, "writer callback failed")
|
||||||
|
defer utils.CleanupWithErr(&err, pr.Close, "failed to close pipe reader")
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
// Upload the object to S3 using the pipe as the body.
|
// Upload the object to S3 using the pipe as the body.
|
||||||
if err := ssb.SaveItem(ctx, path, pr); err != nil {
|
if err := ssb.SaveItem(ctx, path, pr); err != nil {
|
||||||
// Ensure that the writer context is cancelled prior to awaiting for the writer to finish.
|
return fmt.Errorf("failed to save item with path %q: %w", path, err)
|
||||||
// This is important to avoid a hung goroutine leak if the upload fails.
|
|
||||||
cancel()
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user