
jguo


Notifications Based on Email Content, Built on Serverless

About

ARK Invest's ETFs are rising stars on the stock market. If you subscribe to their trading notifications, you receive their trading information at the end of each day. But I don't want all of their trading information; I only want to know about their new holdings. So I'd like to build a system that sends me only the notifications I care about.

Design

[Image: system design diagram]

Build the app step by step

1. Create a Google Developer App
Follow this post to create a Gmail app on Google Cloud.
Note: choose Desktop application as the application type when you create the OAuth credentials.

2. Create first Lambda to process email
First, let's create a script to get an OAuth token for accessing the Gmail app (client_secret.json is the file we got from step 1).

import os
from oauth2client import client, tools, file

# Full Gmail access scope; the Lambda reads, sends, and deletes messages.
SCOPES = ['https://mail.google.com/']

def get_credentials():
    # Store the credentials (including the refresh token) in the working directory.
    credential_path = os.path.join(os.getcwd(), 'credentials_out.json')
    store = file.Storage(credential_path)
    creds = store.get()
    if not creds or creds.invalid:
        # Run the OAuth consent flow (in a browser by default) and save the result.
        flow = client.flow_from_clientsecrets('client_secret.json', SCOPES)
        creds = tools.run_flow(flow, store)
    return creds

if __name__ == '__main__':
    get_credentials()

In the output file (credentials_out.json), you should find the client_id, client_secret, and refresh_token values.
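As a minimal sketch (not part of the original project), the three values can be read from the stored credentials and mapped to the Lambda environment variable names used later in this post. The helper names `to_lambda_env` and `load_env_from_file` are hypothetical; the key names are the ones mentioned above.

```python
import json

# Keys expected in credentials_out.json, mapped to the environment
# variable names the Lambda reads later on.
KEY_TO_ENV = {
    "client_id": "GMAIL_CLIENT_ID",
    "client_secret": "GMAIL_CLIENT_SECRET",
    "refresh_token": "GMAIL_REFRESH_TOKEN",
}

def to_lambda_env(credentials):
    """Map a parsed credentials dict to Lambda environment variables."""
    missing = [key for key in KEY_TO_ENV if not credentials.get(key)]
    if missing:
        raise KeyError("missing in credentials file: " + ", ".join(missing))
    return {env: credentials[key] for key, env in KEY_TO_ENV.items()}

def load_env_from_file(path="credentials_out.json"):
    """Read the stored credentials file and return the env var mapping."""
    with open(path) as f:
        return to_lambda_env(json.load(f))
```

Calling `load_env_from_file()` in the directory where the script above was run returns a dict that can be copied into the Lambda configuration. Treat the values as secrets and keep them out of source control.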

Second, let's build the Lambda function that processes the email content. The source code is on GitHub.

import base64
import csv
import os
from datetime import datetime
from email.mime.text import MIMEText

import boto3
import httplib2
from bs4 import BeautifulSoup
from googleapiclient import discovery
from oauth2client import client
from pytz import timezone

# TODO
# 1. Validate the date of the email.

# If modifying these scopes, delete the file token.pickle.
SCOPES = ['https://mail.google.com/']
# ARK_INVEST_QUERY="from:ark@arkinvest.com subject:ARK Investment Management Trading Information"
ARK_INVEST_QUERY=os.environ['EMAIL_QUERY']
S3_BUCKET ="ark-fly"
OBJECT_KEY_PATTERN="dailytradingtrans/{today}-trading.csv"
SEND_NOTIFICATION_TO = "guojiayanc@gmail.com"
SENDER = "noreply@arkfly.com"
TEMP_TRADING_CSV_FILE="/tmp/trading.csv"

def main():
  service = login()
  try:
    result = get_messages(service, 'me')
    if 'messages' in result:
      # Process only the first matching message.
      message_id = result['messages'][0]['id']
      data = get_message('me', message_id, service)
      print(data)
      generate_csv(data)
      today = get_date()
      upload_to_s3(OBJECT_KEY_PATTERN.format(today=today))
      # Remove the processed email from the mailbox.
      delete_message(service, 'me', message_id)
    else:
      print("No message found!")
  except Exception as error:
    # TODO Sender is not set correctly
    print("ARK Fly processing failed " + str(error))
    message = create_message(SENDER, SEND_NOTIFICATION_TO, "ARK Fly processing failed", str(error))
    send_message(service, 'me', message)

def delete_message(service, user_id, message_id):
  # Permanently deletes the message (it does not go to Trash).
  service.users().messages().delete(userId=user_id, id=message_id).execute()

def login():
  try:
    credentials = get_credentials()
    http = credentials.authorize(httplib2.Http())
    return discovery.build('gmail', 'v1', http=http, cache_discovery=False)
  except Exception as error:
    print("Log in to Gmail failed! " + str(error))
    raise error

def get_credentials():
  client_id = os.environ['GMAIL_CLIENT_ID']
  client_secret = os.environ['GMAIL_CLIENT_SECRET']
  refresh_token = os.environ['GMAIL_REFRESH_TOKEN']

  credentials = client.GoogleCredentials(None,
  client_id,
  client_secret,
  refresh_token,
  None,
  "https://accounts.google.com/o/oauth2/token",
  'my-user-agent')

  return credentials

def upload_to_s3(object_name):
  # Named s3_client to avoid shadowing the oauth2client `client` module.
  s3_client = boto3.client('s3')
  try:
    s3_client.upload_file(TEMP_TRADING_CSV_FILE, S3_BUCKET, object_name)
  except Exception as error:
    raise Exception("Failed to upload to S3! " + str(error))

def get_date():
  # 'EST' is a fixed UTC-5 offset; 'America/New_York' also follows daylight saving time.
  tz = timezone('America/New_York')
  return datetime.now(tz).strftime("%Y-%m-%d")

def get_message(user_id, message_id, service):
  try:
    message = service.users().messages().get(userId=user_id, id=message_id).execute()
    data = ''
    print(message)
    if 'parts' in message['payload']:
      for part in message['payload']['parts']:
        data += base64.urlsafe_b64decode(part['body']['data']).decode("utf-8") + "\n"
    else:
      data += base64.urlsafe_b64decode(message['payload']['body']['data']).decode("utf-8") + "\n"
    return data
  except Exception as error:
    raise error

def generate_csv(data):
  try:
    bs=BeautifulSoup(data, 'html.parser')
    table_body=bs.find('table')
    rows = table_body.find_all('tr')
    csv_rows = []
    for row in rows:
      cols=row.find_all('td')
      cols=[x.text.strip().replace('\r\n', ' ') for x in cols]
      csv_rows.append(cols)
    with open(TEMP_TRADING_CSV_FILE, "w") as f:
      wr = csv.writer(f)
      wr.writerows(csv_rows)
  except Exception as error:
    raise Exception("Today's trading table not found! " + str(error))

def create_message(sender, to, subject, message_text):
  message = MIMEText(message_text)
  message['to'] = to
  message['from'] = sender
  message['subject'] = subject
  raw_message = base64.urlsafe_b64encode(message.as_string().encode("utf-8"))
  return {
    'raw': raw_message.decode("utf-8")
  }

def send_message(service, user_id, message):
  try:
    message = service.users().messages().send(userId=user_id, body=message).execute()
    print('Message Id: %s' % message['id'])
    return message
  except Exception as error:
    print('An error occurred: %s' % error)
    raise error

def get_messages(service, user_id):
  try:
    return service.users().messages().list(userId=user_id, q=ARK_INVEST_QUERY).execute()
  except Exception as error:
    print('An error occurred: %s' % error)
    raise error

def lambda_handler(event, context):
  main()
  return {
    "status": 200
  }

# For local test
if __name__ == '__main__':
  main()
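Note that `get_message` above assumes every entry in `parts` carries its own `body.data`. In the Gmail API message format, a multipart container part can instead hold nested `parts` and no data of its own, so a more defensive variant might walk the payload recursively. A minimal sketch, assuming a helper named `collect_body_text` (hypothetical, not in the original source):

```python
import base64

def collect_body_text(payload):
    """Recursively decode every base64url body found in a Gmail payload."""
    chunks = []
    data = payload.get("body", {}).get("data")
    if data:
        # Re-add '=' padding if it is missing; a no-op when already padded.
        padded = data + "=" * (-len(data) % 4)
        chunks.append(base64.urlsafe_b64decode(padded).decode("utf-8"))
    # Container parts have no data of their own, only nested parts.
    for part in payload.get("parts", []):
        chunks.append(collect_body_text(part))
    return "\n".join(chunk for chunk in chunks if chunk)
```

It would be called as `collect_body_text(message['payload'])`, and the result could then be passed to `generate_csv` in the same way as before.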

Then, let's containerize the Python code and push the image to Amazon ECR.

FROM amazon/aws-lambda-python:3
WORKDIR /var/task/
COPY . ./
RUN pip install -r requirements.txt
CMD [ "app.lambda_handler" ]

Next, write a simple script to push the Docker image to ECR (replace the account ID with your own AWS account ID).

#!/bin/sh
set -e

docker build . -t ark-go-email
docker tag ark-go-email:latest 120400168286.dkr.ecr.us-west-2.amazonaws.com/ark-go-email:latest
aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin 120400168286.dkr.ecr.us-west-2.amazonaws.com
docker push 120400168286.dkr.ecr.us-west-2.amazonaws.com/ark-go-email:latest

Next, create an AWS Lambda function from the Docker container image; check this post to see how.
Note: we need to create 4 Lambda environment variables.

EMAIL_QUERY=from:ARK Trading Desk <ark@ark-funds.com> subject:ARK Investment Management Trading Information is:unread newer_than:1d
GMAIL_CLIENT_ID=<client_id>
GMAIL_CLIENT_SECRET=<client_secret>
GMAIL_REFRESH_TOKEN=<refresh_token>
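The Lambda code reads these variables with `os.environ[...]`, so a missing one surfaces as a bare `KeyError`. As an optional sketch (the `load_config` helper is an assumption, not part of the original source), the four variables could be checked together so the error names everything that is missing:

```python
import os

REQUIRED_ENV_VARS = (
    "EMAIL_QUERY",
    "GMAIL_CLIENT_ID",
    "GMAIL_CLIENT_SECRET",
    "GMAIL_REFRESH_TOKEN",
)

def load_config(env=None):
    """Return the required settings, or raise naming every missing one."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_ENV_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing Lambda environment variables: " + ", ".join(missing)
        )
    return {name: env[name] for name in REQUIRED_ENV_VARS}
```

Passing a plain dict as `env` makes the check easy to try locally without touching the real environment.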

Finally, create a CloudWatch Events rule to trigger the Lambda on a cron schedule.

3. Create Second Lambda to fetch ARK holding information
Source code can be found here
All the steps are the same as in section 2, except that the cron expression is cron(0 4 ? * TUE-SAT *).
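CloudWatch Events cron expressions are evaluated in UTC, so this rule fires at 04:00 UTC, Tuesday through Saturday. A quick sketch of what that means in US Eastern time, assuming a fixed UTC-5 offset (Eastern Standard Time; during daylight saving time the local time is one hour later). The function name is illustrative only:

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC-5 offset; this deliberately ignores daylight saving time.
EST = timezone(timedelta(hours=-5), "EST")

def cron_fire_time_in_est(year, month, day, hour=4, minute=0):
    """Convert a UTC fire time of the cron rule to fixed-offset EST."""
    fire_utc = datetime(year, month, day, hour, minute, tzinfo=timezone.utc)
    return fire_utc.astimezone(EST)

# 2021-01-05 was a Tuesday, so the rule fires at 04:00 UTC that day.
local = cron_fire_time_in_est(2021, 1, 5)
print(local.strftime("%A %Y-%m-%d %H:%M %Z"))
```

In other words, the Tuesday to Saturday schedule in UTC corresponds to late evening Monday to Friday in Eastern time, after each trading day has ended.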

4. Create Third Lambda to process data generated by section 2 and 3
Source code can be found here

5. Create Fourth Lambda to send an email notification
Source code can be found here

6. Set up S3 event notifications
Choose the S3 bucket -> Properties -> Event notifications.
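The downstream Lambdas' source is not reproduced in this post. As a rough sketch of how a handler triggered by such a notification might find out which object was uploaded, assuming the standard S3 event payload shape (the helper name `extract_s3_objects` is hypothetical):

```python
from urllib.parse import unquote_plus

def extract_s3_objects(event):
    """Return (bucket, key) pairs from an S3 event notification payload."""
    objects = []
    for record in event.get("Records", []):
        s3_info = record.get("s3")
        if not s3_info:
            continue  # skip records that are not S3 notifications
        bucket = s3_info["bucket"]["name"]
        # Object keys arrive URL-encoded in the event, so decode them.
        key = unquote_plus(s3_info["object"]["key"])
        objects.append((bucket, key))
    return objects
```

A handler could then loop over the returned pairs and download each object with boto3 before processing it.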

Note: In section 2, after the first Lambda processes the email, it uploads the generated CSV to S3, which should trigger the third Lambda from section 4. If that Lambda finds any new holdings, it uploads the data to S3, which triggers the last Lambda to send a notification. If there are no new holdings, it does nothing and the last Lambda is not triggered.
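The comparison in the third Lambda is not shown here, so the following is only a sketch of the idea under stated assumptions: trades are dicts with hypothetical `ticker` and `direction` fields, and the current holdings are a collection of ticker strings. A ticker counts as a new holding when it was bought and is not already held.

```python
def find_new_holdings(trades, current_holdings):
    """Return tickers that were bought but are not in the existing holdings."""
    held = {ticker.upper() for ticker in current_holdings}
    new_tickers = []
    for trade in trades:
        ticker = trade["ticker"].upper()
        is_buy = trade["direction"].lower() == "buy"
        # Keep each new ticker once, in the order it first appears.
        if is_buy and ticker not in held and ticker not in new_tickers:
            new_tickers.append(ticker)
    return new_tickers
```

With this shape, the Lambda would upload a result to S3 only when the returned list is non-empty, which matches the behavior described in the note: no new holdings, no upload, no notification.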

This is a very generic document. Add a comment if you have any questions; I'm happy to help. Building something that is almost free is really cool.