- Part 1 – Elixir Stream to process large HTTP responses on the fly
- Part 2 – this article
You can find the code for this and the previous article in the poeticoding/httpstream_articles GitHub repo. The code in that repo is not meant for production use; it's just part of the experiments for these articles.
In the last article we put together the concepts of async HTTP responses and large-file processing with Elixir Streams, so that we could easily download and save a large file using our HTTPStream module.
The stream we built around HTTPoison emits the chunks of an HTTP chunked response. In this way we avoid the huge memory impact we would have if we loaded the whole response into memory.
In the last example of the previous article, each chunk is streamed down the pipeline's functions, saved into our local file and then garbage collected. It was then easy to add compression, simply by including the StreamGzip.gzip
function in the pipeline.
Text response
Previously we were treating the response just as a binary, delegating the compression to the StreamGzip
library and saving the result into a file.
In this part we want to process the lines of a large text file. To make it fun and a bit more realistic, we are going to take some inspiration from the first day of the Advent of Code challenge. There are also some cool videos by José Valim, where he uses Elixir to solve the Advent of Code puzzles.
I've generated and uploaded a 125MB text file at this link
https://poeticoding-data.sfo2.digitaloceanspaces.com/httpstream/numbers.txt
with 30 million lines. Each line contains a random integer between -1000 and 1000
767
138
-701
98
...
We want to:
- 1. Process this text file on the fly, line by line, while downloading it.
- 2. Like we did in the first part, avoid big memory spikes (we cannot load the full response into memory).
- 3. Last but not least, if we want to process just the first 30 lines of text, download only the first few chunks needed.
For quick tests, you can also find a smaller 4.5MB version with 1 million lines
https://poeticoding-data.sfo2.digitaloceanspaces.com/httpstream/numbers_small.txt
which is the same file you find in the poeticoding/httpstream_articles repo.
What we are going to build
To process an HTTP response line by line, we need something that works like
File.stream! "numbers.txt" #, [], :line
which creates a stream that opens a file and, by default, emits each line. We want to do something similar with our HTTP async response.
Instead of changing the current HTTPStream module implementation, we want to take full advantage of Elixir Streams' composability, writing a function that converts chunks to lines and that we can simply add to the pipeline.
Chunks to Lines
Ok, but why can't we just reuse the implementation we already wrote for the binary file?
The problem with the previous implementation is that the stream we've built emits chunks without distinguishing the lines. We need an Elixir Stream that emits lines instead of chunks.
Unfortunately this is not as easy as it seems: splitting chunks into lines is not enough. That would only work in a simple, specific case
["767\n138\n","-701\n98\n","-504\n22"]
where each element of the list is a chunk and each chunk ends with a newline. In this simple case we just need to split each chunk, filter out the empty strings and emit the result
iex> String.split("767\n138\n")
["767","138",""]
The problem, though, is that we have no guarantee that all the chunks will be in this form. It's actually more than likely that parts of the same line will be spread across different chunks
["767\n13","8\n-701\n98\n-","504\n22"]
You can see how in this case the first two chunks don't end with a newline character \n,
and the second chunk starts with the final part of the line begun in the previous chunk.
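Just to make the problem concrete, here's a quick check in iex (not part of the final code): naively splitting each chunk on its own breaks the line 138 into the two bogus lines "13" and "8".
iex> ["767\n13","8\n-701\n98\n-","504\n22"] \
...> |> Enum.flat_map(&String.split(&1, "\n"))
["767", "13", "8", "-701", "98", "-", "504", "22"]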
Stream.transform
We are now going to implement a function HTTPStream.lines(chunks_enum)
which takes a stream of chunks as input and returns a stream of lines.
We'll then use this function as part of our pipeline in this way:
HTTPStream.get("https://.../numbers.txt")
|> HTTPStream.lines()
|> Stream.map(fn line -> ... end)
...
We don't want to use Enum.reduce
since this function is greedy and it would hold all the lines in memory. Instead, we can use Stream.transform
which is lazy
Stream.transform(enum, initial_accumulator, fn element, acc ->
...
{list_of_elements_to_emit, new_accumulator}
end)
The first element of the returned tuple is a list of elements we are going to emit, in our case lines.
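To get a feel for how Stream.transform/3 behaves, here's a tiny, unrelated example (a lazy running sum): each call emits one element and carries the sum forward as the accumulator.
iex> [1, 2, 3] \
...> |> Stream.transform(0, fn x, acc -> {[acc + x], acc + x} end) \
...> |> Enum.to_list()
[1, 3, 6]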
In our case the accumulator acc
holds the last, incomplete part of a chunk, which is then prepended to the next chunk.
We need a function, we've called next_lines
, that splits a chunk into separate lines and returns a tuple with two elements: the lines we want to emit and the last part of the chunk.
Stream.transform(chunks_enum, "", fn chunk, prev ->
{lines, last_part} = next_lines(chunk, prev)
{lines, last_part}
end)
Our initial accumulator is an empty string. This empty string will be passed as prev
while processing the first chunk.
Recursive implementation
We now need to write the next_lines(chunk,prev)
function. We can implement it using recursion, going through each single UTF-8 character looking for newlines. Remember that to behave like File.stream!
we need to preserve the newlines \n.
# https://github.com/poeticoding/httpstream_articles/blob/36bc2167b7024a990b04b28f9447fb9bc0e0310e/lib/http_stream.ex#L78
def next_lines(chunk, prev \\ ""), do: next_lines(chunk, prev, [])
def next_lines(<<"\n"::utf8, rest::binary>>, prev, lines) do
next_lines(rest,"",[prev <> "\n" | lines])
end
def next_lines(<<c::utf8, rest::binary>>, prev, lines) do
next_lines(rest,<<prev::binary, c::utf8>>,lines)
end
def next_lines(<<>>, prev, lines), do: {Enum.reverse(lines), prev}
Ok, there is a lot happening here. Let's go through the clauses one by one.
- next_lines(chunk, prev \\ ""): the first clause is just a helper. We pass a chunk and the accumulator prev, and it calls next_lines/3, passing an empty list of lines as the third argument.
- next_lines(<<"\n"::utf8, rest::binary>>, prev, lines): we pattern match on the binary, and this clause matches only when the next character is a newline. We then call next_lines recursively, passing the rest of the chunk we still have to process, resetting the accumulator to an empty string "" and passing the list of lines with the accumulated line, prev, prepended to it.
- next_lines(<<c::utf8, rest::binary>>, prev, lines): since the clause above matches whenever c is a newline, in this clause c != "\n", so we just append the character to prev and recursively call next_lines to go through the rest of the chunk.
- next_lines(<<>>, prev, lines): <<>> is an empty binary, which means we've reached the end of the chunk. For performance reasons we've prepended the lines: [prev <> "\n" | lines] is faster than lines ++ [prev <> "\n"], especially when the lines list is big. That's why, when we reach the end of the recursion, we need to reverse the list of lines.
Let's try this function on iex
# ["767\n13","8\n-701\n98\n-","504\n22"]
iex> {lines, prev} = HTTPStream.next_lines "767\n13"
{["767\n"], "13"}
iex> {lines, prev} = HTTPStream.next_lines "8\n-701\n98\n-", prev
{["138\n", "-701\n", "98\n"], "-"}
iex> {lines, prev} = HTTPStream.next_lines "504\n22", prev
{["-504\n"], "22"}
iex> prev
"22"
Perfect, exactly what we need 👍. We go through the chunks' list passing the obtained prev
to the next call.
HTTPStream.lines
next_lines
returns the same tuple we need to return in the reducer function passed to Stream.transform/3
. We can then write HTTPStream.lines/1
in a nice and compact way
def lines(chunks_enum) do
chunks_enum
|> Stream.transform("",&next_lines/2)
end
Let's try it on iex
iex> ["767\n13","8\n-701\n98\n-","504\n22"] \
...> |> HTTPStream.lines() \
...> |> Enum.each(&IO.inspect/1)
"767\n"
"138\n"
"-701\n"
"98\n"
"-504\n"
:ok
Mmm 🤔 ... there is something wrong here. The last line "22"
is missing.
Emitting the last line
The reason it's not emitted is that it doesn't end with a newline, so it stays stuck in the accumulator (prev
). We have to emit it when the stream ends, but with Stream.transform/3
the reducer function doesn't know when the stream is going to end! (Please let me know in the comments if you know a way to catch the end of a stream.)
A workaround we can use, to let next_lines/2
know when the stream has reached the end, is to add an :end
atom at the end of our chunks' stream. next_lines/2
then has to handle this case with a specific clause
def next_lines(:end,prev), do: { [prev], ""}
which emits the final line. The accumulator is set to an empty string but it could be anything at this point.
Let's try it again on iex
iex> ["767\n13","8\n-701\n98\n-","504\n22", :end] \
...> |> HTTPStream.lines() \
...> |> Enum.each(&IO.inspect/1)
"767\n"
"138\n"
"-701\n"
"98\n"
"-504\n"
"22"
:ok
Great, it works! 🎉
But now, how can we easily add an :end
atom at the end of our HTTP chunked response stream?
Emitting :end at the end of the streamed HTTP response
If you have an alternative way of doing this, please share it in the comments section below! 👩💻👨💻
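One simple option, for example, is to leave HTTPStream.get/1 as it is and lazily append the atom with Stream.concat/2; a minimal sketch (with the URL elided, as elsewhere in this article):
"https://.../numbers.txt"
|> HTTPStream.get()
|> Stream.concat([:end])
|> HTTPStream.lines()
|> Enum.each(&IO.inspect/1)
In this article, though, we modify HTTPStream.get itself.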
We need to make a small but significant change to our HTTPStream.get(url)
function.
# HTTPStream.get/1
def get(url) do
Stream.resource(
start_fun,
# next_fun
fn
#first clause
%HTTPoison.AsyncResponse{id: id}=resp ->
receive do
...
%HTTPoison.AsyncEnd{id: ^id}->
# emitting :end
{[:end], {:end, resp}}
end
#second clause
{:end, resp} ->
{:halt, resp}
end,
after_fun
)
end
- 1. When we receive the %HTTPoison.AsyncEnd{} message, we know we've reached the end of the HTTP response. Instead of just halting the stream, we emit :end and set a new accumulator {:end, resp}, where resp is the %HTTPoison.AsyncResponse{} struct.
- 2. After emitting :end, next_fun is called again. This time the accumulator is the one we've just set, {:end, resp}, which pattern matches the second clause of our next_fun.
Something I don't like about this change is that now we always have to handle the final :end,
especially when saving the stream into a file.
HTTPStream.get("https://.../large_image.tiff")
|> Stream.reject(&match?(:end,&1))
|> Stream.into(File.stream!("image.tiff.gz"))
|> Stream.run()
The function on the second line pattern matches each chunk and filters out the :end
atom.
It's probably better to enable and disable the final :end
via an option passed to HTTPStream.get(url, emit_end)
, like in the version you see on GitHub.
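With that option, usage could look like this (just a sketch, assuming emit_end defaults to false; check the repo for the exact signature):
# when we need lines, ask get/2 to emit the final :end
"https://.../numbers.txt"
|> HTTPStream.get(true)
|> HTTPStream.lines()
|> Enum.each(&IO.inspect/1)

# when we just save the raw body, there is no :end to reject anymore
"https://.../large_image.tiff"
|> HTTPStream.get()
|> Stream.into(File.stream!("image.tiff.gz"))
|> Stream.run()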
Sum the numbers in a 30M-line remote file
Let's use what we've implemented to process a 125MB remote text file with 30M numbers, each one separated by a newline \n
. While processing the lines on the fly, we sum the numbers to get the total.
"https://poeticoding-data.sfo2.digitaloceanspaces.com/httpstream/numbers.txt"
|> HTTPStream.get()
|> HTTPStream.lines()
|> Stream.map(fn line->
case Integer.parse(line) do
{num, _} -> num
:error -> 0
end
end)
|> Enum.sum()
|> IO.puts()
## OUTPUT
STATUS: : 200
HEADERS: : [
{"Content-Length", "131732144"},
{"Accept-Ranges", "bytes"},
{"Content-Type", "text/plain"},
...
]
12468816
Fantastic, we got the result: 12468816! 🎉
Watching the Erlang Observer, sometimes I've seen memory spikes (still below 100MB) and sometimes the line is almost flat. I think this could be related to how big the chunks are.
In the GitHub repo you find a memory_test.exs
script you can play with, to see the HTTPStream.lines
memory allocation with different chunk sizes. Even with a 4.5MB file, if we exaggerate with the chunk size (like 2_000_000
) we get a huge memory spike. With 2_000
the line is almost flat.
It would be great to be able to set a maximum chunk size in the HTTPoison options; unfortunately, I didn't find any option to do that.
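If you don't want to pull the repo script, a minimal sketch along the same lines (not the actual memory_test.exs) streams the local file in chunks of a configurable size and prints the total BEAM memory every 100k lines:
chunk_size = 2_000   # try 2_000_000 to see the spike

"numbers_small.txt"
|> File.stream!([], chunk_size)
|> HTTPStream.lines()
|> Stream.with_index()
|> Stream.each(fn {_line, i} ->
  if rem(i, 100_000) == 0 do
    IO.puts("#{i} lines, memory: #{div(:erlang.memory(:total), 1_000_000)} MB")
  end
end)
|> Stream.run()
# note: without a trailing :end the last (partial) line is not emitted,
# which doesn't matter for this memory observation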
String.split
Let's see another way of writing the HTTPStream.lines(chunks)
function. In the previous implementation we used recursion to go through each single character of the chunk, looking for newlines.
If we don't need to preserve newlines, we can use String.split/2
along with Stream.transform/3
.
def lines(enum) do
enum
|> Stream.transform("",fn
:end, prev ->
{[prev],""}
chunk, prev ->
[last_line | lines] =
String.split(prev <> chunk,"\n")
|> Enum.reverse()
{Enum.reverse(lines),last_line}
end)
end
The idea is similar to what we did before. We split the chunk into lines and the last element of the list becomes the accumulator, which is then prepended to the next chunk.
See how we extract the last item of the list of lines.
lines = String.split(prev <> chunk, "\n")
[last_line | rev_lines] = Enum.reverse(lines)
{ Enum.reverse(rev_lines), last_line }
- We split the concatenated string prev <> chunk, obtaining a list of lines. We now need to get the last element of this list.
- We reverse the list, creating a new list. Now the head of Enum.reverse(lines) is the last element of lines, which we bind to last_line.
- rev_lines is the list of lines we want to emit, but in the wrong order, so we emit Enum.reverse(rev_lines) and set last_line as the next accumulator.
Let's see an example
iex> chunks = ["767\n138\n-701\n98\n-5", "04\n22\n375"]
iex> [chunk | remaining_chunks] = chunks
iex> chunk
"767\n138\n-701\n98\n-5"
iex> lines = String.split(chunk,"\n")
["767", "138", "-701", "98", "-5"]
iex> [last_line | rev_lines] = Enum.reverse(lines)
["-5", "98", "-701", "138", "767"]
iex> last_line
"-5"
iex> lines_to_emit = Enum.reverse(rev_lines)
["767", "138", "-701", "98"]
#let's process another chunk
iex> [next_chunk | remaining_chunks] = remaining_chunks
iex> next_chunk
"04\n22\n375"
# we need to prepend last_line
iex> chunk = last_line <> next_chunk
"-504\n22\n375"
iex> lines = String.split(chunk,"\n")
["-504", "22", "375"]
It turns out that this implementation is much faster than the previous one. Let's see some benchmarks.
Benchmark HTTPStream.lines
If you want to run this benchmark on your computer, you find everything on poeticoding/httpstream_articles.
Let's consider the stream created by
File.stream!("numbers_small.txt")
which by default emits the lines of a file. We want to compare the speed of this function with HTTPStream.lines.
Instead of using a remote file, we are going to use the smaller ~4.5MB version, numbers_small.txt,
which you can find in the GitHub repo.
We now need to find a way to simulate the stream of chunks produced by HTTPStream.get
.
chunk_stream = File.stream!("numbers_small.txt",[],16_000)
Passing a chunk size as the third argument of File.stream!/3,
the stream will emit chunks (in this case of 16kB) instead of lines.
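A quick sanity check in iex (assuming numbers_small.txt is at least 16kB, every chunk except the last one has exactly that size):
iex> File.stream!("numbers_small.txt", [], 16_000) |> Enum.take(1) |> hd() |> byte_size()
16000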
In the script bench_lines.exs we use Benchee. Note that the version of HTTPStream.lines/2 in the repo takes a second argument selecting which implementation to use: :next_lines or :string_split.
# bench_lines.exs
chunk_stream = File.stream!("numbers_small.txt",[],16_000)
lines_stream = File.stream!("numbers_small.txt", [], :line)
stream_sum = fn enum ->
enum
|> Stream.map(fn line->
case Integer.parse(line) do
{num, _} -> num
:error -> 0
end
end)
|> Enum.sum()
end
Benchee.run(%{
"File.stream! :line" => fn ->
lines_stream
|> stream_sum.()
end,
"with next_lines" => fn ->
chunk_stream
|> HTTPStream.lines(:next_lines)
|> stream_sum.()
end,
"with String.split" => fn ->
chunk_stream
|> HTTPStream.lines(:string_split)
|> stream_sum.()
end
},
time: 10
)
$ mix run bench_lines.exs
Name ips average
with String.split 3.35 298.30 ms
File.stream! :line 2.08 481.22 ms
with next_lines 1.14 875.01 ms
Comparison:
with String.split 3.35
File.stream! :line 2.08 - 1.61x slower +182.93 ms
with next_lines 1.14 - 2.93x slower +576.71 ms
The interesting thing is that the version "with String.split"
is even faster than "File.stream! :line"
, while the first implementation we did is the slowest.
Honestly, I don't know why the "with String.split"
version is the fastest one. Maybe some optimisation in the String.split/2
function? If you are interested in these details, I've opened a topic about this on the Elixir forum: Streaming lines from an enum of chunks.
Reducing the chunk size from 16_000
to 2_000
we see how both "with String.split"
and "with next_lines"
are a bit faster
chunk_stream = File.stream!("numbers_small.txt",[],2000)
Name ips average
with String.split 3.79 263.67 ms
File.stream! :line 2.06 484.98 ms
with next_lines 1.42 706.48 ms
Comparison:
with String.split 3.79
File.stream! :line 2.06 - 1.84x slower +221.31 ms
with next_lines 1.42 - 2.68x slower +442.81 ms
With a smaller chunk all the split, reverse and concatenation operations are faster.
Sum the first 30 numbers
The stream of lines we've built is lazy, which means we can take just the first 30 lines and halt the stream, without downloading the whole HTTP response.
To take the first 30 lines we use Enum.take/2
.
"https://poeticoding-data.sfo2.digitaloceanspaces.com/httpstream/numbers.txt"
|> HTTPStream.get()
|> HTTPStream.lines()
|> Stream.map(fn line->
case Integer.parse(line) do
{num, _} -> num
:error -> 0
end
end)
|> Enum.take(30)
|> Enum.sum()
|> IO.puts()
You find this code in sum_30_lines.exs
$ mix run sum_30_lines.exs
STATUS: : 200
HEADERS: : [
{"Content-Length", "131732144"},
{"Accept-Ranges", "bytes"},
{"Content-Type", "text/plain"},
...
]
END_FUN
1393
It should be really quick. Once the first 30 lines have been taken, the stream is halted and the HTTP connection is closed.