Michele Pangrazzi


Data pipelines with Spotify's Luigi

Hi!

At Wonderflow we do a lot of ML / NLP in Python, and recently we have been enjoying writing data pipelines with Spotify's Luigi.

It's easy to find lots of posts about Luigi online, especially comparisons with other tools like Apache Airflow.

In this post I'll try to explain how we're using Luigi at Wonderflow, what issues we found and how we handled them.

Stateless data pipelines are bad

When it comes to doing computationally expensive analysis on data, state management is very important.

Suppose you have a generic computation wrapped in a function like the following:

def compute():

    # step 1
    data = fetch_data()

    # step 2
    data_1 = heavy_computation_1(data, a=1, b=2, c=3)

    # step 3
    data_2 = heavy_computation_2(data_1, x=1, y=2, z=3)

    return data_2

There are 3 main steps here and 2 of them are quite heavy to compute.

Imagine now that you are tuning the x, y and z parameters of step 3, and you find yourself running this function over and over with different values, checking every time whether the output suits you.

This might seem the fastest way to do it, but you're actually wasting a lot of time: to check the result of heavy_computation_2 you keep re-running fetch_data and heavy_computation_1 on (probably) the same data.

Although you can apply workarounds, even if you end up with a complete pipeline scheduled by an orchestrator, you might still end up doing a lot of useless work (i.e. repeating the same computation on the same data many times).
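
One such workaround is to cache each step's result on disk by hand, keyed by the step's parameters. Here is a minimal sketch of the idea; cached_step, the temporary cache directory and the stand-in computations are hypothetical names made up for this example, not code from our pipelines:

```python
import json
import tempfile
from pathlib import Path

CACHE_DIR = Path(tempfile.mkdtemp())  # hypothetical cache location
calls = []  # records which steps actually executed


def cached_step(name, func, **params):
    """Run func(**params) once per parameter set, then reuse the JSON file."""
    key = "-".join(f"{k}={v}" for k, v in sorted(params.items()))
    path = CACHE_DIR / f"{name}-{key}.json"
    if path.exists():
        return json.loads(path.read_text())
    result = func(**params)
    path.write_text(json.dumps(result))
    return result


def slow_step_one(a, b, c):
    # Stand-in for fetch_data + heavy_computation_1
    calls.append("slow_step_one")
    return [a, b, c]


def compute(x, y, z):
    data_1 = cached_step("step_one", slow_step_one, a=1, b=2, c=3)
    # Stand-in for heavy_computation_2: the only step that reruns while tuning
    calls.append("step_two")
    return [value * x + y - z for value in data_1]


first = compute(x=1, y=2, z=3)
second = compute(x=4, y=5, z=6)
```

It works, but every step needs the same boilerplate, and the dependencies between steps are still wired by hand.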

We ran into this kind of issue and needed to find a clean solution.

Luigi to the rescue

As you may have already found out, Luigi has been designed to solve issues like the one above.

Let's rewrite the task as a Luigi pipeline:

import luigi
from luigi.local_target import LocalTarget
from fetching import fetch_data
from processing import heavy_computation_1, heavy_computation_2
from json import dumps, loads

class FetchData(luigi.Task):
    def run(self):
        data = fetch_data()

        with self.output().open("w") as outfile:
            outfile.write(dumps(data))

    def output(self):
        return LocalTarget("output/data.json")

class HeavyComputationOne(luigi.Task):
    a = luigi.IntParameter(default=42)
    b = luigi.IntParameter(default=42)
    c = luigi.IntParameter(default=42)

    def requires(self):
        return FetchData()

    def run(self):
        with self.input().open('r') as infile:
            input_data = loads(infile.read())

        data = heavy_computation_1(input_data, a=self.a, b=self.b, c=self.c)

        with self.output().open("w") as outfile:
            outfile.write(dumps(data))

    def output(self):
        return LocalTarget(f"output/processed_data_1-{self.a}-{self.b}-{self.c}.json")


class HeavyComputationTwo(luigi.Task):
    x = luigi.IntParameter(default=42)
    y = luigi.IntParameter(default=42)
    z = luigi.IntParameter(default=42)

    def requires(self):
        return HeavyComputationOne(a=1, b=2, c=3)

    def run(self):
        with self.input().open('r') as infile:
            input_data = loads(infile.read())

        data = heavy_computation_2(input_data, x=self.x, y=self.y, z=self.z)

        with self.output().open("w") as outfile:
            outfile.write(dumps(data))

    def output(self):
        return LocalTarget(f"output/processed_data_2-{self.x}-{self.y}-{self.z}.json")

The structure of a Luigi Task is quite simple: you write a Python (sub)class with 3 methods:

  • requires() which defines task requirements (if any)
  • run() which contains the task business logic
  • output() which defines the kind of output (in this case files on local filesystem, but they could be e.g. files on S3 or many more things)

As you can see, each task has a defined output file whose name reflects its input parameters. This matters because Luigi tasks are meant to be idempotent.

Luigi considers a task complete once its output exists, so it will not run again a task whose output is already there.

Now you can run the HeavyComputationTwo task from the CLI (yes, Luigi automatically generates the CLI for you) N times, using different values for the x, y and z parameters. Luigi will run the requirements (HeavyComputationOne and FetchData) only once.

So no more wasting time running unneeded tasks :-) And you can focus on tuning your task's logic.

Moreover, this makes the pipeline stateful, fixing the issue we had with the compute() method.

After trying Luigi for a bit, we initially thought we had found the silver bullet for our needs, but like everything it has pros and cons.

Let's go through some of them.

1. Luigi pipelines are hard to test

You may have noticed that Luigi is heavily coupled with your code (e.g. every task is a subclass of a Luigi class). Mainly for this reason, lots of people say that Luigi pipelines are hard to test.

Since we do TDD on almost every kind of software at Wonderflow, we wanted to make sure that TDD is still possible when using Luigi.

I think it's true that Luigi is heavily coupled with your code, but I don't think this makes pipelines or tasks hard to test.

Here's the testing pattern we're using.

When you write a test, you usually follow the Given-When-Then structure:

def test_something():

    # Given an initial situation

    # When something happens

    # Then there's an outcome
    pass

Imagine we want to test the FetchData task (ideally we write the test first!):

import luigi
from tasks import FetchData
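from json import loads

def setup_db():
    # here we set up a local test DB with test data
    pass

def test_fetch_data_task():
    # Given a database with test data and the task under test
    setup_db()
    task = FetchData()

    # When the task runs (luigi.build returns True on success)
    assert luigi.build([task], local_scheduler=True)

    # Then the output exists and can be read back
    assert task.output().exists()

    with task.output().open("r") as infile:
        data = loads(infile.read())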

    assert len(data) > 0
    # other assertions (if needed)

To recap:

  1. We set up a local database and instantiate the FetchData task
  2. We run the task and ensure the output exists
  3. We read the task's output and make needed assertions on it

This is quite effective and nice to read, perfectly following the Given-When-Then pattern.

You may have noticed that this is an integration test, which is indirectly covering the fetch_data() method.

As a best practice, you should unit test all the functions you call inside run(), since they can also be used outside a Luigi task.

So, since FetchData calls fetch_data(), you need to unit test it as well :-)

If all the functions called inside the task are unit tested, and you test the whole task (or pipeline, if the task has requirements) as described above, you get a nice unit + integration test combo, which is usually enough to avoid regressions.
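
The unit-test half of that combo might look like the sketch below. It assumes, purely for illustration, that fetch_data() reads rows from a SQL database through a connection passed as an argument; the reviews table, its columns and the in-memory SQLite database are hypothetical choices:

```python
import sqlite3


def fetch_data(connection):
    """Hypothetical stand-in: return every review as a dict."""
    rows = connection.execute("SELECT id, text FROM reviews ORDER BY id").fetchall()
    return [{"id": row[0], "text": row[1]} for row in rows]


def setup_db():
    """Create an in-memory test database holding two reviews."""
    connection = sqlite3.connect(":memory:")
    connection.execute("CREATE TABLE reviews (id INTEGER PRIMARY KEY, text TEXT)")
    connection.executemany(
        "INSERT INTO reviews (text) VALUES (?)", [("great",), ("bad",)]
    )
    return connection


def test_fetch_data():
    # Given a database with two reviews
    connection = setup_db()

    # When we fetch the data
    data = fetch_data(connection)

    # Then both reviews come back, in insertion order
    assert data == [{"id": 1, "text": "great"}, {"id": 2, "text": "bad"}]
```

Passing the connection in keeps the function independent of Luigi, so the same test works whether fetch_data() is called from a task or from anywhere else.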

2. Chaining tasks leads to parameters repetition

Considering the pipeline above:

FetchData -> HeavyComputationOne -> HeavyComputationTwo

You may have noticed that I've hardcoded the a, b and c params in the requirements of the HeavyComputationTwo task.

So, if you have to run HeavyComputationTwo and you need to specify a, b or c values as well as x, y and z, do you have to declare the HeavyComputationOne parameters again?

class HeavyComputationTwo(luigi.Task):
    a = luigi.IntParameter(default=42)
    b = luigi.IntParameter(default=42)
    c = luigi.IntParameter(default=42)

    x = luigi.IntParameter(default=42)
    y = luigi.IntParameter(default=42)
    z = luigi.IntParameter(default=42)

    # ...

The answer is: of course not. If you do that, the code becomes a lot less readable, and if the params change you have to change them in every chained task.

Fortunately, Luigi comes with a very handy inherits decorator in its util module.

So you simply decorate the HeavyComputationTwo class, specifying that it inherits from HeavyComputationOne, like this:

import luigi
from luigi.util import inherits
from tasks import HeavyComputationOne

@inherits(HeavyComputationOne)
class HeavyComputationTwo(luigi.Task):
    x = luigi.IntParameter(default=42)
    y = luigi.IntParameter(default=42)
    z = luigi.IntParameter(default=42)

    # ...

So, a, b and c are now inherited from HeavyComputationOne, and there's no need to re-declare them anymore.

3. Luigi doesn't scale

As described in the docs, there are some limitations by design.

Luigi's execution model assumes that each task is a sizable chunk of work: according to the docs you can probably schedule a few thousand jobs, but it is not meant to scale beyond tens of thousands.

It's also true that Luigi is designed for batch processing. So if you need to run lots of pipelines in near real time, it might not be the best tool to use. Better to read the docs carefully first!

We are running only batch processing pipelines, mainly related to ML / NLP tasks and it suits us quite well.

If you really need to launch many tasks at once, there's also a Kubernetes Job wrapper plugin available.

4. There's no in-task parallelism

If having N different workers does not suit you and a specific task is very complex in terms of computation, you can delegate the required parallelism to other tools (if multiprocessing is not enough).

We have tasks which require lots of different spaCy language models to be loaded at once, and we load them in many processes at the same time.

To do that, we use Dask, simply creating on-demand local (or remote) clusters inside the task's run() method:

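import luigi
import dask.bag as db
from dask.distributed import Client

class HeavyTask(luigi.Task):
    def run(self):
        # ...

        # The cluster and its client are shut down when the with block exits
        with create_local_cluster(memory_limit="4GB", n_workers=6) as cluster, Client(cluster):
            bag = db.from_sequence(data)

            # heavy_method is called once per partition, on the Dask workers
            output += (
                bag.map_partitions(heavy_method, arg1=arg1)
                .compute()
            )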

Where create_local_cluster() simply wraps Dask's LocalCluster constructor:

from dask.distributed import LocalCluster

def create_local_cluster(n_workers: int = 4, **kwargs) -> LocalCluster:
    return LocalCluster(
        n_workers=n_workers,
        worker_dashboard_address=None,
        **kwargs,
    )

With Dask you can create local, SSH, or Kubernetes-based clusters according to your needs. It may require a bit of tuning but it's more than enough to handle parallelism in our common use cases.

5. There are no triggers for pipelines

This is true, but I would not necessarily see this as a drawback. There are many options here like:

  • API
  • Crontab
  • SSH (e.g. a Jenkins pipeline which fires a Luigi pipeline via SSH)
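
For the crontab or Jenkins route, a thin wrapper script is usually enough. The sketch below builds the command line for the luigi executable and runs it; the tasks module name and the parameter values are placeholders, and luigi_command and run_pipeline are hypothetical helpers:

```python
import subprocess


def luigi_command(module, task, **params):
    """Build the argument list for the `luigi` command-line tool."""
    command = ["luigi", "--module", module, task]
    for name, value in params.items():
        # Luigi exposes each task parameter as a --flag, with dashes for underscores
        command += [f"--{name.replace('_', '-')}", str(value)]
    return command


def run_pipeline(module, task, **params):
    """Run the pipeline in a subprocess and return its exit code."""
    return subprocess.run(luigi_command(module, task, **params)).returncode


command = luigi_command("tasks", "HeavyComputationTwo", x=1, y=2, z=3)
```

By default the luigi command talks to a central scheduler; add --local-scheduler to the argument list when none is running.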

Conclusions

If you have a small team which is already proficient in Python, Luigi's relatively short learning curve and benefits could be ideal for your data pipelines, especially if you mainly run batch processing jobs.

Moreover, configuring and deploying Luigi's scheduler on a server / pod for production use is easy, while that might not be the case for similar tools like Apache Airflow.
