DEV Community

Cover image for [API Databricks como serviço interno] dbutils — notebook.run, widgets.getArgument, widgets.text e notebook_params
Airton Lira junior
Airton Lira junior

Posted on

[API Databricks como serviço interno] dbutils — notebook.run, widgets.getArgument, widgets.text e notebook_params

Reduzindo Custos com Automação de Processos no Databricks

Tive uma necessidade em um cliente de reduzir o custo de processos que rodavam no Databricks. Uma das features que o Databricks era responsável era de coletar os arquivos de vários SFTP, descompactá-los e colocá-los no Data Lake.

A automação de fluxos de trabalho de dados é um componente crucial na engenharia de dados moderna. Neste artigo, exploraremos como criar uma função AWS Lambda usando GitLab CI/CD e Terraform, que permite a uma aplicação em Go conectar-se a um servidor SFTP, coletar arquivos, armazená-los no Amazon S3 e, por fim, acionar um job no Databricks. Este processo end-to-end é essencial para sistemas que dependem de integração e automação de dados eficientes.

O que Você Vai Precisar para Este Artigo

  • Conta no GitLab com um repositório para o projeto.
  • Conta na AWS com permissões para criar recursos Lambda, S3 e IAM.
  • Conta no Databricks com permissões para criar e executar jobs.
  • Conhecimento básico em Go, Terraform e GitLab CI/CD.

Passo 1: Preparando a Aplicação em Go

Comece criando uma aplicação em Go que se conectará ao servidor SFTP para coletar arquivos. Utilize pacotes como github.com/pkg/sftp para estabelecer a conexão SFTP e github.com/aws/aws-sdk-go para interagir com o serviço S3 da AWS.

package main

import (
 "fmt"
 "log"
 "os"
 "path/filepath"

 "github.com/pkg/sftp"
 "golang.org/x/crypto/ssh"
 "github.com/aws/aws-sdk-go/aws"
 "github.com/aws/aws-sdk-go/aws/session"
 "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
 // Configuração do cliente SFTP
 user := "seu_usuario_sftp"
 pass := "sua_senha_sftp"
 host := "endereco_sftp:22"
 config := &ssh.ClientConfig{
  User: user,
  Auth: []ssh.AuthMethod{
   ssh.Password(pass),
  },
  HostKeyCallback: ssh.InsecureIgnoreHostKey(),
 }

 // Conectar ao servidor SFTP
 conn, err := ssh.Dial("tcp", host, config)
 if err != nil {
  log.Fatal(err)
 }
 client, err := sftp.NewClient(conn)
 if err != nil {
  log.Fatal(err)
 }
 defer client.Close()

 // Baixar arquivos do SFTP
 remoteFilePath := "/path/to/remote/file"
 localDir := "/path/to/local/dir"
 localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath))
 dstFile, err := os.Create(localFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer dstFile.Close()

 srcFile, err := client.Open(remoteFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer srcFile.Close()

 if _, err := srcFile.WriteTo(dstFile); err != nil {
  log.Fatal(err)
 }

 fmt.Println("Arquivo baixado com sucesso:", localFilePath)

 // Configuração do cliente S3
 sess := session.Must(session.NewSession(&aws.Config{
  Region: aws.String("us-west-2"),
 }))
 uploader := s3manager.NewUploader(sess)

 // Carregar arquivo para o S3
 file, err := os.Open(localFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer file.Close()

 _, err = uploader.Upload(&s3manager.UploadInput{
  Bucket: aws.String("seu-bucket-s3"),
  Key:    aws.String(filepath.Base(localFilePath)),
  Body:   file,
 })
 if err != nil {
  log.Fatal("Falha ao carregar arquivo para o S3:", err)
 }

 fmt.Println("Arquivo carregado com sucesso no S3")
}
Enter fullscreen mode Exit fullscreen mode

Passo 2: Configurando o Terraform

O Terraform será usado para provisionar a função Lambda e os recursos necessários na AWS. Crie um arquivo main.tf com a configuração necessária para criar a função Lambda, as políticas de IAM e os buckets do S3.

provider "aws" {
  region = "us-east-1"
}

resource "aws_iam_role" "lambda_execution_role" {
  name = "lambda_execution_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        },
      },
    ]
  })
}

resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda_policy"
  description = "A policy that allows a lambda function to access S3 and SFTP resources"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = [
          "s3:ListBucket",
          "s3:GetObject",
          "s3:PutObject",
        ],
        Effect = "Allow",
        Resource = [
          "arn:aws:s3:::seu-bucket-s3",
          "arn:aws:s3:::seu-bucket-s3/*",
        ],
      },
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

resource "aws_lambda_function" "sftp_lambda" {
  function_name = "sftp_lambda_function"

  s3_bucket = "seu-bucket-s3-com-codigo-lambda"
  s3_key    = "sftp-lambda.zip"

  handler = "main"
  runtime = "go1.x"

  role = aws_iam_role.lambda_execution_role.arn

  environment {
    variables = {
      SFTP_HOST     = "endereco_sftp",
      SFTP_USER     = "seu_usuario_sftp",
      SFTP_PASSWORD = "sua_senha_sftp",
      S3_BUCKET     = "seu-bucket-s3",
    }
  }
}

resource "aws_s3_bucket" "s3_bucket" {
  bucket = "seu-bucket-s3"
  acl    = "private"
}
Enter fullscreen mode Exit fullscreen mode

Passo 3: Configurando o GitLab CI/CD

No GitLab, defina o pipeline CI/CD no arquivo .gitlab-ci.yml. Este pipeline deve incluir etapas para testar a aplicação Go, executar o Terraform para provisionar a infraestrutura e uma etapa para limpeza, se necessário.

stages:
  - test
  - build
  - deploy

variables:
  S3_BUCKET: "seu-bucket-s3"
  AWS_DEFAULT_REGION: "us-east-1"
  TF_VERSION: "1.0.0"

before_script:
  - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )'
  - eval $(ssh-agent -s)
  - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add -
  - mkdir -p ~/.ssh
  - chmod 700 ~/.ssh
  - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts

test:
  stage: test
  image: golang:1.18
  script:
    - go test -v ./...

build:
  stage: build
  image: golang:1.18
  script:
    - go build -o myapp
    - zip -r sftp-lambda.zip myapp
  artifacts:
    paths:
      - sftp-lambda.zip
  only:
    - master

deploy:
  stage: deploy
  image: hashicorp/terraform:$TF_VERSION
  script:
    - terraform init
    - terraform apply -auto-approve
  only:
    - master
  environment:
    name: production
Enter fullscreen mode Exit fullscreen mode

Passo 4: Integrando com o Databricks

Após o upload dos arquivos para o S3, a função Lambda deve acionar um job no Databricks. Isso pode ser feito utilizando a API do Databricks para iniciar jobs existentes.

package main

import (
 "bytes"
 "encoding/json"
 "fmt"
 "net/http"
)

// Estrutura para a requisição de iniciar um job no Databricks
type DatabricksJobRequest struct {
 JobID int `json:"job_id"`
}

// Função para acionar um job no Databricks
func triggerDatabricksJob(databricksInstance string, token string, jobID int) error {
 url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance)
 requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID})
 req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
 if err != nil {
  return err
 }

 req.Header.Set("Content-Type", "application/json")
 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

 client := &http.Client{}
 resp, err := client.Do(req)
 if err != nil {
  return err
 }
 defer resp.Body.Close()

 if resp.StatusCode != http.StatusOK {
  return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode)
 }

 return nil
}

func main() {
 // ... (código existente para conectar ao SFTP e carregar no S3)

 // Substitua pelos seus valores reais
 databricksInstance := "your-databricks-instance"
 databricksToken := "your-databricks-token"
 databricksJobID := 123 // ID do job que você deseja acionar

 // Acionar o job no Databricks após o upload para o S3
 err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID)
 if err != nil {
  log.Fatal("Erro ao acionar o job do Databricks:", err)
 }

 fmt.Println("Job do Databricks acionado com sucesso")
}
Enter fullscreen mode Exit fullscreen mode

Passo 5: Executando o Pipeline

Faça o push do código para o repositório GitLab para que o pipeline seja executado. Verifique se todos os passos são concluídos com sucesso e se a função Lambda está operacional e interagindo corretamente com o S3 e o Databricks.

Uma vez que você tenha o código completo e o arquivo .gitlab-ci.yml configurado, você pode executar o pipeline seguindo estes passos:

  • Faça o push do seu código para o repositório GitLab:
  git add .
  git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
  git push origin master
Enter fullscreen mode Exit fullscreen mode
  • O GitLab CI/CD detectará o novo commit e iniciará o pipeline automaticamente.
  • Acompanhe a execução do pipeline no GitLab acessando a seção CI/CD do seu repositório.
  • Se todos os estágios forem bem-sucedidos, sua função Lambda será implantada e pronta para ser usada.

Lembre-se de que você precisará configurar as variáveis de ambiente no GitLab CI/CD para armazenar informações sensíveis, como tokens de acesso e chaves privadas. Isso pode ser feito na seção ‘Settings’ > ‘CI / CD’ > ‘Variables’ do seu projeto GitLab.

Além disso, certifique-se de que o token do Databricks tenha as permissões necessárias para acionar jobs e que o job exista com o ID fornecido.

Conclusão

A automação de tarefas de engenharia de dados pode ser significativamente simplificada com o uso de ferramentas como GitLab CI/CD, Terraform e AWS Lambda. Ao seguir os passos descritos neste artigo, você pode criar um sistema robusto que automatiza a coleta e integração de dados entre SFTP, S3 e Databricks, tudo isso com a eficiência e a simplicidade da linguagem Go. Com essa abordagem, você estará bem equipado para lidar com os desafios de integração de dados em escala.

Meus contatos:

LinkedIn - Airton Lira Junior

iMasters - Airton Lira Junior

aws #lambda #terraform #gitlab #ci_cd #go #databricks #dataengineering #automation


Aumentando a Autonomia em Pipelines de Dados com Databricks

Com o aumento da demanda por maior autonomia em pipelines de dados, a plataforma de análise de dados Databricks tornou-se uma solução popular para empresas que desejam desenvolver pipelines escaláveis e eficientes. Durante um projeto freelancer no qual me foi solicitado uma maior autonomia sobre o core da empresa, que basicamente é definido por um pipeline de dados que importa arquivos de vários SFTP diferentes, faz a ingestão para o S3, uma série de tratamentos de dados para seguir um layout mandatório e, por fim, o envio para um sistema de fila onde os demais sistemas poderiam usufruir dos dados refinados. Pensei em como poderia atender aos seguintes requisitos com pouco ou quase nenhum desenvolvimento além do que já existe de recurso no Databricks. Por isso, falaremos neste artigo como você pode utilizar a API do Databricks como um serviço interno e dar autonomia para:

  • Reprocessar um arquivo desde sua origem (SFTP).
  • Gerar apenas o JSON que é enviado para o SQS.
  • Processar um arquivo em ambiente de homologação sem enviar para o SQS.

Estas são algumas autonomias que foram exigidas, mas o que será demonstrado aqui vale para qualquer regra de negócio.

dbutils — notebook.run e widgets.get

Antes de algumas evoluções do Databricks em relação a “steps” dentro de um job, eu sempre utilizei uma função muito interessante, que é o dbutils.notebook.run. Com ele, eu conseguia montar uma espécie de orquestração de notebooks que gostaria de executar tanto em sequência como em paralelo, bem como atribuir retry e tempo (parâmetros que você pode passar nessa função). Desta forma, eu montei um único notebook mais ou menos como o abaixo, que fazia a minha orquestração de extração, transformação e envio dos dados de centenas de arquivos:

Exemplo do Notebook de Orquestração

class PropriedadesNoteBook:
  def __init__(self, path, timeout, retry=1):
    self.path = path
    self.timeout = timeout
    self.retry = retry

nbmovefiles = PropriedadesNoteBook("0-move-ftp-to-s3", 500, 3)
nblimpadados = PropriedadesNoteBook("1-notebook_limpa_dados", 1000, 2)
nbimport = PropriedadesNoteBook("2-notebook_importa_arquivos", 1000, 2)
nbidentificadores = PropriedadesNoteBook("3-notebook_processa_identificadores", 1000, 2)
nbprocessjson = PropriedadesNoteBook("4-notebook_processa_json", 2000, 2)
nbexcluidados = PropriedadesNoteBook("6-notebook_envia_json_sqs", 1000, 1)

if dbutils.notebook.run(nbmovefiles.path, nbmovefiles.timeout) != "error":
  send_slack_message("", "Notebook " + nbmovefiles.path + " executado com sucesso!")
else:
  print("Ocorreu um erro na execução do notebook: " + nbmovefiles.path)
  send_slack_message("", f"Ocorreu uma falha na execução do notebook: {nbmovefiles.path}")
  raise

# Repita a lógica para os outros notebooks...
Enter fullscreen mode Exit fullscreen mode

No exemplo acima, eu faço toda a minha etapa de “ETL” de forma sequencial, visto que, em meu caso, os arquivos eram disponibilizados no SFTP de 1 a 3 vezes ao dia. Legal, você tem seu job, sua orquestração, seus notebooks que executam tarefas e seu job é executado em uma janela definida de acordo com o que foi definido com o time de negócios. Mas, depois de desenvolvido, testado e validado, esse processo foi crescendo ao longo do tempo, principalmente em três aspectos:

  1. Muitos setups de novos clientes, que o time de produtos precisava validar o JSON processado pelo Databricks a nível de homologação ou produção.
  2. Recorrência de clientes que, por um erro do lado deles, o arquivo foi disponibilizado fora do schedule do seu job.
  3. Necessidade de enviar o fluxo completo, mas para um sistema de mensageria diferente.

Bom, chegamos onde será demonstrado como utilizar e a efetividade das funções abaixo:

  • notebook_params (parâmetro da API do Databricks).
  • dbutils.widgets.getArgument.
  • dbutils.widgets.text.

Passando e Recebendo Dados Através dos Widgets Entre Notebooks

Conforme havia comentado anteriormente, eu senti a necessidade de trazer mais liberdade para usuários e sistemas e tirar um pouco a preocupação do engenheiro de dados de modificar, executar e monitorar o fluxo de importação, processamento e envio dos dados para o AWS SQS (sistema de fila de mensagens). Desta forma, comecei a elaborar um notebook responsável totalmente pelo reprocessamento das necessidades acima.

Continuando, além do notebook que desenvolvi, que denominei de reprocessamento, criei um segundo notebook chamado orquestrador_reprocessamento e, por último, criei um bucket dedicado a esse tipo de necessidade de reprocessamento. Ótimo, agora como o usuário ou aplicação vai chamar esse notebook e especificar o que ele quer exatamente diante dessas três novas funcionalidades? Para isso, utilizei a própria API do Databricks, composta pelo ID do seu cluster seguido de .cloud.databricks.com/api/2.1/jobs/run-now.

Além disso, conforme mencionei anteriormente, criei um bucket focado em reprocessamento de dados, no qual dentro desse bucket criei as seguintes pastas:

  • hmle/json
  • hmle/sqs
  • json_gerados
  • prod/json
  • prod/sqs

Utilizando a API do Databricks para Reprocessamento Completo

Você pode estar se perguntando: por que introduzi uma lógica do usuário disponibilizar o arquivo no bucket nas pastas específicas de acordo com sua necessidade, mas o reprocessamento completo (SFTP -> ingestão S3 -> limpeza dados -> processamento layout -> envio SQS) não segue esse padrão e sim via API Databricks? Este caso de reprocessamento completo envolve uma etapa na qual o usuário ou API interna envia o arquivo já obtido pelo usuário para um dos buckets, que é o acesso ao SFTP de cada cliente (+40 clientes). Além de ser uma informação sensível, eu preciso garantir que, quando manipulo o SFTP do cliente, o arquivo vai chegar até a etapa final, que é o AWS SQS.

Portanto, no meu notebook de orquestração, valido se dbutils.widgets.get("ftp") existe, se está preenchido como true ou false e, caso seja true, chamo o meu notebook de comunicação e importação com os SFTP dos clientes.

Conclusão

Neste artigo, exploramos as vantagens de utilizar a API do Databricks e seus recursos em um projeto que envolve a importação, processamento e envio de dados para um sistema de fila. Demonstramos como a plataforma Databricks permite a criação de pipelines de dados escaláveis e eficientes, proporcionando maior autonomia aos usuários e sistemas. Utilizando a função dbutils.notebook.run, foi possível orquestrar notebooks e executar etapas de ETL de forma sequencial e paralela, além de atribuir retry e time. Com a implementação dos widgets e a API do Databricks, conseguimos adaptar o fluxo de trabalho de acordo com as necessidades específicas dos usuários, permitindo o reprocessamento de arquivos, geração de JSONs e processamento em ambiente de homologação ou produção.

Meus contatos:

LinkedIn - Airton Lira Junior

databricks #dataengineering #etl #automation #sftp #aws #sqs #api

Top comments (1)

Collapse
 
programmerraja profile image
Boopathi

This article is really interesting! I'm impressed by how you've used Databricks' API and widgets to build such a flexible and adaptable data processing pipeline. I can see how this approach would be extremely helpful for managing different data sources and processing scenarios.