DEV Community

Isaac Ojeda
Isaac Ojeda

Posted on

Integration Events: Implementando comunicación entre servicios con MassTransit y ASP.NET

Introducción

En este post (a la solicitud de Erick, gracias por las sugerencias) vamos a ver como podemos implementar Integration Events de una manera muy sencilla en .NET.

Utilizaremos una librería llamada MassTransit que nos permitirá crear eventos de integración entre dos o más sistemas y así poder diseñar sistemas distribuidos sin tanto detalle de implementación.

Los Message Brokers suelen ser algo complicado, pero como siempre, reinventar la rueda no siempre es algo bueno. Uno, por falta de expertise y dos, por falta de tiempo.

Existen varias soluciones que nos permiten empezar a crear sistemas distribuidos sin tener que ser un experto en cada tema en particular. Pero en el tema de mensajería, MassTransit viene a salvarnos.

Ya mencioné los Integration Events en este post y su diferencia con los Domain Events. Leelo si aun no lo has visto, espero también te sea de utilidad.

Te recuerdo que todo el código lo puedes encontrar en mi github en este repositorio.

Comunicación basada en eventos

Cuando utilizamos comunicación entre servicios basada en eventos, un servicio publica un evento cuando algo importante ha sucedido, usualmente al actualizar un entity. Otros servicios (AKA microservicios) se suscriben a esos eventos para reaccionar según se necesite cuando estos eventos sucedan.

Cuando un servicio recibe un evento, este podría actualizar su domain según se necesite o según los cambios sucedidos en otro servicio. Esto es la esencia del concepto eventual consistency.

Este patrón de publish/suscribe siempre es recomendado utilizar una abstracción de un event bus. El event bus generalmente será una abstracción del Message Broker (quien manda y distribuye los mensajes) y lo más sencillo será, tener un método para suscribirnos/desuscribirnos a eventos y publicar eventos (AKA mensajes).

Podemos usar este tipo de eventos para crear transacciones que involucran múltiples servicios, el cual eventualmente tendremos una consistencia entre esos servicios. La eventual consistency consiste en una serie de operaciones distribuidas. En cada acción, el servicio actualiza su entity y de ser necesario, triggerea más eventos, y eventualmente, el estado del dominio de todos los subsistemas estará completo (consistente).

Como ejemplo, veremos lo que sucede cuando tenemos una “canasta” de compras y en el catálogo de productos el precio cambió:

Image description

Cuando el precio de un producto fue actualizado en el catálogo, este debe de informar a otros servicios (subsistemas, microservicios, etc) sobre el evento que acaba de suceder y así ser consistentes.

Más adelante veremos como implementar este tipo de eventos, pero como ya vimos en la imagen, existen muchas alternativas para mandar mensajes (eventos) a traves de la red. En este caso, utilizaremos una librería llamada MassTransit (lo llamaría como el Entity Framework de los message brokers) y el broker RabbitMQ (muy popular y fácil de usar en desarrollo).

MassTransit ya tiene varias implementaciones de brokers (AKA Transports) y de hecho fácilmente podríamos intercambiarlos, como usar RabbitMQ en local, pero Azure Service Bus para producción.

Utilizar MassTransit como nuestro Event Bus nos va a facilitar mucho la vida y realmente no necesitamos de conocer los detalles de la implementación, aunque dependiendo de lo que usemos en producción, estamos obligados a profundizar en él.

La resilencia, seguridad y escalabilidad son temas que nos importan en nuestro Event Bus, ya que debemos de considerar que si algo falla (ejem. un mensaje no se pudo entregar) la eventual consistencia habrá fallado (es un tema más complicado que valdrá la pena enfocarnos en otro post en el futuro) y nos dejará con un estado “roto”.

Integration Events

Entonces, los Integration Events son usados para mantener en sincronización el estado del dominio entre distintos subsistemas o sistemas externos. Esta funcionalidad se logra publicando eventos fuera de nuestro servicio

Nota 💡: Recuerden mi post pasado, los Domain Events suceden en el mismo servicio y los integration events suceden en distintos servicios (distintas aplicaciones, servidores, etc)

Cuando un evento es publicado, múltiples consumers están listos para accionar según se requiera (el equivalente a los Event Handlers de los Domain Events, pero en distintos servicios)

Así como los Domain Events, un Integration Events termina siendo una clase (POCO) que representa el evento y datos relevantes al evento. Estas clases deberían de estarse replicando en cada proyecto, simplemente para no “acoplar” los servicios entre sí. Si eso no es problema (como lo haremos en este ejemplo) podríamos tener una librería compartida, que, en el personal, no le veo el problema.

Nota 💡: MassTransit trata a los tipos de .NET por su nombre completo (namespaces y tipo) entonces si tenemos los mensajes replicados en distintos proyectos para no acoplarlos, estos deben de tener el mismo namespace y nombre, sino, serán tratados como mensajes distintos.

Event Bus

Al final, es el event bus el que nos permite tener este patrón Pub/Sub entre los servicios y nos abstrae la implementación del Transport que usemos. MassTransit ya incluye su propio Event Bus, pero si nosotros mismos hiciéramos la implementación, deberíamos de hacerlo totalmente de una forma abstracta para no depender de la implementación del transport y poder cambiarlo fácilmente.

Image description

En la imagen vemos como el microservicio A publica un mensaje y el Event Bus se encarga de distribuir esos mensajes a sus respectivos consumers. Aquí el microservicio A no sabe nada sobre el microservicio B y C, pero las business rules necesitan que ellos estén enterados de lo que acaba de suceder.

Esta es la importancia de tener un Event Bus, este nos ayuda a cumplir a que los servicios sean anónimos y que ese “middleman” se encargue de entregar los mensajes en donde se deben entregar.

También, como mencioné antes, el Event Bus es la abstracción del Transport, entonces nuestra aplicación no se ve afectada si cambiamos de RabbitMQ a Azure Service Bus (y como también no paro de mencionarlo, hacerlo con MassTransit es mucho mejor, ya que no reinventamos la rueda).

Si decides implementar tu propio Event Bus, podrías perderte de funcionalidad que ya existe en otros como MassTransit o NServiceBus. Pero igual, todo depende de las necesidades del proyecto.

Implementación den ASP.NET Core

Para implementar este proof of concept vamos a crear tres proyectos en una solución en visual studio:

IntegrationEventsExample/
├─ Basket/
├─ Catalog/
├─ Messages/

Te recomiendo que sigas este ejemplo viendo el código fuente, ya que omitiré ciertas partes de código para fines prácticos.

Requisitos

El requisito que puede ser un blocker para algunos es tener RabbitMQ corriendo, y para mí la forma más fácil es haciéndolo con Docker.

Por lo tanto, para seguir este post hay que tener Docker instalado y corriendo, existe mucho material (de la página oficial) que te ayuda a tener Docker funcionando.

Si no quieres usar docker, puedes seguir esta guía de RabbitMQ para instalarlo con chocolatey.

Corriendo RabbitMQ

Para correr RabbitMQ con docker, lo que yo hago es crear un docker-compose de la infraestructura que se necesita.

En este caso, solo necesitamos de RabbitMQ, pero fácilmente en este docker-compose podríamos poner bases de datos sql, redis caché, elastic search, etc.

version: "3.5"

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    restart: always
    ports:
      - 5672:5672
      - 15672:15672
    networks:
      - balusoft
    volumes: 
      - rabbitmq:/var/lib/rabbitmq

networks:
  balusoft:
    name: balusoft-network

volumes:
  rabbitmq:
    driver: local
Enter fullscreen mode Exit fullscreen mode

Y para correr este archivo yaml, corremos docker-compose:

docker-compose -f .\infrastructure.yml up
Enter fullscreen mode Exit fullscreen mode

Podemos visitar http://localhost:15672/ para comprobar que todo esté funcionando:

Image description

Para iniciar sesión, usamos el usuario/contraseña default guest. Aunque por ahora, no se verá ninguna actividad.

Basket

Antes de entrar en detalles, para este proyecto tenemos las siguientes dependencias:

<PackageReference Include="MassTransit.AspNetCore" Version="7.3.1" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.3.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="6.0.2" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3" />
Enter fullscreen mode Exit fullscreen mode

En este servicio se maneja el carrito de compras o en este ejemplo, la canasta de compras. Aquí simplemente tenemos un Entity principal que es el Basket y los BasketProducts, lo que sencillamente es, una canasta con productos.

No entraré en detalles de los Entities y DbContext, para eso te pido que veas el código en el repositorio. Al final, es muy sencillo, es una base de datos en memoria para mantener el ejemplo lo más simple posible.

Basket es aquel servicio que va a consumir eventos, ya que, desde el catálogo de productos, puede que el precio sea diferente si este es actualizado. Si hay canastas activas con dichos productos y los precios cambiaron, estos deben de ser actualizados para mantener esa eventual consistency.

Dentro del proyecto compartido Messages pondremos la siguiente definición que representará el Integration Event:

namespace Messages;

public class ProductPriceChanged
{
    public ProductPriceChanged(int productId, double newPrice)
    {
        ProductId = productId;
        NewPrice = newPrice;
    }
    public int ProductId { get; set; }
    public double NewPrice { get; set; }
}
Enter fullscreen mode Exit fullscreen mode

Esto sencillamente indica lo que ha sucedido, el precio cambió.

Creando un Consumer

El consumer viene siendo el Handler, lo que hay que realizar debido a lo que ha ocurrido:

using Basket.Persistence;
using MassTransit;
using Messages;
using Microsoft.EntityFrameworkCore;

namespace Basket.IntegrationEvents.Consumers;

public class ProductPriceChangedConsumer : IConsumer<ProductPriceChanged>
{
    private readonly ILogger<ProductPriceChangedConsumer> _log;
    private readonly BasketDbContext _context;

    public ProductPriceChangedConsumer(ILogger<ProductPriceChangedConsumer> log, BasketDbContext context)
    {
        _log = log;
        _context = context;
    }
    public async Task Consume(ConsumeContext<ProductPriceChanged> context)
    {
        _log.LogInformation("Nuevo evento: Precio actualizado del producto {0}.", context.Message.ProductId);
        _log.LogWarning("Price: {0}", context.Message.NewPrice);

        // Actualizar todas las canastas de compra que tienen ese producto
        var baskets = await _context.Baskets
            .Include(i => i.Products)
            .Where(q => q.Products.Any(a => a.ProductId == context.Message.ProductId))
            .ToListAsync();

        foreach (var basket in baskets)
        {
            var productToUpdate = basket.Products
                .FirstOrDefault(q => q.ProductId == context.Message.ProductId);

            if (productToUpdate is null)
            {
                // TODO: Error?
                continue;
            }

            productToUpdate.UnitCost = context.Message.NewPrice;
            productToUpdate.TotalCost = productToUpdate.UnitCost * productToUpdate.Quantity;

            _log.LogWarning("Precio actualizado en la canasta");
        }

        await _context.SaveChangesAsync();
    }
}
Enter fullscreen mode Exit fullscreen mode

Muy importante que debo mencionar, es que esto es un ejemplo, realmente así no se hace un carrito de compras.

Utilizamos IConsumer<T> de MassTransit para definir el consumer (AKA Event Handler) y aquí realizamos lo que se requiera. En este caso, actualizar precios.

El tipo genérico del IConsumer<T> es el Integration event que previamente ya definimos dentro del proyecto compartido.

Configurando MassTransit

Para agregar las dependencias que MassTransit necesita, hacemos lo siguiente:

builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<ProductPriceChangedConsumer>();
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
builder.Services.AddMassTransitHostedService();
Enter fullscreen mode Exit fullscreen mode

Tenemos que registrar individualmente cada Consumer que vayamos creando y además se registra un Hosted Service.

Este es el que se conecta al Transport y está en espera de ejecutar los mensajes que se vayan recibiendo. Ocurre un poco de magia, pero es un alivio no tener que preocuparnos tanto de ese tema.

En este proyecto tenemos más codigo pero realmente no es relevante a los Integration Events.

Si corremos este proyecto junto con RabbitMQ, ya podremos ver que este se debe conectar exitosamente:

Image description

La conexión se realiza con éxito ya que estamos en modo desarrollo y la información y puertos usados son los defaults.

Catalog

Ahora en este proyecto, vamos a ver como publicar mensajes con el Event Bus, ya que este es el que genera el cambio del dominio (el precio).

Configurando MassTransit

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
Enter fullscreen mode Exit fullscreen mode

Aquí al igual que el otro proyecto, registramos las dependencias de MassTransit y su implementación con RabbitMQ. En este caso no necesitamos de un Hosted Service, ya que no tenemos Consumers de este lado.

Publicando Mensajes

Para publicar un mensaje, necesitamos del Event Bus, y lo necesitamos cuando (en este caso) se actualice el precio del producto.

Para este ejemplo, esto ocurre al actualizar un producto:

app.MapPut("api/products", async (UpdateProductRequest request, IBus bus, CatalogDbContext context) =>
{
    var product = await context.Products.FindAsync(request.ProductId);

    if (product is null)
    {
        return Results.NotFound();
    }

    var oldPrice = product.Price;

    product.Description = request.Description;
    product.Price = request.Price;

    await context.SaveChangesAsync();

    if (oldPrice != product.Price)
    {
        await bus.Publish(new ProductPriceChanged(product.ProductId, product.Price));
    }

    return Results.Ok();
});
Enter fullscreen mode Exit fullscreen mode

IBus nos permite acceder al método Publish y usamos la misma definición del Integration Event para publicar el evento.

Aquí MassTransit automaticamente crea canales de comunicación según el tipo ProductPriceChanged. Cualquier consumer que esté ligado a este tipo de dato, recibirá el mensaje si el precio se actualiza.

La verdad este ejemplo es demasiado sencillo, pero las cosas se pueden complicar más mientras nos vamos adentrado al mundo de los Event Bus.

Nota 💡: Revisa la documentación de MassTransit para conocer sobre más conceptos de este tipo de comunicación (como Sagas, máquinas de estado, mediadores, etc)

Probando la integración

Para hacer pruebas, hay que correr RabbitMQ junto con estas dos APIs y usamos Swagger para crear un par de actualizaciones de ejemplo.

Recuerda ver el código completo ya que existen un par de métodos Seed para poner información para las pruebas que no he mostrado aquí.

Entonces, por default, el producto cuesta 999 y si actualizamos a cualquier precio diferente, hay que ver que el evento se ejecute correctamente, con Swagger ejecutamos el endpoint en Catalog:

curl -X 'PUT' \
  'https://localhost:7018/api/products' \
  -H 'accept: */*' \
  -H 'Content-Type: application/json' \
  -d '{
  "productId": 1,
  "description": "test",
  "price": 12221
}'
Enter fullscreen mode Exit fullscreen mode

Y en Basket, vemos los siguientes logs:

Image description

Y también RabbitMQ nos confirma la ejecución del evento:

Image description

Y efectivamente tenemos los dos proyectos conectados a RabbitMQ

Image description

Todo esto sucede de forma mágica gracias a MassTransit

Aunque esto sucede de forma muy sencilla, lo que hay detrás de MassTransit es un trabajo fenomenal y por supuesto que puede profundizarse y personalizar a grado de que tiene TODO lo que se necesita para una aplicación enterprise y production ready.

Conclusión

Usar MassTransit para comunicación basada en eventos es una herramienta fenomenal, es open source y contiene una comunidad activa y se mantiene actualizado.

Existen otras herramientas que nos ayudan a cumplir objetivos similares, como Dapr, NServiceBus o Steeltoe. Estos nos abstraen esos detalles de implementación y nos ayuda a enfocarnos en nuestra aplicación, la infraestructura usada no nos es relevante al desarrollar (aunque, es muy importante al finalizar la implementación).

Nuestros buenos amigos de DevMentors también tienen su propia librería que nos ayuda a abstraer esta parte de mensajería utilizando Convey.

Update 💡: DevMentors acaba de subir este vídeo que se ve muy prometedor, recomiendo cualquier contenido de ellos -> Microservices communicaiton

Si quieres una introducción a microservicios y el uso de varias tecnologías presentadas aquí, te recomiendo que veas este curso de dos horas que te ayudará a obtener un mejor panorama de todo el stack que se requiere para trabajar con microservicios (como Docker, docker-compose, entre otros).

También, te recomiendo que sigas estos canales de Youtube que traen contenido relevante y muy bueno respecto a estos temas:

Referencias

Implementing event-based communication between microservices (integration events) | Microsoft Docs

Implementing an event bus with RabbitMQ for the development or test environment | Microsoft Docs

Getting Started | MassTransit (masstransit-project.com)

Discussion (1)

Collapse
erickgonzalezs profile image
Erick González Sánchez

Muchísimas gracias!!! y con Mención y todo!!! No sabes cuanto valoro tu contenido... Honestamente sigo releyendo y espero no te canse con tantas dudas porque está vez si se me están acumulando... Nomás primero quiero entender por mi cuenta todo lo posible del contenido...