DEV Community

Cover image for Dagster with User Code Deployments (gRPC)
Michiel Ghyselinck
Michiel Ghyselinck

Posted on • Updated on

Dagster with User Code Deployments (gRPC)

If you haven't heard about Dagster

Dagster is an open source data orchestrator for machine learning, analytics, and ETL that makes your pipelines more reliable and robust. The project's website: https://dagster.io/.

This post is primarily targeted towards people that already have used Dagster and specifically people who want a concrete example of User Code Deployments.

Motivation

I work as a data engineer in a start-up based in Gent (https://www.tengu.io/) and there we are constantly looking for new ways to improve our platform and data pipelines.

We stumbled upon Dagster when it still was very young, around 40 stars on Github. We did some experiments with it but then it couldn't satisfy our needs primarily because we couldn't separate our code across different deployments/services. Meanwhile Dagster has matured (2.2k stars on Github!) and with that came User Code Deployments.

User Code Deployments allow you to separate your pipeline code from the Dagit image. Which means you can update your user code without having to redeploy the entire Dagster system!

You can have separate code repositories per deployment. This allows separate teams within an organization to manage their own images.

This post can serve as extra information next to Dagster's official documentation. The walkthrough consists of four parts:

  1. Create a docker image that contains your repository with your Dagster pipelines.
  2. Configure the Helm values.yaml file.
  3. Setup Dagster with Helm using values.yaml from step two.
  4. Run our pipeline with Dagit and/or GraphQL.

Let's go!

Prerequisites

  • A Kubernetes cluster with Helm installed on it.
  • A Docker registry that can be accessed by that Kubernetes cluster.
  • (Some experience with Dagster.)

1. Docker image with user code

First we'll create a docker image that contains your user code i.e. your Dagster repositories with Dagster pipelines. This is not the image that will be used for the Dagit instance.

This separates user code from Dagit/Dagster system code, which gives a couple of advantages:

  • Increase of robustness.
  • Allow separate teams within an organization to manage their own images.
  • Reduce inter-dependencies.

Repository structure

Start a new project and create two python files (celery_pipeline.py, repos.py), a yaml file (workspace.yaml) and a Dockerfile. All files should be in the same directory:



project
│   
└───image
    │   celery_pipeline.py
    |   repos.py
    │   workspace.yaml
    |   Dockerfile


Enter fullscreen mode Exit fullscreen mode

This is our pipeline file that will execute the same solid five times. Later in this post we will install Dagster with primarily default values (except for user code deployments) which means we will have Celery to execute our pipelines. Paste the following code into the celery_pipeline.py file:

celery_pipeline.py



"""
A basic pipeline that can be executed with Celery.
"""
from dagster_celery_k8s import celery_k8s_job_executor
from dagster import ModeDefinition, default_executors, pipeline, solid

celery_mode_defs = [ModeDefinition(executor_defs=default_executors + [celery_k8s_job_executor])]

@solid
def not_much(_):
    return

@pipeline(mode_defs=celery_mode_defs)
def parallel_pipeline():
    for i in range(5):
        not_much.alias("not_much_" + str(i))()


Enter fullscreen mode Exit fullscreen mode

Now we'll create a repository that will contain our pipeline. Add this code to repos.py:

repos.py



"""
Simple repository that contains our parallel pipeline.
"""
import sys

from dagster import repository
from dagster.utils import script_relative_path

sys.path.append(script_relative_path("."))

from celery_pipeline import parallel_pipeline

@repository
def example_repository():
    return [parallel_pipeline]


Enter fullscreen mode Exit fullscreen mode

A workspace tells Dagster which repositories exist and where they should be loaded from. Copy and paste the following code into the workspace.yaml file:

workspace.yaml



# Yaml for loading our single repository from our repos.py file:
load_from:
  - python_file: repos.py


Enter fullscreen mode Exit fullscreen mode

At last we have the Dockerfile that we will use to build our user code image. I presume you know how Dockerfiles work. This one does two things:

  1. Install the necessary Dagster libraries.
  2. Copy our user code to the root directory (COPY . /).

Add these statements to your Dockerfile:

Dockerfile



FROM python:3.7.8-slim

# This tutorial was written using Dagster 0.9.12
ARG DAGSTER_VERSION=0.9.12

RUN apt-get update -yqq && \
    apt-get install -yqq cron

RUN pip install \
    dagster==${DAGSTER_VERSION} \
    dagster-graphql==${DAGSTER_VERSION} \
    dagster-postgres==${DAGSTER_VERSION} \
    dagster-cron==${DAGSTER_VERSION} \
    dagster-celery[flower,redis,kubernetes]==${DAGSTER_VERSION} \
    dagster-aws==${DAGSTER_VERSION} \
    dagster-k8s==${DAGSTER_VERSION} \
    dagster-celery-k8s==${DAGSTER_VERSION} \
    dagit==${DAGSTER_VERSION}

COPY . /


Enter fullscreen mode Exit fullscreen mode

Build and push image

Navigate to your image directory and build the docker image. Use a docker registry that can be accessed by your Kubernetes cluster.



# F.e. docker build -t us.gcr.io/company-12345/user_code:0.1 .
docker build -t YOUR_REGISTRY/user_code:0.1 .
docker push YOUR_REGISTRY/user_code:0.1


Enter fullscreen mode Exit fullscreen mode

2. values.yaml

Next up we will configure the values.yaml file that we will feed to Helm when deploying Dagster.

Download the default values.yaml file and make following changes to support user code deployments:



userDeployments:
  # Whether launching user deployments is enabled.
  enabled: true
  # List of unique deployments using images that contain your
  # user code.
  deployments:
    - name: "k8s-example-user-code-1"
      image:
        # Use the image that you created in the previous step, without the tag
        # F.e. us.gcr.io/company-12345/user_code
        repository: "YOUR_REGISTRY/user_code"
        tag: 0.1
      # Make sure these arguments are the same as your repository file name
      # and repository name.
      # Arguments to `dagster api grpc`.
      dagsterApiGrpcArgs:
        - "--python-file"
        - "repos.py"
        - "--attribute"
        - "example_repository"


Enter fullscreen mode Exit fullscreen mode

3. Install Dagster

Start with creating a new namespace:



kubectl create namespace dagster-walkthrough


Enter fullscreen mode Exit fullscreen mode

Now we are ready to deploy Dagster and Dagit to your Kubernetes cluster using Helm. (I deployed this in GKE.)



helm repo add dagster https://dagster-io.github.io/helm
helm install dagster dagster/dagster -n dagster-walkthrough -f /path/to/values.yaml


Enter fullscreen mode Exit fullscreen mode

Give this some time and then you should see something like this:
Alt Text

If you think something went wrong and you want to start over I recommend to delete the whole release and reinstall:



helm delete dagster -n dagster-walkthrough
helm install dagster dagster/dagster -n dagster-walkthrough -f /path/to/values.yaml


Enter fullscreen mode Exit fullscreen mode

4. Run pipeline

Now that we have everything setup we can start running pipelines. I want to show you how you can run your pipeline with the UI (Dagit) or by using GraphQL.

Dagit

Go to the Dagit UI and select the parallel pipeline from the example repository. Click on to the Playground tab and add the following run config:

Run config



execution:
  celery-k8s:
    config:
      job_namespace: dagster-walkthrough
      env_config_maps:
        - "dagster-pipeline-env"
      image_pull_policy: "Always"
storage:
  filesystem:


Enter fullscreen mode Exit fullscreen mode

Click in the bottom right on "Launch execution". Dagster will launch a run coordinator Kubernetes Job, which traverses the pipeline run execution plan and submits steps to Celery queues for execution.

The step executions are picked up by Celery workers, and each step execution spawns a Kubernetes Job. You should have similar Kubernetes resources like this:

Alt Text

Execution results

In Dagit you can monitor the progress of your pipeline, it should show five lines. Each line representing a Job:

Alt Text

GraphQL

If you would like to use GraphQL to execute your pipeline you can send a request to the GraphQL endpoint (http://localhost/graphql) or execute it in the Playground.



mutation ExecutePipeline(
  $repositoryLocationName: String!
  $repositoryName: String!
  $pipelineName: String!
  $runConfigData: RunConfigData!
  $mode: String!
) {
  launchPipelineExecution(
    executionParams: {
      selector: {
        repositoryLocationName: $repositoryLocationName
        repositoryName: $repositoryName
        pipelineName: $pipelineName
      }
      runConfigData: $runConfigData
      mode: $mode
    }
  ) {
    __typename
    ... on LaunchPipelineRunSuccess {
      run {
        runId
      }
    }
    ... on PipelineConfigValidationInvalid {
      errors {
        message
        reason
      }
    }
    ... on PythonError {
      message
    }
  }
}


Enter fullscreen mode Exit fullscreen mode

Query variables



{
"repositoryName": "example_repository",
"repositoryLocationName": "k8s-example-user-code-1",
"pipelineName": "parallel_pipeline",
"mode": "default",
"runConfigData": {
"execution": {
"celery-k8s": {
"config": {
"job_namespace": "dagster-walkthrough",
"env_config_maps": [
"dagster-pipeline-env"
],
"image_pull_policy": "Always"
}
}
},
"storage": {
"filesystem": null
}
}
}
Enter fullscreen mode Exit fullscreen mode




Conclusion

With this post I hope that I could save some of your time in setting up Dagster with User Code Deployments. If you have any questions feel free to comment, I'll try to help you out.

This is my first technical blog post, so constructive feedback is appreciated. Thank you for reading!

Resources

https://docs.dagster.io/deploying/celery
https://docs.dagster.io/deploying/k8s_part2
https://docs.dagster.io/overview/graphql-api#launch-a-pipeline-run
https://docs.dagster.io/_apidocs/libraries/dagster_celery_k8s#dagster_celery_k8s.celery_k8s_job_executor
https://github.com/dagster-io/dagster/blob/master/helm/dagster/values.yaml

Top comments (2)

Collapse
 
blackstrip profile image
Henry

Great work, very helpful. thx.

Collapse
 
somasays profile image
Somasundaram Sekar

Thanks for the detailed post. I dont understand the source and purpose of "env_config_maps:
- "dagster-pipeline-env"
Can you please throw some light on that