Ferramentas necessárias:
Crie uma aplicação com o nome de sua escolha no spring starter com as seguintes dependências necessárias
Dentro do arquivo pom.xml adicione a dependencia abaixo, pois vamos utilizar o Gson para serializar nossa mensagem recebida em String para um objeto de uma classe Java.
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
Na nossa segunda aplicação, que eu chamei de API Orquestradora, vamos configurar o elasticsearch e a fila Jms que utilizaremos para realizar o proposito da aplicação.
Criamos a classe ElasticsearchClientConfig onde configuramos o acesso a nosso elasticsearch rodando no docker.
ElasticsearchClientConfig
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
@Configuration
@EnableElasticsearchRepositories(basePackages
= "br.com.orquestrador.infrastructure.repository.elasticsearch")
@ComponentScan(basePackages = { "br.com.orquestrador" })
public class ElasticsearchClientConfig extends
AbstractElasticsearchConfiguration {
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
final ClientConfiguration clientConfiguration =
ClientConfiguration
.builder()
.connectedTo("localhost:9200")
.build();
return RestClients.create(clientConfiguration).rest();
}
}
E também uma classe Repository com o nome UserRepository para utilizarmos o método de save para salvar usuário no elasticsearch e o metodo de buscar por nome.
UserRepository
import br.com.orquestrador.user.User;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@Repository
public interface UserRepository extends ElasticsearchRepository<User, String> {
Optional<User> findByName(String name);
}
Agora vamos configurar a parte de fila para acessar o ActiveMq rodando em nosso docker, para isto criamos a classe JmsConfig e adicionamos algumas configs da filas utilizadas no application.properties.
JmsConfig
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
@Configuration
@EnableJms
public class JmsConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String user;
@Value("${spring.activemq.password}")
private String password;
@Bean
public ActiveMQConnectionFactory connectionFactory() {
if ( "".equals(user) ) {
return new ActiveMQConnectionFactory(brokerUrl);
}
return new ActiveMQConnectionFactory(user, password, brokerUrl);
}
@Bean
public JmsListenerContainerFactory jmsFactoryTopic(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
@Bean
public JmsTemplate jmsTemplate() {
return new JmsTemplate(connectionFactory());
}
@Bean
public JmsTemplate jmsTemplateTopic() {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
jmsTemplate.setPubSubDomain( true );
return jmsTemplate;
}
}
Vamos configurar um Listener Kafka para ouvir nosso tópico que esta produzindo as mensagens na aplicação anterior.
Para isso criamos uma classe KafkaConsumer que ira ficar ouvindo nosso tópico e a cada mensagem produzida realizar a logica que definimos.
- Passo 1 - Serializar a mensagem recebida em um objeto Java
- Passo 2 - Enviar este objeto para uma fila do Activemq
- Passo 3 - Salvar os dados deste usuario no elasticsearch KafkaConsumer
import br.com.orquestrador.application.ListenerKafka.dto.UserDto;
import br.com.orquestrador.user.User;
import br.com.orquestrador.user.UserService;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@Autowired
private Gson serializer;
@Autowired
private UserService userService;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
public KafkaConsumer(Gson serializer) {
this.serializer = serializer;
}
@KafkaListener(topics = "${user.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void receive( @Payload String message) {
logger.info("message received: {}", message);
UserDto usuarioDto = serializer.fromJson(message, UserDto.class);
jmsTemplate.convertAndSend("queue.sample", message);
User usuario = usuarioDto.converte();
logger.info(usuario.toString());
String messageFinal = userService.save(usuario);
logger.info(messageFinal);
}
}
Abaixo segue o link do github onde a aplicação esta armazenada para que possam conferir como a mesma ficou.
Top comments (0)