Mohamed Radwan for AWS Community Builders

Lambda Router for Data Pipeline

Lambda Router

In this article, I show how to use a Lambda function as a router in your data analytics projects.
Which processing service you use depends on the project. For example, if the files are small and a processing job takes around 10 minutes, Lambda is a good fit (a Lambda function can run for at most 15 minutes). If the files are large and processing takes more than two hours, Fargate may be the better choice.

The data flow in the architecture diagram is as follows: users upload files to the S3 bucket, which invokes the Lambda router. The router reads the object key from the event and uses its path prefix to query the DynamoDB table.
DynamoDB responds with trigger, arn, and other attributes. If trigger is true, the router inspects the arn to decide which service should receive the event.
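The path lookup described above can be sketched in isolation. This is a minimal illustration, not the router itself: `path_from_event`, the `depth` parameter, and the trimmed-down `sample_event` are hypothetical names introduced here. It also decodes the key with `unquote_plus`, because S3 URL-encodes object keys in event notifications; the router code in this article does not do that.

```python
from urllib.parse import unquote_plus


def path_from_event(event, depth=2):
    """Return the DynamoDB lookup path for the first record of an S3 event."""
    # S3 URL-encodes object keys in event notifications (a space arrives as '+').
    key = unquote_plus(event["Records"][0]["s3"]["object"]["key"])
    # Keep the first `depth` key segments and add a trailing slash,
    # so "data/project1/file.csv" becomes "data/project1/".
    return "/".join(key.split("/")[:depth]) + "/"


# Hypothetical, trimmed-down S3 event used only for illustration.
sample_event = {
    "Records": [
        {"s3": {"object": {"key": "data/project1/report+2021.csv",
                           "size": 1024,
                           "eTag": "abc123"}}}
    ]
}

print(path_from_event(sample_event))
```

The returned string is the value that gets matched against the path attribute in the DynamoDB table.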

Lambda router can pass the event to the following services:

  • Fargate
  • SQS
  • SNS
  • Lambda
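The target service can be read from the third field of the ARN, since ARNs follow the layout `arn:partition:service:region:account-id:resource`. The sketch below shows that idea with a lookup table instead of a chain of `if` statements. `service_from_arn`, `TARGETS`, and `describe_route` are hypothetical names, and the table holds plain descriptions where a real router would hold functions that call boto3.

```python
def service_from_arn(arn):
    """Return the service field of an ARN (arn:partition:service:region:account:resource)."""
    parts = arn.split(":")
    if len(parts) < 6 or parts[0] != "arn":
        raise ValueError(f"not a valid ARN: {arn!r}")
    return parts[2]


# Hypothetical labels; in a real router each value would be a function that calls boto3.
TARGETS = {
    "ecs": "run a Fargate task",
    "sqs": "send a message to the queue",
    "sns": "publish to the topic",
    "lambda": "invoke the function",
}


def describe_route(arn):
    """Describe what the router would do for the given target ARN."""
    service = service_from_arn(arn)
    if service not in TARGETS:
        raise ValueError(f"unsupported target service: {service}")
    return TARGETS[service]


print(describe_route("arn:aws:sns:eu-west-1:123456789:secprod"))
```

Raising an error for an unknown service makes a misconfigured table entry visible instead of letting the event be dropped silently.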

The benefits of using this architecture:

  • It is serverless, so you never pay for idle resources; you pay only per event and for storage.
  • It scales with resilience and flexibility. For example, files can be processed in parallel: if each uploaded file starts its own Fargate task and users upload 10 files at the same time, 10 tasks run in parallel, each result is available as soon as its job finishes, and you pay the same amount as if you processed the files in series.

The Lambda router looks like this:

import boto3
import json
from boto3.dynamodb.conditions import Key

region_name = 'eu-west-1'
table_name = 'lambda-router'


def get_path(path):
    # query the routing table for the item whose partition key equals the path
    query_table = boto3.resource("dynamodb", region_name=region_name, verify=True).Table(table_name)
    return query_table.query(
        KeyConditionExpression=Key("path").eq(path)
    )

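def lambda_handler(event, context):
    # object key of the uploaded file, e.g. "data/project1/file.csv"
    key = event['Records'][0]['s3']['object']['key']
    # the first two key segments form the routing path, e.g. "data/project1/"
    directory = key.split('/')[:2]
    path = '/'.join(directory) + '/'
    items = get_path(path)['Items']

    # stop early if the table has no route for this path
    if not items:
        return {
            'statusCode': 404,
            'body': json.dumps('No route found for ' + path)
        }

    data = items[0]
    data['object'] = event['Records'][0]['s3']['object']
    arn = data['arn']
    # the third field of the arn names the target service
    service = arn.split(':')[2]

    # check the trigger
    if data['trigger'] != 'true':
        return {
            'statusCode': 200,
            'body': json.dumps('Trigger is False')
        }

    # trigger lambda
    if service == 'lambda':
        return trigger_lambda(arn, data)

    # publish to sns topic
    if service == 'sns':
        return publish_sns(arn, data)

    # run ecs fargate task
    if service == 'ecs':
        task = arn.split(':')[-2]
        task_revision = arn.split(':')[-1]
        task_definition = task.split('/')[-1] + ':' + task_revision
        return run_ecs(data, data['cluster_name'], task_definition, int(data['task_count']), data['subnets'], data['securitygroups'])

    # send to sqs
    if service == 'sqs':
        return message_sqs(arn.split(':')[-1], data)

    # the arn points to a service the router does not support
    return {
        'statusCode': 400,
        'body': json.dumps('Unsupported service: ' + service)
    }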

def trigger_lambda(arn, data):
    lambda_client = boto3.client('lambda')
    # InvocationType='Event' invokes the target asynchronously
    response = lambda_client.invoke(
        FunctionName=arn,
        InvocationType='Event',
        Payload=json.dumps(data),
    )
    # convert non-serializable values in the response to strings
    return json.loads(json.dumps(response, default=str))

def publish_sns(arn, data):
    sns_client = boto3.client('sns')
    # with MessageStructure='json' the message needs a 'default' entry
    response = sns_client.publish(
        TargetArn=arn,
        Message=json.dumps({'default': json.dumps(data)}),
        MessageStructure='json'
    )
    return response

def run_ecs(event, cluster_name, task, count, subnets, securitygroups):
    ecs = boto3.client('ecs')
    response = ecs.run_task(
        cluster=cluster_name,
        launchType='FARGATE',
        taskDefinition=task,
        overrides={
            'containerOverrides': [
                {
                    # assumes the container has the same name as the task definition family
                    'name': task.split(':')[0],
                    # pass the route and object details to the container as environment variables
                    'environment': [
                        {'name': 'path', 'value': event['path']},
                        {'name': 'trigger', 'value': str(event['trigger'])},
                        {'name': 'project-name', 'value': event['project-name']},
                        {'name': 'arn', 'value': event['arn']},
                        {'name': 'key', 'value': event['object']['key']},
                        {'name': 'etag', 'value': event['object']['eTag']}
                    ]
                },
            ],
        },
        count=count,
        platformVersion='LATEST',
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': subnets,
                'securityGroups': securitygroups,
                'assignPublicIp': 'DISABLED'
            }
        })
    # run_task responses contain datetime values, so convert them to strings
    # to keep the handler's return value JSON serializable
    return json.loads(json.dumps(response, default=str))

def message_sqs(queue_name, event):
    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    # send the body as JSON (not a Python repr) so consumers can parse it
    response = queue.send_message(
        MessageBody=json.dumps(event, default=str),
        MessageAttributes={
            'path': {
                'DataType': 'String',
                'StringValue': event['path']
            },
            'key': {
                'DataType': 'String',
                'StringValue': event['object']['key']
            },
            'size': {
                'DataType': 'String',
                'StringValue': str(event['object']['size'])
            },
            'eTag': {
                'DataType': 'String',
                'StringValue': event['object']['eTag']
            }
        })
    return response

Data in DynamoDB

Run ECS task:

{
 "path": "data/project1/",
 "arn": "arn:aws:ecs:eu-west-1:123456789:task-definition/gozeit:2",
 "cluster_name": "ecs-cluster",
 "project-name": "project1",
 "securitygroups": [
  "sg-04d32af1102317415"
 ],
 "subnets": [
  "subnet-68ac7a21",
  "subnet-d73ef6b0",
  "subnet-a74a5dff"
 ],
 "task_count": "1",
 "trigger": "true"
}

Publish to SNS:

{
 "path": "data/project2/",
 "arn": "arn:aws:sns:eu-west-1:123456789:secprod",
 "project-name": "project2",
 "trigger": "true"
}

Trigger Lambda:

{
 "path": "data/project3/",
 "arn": "arn:aws:lambda:eu-west-1:123456789:function:lambda-dummy",
 "project-name": "project3",
 "trigger": "false"
}

Send to SQS:

{
 "path": "data/project4/",
 "arn": "arn:aws:sqs:eu-west-1:123456789:workers",
 "project-name": "project4",
 "trigger": "true"
}
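
Because the router reads these attributes directly, a missing one only shows up as a KeyError at run time. The following sketch checks an item before it is written to the table. The attribute names come from the examples above; `missing_attributes`, `COMMON`, and `ECS_ONLY` are hypothetical names, and the split into common and ECS-only attributes is my reading of the router code rather than a documented schema.

```python
# Attributes the router reads for every route.
COMMON = ("path", "arn", "trigger")
# Extra attributes the router reads only when the arn points to ECS.
ECS_ONLY = ("cluster_name", "project-name", "securitygroups", "subnets", "task_count")


def missing_attributes(item):
    """Return the attribute names the router would need but the item lacks."""
    required = list(COMMON)
    parts = item.get("arn", "").split(":")
    service = parts[2] if len(parts) > 2 else ""
    if service == "ecs":
        required += ECS_ONLY
    return [name for name in required if name not in item]


sns_item = {
    "path": "data/project2/",
    "arn": "arn:aws:sns:eu-west-1:123456789:secprod",
    "project-name": "project2",
    "trigger": "true",
}
# An ECS route that is missing its network settings.
incomplete_ecs_item = {
    "path": "data/project1/",
    "arn": "arn:aws:ecs:eu-west-1:123456789:task-definition/gozeit:2",
    "cluster_name": "ecs-cluster",
    "project-name": "project1",
    "task_count": "1",
    "trigger": "true",
}

print(missing_attributes(sns_item))
print(missing_attributes(incomplete_ecs_item))
```

An empty list means the item has every attribute the router looks up for that service.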