
Final: How to Structure the DAG

In the previous posts, we've talked about how easy it is to create custom hooks and operators in Airflow's object-oriented style. Airflow itself is moving more in the direction of functional programming, with TaskFlow decorators turning any Python function into a task. However, it's still valuable to learn and use Operator/Hook/Sensor classes to build extendable tools you can plug into your DAGs as needed. As I said in a previous post, the Airflow community is very active, and there are already Hooks and Operators out there for pretty much any tool you want to work with. So, in all likelihood, you won't need to build everything from scratch.

If we were to develop our FundScraper to its ultimate conclusion using data engineering best practices, this is how I would structure it (a code sketch follows the list):

  1. EmptyOperator Start Task (not actually executed, but good to have if you want to add tasks later)
  2. Custom Extract Operator - Similar to the one we created a couple of articles ago, but it would write the entire contents of the HTML to a file to create a "snapshot" of the website for the day we scraped. We would write the HTML to an S3 bucket using the S3Hook the Amazon provider community has developed.
  3. Custom Transform Operator - The rest of the custom operator we created that extracted the values we wanted, but this time it would pull from the HTML file created by the Extract Operator. The flow would be: S3Hook to read the HTML file, "transformations" to extract the values we want, and finally S3Hook to write the results as JSON to a new bucket or prefix in S3.
  4. PostgresOperator - Use the off-the-shelf PostgresOperator to run a COPY/import query that reads our S3 file and loads it into an RDS Postgres instance.
  5. EmptyOperator End Task (again, not actually run)
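
Here is a minimal sketch of that layout, assuming custom operators along the lines described above. The class names, URL, bucket, keys, connection IDs, and SQL file path are placeholders for this sketch, not code from the earlier posts:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Hypothetical custom operators along the lines described in steps 2 and 3.
from fund_scraper.operators import FundExtractOperator, FundTransformOperator

with DAG(
    dag_id="fund_scraper",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")

    # 2. Snapshot the raw HTML to S3 so the run can be replayed later.
    extract = FundExtractOperator(
        task_id="extract_html",
        url="https://example.com/fund",
        s3_bucket="fund-scraper-raw",
        s3_key="html/{{ ds }}.html",
    )

    # 3. Read the snapshot, pull out the values we want, write JSON back to S3.
    transform = FundTransformOperator(
        task_id="transform_values",
        s3_bucket="fund-scraper-raw",
        source_key="html/{{ ds }}.html",
        target_key="json/{{ ds }}.json",
    )

    # 4. Load the JSON from S3 into the RDS Postgres table.
    load = PostgresOperator(
        task_id="load_to_postgres",
        postgres_conn_id="rds_postgres",
        sql="sql/load_fund_values.sql",  # placeholder .sql file holding the COPY/import statement
    )

    end = EmptyOperator(task_id="end")

    start >> extract >> transform >> load >> end
```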

While the Custom Extract Operator cannot be idempotent since the website will change, we can still make the other tasks idempotent by separating them out. What benefit does this have?

Let's say the website changes and the values we pull aren't in the same spot they used to be. If we had the extract/transform/load all in one task and didn't write anything until the end, then everything would fail and we'd lose the state of the website that day forever (unless a site like the Wayback Machine just happened to be storing it). Writing it to an HTML file in cheap S3 storage solves that: we can change the transformation to find the values we want and rerun all of the failed tasks to catch up.
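
To make that replay concrete, here is one way the transform step could look. The class name, key layout, and BeautifulSoup selector are assumptions for this sketch, not the exact code from earlier in the series:

```python
import json

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from bs4 import BeautifulSoup


class FundTransformOperator(BaseOperator):
    """Read an HTML snapshot from S3, extract values, and write JSON back to S3."""

    template_fields = ("source_key", "target_key")

    def __init__(self, s3_bucket, source_key, target_key,
                 aws_conn_id="aws_default", **kwargs):
        super().__init__(**kwargs)
        self.s3_bucket = s3_bucket
        self.source_key = source_key
        self.target_key = target_key
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        hook = S3Hook(aws_conn_id=self.aws_conn_id)

        # The snapshot was written by the extract task, so rerunning this task
        # always transforms the same HTML for a given logical date.
        html = hook.read_key(key=self.source_key, bucket_name=self.s3_bucket)
        soup = BeautifulSoup(html, "html.parser")

        # Placeholder selector -- this is where the real scraping logic lives,
        # and the only piece that has to change if the website layout changes.
        values = {"unit_value": soup.find("span", {"id": "unit-value"}).text}

        hook.load_string(
            string_data=json.dumps(values),
            key=self.target_key,
            bucket_name=self.s3_bucket,
            replace=True,  # safe to rerun: the same run always overwrites the same key
        )
```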

Why not just include a couple of "checkpoints" inside a single task's code? Why do we need three tasks?

Think of the tasks themselves as the checkpoints. Airflow tracks the state of each task, so after making modifications we can pick up again at whichever checkpoint failed instead of rerunning everything, which a checkpoint buried inside one task's code can't give us.

Can this be applied to more traditional ETL jobs moving data from a transactional database to an analytical database? Absolutely. Just swap the web-scraper extract task for an Operator that talks to your flavor of transactional database.
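
For instance, a hypothetical extract operator that snapshots a query result to S3 instead of a web page might look like this; the class name, connection IDs, and keys are placeholders:

```python
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook


class TableSnapshotOperator(BaseOperator):
    """Dump a query result from the transactional database to S3 as CSV."""

    template_fields = ("sql", "s3_key")

    def __init__(self, sql, s3_bucket, s3_key, postgres_conn_id="oltp_db",
                 aws_conn_id="aws_default", **kwargs):
        super().__init__(**kwargs)
        self.sql = sql
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.postgres_conn_id = postgres_conn_id
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        # Swap PostgresHook for MySqlHook, OracleHook, etc. to match your source system.
        df = PostgresHook(postgres_conn_id=self.postgres_conn_id).get_pandas_df(self.sql)

        # Write the snapshot to S3 so the rest of the pipeline stays unchanged.
        S3Hook(aws_conn_id=self.aws_conn_id).load_string(
            string_data=df.to_csv(index=False),
            key=self.s3_key,
            bucket_name=self.s3_bucket,
            replace=True,
        )
```

The transform and load tasks downstream don't care whether the file in S3 came from a scraper or a database query, which is the whole point of splitting the pipeline at those boundaries.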

There is more we can do with all of this: more ways to abstract things out of the base DAG so we can create as many scrapers as we'd like via YAML (as I did with the abstract config), but I'm going to close the series here. It's been fun showing how to take a small scraping script you'd run with a cron job, one that couldn't be extended and had zero observability, and turn it into a robust ETL process with fail-safes, observability, and extendability. I hope you enjoyed it. Let me know if you'd like me to extend this series!
