DEV Community

Hernani Almeida
Hernani Almeida

Posted on

Saga Pattern in Microservices - Parte 3

Ferramentas necessárias:

Vamos criar uma aplicação com o nome Orchestrator no Spring Starter, com as seguintes dependências:

Image description
Essa API realizará todo o controle de eventos da nossa transação de cadastro de usuário por meio de uma arquitetura orientada a eventos (Event-Driven Architecture). Nela ficará a lógica de envio de mensagens para seguir com a saga de cadastro de usuário ou para finalizá-la/realizar rollback, conforme os tópicos de cada aplicação.
Assim como na API anterior, nessa API teremos o application.yml contendo as configurações dos tópicos Kafka e também a classe de configuração do Kafka e dos tópicos.
A saga é iniciada na API orquestradora através de um listener do Kafka que recebe a mensagem enviada pela API User-Service. Na classe SagaOrchestratorConsumer também estão configurados os listeners que continuam a saga e que a finalizam com sucesso ou falha.

package br.com.orchestrator.application.consumer;

import br.com.orchestrator.core.service.OrchestrationService;
import br.com.orchestrator.core.utils.JsonUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka entry point of the saga orchestrator.
 *
 * <p>Every listener follows the same three steps: log the raw payload, convert it to an event
 * through {@code JsonUtil.toEvent} and hand the event to the matching
 * {@code OrchestrationService} operation. Topic names and the consumer group are resolved from
 * the {@code spring.kafka.*} properties referenced in the annotations.
 */
@Slf4j
@Component
@AllArgsConstructor
public class SagaOrchestratorConsumer {

    private final OrchestrationService service;
    private final JsonUtil jsonUtil;

    /**
     * Handles messages from the start-saga topic and starts a new saga.
     *
     * @param payload raw message body as received from Kafka
     */
    @KafkaListener(
        topics = "${spring.kafka.topic.start-saga}",
        groupId = "${spring.kafka.consumer.group-id}"
    )
    public void consumeStartSagaEvent(String payload) {
        log.info("Receiving event {} from start-saga topic", payload);
        service.startSaga(jsonUtil.toEvent(payload));
    }

    /**
     * Handles messages from the orchestrator topic and moves the saga to its next step.
     *
     * @param payload raw message body as received from Kafka
     */
    @KafkaListener(
        topics = "${spring.kafka.topic.orchestrator}",
        groupId = "${spring.kafka.consumer.group-id}"
    )
    public void consumeOrchestratorEvent(String payload) {
        log.info("Receiving event {} from orchestrator topic", payload);
        service.continueSaga(jsonUtil.toEvent(payload));
    }

    /**
     * Handles messages from the finish-success topic and closes the saga as successful.
     *
     * @param payload raw message body as received from Kafka
     */
    @KafkaListener(
        topics = "${spring.kafka.topic.finish-success}",
        groupId = "${spring.kafka.consumer.group-id}"
    )
    public void consumeFinishSagaSuccessEvent(String payload) {
        log.info("Receiving event {} from finish-success topic", payload);
        service.finishSagaSuccess(jsonUtil.toEvent(payload));
    }

    /**
     * Handles messages from the finish-fail topic and closes the saga as failed.
     *
     * @param payload raw message body as received from Kafka
     */
    @KafkaListener(
        topics = "${spring.kafka.topic.finish-fail}",
        groupId = "${spring.kafka.consumer.group-id}"
    )
    public void consumeFinishSagaFailEvent(String payload) {
        log.info("Receiving event {} from finish-fail topic", payload);
        service.finishSagaFail(jsonUtil.toEvent(payload));
    }
}

Enter fullscreen mode Exit fullscreen mode

Após receber a mensagem, chamamos nosso service conforme a regra de negócio recebida via tópico Kafka. Ele utiliza a classe SagaExecutionController para seguir com o fluxo da saga de forma orientada a eventos, conforme veremos logo abaixo.

package br.com.orchestrator.core.service;

import br.com.orchestrator.application.dto.Event;
import br.com.orchestrator.application.dto.History;
import br.com.orchestrator.application.producer.SagaOrchestratorProducer;
import br.com.orchestrator.application.saga.SagaExecutionController;
import br.com.orchestrator.core.enums.ETopics;
import br.com.orchestrator.core.utils.JsonUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;

import static br.com.orchestrator.core.enums.EEventSource.ORCHESTRATOR;
import static br.com.orchestrator.core.enums.ESagaStatus.FAIL;
import static br.com.orchestrator.core.enums.ESagaStatus.SUCCESS;

/**
 * Orchestrator-side saga operations: start, continue and finish (success or failure).
 *
 * <p>Start and continue ask {@code SagaExecutionController} for the next topic and publish the
 * event there; both finish operations publish the event to {@code ETopics.NOTIFY_ENDING}.
 * Events are serialized with {@code JsonUtil.toJson} before being handed to the producer.
 */
@Slf4j
@Service
@AllArgsConstructor
public class OrchestrationService {

    private final SagaOrchestratorProducer producer;
    private final JsonUtil jsonUtil;
    private final SagaExecutionController sagaExecutionController;

    /**
     * Marks the event as an orchestrator success, records the start in its history and
     * publishes it to the first topic of the saga.
     *
     * @param event event received from the start-saga topic; mutated in place
     */
    public void startSaga(Event event) {
        event.setSource(ORCHESTRATOR);
        event.setStatus(SUCCESS);
        // The topic lookup depends on the source/status set above and emits its own log line,
        // so it has to run before the "started" message.
        ETopics nextTopic = sagaExecutionController.getNextTopic(event);
        log.info("SAGA STARTED!");
        recordHistory(event, "Saga started!");
        publish(event, nextTopic);
    }

    /**
     * Closes the saga as successful and publishes the final event to the notify-ending topic.
     *
     * @param event event received from the finish-success topic; mutated in place
     */
    public void finishSagaSuccess(Event event) {
        event.setSource(ORCHESTRATOR);
        event.setStatus(SUCCESS);
        log.info("SAGA FINISHED SUCCESSFULLY FOR EVENT {}!", event.getId());
        recordHistory(event, "Saga finished successfully!");
        publish(event, ETopics.NOTIFY_ENDING);
    }

    /**
     * Closes the saga as failed and publishes the final event to the notify-ending topic.
     *
     * @param event event received from the finish-fail topic; mutated in place
     */
    public void finishSagaFail(Event event) {
        event.setSource(ORCHESTRATOR);
        event.setStatus(FAIL);
        log.info("SAGA FINISHED WITH ERRORS FOR EVENT {}!", event.getId());
        recordHistory(event, "Saga finished with errors!");
        publish(event, ETopics.NOTIFY_ENDING);
    }

    /**
     * Forwards the event, unchanged, to whatever topic follows its current source and status.
     *
     * @param event event received from the orchestrator topic
     */
    public void continueSaga(Event event) {
        ETopics nextTopic = sagaExecutionController.getNextTopic(event);
        log.info("SAGA CONTINUING FOR EVENT {}", event.getId());
        publish(event, nextTopic);
    }

    /** Appends a history entry built from the event's current source and status. */
    private void recordHistory(Event event, String message) {
        event.addToHistory(
            new History(event.getSource(), event.getStatus(), message, LocalDateTime.now()));
    }

    /** Serializes the event and sends it to the given topic. */
    private void publish(Event event, ETopics destination) {
        var json = jsonUtil.toJson(event);
        producer.sendEvent(json, destination.getTopic());
    }
}

Enter fullscreen mode Exit fullscreen mode

Nessa API teremos uma classe SagaHandler com uma constante estática que contém uma matriz (um vetor de vetores). Utilizaremos essa matriz para que a API orquestradora saiba para qual tópico enviar a mensagem e dar seguimento à saga de cadastro de usuário.

package br.com.orchestrator.application.saga;


import static br.com.orchestrator.core.enums.EEventSource.*;
import static br.com.orchestrator.core.enums.ESagaStatus.*;
import static br.com.orchestrator.core.enums.ETopics.*;

/**
 * Static routing table of the user-registration saga.
 *
 * <p>Each row of {@link #SAGA_HANDLER} is a triple {@code { event source, saga status, next topic }};
 * the {@code *_INDEX} constants name the three columns. Rows are grouped by source and follow one
 * convention per status:
 * <ul>
 *   <li>{@code SUCCESS} - go forward to the next step's success topic;</li>
 *   <li>{@code ROLLBACK_PENDING} - send the event to the reporting step's own fail topic;</li>
 *   <li>{@code FAIL} - send the event to the previous step's fail topic, or to
 *       {@code FINISH_FAIL} when there is no previous step.</li>
 * </ul>
 *
 * <p>The array is public and mutable; callers are expected to treat it as read-only.
 */
public final class SagaHandler {

    private SagaHandler() {
        // Constants holder, not meant to be instantiated.
    }

    public static final Object[][] SAGA_HANDLER = {
        { ORCHESTRATOR, SUCCESS, ADRESS_VALIDATION_SUCCESS },
        { ORCHESTRATOR, FAIL, FINISH_FAIL },

        { ADRESS_VALIDATION_SERVICE, ROLLBACK_PENDING, ADRESS_VALIDATION_FAIL },
        { ADRESS_VALIDATION_SERVICE, FAIL, FINISH_FAIL },
        { ADRESS_VALIDATION_SERVICE, SUCCESS, VALIDATED_SUCCESS },

        { VALIDATED_SERVICE, ROLLBACK_PENDING, VALIDATED_FAIL },
        { VALIDATED_SERVICE, FAIL, ADRESS_VALIDATION_FAIL },
        { VALIDATED_SERVICE, SUCCESS, REGISTRATION_SUCCESS },

        { REGISTRATION_SERVICE, ROLLBACK_PENDING, REGISTRATION_FAIL },
        // FAIL unwinds the previous step, like the other FAIL rows. Routing this back to
        // REGISTRATION_FAIL would hand the event to the registration step again and the
        // earlier steps would presumably never be rolled back.
        { REGISTRATION_SERVICE, FAIL, VALIDATED_FAIL },
        { REGISTRATION_SERVICE, SUCCESS, FINISH_SUCCESS }
    };

    /** Column of a {@link #SAGA_HANDLER} row holding the event source. */
    public static final int EVENT_SOURCE_INDEX = 0;
    /** Column of a {@link #SAGA_HANDLER} row holding the saga status. */
    public static final int SAGA_STATUS_INDEX = 1;
    /** Column of a {@link #SAGA_HANDLER} row holding the topic to publish to next. */
    public static final int TOPIC_INDEX = 2;
}

Enter fullscreen mode Exit fullscreen mode

Criaremos uma classe SagaExecutionController que será utilizada para controlar toda a execução da saga. É nessa classe que utilizamos a matriz acima para saber para qual tópico Kafka a mensagem será enviada para seguir com a saga.

package br.com.orchestrator.application.saga;

import br.com.orchestrator.application.dto.Event;
import br.com.orchestrator.application.exception.ValidationException;
import br.com.orchestrator.core.enums.ETopics;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Arrays;

import static br.com.orchestrator.application.saga.SagaHandler.*;
import static java.lang.String.format;
import static org.springframework.util.ObjectUtils.isEmpty;

@Slf4j
@Component
@AllArgsConstructor
public class SagaExecutionController {

    private static final String SAGA_LOG_ID = "USER ID: %s | TRANSACTION ID %s | EVENT ID %s";

    public ETopics getNextTopic(Event event) {
        if (isEmpty(event.getSource()) || isEmpty(event.getStatus())) {
            throw new ValidationException("Source and status must be informed.");
        }
        var topic = findTopicBySourceAndStatus(event);
        logCurrentSaga(event, topic);
        return topic;
    }

    private ETopics findTopicBySourceAndStatus(Event event) {
        return (ETopics) (Arrays.stream(SAGA_HANDLER)
            .filter(row -> isEventSourceAndStatusValid(event, row))
            .map(i -> i[TOPIC_INDEX])
            .findFirst()
            .orElseThrow(() -> new ValidationException("Topic not found!")));
    }

    private boolean isEventSourceAndStatusValid(Event event,
                                                Object[] row) {
        var source = row[EVENT_SOURCE_INDEX];
        var status = row[SAGA_STATUS_INDEX];
        return source.equals(event.getSource()) && status.equals(event.getStatus());
    }

    private void logCurrentSaga(Event event, ETopics topic) {
        var sagaId = createSagaId(event);
        var source = event.getSource();
        switch (event.getStatus()) {
            case SUCCESS -> log.info("### CURRENT SAGA: {} | SUCCESS | NEXT TOPIC {} | {}",
                source, topic, sagaId);
            case ROLLBACK_PENDING -> log.info("### CURRENT SAGA: {} | SENDING TO ROLLBACK CURRENT SERVICE | NEXT TOPIC {} | {}",
                source, topic, sagaId);
            case FAIL -> log.info("### CURRENT SAGA: {} | SENDING TO ROLLBACK PREVIOUS SERVICE | NEXT TOPIC {} | {}",
                source, topic, sagaId);
        }
    }

    private String createSagaId(Event event) {
        return format(SAGA_LOG_ID,
            event.getPayload().getId(), event.getTransactionId(), event.getId());
    }
}

Enter fullscreen mode Exit fullscreen mode

O código dessa aplicação ficará assim.
API Saga Pattern - Orchestrator

Parte 4

linkedin
github

Top comments (0)