Ferramentas necessárias:
Para iniciarmos nossa aplicação, será necessário preparar o ambiente local para termos acessos as tecnologias utilizadas, vamos utilizar a magia do Docker e rodar o kafka, mongodb e postgres.
Crie uma pasta com o nome a seu gosto e dentro da mesma crie um arquivo com o nome docker-compose.yaml e dentro deste arquivo copie o seguinte codigo.
version: '3'
services:
user-db:
image: mongo:latest
container_name: user-db
restart: always
networks:
- orchestrator-saga
environment:
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=123456
ports:
- 27017:27017
adress-db:
image: 'postgres:alpine'
container_name: adress-db
volumes:
- adress-volume:/var/lib/postgresql/data
ports:
- 5433:5432
environment:
POSTGRES_USER: admin
POSTGRES_PASSWORD: password
POSTGRES_DB: adress-db
POSTGRES_HOST: adress-db
networks:
- orchestrator-saga
userregistration-db:
image: 'postgres:alpine'
container_name: userregistration-db
volumes:
- userregistration-volume:/var/lib/postgresql/data
ports:
- 5434:5432
environment:
POSTGRES_USER: admin
POSTGRES_PASSWORD: password
POSTGRES_DB: userregistration-db
POSTGRES_HOST: userregistration-db
networks:
- orchestrator-saga
zookeeper:
image: confluentinc/cp-zookeeper:latest
networks:
- orchestrator-saga
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
restart: "no"
ports:
- "2181:2181"
- "9092:9092"
networks:
- orchestrator-saga
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
redpanda-console:
container_name: redpanda
image: docker.redpanda.com/vectorized/console:latest
restart: on-failure
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
ports:
- "8087:8087"
networks:
- orchestrator-saga
environment:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["kafka:29092"]
depends_on:
- "kafka"
volumes:
adress-volume:
userregistration-volume:
networks:
orchestrator-saga:
driver: bridge
Agora dentro da pasta execute o seguinte comando docker-compose up -d
Dentro da interface visual do docker você devera visualizar os containers rodando para cada tecnologia que utilizaremos.
Agora vamos utilizar o spring starter para criar nossa primeira estrutura de API.
Vamos adicionar uma nova lib no nosso arquivo pom.xml para que seja gerado automaticamente a documentação via swagger da nossa api.
<!-- https://mvnrepository.com/artifact/org.springdoc/springdoc-openapi-starter-webmvc-ui -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.0.4</version>
</dependency>
Feito isso, vamos configurar nosso tópicos kafka e acesso ao mongoDb através do arquivo de properties da aplicação, defina o arquivo de application.properties igual o código abaixo.
server.port=3000
spring.kafka.bootstrap-servers=${KAFKA_BROKER:localhost:9092}
spring.kafka.topic.start-saga=start-saga
spring.kafka.topic.notify-ending=notify-ending
spring.kafka.consumer.group-id=user-group
spring.kafka.consumer.auto-offset-reset=latest
spring.data.mongodb.database=admin
spring.data.mongodb.uri=${MONGO_DB_URI:mongodb://admin:123456@localhost:27017}
logging.level.org.apache.kafka=OFF
para configurar nosso topico Kafka vamos criar uma classe de configuração contendo as seguintes config.
package br.com.userservice.infrastructure.kafka;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {
private static final Integer PARTITION_COUNT = 1;
private static final Integer REPLICA_COUNT = 1;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.topic.start-saga}")
private String startSagaTopic;
@Value("${spring.kafka.topic.notify-ending}")
private String notifyEndingTopic;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
var props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerProps());
}
private Map<String, Object> producerProps() {
var props = new HashMap<String, Object>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
private NewTopic buildTopic(String name) {
return TopicBuilder
.name(name)
.partitions(PARTITION_COUNT)
.replicas(REPLICA_COUNT)
.build();
}
@Bean
public NewTopic startSagaTopic() {
return buildTopic(startSagaTopic);
}
@Bean
public NewTopic notifyEndingTopic() {
return buildTopic(notifyEndingTopic);
}
}
Após isto criamos uma classe controller que ira receber a requisição via http e chamar uma classe service para enviar uma mensagem com dados para nosso servico Orchestrator que ira start/comandar/finalizar todo resto de envio de mensagens via topico Kafka para finalizar a transação de salvar um usuário.
Uma classe Service que ira agregar/gerir a regra de negocio relacionada a transação, nesse caso enviar a mensagem com os dados para o serviço de orquestração.
O codigo dessa aplicação ficara assim.
API Saga Pattern - User Service
Top comments (0)