Building a distributed workflow engine from scratch

I've been somewhat obsessed with creating workflow engines for the better part of a decade. The idea of constructing a 'mega' machine from an army of smaller machines never seems to get old for me.

At their core, workflow engines are responsible for executing a series of tasks (typically referred to as a 'job', 'pipeline' or 'workflow') over a cluster of machines (typically referred to as 'workers' or 'nodes') as quickly and as efficiently as possible.

Building a workflow engine comes with a bunch of interesting challenges. Here's a short and highly non-exhaustive list:

  1. What do you use to author workflows? Do you use a general-purpose programming language? A configuration-type language like JSON or YAML? Or do you roll your own DSL (Domain-Specific Language)?

  2. How do we decide which tasks go to which workers such that busy workers are not overloaded while others are idle?

  3. How do we deal with the requirement to scale up or down the capacity in response to fluctuations in computational demand?

  4. How do we deal with intermittent task failures?

  5. How do we deal with worker crashes?

  6. How do we deal with a situation where we have more tasks to execute than available capacity?

Let's put some rubber on the road

The first time I needed to actually build a workflow engine was while working for a video streaming startup. At the time, the company was outsourcing all its video processing needs to another company.

The existing process was slow, expensive and brittle. The company was regularly getting new content (movies, trailers, bonus video material, closed captions etc.) and we needed a way to quickly process this content in order to get it up on the service for customers to enjoy.

Moreover, the existing process was quite rigid and any changes to it (e.g. to introduce a new audio technology) took months or were simply not possible. I suggested building a proof of concept that would allow us to bring the work in-house and luckily my managers were open to the idea.

At this point, you're probably asking yourself: why build one when there are a million open-source and commercial options out there?

True, there are many options out there. And we looked at a good number before we made the decision to build one ourselves. But at least at the time (circa 2014), many of the existing options were either not designed for a distributed environment, were designed primarily for data-processing use cases, were seemingly abandoned, or simply felt over-engineered for our taste.

The initial iteration of the workflow engine allowed us to start processing 'low-risk' content such as trailers, and as we gained confidence in the new system we slowly phased out the old process completely.

Later on, when a co-worker left for another media company that needed a similar system, he asked me if I'd like to come over and do it all over again from scratch. Naturally, I agreed. The 2.0 was similar in spirit, but it incorporated a lot of the lessons learned from the old design.

Meet Tork

After building two proprietary workflow engines for highly specialized use cases I had an itch to see if other companies - possibly with vastly different use cases - could also benefit from a similar system. So I decided to build an open-source version of it.

Architecture

Tork is a Golang-based implementation that is very similar in spirit to its closed-source predecessors. It can run in 'standalone' mode on a laptop or be deployed to a large cluster of machines, depending on your needs.

The main components of Tork are:

  • Coordinator: responsible for managing the lifecycle of jobs and tasks, routing tasks to the right workers, and dealing with task execution errors.

  • Worker: responsible for executing tasks according to the instructions of the Coordinator. Workers are stateless so it's easy to add and remove them as demand for capacity changes.

  • Broker: the means of communication between the Coordinator and worker nodes.

  • Datastore: holds the state for tasks and jobs.

  • Runtime: tasks are executed by means of a runtime, which translates a task into an actual executable. Currently only Docker is supported, due to its ubiquity and large library of images, but there are plans to add support for Podman and WASM in the future.

Hello world

Tork jobs are authored in YAML:



# hello.yaml
---
name: hello job
tasks:
  - name: say hello
    image: ubuntu:mantic # docker image
    run: | # arbitrary script
      echo -n hello world 
  - name: say goodbye
    image: ubuntu:mantic
    run: |
      echo -n bye world



and submitted through the API:



JOB_ID=$(curl \
  -s \
  -X POST \
  --data-binary @hello.yaml \
  -H "Content-type: text/yaml" \
  http://localhost:8000/jobs | jq -r .id)



To query for a job's status:



curl -s http://localhost:8000/jobs/$JOB_ID | jq .

{
  "id": "ed0dba93d262492b8cf26e6c1c4f1c98",
  "state": "COMPLETED",
  ...
}



Tasks can also specify an auto-retry count and a timeout, enforce CPU and RAM limits, and do other fun things such as conditionals, loops and parallel execution.
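
As a rough sketch, a task with retries, a timeout and resource limits might look something like this (the field names are meant to mirror the Tork docs, so double-check them there before relying on this snippet):


# illustrative only -- verify field names against the Tork documentation
name: resilient job
tasks:
  - name: a task that may fail intermittently
    image: ubuntu:mantic
    run: |
      echo "doing some flaky work"
    retry:
      limit: 3       # re-attempt the task up to 3 times on failure
    timeout: 30s     # fail the task if it runs for longer than 30 seconds
    limits:
      cpus: .5       # cap the container at half a CPU
      memory: 50m    # and at 50MB of RAM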

If you're interested in checking it out, you can find the project on GitHub:

Top comments (11)

Richard McDaniel

Welcome to the party! I have made Laravel Workflow, which is built entirely on top of queued jobs. A workflow will queue and run multiple times as it runs activities and makes progress. There is no dedicated coordinator, broker, etc.

github.com/laravel-workflow/larave...

I'm so excited to see how workflows are gaining popularity. Every new competitor is more validation.

Ben Halpern

This is interesting

Dsysd Dev

I was building something like this myself, great to know about this repo !!

Nos

Interesting... Will check it out. Interested to see how it compares to github.com/serverlessworkflow/synapse

Why not show a sample workflow definition and DSL?

Arik

That's a good callout. I've updated the article to include an example. Thanks!

steve

This looks cool. As a long-time user of Nextflow and CWL, and a big fan of Go, I have a few thoughts on it. I am currently a very heavy proponent of Nextflow; even if you are building your own system, you might be interested in spending time trying it out and getting a feel for how it works, in order to inform your own design choices. Some really relevant points from that system:

  • it uses a DSL built on Groovy (which is built on Java); this lets you define your workflow as code. This can be a blessing or a curse, depending on how much you like programming in the Java-ecosystem
  • CWL, on the other hand, uses a YAML-based workflow definition, like what you describe here. In theory it would be a simpler implementation (you may even consider modifying Tork to run CWL workflows), but the big pitfall of YAML is that it's incredibly painful to write (it's very easy to use too many or too few indentations and completely break your workflow description), and trying to get embedded dynamic programmatic code execution is a nightmare
  • as you mention, you need a "runtime"; sounds like Tork is its own runtime then? This is great, until your runtime lacks a feature you need. This was a common problem with CWL, which requires an external runtime (cwl-tool, Toil, Cromwell, etc.), all of which have varying levels of features and support. Nextflow includes its own runtime, but then the onus is on you (the developer) to build out all the features for all users' use cases.
  • job scheduling; as you describe, this is a pretty big task. Most workflow frameworks similarly have a "local" mode for scheduling, but if you really want to use big infra, it is common to instead submit to external schedulers such as AWS Batch or on-prem or cloud-based SLURM or LSF grid schedulers. If you are going to go this route, it might be important to have this in mind from the beginning, because you might find that your current job definition and execution paradigm doesn't translate as well. Nextflow in particular has a crazy simple method for handling this in a modular, portable way; it dynamically generates bash shell scripts per job, saves them to disk (or S3, etc.), and then runs them in the task execution environment (e.g. a Docker container running on an EC2), with all the file input & output staging embedded in the shell scripts, all wrapped around your "echo foo" pipeline task. CWL tends to perform similarly, but AFAICT it instead saves all these things into a Python job store that gets executed inside the task execution environment. These aspects are pretty important to understand, as the user, because when things break, it's a lot more complicated to understand and debug.
  • staging methods for input/output files between tasks become really important; a lot of these frameworks tend to use a "work" directory of some sort or a file-store in order to fully isolate tasks. A Docker container is not enough if your task fails half-way and leaves a corrupted output, which you then "resume" and accidentally pick up and pass through to the next task in the workflow. From the docs for Tork it was not clear how robust the tasks' working directories are and how files are passed from one task to the next safely.

Tork looks pretty cool and I will be sure to keep an eye on it. Go has a lot of features that were sorely missed in the workflow-engine scene of circa 2014, so it's exciting to see what kind of innovations might be possible with it. Ultimately a lot of the most important features of these engines come from their robustness on diverse infrastructure, and ease of use for complex workflows (hundreds of tasks running with full concurrency + parallel execution + async handling).

Arik

Thanks Steve! I really appreciate the feedback and recommendation.

Some thoughts:

  • My main reason for choosing YAML for Tork was that YAML comes with almost no baggage (whitespace controversy aside 🙂). Nothing in YAML means anything unless you give it a meaning. This is unlike -- say Python (or any other high-level language, not trying to pick on Python) -- where there's a significant learning curve. Moreover, many of the constructs of most high-level languages don't make sense in a distributed environment because the language was simply not designed for that. Even commonplace things such as loops and if statements (not to mention 3rd party libraries) don't directly translate to a distributed environment. So you end up having to use a subset of the language, and it just ends up being a sugarcoat for your APIs. YAML comes "naked", is familiar to many, is a no-brainer to learn even for non-programmers, and has tons of support in terms of parsing, schema validation etc. To mitigate whitespace and typo-related issues, I'm working on adding autocomplete support to the pipeline editor in the Tork Web UI.

  • The runtime in Tork is essentially Docker (with plans to add support for WASM and Podman in the future if there's demand for it). I was actually trying to get out of the world where each type of task has to be defined in Go code, which would require recompiling the engine, creating custom forks and having knowledge of Go programming -- which all felt too high-friction and would benefit fewer people. Using Docker as the runtime lets users write just about anything they want using one of Docker's many images and removes the need to recompile the engine. My intention here is that Tork only provides the necessary infrastructure to execute tasks in a distributed manner and doesn't worry about WHAT you are running in these tasks.

Example:

  name: some job
  tasks:
  - name: do some calculation in python
    image: python:3
    var: pythonResult
    run: |
      python -c 'print(1+1)' > $TORK_OUTPUT

  - name: do some calculation in ruby
    image: ruby:latest
    var: rubyResult
    run: |
      ruby -e 'x = Time.now; puts x;' > $TORK_OUTPUT

  - name: get the video format
    var: videoFormat
    image: jrottenberg/ffmpeg:3.4-alpine
    run: |
      ffprobe \
        -v quiet \
        -show_format \
        -print_format json \
        http://some/video/file.mp4 > $TORK_OUTPUT

This is a highly contrived example. But the point is that the user can write their scripts in whatever language they're comfortable with rather than being limited to Go.
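
A later task can then pick up those outputs from the job's context, roughly along these lines (see the Tork docs for the exact templating syntax):

  - name: print the python result
    image: ubuntu:mantic
    env:
      # pull in the output captured by the earlier task's var: pythonResult
      PYTHON_RESULT: "{{ tasks.pythonResult }}"
    run: |
      echo $PYTHON_RESULT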

  • I was trying not to be overly opinionated about storage, because there are many options out there and because I wanted to keep the workers stateless for the sake of scalability and resiliency. Any local state (e.g. using volumes) that the tasks generate is automatically deleted at the end of the task. The solution for storing state is similar to how Tork handles the runtime, that is, to get out of the way. What I usually do in my pipelines is use some kind of external object storage (S3, Minio, etc.) and then use a post task to upload the state after the task executes. e.g.:
  name: convert a video
  tasks:
  - name: convert the first 5 seconds of a video
    image: jrottenberg/ffmpeg:3.4-alpine
    volumes:
      - /tmp
    run: |
      ffmpeg -i https://some/input/file.mov -t 5 /tmp/output.mp4
    post:
        - name: upload the chunk to minio
          image: "amazon/aws-cli:2.13.10"
          env:
            AWS_ACCESS_KEY_ID: "..."
            AWS_SECRET_ACCESS_KEY: "..."
            BUCKET_NAME: "some-bucket"
          run:
            aws s3 cp /tmp/output.mp4 s3://$BUCKET_NAME/output.mp4

Hope this helps

barocsi

Is Apache Airflow suitable for such a workflow engine solution?

Arik

I think that there's definitely an overlap in terms of goals between Airflow and Tork -- both are designed as general-purpose workflow engines. But the way they go about it is quite different. For example, workflows in Airflow are written in Python and require redeployment along with Airflow when they change. In Tork, on the other hand, workflows are written in plain YAML and do not require redeployment of the engine. Both approaches have pros and cons. I think that at the end of the day both engines are suitable for many use cases, so it really boils down to personal preference.

AlgoT

Why not use a message queue like RabbitMQ, or in bigger organisations, Kafka?

Arik

The default broker implementation is in-memory, which is suitable for experimentation.

But Tork does support RabbitMQ for a distributed setup:

github.com/runabol/tork#running-in...