mirror of
				https://github.com/advplyr/audiobookshelf.git
				synced 2025-10-30 18:12:25 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			109 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			109 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| 'use strict'
 | |
| 
 | |
| const { PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials')
 | |
| 
 | |
| const { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } = require('../../ours/errors').codes
 | |
| 
 | |
/**
 * Create a Readable stream that emits the items produced by `iterable`.
 *
 * A string or Buffer is not iterated: it is emitted as one single chunk,
 * followed by end-of-stream. Any other value must expose either
 * `Symbol.asyncIterator` (preferred when both are present) or
 * `Symbol.iterator`.
 *
 * Values that look like promises (they have a `then` function) are awaited
 * before being pushed. A value that resolves to `null` destroys the stream
 * with `ERR_STREAM_NULL_VALUES`.
 *
 * @param {Function} Readable - Constructor used to build the stream; called
 *   as `new Readable(options)`.
 * @param {*} iterable - String, Buffer, async iterable or sync iterable.
 * @param {object} [opts] - Options spread over the defaults, so they may
 *   override `objectMode` (and `highWaterMark` for the iterator case).
 * @returns {object} The stream instance created with `new Readable(...)`.
 * @throws {ERR_INVALID_ARG_TYPE} If `iterable` is neither a string, a Buffer,
 *   nor an object exposing an (async) iterator.
 */
function from(Readable, iterable, opts) {
  let iterator

  // Strings and Buffers are iterable, but they are emitted whole rather than
  // one character / byte at a time.
  if (typeof iterable === 'string' || iterable instanceof Buffer) {
    return new Readable({
      objectMode: true,
      ...opts,

      read() {
        this.push(iterable)
        this.push(null)
      }
    })
  }

  let isAsync

  if (iterable && iterable[SymbolAsyncIterator]) {
    isAsync = true
    iterator = iterable[SymbolAsyncIterator]()
  } else if (iterable && iterable[SymbolIterator]) {
    isAsync = false
    iterator = iterable[SymbolIterator]()
  } else {
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable)
  }

  const readable = new Readable({
    objectMode: true,
    highWaterMark: 1,
    // TODO(ronag): What options should be allowed?
    ...opts
  })

  // Flag to protect against _read being called before the last iteration
  // has completed.
  let reading = false

  readable._read = function () {
    if (!reading) {
      reading = true
      next()
    }
  }

  readable._destroy = function (error, cb) {
    PromisePrototypeThen(
      close(error),
      () => process.nextTick(cb, error), // nextTick is here in case cb throws
      (e) => process.nextTick(cb, e || error)
    )
  }

  // Give the iterator a chance to clean up: forward the error through
  // `iterator.throw()` when there is one, then finish with
  // `iterator.return()` unless `throw()` already reported completion.
  async function close(error) {
    const hadError = error !== undefined && error !== null
    const hasThrow = typeof iterator.throw === 'function'

    if (hadError && hasThrow) {
      const { value, done } = await iterator.throw(error)
      await value

      if (done) {
        return
      }
    }

    if (typeof iterator.return === 'function') {
      const { value } = await iterator.return()
      await value
    }
  }

  // Pull from the iterator and push into the stream until either the
  // iterator is exhausted, push() reports back-pressure, or an error occurs.
  async function next() {
    for (;;) {
      try {
        const { value, done } = isAsync ? await iterator.next() : iterator.next()

        if (done) {
          readable.push(null)
        } else {
          const res = value && typeof value.then === 'function' ? await value : value

          if (res === null) {
            // Pushing null would end the stream, so it is rejected as a value.
            reading = false
            throw new ERR_STREAM_NULL_VALUES()
          } else if (readable.push(res)) {
            continue
          } else {
            // Back-pressure: stop here and let the next _read() resume.
            reading = false
          }
        }
      } catch (err) {
        readable.destroy(err)
      }

      break
    }
  }

  return readable
}
 | |
| 
 | |
| module.exports = from
 |