O Apache Flink é um framework de processamento de dados distribuído, tanto para processamento em batch quanto em streaming. Com ele, é possível desenvolver aplicações orientadas a eventos; realizar análise de dados em batch e em streaming; além de poder ser utilizado para o desenvolvimento de pipelines de dados ETL.
Já o Pub/Sub é um serviço de mensagens assíncrono e escalonável da Google que separa os serviços que produzem mensagens dos serviços que as processam. Ele é usado para análises de streaming e pipelines de integração de dados para carregar e distribuir dados, sendo igualmente eficaz como um middleware voltado para mensagens para integração de serviços ou como uma fila para carregar tarefas em paralelo.
Nesse artigo, iremos desenvolver uma pipeline de dados bem simples em tempo real utilizando o Apache Flink em conjunto com a versão 3 da linguagem de programação Scala, fazendo o uso do Pub/Sub como message broker. Antes de começarmos, vamos alinhar as espectativas?
Primeiro, esse artigo não tem a pretenção de ser um artigo introdutório ao Apache Flink. Caso você nunca tenha ouvido falar nele antes, sugiro a leitura do first steps da documentação oficial. Leia sem medo! A documentação do Apache Flink é excelente!
Segundo, apesar do Apache Flink ter uma API oficial para a linguagem Scala, ela foi descontinuada e será removida nas próximas versões. Você pode saber mais sobre isso aqui. Todavia, como o scala é uma linguagem baseada na JVM e o Apache Flink é desenvolvido em java, é perfeitamente possível ainda utilizarmos a linguagem scala para o desenvolvimento com o Apache Fllink, porém utilizando as APIs do Java. Sim, eu também torci o nariz para isso. Ninguém merece! Mas, para deixar nossa vida mais fácil, vamos utilizar a biblioteca Flink Scala API, que é nada menos que um fork da Scala API oficial do Flink, porém completamente mantido pela comunidade. Recomendo muito essa biblioteca!
Terceiro, por fim, iremos desenvolver aqui uma pipeline de dados em tempo real bem simples. O objetivo não é fornecer um exemplo complexo, mas sim fornecer um guia para trabalhar com o Apache Flink com a linguagem Scala mais o Pub/Sub como message broker. Tive muita dificuldade de encontrar um artigo decente que utilizasse essas três tecnologias em conjunto.
O que vamos ver nesse artigo?
- 1. Definição do problema
- 2. Setup
- 3. Desenvolvimento da pipeline de dados
- 4. Executando a pipeline de dados
- 5. Considerações finais
Agora, chega de papo. Vamos começar!
1. Definição do problema
Uma determinada aplicação web é responsável por receber o cadastro inicial de novos clientes da grande empresa de varejo brasileira chamada de My Awesome Company, doravante MAC, mac.br. A aplicação envia em tempo real o cadastro inicial dos novos clientes para um tópico do Pub/Sub, e você deve desenvolver uma pipeline de dados que processa esse dado em tempo real, enriqueça o cadastro inicial do cliente com algumas informações relevantes de negócio e, por fim, o envie para um tópico final no Pub/Sub. Bem simples, não?
A aplicação web envia o seguinte payload para o Pub/Sub:
{
"fullName": "string",
"birthDate": "string"
}
Onde:
- fullName é o nome completo do cliente (dann!);
- birthDate é a data de nascimento do cliente, no formato _ano-mes-dia*;
A pipeline de dados deve enriquecer esse cadastro básico do cliente com algumas informações relevantes de negócio:
- É preciso separar o nome completo do cliente em primeiro nome e último nome;
- Deve-se calcular a idade atual do cliente com base em sua data de nascimento;
- Se o cliente tiver mais de 30 anos, o cadastro não deve ser realizado e o cliente deve constar como inativo;
- Adicionar um campo createdAt, relacionado a data de criação do cliente.
Tendo esse entendimento, vamos começar a codar!
2. Setup
Calma lá! Não vamos começar a codar ainda 🙍🏼. Vamos precisar configurar algumas coisas antes. As configurações iniciais que vamos ter que fazer são as seguintes:
- Criação dos tópicos e das assinaturas no Pub/Sub;
- Instalação das dependências necessárias para o funcionamento da pipeline de dados;
2.1. Criação dos tópicos e assinaturas no Pub/Sub
Para a criação dos tópicos e assinaturas no Pub/Sub, vamos estar utilizando a CLI oficial da Google Cloud, o gcloud. Siga essas instruções caso ainda não tenha a CLI devidamente configurada em sua máquina.
Agora, quais tópicos precisam ser criados?
- created-customer: o tópico onde a aplicação web da MAC irá enviar os payloads referentes aos cadastros iniciais dos clientes;
- registered-customer: o tópico final onde nossa pipeline de dados irá enviar os clientes com os respectivos cadastros devidamente enriquecidos;
Vamos começar pelo tópico created-customer. Para esse tópico, também precisamos criar uma assinatura padrão do tipo pull:
# criando o tópico created-customer
$ gcloud pubsub topics create created-customer
Created topic [projects/my-project-id/topics/created-customer].
# agora, criando uma assinatura do tipo pull para o tópico created-customer
$ gcloud pubsub subscriptions create created-customer-sub --topic=created-customer
Created subscription [projects/my-project-id/subscriptions/created-customer-sub].
Agora, vamos criar o tópico registered-customer. Para esse tópico, também precisamos criar uma assinatura padrão do tipo pull:
# criando o tópico registered-customer
$ gcloud pubsub topics create registered-customer
Created topic [projects/my-project-id/topics/registered-customer].
# agora, criando uma assinatura do tipo pull para o tópico registered-customer
$ gcloud pubsub subscriptions create registered-customer-sub --topic=registered-customer
Created subscription [projects/my-project-id/subscriptions/registered-customer-sub].
2.2. Instalação das dependências
Agora sim! Hora de codar! 🎉
Antes de tudo, o desenvolvimento de nossa pipeline de dados não será feito com base em projetos SBT. Vamos utilizar a Scala CLI, uma ferramenta de linha de comando que permite compilar, executar, testar e empacotar códigos Scala. Com base no Scala CLI, podemos desenvolver scripts Scala de forma muito prática e rápida!
Para a instalação das dependências, vamos utilizar um recurso do Scala CLI chamado de diretivas. Diretivas são modos de definirmos configurações dentro do próprio código fonte, sem precisar de uma ferramenta de build como o SBT para tal. Uma das diretivas que vamos utilizar é para definirmos as dependências que nossa pipeline irá utilizar, a saber:
- Apache Flink Client (a própria dependência do Apache Flink);
- Flink Scala API (uma biblioteca mantida pela comunidade que nos permite desenvolver códigos no Apache Flink com as APIs do Scala);
- Flink Connector GCP PubSub: o connector oficial do Apache Flink que nos permite enviar e receber mensagens do Pub/Sub;
- Toolkit: um conjunto de bibliotecas úteis para tarefas cotidianas, incluindo a biblioteca uPickle, utilizada para serializar e deserializar JSON;
Para começarmos, crie um diretório chamado br-mac, e um arquivo chamado Customers.sc dentro dele:
$ mkdir br-mac
...
$ cd br-mac
...
$ touch Customers.sc
...
Agora, dentro do arquivo Customers.sc, adicione as seguintes linhas que são relacionadas as diretivas para a instalação das dependências necessárias:
//> using toolkit default
//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
//> using dep "org.apache.flink:flink-clients:1.18.1"
//> using dep org.apache.flink:flink-connector-gcp-pubsub:3.1.0-1.18
E já adicione os imports que serão utilizados posteriormente:
import br.mac.customers.models.*
import br.mac.customers.serializations.*
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.connectors.gcp.pubsub.{PubSubSink, PubSubSource}
import org.apache.flinkx.api.*
import org.apache.flinkx.api.serializers.*
Feito! As dependências e os imports foram definidos. Vamos em frente.
3. Desenvolvimento da pipeline de dados
Chegou o momento de desenvolvermos a pipeline de dados em si com o Apache Flink! Esse desenvolvimento irá consistir em seis partes:
- Modelos e requisitos de negócio;
- Desenvolvimento dos serializers e deserializers;
- Utilização do ParameterTool para que possamos pegar algumas informações relevantes para nossa pipeline através de argumentos da linha de comando;
- Desenvolvimento do PubSubSource para que o Apache Flink possa ler os dados do tópico created-customer do Pub/Sub;
- Desenvolvimento do PubSubSink para que o Apache Flink possa enviar os dados processados para o tópico registered-customer no Pub/Sub;
- Desenvolvimento do core da pipeline de dados aplicando os requisitos de negócio;
Vam'bora?
3.1. Modelos e requisitos de negócio
'
Os modelos de negócio são as informações que iremos receber e enviar para o Pub/Sub. Como dito anteriormente, eremos receber do Pub/Sub um payload no formato JSON, e enviar um payload para o Pub/Sub também no formato JSON. Precisamos modelar esse payload em classes em Scala.
Como essas classes são representações de payloads JSON, vamos utilizar a biblioteca uPickle para que seja possível serializá-las
e deserializá-las no formato JSON. Caso ainda não conheça a biblioteca uPickle, recomendo fortemente que dê uma lida na documentação. Também é uma excelente biblioteca!
Um exemplo de payload que iremos receber, relacionado ao cadastro inicial dos clientes, é o seguinte:
{
"fullName": "John Doe",
"birthDate": "1995-01-01"
}
Já um exemplo de payload que iremos enviar para o Pub/Sub, relacionado ao cadastro final do cliente, é o seguinte:
{
"firstName": "John",
"lastName": "Doe",
"age": 29,
"isActive": true,
"createdAt": "2024-08-08T18:07:44.167635713Z"
}
Crie um outro arquivo chamado Models.scala. Observe que dessa vez a extensão do arquivo é .scala, e não .sc. Isso porque esse arquivo é um módulo Scala, e não um script Scala.
No arquivo, adicione as seguintes linhas:
package br.mac.customers.models
import upickle.default.*
final case class CreatedCustomer(fullName: String, birthDate: String) derives ReadWriter
final case class RegisteredCustomer(firstName: String, lastName: String, age: Int, isActive: Boolean, createdAt: String)
derives ReadWriter
Feito! Porém, ainda não acabamos com nossos modelos. Precisamos definir alguns métodos para que possamos satisfazer os requisitos de negócio que foram definidos, sendo eles:
- É preciso separar o nome completo do cliente em primeiro nome e último nome;
- Deve-se calcular a idade atual do cliente com base em sua data de nascimento;
- Se o cliente tiver mais de 30 anos, o cadastro não deve ser realizado e o cliente deve constar como inativo;
- Adicionar um campo createdAt, relacionado a data de criação do cliente.
O primeiro e o segundo requisito de negócio podemos definir como métodos na classe CreatedCustomer. Já o terceiro, podemos definir um construtor para a classe RegisteredCustomer que cria uma instância da classe com o atributo isActive definido como true e o atributo createdAt definido como o horário atual. O quarto requisito iremos trabalhar na própria pipeline de dados.
Para o primeiro e segundo requisito, precisamos fazer algumas importações no arquivo Models.scala:
import java.time.temporal.ChronoUnit
import java.time.{Instant, LocalDate}
E já podemos definir os métodos na classe CreatedCustomer:
final case class CreatedCustomer(fullName: String, birthDate: String) derives ReadWriter:
def firstName: String = fullName.split(" ").head
def lastName: String = fullName.split(" ").last
def age: Int = ChronoUnit.YEARS.between(LocalDate.parse(birthDate), LocalDate.now()).toInt
Por fim, vamos declarar o construtor para a classe RegisteredCustomer. Vamos fazer isso definindo o método apply no companion object:
object RegisteredCustomer:
def apply(firstName: String, lastName: String, age: Int): RegisteredCustomer =
RegisteredCustomer(firstName, lastName, age, true, Instant.now().toString)
Portanto, o código final para o arquivo Models.scala ficou dessa maneira:
package br.mac.customers.models
import upickle.default.*
import java.time.temporal.ChronoUnit
import java.time.{Instant, LocalDate}
final case class CreatedCustomer(fullName: String, birthDate: String) derives ReadWriter:
def firstName: String = fullName.split(" ").head
def lastName: String = fullName.split(" ").last
def age: Int = ChronoUnit.YEARS.between(LocalDate.parse(birthDate), LocalDate.now()).toInt
final case class RegisteredCustomer(firstName: String, lastName: String, age: Int, isActive: Boolean, createdAt: String)
derives ReadWriter
object RegisteredCustomer:
def apply(firstName: String, lastName: String, age: Int): RegisteredCustomer =
RegisteredCustomer(firstName, lastName, age, true, Instant.now().toString)
3.2. Definindo os serializers e deserializers
Quando falamos em connectores do Apache Flink, como é o caso do connector do Apache Flink para o Pub/Sub, precisamos ter em mente dois conceitos fundamentais: serializers e deserializers. Em outras palavras, serializations.
Os serializers são responsáveis por transformar os tipos de dados primitivos, tanto da linguagem Java quanto do Scala, para serem enviados para o destino no formato binário. Já os deserializers são responsáveis por transformar o dado recebido da fonte e transformá-los para instâncias de objetos das linguagens de programação utilizadas.
No nosso caso, é necessário criar um serializer que receba uma instância de uma das nossas classes recém criadas, transforme-as em strings JSON, as transforme para binário para que aí sim elas possam ser enviadas para o Pub/Sub. O processo é exatamente o oposto para os deserializers. Precisamos transformar uma mensagem, uma string JSON, que o Pub/Sub envia no formato binário e transformar essa mensagem em uma instância das classes recém criadas.
É um processo relativamente simples. Para deserializarmos a string JSON para uma instância da case class, vamos usar o uPickle. Se você já tiver familiaridade com o Flink, deve estar se perguntando porque não fazemos esse processo com a biblioteca flink-json. Simples, tive muitos problemas ao utilizá-la para deserializar as strings JSON para as case classes. Portanto, achei mais prático criar um deserializer customizado que utiliza a biblioteca uPickle para esse processo.
Chega de papo! Vamos codar!
Crie um outro arquivo no diretório chamado Serializations.scala e adicione as seguintes linhas dentro dele:
package br.mac.customers.serializations
import br.mac.customers.models.*
import org.apache.flink.api.common.serialization.{AbstractDeserializationSchema, SerializationSchema}
import upickle.default.{read, write}
Vamos criar o deserializer para a classe CreatedCustomer. Para isso, basta definir uma classe que extende a classe abstrata AbstractDeserializationSchema, e definirmos o método deserialize. Para mais informações, consulte essa documentação.
class CreatedCustomerDeserializer extends AbstractDeserializationSchema[CreatedCustomer]:
override def deserialize(message: Array[Byte]): CreatedCustomer = read[CreatedCustomer](new String(message, "UTF-8"))
Viu só? Eu disse que era simples!
Agora vamos definir o serializer para a classe RegisteredCustomer.
class RegisteredCustomerSerializer extends SerializationSchema[RegisteredCustomer]:
override def serialize(element: RegisteredCustomer): Array[Byte] =
write[RegisteredCustomer](element).getBytes("UTF-8")
O interessante dessa abordagem é que podemos utilizar qualquer biblioteca que desejarmos para serializar e deserializar strings JSON. Se estivéssemos utilizando a biblioteca flink-json, estaríamos refém em utilizar a biblioteca jackson do Java. Sim, também senti arrepio só de pensar nisso!
O código final para o arquivo Serializations.scala ficou dessa forma:
package br.mac.customers.serializations
import br.mac.customers.models.*
import org.apache.flink.api.common.serialization.{AbstractDeserializationSchema, SerializationSchema}
import upickle.default.{read, write}
class CreatedCustomerDeserializer extends AbstractDeserializationSchema[CreatedCustomer]:
override def deserialize(message: Array[Byte]): CreatedCustomer = read[CreatedCustomer](new String(message, "UTF-8"))
class RegisteredCustomerSerializer extends SerializationSchema[RegisteredCustomer]:
override def serialize(element: RegisteredCustomer): Array[Byte] =
write[RegisteredCustomer](element).getBytes("UTF-8")
Terminamos por aqui com os serializers e deserializers. Vamos continuar!
3.3. Argumentos da pipeline
Para que possamos deixar nossa pipeline o mais flexível possível, devemos ter um modo de recebermos alguns parâmetros que são relevantes para o funcionamento da nossa aplicação, sem a necessidade de deixarmos essas informações hard-coded. Essas informações são:
- ID do projeto do Google Cloud Platform;
- Nome da assinatura do Pub/Sub de onde o Apache Fllink irá consumir os dados;
- Nome do tópico do Pub/Sub onde o Apache Flink irá enviar os dados processados;
Para isso, vamos receber essas informações através de argumentos da linha de comando. Para isso, vamos utilizar um utillitário built-in do Apache Flink chamado ParameterTool. Você pode aprender mais sobre a utilização desse utilitário nessa documentação.
Mãos a obra! Adicione as seguintes linhas no arquivo Customers.sc:
val parameters = ParameterTool.fromArgs(args)
val projectName = parameters.get("project")
val subscriptionName = parameters.get("subscription-name")
val topicName = parameters.get("topic-name")
Feito! Com isso, podemos passar para nossa pipeline o ID do projeto, o nome da assinatura e o nome do tópico através dos parâmetros --project, --subscription-name e --topic-name, respectivamente.
3.4. Pub/Sub source
O Pub/Sub source, como dito, é o modo que o Apache Flink irá ler os dados do Pub/Sub. Vamos construir esse source através do connector oficial do Apache Flink para o Pub/Sub. Caso tenha interesse em saber mais sobre esse connector, verifique essa documentação.
O construtor do source do Pub/Sub requer as seguintes informações:
- Deserializer: o modo que o Apache Flink irá transformar a mensagem recebida do Pub/Sub em objetos da linguagem Scala. Lembra do deserializer para a classe CreatedCustomer que desenvolvemos mais a cima? Então, é ela que vamos estar usando;
- ProjectName: o nome do projeto do GCP onde você criou os tópicos e as assinaturas do Pub/Sub;
- SubscriptionName: o nome da assinatura de onde o Apache Flink irá consumir os dados relacionados ao cadastro inicial dos clientes;
Adicione as seguintes linhas no arquivo:
val pubsubSource = PubSubSource
.newBuilder()
.withDeserializationSchema(new CreatedCustomerDeserializer())
.withProjectName(projectName)
.withSubscriptionName(subscriptionName)
.build()
E é só isso! Bem simples também, né?
3.5. Pub/Sub Sink
Ufa, estamos quase acabando. Vamos construir o PubSub Sink da nossa pipeline.
Como dito, o Pub/Sub Sink é um modo do Apache Flink enviar os dados processados para o Pub/Sub. O construtor do Pub/Sub Sink requer as seguintes informações:
- Serializer: o modo que o Apache Flink irá transformar a instância da classe RegisteredCustomer para uma string JSON e em seguida para binário e enviar para o Pub/Sub. Lembra do serializer que criamos anteriormente? É ele que vamos usar!
- ProjectName: o nome do projeto do GCP onde você criou os tópicos e as assinaturas do Pub/Sub;
- TopicName: o nome do tópico que o Apache Fllink irá enviar os dados processados;
Adicione as seguintes linhas no arquivo:
val pubsubSink = PubSubSink
.newBuilder()
.withSerializationSchema(new RegisteredCustomerSerializer())
.withProjectName(projectName)
.withTopicName(topicName)
.build()
3.6. Pipeline de dados e aplicação dos requisitos de negócio
Chegamos enfim na última etapa de desenvolvimento! Vamos construir o core da nossa pipeline de dados! Recordando, nossa pipeline de dados irá:
- Ler os cadastros iniciais dos clientes do tópico created-customer do Pub/Sub;
- Aplicar as transformações e regras conforme os requisitos de negócio, como:
- Separar o nome do cliente em primeiro e último nome;
- Calcular a idade do cliente com base na data de nascimento;
- Definir a data de criação do cliente;
- Se a idade do cliente for maior ou igual a 30 anos, não registrar o cliente e definir o status isActive como false;
- Enviar os dados processados para o tópico registered-customer no Pub/Sub;
Vamos lá! Mão na massa!
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000L)
env
.addSource(pubsubSource) // lendo os dados do tópico created-customer
.map(cc => RegisteredCustomer(cc.firstName, cc.lastName, cc.age)) // separando o nome do cliente em primeiro e último nome, calculando a idade e definindo a data de criação
.map(rc => if rc.age >= 30 then rc.copy(isActive = false) else rc) // verificando se a idade do cliente é maior ou igual a 30
.addSink(pubsubSink) // enviando os dados processados para o tópico registered-customer
env.execute("customerRegistering")
Acabou? Sim, acabou! Veja como ficou o código completo:
//> using toolkit default
//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
//> using dep "org.apache.flink:flink-clients:1.18.1"
//> using dep org.apache.flink:flink-connector-gcp-pubsub:3.1.0-1.18
import br.mac.customers.models.*
import br.mac.customers.serializations.*
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.connectors.gcp.pubsub.{PubSubSink, PubSubSource}
import org.apache.flinkx.api.*
import org.apache.flinkx.api.serializers.*
val parameters = ParameterTool.fromArgs(args)
val projectName = parameters.get("project")
val subscriptionName = parameters.get("subscription-name")
val topicName = parameters.get("topic-name")
val pubsubSource = PubSubSource
.newBuilder()
.withDeserializationSchema(new CreatedCustomerDeserializer())
.withProjectName(projectName)
.withSubscriptionName(subscriptionName)
.build()
val pubsubSink = PubSubSink
.newBuilder()
.withSerializationSchema(new RegisteredCustomerSerializer())
.withProjectName(projectName)
.withTopicName(topicName)
.build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000L)
env
.addSource(pubsubSource)
.map(cc => RegisteredCustomer(cc.firstName, cc.lastName, cc.age))
.map(rc => if rc.age >= 30 then rc.copy(isActive = false) else rc)
.addSink(pubsubSink)
env.execute("customerRegistering")
4. Executando a pipeline de dados
Antes de executar a pipeline, acesse o Pub/Sub através do console de seu navegador, acesse o tópico created-customer e envie manualmente algumas mensagens conforme o schema do payload CreatedCustomer. Por exemplo:
{
"fullName": "John Doe",
"birthDate": "1995-01-01"
}
Vamos ver tudo isso em ação? Para isso, execute a pipeline de dados através do Scala CLI. Não é necessário empacotar a pipeline de dados e subir em um cluster Flink. Estamos trabalhando aqui no modo local.
Execute a pipeline de dados com o seguinte comando. Observe os parâmetros da aplicação, conforme definimos anteriormente:
$ scala-cli . -- \
--project seu-project-id-aqui \
--subscription-name created-customer-sub \
--topic-name registered-customer
# ...
Compiling project (Scala 3.4.2, JVM (11))
Compiled project (Scala 3.4.2, JVM (11))
SLF4J(W): No SLF4J providers were found.
SLF4J(W): Defaulting to no-operation (NOP) logger implementation
SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.
Execução em andamento! Abra o Pub/Sub em seu navegador, vá até o tópico registered-customer, e clique em Efetuar pull. Com isso você irá visualizar os dados que foram processados pelo Apache Flink 🎉.
Pressione CTRL + C para interromper a execução da pipeline.
5. Considerações finais
E chegamos ao fim do artigo! Hoje, fizemos:
- Definimos o problema da empresa My Awesome Company (MAC);
- Definimos os payloads JSON que seria recebido e enviado aos tópicos do Pub/Sub;
- Definimos os requisitos de negócio que seriam aplicados aos dados recebidos;
- Criamos dois tópicos no Pub/Sub: um para receber a mensagem referente ao cadastro inicial dos clientes e outro para o envio do dado após ser processado pelo Apache Flink;
- Desenvolvemos a pipeline de dados no Apache Flink, definindo os modelos de negócio referente cada payload recebido e enviado; serializers e deserializers das strings JSON; e por fim a pipeline de dados em si, aplicando as regras de negócio anteriormente definidas;
Por hoje é só, "pe-pe-pessoal!" 🐷! Se você gostou, me dê uma forcinha e senta o dedo no like aí e compartilhe com seus amigos, combinado?
Até a próxima 💚
Top comments (0)