DEV Community

Cover image for Taller Kinesis Data Firehose y S3
Francisco
Francisco

Posted on • Edited on

Taller Kinesis Data Firehose y S3

Arquitectura del  Taller de Python + Kinesis Firehose + S3
Arquitectura del Taller de Python + Kinesis Firehose + S3

En este taller, aprenderemos cómo procesar archivos CSV con AWS utilizando Kinesis Data Firehose y S3. Además, utilizaremos Python como fuente de datos desde una maquina externa para enviar los datos al flujo de Firehose.

Requisitos previos:

  1. Una cuenta de AWS con acceso a Kinesis Data Firehose y S3.
  2. Python 3.x instalado en local.
  3. Tener instalada la liberias boto3 y pandas

Paso 1: Configurar el bucket de S3

Lo primero que debemos hacer es crear un bucket de S3 donde almacenaremos nuestros datos. Para hacer esto, vamos a la consola de AWS y seleccionamos S3. Luego, hacemos clic en el botón "Crear bucket" y seguimos las instrucciones para nombrar el bucket y establecer las opciones de configuración.

Crear Bucket s3

Paso 2: Configurar el flujo de Kinesis Data Firehose

A continuación, creamos un flujo de Kinesis Data Firehose que recibirá los datos de nuestra fuente de datos y los almacenará en nuestro bucket de S3. Para hacer esto, vamos a la consola de AWS y seleccionamos Kinesis Data Firehose. Luego, hacemos clic en "Crear flujo de Firehose" y seguimos las instrucciones para nombrar nuestro flujo y establecer las opciones de configuración.

Para la opciones de configuración:

  • Seleccionaremos en origen como Direct PUT

  • Destino S3

  • Nombre de la secuencia de entrada cualquier nombre para el ejemplo colocare taller1-csv. El nombre que se coloque se convertira en el identificacion del servicio el cual recibira la trama que enviemos mas adelante.

  • Prefijo de bucket de S3
    data/input/firehose/movie/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/

  • Prefijo de salida de error de bucket S3
    data/input/firehose/error

Los prefijos crear las carpetas necesarias de no existir al momento de recibir la trama o caer en error.

  • Tamaño del búfer 1 MiB (datos)

  • Intervalo de almacenamiento en el búfer 60 segundos

Para las configuraciones del búfer para que la trama sea procesada debe cumplir cualquiera de las dos opciones el tamaño o el intervalo

Imagen de configuración

Image description

Paso 3: Configurar la fuente de datos

Antes de configurar la fuente de datos requerimosPara poder conectarnos a servicio de AWS desde una maquina externa necesitaremos crear una clave de acceso para esto se debe realizar los siguientes pasos:

  • Haz clic en tu nombre de usuario en la esquina superior derecha de la página y selecciona "My Security Credentials" (Mis credenciales de seguridad).
    Si se te solicita, ingresa tu nombre de usuario y contraseña de AWS nuevamente.

  • En la pestaña "Access keys" (Claves de acceso), haz clic en "Create New Access Key" (Crear nueva clave de acceso).
    Haz clic en el botón "Download Key File" (Descargar archivo de clave) para descargar la clave de acceso en un archivo .csv. También puedes copiar y pegar la clave de acceso y la clave secreta en un lugar seguro.

  • Asegúrate de guardar el archivo de la clave de acceso de manera segura, ya que no se mostrará nuevamente. Si pierdes la clave de acceso, deberás crear una nueva.

Acceso a Clave

Una vez que hemos creada la clave de acceso y nuestro flujo de Kinesis Data Firehose, debemos configurar la fuente de datos para que sepa de dónde tomar los datos. En este caso, utilizaremos Python como fuente de datos. Para hacer esto, escribiremos un script Python que lea los datos del archivo CSV y los envíe al flujo de Kinesis Data Firehose utilizando la biblioteca Boto3 de AWS para interactuar con los servicios de AWS desde nuestro código Python.

import boto3
import time
import json
import pandas as pd

AWS_ACCESS_KEY = #credenciales
AWS_SECRET_KEY = #credenciales
REGION_NAME = #Region

DeliveryStreamName = 'taller1-csv' #paso 2
firehose = boto3.client('firehose',aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=REGION_NAME
)


record = {}
bad_lines = []
column_names = ["MovieID", "YearOfRelease", "Title" ]
df = pd.read_csv("movie_titles.csv",  encoding = "ISO-8859-1" , names=column_names , error_bad_lines=False)
for index, row in df.iterrows():
 record = {'MovieID':row[0],
    'YearOfRelease':row[1],
    'Title':row[2] 
    }
 response = firehose.put_record(
        DeliveryStreamName = DeliveryStreamName,
        Record = {
            'Data': json.dumps(record)
        }
    )      
print('Dato de movie enviado a Kinesis Data Firehose : \n' + str(record))
time.sleep(.5)
Enter fullscreen mode Exit fullscreen mode

Paso 4: Enviar los datos al flujo de Kinesis Data Firehose
Ahora, podemos ejecutar nuestro script Python y enviar los datos al flujo de Kinesis Data Firehose. Los datos serán transformados y almacenados en nuestro bucket de S3 según las configuraciones establecidas en los pasos anteriores.

Paso 5: Verificar los datos almacenados en S3
Finalmente, podemos verificar que nuestros datos se han almacenado correctamente en nuestro bucket de S3. Podemos acceder a nuestro bucket desde la consola de AWS y verificar que el archivo CSV se ha transformado en el formato deseado.

Resultado

Conclusión
En este taller, hemos aprendido cómo procesar archivos CSV utilizando AWS con Kinesis Data Firehose y S3. Además, hemos utilizado Python como fuente de datos para enviar los datos al flujo de Firehose. AWS ofrece una amplia gama de servicios y herramientas para procesar y analizar datos, lo que hace que sea fácil y escalable trabajar con grandes volúmenes de datos en la nube.

** Despligue con Cloudformation**

Description: Stack Lab Firehose
Parameters:
  NombreBucket:
    Description: bucketS3
    Type: String
    Default: aws-firehose
Resources:
  deliverystream:
    DependsOn:
      - deliveryPolicy
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: taller1-csv
      ExtendedS3DestinationConfiguration:
        BucketARN: !Join 
          - ''
          - - 'arn:aws:s3:::'
            - !Ref s3bucket
        BufferingHints:
          IntervalInSeconds: '60'
          SizeInMBs: '1'
        CompressionFormat: UNCOMPRESSED
        Prefix: data/input/firehose/movie/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        ErrorOutputPrefix: data/input/firehose/error
        RoleARN: !GetAtt deliveryRole.Arn
  s3bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Join [ -, [ !Ref NombreBucket, !Ref AWS::AccountId , 'movie' ] ]
  deliveryRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: 'sts:AssumeRole'
            Condition:
              StringEquals:
                'sts:ExternalId': !Ref 'AWS::AccountId'
  deliveryPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: firehose_delivery_policy
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - 's3:AbortMultipartUpload'
              - 's3:GetBucketLocation'
              - 's3:GetObject'
              - 's3:ListBucket'
              - 's3:ListBucketMultipartUploads'
              - 's3:PutObject'
            Resource:
              - !Join 
                - ''
                - - 'arn:aws:s3:::'
                  - !Ref s3bucket
              - !Join 
                - ''
                - - 'arn:aws:s3:::'
                  - !Ref s3bucket
                  - '*'
      Roles:
        - !Ref deliveryRole
Enter fullscreen mode Exit fullscreen mode

Top comments (0)