DEV Community

Como conectar Spark e S3 para processamento de arquivos

Neste artigo, mostraremos como conseguimos configurar o Apache Spark e o S3 para lidar com arquivos grandes. Quais problemas foram encontrados e como foram resolvidos.

Chaves = ID

A ideia é desenvolver uma estrutura para automatizar os processos de ETL, que recebe como entrada um arquivo com configurações para carregamento. A principal ferramenta da estrutura para processamento de dados é o Apache Spark. Inicialmente, as fontes de dados eram apenas BDs relacionais (Postgres, Oracle e MS SQL) e Kafka, e o receptor era o Apache Hive.

No entanto, algumas fontes não estavam preparadas para fornecer acesso direto a seus bancos de dados. O repositório S3 tornou-se uma opção para obter dados desses sistemas que satisfaria a todos: tanto os sistemas de origem quanto eu ou quem quer que seja, como agregadores de informações. Essa abordagem funciona da seguinte forma: uma vez em um determinado período (por exemplo, uma vez por dia), o sistema de origem descarrega uma determinada fatia de dados de seu banco de dados e a coloca no S3 como um arquivo. A fatia de dados descarregada pela fonte pode ser completa (todos os dados na tabela) ou parcial (somente dados incrementais). O conteúdo da fatia é determinado pelos acordos entre o sistema de origem e o de destino, pois depende do algoritmo pelo qual os dados serão processados posteriormente. Assim, foi necessário configurar nossa estrutura para dar suporte à replicação de dados do S3 para o Apache Hive.

Observação: o código fornecido no artigo é simplificado para evitar a necessidade de descrever a estrutura do projeto.

Declaração do problema

Suponha que tenhamos um armazenamento S3 com vários buckets. Um bucket é um contêiner especial com um ID exclusivo. Cada bucket corresponde a uma fonte.

Pode haver várias pastas em um pacote e cada pasta refere-se a um download diferente. Uma pasta pode conter vários arquivos, e arquivos diferentes podem conter dados de tabelas diferentes. Para identificar o pertencimento de um arquivo a uma tabela, o nome do arquivo deve conter o nome da tabela da qual os dados foram extraídos e o registro de data e hora - a hora da formação do arquivo. Além disso, o nome do arquivo pode conter outras informações, portanto, uma chave exclusiva para cada solução é usada para extrair o nome da tabela e a data. Todos os arquivos têm a extensão csv.

Por exemplo, os nomes dos arquivos podem ser os seguintes: table_1_20240320_122145.csv e 20240322_231243_workers.csv. Então, as chaves desses arquivos podem ser representadas da seguinte forma: employee_table_.csv e worker_.csv, respectivamente.

Em um download, precisamos ler todos os arquivos de uma pasta por meio de uma determinada chave, processá-los com os algoritmos disponíveis (os algoritmos serão descritos nos parágrafos abaixo) e, em seguida, registrá-los no Hive na forma de uma tabela. As tabelas resultantes estarão prontas para serem usadas pelo usuário para resolver suas tarefas.

Image description
Estrutura de armazenamento S3 simplificada

A figura mostra a estrutura de armazenamento descrita acima. Deve observar que, no S3, os objetos são fisicamente armazenados em um espaço de endereço plano, sem estrutura hierárquica, na forma de pares de valores-chave. Cada objeto no S3 tem seu próprio identificador exclusivo, pelo qual você pode se dirigir diretamente a ele.
No entanto, podemos organizar uma variante familiar de armazenamento de arquivos, imitando um armazenamento de arquivos. Nesse caso, o identificador do objeto se parecerá com um caminho no sistema de arquivos, por exemplo, /data/files/file1.txt. Não há representação física de pastas no S3, mas, para simplificar, podemos chamar formalmente os valores intermediários de pastas. No exemplo acima, podemos chamar as pastas "data", "pasta" e "files".

Início do desenvolvimento

Primeiro, decidi fazer a leitura e o processamento dos dados inteiramente por meio do Spark. Para fazer isso, criei um código trivial para ler arquivos csv de uma pasta:

val delimiter = ";"
val header = "true"
val path = "/Pasta1/Table1_*.csv"
spark.read
      .format("csv")
      .option("header", header)
      .option("delimiter", delimiter)
      .option("inferSchema", "true")
      .withColumn("filename", input_file_name)
      .load(path)
Enter fullscreen mode Exit fullscreen mode
  • path (caminho) é uma pasta onde estão localizados os arquivos necessários e uma chave, pela qual é necessário extrair somente os arquivos necessários, pois pode haver arquivos na pasta que pertençam ao mesmo download, mas a outra tabela na origem;

  • delimiter que é o delimitador de coluna no arquivo csv;

  • Defina o parâmetro inferSchema como true para a detecção automática dos tipos de coluna;

  • Adicionada a coluna "filename" com o nome do arquivo no DataFrame, para que ele possa ser usado para descobrir de qual arquivo a linha da tabela foi extraída.

Em seguida, reformulei esse código para que ele possa ler arquivos do S3:

spark
  .sparkContext
  .hadoopConfiguration
  .set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark
  .sparkContext
  .hadoopConfiguration
  .set("fs.s3a.endpoint", <endpoint>)
spark
  .sparkContext
  .hadoopConfiguration
  .set("fs.s3a.access.key",<accessKey>)
spark
  .sparkContext
  .hadoopConfiguration
  .set("fs.s3a.secret.key",<secretKey>)

val delimiter = ";"
val header = "true"
val path = "/Pasta1/Table1_*.csv"
spark.read
      .format("csv")
      .option("header", header)
      .option("delimiter", delimiter)
      .option("inferSchema", "true")
      .withColumn("filename", input_file_name)
      .load(path)
Enter fullscreen mode Exit fullscreen mode

Para oferecer suporte à leitura do S3, apenas adicionei parâmetros de configuração. Tudo isso graças ao suporte nativo ao S3 no Spark.

Vamos dar uma olhada mais de perto nos parâmetros acima:

  1. A implementação do sistema de arquivos com o qual queremos trabalhar. Há três implementações do S3: s3, s3n e s3a. Escolhi para este exemplo o s3a porque ele é um sistema de arquivos nativo para leitura e gravação de arquivos regulares no S3, é compatível com todos os recursos do s3n, mas também é mais eficiente e permite arquivos maiores que 5 GB.

  2. Ponto de extremidade S3.

  3. Chave pública para acessar o accessKey.

  4. Chave privada para acessar a secretKey.

Também precisamos do arquivo JAR hadoop-aws-2.7.1 no caminho da classe. Esse JAR contém a classe org.apache.hadoop.fs.s3a.S3AFileSystem, da qual precisamos.

Agora o código para leitura de arquivos do S3 está pronto. Em seguida, o resultado da leitura é alimentado como entrada para os algoritmos append, replace ou scd2 que foram implementados anteriormente para outros tipos de fonte.

A primeira execução dessa implementação não foi bem-sucedida. Apareceu o seguinte erro: Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target.
Esse problema ocorre quando seu servidor tem um certificado autoassinado. Há duas maneiras de resolver esse problema:

  1. Desativar a validação do certificado SSL na inicialização

  2. Adicionar esse certificado à lista de certificados JVM confiáveis.

Para a conveniência de desenvolver e testar o programa, usei a primeira variante da solução. Você pode desativar a verificação de certificados no spark-application especificando os seguintes comandos na configuração spark-submit:

spark-submit 
--conf spark.driver.extraJavaOptions=-Dcom.amazonaws.sdk.disableCertChecking=true 
--conf spark.executor.extraJavaOptions=-Dcom.amazonaws.sdk.disableCertChecking=true …
Enter fullscreen mode Exit fullscreen mode

Essa variante da solução é aceitável apenas durante o desenvolvimento, mas é inadmissível produzir uma solução desse tipo na produção. Por isso, foi necessário configurar a adição de um certificado à lista de certificados confiáveis no loop de produção. Para fazer isso, foi feito:

  • Exportei um certificado SSL usando um navegador. O nome do arquivo de certificado tem o seguinte nome: cert_name.crt.

  • Criei um repositório de certificados confiável e adicionei um certificado a ele.

keytool -import -alias <any alias> 
-file cert_name.crt -keystore <the truststore file name> -storepass <password>
Enter fullscreen mode Exit fullscreen mode

Adicionado o caminho para o armazenamento de certificado confiável criado ao comando spark-submit:

spark‑submit 
‑-conf 
'spark.executor.extraJavaOptions=‑Djavax.net.ssl.trustStore=<the truststore filename> 
‑Djavax.net.ssl.trustStorePassword=<password>' 
‑-conf
'spark.driver.extraJavaOptions=‑Djavax.net.ssl.trustStore=<the truststore filename> 
‑Djavax.net.ssl.trustStorePassword=<password>' …
Enter fullscreen mode Exit fullscreen mode

Após a instalação dos certificados, consegui iniciar o download. Porém, no processo de análise e teste da implementação descrita, verificou-se que ela tem vários problemas e limitações:

  • Os arquivos lidos não são contados.

Com essa opção de leitura, pegamos todos os arquivos por uma determinada máscara na pasta todas as vezes e não marcamos de forma alguma que já os lemos e que não precisamos processá-los na próxima vez.

  • O inferSchema funciona por um longo tempo.

Em arquivos grandes (mais de 1 GB), há uma queda significativa na velocidade de leitura do arquivo ao usar o parâmetro inferSchema. Além disso, nem todos os tipos de dados foram definidos corretamente.

  • Problemas com arquivos vazios.

É necessário um entendimento explícito dos motivos da presença de arquivos vazios, pois não está claro como lidar com eles: eles devem ser tratados como um erro ou simplesmente não há dados na fonte? Essa questão também precisava ser resolvida, pois a estrutura entende a ausência de um arquivo como uma tabela vazia na origem. Ao mesmo tempo, o algoritmo scd2 é sensível a essa situação, pois considerará todos os dados irrelevantes e, no dia seguinte, reabrirá os dados novamente.

  1. O algoritmo SCD2 não está funcionando corretamente.

O algoritmo SCD2 é um tipo de algoritmo SCD (Slowly Changing Dimensions). O SCD é um mecanismo para rastrear alterações nos dados de medição em termos de um data warehouse.

O SCD2 usa a adição de uma nova linha e colunas adicionais. Essa abordagem preserva a historicidade. Além disso, tem como opção adicionar colunas de serviço que podem ser responsáveis pelo controle de versão, status e intervalo de tempo durante o qual essas linhas podem ser consideradas relevantes.

O algoritmo SCD2 pressupõe a entrada de uma fatia completa de dados de uma fonte e, em seguida, compara esses dados com o receptor para ver se alguma coluna foi atualizada. No caso de arquivos, podemos ter vários arquivos para a mesma tabela ao mesmo tempo, o que significa que, nesse caso, temos várias fatias da tabela em intervalos de tempo diferentes, o que gera um grande número de duplicatas. Por esse motivo, o algoritmo não consegue entender como construir a historicidade, qual linha era anterior e qual era posterior.

Contagem de arquivos lidos

No início do desenvolvimento, não tinha nenhum requisito para excluir arquivos lidos, portanto, não os excluímos. Consequentemente, houve um problema: como entender quais arquivos já foram lidos e processados no lançamento anterior e quais não foram e precisam ser processados agora. Comecei a procurar uma solução para esse problema usando ferramentas spark, mas não consegui encontrar essa opção. Por isso, decidi não ler os arquivos por chave diretamente pelo Spark, mas primeiro usar um mecanismo separado para encontrar os nomes dos arquivos que precisamos processar e, em seguida, alimentar a lista desses arquivos na entrada do Spark.

Para esse fim, decidi usar a classe de sistema de arquivos org.apache.hadoop.fs.FileSystem. Esta é uma classe base abstrata para um sistema de arquivos bastante genérico. Ele pode ser implementado como um sistema de arquivos distribuído ou como um sistema "local", refletindo uma unidade conectada localmente. Ela pode ser usada para manipular arquivos com facilidade: navegar pelo conteúdo das pastas, pesquisar arquivos por chaves (ID), excluir e mover arquivos individuais.

Para criar uma implementação do S3, precisamos passar o URI do nosso armazenamento S3 e sua configuração como um argumento. Já temos uma descrição desses parâmetros na cláusula "Start Development". Especificamos parâmetros semelhantes para o FileSystem.

val conf = new Configuration()
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("fs.s3a.endpoint", "<endpoint>")
conf.set("fs.s3a.access.key", "<accessKey>")
conf.set("fs.s3a.secret.key", "<secretKey>")
Enter fullscreen mode Exit fullscreen mode

Após as configurações acima, podemos criar uma classe para manipular o sistema de arquivos. Para fazer isso, vamos primeiro compor um URI para a conexão, que consiste no prefixo s3a:// e no nome do bucket no S3:

val bucketPath = s"s3a://${bucketName}/"
val s3Path = new URI(bucketPath)
val s3FileSystem = FileSystem.get(s3Path, conf) 
Enter fullscreen mode Exit fullscreen mode

Agora podemos extrair os nomes dos arquivos pela chave (ID) fornecida:
s3FileSystem.globStatus(new Path(s"$directory/$fileMask")).filter(_.isFile)

No entanto, isso é apenas metade da solução. Agora, como descobrir quais dos arquivos recebidos já foram processados e quais não foram? A primeira opção era ler os nomes dos arquivos diretamente da tabela Hive, onde gravamos o conteúdo desses arquivos, mas essa opção tem dois problemas:

  1. Velocidade de obtenção da lista de arquivos lidos. Para obter os arquivos lidos, precisamos acessar a tabela e obter essa lista por meio de distintos. Mas temos uma grande quantidade de dados armazenados nas tabelas, o que pode tornar o trabalho do programa muito lento.

  2. Quando alguns algoritmos (por exemplo, scd2) estão em execução, novos dados podem estar faltando, portanto, o nome do arquivo não será gravado na tabela, o que significa que processaremos esse arquivo várias vezes, o que é incorreto.

Por isso, foi decidido gravar em um arquivo separado no hadoop a lista de arquivos lidos depois de carregar os dados com o spark. O código desse registro é o seguinte:

val dirPath = "<caminho para a pasta com a lista de arquivos lidos>" 
val tempPath = s"${dirPath}_tmp"
val dfNewReadFiles = listFiles.toDF("filename")
val dfAllReadFiles =
if (dirExists(dirPath)) { 
  spark
    .read
    .parquet(dirPath)
    .write
    .mode(SaveMode.Overwrite)
    .parquet(tempPath)
  dfNewReadFiles union spark.read.parquet(tempPath)
}
else dfNewReadFiles

dfAllReadFiles.repartition(1).write.mode(SaveMode.Overwrite).parquet(dirPath)
fs.delete(new Path(tempPath), true)
Enter fullscreen mode Exit fullscreen mode

Aqui registramos tanto os arquivos lidos anteriormente quanto os lidos e processados na iteração atual.

Agora podemos obter facilmente a lista de arquivos não processados com o s3. Vamos refinar o código de leitura de arquivos com o s3 acima, adicionando um filtro nos arquivos já lidos:

val readedFiles =
  app.spark.read
    .parquet(readedFilesDirPath)
    .collect
    .map(_.getString(0))
    .toList
s3FileSystem.globStatus(new Path(s"$s3DirectoryPath/$fileMask"))
      .filter(_.isFile)
      .map(file => s"$s3DirectoryPath/${file.getPath.getName}")
      .diff(readedFilesDirPath)
      .toList
      .sorted
Enter fullscreen mode Exit fullscreen mode

Abandono do inferSchema em favor da especificação de tipos de coluna

Conforme mencionado acima, o inferSchema torna o trabalho do programa mais lento e nem sempre define os tipos corretamente. Isso se deve ao fato de que, para determinar o tipo de uma coluna ao especificar esse parâmetro, o spark lê o arquivo de entrada pelo menos uma vez em sua totalidade para determinar os tipos e, em seguida, lê o arquivo com o esquema resultante para processamento posterior. Além disso, uma coluna pode não conter nenhum valor. Nesse caso, não fica claro para o programa qual tipo definir, e ele escolhe o tipo mais seguro e mais amplo: string.

Por esses motivos, a melhor solução foi recusar o uso do parâmetro inferSchema e, em vez disso, especificar diretamente as colunas e seus tipos. Para isso, foi decidido especificar à força o parâmetro columns nos argumentos do nosso programa, que será obrigatório para os carregamentos do s3. Esse parâmetro é uma lista de pares: coluna e seu tipo.

Em seguida, vamos corrigir o código de leitura de arquivos com colunas:

val delimiter = ";"
val header = "true"
val path = "/Pasta1/Table1_*.csv"
val columns = List(("name", StringType), ("age", IntegerType), ...)
val customSchema = StructType(
  columns
    .map(column => StructField(column._1, column._2, true)))
spark.read
      .format("csv")
      .option("header", header)
      .option("delimiter", delimiter)
      .option("inferSchema", "true")
      .schema(customSchema)
      .withColumn("filename", input_file_name)
      .load(path)
Enter fullscreen mode Exit fullscreen mode

Processamento de arquivos vazios

Primeiro, vamos considerar o quanto é importante para nossos algoritmos que um arquivo vazio seja inserido.

Para o algoritmo replace, que executa uma substituição completa dos dados no receptor, um arquivo vazio simplesmente excluirá todos os dados do receptor e produzirá uma tabela vazia. Para o algoritmo append, um arquivo vazio não fará exatamente nada e os dados permanecerão inalterados. Mas, para o SCD2, isso significa que não há mais dados na fonte e todas as linhas reais existentes devem ser marcadas como excluídas.

Foi decidido que os arquivos vazios não deveriam ser processados pela estrutura e deveriam ser considerados errôneos, porque a probabilidade de a tabela estar realmente vazia na origem é mínima, e é mais provável que um arquivo vazio seja um erro de leitura de dados da origem. Além disso, essa conclusão também foi feita com base nas consultas realizadas pelos seguintes motivos:

  1. Quando o algoritmo é substituído, é melhor que pelo menos alguns dados, mesmo que não estejam totalmente atualizados, permaneçam. Nesse caso, cada linha é marcada com a data e a hora de seu carregamento, o que permite determinar sua relevância sem erros;

  2. Ao executar o algoritmo SCD2, é necessário manter os dados atualizados. Se um arquivo vazio chegar, todas as linhas atuais serão marcadas como excluídas. Então, quando um novo arquivo não vazio chegar, novas linhas reais com os mesmos dados aparecerão, causando um "buraco" no qual todos os dados eram irrelevantes. Essa situação é crítica para os usuários de dados.

Por esses motivos, adicionei uma verificação do vazio de cada novo arquivo antes de ler os próprios arquivos. Se um arquivo estiver vazio, não o processamos, mas simplesmente o adicionamos à lista de arquivos lidos, de modo que eles não sejam mais levados em consideração.

// filePaths - lista de caminhos de arquivos no S3
// read - função para ler dados de um arquivo em um DataFrame usando spark
val filesFromSource =
  filePaths
    .map(filePath => 
         (
           read(filePath).withColumn("filename", input_file_name), 
           filePath
         )
    )
filesNamesForLoad =
  filesFromSource
    .filterNot(x => x._1.isEmpty)
    .map(_._2)
Enter fullscreen mode Exit fullscreen mode

Agora ficamos apenas com os arquivos de dados, que são enviados para processamento pelo algoritmo.

Algoritmo SCD2 para arquivos

No parágrafo "Início do desenvolvimento", descrevi o problema de o algoritmo SCD2 trabalhar com vários arquivos, portanto, não poderemos processar dados de arquivos com o algoritmo SCD2 clássico. Precisamos modificar nosso algoritmo da seguinte forma:

  1. Leremos os arquivos sequencialmente em ordem cronológica. Determinaremos essa ordem pela data e hora especificadas no nome do arquivo (falamos sobre isso no parágrafo "Declaração do problema"). Foi possível obter o tempo de modificação do arquivo a partir das metainformações, mas essa abordagem pode quebrar a cronologia se, por algum motivo, o arquivo for recarregado depois de um arquivo com uma fatia mais recente da tabela de origem.

  2. Aplicaremos o algoritmo SCD2 clássico aos dados de cada arquivo. Nesse caso, ele funcionará corretamente porque os arquivos vêm em ordem cronológica.

Vamos considerar o código do novo algoritmo. O método read() agora tem a seguinte aparência:

def read(filesList: List[String]): List[DataFrame] = {
    filesList.map(filename =>
          spark
            .read
            .format("csv")
            .option("header", header)
            .option("delimiter", delimiter)
            .load(filename)
            .withColumn("filename", input_file_name)
        )
  }
Enter fullscreen mode Exit fullscreen mode

Recebemos uma lista de nomes de arquivos como entrada, os lemos individualmente, adicionamos o nome do arquivo como uma coluna adicional e retornamos uma lista de DataFrames. Cada um dos DataFrames contém dados de um único arquivo. Essa lista é então passada como entrada para o algoritmo SCD2:

val fileNamesIterator = filesList.iterator
val sourceDataframes: List[DataFrame] = read(filesList)
val countFiles = sourceDataframes.length
sourceDataframes.map { data =>
  fileName = fileNamesIterator.next()
  val transformedData = scd2run(data)
  val status = load(transformedData)
  status
}
Enter fullscreen mode Exit fullscreen mode

Nesse trecho de código, lemos os arquivos em sourceDataframes, processamos a lista de DataFrames recebidos um a um e enviamos cada um deles para o método scd2run para processamento e, em seguida, gravamos os dados no receptor.

Conclusão

Vimos como configurar a interação entre o Apache Spark e o S3 para o processamento de arquivos, que conseguimos implementar em um projeto exemplo aqui. Este artigo pode ajudá-lo a lidar com problemas semelhantes e a configurar o S3 mais rapidamente.

Top comments (0)