From 5e026449cc872fe98ed05031d1d560c35b9c2d89 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Sun, 28 Jul 2024 15:07:29 +0200 Subject: [PATCH] Add runing task dedup lock --- transcoder/src/runlock.go | 63 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 transcoder/src/runlock.go diff --git a/transcoder/src/runlock.go b/transcoder/src/runlock.go new file mode 100644 index 00000000..a971815e --- /dev/null +++ b/transcoder/src/runlock.go @@ -0,0 +1,63 @@ +package src + +import ( + "errors" + "sync" +) + +type RunLock[K comparable, V any] struct { + running map[K]*Task[V] + lock sync.Mutex +} + +type Task[V any] struct { + ready sync.WaitGroup + listeners []chan Result[V] +} + +type Result[V any] struct { + ok V + err error +} + +func NewRunLock[K comparable, V any]() RunLock[K, V] { + return RunLock[K, V]{ + running: make(map[K]*Task[V]), + } +} + +func (r *RunLock[K, V]) Start(key K) (func() (V, error), func(val V, err error) (V, error)) { + r.lock.Lock() + defer r.lock.Unlock() + task, ok := r.running[key] + + if ok { + ret := make(chan Result[V]) + task.listeners = append(task.listeners, ret) + return func() (V, error) { + res := <-ret + return res.ok, res.err + }, nil + } + + r.running[key] = &Task[V]{ + listeners: make([]chan Result[V], 0), + } + + return nil, func(val V, err error) (V, error) { + r.lock.Lock() + defer r.lock.Unlock() + + task, ok = r.running[key] + if !ok { + return val, errors.New("invalid run lock state. aborting.") + } + + for _, listener := range task.listeners { + listener <- Result[V]{ok: val, err: err} + close(listener) + } + delete(r.running, key) + return val, err + } +}