DEV Community

NDREAN
NDREAN

Posted on • Updated on

Notes on streaming downloads with progress in Elixir

Context

We want the server to download data, ie trigger an HTTP get request to some endpoint. The response should be a stream, and we want to append the chunks into a file. We also want to display the download progress.

We will use the Req library, a superset of Finch, which is itself a superset of Mint.

A few reasons for Req:

  • you can stream the body response,
  • it takes care of reconnection when the connection fails or timeouts (not tested here).
  • it takes care of redirections.

[Front-end note]: an HTTP request made from the browser uses the fetch API which can consume a fetch as a stream. To get a download progress, you can use fetch and set a ReadableStream on the response body (res.body.getReader()). Check this or that. For an upload progress, you need to use XMLHttpRequest. This is the reason why Liveview used it in the uploader snippet.

Code

The code below is "livebookable" 😁 or can be run in an iex session.

========================================================

HTTP streaming with progress using Finch and Req

Mix.install([
  {:finch, "~> 0.18.0"},
  {:req, "~> 0.4.14"}
])

Finch.start_link(name: ExStream.Finch)
Enter fullscreen mode Exit fullscreen mode

Test endpoints

We use two endpoints for testing. The last one includes a redirection.

vid = "https://sample-videos.com/video321/mp4/720/big_buck_bunny_720p_1mb.mp4"
img = "https://source.unsplash.com/QT-l619id6w"
Enter fullscreen mode Exit fullscreen mode

Stream & progress with Req

We start with the Req library.

We start with a download with streams into a file. The code is pretty compact: we pass a callback in the :into option. This callback returns a File.stream to collect the chunks.

defmodule ReqWriteStream do
  def download(url, file_path) do
    Req.get!(url, raw: true, into: File.stream!(file_path, [:write]))
  end
end
Enter fullscreen mode Exit fullscreen mode

We test it:

ReqWriteStream.download(img, "image2.jpg")
Enter fullscreen mode Exit fullscreen mode

We now write a module that displays the progress. We grab the "content-length" with a HEAD request. The body is again streamed via a callback declared with the :into option. This callback writes the chunks into a file, and stores the progress state into the :private key of the struct %Req.Response{}.

defmodule ReqProgressStream do
  def download(url, file_path) do
    [size]  = Map.get(Req.head!(url: url).headers, "content-length", ["0"])
    size = String.to_integer(size)
    file_pid = File.open!(file_path, [:write, :binary])

    func = fn {:data, data}, {req, res} ->
      IO.binwrite(file_pid, data)
      chunk_size = byte_size(data)
      res = Req.Response.update_private(res, :progress, chunk_size, &(&1 + chunk_size))

      if size>0, do:
        {Req.Response.get_private(res, :progress) * 100 / size, chunk_size, size} |> dbg()

      {:cont, {req, res}}
    end

    Req.get!(url: url, raw: true, into: func)

    File.close(file_pid)
  end
end
Enter fullscreen mode Exit fullscreen mode

We test concurrent HTTP calls with Task.async_stream because we use the same function with different arguments.

Task.async_stream(
  [[vid, "video.mp4"], [img, "image.jpg"]],
  &apply(ReqProgressStream, :download, &1),
  timeout: :infinity
)
|> Stream.run()
Enter fullscreen mode Exit fullscreen mode

Stream & progress with Finch

We continue with the Finch library. The code below is adapted from the example coming with stream_while.

We expose a function that takeq an URL and a path. As such, it may be fragile with regards to connection errors.

defmodule FinchStream do
  def download(url, file_path) do
    IO.puts("Starting to process #{inspect(file_path)}...........")

    # Open a file to which binary chunks will be appended to.
    # this process is reset in case of redirection
    file_pid = File.open!(file_path, [:write, :binary])

    unless is_pid(file_pid), do: raise("File creation problem on disk")

    # the HTTP stream request
    Finch.build(:get, url)
    |> Finch.stream_while(ExStream.Finch, nil, fn
      # we put the status in the "acc" to handle redirections
      {:status, status}, _acc ->
        {:cont, status}

      # - when we receive 302, we put the "location" header in the "acc"
      # - when we receive a 200, we put the "content-length" and the file name in the "acc",
      {:headers, headers}, acc ->
        handle_headers(headers, acc)

      # when we receive the "location" tuple, we recurse
      # otherwise, we write the chunk into the file and print out the current progress.
      {:data, data}, acc ->
        handle_data(data, acc, file_path, file_pid)
    end)

    case File.close(file_pid) do
      :ok ->
        {:halt, {file_path, :done}}

      {:error, _reason} ->
        {:halt, :error}
    end
  end

  def handle_headers(headers, status) when status in [301, 302, 303, 307, 308] do
    IO.puts("REDIR: #{status}")

    {:cont, Enum.find(headers, &(elem(&1, 0) == "location"))}
  end

  def handle_headers(headers, 200) do
    {"content-length", size} =
      Enum.find(headers, &(elem(&1, 0) == "content-length"))

    case size do
      nil ->
        {:cont, {0, 0}}

      size  ->
        {:cont, {0, String.to_integer(size)}}
    end
  end

  def handle_headers(_, status) do  
    dbg(status)
    {:halt, :bad_status}
  end

  def handle_data(_data, {"location", location}, file_path, file_pid) do
    if Process.alive?(file_pid), do:
       :ok = File.close(file_pid)

    # recursion
    download(location, file_path)
  end

  def handle_data(data, {processed, size}, file_path, file_pid) do
    case IO.binwrite(file_pid, data) do
      :ok ->
        processed =
          if is_integer(size) and size > 0 do
            (processed + byte_size(data))
            |> tap(fn processed ->
              IO.inspect(Float.round(processed * 100 / size, 1),
                label: "Processed #{inspect(file_path)} %: "
              )
            end)
          else
            processed + byte_size(data)
          end

        {:cont, {processed, size}}

      {:error, reason} ->
        {:error, reason}
    end
  end
end
Enter fullscreen mode Exit fullscreen mode

We test this:

Task.async_stream(
  [[vid, "video.mp4"], [img, "image.jpg"]],
  &apply(FinchStream, :download, &1),
  timeout: :infinity
)
|> Stream.run()
Enter fullscreen mode Exit fullscreen mode

Top comments (0)