Alvise Susmel

Originally published at poeticoding.com

Elixir Stream to process large HTTP responses on the fly

Are you passionate about Elixir and Phoenix? Subscribe to the Poeticoding newsletter and join happy regular readers and receive new posts by email.

This is the first of a two-part article where we see a handy way of handling async HTTP responses.

  • In this first part we see how to make an Elixir Stream out of a large HTTP response.
  • In the second part we will extend the implementation so we can download a large text file, processing lines on the fly.

First and foremost … WHY?

Why should we handle HTTP responses with Streams, when we can simply use something like HTTPoison.get to receive a response with the data we need?

We already saw how HTTPoison, by default, saves the whole HTTP response into memory. We’ve also seen that this can be avoided using asynchronous requests, but this forces us to handle the low-level HTTPoison async process messages ourselves.

Wouldn’t it be great to be able to write something like this?

HTTPStream.get("https://.../large_file.csv", :line)
|> Stream.filter ...
|> Stream.map ...
|> Enum.take(10)

Just a few lines and a wonderful pipeline syntax to process a large text file on the fly. We could also use streams to download and compress binary files, like this:

HTTPStream.get("https://.../large_image.tiff")
|> StreamGzip.gzip
|> Stream.into(File.stream!("image.tiff.gz"))
|> Stream.run

This approach brings many advantages:

  • We can take advantage of Elixir Streams to easily create beautiful pipelines, getting code clarity and reusability.
  • A big HTTP response is divided into chunks, so we avoid memory issues.
  • We can process a file of any size on the fly. We can even process the first few hundred lines of a large CSV file, without needing to download all of it.

It turns out that, thanks to the functions in Elixir’s Stream module, it’s not that hard to create our HTTP stream.

First example – a large image file

Before we start coding our implementation, we need an easy example to play with. We can start with the example from the HTTPoison Async Request article, where we downloaded a large image. This time, though, the goal is to do it using only Elixir streams.

The Whirlpool Galaxy

The original image is a TIFF file of 204.9 MB, which is enough for a first test of our implementation. The URL we are going to use is: https://www.spacetelescope.org/static/archives/images/original/heic0506a.tif

Implementation

Stream.resource/3

With HTTPoison async requests we have a good starting point: instead of loading the whole response into memory as a single struct, the HTTP response is divided into chunks, which are sent one by one to the process’s mailbox.

Stream.resource/3 is exactly what we need to wrap HTTPoison.

Stream.resource(
  start_fun,
  next_fun,
  end_fun
)
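To make the role of the three functions concrete before wiring in HTTPoison, here is a minimal sketch that uses only the standard library. The counter is a made-up example and is not part of the article's implementation: start_fun returns the initial accumulator, next_fun either emits a list of elements or halts, and end_fun receives the final accumulator.

```elixir
# Hypothetical example: a stream that counts from 1 to 5.
counter =
  Stream.resource(
    # start_fun: returns the initial accumulator
    fn -> 1 end,
    # next_fun: emits a list of elements, or halts the enumeration
    fn
      n when n > 5 -> {:halt, n}
      n -> {[n], n + 1}
    end,
    # end_fun: receives the final accumulator; nothing to clean up here
    fn _n -> :ok end
  )

numbers = Enum.to_list(counter)
IO.inspect(numbers)
# prints [1, 2, 3, 4, 5]
```

The HTTP version follows the same shape, with the async response taking the place of the integer accumulator.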

start_fun

We now define a module called HTTPStream with a function get(url), in which we build and return our stream. The first function passed to Stream.resource is the start_fun, in which we describe how to start the enumeration: in our case, by making an async HTTP request.

defmodule HTTPStream do

  def get(url) do
    Stream.resource(

      #start_fun
      fn -> 
        HTTPoison.get!(
           url,  %{}, 
           [stream_to: self(), async: :once]
        ) 
      end,

     next_fun,
     end_fun
    )
  end
end

When we pass the options [stream_to: self(), async: :once], HTTPoison.get! immediately returns a %HTTPoison.AsyncResponse{id: #Reference<...>} struct, which is then passed to next_fun.

Remember that a stream is lazy: the functions passed to Stream.resource/3 won’t run immediately. In this way we can build a stream pipeline that actually makes the HTTP request only when a function tries to enumerate it.
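The laziness can be checked without any HTTP request. In this sketch, which uses only the standard library, start_fun sends a :started message (a name made up for this example) to the current process, so we can see that it runs only once the stream is enumerated. The check_started helper is hypothetical too.

```elixir
parent = self()

lazy =
  Stream.resource(
    # start_fun: leaves a trace in the mailbox when it actually runs
    fn ->
      send(parent, :started)
      0
    end,
    fn
      3 -> {:halt, 3}
      n -> {[n], n + 1}
    end,
    fn _n -> :ok end
  )

# Hypothetical helper: looks for the :started message without blocking.
check_started = fn ->
  receive do
    :started -> true
  after
    0 -> false
  end
end

before_run = check_started.()
values = Enum.to_list(lazy)
after_run = check_started.()

IO.inspect({before_run, values, after_run})
# prints {false, [0, 1, 2], true}
```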

next_fun – AsyncStatus

In the second function, next_fun, we receive and handle the data coming from the asynchronous HTTP response. We now write just the first part to handle the status code message, %HTTPoison.AsyncStatus{}.

# next_fun
fn %HTTPoison.AsyncResponse{id: id}=resp ->
  receive do
    %HTTPoison.AsyncStatus{id: ^id, code: code}->
      IO.inspect(code, label: "Status code: ")
      {:halt, resp}
  after
    5_000 -> raise "receive timeout"
  end
end

The next_fun we provide expects the async response %HTTPoison.AsyncResponse{} struct returned by start_fun. We use its id to selectively receive only this response’s messages. So far we’ve implemented just the status code part, so we can test it straight away to see if it works.

  • next_fun must return a tuple: either {[...], resp} when we want to pass elements to the pipeline, or {:halt, resp} when we want to stop the enumeration.
  • %HTTPoison.AsyncStatus{} is the first message we receive, so we print the code and return {:halt, resp}, since we just want to test this part and stop the enumeration.
  • The end_fun is called when the stream is halted. In this function we clean up the resources by closing the connection. In our case we stop the asynchronous response by calling :hackney.stop_async(resp.id).

It’s time to quickly see how this first part works in iex:
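The points above can be tried out without a network connection. This sketch simulates the status message with plain tuples tagged with a reference. The tuple shapes and the :seen and :cleaned_up messages are assumptions made for the example; HTTPoison sends structs such as %HTTPoison.AsyncStatus{} instead. It shows the pinned reference filtering out an unrelated message, and end_fun running once next_fun halts.

```elixir
# Hypothetical message shapes (plain tuples); HTTPoison sends structs instead.
ref = make_ref()
other_ref = make_ref()
send(self(), {:status, other_ref, 404})
send(self(), {:status, ref, 200})

stream =
  Stream.resource(
    # start_fun: the reference plays the role of the async response
    fn -> ref end,
    fn ref ->
      receive do
        # ^ref matches only the messages tagged with our reference
        {:status, ^ref, code} ->
          send(self(), {:seen, code})
          {:halt, ref}
      after
        1_000 -> raise "receive timeout"
      end
    end,
    # end_fun: runs once the stream halts
    fn _ref -> send(self(), :cleaned_up) end
  )

:ok = Stream.run(stream)

seen =
  receive do
    {:seen, code} -> code
  after
    0 -> nil
  end

cleaned_up =
  receive do
    :cleaned_up -> true
  after
    0 -> false
  end

IO.inspect({seen, cleaned_up})
# prints {200, true}
```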

$ iex -S mix
Erlang/OTP 21 ...
Interactive Elixir (1.8.0)

iex> image_url = "https://www.spacetelescope.org/static/archives/images/original/heic0506a.tif"

iex> image_url
...> |> HTTPStream.get()
...> |> Stream.run
Status code: : 200
:ok

Great, it works! 🎉 It gets and prints the status code and stops, without downloading the whole file.

AsyncHeaders, AsyncChunk and AsyncEnd

#next_fun
fn %HTTPoison.AsyncResponse{id: id}=resp->
  receive do
    %HTTPoison.AsyncStatus{id: ^id, code: code}->
      IO.inspect(code, label: "STATUS: ")
      HTTPoison.stream_next(resp)
      {[], resp}
    %HTTPoison.AsyncHeaders{id: ^id, headers: headers}->
      IO.inspect(headers, label: "HEADERS: ")
      HTTPoison.stream_next(resp)
      {[], resp}
    %HTTPoison.AsyncChunk{id: ^id, chunk: chunk}->
      HTTPoison.stream_next(resp)
      {[chunk], resp}
    %HTTPoison.AsyncEnd{id: ^id}->
      {:halt, resp}
  end
end

%HTTPoison.AsyncStatus{}
We’ve seen how we handle this message. Instead of halting the stream, we now request the next message using HTTPoison.stream_next(resp) and, since at this stage we don’t have any data to emit, we return a {[], resp} tuple. resp is the accumulator, which is passed to next_fun the next time it is called.

%HTTPoison.AsyncHeaders{}
Similar to what we’ve done for the status code, we print the headers, ask HTTPoison to send the next message to our process mailbox and return a tuple with an empty list, since there is no data we need to emit.

%HTTPoison.AsyncChunk{}
These are the messages containing the actual response body, divided into chunks, with one message for each small chunk. As before, we ask for the next message, but this time we emit the chunk by returning {[chunk], resp}.

%HTTPoison.AsyncEnd{}
We receive this message when we reach the end of the HTTP response. It’s now time to halt the enumeration.
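The flow of the four message types can also be simulated locally. The sketch below is not HTTPoison: FakeAsync is a hypothetical stand-in process that delivers one tuple per :next request, roughly mimicking the async: :once and stream_next behaviour described above. All module, message and tuple names are assumptions made for the example.

```elixir
defmodule FakeAsync do
  # Hypothetical stand-in for an async HTTP client:
  # it sends one message to the owner for every :next request it receives.
  def start(owner, chunks) do
    ref = make_ref()

    messages =
      [{:status, ref, 200}, {:headers, ref, []}] ++
        Enum.map(chunks, &{:chunk, ref, &1}) ++ [{:done, ref}]

    pid = spawn(fn -> loop(owner, messages) end)
    # The first message is delivered without being asked for.
    send(pid, :next)
    {ref, pid}
  end

  defp loop(_owner, []), do: :ok

  defp loop(owner, [message | rest]) do
    receive do
      :next ->
        send(owner, message)
        loop(owner, rest)
    end
  end
end

stream =
  Stream.resource(
    fn -> FakeAsync.start(self(), ["ab", "cd", "ef"]) end,
    fn {ref, pid} = acc ->
      receive do
        {:status, ^ref, _code} ->
          send(pid, :next)
          {[], acc}

        {:headers, ^ref, _headers} ->
          send(pid, :next)
          {[], acc}

        {:chunk, ^ref, chunk} ->
          send(pid, :next)
          {[chunk], acc}

        {:done, ^ref} ->
          {:halt, acc}
      after
        1_000 -> raise "receive timeout"
      end
    end,
    # end_fun: stop the producer, whether we finished or halted early
    fn {_ref, pid} -> Process.exit(pid, :kill) end
  )

chunks = Enum.to_list(stream)
IO.inspect(chunks)
# prints ["ab", "cd", "ef"]
```

Only the chunk messages reach the pipeline. The status and headers messages return an empty list, so they advance the enumeration without emitting anything.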

Time for a first ride 🏎

iex> HTTPStream.get(large_tiff_image_url)
#Function<55.131689479/2 in Stream.resource/3>

First, we see that HTTPStream.get(url) returns a stream and that no request has been made yet.

Let’s also start the Erlang observer to monitor the allocated memory.

iex> :observer.start

iex> large_tiff_image_url
...> |> HTTPStream.get()
...> |> Stream.into(File.stream!("image.tif"))
...> |> Stream.run

STATUS: : 200
HEADERS: : [
  {"Server", "nginx/1.13.7"},
  {"Content-Type", "image/tiff"},
  {"Content-Length", "214777324"},
  ...
]
:ok

This time we are just interested in writing to a file, so we use Stream.run at the end to run the pipeline. All the emitted chunks are collected by Stream.into and written to “image.tif”.

We also see in the observer that the memory allocated by the download stream is minimal.
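The Stream.into and Stream.run part of the pipeline can be tried on its own with an in-memory list of chunks in place of the HTTP stream. The temporary file name below is an assumption made for this sketch.

```elixir
# Hypothetical file name inside the system's temporary directory.
path = Path.join(System.tmp_dir!(), "http_stream_sketch.bin")

# The list stands in for the chunks emitted by the HTTP stream.
["first ", "second ", "third"]
|> Stream.into(File.stream!(path))
|> Stream.run()

contents = File.read!(path)
File.rm!(path)

IO.inspect(contents)
# prints "first second third"
```

Each element is written to the file as soon as it flows through the pipeline, so the whole body never needs to be held in memory.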

Compression? Just a line of code

What if we want to compress the file while downloading it? Do we have to change the whole implementation? No!

large_tiff_image_url
|> HTTPStream.get()
|> StreamGzip.gzip()
|> Stream.into(File.stream!("image.tif.gz"))
|> Stream.run

Thanks to the high composability of streams, we just need to add a new stage to the pipeline. Using the StreamGzip library, we compress the chunks coming from HTTPStream.get and save them into “image.tif.gz”.
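To get a feeling for what such a compression stage does, here is a rough sketch built on Stream.transform/4 and Erlang’s :zlib module. It is not how StreamGzip is implemented. GzipSketch and the :eof marker are names made up for this example, and the marker is only there so the last compressed bytes are flushed when the input ends.

```elixir
defmodule GzipSketch do
  # Hypothetical helper, not the StreamGzip implementation.
  def gzip(enum) do
    enum
    # Append a marker so we know when to flush the compressor.
    |> Stream.concat([:eof])
    |> Stream.transform(
      fn ->
        z = :zlib.open()
        # Window bits 16 + 15 select the gzip container format.
        :ok = :zlib.deflateInit(z, :default, :deflated, 31, 8, :default)
        z
      end,
      fn
        :eof, z -> {[IO.iodata_to_binary(:zlib.deflate(z, "", :finish))], z}
        chunk, z -> {[IO.iodata_to_binary(:zlib.deflate(z, chunk))], z}
      end,
      # Release the zlib stream when the enumeration ends.
      fn z -> :zlib.close(z) end
    )
  end
end

compressed =
  ["hello ", "streaming ", "world"]
  |> GzipSketch.gzip()
  |> Enum.to_list()
  |> IO.iodata_to_binary()

IO.inspect(:zlib.gunzip(compressed))
# prints "hello streaming world"
```

The stage consumes chunks and emits compressed binaries one at a time, which is why it can sit between the download and the file writer without buffering the whole file.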

What’s next? Text processing!

We have seen how the approach of wrapping HTTPoison with an Elixir stream brings many advantages. In this part we’ve just seen how to download a binary file, compress it and save it locally.

In the next part we will see how to refactor our implementation to handle lines of text, instead of just chunks, so we can process huge text files on the fly while keeping memory usage low.