mirror of
				https://github.com/advplyr/audiobookshelf.git
				synced 2025-11-04 03:17:00 -05:00 
			
		
		
		
	
		
			
				
	
	
		
			231 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			231 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
const Stream = require('../Stream')
 | 
						|
// const StreamTest = require('./test/StreamTest')
 | 
						|
const Logger = require('../../Logger')
 | 
						|
const fs = require('fs-extra')
 | 
						|
const Path = require('path')
 | 
						|
 | 
						|
/**
 * Keeps track of the currently open streams and handles the socket and API
 * requests that open, close and sync them.
 */
class StreamManager {
  /**
   * @param {Object} db - Database wrapper. This class reads `libraryItems` and `sessions`
   *   from it and calls `updateUserStream`, `updateEntity` and `insertEntity` on it.
   * @param {*} emitter - Stored on the instance; not used by any method of this class.
   * @param {Function} clientEmitter - Called as `(userId, eventName, payload)`.
   */
  constructor(db, emitter, clientEmitter) {
    this.db = db

    this.emitter = emitter
    this.clientEmitter = clientEmitter

    this.streams = []
    this.StreamsPath = Path.join(global.MetadataPath, 'streams')
  }

  /**
   * Look up an open stream by id.
   *
   * @param {string} streamId
   * @returns {Object|undefined} The matching stream, or undefined when none is open with that id.
   */
  getStream(streamId) {
    return this.streams.find(s => s.id === streamId)
  }

  /**
   * Drop a stream from the list of open streams. Does not close the stream.
   *
   * @param {Object} stream - Any object with the `id` of the stream to drop.
   */
  removeStream(stream) {
    this.streams = this.streams.filter(s => s.id !== stream.id)
  }

  /**
   * Create, register and start a stream for a client.
   *
   * @param {Object} client - Must carry a `user`; receives `stream` and `user.stream` on success.
   * @param {Object} libraryItem - Passed through to the Stream constructor.
   * @param {Object} [transcodeOptions={}] - Passed through to the Stream constructor.
   * @returns {Promise<Object|undefined>} The started stream, or undefined when the client is invalid.
   */
  async openStream(client, libraryItem, transcodeOptions = {}) {
    if (!client || !client.user) {
      Logger.error('[StreamManager] Cannot open stream invalid client', client)
      return
    }
    const stream = new Stream(this.StreamsPath, client, libraryItem, transcodeOptions)

    // Keep the open-streams list in step with the stream's own lifecycle
    stream.on('closed', () => {
      this.removeStream(stream)
    })

    this.streams.push(stream)

    await stream.generatePlaylist()
    stream.start()

    Logger.info('Stream Opened for client', client.user.username, 'for item', stream.itemTitle, 'with streamId', stream.id)

    client.stream = stream
    client.user.stream = stream.id

    return stream
  }

  /**
   * Make sure the streams directory exists.
   *
   * @returns {Promise} Result of `fs.ensureDir`.
   */
  ensureStreamsDir() {
    return fs.ensureDir(this.StreamsPath)
  }

  /**
   * Remove the directory of a single stream from the streams directory.
   *
   * @param {string} streamId - Directory name under the streams path.
   * @returns {Promise<*>} Resolves with the result of `fs.remove`, or `false` when removal failed.
   */
  async removeOrphanStreamFiles(streamId) {
    try {
      const streamPath = Path.join(this.StreamsPath, streamId)
      // Must be awaited: a rejected promise that is only returned bypasses the catch below
      return await fs.remove(streamPath)
    } catch (error) {
      Logger.debug('No orphan stream', streamId)
      return false
    }
  }

  /**
   * Remove every entry found in the streams directory.
   *
   * @returns {Promise<boolean>} `true` when everything was removed (or there was nothing to
   *   remove), `false` when reading the directory or a removal failed.
   */
  async removeOrphanStreams() {
    try {
      const dirs = await fs.readdir(this.StreamsPath)
      if (!dirs || !dirs.length) return true

      await Promise.all(dirs.map((dirname) => {
        const fullPath = Path.join(this.StreamsPath, dirname)
        Logger.info(`Removing Orphan Stream ${dirname}`)
        return fs.remove(fullPath)
      }))
      return true
    } catch (error) {
      Logger.debug('No orphan stream', error)
      return false
    }
  }

  /**
   * Open a stream for an API request and answer with the stream details.
   *
   * @param {Object} res - Response object; `json` is called on success.
   * @param {Object} user - Requesting user.
   * @param {Object} libraryItem - Item to stream.
   * @returns {Promise<void>}
   */
  async openStreamApiRequest(res, user, libraryItem) {
    Logger.info(`[StreamManager] User "${user.username}" open stream request for "${libraryItem.media.metadata.title}"`)
    const client = {
      user
    }
    const stream = await this.openStream(client, libraryItem)
    if (!stream) {
      // openStream already logged the reason; answer instead of throwing on stream.id
      res.sendStatus(500)
      return
    }
    this.db.updateUserStream(client.user.id, stream.id)

    res.json({
      libraryItemId: libraryItem.id,
      startTime: stream.startTime,
      streamId: stream.id,
      streamUrl: stream.clientPlaylistUri
    })
  }

  /**
   * Open a stream for a socket request, closing the client's current stream first.
   *
   * @param {Object} socket - Socket whose `sheepClient` is the requesting client.
   * @param {string} libraryItemId - Id looked up in `db.libraryItems`.
   * @returns {Promise<void>}
   */
  async openStreamSocketRequest(socket, libraryItemId) {
    Logger.info('[StreamManager] Open Stream Request', socket.id, libraryItemId)
    const client = socket.sheepClient
    if (!client) {
      Logger.error('[StreamManager] Open stream request from socket without a client', socket.id)
      return
    }

    // Checked before closing the current stream so a bad request does not stop playback
    const libraryItem = this.db.libraryItems.find(li => li.id === libraryItemId)
    if (!libraryItem) {
      Logger.error('[StreamManager] Open stream request for unknown library item', libraryItemId)
      return
    }

    if (client.stream) {
      Logger.info('Closing client stream first', client.stream.id)
      await client.stream.close()
      client.user.stream = null
      client.stream = null
    }

    const stream = await this.openStream(client, libraryItem)
    if (!stream) return // openStream already logged the reason
    this.db.updateUserStream(client.user.id, stream.id)
  }

  /**
   * Close the stream belonging to a socket's client.
   *
   * @param {Object} socket - Socket whose `sheepClient` is the requesting client.
   * @returns {Promise<void>}
   */
  async closeStreamRequest(socket) {
    Logger.info('Close Stream Request', socket.id)
    const client = socket.sheepClient
    if (!client || !client.stream) {
      Logger.error('No stream for client', (client && client.user) ? client.user.username : 'No Client')
      // client may be undefined in this branch, so fall back to the requesting socket
      const replySocket = (client && client.socket) || socket
      replySocket.emit('stream_closed', 'n/a')
      return
    }
    await client.stream.close()
    client.user.stream = null
    client.stream = null
    this.db.updateUserStream(client.user.id, null)
  }

  /**
   * Close a stream on behalf of an API request. Only the user that owns the stream may close it.
   *
   * @param {string} userId - Id of the requesting user.
   * @param {string} streamId - Id of the stream to close.
   * @returns {Promise<void>}
   */
  async closeStreamApiRequest(userId, streamId) {
    Logger.info('[StreamManager] Close Stream Api Request', streamId)

    const stream = this.getStream(streamId)
    if (!stream) {
      Logger.warn('[StreamManager] Stream not found', streamId)
      return
    }

    if (!stream.client || !stream.client.user || stream.client.user.id !== userId) {
      Logger.warn(`[StreamManager] Stream close request from invalid user ${userId}`, stream.client)
      return
    }

    stream.client.user.stream = null
    stream.client.stream = null
    this.db.updateUserStream(stream.client.user.id, null)

    await stream.close()

    this.streams = this.streams.filter(s => s.id !== streamId)
    Logger.info(`[StreamManager] Stream ${streamId} closed via API request by ${userId}`)
  }

  /**
   * Persist a listening session returned by a stream sync. Sessions without any
   * listening time are ignored.
   *
   * @param {Object} [listeningSession] - Value returned by `stream.syncStream`.
   */
  saveListeningSession(listeningSession) {
    if (!listeningSession || !(listeningSession.timeListening > 0)) return

    const existingListeningSession = this.db.sessions.find(s => s.id === listeningSession.id)
    if (existingListeningSession) {
      this.db.updateEntity('session', listeningSession)
    } else {
      this.db.sessions.push(listeningSession.toJSON()) // Insert right away to prevent duplicate session
      this.db.insertEntity('session', listeningSession)
    }
  }

  /**
   * Update a user's progress from a stream, persist the user and emit the
   * progress update when there is one.
   *
   * @param {Object} user
   * @param {Object} stream
   */
  updateUserProgressFromStream(user, stream) {
    const userAudiobook = user.updateAudiobookProgressFromStream(stream)
    this.db.updateEntity('user', user)

    if (userAudiobook) {
      this.clientEmitter(user.id, 'current_user_audiobook_update', {
        id: userAudiobook.audiobookId,
        data: userAudiobook.toJSON()
      })
    }
  }

  /**
   * Handle a sync update sent over a socket for the client's current stream.
   *
   * @param {Object} socket - Socket whose `sheepClient` is the syncing client.
   * @param {Object} syncData - Must carry the `streamId` of the client's stream; passed on to `stream.syncStream`.
   */
  streamSync(socket, syncData) {
    const client = socket.sheepClient
    if (!client || !client.stream) {
      Logger.error('[StreamManager] streamSync: No stream for client', (client && client.user) ? client.user.id : 'No Client')
      return
    }
    // A missing payload is treated as a stream id mismatch instead of throwing
    const requestedStreamId = syncData ? syncData.streamId : undefined
    if (client.stream.id !== requestedStreamId) {
      Logger.error('[StreamManager] streamSync: Stream id mismatch on stream update', requestedStreamId, client.stream.id)
      return
    }
    if (!client.user) {
      Logger.error('[StreamManager] streamSync: No User for client', client)
      return
    }

    this.saveListeningSession(client.stream.syncStream(syncData))
    this.updateUserProgressFromStream(client.user, client.stream)
  }

  /**
   * Handle a sync update sent through the API.
   *
   * Responds 404 when the stream is not open, 500 when the stream's `userToken`
   * differs from the user's token, and 200 after a successful sync.
   *
   * @param {Object} req - Request carrying `user` and the sync data as `body`.
   * @param {Object} res - Response object.
   */
  streamSyncFromApi(req, res) {
    const user = req.user
    // A request without a body falls through to the 404 below instead of throwing
    const syncData = req.body || {}

    const stream = this.getStream(syncData.streamId)
    if (!stream) {
      Logger.error(`[StreamManager] streamSyncFromApi stream not found ${syncData.streamId}`)
      return res.status(404).send('Stream not found')
    }
    if (stream.userToken !== user.token) {
      Logger.error(`[StreamManager] streamSyncFromApi Invalid stream not owned by user`)
      // NOTE(review): 403 would describe this better; 500 is kept so clients see the same response
      return res.status(500).send('Invalid stream auth')
    }

    this.saveListeningSession(stream.syncStream(syncData))
    this.updateUserProgressFromStream(user, stream)

    res.sendStatus(200)
  }
}
 | 
						|
// Expose the StreamManager class as this module's export
module.exports = StreamManager