mirror of
				https://github.com/advplyr/audiobookshelf.git
				synced 2025-11-03 19:07:00 -05:00 
			
		
		
		
	
		
			
				
	
	
		
			162 lines
		
	
	
		
			3.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			162 lines
		
	
	
		
			3.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
'use strict'
 | 
						|
 | 
						|
const { pipeline } = require('./pipeline')
 | 
						|
 | 
						|
const Duplex = require('./duplex')
 | 
						|
 | 
						|
const { destroyer } = require('./destroy')
 | 
						|
 | 
						|
const { isNodeStream, isReadable, isWritable } = require('./utils')
 | 
						|
 | 
						|
const {
 | 
						|
  AbortError,
 | 
						|
  codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS }
 | 
						|
} = require('../../ours/errors')
 | 
						|
 | 
						|
module.exports = function compose(...streams) {
 | 
						|
  if (streams.length === 0) {
 | 
						|
    throw new ERR_MISSING_ARGS('streams')
 | 
						|
  }
 | 
						|
 | 
						|
  if (streams.length === 1) {
 | 
						|
    return Duplex.from(streams[0])
 | 
						|
  }
 | 
						|
 | 
						|
  const orgStreams = [...streams]
 | 
						|
 | 
						|
  if (typeof streams[0] === 'function') {
 | 
						|
    streams[0] = Duplex.from(streams[0])
 | 
						|
  }
 | 
						|
 | 
						|
  if (typeof streams[streams.length - 1] === 'function') {
 | 
						|
    const idx = streams.length - 1
 | 
						|
    streams[idx] = Duplex.from(streams[idx])
 | 
						|
  }
 | 
						|
 | 
						|
  for (let n = 0; n < streams.length; ++n) {
 | 
						|
    if (!isNodeStream(streams[n])) {
 | 
						|
      // TODO(ronag): Add checks for non streams.
 | 
						|
      continue
 | 
						|
    }
 | 
						|
 | 
						|
    if (n < streams.length - 1 && !isReadable(streams[n])) {
 | 
						|
      throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable')
 | 
						|
    }
 | 
						|
 | 
						|
    if (n > 0 && !isWritable(streams[n])) {
 | 
						|
      throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable')
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  let ondrain
 | 
						|
  let onfinish
 | 
						|
  let onreadable
 | 
						|
  let onclose
 | 
						|
  let d
 | 
						|
 | 
						|
  function onfinished(err) {
 | 
						|
    const cb = onclose
 | 
						|
    onclose = null
 | 
						|
 | 
						|
    if (cb) {
 | 
						|
      cb(err)
 | 
						|
    } else if (err) {
 | 
						|
      d.destroy(err)
 | 
						|
    } else if (!readable && !writable) {
 | 
						|
      d.destroy()
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  const head = streams[0]
 | 
						|
  const tail = pipeline(streams, onfinished)
 | 
						|
  const writable = !!isWritable(head)
 | 
						|
  const readable = !!isReadable(tail) // TODO(ronag): Avoid double buffering.
 | 
						|
  // Implement Writable/Readable/Duplex traits.
 | 
						|
  // See, https://github.com/nodejs/node/pull/33515.
 | 
						|
 | 
						|
  d = new Duplex({
 | 
						|
    // TODO (ronag): highWaterMark?
 | 
						|
    writableObjectMode: !!(head !== null && head !== undefined && head.writableObjectMode),
 | 
						|
    readableObjectMode: !!(tail !== null && tail !== undefined && tail.writableObjectMode),
 | 
						|
    writable,
 | 
						|
    readable
 | 
						|
  })
 | 
						|
 | 
						|
  if (writable) {
 | 
						|
    d._write = function (chunk, encoding, callback) {
 | 
						|
      if (head.write(chunk, encoding)) {
 | 
						|
        callback()
 | 
						|
      } else {
 | 
						|
        ondrain = callback
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    d._final = function (callback) {
 | 
						|
      head.end()
 | 
						|
      onfinish = callback
 | 
						|
    }
 | 
						|
 | 
						|
    head.on('drain', function () {
 | 
						|
      if (ondrain) {
 | 
						|
        const cb = ondrain
 | 
						|
        ondrain = null
 | 
						|
        cb()
 | 
						|
      }
 | 
						|
    })
 | 
						|
    tail.on('finish', function () {
 | 
						|
      if (onfinish) {
 | 
						|
        const cb = onfinish
 | 
						|
        onfinish = null
 | 
						|
        cb()
 | 
						|
      }
 | 
						|
    })
 | 
						|
  }
 | 
						|
 | 
						|
  if (readable) {
 | 
						|
    tail.on('readable', function () {
 | 
						|
      if (onreadable) {
 | 
						|
        const cb = onreadable
 | 
						|
        onreadable = null
 | 
						|
        cb()
 | 
						|
      }
 | 
						|
    })
 | 
						|
    tail.on('end', function () {
 | 
						|
      d.push(null)
 | 
						|
    })
 | 
						|
 | 
						|
    d._read = function () {
 | 
						|
      while (true) {
 | 
						|
        const buf = tail.read()
 | 
						|
 | 
						|
        if (buf === null) {
 | 
						|
          onreadable = d._read
 | 
						|
          return
 | 
						|
        }
 | 
						|
 | 
						|
        if (!d.push(buf)) {
 | 
						|
          return
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  d._destroy = function (err, callback) {
 | 
						|
    if (!err && onclose !== null) {
 | 
						|
      err = new AbortError()
 | 
						|
    }
 | 
						|
 | 
						|
    onreadable = null
 | 
						|
    ondrain = null
 | 
						|
    onfinish = null
 | 
						|
 | 
						|
    if (onclose === null) {
 | 
						|
      callback(err)
 | 
						|
    } else {
 | 
						|
      onclose = callback
 | 
						|
      destroyer(tail, err)
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return d
 | 
						|
}
 |