Como celebración por el día del libro, puedes conseguir un 15% de descuento en todos los libros de la web:

Channels en C#: la cola en memoria perfecta

Hoy vengo a hablar de un tipo en c# que la gran mayoría de vosotros no conoceréis, de hecho yo no lo conocía hasta hace un par de años hasta que entré en el cliente que estoy actualmente, pero, explicarlo ahora me viene perfecto ya que va en relación a lo que ha sucedido recientemente con la librería MediatR ya que puede reemplazar parte de su funcionalidad. 

 

 

En el post de hoy, veremos El tipo Channels en C#.

 

1 - ¿Que es Channels en C#?

Dentro de C# Tenemos un tipo muy concreto que se basa en el concepto de enviar mensajes o eventos a través de dicho canal, lo que implica que tenemos una parte del sistema que crea un evento y otra parte del sistema que lo recibe. Estos eventos pueden ser de cualquier tipo y están almacenados en el canal hasta que el receptor los consume.

 

Si paras a pensar, esto es una lógica muy similar a lo que viene siendo el patrón productor consumidor pero en memoria. 

NOTA: Si no conoces el patrón productor consumidor te recomiendo que te leas el siguiente post primero: Enlace post patrón Productor-Consumidor.

 

Pero para resumir, es un patrón que donde una parte del sistema genera un evento y otra parte del sistema lo consume. Ambas partes no tienen conocimiento o información sobre la otra parte y técnicamente (aunque no para el correcto funcionamiento del sistema) da igual quien genera o consume ese evento, a los consumidores no les importa quien lo genera, y a los productores no les importa quien lo consume. Si estáis acostumbrados a los sistemas distribuidos, es una cola de mensajes. 

 

 

Channels nos permite implementar esta funcionalidad, que viene siendo una cola de mensajes si estáis acostumbrados a los sistemas distribuidos en memoria sin ninguna infraestructura extra. Lo que permite desacoplar ambas partes del sistema al no tener una comunicación directa y se puede procesar de forma asíncrona.

 

 

2 - ¿Qué tiene esto que ver con MediatR?

Algunos os preguntaréis qué tiene esto que ver con el uso de MediatR, ya que por todo lo que hemos visto los channels no tienen nada que ver con el patrón mediador. Y es cierto, pero la librería MediatR no únicamente implementa el patrón mediador, sino que también implementa notificaciones a traves de su INotificacionHandler<T> y es aquí donde entran los channels.

 

Para este ejemplo vamos a coger el mismo escenario que vimos en el post del patrón mediador, donde un endpoint de una API recibe una llamada para actualizar un producto a través de un endpoint el cual invoca un handler que realiza la lógica y termina creando una notificación a través de MediatR, esta notificación es escuchada por una parte del sistema el cual envía un email a todos los usuarios que estén 

app design

Si leísteis aquel post, o el que hice sobre Core-Driven Architecture sabreís que yo no soy muy fan de utilizar MediatR, aún así, siempre he visto muy muy util el uso de las notificaciones para desacoplar las diferentes partes de un sistema. 

Así que aquí vamos a ver como eliminar mediatR de nuestro sistema reemplazandolo completamente con funcionalidades que ya están disponibles dentro de .NET. 

 

Para el patrón mediador hacia el caso de uso, osea los handlers, los eliminaremos completamente y haremos una llamada directa. Puedes pensar que eso acopla el código, y en parte es cierto, pero el código está acoplado de forma lógica de todas formas, así que la diferencia es prácticamente 0.

 

Si usas la MediatR pipeline con diferentes behaviours, siempre puedes reemplazarlos con middlewares y filtros, el funcionamiento es prácticamente el mismo.

 

Para las notificaciones, utilizaremos channels, y veremos en el siguiente punto como configurarlo. 

Lo que resulta en el siguiente diagrama: 

core driven development diagram

 

 

3 - Implementar el patrón productor consumidor con Channels

Como siempre para este ejemplo el código está en GitHub, pero mi idea es que sea sencillo de seguir.

 

Primero mencionar que tenemos un DTO de entrada, el cual es ItemDto

public record ItemDto(int Id, decimal Price, string Title);

NOTA: En este ejemplo, para ahorrar tiempo, es el que voy a insertar en la capa de datos, pero en una aplicación real, debería ser una entidad, ya que son tipos distintos

 

Un endpoint muy sencillo, que simplemente llama al caso de uso para actualizar el producto: 

app.MapPut("items", async (UpdateItem updateItem, ItemDto itemDto) =>
{
    return await updateItem.Execute(itemDto);
});

 

Y por ahora, el caso de uso donde leemos de la base de datos y actualizamos el valor:

public class UpdateItem (IDatabaseRepository databaseRepository)
{
    public async Task<bool> Execute(ItemDto itemToUpdate)
    {

        if (itemToUpdate.Title.Length > 200)
            throw new Exception("Title must be less than 200 characters");
        if (itemToUpdate.Price <= 0)
            throw new Exception("It can't be free");

        await databaseRepository.UpdateItem(itemToUpdate.Id, itemToUpdate.Price, itemToUpdate.Title);          

        return true;
    }
}

Nota: La capa de la base de datos no está implementada, es solo para mostrar el funcionamiento.

 

 

3.1 - Generar eventos con Channels 

Nos ponemos en situación donde cada vez que se actualice un producto y el precio sea un 30% menor que antes queremos generar una notificación, ya sea un email a los clientes como hace steam, o una notificación push o lo que sea, da igual. En nuestro caso nos concierne el hecho de que tenemos que generar una notificación y obviamente queremos que suceda de forma asíncrona, no queremos estar esperando a que ese proceso termine para hacerlo nosotros. 

Para ello, en el post de MediatR vimos que podíamos utilizar el tipo INotification, aunque hay más soluciones como utilizar una aplicación como hangfire

 

Pero en nuestro escenario actual, lo que tenemos que hacer es importar el using System.Threading.Channels y añadir el channel (canal) en el constructor de nuestro caso de uso y utilizarlo para escribir en el canal: 

public class UpdateItem (IDatabaseRepository databaseRepository, 
Channel<ItemUpdated> channel 👈
)
{
    public async Task<bool> Execute(ItemDto itemToUpdate)
    {

        if (itemToUpdate.Title.Length > 200)
            throw new Exception("Title must be less than 200 characters");
        if (itemToUpdate.Price <= 0)
            throw new Exception("It can't be free");

        ItemDto originalItem = await databaseRepository.GetItemById(itemToUpdate.Id);
        await databaseRepository.UpdateItem(itemToUpdate.Id, itemToUpdate.Price, itemToUpdate.Title);

        decimal percentageDifference = ((itemToUpdate.Price - originalItem.Price) / originalItem.Price) * 100;
        if (percentageDifference <= -30)
        {
            await channel.Writer.WriteAsync(new ItemUpdated() 👈
            {
                Id = itemToUpdate.Id,
                NewPrice = itemToUpdate.Price,
                NewTitle = itemToUpdate.Title,
                OldPrice = originalItem.Price,
                OldTitle = originalItem.Title
            });
        }
        return true;
    }
}

Y que no se nos olvide añadirlo al contenedor de dependencias, aquí no es simplemente añadirlo y ya está, tenemos dos opciones, crear channels bounded or unbounded.

 

La diferencia es sencilla, bounded permite añadir un límite máximo de elementos en ese canal, mientras que unbounded no tiene límite, bueno si lo tiene, la memoria del sistema donde esté corriendo.  

 

Por norma general yo los configuro como unbounded, aunque si los configuras con límites tienes que configurar también que es lo que vas a hacer si el canal se llena, donde tienes varias opciones.

  • Wait, eeperar hasta que haya espacio.
  • DropWrite: dejamos de escribir, bueno, el mensaje que estás intentando escribir se descarta.
  • DropOldest: eliminamos el mensaje más viejo e introducimos el nuevo.
  • DropNewest: elliminamos el mensaje más reciente e introducimos el nuevo.

 

Depende de qué tipo de sistema estés construyendo, puede que te sirvan unos u otros, o simplemente dejarlo sin límite si no vas a tener problemas de memoria, que en los tiempos que corren suele ser lo más normal. 

 

Esta sería la implementación para un Channel con capacidad de 100 elementos donde esperamos a que los eventos anteriores liberen espacio:

BoundedChannelOptions options = new BoundedChannelOptions(100) {
    FullMode = BoundedChannelFullMode.Wait
};
builder.Services.AddSingleton(Channel.CreateBounded<ItemUpdated>(options));

 

 

3.2 - Consumir eventos con channels

Finalmente únicamente nos queda consumir dicho evento. Esto se hace con una tarea en segundo plano, en otras palabras, con un BackgroundService nativo de .NET

public class ItemUpdatedWorker(Channel<ItemUpdated> channel) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (await channel.Reader.WaitToReadAsync(stoppingToken))
        {
            while (channel.Reader.TryRead(out ItemUpdated? item))
            {
                Console.WriteLine($"Item {item.Id} has been updated. Old price: {item.OldPrice}, " +
                                  $"New price: {item.NewPrice}. Old title: {item.OldTitle}, " +
                                  $"New title: {item.NewTitle}");
            }
        }
    }
}

 

Que no se nos olvide añadirlo al contenedor de dependencias:

builder.Services.AddHostedService<ItemUpdatedWorker>();

 

Como vemos, simplemente leemos el channel, en este ejemplo estoy imprimiendo los valores en la consola, pero este backgroundworker podría hacer cualquier acción necesaria en segundo plano. 

 

Uso del bloqueador de anuncios adblock

Hola!

Primero de todo bienvenido a la web de NetMentor donde podrás aprender programación en C# y .NET desde un nivel de principiante hasta más avanzado.


Yo entiendo que utilices un bloqueador de anuncios como AdBlock, Ublock o el propio navegador Brave. Pero te tengo que pedir por favor que desactives el bloqueador para esta web.


Intento personalmente no poner mucha publicidad, la justa para pagar el servidor y por supuesto que no sea intrusiva; Si pese a ello piensas que es intrusiva siempre me puedes escribir por privado o por Twitter a @NetMentorTW.


Si ya lo has desactivado, por favor recarga la página.


Un saludo y muchas gracias por tu colaboración

© copyright 2025 NetMentor | Todos los derechos reservados | RSS Feed

Buy me a coffee Invitame a un café