Olá!
Esta é a segunda parte de uma série sobre Event Sourcing (ES) e, neste artigo, pretendemos apresentar versionamento de modelo, persistência e recuperação de eventos.
No primeiro artigo falamos sobre como adicionar ao modelo de domínio o suporte a eventos. Vimos que a partir de um modelo base seria possível atribuir ao nosso modelo de domínio as capacidades necessárias para atuar com eventos.
Veremos neste artigo como persistir os eventos de nossos modelos, reconstituir seu estado a partir dos eventos persistidos, e garantir sua consistência.
Vamos lá!
Versionando nosso modelo
Uma novidade em relação ao modelo do artigo anterior é que, a partir de agora, para fins de consistência, nosso modelo conterá um número de versão.
Este número de versão do modelo é útil para cenários onde haja concorrência. Ou seja, onde dois eventos podem ocorrer quase simultâneamente e levar nosso modelo a um estado inconsistente. Vamos falar sobre isso em detalhes mais à frente.
Por enquanto, confira abaixo a nova versão de nossa classe EventBase
, renomeada para ModelEventBase
:
namespace Lab.EventSourcing.Core
{
public abstract class ModelEventBase : IEvent
{
public Guid ModelId { get; private set; }
public int ModelVersion { get; private set; }
public DateTime When { get; private set; }
public ModelEventBase(Guid modelId, int modelVersion) =>
(ModelId, ModelVersion, When) = (modelId, modelVersion, DateTime.Now);
}
}
Repare que temos aqui, além do momento em que ocorreu o evento, When
, o ID de nosso modelo e sua versão no momento em que o evento é gerado.
Para que estas mesmas informações integrem o estado de nosso modelo, precisamos adicioná-las à nossa classe base de modelos, EventSourcingModel
:
namespace Lab.EventSourcing.Core
{
public abstract class EventSourcingModel
{
private Queue<IEvent> _pendingEvents = new Queue<IEvent>();
public IEnumerable<IEvent> PendingEvents { get => _pendingEvents.AsEnumerable(); }
public Guid Id { get; protected set; }
public int Version { get; protected set; } = 0;
protected int NextVersion { get => Version + 1; }
protected EventSourcingModel(IEnumerable<ModelEventBase> persisted)
{
foreach (var e in persisted)
{
Apply(e);
Version = e.ModelVersion;
}
}
protected void RaiseEvent<TEvent>(TEvent pendingEvent) where TEvent: ModelEventBase
{
_pendingEvents.Enqueue(pendingEvent);
Apply(pendingEvent);
Version = pendingEvent.ModelVersion;
}
protected abstract void Apply(IEvent pendingEvent);
public void Commit() =>
_pendingEvents.Clear();
}
}
Aqui temos mudanças mais significativas em relação à versão do antigo anterior.
Foi criada uma propriedade que indica qual seria a próxima versão de nosso modelo, para facilitar a criação de eventos. Um construtor, que recebe como argumentos uma coleção de eventos, também foi criado, o que nos permitirá reconstituir o estado de nosso modelo a partir do repositório de eventos.
Por fim, o método Apply foi transformado em abstrato, delegando aos nossos modelos a implementação de métodos específicos para cada tipo de evento.
Nota: O construtor desta classe é protegido para impedir o programador de inicializar acidentalmente o modelo com eventos não gerados pelo sistema, preservando assim a consistência de nosso modelo. Veremos a utilidade desta abordagem ao visitarmos a recuperação de eventos persistidos, mais à frente.
Por fim, temos um novo modelo de domínio Inventory
, que representa o registro de estoque que pode receber ou entregar produtos.
namespace Lab.EventSourcing.Inventory
{
public class Inventory : EventSourcingModel
{
private readonly ConcurrentDictionary<Guid, int> _stock = new ConcurrentDictionary<Guid, int>();
protected Inventory(IEnumerable<ModelEventBase> events) : base(events) {}
public static Inventory Create()
{
var inventory = new Inventory(Enumerable.Empty<ModelEventBase>());
inventory.RaiseEvent(new InventoryCreated(Guid.NewGuid()));
return inventory;
}
public void AddProduct(Guid id, int quantity)
{
if (quantity == 0)
throw new InvalidOperationException("The quantity must be greater than zero.");
RaiseEvent(new ProductAdded(Id, NextVersion, id, quantity));
}
public void RemoveProduct(Guid id, int quantity)
{
if (!_stock.ContainsKey(id))
throw new InvalidOperationException("Product not found.");
if (_stock[id] < quantity)
throw new InvalidOperationException($"The requested quantity is unavailable. Current quantity: {_stock[id]}.");
RaiseEvent(new ProductRemoved(Id, NextVersion, id, quantity));
}
public int GetProductCount(Guid productId)
{
return _stock.TryGetValue(productId, out int quantity)
? quantity
: 0;
}
protected override void Apply(IEvent pendingEvent)
{
switch(pendingEvent)
{
case InventoryCreated created:
Apply(created);
break;
case ProductAdded added:
Apply(added);
break;
case ProductRemoved removed:
Apply(removed);
break;
default:
throw new ArgumentException($"Invalid event type: {pendingEvent.GetType()}.");
}
}
protected void Apply(InventoryCreated pending) =>
Id = pending.ModelId;
protected void Apply(ProductAdded pending) =>
_stock.AddOrUpdate(pending.ProductId, pending.Quantity,
(productId, currentQuantity) => currentQuantity += pending.Quantity);
protected void Apply(ProductRemoved pending) =>
_stock[pending.ProductId] -= pending.Quantity;
}
}
...
public class InventoryCreated : ModelEventBase
{
public InventoryCreated(Guid modelId) : base(modelId, 1) { }
}
public class ProductAdded : ModelEventBase
{
public Guid ProductId { get; private set; }
public int Quantity { get; private set; }
public ProductAdded(Guid modelId, int modelVersion, Guid productId, int quantity)
: base(modelId, modelVersion) =>
(ProductId, Quantity) = (productId, quantity);
}
public class ProductRemoved : ModelEventBase
{
public Guid ProductId { get; private set; }
public int Quantity { get; private set; }
public ProductRemoved(Guid modelId, int modelVersion, Guid productId, int quantity)
: base(modelId, modelVersion) =>
(ProductId, Quantity) = (productId, quantity);
}
Repare que os eventos lançados ProductAdded
e ProductRemoved
passaram a incluir o ID de nosso modelo e também sua versão, conforme requerido por ModelEventBase
.
Importante! Repare que os eventos carregados em nosso modelo, a partir de seu construtor, são aplicados para atualizar seu estado e, em seguida, descartados.
Há uma abordagem que preserva os eventos persistidos em uma coleção que coexiste com a coleção de eventos pendentes (PendingEvents
), geralmente chamada dePersistedEvents
, a fim de fornecer todo o histórico do modelo combinando ambas.
Por entender que esta abordagem não é necessária à compreensão do padrão, ela não foi incluída neste exemplo.
Com isso temos o necessário para seguir em frente!
Repositório de eventos (event store)
O local onde os eventos de nosso modelo serão persistidos é chamado de repositório de eventos (event store).
Este repositório pode ser um banco relacional, um banco NoSQL, ou alguma outra forma segura de armazenamento. A única exigência sobre o repositório de eventos é que ele seja capaz de fornecer um modelo de consultas que permita carregar todos os eventos de uma entidade, e filtrar os eventos por versão ou mesmo por período, a fim de saber qual era o estado de um objeto num dado momento.
Nota: Neste artigo, será apresentada uma versão de Event Store implementada com o Entity Framework Core InMemory que, para fins gerais, deve ser considerado um banco MsSQL. Desta forma, ao baixar o código relativo a este artigo, não será necessário nenhum setup de banco de dados.
Abaixo a implementação de nosso repositório de eventos:
namespace Lab.EventSourcing.Core
{
public class EventStore
{
private readonly EventStoreDbContext _eventStoreContext;
public static EventStore Create() =>
new EventStore();
private EventStore()
{
_eventStoreContext = new EventStoreDbContext(new DbContextOptionsBuilder<EventStoreDbContext>()
.UseInMemoryDatabase(databaseName: "EventStore")
.EnableSensitiveDataLogging()
.Options);
}
public void Commit<TModel>(TModel model) where TModel : EventSourcingModel
{
var events = model.PendingEvents.Select(e => PersistentEvent.Create(model.Id,
((ModelEventBase)e).ModelVersion,
((ModelEventBase)e).When,
e.GetType().AssemblyQualifiedName,
JsonConvert.SerializeObject(e)));
_eventStoreContext.Events.AddRange(events);
_eventStoreContext.SaveChanges();
model.Commit();
}
public TModel GetById<TModel>(Guid id) where TModel : EventSourcingModel =>
LoadModel<TModel>(e => e.ModelId == id);
public TModel GetByVersion<TModel>(Guid id, int version) where TModel : EventSourcingModel =>
LoadModel<TModel>(e => e.ModelId == id && e.ModelVersion <= version);
public TModel GetByTime<TModel>(Guid id, DateTime until) where TModel : EventSourcingModel =>
LoadModel<TModel>(e => e.ModelId == id && e.When <= until);
private TModel LoadModel<TModel>(Expression<Func<PersistentEvent, bool>> expression) where TModel : EventSourcingModel
{
var events = _eventStoreContext.Events.Where(expression)
.OrderBy(e => e.ModelVersion)
.Select(e => JsonConvert.DeserializeObject(e.Data, Type.GetType(e.EventType)))
.Cast<ModelEventBase>();
return (TModel)Activator.CreateInstance(typeof(TModel),
BindingFlags.NonPublic | BindingFlags.Instance,
null,
new[] { events } ,
CultureInfo.InvariantCulture);
}
private class EventStoreDbContext : DbContext
{
public EventStoreDbContext(DbContextOptions<EventStoreDbContext> options) : base(options) { }
public DbSet<PersistentEvent> Events { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder) =>
modelBuilder.Entity<PersistentEvent>().HasKey(k => new { k.ModelId, k.ModelVersion });
}
}
}
Vamos detalhar um pouco nosso repositório.
Ele começa com um método chamado Commit
que, recebe um modelo, persiste seus eventos e, em seguida, invoca o método de mesmo nome no modelo, a fim de limpar a lista de eventos não persistidos. Ou seja, toda vez que nossos eventos pendentes forem persistidos, nosso modelo será orientado a descartá-los.
Repare que, antes de persistir os eventos, para cada um deles é criada uma instância de PersistentEvent
. Esta classe é responsável por permitir a persistência de nosso modelo de forma serializada em nossa base de dados, com metadados que tornam sua recuperação mais simples.
namespace Lab.EventSourcing.Core
{
public class PersistentEvent
{
public Guid ModelId { get; private set; }
public int ModelVersion { get; private set; }
public DateTime When { get; private set; }
public string EventType { get; private set; }
public string Data { get; private set; }
public static PersistentEvent Create(Guid id, int version, DateTime when, string eventType, string data) =>
new PersistentEvent
{
ModelId = id,
ModelVersion = version,
When = when,
EventType = eventType,
Data = data
};
}
}
Importante! Os metadados de persistência são uma necessidade exclusiva para bancos de dados relacionais, que é o caso de nosso exemplo. No caso do uso de bancos NoSQL, apenas o próprio evento é necessário.
Em seguida, temos três métodos para carregar nosso modelo, GetById
, GetByVersion
, GetByTime
.
GetById
vai recuperar todos os eventos ocorridos com o nosso modelo. Já GetByVersion
vai recuperar todos os eventos até uma versão específica. Por fim, GetByTime
vai recuperar todos os eventos até um determinado momento no tempo. Todos estes métodos retornam os eventos ordenados por versão, de modo a garantir a consistência de nosso modelo.
E, chamado por todos estes métodos, vemos o método LoadModel<TModel>
que efetivamente cria a instância de nosso modelo, usando seu construtor protegido, e passando como parâmetros os eventos encontrados.
Finalmente, na criação de nosso EventStoreDbContext
, do EntityFramework, no método OnModelCreating
, temos a criação de uma chave composta pelo ID de nosso modelo e por sua versão. Esta chave um tipo de trava otimista (em inglês), que manterá nosso modelo consistente em cenários com concorrência.
A importância do versionamento do modelo
Como dito acima, o versionamento de modelos é interessante em cenários onde se prevê concorrência. Imagine que existam diversas instâncias de um serviço de controle de estoque que responde a uma loja virtual, e o seguinte fluxo:
Um fornecedor A
entrega 10 unidades de um produto X
para o estoque, gerando um evento do tipo ProductAdded
. Esta operação abastece nosso estoque e o disponibiliza para a loja virtual.
Em seguida, dois consumidores resolvem adquirir este produto na loja virtual, sendo que o consumidor M
pede 7 unidades, e N
pede 5. Perceba que, neste caso, não há estoque o bastante para garantir a compra dos dois consumidores. Então apenas um deles deverá conseguir concluir seu pedido: o que chegar primeiro!
Para garantir que apenas o primeiro consumidor consiga reservar seus produtos, a chave acima foi criada em nosso repositório de eventos. Desta forma, o segundo cliente, ao tentar reservar os produtos, receberá um aviso de indisponibilidade.
Esta combinação entre o versionamento em nosso modelo, e uma chave em nosso banco de dados, tende a garantir que nosso modelo se manterá em um estado consistente ao longo de seu ciclo de vida.
Próximos passos
No próximo artigo vamos falar sobre snapshots, uma forma de reduzir o esforço para reconstruir nossos modelos quando a cadeia de eventos que os compõem é muito longa.
A partir deste link é possível clonar o código-fonte do exemplo deste artigo, com alguns testes inclusos para validar os comportamentos mencionados neste artigo.
Caso tenha alguma dúvida sobre o conteúdo deste artigo, me avise pelos comentários que responderei assim que possível. Se gostou desta continuação, me deixe saber pelos indicadores.
Até o próximo artigo!
Top comments (4)
Muito obrigado pelo artigo!
Hoje em dia é difícil achar conteúdo sobre Event Sourcing em português, mesmo estudando em inglês, o conteúdo sobre ES é mais teorias do que prática.
Ansioso pela terceira parte :)
Sou eu quem agradece o reconhecimento, Landerson!
E você tem razão. ES costuma ser mostrado como algo abstrato, ou com implementações confusas e sem muita explicação.
Minha intenção é trazer o mínimo viável para que se entenda o padrão.
Logo mais chega a parte 3! Te agradeço por acompanhar.
Abraço!
Massa demais William, muito legal esse conteudo em português! Acompanhando a serie aqui, abração!
Valeu pelo apoio de sempre, Vinícius! Semana que vem tem mais!
Abração.