En el post anterior vimos cómo abstraer el patrón productor consumidor para que los clientes de nuestra librería puedan implementar dicho patrón de una forma muy sencilla.
En este caso, vamos a ver como utilizar dicha abstracción por el lado del servicio que vamos a utilizar, en nuestro caso RabbitMQ, pero podría ser otro como kafka, Mosquitto, etc.
Ya lo mencioné en el post anterior, pero estos post son muy importantes ya que es posible que algún día en una entrevista os pidan hacer un sistema que incluya comunicación asíncrona. Con la librería de Distribt serás completamente capaz de implementar el patrón productor consumidor con unas pocas líneas de código.
Índice
1 - ¿Qué es RabbitMQ?
RabbitMQ es un software que actúa como Message Broker
o lo que quiere decir lo mismo, el lugar donde publicamos mensajes cuando queremos tener comunicación asíncrona en nuestra aplicación.
RabbitMQ también nos permite administrar colas, las cuales funcionan como las colas de las colecciones de .NET.
1.2 - ¿Cómo funciona RabbitMQ?
Cada message Broker tiene sus propias características y funcionalidades, en el caso de RabbitMQ podemos distinguir dos elementos claves de la arquitectura, las colas y los exchanges.
1.2.1 - Qué es una cola en RabbitMQ?
Las colas en RabbitMQ funcionan con la misma idea que las colas en la programación en general, llega un elemento por un lado y sale por el otro, siempre de forma ordenada y siempre 1 a 1. Lo que quiere decir que llega un mensaje por un lado y sale un solo mensaje por el otro.
Por lo que si lo que queremos es utilizar colas de mensajes para nuestra comunicación asíncrona, son el elemento a elegir.
1.2.2 - Qué es un exchange en RabbitMQ?
Los exchanges, por el contrario, son algo más complicados; En resumidas cuentas podríamos entender un exchange como el lugar donde debemos publicar los mensajes cuando queremos tener la posibilidad de tener más de un consumidor.
Habrá escenarios en nuestro sistema, por ejemplo los mensajes de integración que vimos en el post anterior donde necesitamos que X número de clientes consuman dicho mensaje.
Los exchanges en RabbitMQ funcionan de una forma un poco peculiar,y es que no almacenan la información, únicamente la transfieren, eso quiere decir que si enviamos un mensaje a un exchange, y no tienen ningún consumidor, este se pierde.
Pero a la vez, no podemos consumir de un exchange directamente en nuestra aplicación, si no que únicamente podemos consumir de las colas.
Por lo tanto, debemos enlazar los exchanges con las colas correspondientes, a esta acción se le llama hacer un binding.
Y juntos, Exchange, binding y colas, representan un broker.
- Nota: cuando hacemos el binding también se puede hacer de un exchange a otro exchange.
La cosa no acaba ahí, dicho exchange lo podemos configurar de múltiples formas:
1.2.3 - Tipos de exchanges en RabbitMQ
Antes de pasar a explicar los diferentes tipos vamos a mencionar que es RoutingKey
, una forma fácil de entender su funcionamiento es como la dirección. Básicamente es una propiedad que indicamos al mandar el mensaje, y esa misma propiedad puede ser configurada en los diferentes bindings.
Exchange tipo direct en RabbitMQ
Un exchange tipo direct
está ligado directamente a la Routing Key esto es debido a que cuando enviamos el mensaje, el exchange comprobará la routing key, y propagará el mensaje únicamente en aquellos bindings que estén configurados con la misma routing key.
Exchange tipo Topic en RabbitMQ
Un exchange tipo topic
utiliza el routing key para hacer un match, utilizando wildcards que permiten añadir configuración extra. Por ejemplo, produces un mensaje con un routing key con el valor “api.privada.order”
y luego tienes bindings que configuran su routing key con “api.privada.order”
o “api.privada.*”
como puedes observar en esta última tenemos un asterisco, eso es la wildcard
Hay dos tipos diferentes de wildcard:
- Asterisco (
*
) el cual permite definir cualquier valor como hemos visto en el ejemplo. - Hash (
#
) el cual permite indicar que haya una palabra o no, por ejemplo“api.privada”
enviará el mensaje a un topic con la routing key con el valor“api.privada.#”
mientras que con un asterisco no lo hace.
Exchange tipo header en RabbitMQ
Un exchange tipo header
es muy similar a un tipo direct. Al crear un exchange tipo direct lo hacemos con la routing key, pero, qué sucede si necesitamos más de un argumento para definir el binding?
Para ello utilizamos el tipo header, ya que podemos configurar tantos argumentos como queramos.
En el ejemplo vemos como tenemos puesto en el binding más de un argumento, en este caso el tipo de mensaje y el formato
.
Exchange tipo Fanout en RabbitMQ
Finalmente el exchange tipo fanout
es aquel que propaga los mensajes a todos los bindings, sin importar el routing key o ninguna otra configuración.
1.3 - Por qué RabbitMQ
He elegido RabbitMQ principalmente por qué es muy común dentro del entorno de .NET, muchas librerías escritas en .NET que abstraen funcionalidades lo hacen primero para RabbitMQ y luego para el resto de message brokers. Lo que lleva a muchas empresas a utilizar RabbitMQ por el mismo motivo.
Y por ese motivo yo he decidido utilizar RabbitMQ para el curso de Distribt
.
2 - Crear la estructura de RabbitMQ
Antes de empezar a crear la estructura hay que comprender que podemos crearla de dos formas diferentes, en el código, o a través de los ficheros de configuración de la estructura del sistema.
- La primera de las formas se suele llamar “lazy creation” lo que viene a significar que dichos elementos (colas, exchanges, bindings, etc) no se van a crear hasta que los vayamos a utilizar.
- Y la segunda, que yo sepa no tiene ningún nombre, más que incluir la infraestructura en la configuración, y normalmente el servicio que estamos configurando, nos da como importar dicha estructura, en el caso de RabbitMQ es un fichero json.
Cada una tiene pros y contras, por ejemplo, en lazy creation debemos asegurarnos que las colas, los exchanges, las colas dead-letter existen antes de enviar cada mensaje, mientras que si lo hacemos por configuración, nos saltará una excepción en tiempo de ejecución en caso de que dicha cola no exista.
Para este curso voy a elegir la opción de la configuración, por el motivo de que pienso que la configuración de la infraestructura debería estar separada del código de la aplicación. Pero si en tu caso personal, prefieres hacerlo en código, que sepas que RabbitMQ provee una librería que permite hacerlo de forma muy sencilla.
La ruta de los ficheros de configuración va a ser .\tools\rabbitmq
Primero de todo vamos a ver que vamos a implementar en este post.
Basándonos en la arquitectura propuesta para la aplicación, vamos a implementar producers/consumers entre la api pública y el microservicio de subscription.
Esto quiere decir que generamos un evento en la api y el microservicio lo consume.
Para ello necesitamos diferentes elementos:
- Exchange para publicar los mensajes de la api
- Exchange para recibir mensajes en el micro servicio de subscripción
- Cola para las suscripciones
- dead letter exchange
- dead letter queue
- y sus respectivos bindigns
y para esto necesitamos dos ficheros, el primero llamado rabbitmq.config
, donde apuntaremos al segundo, que contiene la configuración:
[
{rabbit, [
{loopback_users, []}
]},
{rabbitmq_management, [
{load_definitions, "/etc/rabbitmq/definitions.json"}
]}
].
y ahora el fichero de la configuración definitions.json
:
{
"users": [],
"vhosts": [
{
"name": "/"
}
],
"permissions": [],
"parameters": [],
"policies": [
{"vhost":"/","name":"DLX","pattern":".*","apply-to":"queues","definition":{"dead-letter-exchange":"dead-letter.exchange"},"priority":0}
],
"queues": [
{"name":"subscription-queue","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
{"name":"subscription-queue.dead-letter","vhost":"/","durable":true,"auto_delete":false,"arguments":{}}
],
"exchanges": [
{"name":"api.public.exchange","vhost":"/","type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{}},
{"name":"subscription.exchange","vhost":"/","type":"topic","durable":true,"auto_delete":false,"internal":false,"arguments":{}},
{"name":"dead-letter.exchange","vhost":"/","type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{}}
],
"bindings": [
{"source":"api.public.exchange","vhost":"/","destination":"subscription.exchange","destination_type":"exchange","routing_key":"subscription","arguments":{}},
{"source":"subscription.exchange","vhost":"/","destination":"subscription-queue","destination_type":"queue","routing_key":"subscription","arguments":{}},
{"source":"dead-letter.exchange","vhost":"/","destination":"subscription-queue.dead-letter","destination_type":"queue","routing_key":"subscription","arguments":{}}
]
}
Como puedes observar la forma de especificar la dead-letter queue es a través de una policy.
Finalmente únicamente nos queda juntarlo todo en el fichero docker-compose
version: "3.9"
services:
rabbitmq:
image: rabbitmq:3.9-management-alpine #management version needed to be able to have a User interface
container_name: rabbitmq
environment:
- RABBITMQ_DEFAULT_USER=DistribtAdmin
- RABBITMQ_DEFAULT_PASS=DistribtPass
ports:
- 5672:5672
- 15672:15672
volumes:
- ./tools/rabbitmq.config:/etc/rabbitmq/rabbitmq.config
- ./tools/definitions.json:/etc/rabbitmq/definitions.json
Y si vamos al navegador (http://localhost:15672
) podemos ver que tenemos toda la configuración implementada.
y también podemos observar cómo se han configurado los bindings correctamente:
Ahora vamos a pasar a implementar el código. Para ello, lo primero que debemos hacer es en un nuevo proyecto de librería incluir la referencia desde NuGet al paquete RabbitMQ.Client
3 - Publicar mensajes con RabbitMQ
En el post anterior vimos la abstracción del patrón productor consumidor y que únicamente debemos hacer _publisher.publish()
para publicar mensajes, pero, qué es lo que estamos haciendo realmente por detrás?
Como recordarás indicamos que todo lo que nuestro publisher debe hacer es crear una clase que implemente IExternalMessagePublisher<TMessage>
y así es, creamos una clase que implemente dicha interfaz
public class RabbitMQMessagePublisher<TMessage> : IExternalMessagePublisher<TMessage>
where TMessage : IMessage
{
private readonly ISerializer _serializer;
private readonly RabbitMQSettings _settings;
private readonly ConnectionFactory _connectionFactory;
public RabbitMQMessagePublisher(ISerializer serializer, IOptions<RabbitMQSettings> settings)
{
_settings = settings.Value;
_serializer = serializer;
_connectionFactory = new ConnectionFactory()
{
HostName = _settings.Hostname,
Password = _settings.Password,
UserName = _settings.Username
};
}
public Task Publish(TMessage message, string? routingKey = null, CancellationToken cancellationToken = default)
{
using IConnection connection = _connectionFactory.CreateConnection();
using IModel model = connection.CreateModel();
PublishSingle(message, model, routingKey);
return Task.CompletedTask;
}
public Task PublishMany(IEnumerable<TMessage> messages, string? routingKey = null, CancellationToken cancellationToken = default)
{
using IConnection connection = _connectionFactory.CreateConnection();
using IModel model = connection.CreateModel();
foreach (TMessage message in messages)
{
PublishSingle(message, model, routingKey);
}
return Task.CompletedTask;
}
private void PublishSingle(TMessage message, IModel model, string? routingKey)
{
var properties = model.CreateBasicProperties();
properties.Type = RemoveVersion(message.GetType());
model.BasicPublish(exchange: GetCorrectExchange(),
routingKey: routingKey ?? "",
basicProperties: properties,
body: _serializer.SerializeObjectToByteArray(message));
}
private string GetCorrectExchange()
{
return (typeof(TMessage) == typeof(IntegrationMessage)
? _settings.Publisher?.IntegrationExchange
: _settings.Publisher?.DomainExchange)
?? throw new ArgumentException("please configure the Exchanges on the appsettings");
}
}
En el código implementamos ambos métodos, Publish
y PublishMany
, ambos hacen prácticamente lo mismo, creamos la conexión el modelo y publicamos los mensajes.
Cuando publicamos los mensajes, indicamos la propiedad type
que nos servirá después para poder identificar el tipo que es el mensaje en si.
- Nota: La parte
RemoveVersion
es para acortar el texto (hay un límite de 250 caracteres) y dejar como tipo solo el assembly, así podemos deserializar sin problemas.
Finalmente publicamos el mensaje con model.BasicPublish()
al cual le pasamos varios parámetros
- Exchange: esta parte depende un poco de tu lógica, en la lógica de nuestra aplicación los eventos de integración van a un exchange y los eventos de dominio van a otro.
- Routingkey: es la parte explicada un par de puntos mas atrás, si la especificas al mandar el mensaje se incluirá, en caso contrario enviará un string vacio.
- Propiedades: que incluye el tipo.
- Mensaje: RabbitMQ recibe los mensajes como array de bytes por lo tanto debemos serializarlos a
byte[]
. El código deserializer
tambien es parte de distribt.
Y con esto configurado, únicamente nos queda incluir el publisher en el contenedor de dependencias para poder ser inyectado.
public static void AddRabbitMQPublisher<TMessage>(this IServiceCollection serviceCollection)
where TMessage : IMessage
{
serviceCollection.AddPublisher<TMessage>();
serviceCollection.AddSingleton<IExternalMessagePublisher<TMessage>, RabbitMQMessagePublisher<TMessage>>();
}
El cual está incluido tanto en AddServiceBusIntegrationPublisher
como AddServiceBusDomainPublisher
que son los métodos utilizados desde nuestra aplicación que vimos en el post anterior.
Ahora desde nuestra minimal api podemos publicar mensajes con dos líneas de código:
app.MapPost("/subscribe", async (SubscriptionDto subscriptionDto) =>
{
IIntegrationMessagePublisher publisher = app.Services.GetRequiredService<IIntegrationMessagePublisher>();
await publisher.Publish(subscriptionDto, routingKey: "subscription");
});
Pero antes de ejecutar, no olvides indicar la configuración en fichero appsettings.json
{
....
"Bus": {
"RabbitMQ": {
"Hostname": "localhost",
"Username": "DistribtAdmin",
"Password": "DistribtPass",
"Publisher": {
"IntegrationExchange": "api.public.exchange"
}
}
}
}
4 - Consumir mensajes con rabbitMQ
El proceso para configurar el consumer de RabbitMQ es similar, únicamente debemos implementar la interfaz IMessageConsumer<TMessage>
para ello creamos la clase RabbitMQMessageConsumer<TMessage>
:
public class RabbitMQMessageConsumer<TMessage> : IMessageConsumer<TMessage>
{
private readonly ISerializer _serializer;
private readonly RabbitMQSettings _settings;
private readonly ConnectionFactory _connectionFactory;
private readonly IHandleMessage _handleMessage;
public RabbitMQMessageConsumer(ISerializer serializer, IOptions<RabbitMQSettings> settings, IHandleMessage handleMessage)
{
_settings = settings.Value;
_serializer = serializer;
_handleMessage = handleMessage;
_connectionFactory = new ConnectionFactory()
{
HostName = _settings.Hostname,
Password = _settings.Password,
UserName = _settings.Username
};
}
public Task StartAsync(CancellationToken cancelToken = default)
{
return Task.Run(async () => await Consume(), cancelToken);
}
private Task Consume()
{
using IConnection connection = _connectionFactory.CreateConnection();
using IModel channel = connection.CreateModel();
RabbitMQMessageReceiver receiver = new RabbitMQMessageReceiver(channel, _serializer, _handleMessage);
string queue = GetCorrectQueue();
channel.BasicConsume(queue, false, receiver);
return Task.CompletedTask;
}
}
- Nota: rabbitMQ no trabaja con procesos
async
pero debemos suponer que el resto de librerías si lo hacen, por eso está elTask.Run
.
Lo que tenemos que hacer es consumir mensajes de la cola especificada en el appsettings
del consumer:
{
...
"Bus": {
"RabbitMQ": {
"Hostname" : "localhost",
"Username": "DistribtAdmin",
"Password" : "DistribtPass",
"Consumer": {
"IntegrationQueue" : "subscription-queue"
}
}
},
}
Si te fijas cuando hacemos .BasicConsumer
estamos indicando la cola
y además un objeto llamado RabbitMQMessageReceiver
.
Este es el objeto que va a invocar la librería de rabbitMQ cuando consuma un mensaje, eso es debido a que implementa DefaultBasicConsumer
que tiene un método denominado HandleBasicDeliver
, y es aquí donde vamos a ejecutar el handler
que hemos creado en la aplicación consumidora.
public class RabbitMQMessageReceiver : DefaultBasicConsumer
{
private readonly IModel _channel;
private readonly ISerializer _serializer;
private byte[]? MessageBody { get; set; }
private Type? MessageType { get; set; }
private ulong DeliveryTag { get; set; }
private readonly IHandleMessage _handleMessage;
public RabbitMQMessageReceiver(IModel channel, ISerializer serializer, IHandleMessage handleMessage)
{
_channel = channel;
_serializer = serializer;
_handleMessage = handleMessage;
}
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange,
string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
{
MessageType = Type.GetType(properties.Type)!;
MessageBody = body.ToArray();
DeliveryTag = deliveryTag; // Used to delete the message from rabbitMQ
// #5 not ideal solution, but seems that this HandleBasicDeliver needs to be like this as its not async
var t = Task.Run(HandleMessage);
t.Wait();
}
private async Task HandleMessage()
{
if (MessageBody == null || MessageType == null)
{
throw new ArgumentException("Neither the body or the messageType has been populated");
}
IMessage message = (_serializer.DeserializeObject(MessageBody, MessageType) as IMessage)
?? throw new ArgumentException("The message did not deserialized properly");
await _handleMessage.Handle(message, CancellationToken.None);
//When success, delete from the queue
_channel.BasicAck(DeliveryTag, false);
}
}
Y con esto, los handlers que teníamos especificados del post anterior ya funcionarán sin problemas
Conclusión
- En este post hemos visto qué es RabbitMQ y sus características.
- Hemos visto cómo funciona rabbitMQ con Docker.
- La implementación de RabbitMQ con .NET