DEV Community

romerito

Example of applying CDC to JSON files with PySpark

In the previous post we showed an example of how to apply CDC using Spark SQL. Today we will walk through something closer to a real-life use case. For that we will use FAKE data generated with the 4devs platform.

We selected people's registration data. The idea is to load this data into a RAW layer, capture the changes, and apply them to the TRUSTED layer. For this example everything runs locally: I use Pop!_OS by default and already have Apache Spark installed. In case you need to install it, here is the link to the installer I created. My project structure looks like this:

[Image: project structure]

First I created a Python class that creates a Spark session. The file is here: ./lakehouse/src/cdc/internals.py

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Local folder used as the Spark SQL warehouse directory
path = "/home/romeritomorais/Dropbox/tecnology/develop/bigdata/lakehouse/datawarehouse"
DataWareHouse = path


class session:

    @staticmethod
    def spark():
        # Enable the Delta Lake SQL extension and catalog on the session builder
        build = (
            SparkSession.builder.appName("application")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("spark.sql.warehouse.dir", DataWareHouse)
            .enableHiveSupport())

        # Add the pip-installed Delta Lake package to the session before creating it
        return configure_spark_with_delta_pip(build).getOrCreate()

Our main code loads the Spark session and reads the JSON files found in the folder ./lakehouse/storage/stage. We only have 2 sample files. This is an example of their contents:

[
    {
        "nome": "Fernanda Isabella FIORENTINO",
        "idade": 55,
        "cpf": "677.958.886-58",
        "rg": "14.130.167-3",
        "data_nasc": "05/03/1985",
        "sexo": "Feminino",
        "signo": "Peixes",
        "mae": "Bárbara Isabel",
        "pai": "Renato Ian Gomes",
        "email": "fernanda_isabella_pompeu@consultorialk.com.br",
        "senha": "FW2CYDLB0l",
        "cep": "76901-636",
        "endereco": "Governador valadares",
        "numero": 144,
        "bairro": "Pipoca",
        "cidade": "Ji-Paraná",
        "estado": "RO",
        "telefone_fixo": "(69) 3962-4304",
        "celular": "(69) 99407-8102",
        "altura": "1,54",
        "peso": 74,
        "tipo_sanguineo": "A-",
        "cor": "vermelho"
    },
    {
        "nome": "Pietro Mateus Carlos da Silva",
        "idade": 66,
        "cpf": "287.883.345-70",
        "rg": "25.820.920-3",
        "data_nasc": "23/07/1956",
        "sexo": "Masculino",
        "signo": "Leão",
        "mae": "Vera Elaine",
        "pai": "Rodrigo Marcelo da Silva",
        "email": "pietro_dasilva@konekoshouten.com.br",
        "senha": "IrHifX9RyC",
        "cep": "57018-685",
        "endereco": "Conjunto Nossa Senhora do Amparo",
        "numero": 577,
        "bairro": "Chã de Bebedouro",
        "cidade": "Maceió",
        "estado": "AL",
        "telefone_fixo": "(82) 3967-9915",
        "celular": "(82) 99107-9714",
        "altura": "1,85",
        "peso": 66,
        "tipo_sanguineo": "B+",
        "cor": "roxo"
    }
]

This data will be loaded into a DataFrame. After that we convert every column to string and write the result to our DataWareHouse in Delta format. We will also enable CDF (change data feed) for all tables that are created. Note that this example does not pay much attention to the settings; in a real scenario each implementation needs to be analyzed to meet the needs of the project.

#!/usr/bin/env python
# coding: utf-8

from pathlib import Path
from cdc.internals import session

spark = session.spark()
path = Path.cwd()
storage = f"{path.parent}/storage/stage"

if __name__ == '__main__':
    spark.sparkContext.setLogLevel("info")

    # load the data and write it in Delta format to the bronze (RAW) layer
    spark.sql("set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true")

    df = spark.read.option("multiline", "true").json(f"{storage}/*")

    appl = df.selectExpr(
        "REPLACE(CAST(altura AS STRING),',','.') AS altura",
        "CAST(bairro AS STRING) AS bairro",
        "CAST(celular AS STRING) AS celular",
        "CAST(cep AS STRING) AS cep",
        "CAST(cidade AS STRING) AS cidade",
        "CAST(cor AS STRING) AS cor",
        "CAST(cpf AS STRING) AS cpf",
        "CAST(data_nasc AS STRING) AS data_nasc",
        "CAST(email AS STRING) AS email",
        "CAST(endereco AS STRING) AS endereco",
        "CAST(estado AS STRING) AS estado",
        "CAST(idade AS STRING) AS idade",
        "CAST(mae AS STRING) AS mae",
        "CAST(nome AS STRING) AS nome",
        "CAST(numero AS STRING) AS numero",
        "CAST(pai AS STRING) AS pai",
        "CAST(peso AS STRING) AS peso",
        "CAST(rg AS STRING) AS rg",
        "CAST(senha AS STRING) AS senha",
        "CAST(sexo AS STRING) AS sexo",
        "CAST(signo AS STRING) AS signo",
        "CAST(telefone_fixo AS STRING) AS telefone_fixo",
        "CAST(tipo_sanguineo AS STRING) AS tipo_sanguineo"
    )

    # write the data in Delta format to the bronze (RAW) layer
    appl.write.format("delta").mode("append").saveAsTable("default.rw_cadastro")

    spark.stop()

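To make the RAW-layer rule easier to see, here is a minimal plain-Python sketch of what the selectExpr above does to a single record: every value becomes a string, and the comma in altura is replaced by a dot. The function name to_raw_record is hypothetical and this is not Spark code, only an illustration of the same rule.

```python
def to_raw_record(record):
    """Cast every field to a string and use '.' as the decimal separator in altura."""
    # Keep missing values as None instead of turning them into the text "None"
    raw = {key: None if value is None else str(value) for key, value in record.items()}
    if raw.get("altura") is not None:
        raw["altura"] = raw["altura"].replace(",", ".")
    return raw


# A few fields taken from the sample JSON shown earlier
sample = {"nome": "Pietro Mateus Carlos da Silva", "idade": 66, "altura": "1,85", "peso": 66}
raw_sample = to_raw_record(sample)
print(raw_sample)
```

Keeping everything as a string in RAW postpones typing decisions to the TRUSTED step, which is the design the post follows.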

After writing the data, let's see how it was saved

[Image: the data as saved in the table]

Done: our data is saved in Delta format in our database.

As we insert data into the RAW layer, CDF records row-level changes: for every commit it tracks which rows were inserted, deleted, or updated. Each write creates a new version of the table, so if I insert a row now it is recorded as an INSERT in, say, version 1; if I insert the same row again, it is recorded again in version 2, and so on. When we take the data to the TRUSTED layer, however, we only want the version of each record that reflects the origin. Imagine the origin is an ERP table: how do we bring its current state to TRUSTED? Simple, with a MERGE operation. For this we need to load the table that will receive the changes, in our case the default.tr_cadastro table.

#!/usr/bin/env python
# coding: utf-8

from delta.tables import DeltaTable
from pyspark.sql.functions import col, dense_rank
from pyspark.sql.window import Window
from pathlib import Path
from cdc.internals import session

spark = session.spark()
tableRaw = "rw_cadastro"
tableTrusted = "tr_cadastro"

cadastroTrustedDF = DeltaTable.forName(spark, f"{tableTrusted}")

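Before reading the change feed, it may help to picture what its rows look like. The sketch below is a simplified plain-Python simulation, not how Delta computes the feed internally: it compares two snapshots of a table and emits rows tagged with _change_type and _commit_version, the same metadata columns used later in this post. The function diff_versions, the version numbers, and the value "Recife" are made up for illustration.

```python
def diff_versions(before, after, version, key="cpf"):
    """Return change rows describing how `before` became `after` in commit `version`."""
    old = {row[key]: row for row in before}
    new = {row[key]: row for row in after}
    changes = []
    for k, row in new.items():
        if k not in old:
            changes.append({**row, "_change_type": "insert", "_commit_version": version})
        elif row != old[k]:
            # An update is reported twice: the row before and the row after the change
            changes.append({**old[k], "_change_type": "update_preimage", "_commit_version": version})
            changes.append({**row, "_change_type": "update_postimage", "_commit_version": version})
    for k, row in old.items():
        if k not in new:
            changes.append({**row, "_change_type": "delete", "_commit_version": version})
    return changes


# Three commits on one record: insert, update, delete
v0 = []
v1 = [{"cpf": "287.883.345-70", "cidade": "Maceió"}]
v2 = [{"cpf": "287.883.345-70", "cidade": "Recife"}]
v3 = []
feed = diff_versions(v0, v1, 1) + diff_versions(v1, v2, 2) + diff_versions(v2, v3, 3)
change_types = [row["_change_type"] for row in feed]
print(change_types)
```

The update_preimage rows carry the old values, which is why they are filtered out before merging: only the state after each change matters for the TRUSTED table.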

Now let's load the changes that the default.rw_cadastro table underwent

change_data = spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", 0) \
        .table(f"{tableRaw}") \
        .filter("_change_type != 'update_preimage'")

In this case I'm reading the changes from version 0 onward, but you can adopt a strategy of loading only a specific range of versions, for example by also setting the endingVersion option.

With the changes loaded, let's define a window partitioned by the CPF column. My JSON file did not come with a field that represents a PRIMARY KEY, so I will use the CPF field because I understand that in real life this information does not change. The window groups the rows by that field and orders them by _commit_version in descending order, so that we can select the latest record.
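One possible way to load only a range is to remember the last version that was already merged and start from the next one. The sketch below assumes a small JSON checkpoint file; the helper names and the file name are hypothetical and not part of the original project. The value it returns is what you would pass as startingVersion.

```python
import json
import tempfile
from pathlib import Path


def next_starting_version(checkpoint_file):
    """Return the first commit version that has not been processed yet."""
    path = Path(checkpoint_file)
    if not path.exists():
        return 0  # nothing processed so far: start from the first version
    return json.loads(path.read_text())["last_processed_version"] + 1


def save_checkpoint(checkpoint_file, version):
    """Record the last commit version that was merged into the trusted table."""
    Path(checkpoint_file).write_text(json.dumps({"last_processed_version": version}))


with tempfile.TemporaryDirectory() as folder:
    checkpoint = Path(folder) / "rw_cadastro_checkpoint.json"
    first_start = next_starting_version(checkpoint)  # no checkpoint yet
    save_checkpoint(checkpoint, 4)                   # pretend versions 0..4 were merged
    second_start = next_starting_version(checkpoint)

print(first_start, second_start)
```

In a real pipeline the checkpoint should only be saved after the MERGE succeeds, so a failed run is retried from the same version.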

windowPartition = Window.partitionBy(
        "cpf").orderBy(col("_commit_version").desc())

After that we create a new DataFrame using the dense_rank window function and keep only the rows ranked first. An important point: you need a strategy for reading only the _commit_version range of the most recently modified records. Without it you will probably reload old modifications, and depending on the number of changes in the RAW layer this can produce a gigantic volume of data for the window to process.

apply_change_data = change_data\
        .withColumn("dense_rank", dense_rank().over(windowPartition))\
        .where("dense_rank=1")\
        .distinct()
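The same selection can be pictured in plain Python. This is only a sketch of the logic, with a hypothetical function name and toy rows: for each cpf it keeps the rows that carry the highest _commit_version (the rows dense_rank would rank first) and drops exact duplicates, as distinct() does.

```python
from collections import defaultdict


def latest_changes(rows, key="cpf"):
    """Keep, for each key, the rows with the highest _commit_version, without exact duplicates."""
    by_key = defaultdict(list)
    for row in rows:
        by_key[row[key]].append(row)

    result = []
    for group in by_key.values():
        newest = max(r["_commit_version"] for r in group)
        seen = set()
        for r in group:
            if r["_commit_version"] != newest:
                continue  # an older change for this key
            fingerprint = tuple(sorted(r.items()))
            if fingerprint not in seen:  # same effect as distinct()
                seen.add(fingerprint)
                result.append(r)
    return result


rows = [
    {"cpf": "1", "cidade": "A", "_commit_version": 0},
    {"cpf": "1", "cidade": "B", "_commit_version": 2},
    {"cpf": "1", "cidade": "B", "_commit_version": 2},  # exact duplicate
    {"cpf": "2", "cidade": "C", "_commit_version": 1},
]
latest = latest_changes(rows)
print(latest)
```

Note that two rows for the same cpf in the same commit with different values would both survive this step, so the uniqueness of the key still has to be guaranteed before the MERGE.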

Now let's MERGE the changes captured by the CDF.

cadastroTrustedDF.alias("cadastroTrustedDF").merge(apply_change_data.alias("cadastroRawDF"), "cadastroRawDF.cpf = cadastroTrustedDF.cpf") \
        .whenMatchedDelete(
            condition="cadastroRawDF._change_type = 'delete'") \
        .whenMatchedUpdate(
        set={
            "altura": "cadastroRawDF.altura",
            "bairro": "cadastroRawDF.bairro",
            "celular": "cadastroRawDF.celular",
            "cep": "cadastroRawDF.cep",
            "cidade": "cadastroRawDF.cidade",
            "cor": "cadastroRawDF.cor",
            "cpf": "cadastroRawDF.cpf",
            "data_nasc": "cadastroRawDF.data_nasc",
            "email": "cadastroRawDF.email",
            "endereco": "cadastroRawDF.endereco",
            "estado": "cadastroRawDF.estado",
            "idade": "cadastroRawDF.idade",
            "mae": "cadastroRawDF.mae",
            "nome": "cadastroRawDF.nome",
            "numero": "cadastroRawDF.numero",
            "pai": "cadastroRawDF.pai",
            "peso": "cadastroRawDF.peso",
            "rg": "cadastroRawDF.rg",
            "senha": "cadastroRawDF.senha",
            "sexo": "cadastroRawDF.sexo",
            "signo": "cadastroRawDF.signo",
            "telefone_fixo": "cadastroRawDF.telefone_fixo",
            "tipo_sanguineo": "cadastroRawDF.tipo_sanguineo"
        }
    )\
        .whenNotMatchedInsert(
            condition="cadastroRawDF._change_type != 'delete'",
        values={
            "altura": "cadastroRawDF.altura",
            "bairro": "cadastroRawDF.bairro",
            "celular": "cadastroRawDF.celular",
            "cep": "cadastroRawDF.cep",
            "cidade": "cadastroRawDF.cidade",
            "cor": "cadastroRawDF.cor",
            "cpf": "cadastroRawDF.cpf",
            "data_nasc": "cadastroRawDF.data_nasc",
            "email": "cadastroRawDF.email",
            "endereco": "cadastroRawDF.endereco",
            "estado": "cadastroRawDF.estado",
            "idade": "cadastroRawDF.idade",
            "mae": "cadastroRawDF.mae",
            "nome": "cadastroRawDF.nome",
            "numero": "cadastroRawDF.numero",
            "pai": "cadastroRawDF.pai",
            "peso": "cadastroRawDF.peso",
            "rg": "cadastroRawDF.rg",
            "senha": "cadastroRawDF.senha",
            "sexo": "cadastroRawDF.sexo",
            "signo": "cadastroRawDF.signo",
            "telefone_fixo": "cadastroRawDF.telefone_fixo",
            "tipo_sanguineo": "cadastroRawDF.tipo_sanguineo"
        }
    ).execute()

The operation is simple: we have 3 clauses for applying the changes. The first uses the .whenMatchedDelete() method, which matches the row from the source table (rw_cadastro) on the CPF field and checks whether that row was deleted at the origin. If so, the row is removed from the destination (tr_cadastro).

The second clause uses the .whenMatchedUpdate() method, where the source row is again matched on cadastroRawDF.cpf = cadastroTrustedDF.cpf. If there is a change at the origin, it is updated in the destination table.

And finally, .whenNotMatchedInsert(): if a record exists only at the source, it is inserted into the destination. One very important detail: if at the time of this operation there are duplicate rows, say 2 or more records with the same CPF, the MERGE fails with an ERROR, because it cannot apply several changes to the same record.

But before applying this operation we have to type the data so that it matches the schema of the default.tr_cadastro table. For that we transform the data before performing the MERGE:
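The three clauses can be sketched in plain Python over a dictionary keyed by cpf. This is a simplified model of the behavior described above, not Delta's implementation: the function name merge_changes and the toy rows are hypothetical, and the sketch rejects any duplicated key up front, which is a stricter rule chosen here for simplicity.

```python
def merge_changes(target, changes, key="cpf"):
    """Apply delete / update / insert changes to a dict of rows keyed by `key`."""
    keys = [change[key] for change in changes]
    if len(keys) != len(set(keys)):
        raise ValueError("more than one change for the same key")

    merged = dict(target)
    for change in changes:
        k = change[key]
        # Drop the CDF metadata columns (_change_type, _commit_version, ...)
        row = {name: value for name, value in change.items() if not name.startswith("_")}
        if k in merged:
            if change["_change_type"] == "delete":
                del merged[k]      # matched and deleted at the origin
            else:
                merged[k] = row    # matched: update with the latest values
        elif change["_change_type"] != "delete":
            merged[k] = row        # not matched: insert
    return merged


target = {"1": {"cpf": "1", "cidade": "A"}, "2": {"cpf": "2", "cidade": "B"}}
changes = [
    {"cpf": "1", "cidade": "Z", "_change_type": "update_postimage", "_commit_version": 3},
    {"cpf": "2", "cidade": "B", "_change_type": "delete", "_commit_version": 3},
    {"cpf": "3", "cidade": "C", "_change_type": "insert", "_commit_version": 3},
    {"cpf": "4", "cidade": "D", "_change_type": "delete", "_commit_version": 3},
]
merged = merge_changes(target, changes)
print(merged)
```

The last change shows why the insert clause carries a condition: a record that was created and deleted before ever reaching the destination should not be inserted.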

# create a temporary view for the data transformation, where typing is applied
# (apply_change_data is the deduplicated change set built with dense_rank above)
apply_change_data.createOrReplaceTempView("transformChangeData")

# transformation
apply_change_data = spark.sql("""
SELECT 
  CAST(nome AS STRING) AS nome, 
  CAST(idade AS INT) AS idade, 
  CAST(altura AS FLOAT) AS altura, 
  CAST(sexo AS STRING) AS sexo, 
  CAST(peso AS FLOAT) AS peso, 
  CAST(cor AS STRING) AS cor, 
  CAST(
    replace(data_nasc, '/', '-') AS STRING
  ) AS data_nasc, 
  CAST(signo AS STRING) AS signo, 
  CAST(tipo_sanguineo AS STRING) AS tipo_sanguineo, 
  CAST(
    replace(
      replace(cpf, '.', ''), 
      '-', 
      ''
    ) AS long
  ) AS cpf, 
  CAST(
    replace(
      replace(rg, '.', ''), 
      '-', 
      ''
    ) AS long
  ) AS rg, 
  CAST(mae AS STRING) AS mae, 
  CAST(pai AS STRING) AS pai, 
  CAST(endereco AS STRING) AS endereco, 
  CAST(numero AS STRING) AS numero, 
  CAST(bairro AS STRING) AS bairro, 
  CAST(
    replace(cep, '-', '') AS STRING
  ) AS cep, 
  CAST(cidade AS STRING) AS cidade, 
  CAST(estado AS STRING) AS estado, 
  CAST(
    replace(
      replace(
        replace(
          replace(celular, ')', ''), 
          '(', 
          ''
        ), 
        ' ', 
        ''
      ), 
      '-', 
      ''
    ) AS STRING
  ) AS celular, 
  CAST(
    replace(
      replace(
        replace(
          replace(telefone_fixo, ')', ''), 
          '(', 
          ''
        ), 
        ' ', 
        ''
      ), 
      '-', 
      ''
    ) AS STRING
  ) AS telefone_fixo, 
  CAST(email AS STRING) AS email,
  _change_type,
  _commit_version,
  _commit_timestamp
FROM 
  transformChangeData
""")
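For a quick feel of what this typing step produces, here is a plain-Python sketch that applies the same kind of cleanup to a few fields of one record. The helper names are hypothetical; the characters removed mirror the replace() calls in the SQL above.

```python
# Characters stripped by the nested replace() calls: '.', '-', '(', ')' and spaces
PUNCTUATION = str.maketrans("", "", ".-() ")


def strip_punctuation(value):
    """Remove formatting characters from documents, postal codes and phone numbers."""
    return value.translate(PUNCTUATION)


def to_trusted(record):
    """Type a RAW record (all strings) the way the SQL transformation does, for a few fields."""
    return {
        "cpf": int(strip_punctuation(record["cpf"])),
        "rg": int(strip_punctuation(record["rg"])),
        "cep": strip_punctuation(record["cep"]),
        "celular": strip_punctuation(record["celular"]),
        "idade": int(record["idade"]),
        "altura": float(record["altura"]),
        "data_nasc": record["data_nasc"].replace("/", "-"),
    }


# Values from the sample JSON, as they look after the RAW step
raw = {
    "cpf": "287.883.345-70",
    "rg": "25.820.920-3",
    "cep": "57018-685",
    "celular": "(82) 99107-9714",
    "idade": "66",
    "altura": "1.85",
    "data_nasc": "23/07/1956",
}
trusted = to_trusted(raw)
print(trusted)
```

One thing to keep in mind with this design: storing cpf as a number drops leading zeros, so a CPF that starts with 0 would need to be padded again when it is displayed.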

And here is a sample of our table data

[Image: sample of the table data]

This is one of the ways to work with CDC in Spark using Delta's features. I hope this information helps you. The project is here: github
