DEV Community

Geazi Anc
Geazi Anc

Posted on

SkyX: desenvolvimento de uma análise de tráfego aéreo em tempo real com Spark Structured Streaming e Apache Kafka

Atualmente, vivemos em um mundo onde peta bytes de dados são gerados a cada segundo. Como tal, a análise e o processamento desses dados em tempo real torna-se mais do que essencial para uma empresa que busca gerar insights de negócios com mais precisão conforme dados e mais dados são produzidos.

Hoje, vamos desenvolver uma análise de dados em tempo real com base em dados fictícios de um tráfego aéreo utilizando Spark Structured Streaming e Apache Kafka. Caso não saiba o que são essas tecnologias, sugiro a leitura de meu artigo que escrevi introduzindo elas com mais detalhes, assim como outros conceitos que serão abordados no decorrer desse artigo. Então, não esquece de conferir lá 💚.

Você pode conferir o projeto completo em meu GitHub.

Arquitetura

Pois bem, imagine que você, pessoa engenheira de dados, trabalhe em uma empresa aérea chamada de SkyX, onde a cada segundo dados sobre o tráfego aéreo são gerados.

Você foi solicitada para desenvolver uma dashboard que exibe em tempo real dados desses voos, como um rank das cidades mais visitadas no exterior; as cidades onde mais saem pessoas; e as aeronaves que mais transportam pessoas ao redor do mundo.

Esses são os dados que são gerados a cada voo:

  • aircraft_name: nome da aeronave. Na SkyX, só existem apenas cinco aeronaves disponíveis.
  • From: cidade de onde a aeronave está partindo. A SkyX só realiza voos entre cinco cidades ao redor do mundo.
  • To: cidade de destino da aeronave. Como foi dito, a SkyX só realiza voos entre cinco cidades ao redor do mundo.
  • Passengers: quantidade de passageiros que a aeronave está transportando. Todas as aeronaves da SkyX transportam entre 50 e 100 pessoas a cada voo.

A seguir está a arquitetura básica de nosso projeto:

  • Produtor: responsável por produzir dados do tráfego aéreo das aeronaves e enviá-los à um tópico do Apache Kafka.
  • Consumidor: apenas observa os dados que chegam em tempo real ao tópico do Apache Kafka.
  • Análise de dados: três dashboards que processam e analisam em tempo real os dados que chegam no tópico do Apache Kafka. Análise das cidades que mais recebem turistas; análise das cidades que mais saem pessoas para visitar outras cidades; e análise das aeronaves da SkyX que mais transportam pessoas entre as cidades ao redor do mundo.

Preparando o ambiente de desenvolvimento

Este tutorial assume que você já tenha o PySpark instalado em sua máquina. Caso ainda não tenha, confira as etapas na própria documentação.

Já para o Apache Kafka, vamos utilizar ele por meio de conteinerização via Docker 🎉🐳.

E, por fim, vamos utilizar o Python através de um ambiente virtual.

Apache Kafka por conteinerização via Docker

Sem mais delongas, crie uma pasta chamada skyx e adicione o arquivo docker-compose.yml dentro dela.

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
Enter fullscreen mode Exit fullscreen mode

Agora, adicione o seguinte conteúdo dentro do arquivo docker-compose:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Enter fullscreen mode Exit fullscreen mode

Feito! Já podemos subir nosso servidor do Kafka. Para isso, digite o seguinte comando no terminal:

$ docker compose up -d
$ docker compose ps
Enter fullscreen mode Exit fullscreen mode
NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Enter fullscreen mode Exit fullscreen mode

Observação: este tutorial está utilizando a versão 2.0 do Docker Compose. É por este motivo que não há o "-" entre docker e compose ☺.

Agora, precisamos criar um tópico dentro do Kafka que irá armazenar os dados enviados em tempo real pelo produtor. Para isso, vamos acessar o Kafka dentro do contêiner:

$ docker compose exec kafka bash

E enfim criar o tópico, chamado de airtraffic.

$ kafka-topics --create --topic airtraffic --bootstrap-server localhost:29092

Created topic airtraffic.

Criação do ambiente virtual

Para desenvolvermos nosso produtor, ou seja, a aplicação que será responsável por enviar os dados do tráfego aéreo em tempo real para o tópico do Kafka, precisamos fazer o uso da biblioteca kafka-python. O kafka-python é uma biblioteca desenvolvida pela comunidade que nos permite desenvolver produtores e consumidores que se integram com o Apache Kafka.

Primeiro, vamos criar um arquivo chamado requirements.txt e adicionar a seguinte dependência dentro dele:

kafka-python

Segundo, vamos criar um ambiente virtual e instalar as dependências no arquivo requirements.txt:

$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt
Enter fullscreen mode Exit fullscreen mode

Feito! Agora sim nosso ambiente já está pronto para o desenvolvimento 🚀.

Desenvolvimento do produtor

Agora vamos criar nosso produtor. Como foi dito, o produtor será responsável por enviar os dados do tráfego aéreo para o tópico recém criado do Kafka.

Como também foi dito na arquitetura, a SkyX realiza voos apenas entre cinco cidades ao redor do mundo, e tem apenas cinco aeronaves disponíveis 😹. Vale ressaltar que cada aeronave transporta entre 50 e 100 pessoas.

Observe que os dados são gerados de forma aleatória e enviados ao tópico no formato json em um intervalo de tempo entre 1 e 6 segundos 😉.

Vamos lá! Crie um subdiretório chamado src e outro subdiretório chamado kafka. Dentro do diretório kafka, crie um arquivo chamado airtraffic_producer.py e adicione o seguinte código dentro dele:

import random
from json import dumps
from time import sleep
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:29092",
    value_serializer=lambda x: dumps(x).encode("utf-8")
)

while True:
    cities = [
        "São Paulo, Brazil",
        "Tokyo, Japan",
        "Berlin, Germany",
        "Rome, Italy",
        "Seoul, South Korea"
    ]

    aircraft_names = [
        "Convair B-36 Peacemaker",
        "Lockheed C-5 Galaxy",
        "Northrop B-2 Spirit",
        "Boeing B-52 Stratofortress",
        "McDonnell XF-85 Goblin"
    ]

    aircraft = {
        "aircraft_name": random.choice(aircraft_names),
        "from": random.choice(cities),
        "to": random.choice(cities),
        "passengers": random.randint(50, 101)
    }

    future = producer.send("airtraffic", value=aircraft)
    print(future.get(timeout=60))

    sleep(random.randint(1, 6))
Enter fullscreen mode Exit fullscreen mode

Feito! Desenvolvemos nosso produtor. Execute-o e deixe rodando por um tempo.

$ python airtraffic_producer.py

Desenvolvimento do consumidor

Agora vamos desenvolver nosso consumidor. Essa será uma aplicação bem simples. Ela irá apenas exibir no terminal em tempo real os dados que chegam no tópico do kafka.

Ainda dentro do diretório kafka, crie um arquivo chamado airtraffic_consumer.py e adicione o seguinte código dentro dele:

from json import loads
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "airtraffic",
    bootstrap_servers="localhost:29092",
    value_deserializer=lambda x: loads(x.decode("utf-8"))
)

for msg in consumer:
    print(msg.value)
Enter fullscreen mode Exit fullscreen mode

Viu só, eu te disse que era bem simples. Execute-o e observe os dados que serão exibidos em tempo real conforme o produtor envia os dados ao tópico.

$ python airtraffic_consumer.py

Análise de dados: cidades que mais recebem turistas

Agora começamos com nossa análise de dados. Nesse momento, vamos desenvolver uma dashboard, uma aplicação, que irá exibir em tempo real um rank das cidades que mais recebem turistas. Ou seja, iremos agrupar os dados pela coluna to e fazer uma somatória com base na coluna passengers. Bem simples!

Para isso, dentro do diretório src, crie um subdiretório chamado dashboards e crie um arquivo chamado tourists_analysis.py. Em seguida, adicione o seguinte código dentro dele:

import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("Tourists Analysis")
         .getOrCreate()
         )

df1 = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:29092")
       .option("subscribe", "airtraffic")
       .option("startingOffsets", "earliest")
       .load()
       )

df2 = df1.selectExpr("CAST(value AS STRING)")

aircraft = {
    "aircraft_name": "",
    "from": "",
    "to": "",
    "passengers": 0
}

schema = F.schema_of_json(F.lit(json.dumps(aircraft)))

airtraffic = (df2.select(F.from_json(df2.value, schema).alias("jsondata"))
              .select("jsondata.*")
              )

tourists = (airtraffic.groupBy("to")
            .agg({"passengers": "sum"})
            .withColumnRenamed("sum(passengers)", "tourists")
            .withColumnRenamed("to", "city")
            .orderBy("tourists", ascending=False)
            )

(tourists.writeStream
 .format("console")
 .outputMode("complete")
 .start()
 .awaitTermination()
 )
Enter fullscreen mode Exit fullscreen mode

E já podemos executar nosso arquivo através do spark-submit. Mas calma lá! Quando estamos integrando o PySpark com o Kafka, devemos executar o spark-submit de modo diferente. É necessário que informemos o pacote do Apache Kafka e a versão atual do Apache Spark através do parâmetro --packages.

Caso seja a primeira vez que esteja integrando o Apache Spark com o Apache Kafka, talvez a execução do spark-submit demore um pouco. Isso ocorre porque ele precisa fazer o download dos pacotes necessários.

Certifique-se que o produtor ainda esteja rodando para que possamos ver a análise dos dados em tempo real. Dentro do diretório dashboards, execute o seguinte comando:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 tourists_analysis.py

+------------------+--------+
|              city|tourists|
+------------------+--------+
|       Rome, Italy|    2628|
|      Tokyo, Japan|    2467|
|   Berlin, Germany|    2204|
|Seoul, South Korea|    1823|
| São Paulo, Brazil|    1719|
+------------------+--------+
Enter fullscreen mode Exit fullscreen mode

Análise de dados: cidades onde mais saem pessoas

Essa análise é bem semelhante a anterior. Porém, ao invés de analisarmos em tempo real as cidades que mais recebem turistas, vamos analisar as cidades onde mais saem pessoas. Para isso, crie um arquivo chamado leavers_analysis.py e adicione o seguinte código dentro dele:

import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("Leavers Analysis")
         .getOrCreate()
         )

df1 = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:29092")
       .option("subscribe", "airtraffic")
       .option("startingOffsets", "earliest")
       .load()
       )

df2 = df1.selectExpr("CAST(value AS STRING)")

aircraft = {
    "aircraft_name": "",
    "from": "",
    "to": "",
    "passengers": 0
}

schema = F.schema_of_json(F.lit(json.dumps(aircraft)))

airtraffic = (df2.select(F.from_json(df2.value, schema).alias("jsondata"))
              .select("jsondata.*")
              )

leavers = (airtraffic.groupBy("from")
           .agg({"passengers": "sum"})
           .withColumnRenamed("sum(passengers)", "leavers")
           .withColumnRenamed("from", "city")
           .orderBy("leavers", ascending=False)
           )

(leavers.writeStream
 .format("console")
 .outputMode("complete")
 .start()
 .awaitTermination()
 )
Enter fullscreen mode Exit fullscreen mode

Certifique-se que o produtor ainda esteja rodando para que possamos ver a análise dos dados em tempo real. Dentro do diretório dashboards, execute o seguinte comando:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 leavers_analysis.py

+------------------+-------+
|              city|leavers|
+------------------+-------+
|      Tokyo, Japan|   2673|
|   Berlin, Germany|   2305|
| São Paulo, Brazil|   2096|
|Seoul, South Korea|   1895|
|       Rome, Italy|   1872|
+------------------+-------+
Enter fullscreen mode Exit fullscreen mode

Análise de dados: aeronaves que mais transportam passageiros

Essa análise é bem mais simples do que as anteriores. Vamos analisar em tempo real as aeronaves que mais transportam passageiros entre as cidades ao redor do mundo. Crie um arquivo chamado aircrafts_analysis.py e adicione o seguinte código dentro dele:

import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("Aircrafts Analysis")
         .getOrCreate()
         )

df1 = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:29092")
       .option("subscribe", "airtraffic")
       .option("startingOffsets", "earliest")
       .load()
       )

df2 = df1.selectExpr("CAST(value AS STRING)")

aircraft = {
    "aircraft_name": "",
    "from": "",
    "to": "",
    "passengers": 0
}

schema = F.schema_of_json(F.lit(json.dumps(aircraft)))

airtraffic = (df2.select(F.from_json(df2.value, schema).alias("jsondata"))
              .select("jsondata.*")
              )

aircrafts = (airtraffic.groupBy("aircraft_name")
             .agg({"passengers": "sum"})
             .withColumnRenamed("sum(passengers)", "total_passengers")
             .orderBy("total_passengers", ascending=False)
             )

(aircrafts.writeStream
 .format("console")
 .outputMode("complete")
 .start()
 .awaitTermination()
 )
Enter fullscreen mode Exit fullscreen mode

Certifique-se que o produtor ainda esteja rodando para que possamos ver a análise dos dados em tempo real. Dentro do diretório dashboards, execute o seguinte comando:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 aircrafts_analysis.py

+--------------------+----------------+
|       aircraft_name|total_passengers|
+--------------------+----------------+
|McDonnell XF-85 G...|            2533|
|Boeing B-52 Strat...|            2345|
|Convair B-36 Peac...|            2012|
| Lockheed C-5 Galaxy|            2002|
| Northrop B-2 Spirit|            1949|
+--------------------+----------------+
Enter fullscreen mode Exit fullscreen mode

Considerações finais

E finalizamos por aqui, pessoal! Neste artigo desenvolvemos uma análise de dados em tempo real com base em dados fictícios de um tráfego aéreo utilizando o Spark Structured Streaming e o Apache Kafka.

Para isso, desenvolvemos um produtor que envia esses dados em tempo real ao tópico do kafka, e depois desenvolvemos 3 dashboards para analisar esses dados em tempo real.

Espero que tenham gostado. Até a próxima 💚.

Top comments (0)