
Edward Huang

Originally published at pathtosenior.substack.com

How To Create Dataflow Job with Scio


A group of brilliant engineers at Google, led by Paul Nordstrom, wanted to build a system that would do for streaming data processing what MapReduce did for batch processing: provide a robust abstraction that scales to a massive size.

In 2008, the MillWheel team was born.

Building MillWheel was no easy feat. Testing and ensuring correctness in the streaming system was especially challenging because it couldn't be rerun like a batch pipeline to produce the same output. As if that wasn't enough, the Lambda architecture complicated matters further, making it difficult to aggregate and reconcile streaming and batch results. Out of such adversity, Google Dataflow was born: a solution combining the best of both worlds into one unified system serving batch and streaming pipelines.

Creating and designing pipelines requires a different thought process and framework from writing custom applications. For the past few months, I have spent numerous days and weeks learning the fundamentals of Apache Beam and Dataflow jobs to build a data pipeline for my projects.

There aren't many articles that briefly introduce Dataflow, Apache Beam, and Scio that you can read while commuting to work by train or bus. I hope this article helps beginners like me wrap their heads around these concepts.

What is Dataflow

Dataflow is a serverless, fast, cost-effective service that supports stream and batch processing. It provides portability for processing jobs written with the open-source Apache Beam libraries. By automating infrastructure provisioning and cluster management, it removes operational overhead from your data engineering teams.

Most data processing works in terms of a source input, a transformation, and a sink. Engineers develop the pipeline and its transformations in a Dataflow template, which they can then use to deploy and execute a Dataflow job. Dataflow assigns worker virtual machines to execute the data processing, and you can customize the shape and size of these machines.

For instance, in a batch pipeline that computes daily user scores for a game, the source would be an upstream queue or a BigQuery table. A Dataflow job is triggered on some event, either manually or through an event-style trigger, processes all of that data, and calculates the average score per user in its cluster. Lastly, the job sends the results to the sink, usually a database table or a queue.
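
To make that shape concrete, here is a minimal sketch of such a batch pipeline in Scio. It is illustrative only: the table, the userId and score fields, and the output location are hypothetical, and your real source and sink will depend on your project.

import com.spotify.scio._
import com.spotify.scio.bigquery._

// A sketch of the source -> transform -> sink shape described above.
// Table names, field names, and the output location are hypothetical.
object DailyUserScore {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.bigQueryTable(Table.Spec(args("input")))                  // source: a BigQuery table of score events
      .map(r => (r.getString("userId"), r.getString("score").toDouble))
      .groupByKey                                                // group all scores per user
      .mapValues(scores => scores.sum / scores.size)             // transform: average score per user
      .map { case (user, avg) => s"$user,$avg" }
      .saveAsTextFile(args("output"))                            // sink: text files in Cloud Storage

    sc.run()
  }
}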

Google Dataflow uses Apache Beam as its SDK for developing batch and streaming data processing pipelines. Apache Beam is very useful because it unifies the APIs of the big data world. Before Apache Beam, data processing frameworks such as Hadoop, Flink, and Spark each provided their own way of defining data processing pipelines. Apache Beam lets you write a pipeline once and run it everywhere.

Some Application Use Cases for Dataflow Jobs

Many Dataflow jobs process raw data into a more useful, structured format. Here are some typical use cases for Dataflow jobs:

  • Data integration and ingestion. Migrating data from one storage system to another.

  • Data preprocessing. Transforming raw events into a useful and desirable format (ETL).

  • Sentiment analysis. Using a pre-trained machine learning model or an external sentiment analysis API to analyze the sentiment of each customer review, applying the transformation to each review.

  • Aggregation and grouping. Aggregating the data based on desired criteria, such as product ID or category.

Essentially, Apache Beam provides the framework for transforming and analyzing data into valuable insights so that organizations can make better decisions.

Fundamentals of Apache Beam

The Apache Beam framework consists of three components: Pipeline, PCollection, and PTransform. You must understand these three components before constructing your data processing pipeline.

When you construct a data pipeline, you are creating a function that processes a collection of objects. That function takes an input, which is usually an iterator. An iterator here means either a finite or an infinite collection of items.

Pipeline

A Pipeline is the driver of your data processing task. All other components are encapsulated inside a Pipeline. The Pipeline is the entry point for constructing any program, and it is where you provide the execution options that tell Beam how to run the processing task.

PCollection

The P stands for parallel. The collection is what you would expect a collection to be: a distributed dataset. The data can be bounded, such as a file, or unbounded, such as a continuous stream.

PTransform

PTransform is a data processing operation. It comes in various forms; the purest form is ParDo, which is similar to map in most programming languages.

Each transformation runs on a worker, while the creation of inputs and pipelines happens in the driver.

At a higher level, a PTransform takes a PCollection[A] and returns a PCollection[B]: PCollection[A] => PCollection[B].

Aside from ParDo, there are also GroupByKey, Combine, and Flatten.

ParDo is useful for a variety of common data processing operations, including the following (a short Scio sketch of these operations appears after the list):

  • Filtering a data set: You can consider each element in a collection and either output that element to a new collection or discard it.

  • Performing computations on each element in a data set.

  • Extracting parts of each element in the data set.
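
Here is a minimal sketch of those three operations using Scio's idiomatic methods, which compile down to ParDo under the hood. The in-memory sample data and its "user,score" format are made up for illustration.

import com.spotify.scio._

// Filtering, per-element computation, and extraction in Scio.
// The sample data and its "user,score" format are hypothetical.
object ParDoStyleOps {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    sc.parallelize(Seq("alice,10", "bob,25", "", "carol,7"))
      .filter(_.nonEmpty)           // filtering: drop empty lines
      .map(_.toUpperCase)           // computation on each element
      .map(_.split(",").head)       // extraction: keep only the user part
      .saveAsTextFile("users")

    sc.run()
  }
}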

Apache Beam to Scio

Apache Beam only supports the Java, Python, and Go programming languages. Thus, Spotify created Scio, a wrapper around the Apache Beam Java SDK, for Scala developers. Here are some of the components in Scio and how they translate to Apache Beam:

  • ScioContext wraps pipeline

  • SCollection wraps PCollection

  • PTransform is implemented as idiomatic Scala methods on SCollection, e.g., map, flatMap, filter, and reduce.
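
Putting the mapping together, a word count in Scio looks roughly like the sketch below (closely modeled on the canonical Scio word-count example; the input and output arguments are placeholders). ScioContext wraps the Pipeline, textFile returns an SCollection, and flatMap and countByValue stand in for explicit ParDo, GroupByKey, and Combine transforms.

import com.spotify.scio._

object WordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)   // ScioContext wraps the Beam Pipeline

    sc.textFile(args("input"))                     // SCollection[String] wraps PCollection[String]
      .flatMap(_.split("\\W+").filter(_.nonEmpty)) // idiomatic method instead of an explicit ParDo
      .countByValue                                // GroupByKey + Combine under the hood
      .map { case (word, count) => s"$word: $count" }
      .saveAsTextFile(args("output"))

    sc.run()
  }
}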

3 Examples of Writing Apache Beam Pipelines with Scio

I will create three Dataflow templates with the Spotify Scio library. The Scio documentation already has plenty of examples (link). However, a couple of use cases that are missing from the documentation would be helpful for someone new to Scio and Apache Beam, like myself, when writing a data processing pipeline.

Note that the examples discussed in this article all concern creating a traditional Dataflow template. If you want to learn more about how to work with Flex Templates, please email me, and I can write another blog post about them.

Create a Dataflow Job With Custom Input

The gotchas I ran into when creating a traditional Dataflow template with custom input are that you must use PipelineOptions and DoFn. ContextAndArgs will not work when creating a traditional template, because ContextAndArgs parses the arguments in the main function during template creation and throws an error saying that no argument was found.

The example below creates a Dataflow template with a custom input.

import com.spotify.scio._
import org.apache.beam.sdk.options.{Description, PipelineOptions, PipelineOptionsFactory, ValueProvider}
import org.apache.beam.sdk.options.Validation.Required
import org.apache.beam.sdk.transforms.{DoFn, ParDo}
import org.apache.beam.sdk.transforms.DoFn.ProcessElement

object Template {

  trait InputOptions extends PipelineOptions {
    @Description("Input 1 Example")
    @Required
    def getInputOneValue: ValueProvider[String]
    def setInputOneValue(value: ValueProvider[String]): Unit
  }

  def main(cmdlineArgs: Array[String]): Unit = {
    val options = PipelineOptionsFactory
      .fromArgs(cmdlineArgs: _*)
      .withValidation
      .as(classOf[InputOptions])
    val sc = ScioContext(options)

    sc.parallelize(Seq("")) // a single dummy element so the DoFn runs once at job run time
      .withName("Getting Custom Input")
      .applyTransform(ParDo.of(new DoFn[String, Unit] {
        @ProcessElement
        private[Template] def processElement(c: ProcessContext): Unit = {
          // The run-time parameter is only available inside the DoFn
          val opts = c.getPipelineOptions.as(classOf[InputOptions])
          println(s"What is the args: ${opts.getInputOneValue}")
          c.output(())
        }
      }))

    sc.run()
  }
}

Upload the template by running the following:

sbt "runMain com. path to senior. Scio.examples.extra.Template --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --stagingLocation=gs://[BUCKET]/staging --templateLocation=gs://[BUCKET]/TemplateExample"

Explanation

  1. A custom input requires PipelineOptions to configure it.

  2. We create the pipeline options with validation and deserialize the arguments into an InputOptions instance: PipelineOptionsFactory.fromArgs(cmdlineArgs: _*).withValidation.as(classOf[InputOptions]).

  3. A DoFn is required to retrieve the run-time parameter. It has two type parameters representing the input type and the output type; in this example, the input type is String and the output type is Unit.

  4. The processElement method can be declared private[Template] because Beam locates it through the @ProcessElement annotation, and a Scala qualified-private method remains accessible to Beam's reflection.

  5. c.getPipelineOptions gets the run-time parameters, and .as(classOf[InputOptions]) deserializes them into an InputOptions instance.

  6. c.output emits the result of the current transformation.
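
Once the template has been staged, you can launch a job from it. As a rough sketch (the job name, bucket, region, and parameter value below are placeholders), the gcloud invocation looks something like:

gcloud dataflow jobs run custom-input-example \
  --gcs-location=gs://[BUCKET]/TemplateExample \
  --region=[REGION NAME] \
  --parameters=inputOneValue=hello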

Ingest Data From BigQuery and Do Some Preprocessing

BigQuery rows are represented as TableRow in the Java API, which is essentially a Map[String, Object].

Scio provides a type-safe way of reading BigQuery, using an annotation to generate code from the table schema instead of working with raw TableRow values. However, because that approach needs additional BigQuery SBT configuration, I decided to follow the BigQueryTornadoes example of retrieving data from a BigQuery table, which is much easier to set up and understand.
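
For reference, the type-safe approach looks roughly like the sketch below. The table reference is a placeholder, and this assumes the extra sbt/BigQuery setup mentioned above is already in place.

import com.spotify.scio._
import com.spotify.scio.bigquery._

object TypedBigQueryExample {
  // Generates a case class from the table's schema at compile time.
  // The table reference is a placeholder.
  @BigQueryType.fromTable("my-project:my_dataset.my_table")
  class Row

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.typedBigQuery[Row]()        // SCollection[Row] instead of SCollection[TableRow]
      .map(_.toString)
      .saveAsTextFile(args("output"))

    sc.run()
  }
}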

Let's assume the schema has a single field, name, which is a string.

import com.spotify.scio._
import com.spotify.scio.bigquery._

object Template {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val table = Table.Spec(args.getOrElse("input", "TestingBQTable"))

    sc.bigQueryTable(table)
      .flatMap(r => Option(r.getString("name")))        // extract the "name" field, skipping nulls
      .countByValue                                      // (name, count) pairs
      .map { case (name, count) => s"$name: $count" }
      .saveAsTextFile(args("output"))

    sc.run()
  }
}

Run the job with the following:

sbt "runMain com. path to senior. Scio.examples.Template --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --input=[BQ_Table_INPUT] --output=output.txt”

Explanation

  1. Table.Spec will get the table name from the input parameter.

  2. Open the BigQuery table as an SCollection[TableRow].

  3. Get the "name" field in each row.

  4. countByValue will count each unique value in the collection as a (value, count) pair.

  5. Save the output to the location given by the output argument.

Handling Nested SCollection

For example, suppose we want to create a Dataflow template that first takes the run-time input and then, based on that input, queries a BigQuery table to retrieve the desired data and log it.

Getting the run-time input can be done with PipelineOptions (Example 1), which results in an SCollection. When querying the BigQuery table, we can use bigQuerySelect (link) with a SQL query string to get the desired data, which also results in an SCollection. If you get the run-time parameter and then query the BigQuery table inside that transform, you end up with a nested SCollection[SCollection[TableRow]], which throws a run-time exception.

The trick here is to use a Scio Tap to materialize the dynamic values in a first job, then use the result to construct a subsequent job.

import com.spotify.scio._
import com.spotify.scio.bigquery._
import org.apache.beam.sdk.options.{Description, PipelineOptions, PipelineOptionsFactory, ValueProvider}
import org.apache.beam.sdk.options.Validation.Required
import org.apache.beam.sdk.transforms.{DoFn, ParDo}
import org.apache.beam.sdk.transforms.DoFn.ProcessElement

object Template {

  trait InputOptions extends PipelineOptions {
    @Description("Input 1 Example")
    @Required
    def getInputOneValue: ValueProvider[String]
    def setInputOneValue(value: ValueProvider[String]): Unit
  }

  case class Input(inputOne: String)

  // Builds the SQL query string passed to bigQuerySelect
  def queryString(input: Input): String = ???

  def main(cmdlineArgs: Array[String]): Unit = {
    val options = PipelineOptionsFactory
      .fromArgs(cmdlineArgs: _*)
      .withValidation
      .as(classOf[InputOptions])
    val sc = ScioContext(options)

    // First job: read the run-time parameter inside a DoFn and materialize it as a Tap
    val closedTap = sc
      .parallelize(Seq("")) // a single dummy element so the DoFn runs once
      .withName("Getting Custom Input")
      .applyTransform(ParDo.of(new DoFn[String, Input] {
        @ProcessElement
        private[Template] def processElement(c: ProcessContext): Unit = {
          val opts = c.getPipelineOptions.as(classOf[InputOptions])
          println(s"What is the args: ${opts.getInputOneValue}")
          c.output(Input(inputOne = opts.getInputOneValue.get))
        }
      }))
      .materialize

    try {
      val scioResult = sc.run().waitUntilDone()
      val t1 = scioResult.tap(closedTap)

      // Second job: use the materialized values to construct the BigQuery query
      val (sc2, _) = ContextAndArgs(cmdlineArgs)
      t1.value.foreach { input =>
        val q = queryString(input)
        val tableRows = sc2.withName("Query BQ Table").bigQuerySelect(q)
        tableRows.debug() // log each retrieved row
      }
      sc2.run().waitUntilFinish()
    } catch {
      // During template creation the job does not actually run, so waitUntilDone throws
      case _: UnsupportedOperationException =>
    }
  }
}

Run the job by running:

sbt "runMain com. path to senior. Scio.examples.extra.Template --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] —inputOneValue=[INPUT]“

Explanation

  1. Create a ClosedTap in the "Getting Custom Input" step.

  2. Create another job and its associated ScioContext.

  3. Reopen the Tap from the first job's result and read its materialized values.

  4. Execute the BigQuery select afterward.

Conclusion

We have covered the fundamentals of Apache Beam, including the Pipeline, PCollection, and PTransform components, which are essential for constructing data processing pipelines. Understanding these components is key to effectively leveraging the capabilities of Apache Beam. Moreover, we have discussed Scio, a Scala wrapper for Apache Beam, which offers a more intuitive and idiomatic way to write Beam pipelines in Scala. Scio simplifies the pipeline development process and seamlessly integrates with Apache Beam's powerful features.

Additional Resources

I recommend reading through the Apache Beam documentation before diving into Scio to understand how Apache Beam works.

As always, if you have any questions, please message me.

Thanks for reading!


💡 Want more actionable advice about Software engineering?

I’m Edward. I started writing as a Software Engineer at Disney Streaming Service, trying to document my learnings as I step into a Senior role. I write about functional programming, Scala, distributed systems, and career development.

Subscribe to the FREE newsletter to get actionable advice every week and topics about Scala, Functional Programming, and Distributed Systems: https://pathtosenior.substack.com/
