In this article let's try to create closer to real world Event Sourcing CQRS microservice using: ππ¨βπ»π
EventStoreDB The database built for Event Sourcing
gRPC Go implementation of gRPC
MongoDB Web and API based SMTP testing
Elasticsearch Elasticsearch client for Go.
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana for to compose observability dashboards with everything from Prometheus
swag Swagger for Go
Echo web framework
Kibana Kibana is user interface that lets you visualize your Elasticsearch
Source code you can find in GitHub repository
The main idea of this project is the implementation of Event Sourcing and CQRS using Go, EventStoreDB, gRPC, MongoDB and ElasticSearch.
Didn't write in this article about Event Sourcing and CQRS patterns, because it makes the article huge and the best place to read is [ (, found this article is very good too, and highly recommend Alexey Zimarev "Hands-on Domain-Driven Design with .NET Core" book and also his blog.
In this project we have microservice working with EventStoreDB using oficial go client, for [projections ( used MongoDB and Elasticsearch for search,
and communicate by gRPC and REST.
Did not implement here any interesting business logic and didn't cover tests, because don't have enough time,
the events list is very simple: create a new order, update shopping cart, pay, submit, cancel, change the delivery address, complete order, and of course in real-world better use more concrete and meaningfully events, but the target here is to show the idea and how it works.
Event Sourcing can be implemented in different ways, used here EventStoreDB, but we can do it with PostgreSQL and Kafka for example.
After trying both approaches, found EventStoreDB is a better solution because all required features are implemented out of the box, it is optimized and really very good engineers developing it.
For local development:
make local or docker_dev // for run docker compose
make run_es // run es microservice
make dev // run all in docker compose with hot reload
All UI interfaces will be available on ports:
EventStoreDB UI: http://localhost:2113
Jaeger UI: http://localhost:16686
Prometheus UI: http://localhost:9090
Grafana UI: http://localhost:3005
Swagger UI: http://localhost:5007/swagger/index.html
Kibana UI: http://localhost:5601/app/home#/
Docker compose file for this project,
depending on your environment, use image for eventstoredb: eventstore/eventstore:21.6.0-buster-slim,
and for M1:
version: "3.8"
image: eventstore/eventstore:21.6.0-buster-slim
- "1113:1113"
- "2113:2113"
- type: volume
source: eventstore-volume-data
target: /var/lib/eventstore
- type: volume
source: eventstore-volume-logs
target: /var/log/eventstore
networks: [ "microservices" ]
image: prom/prometheus:latest
container_name: prometheus
- "9090:9090"
- --config.file=/etc/prometheus/prometheus.yml
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
networks: [ "microservices" ]
container_name: node_exporter_container
restart: always
image: prom/node-exporter
- '9101:9100'
networks: [ "microservices" ]
container_name: grafana_container
restart: always
image: grafana/grafana
- '3005:3000'
networks: [ "microservices" ]
image: mongo:latest
restart: always
- "27017:27017"
- mongodb_data_container:/data/db
networks: [ "microservices" ]
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.21
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"
- "14250:14250"
- "9411:9411"
networks: [ "microservices" ]
container_name: node01
restart: always
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms128m -Xmx128m"
soft: -1
hard: -1
- es-data01:/usr/share/elasticsearch/data
- "9200:9200"
networks: [ "microservices" ]
restart: always
ELASTICSEARCH_HOSTS: http://node01:9200
- "5601:5601"
- node01
networks: [ "microservices" ]
driver: local
name: microservices
First, we look at EventStoreDB and then add some code for working with it, we can look at streams and events
EventStoreDB ui:
AggregateRoot can be implemented in different ways, for this project and Go specific, the main aggregate root methods - load events and apply changes.
When we fetch the aggregate from the database, instead of reading its state as one record in a table or document, we read all events that were saved before and call the When method for each.
After all these steps, we will recover all the history of a given aggregate. By doing this, we will be bringing our aggregate to its latest state.
type AggregateBase struct {
ID string
Version int64
AppliedEvents []Event
UncommittedEvents []Event
Type AggregateType
when when
func (a *AggregateBase) Apply(event Event) error {
if event.GetAggregateID() != a.GetID() {
return ErrInvalidAggregateID
if err := a.when(event); err != nil {
return err
a.UncommittedEvents = append(a.UncommittedEvents, event)
return nil
// RaiseEvent push event to aggregate applied events using When method, used for load directly from eventstore
func (a *AggregateBase) RaiseEvent(event Event) error {
if event.GetAggregateID() != a.GetID() {
return ErrInvalidAggregateID
if a.GetVersion() >= event.GetVersion() {
return ErrInvalidEventVersion
if err := a.when(event); err != nil {
return err
a.Version = event.GetVersion()
return nil
The Event struct is abstraction under esdb.RecordedEvent and esdb.EventData for comfortable work with it.
An event represents a fact that took place in the domain. They are the source of truth; your current state is derived from the events.
They are immutable and represent the business facts.
It means that we never change or remove anything in the database, we only append new events.
// EventType is the type of any event, used as its unique identifier.
type EventType string
// Event is an internal representation of an event, returned when the Aggregate
// uses NewEvent to create a new event. The events loaded from the db is
// represented by each DBs internal event type, implementing Event.
type Event struct {
EventID string
EventType string
Data []byte
Timestamp time.Time
AggregateType AggregateType
AggregateID string
Version int64
Metadata []byte
In this example we don't use snapshots, so the AggregateStore interface:
// AggregateStore is responsible for loading and saving aggregates.
type AggregateStore interface {
Load(ctx context.Context, aggregate Aggregate) error
Save(ctx context.Context, aggregate Aggregate) error
Exists(ctx context.Context, streamID string) error
Implementation of AggregateStore is Load, Save and Exists methods,
Load and Save accept aggregate then load or apply events using EventStoreDB client.
The Load method: find out the stream name for an aggregate, read all of the events from the aggregate stream,
loop through all of the events, and call the RaiseEvent handler for each of them.
And the Save method persists aggregates by saving the history of changes, handling concurrency, when you retrieve a stream from EventStoreDB, you take note of the current version number, then when you save it back you can determine if somebody else has modified the record in the meantime.
func (a *aggregateStore) Load(ctx context.Context, aggregate es.Aggregate) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "aggregateStore.Load")
defer span.Finish()
span.LogFields(log.String("AggregateID", aggregate.GetID()))
stream, err := a.db.ReadStream(ctx, aggregate.GetID(), esdb.ReadStreamOptions{}, count)
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "db.ReadStream")
defer stream.Close()
for {
event, err := stream.Recv()
if errors.Is(err, esdb.ErrStreamNotFound) {
tracing.TraceErr(span, err)
return errors.Wrap(err, "stream.Recv")
if errors.Is(err, io.EOF) {
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "stream.Recv")
esEvent := es.NewEventFromRecorded(event.Event)
if err := aggregate.RaiseEvent(esEvent); err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "RaiseEvent")
a.log.Debugf("(Load) esEvent: {%s}", esEvent.String())
a.log.Debugf("(Load) aggregate: {%s}", aggregate.String())
return nil
func (a *aggregateStore) Save(ctx context.Context, aggregate es.Aggregate) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "aggregateStore.Save")
defer span.Finish()
span.LogFields(log.String("aggregate", aggregate.String()))
if len(aggregate.GetUncommittedEvents()) == 0 {
a.log.Debugf("(Save) [no uncommittedEvents] len: {%d}", len(aggregate.GetUncommittedEvents()))
return nil
eventsData := make([]esdb.EventData, 0, len(aggregate.GetUncommittedEvents()))
for _, event := range aggregate.GetUncommittedEvents() {
eventsData = append(eventsData, event.ToEventData())
var expectedRevision esdb.ExpectedRevision
if len(aggregate.GetAppliedEvents()) == 0 {
expectedRevision = esdb.NoStream{}
a.log.Debugf("(Save) expectedRevision: {%T}", expectedRevision)
appendStream, err := a.db.AppendToStream(
esdb.AppendToStreamOptions{ExpectedRevision: expectedRevision},
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "db.AppendToStream")
a.log.Debugf("(Save) stream: {%+v}", appendStream)
return nil
readOps := esdb.ReadStreamOptions{Direction: esdb.Backwards, From: esdb.End{}}
stream, err := a.db.ReadStream(context.Background(), aggregate.GetID(), readOps, 1)
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "db.ReadStream")
defer stream.Close()
lastEvent, err := stream.Recv()
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "stream.Recv")
expectedRevision = esdb.Revision(lastEvent.OriginalEvent().EventNumber)
a.log.Debugf("(Save) expectedRevision: {%T}", expectedRevision)
appendStream, err := a.db.AppendToStream(
esdb.AppendToStreamOptions{ExpectedRevision: expectedRevision},
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "db.AppendToStream")
a.log.Debugf("(Save) stream: {%+v}", appendStream)
return nil
For the next step let's create an order aggregate, we have to add es.AggregateBase struct and implement When interface:
type OrderAggregate struct {
Order *models.Order
func (a *OrderAggregate) When(evt es.Event) error {
switch evt.GetEventType() {
case v1.OrderCreated:
return a.onOrderCreated(evt)
case v1.OrderPaid:
return a.onOrderPaid(evt)
case v1.OrderSubmitted:
return a.onOrderSubmitted(evt)
case v1.OrderCompleted:
return a.onOrderCompleted(evt)
case v1.OrderCanceled:
return a.onOrderCanceled(evt)
case v1.ShoppingCartUpdated:
return a.onShoppingCartUpdated(evt)
case v1.DeliveryAddressChanged:
return a.onChangeDeliveryAddress(evt)
return es.ErrInvalidEventType
func (a *OrderAggregate) onOrderCreated(evt es.Event) error {
var eventData v1.OrderCreatedEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
a.Order.AccountEmail = eventData.AccountEmail
a.Order.ShopItems = eventData.ShopItems
a.Order.Created = true
a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
a.Order.DeliveryAddress = eventData.DeliveryAddress
return nil
func (a *OrderAggregate) onOrderPaid(evt es.Event) error {
var payment models.Payment
if err := evt.GetJsonData(&payment); err != nil {
return errors.Wrap(err, "GetJsonData")
a.Order.Paid = true
a.Order.Payment = payment
return nil
func (a *OrderAggregate) onOrderSubmitted(evt es.Event) error {
a.Order.Submitted = true
return nil
func (a *OrderAggregate) onOrderCompleted(evt es.Event) error {
var eventData v1.OrderCompletedEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
a.Order.Completed = true
a.Order.DeliveredTime = eventData.DeliveryTimestamp
a.Order.Canceled = false
return nil
func (a *OrderAggregate) onOrderCanceled(evt es.Event) error {
var eventData v1.OrderCanceledEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
a.Order.Canceled = true
a.Order.Completed = false
a.Order.CancelReason = eventData.CancelReason
return nil
func (a *OrderAggregate) onShoppingCartUpdated(evt es.Event) error {
var eventData v1.ShoppingCartUpdatedEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
a.Order.ShopItems = eventData.ShopItems
a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
return nil
func (a *OrderAggregate) onChangeDeliveryAddress(evt es.Event) error {
var eventData v1.OrderDeliveryAddressChangedEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
a.Order.DeliveryAddress = eventData.DeliveryAddress
return nil
For example, let's look at, create order case, onCreateOrderCommand handle command, validate order state, serialize data and create CreateOrderEvent:
func (a *OrderAggregate) CreateOrder(ctx context.Context, shopItems []*models.ShopItem, accountEmail, deliveryAddress string) error {
span, _ := opentracing.StartSpanFromContext(ctx, "OrderAggregate.CreateOrder")
defer span.Finish()
span.LogFields(log.String("AggregateID", a.GetID()))
if a.Order.Created {
return ErrAlreadyCreated
if shopItems == nil {
return ErrOrderShopItemsIsRequired
if deliveryAddress == "" {
return ErrInvalidDeliveryAddress
event, err := eventsV1.NewOrderCreatedEvent(a, shopItems, accountEmail, deliveryAddress)
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "NewOrderCreatedEvent")
if err := event.SetMetadata(tracing.ExtractTextMapCarrier(span.Context())); err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "SetMetadata")
return a.Apply(event)
Then aggregate handle event using onOrderCreated method which only applies changes to the state:
func (a *OrderAggregate) onOrderCreated(evt es.Event) error {
var eventData v1.OrderCreatedEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
a.Order.AccountEmail = eventData.AccountEmail
a.Order.ShopItems = eventData.ShopItems
a.Order.Created = true
a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
a.Order.DeliveryAddress = eventData.DeliveryAddress
return nil
Our microservice accept http or gRPC requests:
For swagger used swag and let's look at create order handler code:
// CreateOrder
// @Tags Orders
// @Summary Create order
// @Description Create new order
// @Param order body dto.CreateOrderReqDto true "create order"
// @Accept json
// @Produce json
// @Success 201 {string} id ""
// @Router /orders [post]
func (h *orderHandlers) CreateOrder() echo.HandlerFunc {
return func(c echo.Context) error {
ctx, span := tracing.StartHttpServerTracerSpan(c, "orderHandlers.CreateOrder")
defer span.Finish()
var reqDto dto.CreateOrderReqDto
if err := c.Bind(&reqDto); err != nil {
h.log.Errorf("(Bind) err: {%v}", err)
tracing.TraceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
if err := h.v.StructCtx(ctx, reqDto); err != nil {
h.log.Errorf("(validate) err: {%v}", err)
tracing.TraceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
id := uuid.NewV4().String()
command := v1.NewCreateOrderCommand(id, reqDto.ShopItems, reqDto.AccountEmail, reqDto.DeliveryAddress)
err := h.os.Commands.CreateOrder.Handle(ctx, command)
if err != nil {
h.log.Errorf("(CreateOrder.Handle) id: {%s}, err: {%v}", id, err)
tracing.TraceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
h.log.Infof("(order created) id: {%s}", id)
return c.JSON(http.StatusCreated, id)
and gRPC CreateOrder handler does the same as http handler, validate request and call command.
For validation used validator because of implements value validations for structs and individual fields based on tags.
func (s *orderGrpcService) CreateOrder(ctx context.Context, req *orderService.CreateOrderReq) (*orderService.CreateOrderRes, error) {
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "orderGrpcService.CreateOrder")
defer span.Finish()
span.LogFields(log.String("req", req.String()))
aggregateID := uuid.NewV4().String()
command := v1.NewCreateOrderCommand(aggregateID, models.ShopItemsFromProto(req.GetShopItems()), req.GetAccountEmail(), req.GetDeliveryAddress())
if err := s.v.StructCtx(ctx, command); err != nil {
s.log.Errorf("(validate) aggregateID: {%s}, err: {%v}", aggregateID, err)
tracing.TraceErr(span, err)
return nil, s.errResponse(err)
if err := s.os.Commands.CreateOrder.Handle(ctx, command); err != nil {
s.log.Errorf("(CreateOrder.Handle) orderID: {%s}, err: {%v}", aggregateID, err)
return nil, s.errResponse(err)
s.log.Infof("(created order): orderID: {%s}", aggregateID)
return &orderService.CreateOrderRes{AggregateID: aggregateID}, nil
http and gRPC handlers do the same, validate the incoming request and call command service with CreateOrder command,
which load OrderAggregate, call HandleCommand method and save it to event store.
The main reason for CQRS gaining popularity is the ability to handle reads and writes separately due to severe differences in optimization techniques for those much more distinct operations.
func (c *createOrderHandler) Handle(ctx context.Context, command *CreateOrderCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createOrderHandler.Handle")
defer span.Finish()
span.LogFields(log.String("AggregateID", command.GetAggregateID()))
order := aggregate.NewOrderAggregateWithID(command.AggregateID)
err :=, order.GetID())
if err != nil && !errors.Is(err, esdb.ErrStreamNotFound) {
return err
if err := order.CreateOrder(ctx, command.ShopItems, command.AccountEmail, command.DeliveryAddress); err != nil {
return err
span.LogFields(log.String("order", order.String()))
return, order)
The process of building a piece of state from events is called a projection.
EventStoreDB has subscriptions, so we can subscribe our projections for order type stream events.
When we execute a command, the aggregate generates a new event that represents the state transitions of the aggregate. Those events are committed to the store, so the store appends them to the end of the aggregate stream. A subscription receives these events and updates its read models.
MongoDB's projection subscribes to events stream using the persistent subscription.
and process events using When method, like aggregate it applies changes depending on the event type:
func (o *mongoProjection) ProcessEvents(ctx context.Context, stream *esdb.PersistentSubscription, workerID int) error {
for {
event := stream.Recv()
select {
case <-ctx.Done():
return ctx.Err()
if event.SubscriptionDropped != nil {
o.log.Errorf("(SubscriptionDropped) err: {%v}", event.SubscriptionDropped.Error)
return errors.Wrap(event.SubscriptionDropped.Error, "Subscription Dropped")
if event.EventAppeared != nil {
o.log.ProjectionEvent(constants.MongoProjection, o.cfg.Subscriptions.MongoProjectionGroupName, event.EventAppeared, workerID)
err := o.When(ctx, es.NewEventFromRecorded(event.EventAppeared.Event))
if err != nil {
o.log.Errorf("(mongoProjection.when) err: {%v}", err)
if err := stream.Nack(err.Error(), esdb.Nack_Retry, event.EventAppeared); err != nil {
o.log.Errorf("(stream.Nack) err: {%v}", err)
return errors.Wrap(err, "stream.Nack")
err = stream.Ack(event.EventAppeared)
if err != nil {
o.log.Errorf("(stream.Ack) err: {%v}", err)
return errors.Wrap(err, "stream.Ack")
o.log.Infof("(ACK) event commit: {%v}", *event.EventAppeared.Commit)
func (o *mongoProjection) When(ctx context.Context, evt es.Event) error {
ctx, span := tracing.StartProjectionTracerSpan(ctx, "mongoProjection.When", evt)
defer span.Finish()
span.LogFields(log.String("AggregateID", evt.GetAggregateID()), log.String("EventType", evt.GetEventType()))
switch evt.GetEventType() {
case v1.OrderCreated:
return o.onOrderCreate(ctx, evt)
case v1.OrderPaid:
return o.onOrderPaid(ctx, evt)
case v1.OrderSubmitted:
return o.onSubmit(ctx, evt)
case v1.ShoppingCartUpdated:
return o.onShoppingCartUpdate(ctx, evt)
case v1.OrderCanceled:
return o.onCancel(ctx, evt)
case v1.OrderCompleted:
return o.onCompleted(ctx, evt)
case v1.DeliveryAddressUpdated:
return o.onDeliveryAddressUpdated(ctx, evt)
o.log.Warnf("(mongoProjection) [When unknown EventType] eventType: {%s}", evt.EventType)
return es.ErrInvalidEventType
and onOrderCreate method deserialize data and handle event calling MongoDB repository insert method:
func (o *mongoProjection) onOrderCreate(ctx context.Context, evt es.Event) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoProjection.onOrderCreate")
defer span.Finish()
span.LogFields(log.String("AggregateID", evt.GetAggregateID()))
var eventData v1.OrderCreatedEvent
if err := evt.GetJsonData(&eventData); err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "evt.GetJsonData")
span.LogFields(log.String("AccountEmail", eventData.AccountEmail))
op := &models.OrderProjection{
OrderID: aggregate.GetOrderAggregateID(evt.AggregateID),
ShopItems: eventData.ShopItems,
Created: true,
AccountEmail: eventData.AccountEmail,
TotalPrice: aggregate.GetShopItemsTotalPrice(eventData.ShopItems),
DeliveryAddress: eventData.DeliveryAddress,
_, err := o.mongoRepo.Insert(ctx, op)
if err != nil {
return err
return nil
The OrderMongoRepository insert method is simple:
func (m *mongoRepository) Insert(ctx context.Context, order *models.OrderProjection) (string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Insert")
defer span.Finish()
span.LogFields(log.String("OrderID", order.OrderID))
_, err := m.getOrdersCollection().InsertOne(ctx, order, &options.InsertOneOptions{})
if err != nil {
tracing.TraceErr(span, err)
return "", err
return order.OrderID, nil
The same idea for ElasticSearch projection implementation, we can use it for searching orders, so let's look at the repository Search method.
At this project as elastic search client used elastic.
func (e *elasticRepository) Search(ctx context.Context, text string, pq *utils.Pagination) (*dto.OrderSearchResponseDto, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepository.Search")
defer span.Finish()
span.LogFields(log.String("Search", text))
shouldMatch := v7.NewBoolQuery().
Should(v7.NewMatchPhrasePrefixQuery(shopItemTitle, text), v7.NewMatchPhrasePrefixQuery(shopItemDescription, text)).
searchResult, err := e.elasticClient.Search(e.cfg.ElasticIndexes.Orders).
if err != nil {
tracing.TraceErr(span, err)
return nil, errors.Wrap(err, "elasticClient.Search")
orders := make([]*models.OrderProjection, 0, len(searchResult.Hits.Hits))
for _, hit := range searchResult.Hits.Hits {
jsonBytes, err := hit.Source.MarshalJSON()
if err != nil {
tracing.TraceErr(span, err)
return nil, errors.Wrap(err, "Source.MarshalJSON")
var order models.OrderProjection
if err := json.Unmarshal(jsonBytes, &order); err != nil {
tracing.TraceErr(span, err)
return nil, errors.Wrap(err, "json.Unmarshal")
orders = append(orders, &order)
return &dto.OrderSearchResponseDto{
Pagination: dto.Pagination{
TotalCount: searchResult.TotalHits(),
TotalPages: int64(pq.GetTotalPages(int(searchResult.TotalHits()))),
Page: int64(pq.GetPage()),
Size: int64(pq.GetSize()),
HasMore: pq.GetHasMore(int(searchResult.TotalHits())),
Orders: mappers.OrdersFromProjections(orders),
}, nil
More details and source code of the full project you can find here,
of course in real-world applications, we have to implement many more necessary features, like circuit breaker, retries, rate limiters, etc.,
depending on the project it can be implemented in different ways, for example, you can use Kubernetes and Istio for some of them.
I hope this article is usefully and helpfully, and be happy to receive any feedback or questions, feel free to contact me by email or any messengers :)
