Class to Airflow Custom Operator

In the last post, we created a very simple, single-task Airflow DAG to run our Sprott Scraper. However, just like our functional scraper, that DAG was a one-trick pony that couldn't easily be reused for anything else. For that, we would need to break it up, as we did with the abstract class methods, into an extract object, a transform object, and a load object. This time, each object becomes a task, and tasks in Airflow are executed by Operators. Let's first look at how to create a custom operator from our Python class.

If you looked at the code block after I made the functional Sprott scraper DAG, you'd have seen that I replaced the @task decorator with the classic approach of calling the PythonOperator explicitly. You could conceivably write all of your tasks as Python functions, but what if you wanted to create reusable classes like we did in our Python pipeline? You can create your own Airflow Operator really easily!
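
As a quick refresher, the two styles look roughly like this. This is only a sketch; scrape_fund_values and the task_id are placeholder names for illustration, not the actual scraper code:

from airflow.decorators import task
from airflow.operators.python import PythonOperator


def scrape_fund_values():
    ...  # placeholder standing in for the scraping function


# TaskFlow style: the @task decorator wraps the function in a PythonOperator for you
@task(task_id="scrape_fund_values")
def scrape_task_decorated():
    scrape_fund_values()


# Classic style: instantiate PythonOperator explicitly (inside a DAG context)
scrape_task = PythonOperator(
    task_id="scrape_fund_values",
    python_callable=scrape_fund_values,
)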

Let's start with the class scraper that we had from a few weeks ago. I'll only need to modify it in a couple of spots to turn it into a custom operator.

# fund_scraper_operator.py
import requests
from bs4 import BeautifulSoup
import json
# To turn into an operator, you just need to inherit the BaseOperator
from airflow.models.baseoperator import BaseOperator


class FundScraperOperator(BaseOperator):
    def __init__(self, url, **kwargs):
        # Hand task_id and any other task-level arguments up to BaseOperator
        super().__init__(**kwargs)
        self.url = url

    def web_call(self):  # Extract
        r = requests.get(self.url)
        if r.status_code == 200:
            soup = BeautifulSoup(r.content, "html.parser")
            return soup
        else:
            return r.status_code

    def get_fund_values(self, soup, index, class_name, replace_list):  # Transform
        fund_values = soup.find_all('div', class_=class_name)
        value = str(fund_values[index].contents[0])
        for x in replace_list:
            value = value.replace(x, '')
        return value.strip()

    def write_json(self, data, filename='data.json'):  # Load
        with open(filename, 'w') as f:
            json.dump(data, f, indent=4)

    # Override BaseOperator's "execute" method with this.
    def execute(self, context):
        soup = self.web_call()
        data = {}
        data['shareprice'] = self.get_fund_values(
            soup, 4, 'fundHeader_value', ['$US', ','])
        data['u3o8_stock'] = self.get_fund_values(
            soup, 6, 'fundHeader_value', ['$US', ','])
        self.write_json(data)

That's it! You'll notice the changes from the original script are small: we import the BaseOperator class from Airflow, inherit from it, and pass the task-level arguments (task_id, retries, and so on) up through super().__init__(). We then "extend" it by adding our own url parameter (BaseOperator already supplies all of the DAG/task context, so we can't reuse its reserved parameter names) and override its execute method with our own. It's incredibly easy to make an Operator.
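
Stripped of the scraping logic, the general pattern is just a few lines. This is only a sketch with made-up names (MyOperator, my_param), not part of the scraper:

from airflow.models.baseoperator import BaseOperator


class MyOperator(BaseOperator):
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)  # task_id, retries, etc. flow through to BaseOperator
        self.my_param = my_param

    def execute(self, context):
        # "context" carries runtime info such as the logical date of the DAG run
        self.log.info("Running with %s for %s", self.my_param, context["ds"])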

In your Airflow setup, put this custom operator in a directory called plugins in your Airflow home, next to your dags directory. Just like dags, plugins is on your PYTHONPATH, so you can import the operator into a DAG like this:

import pendulum
from datetime import timedelta

from airflow import DAG
from fund_scraper_operator import FundScraperOperator


# Default args applied when creating a new DAG
args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='Functional_Sprott_Scraper',
    schedule_interval='5 20 * * 1-6',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    default_args=args,
    render_template_as_native_obj=True,
    tags=['price', 'scraper']
) as dag:

    scrape_task = FundScraperOperator(
        task_id='scrape_fund_values',  # every operator needs a task_id
        url='https://sprott.com/investment-strategies/physical-commodity-funds/uranium/')

The Airflow community is very vibrant, so you'll find Operators for interacting with a lot of services. Want to write to S3? There are community-supported hooks and operators for that. Want to query a Postgres database? There are hooks/operators already written for that too. That's one of the reasons you don't want to create monolithic Operators like the one in this article. Before we separate/abstract out our tasks like we did with abstract classes, it's important to talk about Airflow hooks.
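
As a small taste of those provider packages, writing our JSON out to S3 could look something like this. This assumes the Amazon provider is installed and an "aws_default" connection is configured; the bucket and key below are made up:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Hypothetical bucket/key; the hook pulls credentials from the Airflow connection
hook = S3Hook(aws_conn_id="aws_default")
hook.load_string(
    string_data='{"shareprice": "10.50"}',
    key="sprott/data.json",
    bucket_name="my-fund-data",
    replace=True,
)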

As always, the code is up on GitHub here.
