DEV Community

Swapnil Pawar

Serverless Notification System Implementation With Step Functions Workflow

Scenario

A client runs call center workloads in production and uses a third-party vendor system, Verint, which uploads call recordings (.tar files) to an Amazon S3 bucket for backup and disaster recovery (DR).

The Verint system creates the .tar files in the backend and uploads them to the S3 bucket using multipart upload. The issue we found is that there is no way to track whether any chunk of a .tar file has failed to upload to the bucket.

This has created an issue from a compliance point of view.

Approach

To identify the root cause and find a way to detect failed backups in Amazon S3, we need to check the S3 server access logs, and then develop and configure email alert notifications that are sent whenever a file backup to Amazon S3 fails.

To learn more about S3 server access logging, see reference [1].

To start with, we need to check the S3 logs to get better visibility into the errors. To analyze S3 server access logs at scale, we used Amazon Athena, a serverless interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL.

Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Because several other processing steps were going to be involved, we decided to use a workflow orchestration service, AWS Step Functions [2], to automate the process.


This blog post focuses on the following use case:

  • Store file upload logs in Amazon Simple Storage Service (Amazon S3) using server access logging
  • Use Athena to query the Amazon S3 server access logs
  • Use serverless orchestration with Step Functions to automate the notification workflow

Note: The complete workflow runs daily, triggered by a CloudWatch Events rule with a cron schedule.
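
A daily trigger like this could be set up with a CloudWatch Events (EventBridge) rule that targets the state machine. The sketch below is only an illustration, not the configuration used in this project: the rule name, target ID, cron time, and ARNs are hypothetical placeholders, and the client is passed in as an argument so the function can be tried out without AWS credentials.

```python
def schedule_daily_run(events_client, state_machine_arn, role_arn,
                       rule_name="daily-s3-backup-check",
                       schedule="cron(0 6 * * ? *)"):
    """Create or update a rule that starts the state machine once a day.

    rule_name and schedule are hypothetical defaults; cron(0 6 * * ? *)
    fires at 06:00 UTC every day.
    """
    rule = events_client.put_rule(
        Name=rule_name,
        ScheduleExpression=schedule,
        State="ENABLED",
    )
    # role_arn must allow CloudWatch Events to call states:StartExecution
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{
            "Id": "notification-workflow",  # hypothetical target ID
            "Arn": state_machine_arn,
            "RoleArn": role_arn,
        }],
    )
    return rule["RuleArn"]
```

With boto3, this would be called as `schedule_daily_run(boto3.client('events'), state_machine_arn, role_arn)` using the real ARNs of the state machine and the IAM role.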


Step Function Reference Workflow:

I used Step Functions Workflow Studio to design the workflow. It makes it faster and easier to build workflows using a drag-and-drop interface in the AWS console.

Let me show you how easy it is to create a state machine using Workflow Studio. To get started, go to the Step Functions console and create a state machine. You will see an option to start designing the new state machine visually with Workflow Studio.

Step Function AWS Console

Here are some of the available flow states:

Choice: Adds if-then-else logic.
Parallel: Adds parallel branches.
Map: Adds a for-each loop.
Wait: Delays for a specific time.

Step Function Workflow

The above step function workflow is broken down step by step below:

Step 1:
The Fetch Call Center Records Lambda function executes an Athena query to get the daily records of S3 server access logs from the Athena table. To learn more about querying S3 server access logs and creating the table, see reference [3].

import datetime
import os
import time

import boto3


def lambda_handler(event, context):

    # Query window: the last 24 hours, formatted for Athena's parse_datetime
    now = datetime.datetime.now()
    End_Date = now.strftime('%Y-%m-%d:%H:%M:%S')
    Start_Date = (now - datetime.timedelta(days=1)).strftime('%Y-%m-%d:%H:%M:%S')

    # Read configuration from environment variables
    AthenaDB = os.environ['athena_db']
    AthenaTable = os.environ['athena_table']
    AthenaoutputLocation = os.environ['athena_query_output']

    # Maximum number of status polls
    RETRY_COUNT = 10

    # Initialize Boto3 Athena client
    client = boto3.client('athena')

    # Query for requests in the last day whose HTTP status is not 200
    query = """SELECT bucket_name, key, httpstatus, requestdatetime, request_uri, errorcode, count(*) AS total
        FROM "{db}"."{table}"
        WHERE httpstatus != '200' AND requester IS NOT NULL
        AND parse_datetime(requestdatetime, 'dd/MMM/yyyy:HH:mm:ss Z')
            BETWEEN parse_datetime('{start}', 'yyyy-MM-dd:HH:mm:ss')
            AND parse_datetime('{end}', 'yyyy-MM-dd:HH:mm:ss')
        GROUP BY key, httpstatus, bucket_name, requestdatetime, request_uri, errorcode;""".format(
        db=AthenaDB, table=AthenaTable, start=Start_Date, end=End_Date)
    # Run the Athena query and wait for it to finish
    try:
        # Athena Query Execution
        response = client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                'Database': AthenaDB
            },
            ResultConfiguration={
                "EncryptionConfiguration": {
                    "EncryptionOption": "SSE_S3"
                },
                'OutputLocation': AthenaoutputLocation
            }
        )
        if response:
            # Get the query execution ID
            query_execution_id = response['QueryExecutionId']
            print("Successfully Executed:" + query_execution_id)

            # Poll the execution status, waiting a little longer on each attempt
            for i in range(1, 1 + RETRY_COUNT):

                query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
                query_execution_status = query_status['QueryExecution']['Status']['State']
                print("STATUS:" + query_execution_status)

                if query_execution_status == 'SUCCEEDED':
                    break

                if query_execution_status in ('FAILED', 'CANCELLED'):
                    raise Exception("STATUS:" + query_execution_status)

                # QUEUED or RUNNING: wait before polling again
                time.sleep(i)
            else:
                # No SUCCEEDED status after all retries: stop the query and give up
                client.stop_query_execution(QueryExecutionId=query_execution_id)
                raise Exception('TIME OVER')
            return response
        else:
            return build_internal_error_response("Unexpected error while completing Athena Start Execution API request")
    except Exception as ex:
        print(ex)
        return build_error_response('Customer error while making API request', str(ex))

def build_internal_error_response(internal_error_message, internal_error_details=None):
    return build_error_response(internal_error_message, internal_error_details, 'InternalError', 'InternalError')

def build_error_response(internal_error_message, internal_error_details=None, customer_error_code=None, customer_error_message=None):
    error_response = {
        'internalErrorMessage': internal_error_message,
        'internalErrorDetails': internal_error_details,
        'customerErrorMessage': customer_error_message,
        'customerErrorCode': customer_error_code
    }
    print(error_response)
    return error_response


Step 2: A Wait state gives the Athena query a little time to finish and upload its results to the S3 bucket.

Step 3: Athena names the results file after the QueryExecutionId, so we fetch the query execution details to identify the S3 object in a later state.

Athena GetQueryExecution

Step 4: A Choice state branches on whether the query succeeded or failed.
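
Steps 2 to 4 could be expressed in Amazon States Language roughly as follows, written here as a Python dictionary that serializes to JSON. This is a sketch rather than the actual definition from Workflow Studio: the state names and the 30-second delay are hypothetical, and it assumes the Lambda output exposes QueryExecutionId at the top level of the state input.

```python
import json

# Hypothetical state names; the real workflow's names are not shown in the post.
definition_fragment = {
    "Wait For Query": {
        "Type": "Wait",
        "Seconds": 30,  # assumed delay
        "Next": "Get Query Execution",
    },
    "Get Query Execution": {
        "Type": "Task",
        # Step Functions service integration for Athena GetQueryExecution
        "Resource": "arn:aws:states:::athena:getQueryExecution",
        # Assumes QueryExecutionId sits at the top level of the state input
        "Parameters": {"QueryExecutionId.$": "$.QueryExecutionId"},
        "Next": "Query Succeeded?",
    },
    "Query Succeeded?": {
        "Type": "Choice",
        "Choices": [{
            "Variable": "$.QueryExecution.Status.State",
            "StringEquals": "SUCCEEDED",
            "Next": "Get Results File",
        }],
        # Any other state (FAILED, CANCELLED, still RUNNING) takes this branch
        "Default": "Query Failed",
    },
}

print(json.dumps(definition_fragment, indent=2))
```

The "Get Results File" and "Query Failed" states named as targets here are placeholders for the later steps of the workflow.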

Step 5: If the query succeeded, we get the results file from S3 based on the QueryExecutionId.
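
One way to locate the results file is to read the output location from the GetQueryExecution response and split it into a bucket and key. The helper names below are hypothetical, and the sketch assumes the response carries the full path of the results file under QueryExecution.ResultConfiguration.OutputLocation.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/prefix/file.csv' into (bucket, key)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: " + s3_uri)
    return parsed.netloc, parsed.path.lstrip("/")


def results_location(query_execution):
    """Return (bucket, key) of the results file from a GetQueryExecution response."""
    output = query_execution["QueryExecution"]["ResultConfiguration"]["OutputLocation"]
    return split_s3_uri(output)
```

The returned bucket and key can then be passed to an S3 GetObject call to download the file.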

Step 6 & 7: We use Amazon SES (Simple Email Service) to send email notifications. Since this is an SES sandbox environment, we have verified a few identities. We fetch the list of identities and keep only the verified ones, in case any are in a pending or failed status.

SES Identities

SES GetIdentity Verification Attributes
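
The filtering in Steps 6 and 7 can be sketched in Python as below. The function name is hypothetical; it assumes a response in the shape returned by the SES GetIdentityVerificationAttributes API, where each identity maps to a VerificationStatus such as Success, Pending, or Failed.

```python
def verified_identities(verification_response):
    """Return the identities whose VerificationStatus is 'Success'."""
    attributes = verification_response.get("VerificationAttributes", {})
    return sorted(
        identity
        for identity, details in attributes.items()
        if details.get("VerificationStatus") == "Success"
    )


# Example response with made-up addresses
sample = {
    "VerificationAttributes": {
        "ops@example.com": {"VerificationStatus": "Success"},
        "new-hire@example.com": {"VerificationStatus": "Pending"},
        "old@example.com": {"VerificationStatus": "Failed"},
    }
}
print(verified_identities(sample))
```

With boto3, the input would come from `list_identities` followed by `get_identity_verification_attributes(Identities=...)` on the SES client.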

Step 8: Another Lambda function, “Process-***-Records”, builds the SES email and attaches the S3 object, which contains the records of files that failed to upload to S3.

Step 9: If SES fails to send the email, the sys admin is notified of the error so the issue can be tracked down.

Step 10: If the execution is successful, it ends in the Success state; otherwise it ends in the Failed state.
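
The code of this second Lambda function is not shown here, but building an email with an attachment typically looks something like the sketch below, which uses Python's standard email package. The function name, subject line, body text, and attachment filename are all hypothetical.

```python
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_report_email(sender, recipients, csv_bytes, filename="failed-uploads.csv"):
    """Build a MIME message with the Athena results file attached."""
    message = MIMEMultipart("mixed")
    message["Subject"] = "S3 backup failures: daily report"  # hypothetical subject
    message["From"] = sender
    message["To"] = ", ".join(recipients)
    message.attach(MIMEText("The attached file lists the requests that failed.", "plain"))

    # Attach the results file downloaded from S3 (base64-encoded by default)
    attachment = MIMEApplication(csv_bytes)
    attachment.add_header("Content-Disposition", "attachment", filename=filename)
    message.attach(attachment)
    return message
```

The message could then be sent with the SES client's `send_raw_email(Source=sender, Destinations=recipients, RawMessage={"Data": message.as_string()})`, using only verified identities while the account is in the sandbox.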

References:

[1] Logging requests using server access logging - Amazon Simple Storage Service

[2] https://aws.amazon.com/step-functions

[3] Using Amazon S3 access logs to identify requests - Amazon Simple Storage Service

[4] Analyze my Amazon S3 server access logs using Athena
