Parth Patil

Data Processing with Elixir (Part 2)

In the last post we saw how to process data in Elixir using the Task.async module. It's a great way to do parallel processing in Elixir. Please read the previous post to see what the dataset schema looks like; it will help in understanding the code below.

In this part we will see how we can use the Flow library to process the GitHub events. Flow is a great library for processing data in Elixir. It allows one to utilise all the cores on the machine, and it's a much nicer way to write data pipelines than Task.async because it lets us use the familiar map and reduce abstractions we are so used to in the Elixir world.

Following is the code that uses the Flow library to process the TSV file chunks in parallel:

# Pre-compile the tab delimiter once so every split reuses the compiled pattern
tab_char = :binary.compile_pattern("\t")

streams =
  for file <- File.ls!("/Users/parth/temp/github_events/clickhouse_tsv") do
    full_path = "/Users/parth/temp/github_events/clickhouse_tsv/#{file}"
    File.stream!(full_path, [{:read_ahead, 100_000}])
  end

streams
|> Flow.from_enumerables()
|> Flow.map(fn line ->
  line |> String.split(tab_char) |> Enum.at(1)
end)
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn event_type, acc ->
  Map.update(acc, event_type, 1, &(&1 + 1))
end)
|> Enum.to_list()

# Result
# [{"CommitCommentEvent", 9954564}, {"CreateEvent", 295163106}]

In the above code I create a list of streams, one for each TSV file chunk. Next I pipe it into Flow.from_enumerables, which creates a flow definition from it. Then Flow.map processes each line in parallel; under the hood Flow will spin up as many mapper stages as there are CPU cores. You can read more about Flow in its excellent documentation.
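
If the defaults don't suit the workload, Flow also lets you set the number of stages and the demand explicitly when creating the flow. A minimal sketch, assuming the same streams list as above (the option values are illustrative, not tuned):

streams
|> Flow.from_enumerables(stages: System.schedulers_online(), max_demand: 500)
# ... rest of the pipeline stays the same
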
The mapper itself is very simple: it splits the line on the tab character and returns the second item in the resulting list.
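
To make that concrete, here is what the mapper does to a single line. The row below is made up for illustration; see Part 1 for the real schema:

line = "1234\tPushEvent\t2015-01-01 00:00:00\n"  # hypothetical row
line |> String.split("\t") |> Enum.at(1)
# => "PushEvent"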

Then Flow.partition applies a hash function to the results of the Flow.map stage and sends each result to one of the reducers. Equal strings hash to the same value, so every occurrence of a given event type goes to the same reducer and the counts stay correct.
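
Flow's default hash is based on :erlang.phash2/2. Here is the routing idea as a simplified sketch, not the library's actual internals:

# With N reducer stages, an event is routed by hashing it modulo N
n_stages = System.schedulers_online()
:erlang.phash2("PushEvent", n_stages)
# => always the same index for the same string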

Finally, the Enum.to_list() at the end is what actually causes the Flow pipeline to start running. Remember, Flow is lazy; it won't do any work until you call Enum.to_list or Flow.run on the pipeline.
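
You can see the laziness by binding the flow to a variable: building the pipeline returns instantly, and nothing touches the files until the terminal call. A sketch, where parse_line/1 is a hypothetical stand-in for the mapper above:

flow =
  streams
  |> Flow.from_enumerables()
  |> Flow.map(&parse_line/1)
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn event_type, acc ->
    Map.update(acc, event_type, 1, &(&1 + 1))
  end)

# Nothing has run yet; `flow` is just a description of the computation.
Enum.to_list(flow)  # this call is what actually starts the stages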

When I ran the above pipeline it took around 630 seconds, and CPU usage held steady at around 4.5 cores.
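
For reference, a simple way to get wall-clock numbers like these is :timer.tc/1 (run_pipeline/0 here is a hypothetical wrapper around the flow above):

{micros, _result} = :timer.tc(fn -> run_pipeline() end)
IO.puts("Took #{div(micros, 1_000_000)} s")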

I wanted to see if using a regex in the Flow.map stage would make things any faster. Below is the updated code with regex parsing in the Flow.map stage:

# Capture the second tab-separated field (the event type)
regex = Regex.compile!("[^\t]+\t([^\t]+)\t.+")

streams =
  for file <- File.ls!("/Users/parth/temp/github_events/clickhouse_tsv") do
    full_path = "/Users/parth/temp/github_events/clickhouse_tsv/#{file}"
    File.stream!(full_path, [{:read_ahead, 100_000}])
  end

streams
|> Flow.from_enumerables()
|> Flow.map(fn line ->
  [[_, event_type]] = Regex.scan(regex, line)
  event_type
end)
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn event_type, acc ->
  Map.update(acc, event_type, 1, &(&1 + 1))
end)
|> Enum.to_list()

The above code ran in 470 seconds, a ~25% improvement over the 630 seconds of the split-based version. I suspect the regex approach creates less garbage than splitting the whole string into a list and then picking the second item, which might explain why it is faster.
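
To see the difference in what each approach hands back per line (again with a made-up row):

# Split materializes one binary per field, including the ones we throw away:
String.split("1234\tPushEvent\t2015-01-01\n", "\t")
# => ["1234", "PushEvent", "2015-01-01\n"]

# The regex returns only the full match plus the single capture we care about:
Regex.scan(~r/[^\t]+\t([^\t]+)\t.+/, "1234\tPushEvent\t2015-01-01\n")
# => [["1234\tPushEvent\t2015-01-01", "PushEvent"]]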

In the next part we will explore whether we can optimise this code further.

Top comments (3)

Nicol Acosta

Amazing!

I'll think twice when I need to split text.

hungle00

Thank you.
I finished reading both parts, and I hope to see the optimised approach in the next part.

Shahryar Tavakkoli

I'm really looking forward to seeing how you're going to make more improvements. I hope you post a new part.
Thanks for the efforts!