mirror of
				https://github.com/advplyr/audiobookshelf.git
				synced 2025-10-30 18:12:25 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			423 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			423 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| 'use strict'
 | |
| 
 | |
| const abortControllerModule = require('../../../../../watcher/aborter/controller')
 | |
| 
 | |
| const bufferModule = require('buffer')
 | |
| 
 | |
| const {
 | |
|   isReadable,
 | |
|   isWritable,
 | |
|   isIterable,
 | |
|   isNodeStream,
 | |
|   isReadableNodeStream,
 | |
|   isWritableNodeStream,
 | |
|   isDuplexNodeStream
 | |
| } = require('./utils')
 | |
| 
 | |
| const eos = require('./end-of-stream')
 | |
| 
 | |
| const {
 | |
|   AbortError,
 | |
|   codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE }
 | |
| } = require('../../ours/errors')
 | |
| 
 | |
| const { destroyer } = require('./destroy')
 | |
| 
 | |
| const Duplex = require('./duplex')
 | |
| 
 | |
| const Readable = require('./readable')
 | |
| 
 | |
| const { createDeferredPromise } = require('../../ours/util')
 | |
| 
 | |
| const from = require('./from')
 | |
| 
 | |
// Use the runtime's global Blob when it exists, otherwise whatever the buffer module exposes.
const Blob = globalThis.Blob || bufferModule.Blob

/**
 * Tells whether `b` is a Blob.
 *
 * The implementation is chosen once, when this module loads: if no Blob
 * constructor could be found at all, nothing can be a Blob and the check is a
 * constant `false`.
 *
 * @param {*} b - value to test
 * @returns {boolean}
 */
const isBlob =
  typeof Blob === 'undefined'
    ? function isBlob(b) {
        return false
      }
    : function isBlob(b) {
        return b instanceof Blob
      }

// Same fallback order for AbortController: the global one first, the locally required module second.
const AbortController = globalThis.AbortController || abortControllerModule.AbortController
 | |
| 
 | |
| const { FunctionPrototypeCall } = require('../../ours/primordials') // This is needed for pre node 17.
 | |
| 
 | |
/**
 * Duplex subclass that can be constructed with one or both sides already closed.
 *
 * - `options.readable === false` flags the readable state as not readable,
 *   ended, and with its end already emitted.
 * - `options.writable === false` flags the writable state as not writable,
 *   ending, ended and finished.
 *
 * Reference: https://github.com/nodejs/node/pull/34385
 */
class Duplexify extends Duplex {
  /**
   * @param {object} [options] - forwarded untouched to the Duplex constructor
   */
  constructor(options) {
    super(options)

    // `options` may be null or undefined; `!= null` rules out both at once.
    const hasOptions = options != null

    if (hasOptions && options.readable === false) {
      const readableState = this._readableState
      readableState.readable = false
      readableState.ended = true
      readableState.endEmitted = true
    }

    if (hasOptions && options.writable === false) {
      const writableState = this._writableState
      writableState.writable = false
      writableState.ending = true
      writableState.ended = true
      writableState.finished = true
    }
  }
}
 | |
| 
 | |
| module.exports = function duplexify(body, name) {
 | |
|   if (isDuplexNodeStream(body)) {
 | |
|     return body
 | |
|   }
 | |
| 
 | |
|   if (isReadableNodeStream(body)) {
 | |
|     return _duplexify({
 | |
|       readable: body
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   if (isWritableNodeStream(body)) {
 | |
|     return _duplexify({
 | |
|       writable: body
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   if (isNodeStream(body)) {
 | |
|     return _duplexify({
 | |
|       writable: false,
 | |
|       readable: false
 | |
|     })
 | |
|   } // TODO: Webstreams
 | |
|   // if (isReadableStream(body)) {
 | |
|   //   return _duplexify({ readable: Readable.fromWeb(body) });
 | |
|   // }
 | |
|   // TODO: Webstreams
 | |
|   // if (isWritableStream(body)) {
 | |
|   //   return _duplexify({ writable: Writable.fromWeb(body) });
 | |
|   // }
 | |
| 
 | |
|   if (typeof body === 'function') {
 | |
|     const { value, write, final, destroy } = fromAsyncGen(body)
 | |
| 
 | |
|     if (isIterable(value)) {
 | |
|       return from(Duplexify, value, {
 | |
|         // TODO (ronag): highWaterMark?
 | |
|         objectMode: true,
 | |
|         write,
 | |
|         final,
 | |
|         destroy
 | |
|       })
 | |
|     }
 | |
| 
 | |
|     const then = value === null || value === undefined ? undefined : value.then
 | |
| 
 | |
|     if (typeof then === 'function') {
 | |
|       let d
 | |
|       const promise = FunctionPrototypeCall(
 | |
|         then,
 | |
|         value,
 | |
|         (val) => {
 | |
|           if (val != null) {
 | |
|             throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val)
 | |
|           }
 | |
|         },
 | |
|         (err) => {
 | |
|           destroyer(d, err)
 | |
|         }
 | |
|       )
 | |
|       return (d = new Duplexify({
 | |
|         // TODO (ronag): highWaterMark?
 | |
|         objectMode: true,
 | |
|         readable: false,
 | |
|         write,
 | |
| 
 | |
|         final(cb) {
 | |
|           final(async () => {
 | |
|             try {
 | |
|               await promise
 | |
|               process.nextTick(cb, null)
 | |
|             } catch (err) {
 | |
|               process.nextTick(cb, err)
 | |
|             }
 | |
|           })
 | |
|         },
 | |
| 
 | |
|         destroy
 | |
|       }))
 | |
|     }
 | |
| 
 | |
|     throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or AsyncFunction', name, value)
 | |
|   }
 | |
| 
 | |
|   if (isBlob(body)) {
 | |
|     return duplexify(body.arrayBuffer())
 | |
|   }
 | |
| 
 | |
|   if (isIterable(body)) {
 | |
|     return from(Duplexify, body, {
 | |
|       // TODO (ronag): highWaterMark?
 | |
|       objectMode: true,
 | |
|       writable: false
 | |
|     })
 | |
|   } // TODO: Webstreams.
 | |
|   // if (
 | |
|   //   isReadableStream(body?.readable) &&
 | |
|   //   isWritableStream(body?.writable)
 | |
|   // ) {
 | |
|   //   return Duplexify.fromWeb(body);
 | |
|   // }
 | |
| 
 | |
|   if (
 | |
|     typeof (body === null || body === undefined ? undefined : body.writable) === 'object' ||
 | |
|     typeof (body === null || body === undefined ? undefined : body.readable) === 'object'
 | |
|   ) {
 | |
|     const readable =
 | |
|       body !== null && body !== undefined && body.readable
 | |
|         ? isReadableNodeStream(body === null || body === undefined ? undefined : body.readable)
 | |
|           ? body === null || body === undefined
 | |
|             ? undefined
 | |
|             : body.readable
 | |
|           : duplexify(body.readable)
 | |
|         : undefined
 | |
|     const writable =
 | |
|       body !== null && body !== undefined && body.writable
 | |
|         ? isWritableNodeStream(body === null || body === undefined ? undefined : body.writable)
 | |
|           ? body === null || body === undefined
 | |
|             ? undefined
 | |
|             : body.writable
 | |
|           : duplexify(body.writable)
 | |
|         : undefined
 | |
|     return _duplexify({
 | |
|       readable,
 | |
|       writable
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   const then = body === null || body === undefined ? undefined : body.then
 | |
| 
 | |
|   if (typeof then === 'function') {
 | |
|     let d
 | |
|     FunctionPrototypeCall(
 | |
|       then,
 | |
|       body,
 | |
|       (val) => {
 | |
|         if (val != null) {
 | |
|           d.push(val)
 | |
|         }
 | |
| 
 | |
|         d.push(null)
 | |
|       },
 | |
|       (err) => {
 | |
|         destroyer(d, err)
 | |
|       }
 | |
|     )
 | |
|     return (d = new Duplexify({
 | |
|       objectMode: true,
 | |
|       writable: false,
 | |
| 
 | |
|       read() { }
 | |
|     }))
 | |
|   }
 | |
| 
 | |
|   throw new ERR_INVALID_ARG_TYPE(
 | |
|     name,
 | |
|     [
 | |
|       'Blob',
 | |
|       'ReadableStream',
 | |
|       'WritableStream',
 | |
|       'Stream',
 | |
|       'Iterable',
 | |
|       'AsyncIterable',
 | |
|       'Function',
 | |
|       '{ readable, writable } pair',
 | |
|       'Promise'
 | |
|     ],
 | |
|     body
 | |
|   )
 | |
| }
 | |
| 
 | |
/**
 * Adapts a user function that consumes an async iterable into the
 * `write` / `final` / `destroy` callbacks of a writable side.
 *
 * `fn` is invoked immediately with an async generator and `{ signal }`.
 * Each `write` or `final` call settles the deferred the generator is currently
 * awaiting; the generator then schedules the callback with `process.nextTick`
 * and either yields the chunk or returns when `done` is set.
 *
 * @param {Function} fn - called as `fn(asyncIterable, { signal })`
 * @returns {{ value: *, write: Function, final: Function, destroy: Function }}
 *   `value` is whatever `fn` returned
 */
function fromAsyncGen(fn) {
  // Shared hand-off slot between the generator (reads `promise`) and the
  // stream callbacks (call `resolve`). Replaced after every delivered chunk.
  let { promise, resolve } = createDeferredPromise()
  const ac = new AbortController()
  const { signal } = ac

  async function* writtenChunks() {
    for (;;) {
      const pending = promise
      promise = null
      const { chunk, done, cb } = await pending
      process.nextTick(cb)

      if (done) {
        return
      }

      if (signal.aborted) {
        throw new AbortError(undefined, {
          cause: signal.reason
        })
      }

      // Arm a fresh deferred for the next write/final before handing the chunk out.
      ;({ promise, resolve } = createDeferredPromise())
      yield chunk
    }
  }

  const value = fn(writtenChunks(), { signal })

  // Takes the current resolver out of the slot, then settles it with `message`.
  function handOff(message) {
    const settle = resolve
    resolve = null
    settle(message)
  }

  return {
    value,

    write(chunk, encoding, cb) {
      handOff({ chunk, done: false, cb })
    },

    final(cb) {
      handOff({ done: true, cb })
    },

    destroy(err, cb) {
      ac.abort()
      cb(err)
    }
  }
}
 | |
| 
 | |
/**
 * Joins an optional readable and an optional writable into a single Duplexify.
 *
 * Writes and `end` on the returned duplex are forwarded to `pair.writable`;
 * data read from `pair.readable` is pushed to the duplex. When either
 * underlying stream finishes with an error the duplex is destroyed with it;
 * when both sides are done the duplex is destroyed without an error.
 * Destroying the duplex destroys both underlying streams.
 *
 * @param {{ readable?: *, writable?: * }} pair - the two sides; either may be
 *   absent or `false`
 * @returns {Duplexify}
 */
function _duplexify(pair) {
  const source = pair.readable
  // A readable without a read() method is adapted through Readable.wrap.
  const needsWrap = source && typeof source.read !== 'function'
  const r = needsWrap ? Readable.wrap(source) : source
  const w = pair.writable

  let readable = Boolean(isReadable(r))
  let writable = Boolean(isWritable(w))

  // Pending callbacks of the duplex, parked until the matching event fires.
  let ondrain
  let onfinish
  let onreadable
  // `undefined` until something happens, the destroy callback while a destroy
  // is in flight, and `null` once an underlying stream has reported completion.
  let onclose
  let d

  function onfinished(err) {
    const cb = onclose
    onclose = null

    if (cb) {
      cb(err)
      return
    }

    if (err) {
      d.destroy(err)
      return
    }

    if (!readable && !writable) {
      d.destroy()
    }
  }

  // TODO(ronag): Avoid double buffering.
  // Implement Writable/Readable/Duplex traits.
  // See, https://github.com/nodejs/node/pull/33515.
  d = new Duplexify({
    // TODO (ronag): highWaterMark?
    readableObjectMode: Boolean(r != null && r.readableObjectMode),
    writableObjectMode: Boolean(w != null && w.writableObjectMode),
    readable,
    writable
  })

  if (writable) {
    eos(w, (err) => {
      writable = false

      if (err) {
        destroyer(r, err)
      }

      onfinished(err)
    })

    d._write = function (chunk, encoding, callback) {
      const accepted = w.write(chunk, encoding)

      if (!accepted) {
        // Hold the callback until the underlying writable drains.
        ondrain = callback
        return
      }

      callback()
    }

    d._final = function (callback) {
      w.end()
      onfinish = callback
    }

    w.on('drain', () => {
      const cb = ondrain

      if (!cb) {
        return
      }

      ondrain = null
      cb()
    })

    w.on('finish', () => {
      const cb = onfinish

      if (!cb) {
        return
      }

      onfinish = null
      cb()
    })
  }

  if (readable) {
    eos(r, (err) => {
      readable = false

      if (err) {
        destroyer(r, err)
      }

      onfinished(err)
    })

    r.on('readable', () => {
      const cb = onreadable

      if (!cb) {
        return
      }

      onreadable = null
      cb()
    })

    r.on('end', () => {
      d.push(null)
    })

    d._read = function () {
      // Drain the source until it runs dry or the duplex signals backpressure.
      for (let buf = r.read(); buf !== null; buf = r.read()) {
        if (!d.push(buf)) {
          return
        }
      }

      // Source is empty for now: retry on its next 'readable' event.
      onreadable = d._read
    }
  }

  d._destroy = function (err, callback) {
    const alreadyFinished = onclose === null
    // Destroying without an error before completion is reported as an abort.
    const reason = !err && !alreadyFinished ? new AbortError() : err

    onreadable = null
    ondrain = null
    onfinish = null

    if (alreadyFinished) {
      callback(reason)
      return
    }

    // Completion of the underlying streams will invoke `callback` via onfinished.
    onclose = callback
    destroyer(w, reason)
    destroyer(r, reason)
  }

  return d
}
 |