CincyBC

Custom Airflow Hooks

In the last post, we took our monolithic scraper class and converted it into a custom Airflow Operator. In Airflow, Operators execute tasks; if you want to create an interface to an outside service, you do that via Airflow Hooks. Just as the community has developed many off-the-shelf Provider Operators, there are plenty of off-the-shelf Hooks for interfacing with services.

For example, there is a PostgresOperator to run Postgres queries. If you look under the hood of that Operator, it uses a PostgresHook to run the queries. If you are writing your own Python functions or Operators in Airflow, you can import just the Hook to use that interface.
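
For instance, here is a minimal sketch of using the Hook directly inside a TaskFlow task (the my_postgres connection ID and my_table are placeholders, not part of this project):

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def fetch_rows():
    # The same interface the PostgresOperator uses internally
    hook = PostgresHook(postgres_conn_id="my_postgres")  # placeholder connection ID
    return hook.get_records("SELECT * FROM my_table LIMIT 10;")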

Why is this important for us?

Back at the beginning of this series, I broke down web scraping on a continuum from APIs to manually scraping websites. Interacting with an API would be an interface, and [there are examples out there of how you could build a custom Hook to interact with an external API](https://docs.astronomer.io/learn/airflow-importing-custom-hooks-operators). Interacting with the "hidden APIs" I described would also be an interface, albeit a less traditional one.

Hooks generally take a conn_id argument pointing to an Airflow Connection, where you can store variables, including sensitive ones like passwords, and use them in your Hooks. Here is the example from Astronomer's website:

# import the hook to inherit from
from airflow.hooks.base import BaseHook


# define the class inheriting from an existing hook class
class MyHook(BaseHook):
    """
    Interact with <external tool>.
    :param my_conn_id: ID of the connection to <external tool>
    """

    # provide the name of the parameter which receives the connection id
    conn_name_attr = "my_conn_id"
    # provide a default connection id
    default_conn_name = "my_conn_default"
    # provide the connection type
    conn_type = "general"
    # provide the name of the hook
    hook_name = "MyHook"

    # define the .__init__() method that runs when the DAG is parsed
    def __init__(
        self, my_conn_id: str = default_conn_name, *args, **kwargs
    ) -> None:
        # initialize the parent hook
        super().__init__(*args, **kwargs)
        # assign class variables
        self.my_conn_id = my_conn_id
        # (optional) call the '.get_conn()' method upon initialization
        self.get_conn()

    def get_conn(self):
        """Function that initiates a new connection to your external tool."""
        # retrieve the passed connection id
        conn_id = getattr(self, self.conn_name_attr)
        # get the connection object from the Airflow connection
        conn = self.get_connection(conn_id)

        return conn

    # add additional methods to define interactions with your external tool

You'll notice this code will run the get_conn() method upon instantiation if you leave the "optional" code in there.

If you were interacting with a website that required authentication, you could create a requests.Session(), log it in, and return that session where the code above has return conn. Everything you need to build a custom hook is there for you to adapt.
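
As a minimal sketch of that idea (the AuthedScraperHook name, the site_conn_id parameter, and the /login endpoint are all hypothetical; a real site's auth flow will differ):

from airflow.hooks.base import BaseHook
import requests


class AuthedScraperHook(BaseHook):
    """Hypothetical hook that returns an authenticated requests.Session."""

    def __init__(self, site_conn_id: str = "site_default", *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.site_conn_id = site_conn_id

    def get_conn(self) -> requests.Session:
        # Pull host, login, and password from the Airflow Connection
        conn = self.get_connection(self.site_conn_id)
        session = requests.Session()
        # Hypothetical login endpoint; adapt to the site's actual auth flow
        session.post(
            f"{conn.host}/login",
            data={"username": conn.login, "password": conn.password},
        )
        return session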

For our purposes, though, we don't need anything fancier than a hook that makes a web call for us and returns the contents of the webpage as a BeautifulSoup object. So here is our code:

# import the hook to inherit from
from airflow.hooks.base import BaseHook
from bs4 import BeautifulSoup
import requests


# define the class inheriting from an existing hook class
class ScraperHook(BaseHook):
    """
    Interact with external websites.
    :param url: URL to call
    """

    # provide the name of the hook
    hook_name = "ScraperHook"

    # define the .__init__() method that runs when the DAG is parsed
    def __init__(self, url: str, parser: str = "html.parser", *args, **kwargs) -> None:
        # initialize the parent hook
        super().__init__(*args, **kwargs)
        # assign class variables
        self.url = url
        self.parser = parser

    def get_website(self):
        """Return the parsed contents of the website, or the status code on failure."""

        # A timeout keeps the task from hanging indefinitely on an unresponsive site
        r = requests.get(self.url, timeout=30)
        if r.status_code == 200:
            soup: BeautifulSoup = BeautifulSoup(r.content, self.parser)
            return soup
        else:
            # Return the status code so the caller can decide how to handle it
            return r.status_code

If you recall the web_call function from our functional DAG, this is the same code, but now it lives in an Airflow Hook. This aligns with the Airflow best practice of putting your interfaces in Hooks!

When we instantiate this (be it in a TaskFlow task or in a custom Operator), we can call it like this:

from hooks.scraper_hook import ScraperHook

soup = ScraperHook(url=<the url>).get_website()
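
And if we wanted to use it from a custom Operator instead, a sketch might look like this (ScraperOperator and the title extraction are just illustrative, not part of the project code):

from airflow.models.baseoperator import BaseOperator

from hooks.scraper_hook import ScraperHook


class ScraperOperator(BaseOperator):
    """Hypothetical operator that delegates the web call to ScraperHook."""

    def __init__(self, url: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.url = url

    def execute(self, context):
        # The Hook owns the interface; the Operator just orchestrates
        soup = ScraperHook(url=self.url).get_website()
        # get_website() returns a status code on failure, so check before parsing
        if isinstance(soup, int):
            raise ValueError(f"Request failed with status code {soup}")
        return soup.title.string  # e.g., grab the page title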

As always, the code is online here.
