Building a distributed workflow engine from scratch

Arik on August 25, 2023

I've been somewhat obsessed with creating workflow engines for the better part of a decade. The idea of constructing a 'mega' machine from an army ...
 
Richard McDaniel

Welcome to the party! I have made Laravel Workflow, which is built entirely on top of queued jobs. A workflow is queued and run multiple times as it runs activities and makes progress. There is no dedicated coordinator, broker, etc.

github.com/laravel-workflow/larave...

I'm so excited to see how workflows are gaining popularity. Every new competitor is more validation.
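
To make the "queue and run multiple times" idea concrete, here is a minimal Python sketch of a replay-style workflow driven by a plain job queue. It is not Laravel Workflow's code or API: every name in it (`Context`, `PendingActivity`, `drive`, `order_workflow`) is hypothetical, and an in-process `deque` stands in for a real queue backend.

```python
from collections import deque


class PendingActivity(Exception):
    """Raised when the workflow reaches an activity with no recorded result."""

    def __init__(self, func, args):
        super().__init__("activity pending")
        self.func = func
        self.args = args


class Context:
    """Replays recorded activity results and suspends at the first missing one."""

    def __init__(self, history):
        self.history = history
        self.cursor = 0

    def activity(self, func, *args):
        index = self.cursor
        self.cursor += 1
        if index < len(self.history):
            return self.history[index]  # already executed: replay the stored result
        raise PendingActivity(func, args)


def compute_total():
    return 40 + 2


def charge(total):
    return f"charged {total}"


def order_workflow(ctx):
    """Hypothetical workflow: two activities, the second depends on the first."""
    total = ctx.activity(compute_total)
    return ctx.activity(charge, total)


def drive(workflow):
    """Process jobs until the workflow finishes; returns (result, workflow runs)."""
    history, runs, result = [], 0, None
    jobs = deque([("workflow",)])  # stand-in for a real job queue
    while jobs:
        job = jobs.popleft()
        if job[0] == "workflow":
            runs += 1
            try:
                result = workflow(Context(history))
            except PendingActivity as pending:
                jobs.append(("activity", pending.func, pending.args))
        else:
            _, func, args = job
            history.append(func(*args))  # record the result for later replays
            jobs.append(("workflow",))   # re-queue the workflow to make progress
    return result, runs
```

In this sketch the workflow function runs three times for two activities: each run replays the stored results and stops at the first activity that has not executed yet, so the queue itself plays the coordinator role.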

Ben Halpern

This is interesting

Dsysd Dev

I was building something like this myself, great to know about this repo!!

Nos

Interesting... Will check it out. Interested to see how it compares to github.com/serverlessworkflow/synapse

Why not show a sample workflow definition and DSL?

Arik

That's a good callout. I've updated the article to include an example. Thanks!

steve

This looks cool. As a long-time user of Nextflow and CWL, and a big fan of Go, I have a few thoughts on it. I am currently a very heavy proponent of Nextflow; even if you are building your own system, you might want to spend time trying it out and getting a feel for how it works, in order to inform your own design choices. Some really relevant points from that system:

  • it uses a DSL built on Groovy (which is built on Java); this lets you define your workflow as code. This can be a blessing or a curse, depending on how much you like programming in the Java ecosystem
  • CWL, on the other hand, uses a YAML-based workflow definition, like what you describe here. In theory it would be a simpler implementation (you may even consider modifying Tork to run CWL workflows), but the big pitfall of YAML is that it's incredibly painful to write (it's very easy to use too many or too few indentations and completely break your workflow description), and trying to get embedded dynamic programmatic code execution is a nightmare
  • as you mention, you need a "runtime"; sounds like Tork is its own runtime then? This is great, until your runtime lacks a feature you need. This was a common problem with CWL, which requires an external runtime (cwltool, Toil, Cromwell, etc.), all of which have varying levels of features and support. Nextflow includes its own runtime, but then the onus is on you (the developer) to build out all the features needed to run all users' use cases.
  • job scheduling; as you describe, this is a pretty big task. Most workflow frameworks similarly have a "local" mode for scheduling, but if you really want to use big infra, it is common to instead submit to external schedulers such as AWS Batch, or on-prem or cloud-based SLURM or LSF grid schedulers. If you are going to go this route, it might be important to have this in mind from the beginning, because you might find that your current job definition and execution paradigm does not translate well. Nextflow in particular has a crazy simple method for handling this in a modular, portable way: it dynamically generates bash shell scripts per job, saves them to disk (or S3, etc.), and then runs them in the task execution environment (e.g. a Docker container running on an EC2 instance), with all the file input & output staging embedded in the shell scripts, all wrapped around your "echo foo" pipeline task. CWL tends to perform similarly, but AFAICT it instead saves all these things into a Python job store that gets executed inside the task execution environment. These aspects are pretty important to understand as the user, because when things break, it's a lot more complicated to understand and debug.
  • staging methods for input/output files between tasks become really important. A lot of these frameworks use a "work" directory of some sort, or a file store, in order to fully isolate tasks; a Docker container is not enough if your task fails halfway and leaves a corrupted output, which you then accidentally pick up on "resume" and pass through to the next task in the workflow. From the docs for Tork it was not clear how robust the tasks' working directories are and how files are passed safely from one task to the next.
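
The corrupted-output problem in the last point is often handled by giving each task attempt a fresh work directory and publishing outputs only after the task succeeds. Here is a minimal Python sketch of that idea. It is not how Tork, Nextflow, or any CWL runner implements staging, and `run_task_isolated` and its parameters are made-up names for illustration.

```python
import os
import shutil
import tempfile


def run_task_isolated(task, inputs, publish_dir):
    """Run task(work_dir) in a fresh directory; publish outputs only on success.

    task:        callable that reads and writes files inside work_dir
    inputs:      file paths copied (staged) into work_dir before the task runs
    publish_dir: directory that downstream tasks read from
    Returns the list of published output paths.
    """
    os.makedirs(publish_dir, exist_ok=True)
    work_dir = tempfile.mkdtemp(prefix="task-")
    try:
        staged = set()
        for path in inputs:
            shutil.copy(path, work_dir)
            staged.add(os.path.basename(path))

        task(work_dir)  # may raise; nothing has been published at this point

        published = []
        for name in sorted(os.listdir(work_dir)):
            source = os.path.join(work_dir, name)
            # Flat files only, for brevity; staged inputs are not re-published.
            if name in staged or not os.path.isfile(source):
                continue
            final = os.path.join(publish_dir, name)
            partial = final + ".part"
            shutil.copy(source, partial)
            os.replace(partial, final)  # atomic rename within one filesystem
            published.append(final)
        return published
    finally:
        # The work directory is discarded whether the task succeeded or failed,
        # so a resumed run can never pick up half-written files from it.
        shutil.rmtree(work_dir, ignore_errors=True)
```

A failed attempt leaves `publish_dir` untouched, so a "resume" only ever sees outputs from tasks that completed.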

Tork looks pretty cool and I will be sure to keep an eye on it. Go has a lot of features that were sorely missed in the workflow-engine scene of circa 2014, so it's exciting to see what kind of innovations might be possible with it. Ultimately, a lot of the most important features of these engines come from their robustness on diverse infrastructure and ease of use for complex workflows (hundreds of tasks running with full concurrency + parallel execution + async handling).

Arik • Edited

Thanks Steve! I really appreciate the feedback and recommendation.

Some thoughts:

  • My main reason for choosing YAML for Tork was that YAML comes with almost no baggage (whitespace controversy aside 🙂). Nothing in YAML means anything unless you give it a meaning. This is unlike, say, Python (or any other high-level language, not trying to pick on Python), where there's a significant learning curve. Moreover, many of the constructs of most high-level languages don't make sense in a distributed environment because the language was simply not designed for that. Even commonplace things such as loops and if statements (not to mention 3rd party libraries) don't directly translate to a distributed environment. So you end up having to use a subset of the language, and it just ends up being a sugar coating for your APIs. YAML comes "naked", is familiar to many, is a no-brainer to learn even for non-programmers, and has tons of support in terms of parsing, schema validation, etc. To mitigate whitespace-error and typo-related issues, I'm working on adding autocomplete support to my pipeline editor on the Tork Web UI.

  • The runtime in Tork is essentially Docker (with plans to add support for WASM and Podman in the future if there's demand for it). I was actually trying to get out of the world where each type of task has to be defined in Go code, which would require recompiling the engine, creating custom forks, and knowing Go programming -- all of which felt too high-friction and would benefit fewer people. Using Docker as the runtime lets users write just about anything they want using one of Docker's many images and removes the need to recompile the engine. My intention here is that Tork only provides the necessary infrastructure to execute tasks in a distributed manner and does not worry about WHAT you are running in these tasks.

Example:

  name: some job
  tasks:
  - name: do some calculation in python
    image: python:3
    var: pythonResult
    run: |
      python -c 'print(1+1)' > $TORK_OUTPUT

  - name: do some calculation in ruby
    image: ruby:latest
    var: rubyResult
    run: |
      ruby -e 'x = Time.now; puts x;' > $TORK_OUTPUT

  - name: get the video format
    var: videoFormat
    image: jrottenberg/ffmpeg:3.4-alpine
    run: |
      ffprobe \
        -v quiet \
        -show_format \
        -print_format json \
        http://some/video/file.mp4 > $TORK_OUTPUT

This is a highly contrived example. But the point is that the user can write their scripts in whatever language they're comfortable with rather than being limited to Go.

  • I was trying not to be overly opinionated about storage because there are many options out there and because I wanted to keep the workers stateless for the sake of scalability and resiliency. Any local state (e.g. using volumes) that the tasks generate is automatically deleted at the end of the task. The solution for storing state is similar to how Tork handles the runtime, that is, to get out of the way. What I usually do in my pipelines is use some kind of external object storage (S3, MinIO, etc.) and then use a post task to upload the state after the task executes, e.g.:
  name: convert a video
  tasks:
  - name: convert the first 5 seconds of a video
    image: jrottenberg/ffmpeg:3.4-alpine
    volumes:
      - /tmp
    run: |
      ffmpeg -i https://some/input/file.mov -t 5 /tmp/output.mp4
    post:
        - name: upload the chunk to minio
          image: "amazon/aws-cli:2.13.10"
          env:
            AWS_ACCESS_KEY_ID: "..."
            AWS_SECRET_ACCESS_KEY: "..."
            BUCKET_NAME: "some-bucket"
          run:
            aws s3 cp /tmp/output.mp4 s3://$BUCKET_NAME/output.mp4

Hope this helps

barocsi

Is Apache Airflow suitable for this kind of workflow engine solution?

Arik

I think that there's definitely an overlap in terms of goals between Airflow and Tork -- both are designed as general-purpose workflow engines. But the way they go about it is quite different. For example, workflows in Airflow are written in Python and require redeployment along with Airflow when they change. In Tork, on the other hand, workflows are written in plain YAML and do not require redeployment of the engine. Both approaches have pros and cons. I think that at the end of the day both engines are suitable for many use cases, so it really boils down to personal preference.

AlgoT

Why not use a message queue like RabbitMQ, or in bigger organisations, Kafka?

Arik

The default broker implementation is in-memory, which is suitable for experimentation.

But Tork does support RabbitMQ for a distributed setup:

github.com/runabol/tork#running-in...
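
For readers wondering what an in-memory broker amounts to, here is a rough Python sketch: named queues held in process memory, with worker threads consuming from them. It illustrates the general pattern only and is not Tork's broker (Tork is written in Go); `InMemoryBroker`, `publish`, `subscribe`, and `wait` are hypothetical names.

```python
import queue
import threading


class InMemoryBroker:
    """Toy broker: named FIFO queues that live only in this process."""

    def __init__(self):
        self._queues = {}
        self._lock = threading.Lock()

    def _queue(self, name):
        # Create the queue on first use; the lock guards the shared dict.
        with self._lock:
            return self._queues.setdefault(name, queue.Queue())

    def publish(self, name, message):
        """Enqueue a message on the named queue."""
        self._queue(name).put(message)

    def subscribe(self, name, handler):
        """Start a background worker that calls handler(message) per message."""
        q = self._queue(name)

        def loop():
            while True:
                message = q.get()
                try:
                    handler(message)
                finally:
                    q.task_done()

        threading.Thread(target=loop, daemon=True).start()

    def wait(self, name):
        """Block until every message published so far has been handled."""
        self._queue(name).join()
```

Because the queues live inside one process, nothing survives a restart and workers on other machines cannot connect, which is why a distributed setup needs an external broker such as RabbitMQ.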