mirror of
				https://github.com/advplyr/audiobookshelf.git
				synced 2025-10-31 10:27:01 -04:00 
			
		
		
		
	Update:Cleanup socket usage & add func for emitting events to admin users
This commit is contained in:
		
							parent
							
								
									e2af33e136
								
							
						
					
					
						commit
						180293ebc1
					
				| @ -378,7 +378,7 @@ export default { | ||||
|       } | ||||
|     }, | ||||
|     streamReady() { | ||||
|       console.log(`[STREAM-CONTAINER] Stream Ready`) | ||||
|       console.log(`[StreamContainer] Stream Ready`) | ||||
|       if (this.$refs.audioPlayer) { | ||||
|         this.$refs.audioPlayer.setStreamReady() | ||||
|       } else { | ||||
|  | ||||
| @ -342,6 +342,7 @@ export default { | ||||
|       this.$root.socket = this.socket | ||||
|       console.log('Socket initialized') | ||||
| 
 | ||||
|       // Pre-defined socket events | ||||
|       this.socket.on('connect', this.connect) | ||||
|       this.socket.on('connect_error', this.connectError) | ||||
|       this.socket.on('disconnect', this.disconnect) | ||||
| @ -350,6 +351,7 @@ export default { | ||||
|       this.socket.io.on('reconnect_error', this.reconnectError) | ||||
|       this.socket.io.on('reconnect_failed', this.reconnectFailed) | ||||
| 
 | ||||
|       // Event received after authorizing socket | ||||
|       this.socket.on('init', this.init) | ||||
| 
 | ||||
|       // Stream Listeners | ||||
|  | ||||
							
								
								
									
										17
									
								
								server/Db.js
									
									
									
									
									
								
							
							
						
						
									
										17
									
								
								server/Db.js
									
									
									
									
									
								
							| @ -258,23 +258,6 @@ class Db { | ||||
|     }) | ||||
|   } | ||||
| 
 | ||||
|   updateUserStream(userId, streamId) { | ||||
|     return this.usersDb.update((record) => record.id === userId, (user) => { | ||||
|       user.stream = streamId | ||||
|       return user | ||||
|     }).then((results) => { | ||||
|       Logger.debug(`[DB] Updated user ${results.updated}`) | ||||
|       this.users = this.users.map(u => { | ||||
|         if (u.id === userId) { | ||||
|           u.stream = streamId | ||||
|         } | ||||
|         return u | ||||
|       }) | ||||
|     }).catch((error) => { | ||||
|       Logger.error(`[DB] Update user Failed ${error}`) | ||||
|     }) | ||||
|   } | ||||
| 
 | ||||
|   updateServerSettings() { | ||||
|     global.ServerSettings = this.serverSettings.toJSON() | ||||
|     return this.updateEntity('settings', this.serverSettings) | ||||
|  | ||||
| @ -100,7 +100,7 @@ class Server { | ||||
|     Logger.info('[Server] Init v' + version) | ||||
|     await this.playbackSessionManager.removeOrphanStreams() | ||||
| 
 | ||||
|     var previousVersion = await this.db.checkPreviousVersion() // Returns null if same server version
 | ||||
|     const previousVersion = await this.db.checkPreviousVersion() // Returns null if same server version
 | ||||
|     if (previousVersion) { | ||||
|       Logger.debug(`[Server] Upgraded from previous version ${previousVersion}`) | ||||
|     } | ||||
| @ -167,13 +167,13 @@ class Server { | ||||
| 
 | ||||
|     // EBook static file routes
 | ||||
|     router.get('/ebook/:library/:folder/*', (req, res) => { | ||||
|       var library = this.db.libraries.find(lib => lib.id === req.params.library) | ||||
|       const library = this.db.libraries.find(lib => lib.id === req.params.library) | ||||
|       if (!library) return res.sendStatus(404) | ||||
|       var folder = library.folders.find(fol => fol.id === req.params.folder) | ||||
|       const folder = library.folders.find(fol => fol.id === req.params.folder) | ||||
|       if (!folder) return res.status(404).send('Folder not found') | ||||
| 
 | ||||
|       var remainingPath = req.params['0'] | ||||
|       var fullPath = Path.join(folder.fullPath, remainingPath) | ||||
|       const remainingPath = req.params['0'] | ||||
|       const fullPath = Path.join(folder.fullPath, remainingPath) | ||||
|       res.sendFile(fullPath) | ||||
|     }) | ||||
| 
 | ||||
| @ -264,15 +264,15 @@ class Server { | ||||
| 
 | ||||
|   // Remove unused /metadata/items/{id} folders
 | ||||
|   async purgeMetadata() { | ||||
|     var itemsMetadata = Path.join(global.MetadataPath, 'items') | ||||
|     const itemsMetadata = Path.join(global.MetadataPath, 'items') | ||||
|     if (!(await fs.pathExists(itemsMetadata))) return | ||||
|     var foldersInItemsMetadata = await fs.readdir(itemsMetadata) | ||||
|     const foldersInItemsMetadata = await fs.readdir(itemsMetadata) | ||||
| 
 | ||||
|     var purged = 0 | ||||
|     let purged = 0 | ||||
|     await Promise.all(foldersInItemsMetadata.map(async foldername => { | ||||
|       var hasMatchingItem = this.db.libraryItems.find(ab => ab.id === foldername) | ||||
|       const hasMatchingItem = this.db.libraryItems.find(ab => ab.id === foldername) | ||||
|       if (!hasMatchingItem) { | ||||
|         var folderPath = Path.join(itemsMetadata, foldername) | ||||
|         const folderPath = Path.join(itemsMetadata, foldername) | ||||
|         Logger.debug(`[Server] Purging unused metadata ${folderPath}`) | ||||
| 
 | ||||
|         await fs.remove(folderPath).then(() => { | ||||
| @ -291,8 +291,8 @@ class Server { | ||||
|   // Remove user media progress with items that no longer exist & remove seriesHideFrom that no longer exist
 | ||||
|   async cleanUserData() { | ||||
|     for (let i = 0; i < this.db.users.length; i++) { | ||||
|       var _user = this.db.users[i] | ||||
|       var hasUpdated = false | ||||
|       const _user = this.db.users[i] | ||||
|       let hasUpdated = false | ||||
|       if (_user.mediaProgress.length) { | ||||
|         const lengthBefore = _user.mediaProgress.length | ||||
|         _user.mediaProgress = _user.mediaProgress.filter(mp => { | ||||
| @ -338,9 +338,10 @@ class Server { | ||||
|   } | ||||
| 
 | ||||
|   logout(req, res) { | ||||
|     var { socketId } = req.body | ||||
|     Logger.info(`[Server] User ${req.user ? req.user.username : 'Unknown'} is logging out with socket ${socketId}`) | ||||
|     SocketAuthority.logout(socketId) | ||||
|     if (req.body.socketId) { | ||||
|       Logger.info(`[Server] User ${req.user ? req.user.username : 'Unknown'} is logging out with socket ${req.body.socketId}`) | ||||
|       SocketAuthority.logout(req.body.socketId) | ||||
|     } | ||||
| 
 | ||||
|     res.sendStatus(200) | ||||
|   } | ||||
|  | ||||
| @ -30,24 +30,37 @@ class SocketAuthority { | ||||
|     return Object.values(this.clients).filter(c => c.user && c.user.id === userId) | ||||
|   } | ||||
| 
 | ||||
|   // Emits event to all authorized clients
 | ||||
|   emitter(evt, data) { | ||||
|     for (const socketId in this.clients) { | ||||
|       this.clients[socketId].socket.emit(evt, data) | ||||
|       if (this.clients[socketId].user) { | ||||
|         this.clients[socketId].socket.emit(evt, data) | ||||
|       } | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   clientEmitter(userId, ev, data) { | ||||
|     var clients = this.getClientsForUser(userId) | ||||
|   // Emits event to all clients for a specific user
 | ||||
|   clientEmitter(userId, evt, data) { | ||||
|     const clients = this.getClientsForUser(userId) | ||||
|     if (!clients.length) { | ||||
|       return Logger.debug(`[Server] clientEmitter - no clients found for user ${userId}`) | ||||
|     } | ||||
|     clients.forEach((client) => { | ||||
|       if (client.socket) { | ||||
|         client.socket.emit(ev, data) | ||||
|         client.socket.emit(evt, data) | ||||
|       } | ||||
|     }) | ||||
|   } | ||||
| 
 | ||||
|   // Emits event to all admin user clients
 | ||||
|   adminEmitter(evt, data) { | ||||
|     for (const socketId in this.clients) { | ||||
|       if (this.clients[socketId].user && this.clients[socketId].user.isAdminOrUp) { | ||||
|         this.clients[socketId].socket.emit(evt, data) | ||||
|       } | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   initialize(Server) { | ||||
|     this.Server = Server | ||||
| 
 | ||||
| @ -78,7 +91,29 @@ class SocketAuthority { | ||||
|       socket.on('remove_log_listener', () => Logger.removeSocketListener(socket.id)) | ||||
|       socket.on('fetch_daily_logs', () => this.Server.logManager.socketRequestDailyLogs(socket)) | ||||
| 
 | ||||
|       // Sent automatically from socket.io clients
 | ||||
|       socket.on('disconnect', (reason) => { | ||||
|         Logger.removeSocketListener(socket.id) | ||||
| 
 | ||||
|         const _client = this.clients[socket.id] | ||||
|         if (!_client) { | ||||
|           Logger.warn(`[Server] Socket ${socket.id} disconnect, no client (Reason: ${reason})`) | ||||
|         } else if (!_client.user) { | ||||
|           Logger.info(`[Server] Unauth socket ${socket.id} disconnected (Reason: ${reason})`) | ||||
|           delete this.clients[socket.id] | ||||
|         } else { | ||||
|           Logger.debug('[Server] User Offline ' + _client.user.username) | ||||
|           this.adminEmitter('user_offline', _client.user.toJSONForPublic(this.Server.playbackSessionManager.sessions, this.Server.db.libraryItems)) | ||||
| 
 | ||||
|           const disconnectTime = Date.now() - _client.connected_at | ||||
|           Logger.info(`[Server] Socket ${socket.id} disconnected from client "${_client.user.username}" after ${disconnectTime}ms (Reason: ${reason})`) | ||||
|           delete this.clients[socket.id] | ||||
|         } | ||||
|       }) | ||||
| 
 | ||||
|       //
 | ||||
|       // Events for testing
 | ||||
|       //
 | ||||
|       socket.on('message_all_users', (payload) => { | ||||
|         // admin user can send a message to all authenticated users
 | ||||
|         //   displays on the web app as a toast
 | ||||
| @ -95,26 +130,6 @@ class SocketAuthority { | ||||
|         Logger.debug(`[Server] Received ping from socket ${user.username || 'No User'}`) | ||||
|         socket.emit('pong') | ||||
|       }) | ||||
| 
 | ||||
|       // Sent automatically from socket.io clients
 | ||||
|       socket.on('disconnect', (reason) => { | ||||
|         Logger.removeSocketListener(socket.id) | ||||
| 
 | ||||
|         const _client = this.clients[socket.id] | ||||
|         if (!_client) { | ||||
|           Logger.warn(`[Server] Socket ${socket.id} disconnect, no client (Reason: ${reason})`) | ||||
|         } else if (!_client.user) { | ||||
|           Logger.info(`[Server] Unauth socket ${socket.id} disconnected (Reason: ${reason})`) | ||||
|           delete this.clients[socket.id] | ||||
|         } else { | ||||
|           Logger.debug('[Server] User Offline ' + _client.user.username) | ||||
|           this.io.emit('user_offline', _client.user.toJSONForPublic(this.Server.playbackSessionManager.sessions, this.Server.db.libraryItems)) | ||||
| 
 | ||||
|           const disconnectTime = Date.now() - _client.connected_at | ||||
|           Logger.info(`[Server] Socket ${socket.id} disconnected from client "${_client.user.username}" after ${disconnectTime}ms (Reason: ${reason})`) | ||||
|           delete this.clients[socket.id] | ||||
|         } | ||||
|       }) | ||||
|     }) | ||||
|   } | ||||
| 
 | ||||
| @ -141,9 +156,9 @@ class SocketAuthority { | ||||
| 
 | ||||
|     Logger.debug(`[Server] User Online ${client.user.username}`) | ||||
| 
 | ||||
|     // TODO: Send to authenticated clients only
 | ||||
|     this.io.emit('user_online', client.user.toJSONForPublic(this.Server.playbackSessionManager.sessions, this.Server.db.libraryItems)) | ||||
|     this.adminEmitter('user_online', client.user.toJSONForPublic(this.Server.playbackSessionManager.sessions, this.Server.db.libraryItems)) | ||||
| 
 | ||||
|     // Update user lastSeen
 | ||||
|     user.lastSeen = Date.now() | ||||
|     await this.Server.db.updateEntity('user', user) | ||||
| 
 | ||||
| @ -155,20 +170,19 @@ class SocketAuthority { | ||||
|     if (user.isAdminOrUp) { | ||||
|       initialPayload.usersOnline = this.getUsersOnline() | ||||
|     } | ||||
| 
 | ||||
|     client.socket.emit('init', initialPayload) | ||||
|   } | ||||
| 
 | ||||
|   logout(socketId) { | ||||
|     // Strip user and client from client and client socket
 | ||||
|     if (socketId && this.clients[socketId]) { | ||||
|       var client = this.clients[socketId] | ||||
|       var clientSocket = client.socket | ||||
|       const client = this.clients[socketId] | ||||
|       const clientSocket = client.socket | ||||
|       Logger.debug(`[Server] Found user client ${clientSocket.id}, Has user: ${!!client.user}, Socket has client: ${!!clientSocket.sheepClient}`) | ||||
| 
 | ||||
|       if (client.user) { | ||||
|         Logger.debug('[Server] User Offline ' + client.user.username) | ||||
|         this.io.emit('user_offline', client.user.toJSONForPublic(null, this.Server.db.libraryItems)) | ||||
|         this.adminEmitter('user_offline', client.user.toJSONForPublic(null, this.Server.db.libraryItems)) | ||||
|       } | ||||
| 
 | ||||
|       delete this.clients[socketId].user | ||||
|  | ||||
| @ -169,7 +169,7 @@ class PlaybackSessionManager { | ||||
|     user.currentSessionId = newPlaybackSession.id | ||||
| 
 | ||||
|     this.sessions.push(newPlaybackSession) | ||||
|     SocketAuthority.emitter('user_stream_update', user.toJSONForPublic(this.sessions, this.db.libraryItems)) | ||||
|     SocketAuthority.adminEmitter('user_stream_update', user.toJSONForPublic(this.sessions, this.db.libraryItems)) | ||||
| 
 | ||||
|     return newPlaybackSession | ||||
|   } | ||||
| @ -213,7 +213,7 @@ class PlaybackSessionManager { | ||||
|       await this.saveSession(session) | ||||
|     } | ||||
|     Logger.debug(`[PlaybackSessionManager] closeSession "${session.id}"`) | ||||
|     SocketAuthority.emitter('user_stream_update', user.toJSONForPublic(this.sessions, this.db.libraryItems)) | ||||
|     SocketAuthority.adminEmitter('user_stream_update', user.toJSONForPublic(this.sessions, this.db.libraryItems)) | ||||
|     return this.removeSession(session.id) | ||||
|   } | ||||
| 
 | ||||
|  | ||||
| @ -262,13 +262,13 @@ class ApiRouter { | ||||
| 
 | ||||
|   async getDirectories(dir, relpath, excludedDirs, level = 0) { | ||||
|     try { | ||||
|       var paths = await fs.readdir(dir) | ||||
|       const paths = await fs.readdir(dir) | ||||
| 
 | ||||
|       var dirs = await Promise.all(paths.map(async dirname => { | ||||
|         var fullPath = Path.join(dir, dirname) | ||||
|         var path = Path.join(relpath, dirname) | ||||
|       let dirs = await Promise.all(paths.map(async dirname => { | ||||
|         const fullPath = Path.join(dir, dirname) | ||||
|         const path = Path.join(relpath, dirname) | ||||
| 
 | ||||
|         var isDir = (await fs.lstat(fullPath)).isDirectory() | ||||
|         const isDir = (await fs.lstat(fullPath)).isDirectory() | ||||
|         if (isDir && !excludedDirs.includes(path) && dirname !== 'node_modules') { | ||||
|           return { | ||||
|             path, | ||||
| @ -293,13 +293,13 @@ class ApiRouter { | ||||
|   // Helper Methods
 | ||||
|   //
 | ||||
|   userJsonWithItemProgressDetails(user, hideRootToken = false) { | ||||
|     var json = user.toJSONForBrowser() | ||||
|     const json = user.toJSONForBrowser() | ||||
|     if (json.type === 'root' && hideRootToken) { | ||||
|       json.token = '' | ||||
|     } | ||||
| 
 | ||||
|     json.mediaProgress = json.mediaProgress.map(lip => { | ||||
|       var libraryItem = this.db.libraryItems.find(li => li.id === lip.libraryItemId) | ||||
|       const libraryItem = this.db.libraryItems.find(li => li.id === lip.libraryItemId) | ||||
|       if (!libraryItem) { | ||||
|         Logger.warn('[ApiRouter] Library item not found for users progress ' + lip.libraryItemId) | ||||
|         lip.media = null | ||||
| @ -326,31 +326,18 @@ class ApiRouter { | ||||
|   async handleDeleteLibraryItem(libraryItem) { | ||||
|     // Remove libraryItem from users
 | ||||
|     for (let i = 0; i < this.db.users.length; i++) { | ||||
|       var user = this.db.users[i] | ||||
|       var madeUpdates = user.removeMediaProgressForLibraryItem(libraryItem.id) | ||||
|       if (madeUpdates) { | ||||
|       const user = this.db.users[i] | ||||
|       if (user.removeMediaProgressForLibraryItem(libraryItem.id)) { | ||||
|         await this.db.updateEntity('user', user) | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     // remove any streams open for this audiobook
 | ||||
|     // TODO: Change to PlaybackSessionManager to remove open sessions for user
 | ||||
|     // var streams = this.streamManager.streams.filter(stream => stream.audiobookId === libraryItem.id)
 | ||||
|     // for (let i = 0; i < streams.length; i++) {
 | ||||
|     //   var stream = streams[i]
 | ||||
|     //   var client = stream.client
 | ||||
|     //   await stream.close()
 | ||||
|     //   if (client && client.user) {
 | ||||
|     //     client.user.stream = null
 | ||||
|     //     client.stream = null
 | ||||
|     //     this.db.updateUserStream(client.user.id, null)
 | ||||
|     //   }
 | ||||
|     // }
 | ||||
|     // TODO: Remove open sessions for library item
 | ||||
| 
 | ||||
|     // remove book from collections
 | ||||
|     var collectionsWithBook = this.db.collections.filter(c => c.books.includes(libraryItem.id)) | ||||
|     const collectionsWithBook = this.db.collections.filter(c => c.books.includes(libraryItem.id)) | ||||
|     for (let i = 0; i < collectionsWithBook.length; i++) { | ||||
|       var collection = collectionsWithBook[i] | ||||
|       const collection = collectionsWithBook[i] | ||||
|       collection.removeBook(libraryItem.id) | ||||
|       await this.db.updateEntity('collection', collection) | ||||
|       SocketAuthority.clientEmitter(collection.userId, 'collection_updated', collection.toJSONExpanded(this.db.libraryItems)) | ||||
| @ -361,34 +348,32 @@ class ApiRouter { | ||||
|       await this.cacheManager.purgeCoverCache(libraryItem.id) | ||||
|     } | ||||
| 
 | ||||
|     var json = libraryItem.toJSONExpanded() | ||||
|     await this.db.removeLibraryItem(libraryItem.id) | ||||
|     SocketAuthority.emitter('item_removed', json) | ||||
|     SocketAuthority.emitter('item_removed', libraryItem.toJSONExpanded()) | ||||
|   } | ||||
| 
 | ||||
|   async getUserListeningSessionsHelper(userId) { | ||||
|     var userSessions = await this.db.selectUserSessions(userId) | ||||
|     const userSessions = await this.db.selectUserSessions(userId) | ||||
|     return userSessions.sort((a, b) => b.updatedAt - a.updatedAt) | ||||
|   } | ||||
| 
 | ||||
|   async getAllSessionsWithUserData() { | ||||
|     var sessions = await this.db.getAllSessions() | ||||
|     const sessions = await this.db.getAllSessions() | ||||
|     sessions.sort((a, b) => b.updatedAt - a.updatedAt) | ||||
|     return sessions.map(se => { | ||||
|       var user = this.db.users.find(u => u.id === se.userId) | ||||
|       var _se = { | ||||
|       const user = this.db.users.find(u => u.id === se.userId) | ||||
|       return { | ||||
|         ...se, | ||||
|         user: user ? { id: user.id, username: user.username } : null | ||||
|       } | ||||
|       return _se | ||||
|     }) | ||||
|   } | ||||
| 
 | ||||
|   async getUserListeningStatsHelpers(userId) { | ||||
|     const today = date.format(new Date(), 'YYYY-MM-DD') | ||||
| 
 | ||||
|     var listeningSessions = await this.getUserListeningSessionsHelper(userId) | ||||
|     var listeningStats = { | ||||
|     const listeningSessions = await this.getUserListeningSessionsHelper(userId) | ||||
|     const listeningStats = { | ||||
|       totalTime: 0, | ||||
|       items: {}, | ||||
|       days: {}, | ||||
| @ -397,7 +382,7 @@ class ApiRouter { | ||||
|       recentSessions: listeningSessions.slice(0, 10) | ||||
|     } | ||||
|     listeningSessions.forEach((s) => { | ||||
|       var sessionTimeListening = s.timeListening | ||||
|       let sessionTimeListening = s.timeListening | ||||
|       if (typeof sessionTimeListening == 'string') { | ||||
|         sessionTimeListening = Number(sessionTimeListening) | ||||
|       } | ||||
| @ -432,15 +417,15 @@ class ApiRouter { | ||||
| 
 | ||||
|   async createAuthorsAndSeriesForItemUpdate(mediaPayload) { | ||||
|     if (mediaPayload.metadata) { | ||||
|       var mediaMetadata = mediaPayload.metadata | ||||
|       const mediaMetadata = mediaPayload.metadata | ||||
| 
 | ||||
|       // Create new authors if in payload
 | ||||
|       if (mediaMetadata.authors && mediaMetadata.authors.length) { | ||||
|         // TODO: validate authors
 | ||||
|         var newAuthors = [] | ||||
|         const newAuthors = [] | ||||
|         for (let i = 0; i < mediaMetadata.authors.length; i++) { | ||||
|           if (mediaMetadata.authors[i].id.startsWith('new')) { | ||||
|             var author = this.db.authors.find(au => au.checkNameEquals(mediaMetadata.authors[i].name)) | ||||
|             let author = this.db.authors.find(au => au.checkNameEquals(mediaMetadata.authors[i].name)) | ||||
|             if (!author) { | ||||
|               author = new Author() | ||||
|               author.setData(mediaMetadata.authors[i]) | ||||
| @ -461,10 +446,10 @@ class ApiRouter { | ||||
|       // Create new series if in payload
 | ||||
|       if (mediaMetadata.series && mediaMetadata.series.length) { | ||||
|         // TODO: validate series
 | ||||
|         var newSeries = [] | ||||
|         const newSeries = [] | ||||
|         for (let i = 0; i < mediaMetadata.series.length; i++) { | ||||
|           if (mediaMetadata.series[i].id.startsWith('new')) { | ||||
|             var seriesItem = this.db.series.find(se => se.checkNameEquals(mediaMetadata.series[i].name)) | ||||
|             let seriesItem = this.db.series.find(se => se.checkNameEquals(mediaMetadata.series[i].name)) | ||||
|             if (!seriesItem) { | ||||
|               seriesItem = new Series() | ||||
|               seriesItem.setData(mediaMetadata.series[i]) | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user