mirror of
				https://github.com/advplyr/audiobookshelf.git
				synced 2025-10-27 00:32:24 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			259 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			259 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| // Ported from https://github.com/mafintosh/end-of-stream with
 | |
| // permission from the author, Mathias Buus (@mafintosh).
 | |
| 'use strict'
 | |
| 
 | |
| const { AbortError, codes } = require('../../ours/errors')
 | |
| 
 | |
| const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes
 | |
| 
 | |
| const { once } = require('../../ours/util')
 | |
| 
 | |
| const { validateAbortSignal, validateFunction, validateObject } = require('../validators')
 | |
| 
 | |
| const { Promise } = require('../../ours/primordials')
 | |
| 
 | |
| const {
 | |
|   isClosed,
 | |
|   isReadable,
 | |
|   isReadableNodeStream,
 | |
|   isReadableFinished,
 | |
|   isReadableErrored,
 | |
|   isWritable,
 | |
|   isWritableNodeStream,
 | |
|   isWritableFinished,
 | |
|   isWritableErrored,
 | |
|   isNodeStream,
 | |
|   willEmitClose: _willEmitClose
 | |
| } = require('./utils')
 | |
| 
 | |
/**
 * Duck-type check used by `eos` to pick the request-specific event wiring
 * ('complete', 'abort', 'request').
 *
 * A stream qualifies when it has a truthy `setHeader` property and an
 * `abort` method. Presumably this matches http.ClientRequest-like objects;
 * confirm against callers if the check is ever tightened.
 *
 * @param {object} stream - Candidate stream; must not be null or undefined.
 * @returns {boolean} `true` when both properties are present, otherwise `false`
 *   (always a real boolean, never `undefined` or the `setHeader` value).
 */
function isRequest(stream) {
  return Boolean(stream.setHeader) && typeof stream.abort === 'function'
}
 | |
| 
 | |
// Shared do-nothing function; `eos` swaps its callback for this once
// cleanup has run so late events are silently ignored.
function nop() {}
 | |
| 
 | |
/**
 * Calls `callback` once `stream` is no longer readable and/or writable, or
 * once it errors or closes before completing.
 *
 * The callback is wrapped with `once` and is always invoked with `this` set
 * to `stream`. It receives an error argument when the stream errored, closed
 * prematurely (`ERR_STREAM_PREMATURE_CLOSE`) or the wait was aborted
 * (`AbortError`), and no argument on normal completion.
 *
 * @param {object} stream - A Node.js stream (checked with `isNodeStream`).
 * @param {object|Function} [options] - Options, or the callback when only
 *   two arguments are passed.
 * @param {boolean} [options.readable] - Whether to wait for the readable
 *   side; defaults to `isReadableNodeStream(stream)`.
 * @param {boolean} [options.writable] - Whether to wait for the writable
 *   side; defaults to `isWritableNodeStream(stream)`.
 * @param {boolean} [options.error] - Pass `false` to skip the 'error'
 *   listener.
 * @param {AbortSignal} [options.signal] - Aborting it removes the listeners
 *   and calls the callback with an `AbortError`.
 * @param {Function} callback - Completion callback.
 * @returns {Function} `cleanup` - removes every listener registered here
 *   (including the abort listener on `options.signal`) and silences the
 *   callback.
 * @throws {ERR_INVALID_ARG_TYPE} When `stream` is not a Node.js stream.
 */
function eos(stream, options, callback) {
  if (arguments.length === 2) {
    callback = options
    options = {}
  } else if (options == null) {
    options = {}
  } else {
    validateObject(options, 'options')
  }

  validateFunction(callback, 'callback')
  validateAbortSignal(options.signal, 'options.signal')
  callback = once(callback)

  // Explicit options win; otherwise detect which sides the stream has.
  const readableOption = options.readable
  const readable = readableOption != null ? readableOption : isReadableNodeStream(stream)
  const writableOption = options.writable
  const writable = writableOption != null ? writableOption : isWritableNodeStream(stream)

  if (!isNodeStream(stream)) {
    // TODO: Webstreams.
    throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream)
  }

  const wState = stream._writableState
  const rState = stream._readableState

  // Legacy streams (no _writableState) signal completion through 'end' or
  // 'close' once `stream.writable` has turned falsy.
  const onlegacyfinish = () => {
    if (!stream.writable) {
      onfinish()
    }
  }

  // TODO (ronag): Improve soft detection to include core modules and
  // common ecosystem modules that do properly emit 'close' but fail
  // this generic check.
  let willEmitClose =
    _willEmitClose(stream) && isReadableNodeStream(stream) === readable && isWritableNodeStream(stream) === writable
  let writableFinished = isWritableFinished(stream, false)

  const onfinish = () => {
    writableFinished = true

    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) {
      willEmitClose = false
    }

    // Defer to the upcoming 'close' event when it can be trusted.
    if (willEmitClose && (!stream.readable || readable)) {
      return
    }

    if (!readable || readableFinished) {
      callback.call(stream)
    }
  }

  let readableFinished = isReadableFinished(stream, false)

  const onend = () => {
    readableFinished = true

    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) {
      willEmitClose = false
    }

    // Defer to the upcoming 'close' event when it can be trusted.
    if (willEmitClose && (!stream.writable || writable)) {
      return
    }

    if (!writable || writableFinished) {
      callback.call(stream)
    }
  }

  const onerror = (err) => {
    callback.call(stream, err)
  }

  let closed = isClosed(stream)

  const onclose = () => {
    closed = true
    const errored = isWritableErrored(stream) || isReadableErrored(stream)

    // Only forward an actual error value, not a bare `true` flag.
    if (errored && typeof errored !== 'boolean') {
      return callback.call(stream, errored)
    }

    if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
      if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
    }

    if (writable && !writableFinished) {
      if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
    }

    callback.call(stream)
  }

  const onrequest = () => {
    stream.req.on('finish', onfinish)
  }

  if (isRequest(stream)) {
    stream.on('complete', onfinish)

    if (!willEmitClose) {
      stream.on('abort', onclose)
    }

    if (stream.req) {
      onrequest()
    } else {
      stream.on('request', onrequest)
    }
  } else if (writable && !wState) {
    // legacy streams
    stream.on('end', onlegacyfinish)
    stream.on('close', onlegacyfinish)
  }

  // Not all streams will emit 'close' after 'aborted'.
  if (!willEmitClose && typeof stream.aborted === 'boolean') {
    stream.on('aborted', onclose)
  }

  stream.on('end', onend)
  stream.on('finish', onfinish)

  if (options.error !== false) {
    stream.on('error', onerror)
  }

  stream.on('close', onclose)

  // The stream may already be in a terminal state, in which case no further
  // event will arrive; schedule `onclose` for the next tick instead.
  if (closed) {
    process.nextTick(onclose)
  } else if ((wState != null && wState.errorEmitted) || (rState != null && rState.errorEmitted)) {
    if (!willEmitClose) {
      process.nextTick(onclose)
    }
  } else if (
    !readable &&
    (!willEmitClose || isReadable(stream)) &&
    (writableFinished || isWritable(stream) === false)
  ) {
    process.nextTick(onclose)
  } else if (
    !writable &&
    (!willEmitClose || isWritable(stream)) &&
    (readableFinished || isReadable(stream) === false)
  ) {
    process.nextTick(onclose)
  } else if (rState && stream.req && stream.aborted) {
    process.nextTick(onclose)
  }

  // Removes the 'abort' listener from options.signal. Stays a no-op until a
  // listener is actually attached below.
  let detachAbortListener = nop

  const cleanup = () => {
    callback = nop
    // Without this a manual cleanup() would leave the abort listener on the
    // (possibly long-lived) signal until it fires or is collected.
    detachAbortListener()
    stream.removeListener('aborted', onclose)
    stream.removeListener('complete', onfinish)
    stream.removeListener('abort', onclose)
    stream.removeListener('request', onrequest)
    if (stream.req) stream.req.removeListener('finish', onfinish)
    stream.removeListener('end', onlegacyfinish)
    stream.removeListener('close', onlegacyfinish)
    stream.removeListener('finish', onfinish)
    stream.removeListener('end', onend)
    stream.removeListener('error', onerror)
    stream.removeListener('close', onclose)
  }

  if (options.signal && !closed) {
    const signal = options.signal

    const abort = () => {
      // Keep it because cleanup removes it.
      const endCallback = callback
      cleanup()
      endCallback.call(
        stream,
        new AbortError(undefined, {
          cause: signal.reason
        })
      )
    }

    if (signal.aborted) {
      process.nextTick(abort)
    } else {
      detachAbortListener = () => {
        signal.removeEventListener('abort', abort)
      }

      // Drop the abort listener as soon as the wait completes on its own.
      const originalCallback = callback
      callback = once((...args) => {
        detachAbortListener()
        originalCallback.apply(stream, args)
      })
      signal.addEventListener('abort', abort)
    }
  }

  return cleanup
}
 | |
| 
 | |
/**
 * Promise-returning wrapper around `eos`.
 *
 * @param {object} stream - Stream to wait on; passed straight to `eos`.
 * @param {object} [opts] - Options forwarded unchanged to `eos`.
 * @returns {Promise<void>} Resolves with no value when `eos` reports
 *   completion without an error; rejects with the error otherwise.
 */
function finished(stream, opts) {
  const executor = (resolve, reject) => {
    const settle = (err) => {
      if (!err) {
        resolve()
        return
      }

      reject(err)
    }

    eos(stream, opts, settle)
  }

  return new Promise(executor)
}
 | |
| 
 | |
// The callback API is the module itself; the promise variant hangs off it
// as the `finished` property.
eos.finished = finished
module.exports = eos
 |