mirror of
				https://github.com/zoriya/Kyoo.git
				synced 2025-11-04 03:27:14 -05:00 
			
		
		
		
	Fix Backend service crash when RabbitMQ queues are predeclared
Signed-off-by: Fred Heinecke <fred.heinecke@yahoo.com>
This commit is contained in:
		
							parent
							
								
									0fe423869a
								
							
						
					
					
						commit
						1a61b18b2d
					
				@ -30,6 +30,7 @@ public class RabbitProducer
 | 
				
			|||||||
	{
 | 
						{
 | 
				
			||||||
		_channel = rabbitConnection.CreateModel();
 | 
							_channel = rabbitConnection.CreateModel();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if (!doesExchangeExist(rabbitConnection, "events.resource"))
 | 
				
			||||||
			_channel.ExchangeDeclare("events.resource", ExchangeType.Topic);
 | 
								_channel.ExchangeDeclare("events.resource", ExchangeType.Topic);
 | 
				
			||||||
		_ListenResourceEvents<Collection>("events.resource");
 | 
							_ListenResourceEvents<Collection>("events.resource");
 | 
				
			||||||
		_ListenResourceEvents<Movie>("events.resource");
 | 
							_ListenResourceEvents<Movie>("events.resource");
 | 
				
			||||||
@ -39,6 +40,7 @@ public class RabbitProducer
 | 
				
			|||||||
		_ListenResourceEvents<Studio>("events.resource");
 | 
							_ListenResourceEvents<Studio>("events.resource");
 | 
				
			||||||
		_ListenResourceEvents<User>("events.resource");
 | 
							_ListenResourceEvents<User>("events.resource");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if (!doesExchangeExist(rabbitConnection, "events.watched"))
 | 
				
			||||||
			_channel.ExchangeDeclare("events.watched", ExchangeType.Topic);
 | 
								_channel.ExchangeDeclare("events.watched", ExchangeType.Topic);
 | 
				
			||||||
		IWatchStatusRepository.OnMovieStatusChangedHandler += _PublishWatchStatus<Movie>("movie");
 | 
							IWatchStatusRepository.OnMovieStatusChangedHandler += _PublishWatchStatus<Movie>("movie");
 | 
				
			||||||
		IWatchStatusRepository.OnShowStatusChangedHandler += _PublishWatchStatus<Show>("show");
 | 
							IWatchStatusRepository.OnShowStatusChangedHandler += _PublishWatchStatus<Show>("show");
 | 
				
			||||||
@ -47,6 +49,30 @@ public class RabbitProducer
 | 
				
			|||||||
		);
 | 
							);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/// <summary>
 | 
				
			||||||
 | 
						/// Checks if the exchange exists. Needed to avoid crashing when re-declaring an existing
 | 
				
			||||||
 | 
						/// queue with different parameters.
 | 
				
			||||||
 | 
						/// </summary>
 | 
				
			||||||
 | 
						/// <param name="rabbitConnection">The RabbitMQ connection.</param>
 | 
				
			||||||
 | 
						/// <param name="exchangeName">The name of the channel.</param>
 | 
				
			||||||
 | 
						/// <returns>True if the queue exists, false otherwise.</returns>
 | 
				
			||||||
 | 
						private bool doesExchangeExist(IConnection rabbitConnection, string exchangeName)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							// If the queue does not exist when QueueDeclarePassive is called,
 | 
				
			||||||
 | 
							// an exception will be thrown. According to the docs, when this
 | 
				
			||||||
 | 
							// happens, the entire channel should be thrown away.
 | 
				
			||||||
 | 
							using var channel = rabbitConnection.CreateModel();
 | 
				
			||||||
 | 
							try
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								channel.ExchangeDeclarePassive(exchangeName);
 | 
				
			||||||
 | 
								return true;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							catch (Exception)
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								return false;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private void _ListenResourceEvents<T>(string exchange)
 | 
						private void _ListenResourceEvents<T>(string exchange)
 | 
				
			||||||
		where T : IResource, IQuery
 | 
							where T : IResource, IQuery
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
 | 
				
			|||||||
@ -31,10 +31,36 @@ public class ScannerProducer : IScanner
 | 
				
			|||||||
	public ScannerProducer(IConnection rabbitConnection)
 | 
						public ScannerProducer(IConnection rabbitConnection)
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
		_channel = rabbitConnection.CreateModel();
 | 
							_channel = rabbitConnection.CreateModel();
 | 
				
			||||||
 | 
							if (!doesQueueExist(rabbitConnection, "scanner"))
 | 
				
			||||||
			_channel.QueueDeclare("scanner", exclusive: false, autoDelete: false);
 | 
								_channel.QueueDeclare("scanner", exclusive: false, autoDelete: false);
 | 
				
			||||||
 | 
							if (!doesQueueExist(rabbitConnection, "scanner.rescan"))
 | 
				
			||||||
			_channel.QueueDeclare("scanner.rescan", exclusive: false, autoDelete: false);
 | 
								_channel.QueueDeclare("scanner.rescan", exclusive: false, autoDelete: false);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/// <summary>
 | 
				
			||||||
 | 
						/// Checks if the queue exists. Needed to avoid crashing when re-declaring an existing
 | 
				
			||||||
 | 
						/// queue with different parameters.
 | 
				
			||||||
 | 
						/// </summary>
 | 
				
			||||||
 | 
						/// <param name="rabbitConnection">The RabbitMQ connection.</param>
 | 
				
			||||||
 | 
						/// <param name="queueName">The name of the channel.</param>
 | 
				
			||||||
 | 
						/// <returns>True if the queue exists, false otherwise.</returns>
 | 
				
			||||||
 | 
						private bool doesQueueExist(IConnection rabbitConnection, string queueName)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							// If the queue does not exist when QueueDeclarePassive is called,
 | 
				
			||||||
 | 
							// an exception will be thrown. According to the docs, when this
 | 
				
			||||||
 | 
							// happens, the entire channel should be thrown away.
 | 
				
			||||||
 | 
							using var channel = rabbitConnection.CreateModel();
 | 
				
			||||||
 | 
							try
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								channel.QueueDeclarePassive(queueName);
 | 
				
			||||||
 | 
								return true;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							catch (Exception)
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								return false;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private Task _Publish<T>(T message, string queue = "scanner")
 | 
						private Task _Publish<T>(T message, string queue = "scanner")
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
		var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message, Utility.JsonOptions));
 | 
							var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message, Utility.JsonOptions));
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user