Ferramentas necessárias:
- Java
- Docker
- IDE de sua preferencia
- Configuração do ambiente conforme a parte 2 do artigo
Vamos criar uma aplicação com o nome de Orchestrator no spring starter com as seguintes dependências necessárias
Essa api realizara todo o controle de eventos da nossa transação de cadastro de usuário através da (Events-Driven Architecture), nela ira conter a logica de envio de mensagens para seguir com a saga de cadastrar usuário ou para finalizar/realizar rolback conforme os tópicos de cada aplicação.
Conforme a api anterior nessa api teremos o application.yml contendo as configs do tópico kafka e também a classe de configuração do Kafka e tópicos.
A saga e iniciada na api orquestradora através de um listener do kafka que recebe a mensagem enviada pela api User-Service, nessa classe SagaOrchestratorConsumer também estão configurados os listeners que continua a saga e que finalizam a saga com sucesso ou falha.
package br.com.orchestrator.application.consumer;
import br.com.orchestrator.core.service.OrchestrationService;
import br.com.orchestrator.core.utils.JsonUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@AllArgsConstructor
public class SagaOrchestratorConsumer {
private final OrchestrationService service;
private final JsonUtil jsonUtil;
@KafkaListener(
groupId = "${spring.kafka.consumer.group-id}",
topics = "${spring.kafka.topic.start-saga}"
)
public void consumeStartSagaEvent(String payload) {
log.info("Receiving event {} from start-saga topic", payload);
var event = jsonUtil.toEvent(payload);
service.startSaga(event);
}
@KafkaListener(
groupId = "${spring.kafka.consumer.group-id}",
topics = "${spring.kafka.topic.orchestrator}"
)
public void consumeOrchestratorEvent(String payload) {
log.info("Receiving event {} from orchestrator topic", payload);
var event = jsonUtil.toEvent(payload);
service.continueSaga(event);
}
@KafkaListener(
groupId = "${spring.kafka.consumer.group-id}",
topics = "${spring.kafka.topic.finish-success}"
)
public void consumeFinishSagaSuccessEvent(String payload) {
log.info("Receiving event {} from finish-success topic", payload);
var event = jsonUtil.toEvent(payload);
service.finishSagaSuccess(event);
}
@KafkaListener(
groupId = "${spring.kafka.consumer.group-id}",
topics = "${spring.kafka.topic.finish-fail}"
)
public void consumeFinishSagaFailEvent(String payload) {
log.info("Receiving event {} from finish-fail topic", payload);
var event = jsonUtil.toEvent(payload);
service.finishSagaFail(event);
}
}
Após receber a mensagem chamamos nosso service conforme a regra de negocio recebida via tópico kafka que utiliza a classe SagaExecutionController para seguir com o fluxo da saga via event driven conforme veremos logo mais abaixo.
package br.com.orchestrator.core.service;
import br.com.orchestrator.application.dto.Event;
import br.com.orchestrator.application.dto.History;
import br.com.orchestrator.application.producer.SagaOrchestratorProducer;
import br.com.orchestrator.application.saga.SagaExecutionController;
import br.com.orchestrator.core.enums.ETopics;
import br.com.orchestrator.core.utils.JsonUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import static br.com.orchestrator.core.enums.EEventSource.ORCHESTRATOR;
import static br.com.orchestrator.core.enums.ESagaStatus.FAIL;
import static br.com.orchestrator.core.enums.ESagaStatus.SUCCESS;
@Slf4j
@Service
@AllArgsConstructor
public class OrchestrationService {
private final SagaOrchestratorProducer producer;
private final JsonUtil jsonUtil;
private final SagaExecutionController sagaExecutionController;
public void startSaga(Event event) {
event.setSource(ORCHESTRATOR);
event.setStatus(SUCCESS);
var topic = getTopic(event);
log.info("SAGA STARTED!");
addHistory(event, "Saga started!");
sendToProducerWithTopic(event, topic);
}
public void finishSagaSuccess(Event event) {
event.setSource(ORCHESTRATOR);
event.setStatus(SUCCESS);
log.info("SAGA FINISHED SUCCESSFULLY FOR EVENT {}!", event.getId());
addHistory(event, "Saga finished successfully!");
notifyFinishedSaga(event);
}
public void finishSagaFail(Event event) {
event.setSource(ORCHESTRATOR);
event.setStatus(FAIL);
log.info("SAGA FINISHED WITH ERRORS FOR EVENT {}!", event.getId());
addHistory(event, "Saga finished with errors!");
notifyFinishedSaga(event);
}
public void continueSaga(Event event) {
var topic = getTopic(event);
log.info("SAGA CONTINUING FOR EVENT {}", event.getId());
sendToProducerWithTopic(event, topic);
}
private ETopics getTopic(Event event) {
return sagaExecutionController.getNextTopic(event);
}
private void addHistory(Event event, String message) {
var history = new History(event.getSource(), event.getStatus(), message, LocalDateTime.now());
event.addToHistory(history);
}
private void sendToProducerWithTopic(Event event, ETopics topic) {
producer.sendEvent(jsonUtil.toJson(event), topic.getTopic());
}
private void notifyFinishedSaga(Event event) {
producer.sendEvent(jsonUtil.toJson(event), ETopics.NOTIFY_ENDING.getTopic());
}
}
Nessa api teremos uma classe SagaHandler que terá um método estático que retornara um vetor de uma matriz, utilizaremos essa matriz para que a api orquestradora entenda qual tópico ira enviar a mensagem para dar seguimento a saga de cadastro de usuário.
package br.com.orchestrator.application.saga;
import static br.com.orchestrator.core.enums.EEventSource.*;
import static br.com.orchestrator.core.enums.ESagaStatus.*;
import static br.com.orchestrator.core.enums.ETopics.*;
public final class SagaHandler {
private SagaHandler() {
}
public static final Object[][] SAGA_HANDLER = {
{ ORCHESTRATOR, SUCCESS, ADRESS_VALIDATION_SUCCESS },
{ ORCHESTRATOR, FAIL, FINISH_FAIL },
{ ADRESS_VALIDATION_SERVICE, ROLLBACK_PENDING, ADRESS_VALIDATION_FAIL },
{ ADRESS_VALIDATION_SERVICE, FAIL, FINISH_FAIL },
{ ADRESS_VALIDATION_SERVICE, SUCCESS, VALIDATED_SUCCESS },
{ VALIDATED_SERVICE, ROLLBACK_PENDING, VALIDATED_FAIL },
{ VALIDATED_SERVICE, FAIL, ADRESS_VALIDATION_FAIL },
{ VALIDATED_SERVICE, SUCCESS, REGISTRATION_SUCCESS },
{ REGISTRATION_SERVICE, ROLLBACK_PENDING, REGISTRATION_FAIL },
{ REGISTRATION_SERVICE, FAIL, REGISTRATION_FAIL },
{ REGISTRATION_SERVICE, SUCCESS, FINISH_SUCCESS }
};
public static final int EVENT_SOURCE_INDEX = 0;
public static final int SAGA_STATUS_INDEX = 1;
public static final int TOPIC_INDEX = 2;
}
Criaremos uma classe SagaExecutionController que ira ser utilizada para controlar toda a execução da saga, nesta classe que utilizamos a matriz acima para saber qual tópico kafka será enviada a mensagem para seguir saga.
package br.com.orchestrator.application.saga;
import br.com.orchestrator.application.dto.Event;
import br.com.orchestrator.application.exception.ValidationException;
import br.com.orchestrator.core.enums.ETopics;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import static br.com.orchestrator.application.saga.SagaHandler.*;
import static java.lang.String.format;
import static org.springframework.util.ObjectUtils.isEmpty;
@Slf4j
@Component
@AllArgsConstructor
public class SagaExecutionController {
private static final String SAGA_LOG_ID = "USER ID: %s | TRANSACTION ID %s | EVENT ID %s";
public ETopics getNextTopic(Event event) {
if (isEmpty(event.getSource()) || isEmpty(event.getStatus())) {
throw new ValidationException("Source and status must be informed.");
}
var topic = findTopicBySourceAndStatus(event);
logCurrentSaga(event, topic);
return topic;
}
private ETopics findTopicBySourceAndStatus(Event event) {
return (ETopics) (Arrays.stream(SAGA_HANDLER)
.filter(row -> isEventSourceAndStatusValid(event, row))
.map(i -> i[TOPIC_INDEX])
.findFirst()
.orElseThrow(() -> new ValidationException("Topic not found!")));
}
private boolean isEventSourceAndStatusValid(Event event,
Object[] row) {
var source = row[EVENT_SOURCE_INDEX];
var status = row[SAGA_STATUS_INDEX];
return source.equals(event.getSource()) && status.equals(event.getStatus());
}
private void logCurrentSaga(Event event, ETopics topic) {
var sagaId = createSagaId(event);
var source = event.getSource();
switch (event.getStatus()) {
case SUCCESS -> log.info("### CURRENT SAGA: {} | SUCCESS | NEXT TOPIC {} | {}",
source, topic, sagaId);
case ROLLBACK_PENDING -> log.info("### CURRENT SAGA: {} | SENDING TO ROLLBACK CURRENT SERVICE | NEXT TOPIC {} | {}",
source, topic, sagaId);
case FAIL -> log.info("### CURRENT SAGA: {} | SENDING TO ROLLBACK PREVIOUS SERVICE | NEXT TOPIC {} | {}",
source, topic, sagaId);
}
}
private String createSagaId(Event event) {
return format(SAGA_LOG_ID,
event.getPayload().getId(), event.getTransactionId(), event.getId());
}
}
O codigo dessa aplicação ficara assim.
API Saga Pattern - Orchestrator
Top comments (0)