mirror of
				https://github.com/advplyr/audiobookshelf.git
				synced 2025-11-04 03:17:00 -05:00 
			
		
		
		
	
		
			
				
	
	
		
			538 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			538 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
'use strict'
 | 
						|
 | 
						|
const abortControllerModule = require('../../../../../watcher/aborter/controller')
 | 
						|
 | 
						|
const AbortController = globalThis.AbortController || abortControllerModule.AbortController
 | 
						|
 | 
						|
const {
 | 
						|
  codes: { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE },
 | 
						|
  AbortError
 | 
						|
} = require('../../ours/errors')
 | 
						|
 | 
						|
const { validateAbortSignal, validateInteger, validateObject } = require('../validators')
 | 
						|
 | 
						|
const kWeakHandler = require('../../ours/primordials').Symbol('kWeak')
 | 
						|
 | 
						|
const { finished } = require('./end-of-stream')
 | 
						|
 | 
						|
const {
 | 
						|
  ArrayPrototypePush,
 | 
						|
  MathFloor,
 | 
						|
  Number,
 | 
						|
  NumberIsNaN,
 | 
						|
  Promise,
 | 
						|
  PromiseReject,
 | 
						|
  PromisePrototypeCatch,
 | 
						|
  Symbol
 | 
						|
} = require('../../ours/primordials')
 | 
						|
 | 
						|
// Sentinel a mapper may return/resolve to so that `map` emits nothing for that chunk.
const kEmpty = Symbol('kEmpty')

// Sentinel pushed onto the `map` queue once the source has been fully consumed.
const kEof = Symbol('kEof')
 | 
						|
 | 
						|
/**
 * Applies `fn` to every chunk of the source (`this`) and returns an async
 * generator yielding the results in source order.
 *
 * A background "pump" reads the source and queues the (possibly pending)
 * results of `fn`; it pauses once `concurrency` results are queued. The
 * generator awaits the head of the queue, so output order matches input order.
 * Results equal to `kEmpty` are skipped instead of being yielded.
 *
 * @param {Function} fn Called as `fn(chunk, { signal })`; may be async. The
 *   signal is aborted when the generator finishes or `options.signal` aborts.
 * @param {object} [options]
 * @param {number} [options.concurrency=1] Floored, then validated as an integer >= 1.
 * @param {AbortSignal} [options.signal] Aborting it aborts the internal signal.
 * @returns {AsyncGenerator} Generator over the mapped values.
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
function map(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }

  if (options != null) {
    validateObject(options, 'options')
  }

  if (options != null && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  let concurrency = 1

  if (options != null && options.concurrency != null) {
    concurrency = MathFloor(options.concurrency)
  }

  validateInteger(concurrency, 'concurrency', 1)

  return async function* map() {
    const ac = new AbortController()
    const stream = this
    const queue = []
    const signal = ac.signal
    const signalOpt = { signal }
    const abort = () => ac.abort()

    // Reads the caller-supplied signal afresh each time it is needed.
    const readOuterSignal = () => (options == null ? undefined : options.signal)

    const outerAtStart = readOuterSignal()
    if (outerAtStart != null && outerAtStart.aborted) {
      abort()
    }

    const listenTarget = readOuterSignal()
    if (listenTarget != null) {
      listenTarget.addEventListener('abort', abort)
    }

    let wakeConsumer // resolves the consumer's wait for new queue entries
    let wakeProducer // resolves the pump's wait for queue capacity
    let stopped = false

    function markStopped() {
      stopped = true
    }

    // Producer: reads the source, runs `fn`, and fills the queue.
    async function pump() {
      try {
        for await (const chunk of stream) {
          if (stopped) {
            return
          }

          if (signal.aborted) {
            throw new AbortError()
          }

          let mapped
          try {
            mapped = fn(chunk, signalOpt)
          } catch (err) {
            // Turn a synchronous throw into a rejection so it is surfaced in order.
            mapped = PromiseReject(err)
          }

          if (mapped === kEmpty) {
            continue
          }

          if (mapped != null && typeof mapped.catch === 'function') {
            // A rejected result stops the pump from doing further work.
            mapped.catch(markStopped)
          }

          queue.push(mapped)

          if (wakeConsumer) {
            wakeConsumer()
            wakeConsumer = null
          }

          if (!stopped && queue.length && queue.length >= concurrency) {
            // Queue is full: wait until the consumer takes an entry.
            await new Promise((resolve) => {
              wakeProducer = resolve
            })
          }
        }

        queue.push(kEof)
      } catch (err) {
        const rejection = PromiseReject(err)
        PromisePrototypeCatch(rejection, markStopped)
        queue.push(rejection)
      } finally {
        stopped = true

        if (wakeConsumer) {
          wakeConsumer()
          wakeConsumer = null
        }

        const unlistenTarget = readOuterSignal()
        if (unlistenTarget != null) {
          unlistenTarget.removeEventListener('abort', abort)
        }
      }
    }

    pump()

    // Consumer: drains the queue head-first, yielding settled values.
    try {
      for (;;) {
        while (queue.length > 0) {
          const val = await queue[0]

          if (val === kEof) {
            return
          }

          if (signal.aborted) {
            throw new AbortError()
          }

          if (val !== kEmpty) {
            yield val
          }

          queue.shift()

          if (wakeProducer) {
            wakeProducer()
            wakeProducer = null
          }
        }

        await new Promise((resolve) => {
          wakeConsumer = resolve
        })
      }
    } finally {
      ac.abort()
      stopped = true

      if (wakeProducer) {
        wakeProducer()
        wakeProducer = null
      }
    }
  }.call(this)
}
 | 
						|
 | 
						|
/**
 * Returns an async generator yielding `[index, value]` pairs for each chunk of
 * the source (`this`), with `index` starting at 0.
 *
 * @param {object} [options]
 * @param {AbortSignal} [options.signal] Checked before each pair is yielded.
 * @returns {AsyncGenerator} Generator of `[index, value]` pairs.
 * @throws {AbortError} (from the generator) when `options.signal` is aborted;
 *   the signal's `reason` is attached as the error's `cause`.
 */
function asIndexedPairs(options = undefined) {
  if (options != null) {
    validateObject(options, 'options')
  }

  if (options != null && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  return async function* asIndexedPairs() {
    let index = 0

    for await (const val of this) {
      const signal = options == null ? undefined : options.signal

      if (signal != null && signal.aborted) {
        // AbortError takes (message, options): the cause must be passed in the
        // second argument, otherwise the options object becomes the message.
        throw new AbortError(undefined, {
          cause: signal.reason
        })
      }

      yield [index++, val]
    }
  }.call(this)
}
 | 
						|
 | 
						|
/**
 * Resolves to `true` as soon as `fn` passes for any chunk of the source
 * (`this`), or to `false` if the source ends without a match.
 *
 * @param {Function} fn Predicate, forwarded to `filter`.
 * @param {object} [options] Forwarded to `filter`.
 * @returns {Promise<boolean>}
 */
async function some(fn, options = undefined) {
  const matches = filter.call(this, fn, options)

  // Receiving any item at all means at least one chunk matched.
  // eslint-disable-next-line no-unused-vars
  for await (const unused of matches) {
    return true
  }

  return false
}
 | 
						|
 | 
						|
/**
 * Resolves to `true` when `fn` passes for every chunk of the source (`this`).
 *
 * @param {Function} fn Predicate; may be async.
 * @param {object} [options] Forwarded to `some`.
 * @returns {Promise<boolean>}
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
async function every(fn, options = undefined) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }

  // "every passes" is "none fails":
  // https://en.wikipedia.org/wiki/De_Morgan%27s_laws
  const fails = async (...args) => {
    return !(await fn(...args))
  }
  const anyFailed = await some.call(this, fails, options)

  return !anyFailed
}
 | 
						|
 | 
						|
/**
 * Resolves to the first chunk of the source (`this`) for which `fn` passes,
 * or to `undefined` when no chunk matches.
 *
 * @param {Function} fn Predicate, forwarded to `filter`.
 * @param {object} [options] Forwarded to `filter`.
 * @returns {Promise<*>}
 */
async function find(fn, options) {
  const matches = filter.call(this, fn, options)

  for await (const firstMatch of matches) {
    return firstMatch
  }

  return undefined
}
 | 
						|
 | 
						|
/**
 * Calls `fn` for each chunk of the source (`this`) and resolves once the
 * source has been fully consumed.
 *
 * @param {Function} fn Called as `fn(value, options)` by `map`; may be async.
 * @param {object} [options] Forwarded to `map`.
 * @returns {Promise<undefined>}
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
async function forEach(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }

  // Run the callback, then report kEmpty so `map` yields nothing for the chunk.
  const visit = async (value, options) => {
    await fn(value, options)
    return kEmpty
  }

  const drained = map.call(this, visit, options)

  // eslint-disable-next-line no-unused-vars
  for await (const unused of drained);
}
 | 
						|
 | 
						|
/**
 * Returns an async generator over the chunks of the source (`this`) for which
 * `fn` resolves to a truthy value.
 *
 * @param {Function} fn Predicate called as `fn(value, options)`; may be async.
 * @param {object} [options] Forwarded to `map`.
 * @returns {AsyncGenerator}
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
function filter(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }

  // Keep the chunk when the predicate passes; otherwise report kEmpty so
  // `map` skips it.
  const keepOrSkip = async (value, options) => {
    const passed = await fn(value, options)
    return passed ? value : kEmpty
  }

  return map.call(this, keepOrSkip, options)
}

// Specific to provide better error to reduce since the argument is only
// missing if the stream has no items in it - but the code is still appropriate
 | 
						|
 | 
						|
/**
 * ERR_MISSING_ARGS variant thrown by `reduce` when the stream produced no
 * items and no initial value was supplied. It is constructed like the base
 * error for the 'reduce' argument, then its message is replaced with a
 * reduce-specific explanation.
 */
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
  constructor() {
    super('reduce')

    // Replace the generic missing-argument text with a reduce-specific one.
    this.message = 'Reduce of an empty stream requires an initial value'
  }
}
 | 
						|
 | 
						|
/**
 * Folds the chunks of the source (`this`) into a single value.
 *
 * When no initial value is passed (fewer than two arguments), the first chunk
 * seeds the accumulator and the reducer is first called with the second chunk.
 *
 * @param {Function} reducer Called as `reducer(accumulator, value, { signal })`;
 *   may be async. The signal is aborted once the reduction ends or
 *   `options.signal` aborts.
 * @param {*} [initialValue] Starting accumulator.
 * @param {object} [options]
 * @param {AbortSignal} [options.signal] If already aborted, the source is
 *   destroyed with an AbortError which is then thrown.
 * @returns {Promise<*>} The final accumulator.
 * @throws {ERR_INVALID_ARG_TYPE} When `reducer` is not a function.
 * @throws {AbortError} When `options.signal` is aborted.
 * @throws {ReduceAwareErrMissingArgs} When the source is empty and no initial
 *   value was supplied.
 */
async function reduce(reducer, initialValue, options) {
  if (typeof reducer !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer)
  }

  if (options != null) {
    validateObject(options, 'options')
  }

  if (options != null && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  // `arguments.length` distinguishes "no initial value" from an explicit undefined.
  let hasInitialValue = arguments.length > 1

  if (options != null && options.signal != null && options.signal.aborted) {
    const err = new AbortError(undefined, {
      cause: options.signal.reason
    })
    this.once('error', () => {}) // The error is already propagated

    await finished(this.destroy(err))
    throw err
  }

  const ac = new AbortController()
  const signal = ac.signal
  const outerSignal = options != null && options.signal ? options.signal : undefined
  const forwardAbort = () => ac.abort()

  if (outerSignal) {
    const opts = {
      once: true,
      [kWeakHandler]: this
    }
    outerSignal.addEventListener('abort', forwardAbort, opts)
  }

  let gotAnyItemFromStream = false

  try {
    for await (const value of this) {
      gotAnyItemFromStream = true

      if (options != null && options.signal != null && options.signal.aborted) {
        throw new AbortError()
      }

      if (!hasInitialValue) {
        initialValue = value
        hasInitialValue = true
      } else {
        initialValue = await reducer(initialValue, value, {
          signal
        })
      }
    }

    if (!gotAnyItemFromStream && !hasInitialValue) {
      throw new ReduceAwareErrMissingArgs()
    }
  } finally {
    ac.abort()

    // Detach from the caller's signal so a long-lived signal does not keep
    // accumulating one listener (and AbortController) per reduce() call.
    if (outerSignal) {
      outerSignal.removeEventListener('abort', forwardAbort)
    }
  }

  return initialValue
}
 | 
						|
 | 
						|
/**
 * Collects every chunk of the source (`this`) into an array.
 *
 * @param {object} [options]
 * @param {AbortSignal} [options.signal] Checked before each chunk is stored.
 * @returns {Promise<Array>} The chunks in the order they were read.
 * @throws {AbortError} When `options.signal` is aborted; the signal's
 *   `reason` is attached as the error's `cause`.
 */
async function toArray(options) {
  if (options != null) {
    validateObject(options, 'options')
  }

  if (options != null && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  const collected = []

  for await (const chunk of this) {
    const currentSignal = options == null ? undefined : options.signal

    if (currentSignal != null && currentSignal.aborted) {
      throw new AbortError(undefined, {
        cause: options.signal.reason
      })
    }

    ArrayPrototypePush(collected, chunk)
  }

  return collected
}
 | 
						|
 | 
						|
/**
 * Maps each chunk of the source (`this`) with `fn` and flattens the results:
 * every mapped value is delegated to with `yield*`, so it must be iterable or
 * async iterable.
 *
 * @param {Function} fn Mapper, forwarded to `map`.
 * @param {object} [options] Forwarded to `map`.
 * @returns {AsyncGenerator}
 */
function flatMap(fn, options) {
  const mapped = map.call(this, fn, options)

  return async function* flatMap() {
    for await (const inner of mapped) {
      yield* inner
    }
  }.call(this)
}
 | 
						|
 | 
						|
/**
 * Coerces `number` to a non-negative integer count (or Infinity) for use by
 * `drop` and `take`.
 *
 * @param {*} number Value to coerce with `Number()`.
 * @returns {number} 0 for NaN; otherwise the value rounded down to an integer
 *   (Infinity is returned unchanged).
 * @throws {ERR_OUT_OF_RANGE} When the coerced value is negative.
 */
function toIntegerOrInfinity(number) {
  // We coerce here to align with the spec
  // https://github.com/tc39/proposal-iterator-helpers/issues/169
  number = Number(number)

  if (NumberIsNaN(number)) {
    return 0
  }

  if (number < 0) {
    throw new ERR_OUT_OF_RANGE('number', '>= 0', number)
  }

  // Drop any fractional part: otherwise a limit such as 1.5 makes the
  // `number--` countdown in drop/take process two items instead of one.
  // For non-negative input, flooring equals truncation; Infinity is preserved.
  return MathFloor(number)
}
 | 
						|
 | 
						|
/**
 * Returns an async generator that skips the first `number` chunks of the
 * source (`this`) and yields the rest.
 *
 * @param {*} number Count of chunks to skip; coerced by `toIntegerOrInfinity`.
 * @param {object} [options]
 * @param {AbortSignal} [options.signal] Checked before iteration starts and
 *   before each chunk is handled.
 * @returns {AsyncGenerator}
 * @throws {AbortError} (from the generator) when `options.signal` is aborted.
 */
function drop(number, options = undefined) {
  if (options != null) {
    validateObject(options, 'options')
  }

  if (options != null && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  let remainingToSkip = toIntegerOrInfinity(number)

  // Re-reads the caller's signal on every check.
  const isAborted = () => {
    const current = options == null ? undefined : options.signal
    return current != null && current.aborted
  }

  return async function* drop() {
    if (isAborted()) {
      throw new AbortError()
    }

    for await (const chunk of this) {
      if (isAborted()) {
        throw new AbortError()
      }

      // The countdown keeps decrementing; once it is <= 0 chunks pass through.
      if (remainingToSkip-- <= 0) {
        yield chunk
      }
    }
  }.call(this)
}
 | 
						|
 | 
						|
/**
 * Returns an async generator that yields at most the first `number` chunks of
 * the source (`this`) and then finishes.
 *
 * @param {*} number Maximum count of chunks; coerced by `toIntegerOrInfinity`.
 * @param {object} [options]
 * @param {AbortSignal} [options.signal] Checked before iteration starts and
 *   before each chunk is handled.
 * @returns {AsyncGenerator}
 * @throws {AbortError} (from the generator) when `options.signal` is aborted.
 */
function take(number, options = undefined) {
  if (options != null) {
    validateObject(options, 'options')
  }

  if (options != null && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  let remainingToTake = toIntegerOrInfinity(number)

  // Re-reads the caller's signal on every check.
  const isAborted = () => {
    const current = options == null ? undefined : options.signal
    return current != null && current.aborted
  }

  return async function* take() {
    if (isAborted()) {
      throw new AbortError()
    }

    for await (const chunk of this) {
      if (isAborted()) {
        throw new AbortError()
      }

      // The limit is only noticed when the next chunk arrives.
      if (remainingToTake-- <= 0) {
        return
      }

      yield chunk
    }
  }.call(this)
}
 | 
						|
 | 
						|
// Operators that return an async generator over the transformed stream.
module.exports.streamReturningOperators = { asIndexedPairs, drop, filter, flatMap, map, take }

// Operators that return a promise for a single result.
module.exports.promiseReturningOperators = { every, forEach, reduce, toArray, some, find }
 |