mirror of
				https://github.com/advplyr/audiobookshelf.git
				synced 2025-11-03 19:07:00 -05:00 
			
		
		
		
	
		
			
				
	
	
		
			259 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			259 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
// Ported from https://github.com/mafintosh/end-of-stream with
 | 
						|
// permission from the author, Mathias Buus (@mafintosh).
 | 
						|
'use strict'
 | 
						|
 | 
						|
const { AbortError, codes } = require('../../ours/errors')
 | 
						|
 | 
						|
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes
 | 
						|
 | 
						|
const { once } = require('../../ours/util')
 | 
						|
 | 
						|
const { validateAbortSignal, validateFunction, validateObject } = require('../validators')
 | 
						|
 | 
						|
const { Promise } = require('../../ours/primordials')
 | 
						|
 | 
						|
const {
 | 
						|
  isClosed,
 | 
						|
  isReadable,
 | 
						|
  isReadableNodeStream,
 | 
						|
  isReadableFinished,
 | 
						|
  isReadableErrored,
 | 
						|
  isWritable,
 | 
						|
  isWritableNodeStream,
 | 
						|
  isWritableFinished,
 | 
						|
  isWritableErrored,
 | 
						|
  isNodeStream,
 | 
						|
  willEmitClose: _willEmitClose
 | 
						|
} = require('./utils')
 | 
						|
 | 
						|
function isRequest(stream) {
 | 
						|
  return stream.setHeader && typeof stream.abort === 'function'
 | 
						|
}
 | 
						|
 | 
						|
const nop = () => {}
 | 
						|
 | 
						|
function eos(stream, options, callback) {
 | 
						|
  var _options$readable, _options$writable
 | 
						|
 | 
						|
  if (arguments.length === 2) {
 | 
						|
    callback = options
 | 
						|
    options = {}
 | 
						|
  } else if (options == null) {
 | 
						|
    options = {}
 | 
						|
  } else {
 | 
						|
    validateObject(options, 'options')
 | 
						|
  }
 | 
						|
 | 
						|
  validateFunction(callback, 'callback')
 | 
						|
  validateAbortSignal(options.signal, 'options.signal')
 | 
						|
  callback = once(callback)
 | 
						|
  const readable =
 | 
						|
    (_options$readable = options.readable) !== null && _options$readable !== undefined
 | 
						|
      ? _options$readable
 | 
						|
      : isReadableNodeStream(stream)
 | 
						|
  const writable =
 | 
						|
    (_options$writable = options.writable) !== null && _options$writable !== undefined
 | 
						|
      ? _options$writable
 | 
						|
      : isWritableNodeStream(stream)
 | 
						|
 | 
						|
  if (!isNodeStream(stream)) {
 | 
						|
    // TODO: Webstreams.
 | 
						|
    throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream)
 | 
						|
  }
 | 
						|
 | 
						|
  const wState = stream._writableState
 | 
						|
  const rState = stream._readableState
 | 
						|
 | 
						|
  const onlegacyfinish = () => {
 | 
						|
    if (!stream.writable) {
 | 
						|
      onfinish()
 | 
						|
    }
 | 
						|
  } // TODO (ronag): Improve soft detection to include core modules and
 | 
						|
  // common ecosystem modules that do properly emit 'close' but fail
 | 
						|
  // this generic check.
 | 
						|
 | 
						|
  let willEmitClose =
 | 
						|
    _willEmitClose(stream) && isReadableNodeStream(stream) === readable && isWritableNodeStream(stream) === writable
 | 
						|
  let writableFinished = isWritableFinished(stream, false)
 | 
						|
 | 
						|
  const onfinish = () => {
 | 
						|
    writableFinished = true // Stream should not be destroyed here. If it is that
 | 
						|
    // means that user space is doing something differently and
 | 
						|
    // we cannot trust willEmitClose.
 | 
						|
 | 
						|
    if (stream.destroyed) {
 | 
						|
      willEmitClose = false
 | 
						|
    }
 | 
						|
 | 
						|
    if (willEmitClose && (!stream.readable || readable)) {
 | 
						|
      return
 | 
						|
    }
 | 
						|
 | 
						|
    if (!readable || readableFinished) {
 | 
						|
      callback.call(stream)
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  let readableFinished = isReadableFinished(stream, false)
 | 
						|
 | 
						|
  const onend = () => {
 | 
						|
    readableFinished = true // Stream should not be destroyed here. If it is that
 | 
						|
    // means that user space is doing something differently and
 | 
						|
    // we cannot trust willEmitClose.
 | 
						|
 | 
						|
    if (stream.destroyed) {
 | 
						|
      willEmitClose = false
 | 
						|
    }
 | 
						|
 | 
						|
    if (willEmitClose && (!stream.writable || writable)) {
 | 
						|
      return
 | 
						|
    }
 | 
						|
 | 
						|
    if (!writable || writableFinished) {
 | 
						|
      callback.call(stream)
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  const onerror = (err) => {
 | 
						|
    callback.call(stream, err)
 | 
						|
  }
 | 
						|
 | 
						|
  let closed = isClosed(stream)
 | 
						|
 | 
						|
  const onclose = () => {
 | 
						|
    closed = true
 | 
						|
    const errored = isWritableErrored(stream) || isReadableErrored(stream)
 | 
						|
 | 
						|
    if (errored && typeof errored !== 'boolean') {
 | 
						|
      return callback.call(stream, errored)
 | 
						|
    }
 | 
						|
 | 
						|
    if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
 | 
						|
      if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
 | 
						|
    }
 | 
						|
 | 
						|
    if (writable && !writableFinished) {
 | 
						|
      if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
 | 
						|
    }
 | 
						|
 | 
						|
    callback.call(stream)
 | 
						|
  }
 | 
						|
 | 
						|
  const onrequest = () => {
 | 
						|
    stream.req.on('finish', onfinish)
 | 
						|
  }
 | 
						|
 | 
						|
  if (isRequest(stream)) {
 | 
						|
    stream.on('complete', onfinish)
 | 
						|
 | 
						|
    if (!willEmitClose) {
 | 
						|
      stream.on('abort', onclose)
 | 
						|
    }
 | 
						|
 | 
						|
    if (stream.req) {
 | 
						|
      onrequest()
 | 
						|
    } else {
 | 
						|
      stream.on('request', onrequest)
 | 
						|
    }
 | 
						|
  } else if (writable && !wState) {
 | 
						|
    // legacy streams
 | 
						|
    stream.on('end', onlegacyfinish)
 | 
						|
    stream.on('close', onlegacyfinish)
 | 
						|
  } // Not all streams will emit 'close' after 'aborted'.
 | 
						|
 | 
						|
  if (!willEmitClose && typeof stream.aborted === 'boolean') {
 | 
						|
    stream.on('aborted', onclose)
 | 
						|
  }
 | 
						|
 | 
						|
  stream.on('end', onend)
 | 
						|
  stream.on('finish', onfinish)
 | 
						|
 | 
						|
  if (options.error !== false) {
 | 
						|
    stream.on('error', onerror)
 | 
						|
  }
 | 
						|
 | 
						|
  stream.on('close', onclose)
 | 
						|
 | 
						|
  if (closed) {
 | 
						|
    process.nextTick(onclose)
 | 
						|
  } else if (
 | 
						|
    (wState !== null && wState !== undefined && wState.errorEmitted) ||
 | 
						|
    (rState !== null && rState !== undefined && rState.errorEmitted)
 | 
						|
  ) {
 | 
						|
    if (!willEmitClose) {
 | 
						|
      process.nextTick(onclose)
 | 
						|
    }
 | 
						|
  } else if (
 | 
						|
    !readable &&
 | 
						|
    (!willEmitClose || isReadable(stream)) &&
 | 
						|
    (writableFinished || isWritable(stream) === false)
 | 
						|
  ) {
 | 
						|
    process.nextTick(onclose)
 | 
						|
  } else if (
 | 
						|
    !writable &&
 | 
						|
    (!willEmitClose || isWritable(stream)) &&
 | 
						|
    (readableFinished || isReadable(stream) === false)
 | 
						|
  ) {
 | 
						|
    process.nextTick(onclose)
 | 
						|
  } else if (rState && stream.req && stream.aborted) {
 | 
						|
    process.nextTick(onclose)
 | 
						|
  }
 | 
						|
 | 
						|
  const cleanup = () => {
 | 
						|
    callback = nop
 | 
						|
    stream.removeListener('aborted', onclose)
 | 
						|
    stream.removeListener('complete', onfinish)
 | 
						|
    stream.removeListener('abort', onclose)
 | 
						|
    stream.removeListener('request', onrequest)
 | 
						|
    if (stream.req) stream.req.removeListener('finish', onfinish)
 | 
						|
    stream.removeListener('end', onlegacyfinish)
 | 
						|
    stream.removeListener('close', onlegacyfinish)
 | 
						|
    stream.removeListener('finish', onfinish)
 | 
						|
    stream.removeListener('end', onend)
 | 
						|
    stream.removeListener('error', onerror)
 | 
						|
    stream.removeListener('close', onclose)
 | 
						|
  }
 | 
						|
 | 
						|
  if (options.signal && !closed) {
 | 
						|
    const abort = () => {
 | 
						|
      // Keep it because cleanup removes it.
 | 
						|
      const endCallback = callback
 | 
						|
      cleanup()
 | 
						|
      endCallback.call(
 | 
						|
        stream,
 | 
						|
        new AbortError(undefined, {
 | 
						|
          cause: options.signal.reason
 | 
						|
        })
 | 
						|
      )
 | 
						|
    }
 | 
						|
 | 
						|
    if (options.signal.aborted) {
 | 
						|
      process.nextTick(abort)
 | 
						|
    } else {
 | 
						|
      const originalCallback = callback
 | 
						|
      callback = once((...args) => {
 | 
						|
        options.signal.removeEventListener('abort', abort)
 | 
						|
        originalCallback.apply(stream, args)
 | 
						|
      })
 | 
						|
      options.signal.addEventListener('abort', abort)
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return cleanup
 | 
						|
}
 | 
						|
 | 
						|
function finished(stream, opts) {
 | 
						|
  return new Promise((resolve, reject) => {
 | 
						|
    eos(stream, opts, (err) => {
 | 
						|
      if (err) {
 | 
						|
        reject(err)
 | 
						|
      } else {
 | 
						|
        resolve()
 | 
						|
      }
 | 
						|
    })
 | 
						|
  })
 | 
						|
}
 | 
						|
 | 
						|
module.exports = eos
 | 
						|
module.exports.finished = finished
 |