
Thierry Njike for Zenika


Cloud Run jobs, your parallel tasks solution

Multitask Cloud Run job

Introduction

We often need to speed up our tasks without using a lot of resources, and that is now possible on Cloud Run. Jobs is a brand-new serverless feature of Cloud Run, generally available since March 23rd, 2023. In this article, I will first compare Cloud Run and Cloud Functions (1st gen and 2nd gen), then explain how Cloud Run jobs work. Next, I will show some use cases where you could use Cloud Run jobs instead of other serverless options. Finally, a basic demo will put into practice what is explained in the previous parts.

Comparison between serverless options

The diagram below shows which product is the most suitable depending on the job to perform.

Serverless use cases

How does it work?

Cloud Run jobs can execute a single task as well as a group of tasks. When you create a job, you set the number of tasks it contains. This number is saved as an environment variable that you can use directly in your code without defining it yourself. Each task is identified by its index, starting from 0, which is also exposed as an environment variable directly usable in the code. So, when your job runs, Cloud Run sets 2 environment variables in each task:

  • CLOUD_RUN_TASK_COUNT: which is the total number of tasks of the job
  • CLOUD_RUN_TASK_INDEX: which is the index of the current task

These environment variables are not visible on the job's configuration page; their names are fixed by convention.
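For example, here is a minimal sketch of how a task can read these two variables (the fallback values are only there for local runs, outside Cloud Run):

import os

# provided automatically by Cloud Run jobs; the fallbacks only matter for local testing
task_index = int(os.environ.get("CLOUD_RUN_TASK_INDEX", 0))
task_count = int(os.environ.get("CLOUD_RUN_TASK_COUNT", 1))

print(f"Running task {task_index + 1} of {task_count}")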

When creating a job, you must select an image to use. This image can be stored in Artifact Registry or on Docker Hub. For other container registries, follow the steps described on this page, but Google recommends using Artifact Registry. If you face an issue about a violated constraint (low carbon), follow the steps described in my previous article.

A job can be split into up to 10,000 tasks. Each task creates a new instance of the image and runs independently of the others. If a task fails, the job fails too, even if all the others ended successfully.
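Concretely, a task is marked as failed when its container exits with a non-zero code. Here is a minimal defensive sketch, where process_chunk is just a hypothetical placeholder for the real work of a task:

import os
import sys

def process_chunk(index: int) -> None:
    # hypothetical placeholder for the real work of this task
    ...

if __name__ == "__main__":
    index = int(os.environ.get("CLOUD_RUN_TASK_INDEX", 0))
    try:
        process_chunk(index)
    except Exception as exc:
        # exiting with a non-zero code marks this task (and therefore the job) as failed
        print(f"Task {index} failed: {exc}", file=sys.stderr)
        sys.exit(1)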

Examples of Cloud Run jobs use cases

1- Large dataset

Let's suppose a situation where we have to process a large dataset of 1 million rows. Cloud Run jobs can help us split the dataset into several smaller chunks and process them separately: for instance, we can split the job into 100 tasks and process 10,000 rows per task, as sketched below.
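Here is a minimal sketch of how each task could compute its own slice of rows from its index (the total of 1 million rows is an assumption, and loading the data is left out):

import os

TOTAL_ROWS = 1_000_000  # assumed size of the dataset

task_index = int(os.environ["CLOUD_RUN_TASK_INDEX"])
task_count = int(os.environ["CLOUD_RUN_TASK_COUNT"])  # e.g. 100 tasks

rows_per_task = TOTAL_ROWS // task_count  # 10,000 rows per task with 100 tasks
start = task_index * rows_per_task
end = TOTAL_ROWS if task_index == task_count - 1 else start + rows_per_task

print(f"Task {task_index} processes rows [{start}, {end})")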

2- Replications

Imagine that we want to replicate data from 3 external databases to Cloud Storage. We can do it with a single Cloud Run job by assigning one task per database. Depending on the index of the task, the corresponding database credentials are used, without duplicating any code.
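Here is a sketch of the idea, with 3 hypothetical databases whose connection strings are injected as environment variables:

import os

# hypothetical mapping: task index -> one source database and the variable holding its credentials
DATABASES = [
    {"name": "crm", "secret_env": "CRM_DB_DSN"},
    {"name": "billing", "secret_env": "BILLING_DB_DSN"},
    {"name": "analytics", "secret_env": "ANALYTICS_DB_DSN"},
]

task_index = int(os.environ["CLOUD_RUN_TASK_INDEX"])
db = DATABASES[task_index]          # one task per database, so the job has 3 tasks
dsn = os.environ[db["secret_env"]]  # credentials injected when the job is created

print(f"Replicating database '{db['name']}' to Cloud Storage...")
# ... connect with dsn, extract the data and upload it to the bucket ...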

3- Unsupported language

Cloud Functions supports only 7 languages (Node.js, Python, Go, Java, C#, Ruby and PHP), so you won't be able to use it with bash code, for example. One of the advantages of Cloud Run jobs is that the language does not matter, because it runs container images. You just have to build your image and set an entry point.

We can imagine many other use cases for Cloud Run jobs. Now let's jump into an example to show you how to use it directly from the console.

Example

In this example, we will create a Cloud Run job with 5 tasks. The Python code writes the result of a BigQuery query to Cloud Storage. The resulting BigQuery dataframe will be split into 5 parts, and each part will be written to a separate file in CSV format.

1- Let's write the code

If you use the same code for your own test, do not forget to set your environment variables when creating the job. Here is the Python code:

# library imports

import os
import pandas as pd
import numpy as np
from google.cloud import bigquery, storage
from dotenv import load_dotenv


def run_query(project_id, dataset, table):

    # create a bigquery client
    client = bigquery.Client()

    query = f"""SELECT *
    FROM `{project_id}.{dataset}.{table}`
    LIMIT 1000
    """
    # Notes:
    # - avoid SELECT * in real projects; we use it here just to illustrate
    # - LIMIT 1000 does not reduce the cost: the same amount of data is scanned, only the result returned is truncated

    # run the sql query
    query_job = client.query(query)

    # we convert the iterator object into a pandas dataframe
    rows = []
    for row in query_job.result():
        rows.append(dict(row.items()))

    df = pd.DataFrame(rows)

    return df, len(df)


if __name__ == '__main__':

    # we load all the environment variables
    load_dotenv()

    # we get all the environment variables
    project_id = os.environ.get("PROJECT_ID")
    bucket_name = os.environ.get("BUCKET_NAME")
    dataset = os.environ.get("DATASET")
    table = os.environ.get("TABLE")
    index = int(os.environ.get("CLOUD_RUN_TASK_INDEX")) 
    nb_task = int(os.environ.get("CLOUD_RUN_TASK_COUNT"))


    # the filename root
    filename = "test-parallel-task"

    # we run the query and get the result as a dataframe and the length of the dataframe
    data, n = run_query(project_id, dataset, table)

    # the length of each task dataframe
    len_task_df = n//nb_task
    begin = index*len_task_df
    end = begin + len_task_df if index != nb_task-1 else n   # the last task takes the remainder, to avoid data loss in case of imperfect division

    # we write the corresponding file on cloud storage
    data[begin:end].to_csv(f'gs://{bucket_name}/{filename}_{index}.csv')

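To make the slicing at the end of the script concrete, here is the same rule extracted into a small function, with the values of our example (1,000 rows, 5 tasks) and an imperfect division:

def task_bounds(index: int, nb_task: int, n: int) -> tuple[int, int]:
    # same slicing rule as in main.py: the last task absorbs the remainder
    len_task_df = n // nb_task
    begin = index * len_task_df
    end = begin + len_task_df if index != nb_task - 1 else n
    return begin, end

print([task_bounds(i, 5, 1000) for i in range(5)])
# [(0, 200), (200, 400), (400, 600), (600, 800), (800, 1000)]

print(task_bounds(4, 5, 1003))
# (800, 1003): the 3 leftover rows go to the last task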

As you can see in the last line of main.py, I'm writing directly to Cloud Storage using pandas. This is only possible if you add the gcsfs library to your requirements.txt, which should look like this:

gcsfs==2023.6.0
google-cloud-bigquery==3.11.1
google-cloud-storage==2.9.0
numpy==1.24.3
pandas==2.0.2
python-dotenv==1.0.0

Note: you can use any other language to do this, but for this specific case you are limited to the languages supported by the GCP client libraries.

2- Image creation

To create the image to use, let's write the Dockerfile first:

# we use version 3.10 of the python image
FROM python:3.10

# we define a work directory
WORKDIR /app

# we copy the code dir into the work directory
COPY requirements.txt /app

# we install the dependencies
RUN pip install --no-cache-dir -r requirements.txt

# we copy the code dir into the work directory
COPY . /app

# we execute the code with the following command
CMD [ "python", "main.py" ]

Then, build your image. If you use GCP Artifact Registry, follow parts 1 and 2 of my previous article to build and push it.

3- Job creation

From the GCP console, search for Cloud Run, select the JOBS tab and click on CREATE JOB.

job creation

Then, fill in the first part of the form. If you use GCP Artifact Registry, use the SELECT button to browse for your image. In the Number of tasks field, enter 5.

job info

Now, click on the arrow to expand the configuration section. Switch between the tabs to configure your job as you want, then click on CREATE.

job config

Once created, your job should appear in the list when you select the JOBS tab on the Cloud Run homepage.

jobs tab

Click on the job and switch between the tabs to see its details. The History tab is empty because there is no execution yet. To set a trigger, click on the trigger tab and schedule your job.

job tabs

Click on EXECUTE to start the job, then return to the History tab to see the changes. You should see an execution in progress. If you click on the execution, you will see the progress of each task. To check the parallelism, you can click on each task and compare the start times. You can also check the logs of each task separately for debugging purposes.

Tasks info

Once the job has completed, we can check the result on Cloud Storage to verify that the files have been created as expected.

Cloud storage results

We can see that the suffix of each file name is the index of the task that wrote it. We can also see the creation dates of the files: 3 of them were created at the same time thanks to parallelism. Now, open the files and verify that their contents match what is expected for each task index.
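If you prefer to check programmatically, here is a quick sketch that reads the files back with pandas (replace the placeholder bucket name with the value of your BUCKET_NAME variable):

import pandas as pd

bucket_name = "your-bucket-name"  # assumption: the same bucket as the job's BUCKET_NAME variable
filename = "test-parallel-task"

for index in range(5):
    part = pd.read_csv(f"gs://{bucket_name}/{filename}_{index}.csv", index_col=0)
    print(f"{filename}_{index}.csv: {len(part)} rows")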

This example is a basic one to help you understand how Cloud Run jobs work. You can perform much more complex tasks with them, as described in the use cases section.

I hope this article helps 🚀

Top comments (4)

Benoit COUETIL 💫

Good job 👍

If I may: you can use code highlighting on dev.to: dev.to/hoverbaum/how-to-add-code-h...

Thierry Njike

Hey Benoît,
Thanks for sharing

Avirat

Thanks for sharing!
I have some doubts: how and where are you setting the environment variables, like bucket_name, dataset, table, etc.?
Can we create different job executions with different bucket and table names?

Beatlesfansunite

This is not a good demo, no parallelism demonstrated. The main function runs one job that updates 1000 records.