If you haven't heard of Dagster
Dagster is an open-source data orchestrator for machine learning, analytics, and ETL that makes your pipelines more reliable and robust. The project's website: https://dagster.io/.
This post is primarily targeted at people who have already used Dagster, and specifically at those who want a concrete example of User Code Deployments.
Motivation
I work as a data engineer at a start-up based in Ghent (https://www.tengu.io/), where we are constantly looking for new ways to improve our platform and data pipelines.
We stumbled upon Dagster when it was still very young, at around 40 stars on GitHub. We ran some experiments with it, but at the time it couldn't satisfy our needs, primarily because we couldn't separate our code across different deployments/services. Dagster has since matured (2.2k stars on GitHub!) and with that came User Code Deployments.
User Code Deployments allow you to separate your pipeline code from the Dagit image, which means you can update your user code without having to redeploy the entire Dagster system!
You can have separate code repositories per deployment. This allows separate teams within an organization to manage their own images.
This post can serve as extra information alongside Dagster's official documentation. The walkthrough consists of four parts:
- Create a Docker image that contains your repository with your Dagster pipelines.
- Configure the Helm values.yaml file.
- Set up Dagster with Helm, using the values.yaml from step two.
- Run the pipeline with Dagit and/or GraphQL.
Let's go!
Prerequisites
- A Kubernetes cluster with Helm installed on it.
- A Docker registry that can be accessed by that Kubernetes cluster.
- (Some experience with Dagster.)
1. Docker image with user code
First we'll create a Docker image that contains your user code, i.e. your Dagster repositories with Dagster pipelines. This is not the image that will be used for the Dagit instance.
This separates user code from Dagit/Dagster system code, which gives a couple of advantages:
- Increased robustness.
- Allow separate teams within an organization to manage their own images.
- Reduce inter-dependencies.
Repository structure
Start a new project and create two Python files (celery_pipeline.py, repos.py), a YAML file (workspace.yaml) and a Dockerfile. All files should be in the same directory:
project
│
└───image
        celery_pipeline.py
        repos.py
        workspace.yaml
        Dockerfile
This is our pipeline file; it will execute the same solid five times. Later in this post we will install Dagster with mostly default values (except for user code deployments), which means Celery will execute our pipelines. Paste the following code into the celery_pipeline.py file:
celery_pipeline.py
"""
A basic pipeline that can be executed with Celery.
"""
from dagster_celery_k8s import celery_k8s_job_executor
from dagster import ModeDefinition, default_executors, pipeline, solid
celery_mode_defs = [ModeDefinition(executor_defs=default_executors + [celery_k8s_job_executor])]
@solid
def not_much(_):
return
@pipeline(mode_defs=celery_mode_defs)
def parallel_pipeline():
for i in range(5):
not_much.alias("not_much_" + str(i))()
Now we'll create a repository that will contain our pipeline. Add this code to the repos.py file:
repos.py
"""
Simple repository that contains our parallel pipeline.
"""
import sys
from dagster import repository
from dagster.utils import script_relative_path
sys.path.append(script_relative_path("."))
from celery_pipeline import parallel_pipeline
@repository
def example_repository():
return [parallel_pipeline]
A workspace tells Dagster which repositories exist and where they should be loaded from. Copy and paste the following code into the workspace.yaml file:
workspace.yaml
# Yaml for loading our single repository from our repos.py file:
load_from:
- python_file: repos.py
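At this point you can already do a quick local sanity check. This step is optional and assumes you have dagster, dagit, and dagster-celery-k8s installed in your local environment (the same packages the Dockerfile below installs):
cd image
# Dagit reads workspace.yaml from the current directory and loads repos.py.
dagit
# Open http://localhost:3000 and verify that example_repository shows up.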
Finally, we have the Dockerfile that we will use to build our user code image. I presume you know how Dockerfiles work. This one does two things:
- Install the necessary Dagster libraries.
- Copy our user code to the root directory (COPY . /). Since the container's working directory is /, this is what lets the --python-file repos.py argument from step two resolve.
Add these statements to your Dockerfile:
Dockerfile
FROM python:3.7.8-slim

# This tutorial was written using Dagster 0.9.12
ARG DAGSTER_VERSION=0.9.12

RUN apt-get update -yqq && \
    apt-get install -yqq cron

RUN pip install \
    dagster==${DAGSTER_VERSION} \
    dagster-graphql==${DAGSTER_VERSION} \
    dagster-postgres==${DAGSTER_VERSION} \
    dagster-cron==${DAGSTER_VERSION} \
    dagster-celery[flower,redis,kubernetes]==${DAGSTER_VERSION} \
    dagster-aws==${DAGSTER_VERSION} \
    dagster-k8s==${DAGSTER_VERSION} \
    dagster-celery-k8s==${DAGSTER_VERSION} \
    dagit==${DAGSTER_VERSION}

COPY . /
Build and push image
Navigate to your image directory and build the Docker image. Use a Docker registry that can be accessed by your Kubernetes cluster.
# E.g. docker build -t us.gcr.io/company-12345/user_code:0.1 .
docker build -t YOUR_REGISTRY/user_code:0.1 .
docker push YOUR_REGISTRY/user_code:0.1
2. values.yaml
Next up, we will configure the values.yaml file that we will feed to Helm when deploying Dagster. Download the default values.yaml file and make the following changes to support user code deployments:
userDeployments:
  # Whether launching user deployments is enabled.
  enabled: true
  # List of unique deployments using images that contain your
  # user code.
  deployments:
    - name: "k8s-example-user-code-1"
      image:
        # Use the image that you created in the previous step, without the tag.
        # E.g. us.gcr.io/company-12345/user_code
        repository: "YOUR_REGISTRY/user_code"
        tag: 0.1
      # Make sure these arguments match your repository file name
      # and repository attribute name.
      # Arguments to `dagster api grpc`.
      dagsterApiGrpcArgs:
        - "--python-file"
        - "repos.py"
        - "--attribute"
        - "example_repository"
3. Install Dagster
Start by creating a new namespace:
kubectl create namespace dagster-walkthrough
Now we are ready to deploy Dagster and Dagit to your Kubernetes cluster using Helm. (I deployed this on GKE.)
helm repo add dagster https://dagster-io.github.io/helm
helm install dagster dagster/dagster -n dagster-walkthrough -f /path/to/values.yaml
Give the deployment some time; after a minute or two the Dagster system pods should be up and running.
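You can verify this with kubectl (exact pod names depend on your release name and chart version; expect Dagit, the Celery workers, and the chart's Postgres and message-broker dependencies):
kubectl get pods -n dagster-walkthrough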
If you think something went wrong and you want to start over, I recommend deleting the whole release and reinstalling:
helm delete dagster -n dagster-walkthrough
helm install dagster dagster/dagster -n dagster-walkthrough -f /path/to/values.yaml
4. Run pipeline
Now that we have everything set up, we can start running pipelines. I want to show you how to run your pipeline with the UI (Dagit) or by using GraphQL.
Dagit
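The chart does not expose Dagit publicly by default. One way to reach the UI locally is port-forwarding; the service name below assumes the release name dagster from step three (check kubectl get svc -n dagster-walkthrough if yours differs):
# Forward local port 8080 to the Dagit service in the cluster.
kubectl port-forward svc/dagster-dagit 8080:80 -n dagster-walkthrough
# Dagit should now be reachable at http://localhost:8080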
Go to the Dagit UI and select the parallel_pipeline from the example_repository. Click on the Playground tab and add the following run config:
Run config
execution:
  celery-k8s:
    config:
      job_namespace: dagster-walkthrough
      env_config_maps:
        - "dagster-pipeline-env"
      image_pull_policy: "Always"
storage:
  filesystem:
Click "Launch execution" in the bottom right. Dagster will launch a run coordinator Kubernetes Job, which traverses the pipeline run execution plan and submits steps to Celery queues for execution.
The step executions are picked up by Celery workers, and each step execution spawns a Kubernetes Job. You should end up with Kubernetes resources similar to the ones below.
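You can watch the step Jobs appear from the command line (Job names are generated per run and will differ):
kubectl get jobs,pods -n dagster-walkthrough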
Execution results
In Dagit you can monitor the progress of your pipeline; it should show five lines, each representing a Kubernetes Job:
GraphQL
If you would like to use GraphQL to execute your pipeline, you can send a request to the GraphQL endpoint (http://localhost/graphql) or execute it in the GraphQL Playground.
mutation ExecutePipeline(
  $repositoryLocationName: String!
  $repositoryName: String!
  $pipelineName: String!
  $runConfigData: RunConfigData!
  $mode: String!
) {
  launchPipelineExecution(
    executionParams: {
      selector: {
        repositoryLocationName: $repositoryLocationName
        repositoryName: $repositoryName
        pipelineName: $pipelineName
      }
      runConfigData: $runConfigData
      mode: $mode
    }
  ) {
    __typename
    ... on LaunchPipelineRunSuccess {
      run {
        runId
      }
    }
    ... on PipelineConfigValidationInvalid {
      errors {
        message
        reason
      }
    }
    ... on PythonError {
      message
    }
  }
}
Query variables
{
  "repositoryName": "example_repository",
  "repositoryLocationName": "k8s-example-user-code-1",
  "pipelineName": "parallel_pipeline",
  "mode": "default",
  "runConfigData": {
    "execution": {
      "celery-k8s": {
        "config": {
          "job_namespace": "dagster-walkthrough",
          "env_config_maps": [
            "dagster-pipeline-env"
          ],
          "image_pull_policy": "Always"
        }
      }
    },
    "storage": {
      "filesystem": null
    }
  }
}
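If you prefer not to use the Playground, any GraphQL-over-HTTP client can send this mutation. Here is a sketch with curl, assuming Dagit is port-forwarded to localhost:8080 as above, and that you saved the mutation under "query" and the variables under "variables" in a payload.json file:
# payload.json: {"query": "<the mutation above>", "variables": {<the variables above>}}
curl -X POST http://localhost:8080/graphql \
  -H "Content-Type: application/json" \
  -d @payload.json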
Conclusion
With this post I hope I could save you some time setting up Dagster with User Code Deployments. If you have any questions, feel free to comment; I'll try to help you out.
This is my first technical blog post, so constructive feedback is appreciated. Thank you for reading!
Resources
https://docs.dagster.io/deploying/celery
https://docs.dagster.io/deploying/k8s_part2
https://docs.dagster.io/overview/graphql-api#launch-a-pipeline-run
https://docs.dagster.io/_apidocs/libraries/dagster_celery_k8s#dagster_celery_k8s.celery_k8s_job_executor
https://github.com/dagster-io/dagster/blob/master/helm/dagster/values.yaml