mirror of
				https://github.com/advplyr/audiobookshelf.git
				synced 2025-10-30 18:12:25 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			416 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			416 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| // Ported from https://github.com/mafintosh/pump with
 | |
| // permission from the author, Mathias Buus (@mafintosh).
 | |
| 'use strict'
 | |
| 
 | |
| const abortControllerModule = require('../../../../../watcher/aborter/controller')
 | |
| 
 | |
| const { ArrayIsArray, Promise, SymbolAsyncIterator } = require('../../ours/primordials')
 | |
| 
 | |
| const eos = require('./end-of-stream')
 | |
| 
 | |
| const { once } = require('../../ours/util')
 | |
| 
 | |
| const destroyImpl = require('./destroy')
 | |
| 
 | |
| const Duplex = require('./duplex')
 | |
| 
 | |
| const {
 | |
|   aggregateTwoErrors,
 | |
|   codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED },
 | |
|   AbortError
 | |
| } = require('../../ours/errors')
 | |
| 
 | |
| const { validateFunction, validateAbortSignal } = require('../validators')
 | |
| 
 | |
| const { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require('./utils')
 | |
| 
 | |
| const AbortController = globalThis.AbortController || abortControllerModule.AbortController
 | |
| let PassThrough
 | |
| let Readable
 | |
| 
 | |
| function destroyer(stream, reading, writing) {
 | |
|   let finished = false
 | |
|   stream.on('close', () => {
 | |
|     finished = true
 | |
|   })
 | |
|   const cleanup = eos(
 | |
|     stream,
 | |
|     {
 | |
|       readable: reading,
 | |
|       writable: writing
 | |
|     },
 | |
|     (err) => {
 | |
|       finished = !err
 | |
|     }
 | |
|   )
 | |
|   return {
 | |
|     destroy: (err) => {
 | |
|       if (finished) return
 | |
|       finished = true
 | |
|       destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'))
 | |
|     },
 | |
|     cleanup
 | |
|   }
 | |
| }
 | |
| 
 | |
| function popCallback(streams) {
 | |
|   // Streams should never be an empty array. It should always contain at least
 | |
|   // a single stream. Therefore optimize for the average case instead of
 | |
|   // checking for length === 0 as well.
 | |
|   validateFunction(streams[streams.length - 1], 'streams[stream.length - 1]')
 | |
|   return streams.pop()
 | |
| }
 | |
| 
 | |
| function makeAsyncIterable(val) {
 | |
|   if (isIterable(val)) {
 | |
|     return val
 | |
|   } else if (isReadableNodeStream(val)) {
 | |
|     // Legacy streams are not Iterable.
 | |
|     return fromReadable(val)
 | |
|   }
 | |
| 
 | |
|   throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], val)
 | |
| }
 | |
| 
 | |
| async function* fromReadable(val) {
 | |
|   if (!Readable) {
 | |
|     Readable = require('./readable')
 | |
|   }
 | |
| 
 | |
|   yield* Readable.prototype[SymbolAsyncIterator].call(val)
 | |
| }
 | |
| 
 | |
| async function pump(iterable, writable, finish, { end }) {
 | |
|   let error
 | |
|   let onresolve = null
 | |
| 
 | |
|   const resume = (err) => {
 | |
|     if (err) {
 | |
|       error = err
 | |
|     }
 | |
| 
 | |
|     if (onresolve) {
 | |
|       const callback = onresolve
 | |
|       onresolve = null
 | |
|       callback()
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   const wait = () =>
 | |
|     new Promise((resolve, reject) => {
 | |
|       if (error) {
 | |
|         reject(error)
 | |
|       } else {
 | |
|         onresolve = () => {
 | |
|           if (error) {
 | |
|             reject(error)
 | |
|           } else {
 | |
|             resolve()
 | |
|           }
 | |
|         }
 | |
|       }
 | |
|     })
 | |
| 
 | |
|   writable.on('drain', resume)
 | |
|   const cleanup = eos(
 | |
|     writable,
 | |
|     {
 | |
|       readable: false
 | |
|     },
 | |
|     resume
 | |
|   )
 | |
| 
 | |
|   try {
 | |
|     if (writable.writableNeedDrain) {
 | |
|       await wait()
 | |
|     }
 | |
| 
 | |
|     for await (const chunk of iterable) {
 | |
|       if (!writable.write(chunk)) {
 | |
|         await wait()
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     if (end) {
 | |
|       writable.end()
 | |
|     }
 | |
| 
 | |
|     await wait()
 | |
|     finish()
 | |
|   } catch (err) {
 | |
|     finish(error !== err ? aggregateTwoErrors(error, err) : err)
 | |
|   } finally {
 | |
|     cleanup()
 | |
|     writable.off('drain', resume)
 | |
|   }
 | |
| }
 | |
| 
 | |
| function pipeline(...streams) {
 | |
|   return pipelineImpl(streams, once(popCallback(streams)))
 | |
| }
 | |
| 
 | |
| function pipelineImpl(streams, callback, opts) {
 | |
|   if (streams.length === 1 && ArrayIsArray(streams[0])) {
 | |
|     streams = streams[0]
 | |
|   }
 | |
| 
 | |
|   if (streams.length < 2) {
 | |
|     throw new ERR_MISSING_ARGS('streams')
 | |
|   }
 | |
| 
 | |
|   const ac = new AbortController()
 | |
|   const signal = ac.signal
 | |
|   const outerSignal = opts === null || opts === undefined ? undefined : opts.signal // Need to cleanup event listeners if last stream is readable
 | |
|   // https://github.com/nodejs/node/issues/35452
 | |
| 
 | |
|   const lastStreamCleanup = []
 | |
|   validateAbortSignal(outerSignal, 'options.signal')
 | |
| 
 | |
|   function abort() {
 | |
|     finishImpl(new AbortError())
 | |
|   }
 | |
| 
 | |
|   outerSignal === null || outerSignal === undefined ? undefined : outerSignal.addEventListener('abort', abort)
 | |
|   let error
 | |
|   let value
 | |
|   const destroys = []
 | |
|   let finishCount = 0
 | |
| 
 | |
|   function finish(err) {
 | |
|     finishImpl(err, --finishCount === 0)
 | |
|   }
 | |
| 
 | |
|   function finishImpl(err, final) {
 | |
|     if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
 | |
|       error = err
 | |
|     }
 | |
| 
 | |
|     if (!error && !final) {
 | |
|       return
 | |
|     }
 | |
| 
 | |
|     while (destroys.length) {
 | |
|       destroys.shift()(error)
 | |
|     }
 | |
| 
 | |
|     outerSignal === null || outerSignal === undefined ? undefined : outerSignal.removeEventListener('abort', abort)
 | |
|     ac.abort()
 | |
| 
 | |
|     if (final) {
 | |
|       if (!error) {
 | |
|         lastStreamCleanup.forEach((fn) => fn())
 | |
|       }
 | |
| 
 | |
|       process.nextTick(callback, error, value)
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   let ret
 | |
| 
 | |
|   for (let i = 0; i < streams.length; i++) {
 | |
|     const stream = streams[i]
 | |
|     const reading = i < streams.length - 1
 | |
|     const writing = i > 0
 | |
|     const end = reading || (opts === null || opts === undefined ? undefined : opts.end) !== false
 | |
|     const isLastStream = i === streams.length - 1
 | |
| 
 | |
|     if (isNodeStream(stream)) {
 | |
|       if (end) {
 | |
|         const { destroy, cleanup } = destroyer(stream, reading, writing)
 | |
|         destroys.push(destroy)
 | |
| 
 | |
|         if (isReadable(stream) && isLastStream) {
 | |
|           lastStreamCleanup.push(cleanup)
 | |
|         }
 | |
|       } // Catch stream errors that occur after pipe/pump has completed.
 | |
| 
 | |
|       function onError(err) {
 | |
|         if (err && err.name !== 'AbortError' && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
 | |
|           finish(err)
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       stream.on('error', onError)
 | |
| 
 | |
|       if (isReadable(stream) && isLastStream) {
 | |
|         lastStreamCleanup.push(() => {
 | |
|           stream.removeListener('error', onError)
 | |
|         })
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     if (i === 0) {
 | |
|       if (typeof stream === 'function') {
 | |
|         ret = stream({
 | |
|           signal
 | |
|         })
 | |
| 
 | |
|         if (!isIterable(ret)) {
 | |
|           throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or Stream', 'source', ret)
 | |
|         }
 | |
|       } else if (isIterable(stream) || isReadableNodeStream(stream)) {
 | |
|         ret = stream
 | |
|       } else {
 | |
|         ret = Duplex.from(stream)
 | |
|       }
 | |
|     } else if (typeof stream === 'function') {
 | |
|       ret = makeAsyncIterable(ret)
 | |
|       ret = stream(ret, {
 | |
|         signal
 | |
|       })
 | |
| 
 | |
|       if (reading) {
 | |
|         if (!isIterable(ret, true)) {
 | |
|           throw new ERR_INVALID_RETURN_VALUE('AsyncIterable', `transform[${i - 1}]`, ret)
 | |
|         }
 | |
|       } else {
 | |
|         var _ret
 | |
| 
 | |
|         if (!PassThrough) {
 | |
|           PassThrough = require('./passthrough')
 | |
|         } // If the last argument to pipeline is not a stream
 | |
|         // we must create a proxy stream so that pipeline(...)
 | |
|         // always returns a stream which can be further
 | |
|         // composed through `.pipe(stream)`.
 | |
| 
 | |
|         const pt = new PassThrough({
 | |
|           objectMode: true
 | |
|         }) // Handle Promises/A+ spec, `then` could be a getter that throws on
 | |
|         // second use.
 | |
| 
 | |
|         const then = (_ret = ret) === null || _ret === undefined ? undefined : _ret.then
 | |
| 
 | |
|         if (typeof then === 'function') {
 | |
|           finishCount++
 | |
|           then.call(
 | |
|             ret,
 | |
|             (val) => {
 | |
|               value = val
 | |
| 
 | |
|               if (val != null) {
 | |
|                 pt.write(val)
 | |
|               }
 | |
| 
 | |
|               if (end) {
 | |
|                 pt.end()
 | |
|               }
 | |
| 
 | |
|               process.nextTick(finish)
 | |
|             },
 | |
|             (err) => {
 | |
|               pt.destroy(err)
 | |
|               process.nextTick(finish, err)
 | |
|             }
 | |
|           )
 | |
|         } else if (isIterable(ret, true)) {
 | |
|           finishCount++
 | |
|           pump(ret, pt, finish, {
 | |
|             end
 | |
|           })
 | |
|         } else {
 | |
|           throw new ERR_INVALID_RETURN_VALUE('AsyncIterable or Promise', 'destination', ret)
 | |
|         }
 | |
| 
 | |
|         ret = pt
 | |
|         const { destroy, cleanup } = destroyer(ret, false, true)
 | |
|         destroys.push(destroy)
 | |
| 
 | |
|         if (isLastStream) {
 | |
|           lastStreamCleanup.push(cleanup)
 | |
|         }
 | |
|       }
 | |
|     } else if (isNodeStream(stream)) {
 | |
|       if (isReadableNodeStream(ret)) {
 | |
|         finishCount += 2
 | |
|         const cleanup = pipe(ret, stream, finish, {
 | |
|           end
 | |
|         })
 | |
| 
 | |
|         if (isReadable(stream) && isLastStream) {
 | |
|           lastStreamCleanup.push(cleanup)
 | |
|         }
 | |
|       } else if (isIterable(ret)) {
 | |
|         finishCount++
 | |
|         pump(ret, stream, finish, {
 | |
|           end
 | |
|         })
 | |
|       } else {
 | |
|         throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], ret)
 | |
|       }
 | |
| 
 | |
|       ret = stream
 | |
|     } else {
 | |
|       ret = Duplex.from(stream)
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (
 | |
|     (signal !== null && signal !== undefined && signal.aborted) ||
 | |
|     (outerSignal !== null && outerSignal !== undefined && outerSignal.aborted)
 | |
|   ) {
 | |
|     process.nextTick(abort)
 | |
|   }
 | |
| 
 | |
|   return ret
 | |
| }
 | |
| 
 | |
| function pipe(src, dst, finish, { end }) {
 | |
|   src.pipe(dst, {
 | |
|     end
 | |
|   })
 | |
| 
 | |
|   if (end) {
 | |
|     // Compat. Before node v10.12.0 stdio used to throw an error so
 | |
|     // pipe() did/does not end() stdio destinations.
 | |
|     // Now they allow it but "secretly" don't close the underlying fd.
 | |
|     src.once('end', () => dst.end())
 | |
|   } else {
 | |
|     finish()
 | |
|   }
 | |
| 
 | |
|   eos(
 | |
|     src,
 | |
|     {
 | |
|       readable: true,
 | |
|       writable: false
 | |
|     },
 | |
|     (err) => {
 | |
|       const rState = src._readableState
 | |
| 
 | |
|       if (
 | |
|         err &&
 | |
|         err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
 | |
|         rState &&
 | |
|         rState.ended &&
 | |
|         !rState.errored &&
 | |
|         !rState.errorEmitted
 | |
|       ) {
 | |
|         // Some readable streams will emit 'close' before 'end'. However, since
 | |
|         // this is on the readable side 'end' should still be emitted if the
 | |
|         // stream has been ended and no error emitted. This should be allowed in
 | |
|         // favor of backwards compatibility. Since the stream is piped to a
 | |
|         // destination this should not result in any observable difference.
 | |
|         // We don't need to check if this is a writable premature close since
 | |
|         // eos will only fail with premature close on the reading side for
 | |
|         // duplex streams.
 | |
|         src.once('end', finish).once('error', finish)
 | |
|       } else {
 | |
|         finish(err)
 | |
|       }
 | |
|     }
 | |
|   )
 | |
|   return eos(
 | |
|     dst,
 | |
|     {
 | |
|       readable: false,
 | |
|       writable: true
 | |
|     },
 | |
|     finish
 | |
|   )
 | |
| }
 | |
| 
 | |
| module.exports = {
 | |
|   pipelineImpl,
 | |
|   pipeline
 | |
| }
 |