Recently, I've been using Luigi at work to build data pipelines. In this post, I'd like to share 3 things I like about Luigi with y'all.
All Luigi tasks can be triggered on the command-line via something like:
$ luigi --module ml.spark SparkModelInference --project nlp
I can easily pass in a variety of parameters into the Luigi task. This meant I no longer need to write verbose
argparse code to create a command-line interface. Tasks can easily be triggered on the command-line for testing and also in production, in case my
cron jobs have failed and I have to trigger them manually.
Achieving idempotency for tasks is as simple as pointing the Luigi target to a specific file name. When that task is triggered, Luigi will check if that file exists. If that file exists, which ideally that file would be created by that Luigi task, then Luigi will determine that the task is "DONE" and will not run that task.
I can touch on idempotency a bit because this concept is quite important. An operation is considered idempotent if it can be applied multiple times without changing the result. For example, if you were loading data for this month's batch, then triggering the task multiple times should not introduce multiple copies of this month's batch into the database. We want this ability to trigger a task multiple times because a task might intermittently fail due to random reasons like the network is down. So re-triggering allows your data pipeline to be tolerant against these intermittent failures.
Atomicity is another important concept to learn when you're building your data pipeline. If an atomic task fails, it should create no files at the output destination. This means the next time the Luigi task is re-triggered, the output destination will still be empty and clean. So the re-triggered task is able to do what it needs to do. This is another mechanism that helps your data pipeline to tolerate intermittent failures and "fixes itself".
This feature is not as well documented within Luigi's docs. I had to dig into some modules and source code to learn about this. In Luigi, you can accomplish atomicity for local files by:
class MyTask(luigi.Task): def output(self): return luigi.LocalTarget(...) def run(self): with self.output().temporary_path() as tmp_path: run_something(output_path=tmp_path)
This way, atomicity is achieved by writing output to a temporary directory. Then, once the write is completed, there is a rename of the temporary directory to the actual output destination. The rename operation is atomic on both the Linux filesystem and HDFS.
While I'm mentioning great things about Luigi in this post, there are also things I dislike about Luigi, which I will elaborate on in another post. Even though Luigi is not as hot as Airflow nowadays, I believe Luigi is still a great framework for building data pipelines. It focuses on enabling you to build a data pipeline that is robust and fault-tolerant, through fundamental principles like idempotency and atomicity.
If you have any questions or if there are any particular things that you like/dislike about Luigi, please feel free to comment below :) Your comments and feedbacks are highly appreciated.