diff --git a/transcoder/src/runlock.go b/transcoder/src/runlock.go new file mode 100644 index 00000000..a971815e --- /dev/null +++ b/transcoder/src/runlock.go @@ -0,0 +1,63 @@ +package src + +import ( + "errors" + "sync" +) + +type RunLock[K comparable, V any] struct { + running map[K]*Task[V] + lock sync.Mutex +} + +type Task[V any] struct { + ready sync.WaitGroup + listeners []chan Result[V] +} + +type Result[V any] struct { + ok V + err error +} + +func NewRunLock[K comparable, V any]() RunLock[K, V] { + return RunLock[K, V]{ + running: make(map[K]*Task[V]), + } +} + +func (r *RunLock[K, V]) Start(key K) (func() (V, error), func(val V, err error) (V, error)) { + r.lock.Lock() + defer r.lock.Unlock() + task, ok := r.running[key] + + if ok { + ret := make(chan Result[V]) + task.listeners = append(task.listeners, ret) + return func() (V, error) { + res := <-ret + return res.ok, res.err + }, nil + } + + r.running[key] = &Task[V]{ + listeners: make([]chan Result[V], 0), + } + + return nil, func(val V, err error) (V, error) { + r.lock.Lock() + defer r.lock.Unlock() + + task, ok = r.running[key] + if !ok { + return val, errors.New("invalid run lock state. aborting.") + } + + for _, listener := range task.listeners { + listener <- Result[V]{ok: val, err: err} + close(listener) + } + delete(r.running, key) + return val, err + } +}