DEV Community

camilo cabrales
camilo cabrales

Posted on • Updated on

Procesamiento de Streams de Datos

En este post vamos a ver como procesar un stream datos con Kinesis y Lambda.

Kinesis es un servicio que nos sirve para procesar datos en tiempo real. Algunas de sus caracteristicas son:

  • Esta compuesto por Shard.Un Shard es la capacidad de datos que se pueden consumir, equivale a un 1 MB por segundo o mil registros por segundo.

  • Los mensajes pueden ser almacenados entre 1 y 365 días.

  • Dependiendo de la cantidad de data a consumir los shard se pueden de dividir (split) para tener más capacidad de consumo o se pueden unir (merge) para disminuir la capacidad de consumo.

  • Partition Key es el identificador que se utiliza para decir en qué Shard se quiere almacenar el registro. Se define cuando se envía un mensaje a Kinesis.

Para este ejercicio vamos a utilizar el stream de wikipedia:Stream
qué va a ser procesado por Kinesis.

Empecemos creando un Data Stream. Vamos a buscar Kinesis en la barra de búsqueda, en la pantalla principal de Kinesis seleccionamos Kinesis Data Streams y damos click en Create data stream.

Principal Kinesis

Ahora configuremos nuestro data stream. Vamos a darle el nombre de: Wikipedia-Stream, seleccionamos el modo Provisioned para aprovisionar un shard y damos click en el botón crear.

Create Kinesis 1

Create Kinesis 2

Una vez creado nuestro data stream en la pestaña de configuración podemos modificar: la cantidad de shards aprovisionados, la encripción, el periodo de retención o si queremos tener métricas mejoradas en CloudWatch.

Configuration Tab

Ya que tenemos configurado nuestro data stream debemos enviarle la información.
Vamos a utilizar el siguiente programa de Python para enviar los datos a Kinesis utilizando la librería boto3. El programa esta leyendo el siguiente stream de datos: https://stream.wikimedia.org/v2/stream/recentchange

import json
from sseclient import SSEClient as EventSource
import boto3
from datetime import datetime

clientKinesis = boto3.client('kinesis')
clientLambda = boto3.client('lambda')
url = 'https://stream.wikimedia.org/v2/stream/recentchange'

def get_object(change):
    return json.dumps({"id": change["id"],"bot":change["bot"],"length":change["length"]["new"],
                    "timestamp":str(datetime.fromtimestamp(change["timestamp"])),"type":change["type"],"user":change["user"],
                    "wiki":change["wiki"],"server_name":change["server_name"]})

def put_kinesis(data):
    return clientKinesis.put_record(
        StreamName='Wikipedia-Stream',
        Data=data,
        PartitionKey='1'
    )      


for event in EventSource(url):

    if event.event == 'message':
        try:
            change = json.loads(event.data)
            if ("id" in change and "bot" in change and "bot" in change and "length" in change 
                and "timestamp" in change and "type" in change
                and "user" in change and "wiki" in change and "server_name" in change): 
                put_kinesis(get_object(change))
        except ValueError:
            pass
Enter fullscreen mode Exit fullscreen mode

Es necesario contar con las siguientes librerías instaladas para ejecutar el programa:

  • boto3 pip install boto3
  • sseclient pip install sseclient
  • tener instalada y configurada la CLI

Ya que hemos configurado el productor de datos debemos configurar el consumidor que en este caso va a ser una función Lambda.

Vamos a buscar el servicio Lambda en el buscador y en la pagina principal damos click en el botón Create function.

Principal Lambda

Debemos definir el nombre de la función: lambda-stream, el runtime: python3.7 y dejamos el rol por defecto por ahora.

Principal Lambda

Creada nuestra función Lamba debemos configurar el rol de la función para que pueda acceder a Kinesis. Para esto vamos a la pestaña configuración y damos click en el rol de la función.

Edir Rol

Debemos añadir la políticas necesarias: AmazonKinesisFullAccess y la política de CloudWatchFullAccess para poder monitorear los llamados a nuestra función.

Policy Kinesis

Policy CloudWatch

Finish ROL

Una vez creada nuestra función y asignado los permisos necesarios para su ejecución debemos configurar un trigger para que procese los datos de nuestro data stream.

Add Trigger

EL siguiente paso es configurar el trigger, en donde el tipo de trigger es Kinesis, seleccionamos el nombre del data stream que ya hemos creado, definimos la cantidad de registros que queremos leer (en este caso vamos a leer de un solo registro) y definimos la posición en donde debemos comenzar a leer (Latest).

Save Trigger

Ya que terminamos la configuración vamos a modificar nuestra función Lambda para que imprima los datos que se reciben desde Kinesis.

import json
import base64


def lambda_handler(event, context):
    #Obtiene toda la infomación enviada por Kinesis
    print("EVENTO: " ,event)
    #Obtiene solamente los datos 
    print("DATA: ",json.loads(base64.b64decode(event["Records"][0]["kinesis"]["data"])))
    # TODO implement
    return {
        'statusCode': 200,
        'body': json.dumps('Kinesis!')
    }
Enter fullscreen mode Exit fullscreen mode

El siguiente paso es ejecutar (python3 nombrearchivo.py) el programa para que empiece a enviar los datos a Kinesis.
Para ver los datos que procesa la función Lambda pestaña Monitor

Monitor

En la siguiente pantalla vemos los datos que enviamos a imprimir en el log de Cloud Watch

Log

Ahora que sabemos como obtener los datos que enviamos a Kinesis los invito a guardar esta información en DynamoDB, RDS o procesarla con otro de servicio.

Me pueden encontrar en:

Camilo Cabrales

Referencias

Kinesis

Install Boto3

Install sseclient

Install CLI

DynamoDb TTL

Discussion (0)