mirror of
				https://github.com/zoriya/Kyoo.git
				synced 2025-10-26 00:02:36 -04:00 
			
		
		
		
	Fix Scanner service crash when RabbitMQ queues are predeclared
Signed-off-by: Fred Heinecke <fred.heinecke@yahoo.com>
This commit is contained in:
		
							parent
							
								
									0493265b1d
								
							
						
					
					
						commit
						cf7bc456e8
					
				| @ -13,9 +13,22 @@ class RabbitBase: | |||||||
| 			login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), | 			login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), | ||||||
| 			password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), | 			password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), | ||||||
| 		) | 		) | ||||||
|  | 
 | ||||||
|  | 		# Attempt to declare the queue passively in case it already exists. | ||||||
|  | 		try: | ||||||
|  | 			self._channel = await self._con.channel() | ||||||
|  | 			self._queue = await self._channel.declare_queue(self.QUEUE, passive=True) | ||||||
|  | 			return self | ||||||
|  | 		except Exception: | ||||||
|  | 			# The server will close the channel on error. | ||||||
|  | 			# Cleanup the reference to it. | ||||||
|  | 			await self._channel.close() | ||||||
|  | 
 | ||||||
|  | 		# The queue does not exist, so actively declare it. | ||||||
| 		self._channel = await self._con.channel() | 		self._channel = await self._con.channel() | ||||||
| 		self._queue = await self._channel.declare_queue(self.QUEUE) | 		self._queue = await self._channel.declare_queue(self.QUEUE) | ||||||
| 		return self | 		return self | ||||||
| 
 | 
 | ||||||
| 	async def __aexit__(self, exc_type, exc_value, exc_tb): | 	async def __aexit__(self, exc_type, exc_value, exc_tb): | ||||||
|  | 		await self._channel.close() | ||||||
| 		await self._con.close() | 		await self._con.close() | ||||||
|  | |||||||
| @ -13,11 +13,6 @@ logger = getLogger(__name__) | |||||||
| class Publisher(RabbitBase): | class Publisher(RabbitBase): | ||||||
| 	QUEUE_RESCAN = "scanner.rescan" | 	QUEUE_RESCAN = "scanner.rescan" | ||||||
| 
 | 
 | ||||||
| 	async def __aenter__(self): |  | ||||||
| 		await super().__aenter__() |  | ||||||
| 		self._queue = await self._channel.declare_queue(self.QUEUE_RESCAN) |  | ||||||
| 		return self |  | ||||||
| 
 |  | ||||||
| 	async def _publish(self, data: dict): | 	async def _publish(self, data: dict): | ||||||
| 		await self._channel.default_exchange.publish( | 		await self._channel.default_exchange.publish( | ||||||
| 			Message(json.dumps(data).encode()), | 			Message(json.dumps(data).encode()), | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user