In my project, I have a fairly large CSV file for seeding my database. Putting it in the project source code increases the size of the Docker image, even though the file is only used once. So I decided to upload it to S3, stream or download it to the server, and then run the seeding code. In this post I will show you how I do this.
I found some posts that stream files using the HTTPoison library, but in some cases I don't want to add more dependencies, so I wrote my own module. I also think it's a good way to learn new things.
What you will learn
You will learn 2 cool functions:
- Stream.resource to build a new stream
- :hackney.stream_body to read chunks of data from a remote file
1. Build the stream
For Stream.resource, you can read the full documentation on hexdocs.pm.
Basically, this function receives 3 functions as arguments: one to set up the stream, one to build the data stream, and one to handle stream completion. This example is from hexdocs.pm:
Stream.resource(
  fn -> File.open!("sample") end,
  fn file ->
    case IO.read(file, :line) do
      data when is_binary(data) -> {[data], file}
      _ -> {:halt, file}
    end
  end,
  fn file -> File.close(file) end
)
- The first function opens the file, and its result is passed as the argument to the second function
- The second function reads data line by line until the end of the file
- The third function closes the file handle
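To see the three functions in isolation, here is a minimal sketch that does not touch any file. The countdown stream is a made-up example (not from the docs); it only illustrates how the state moves from the setup function through the "next" function until `:halt`.

```elixir
# A countdown stream built with Stream.resource/3 (illustrative example only).
countdown =
  Stream.resource(
    # 1. Setup: the initial state is the starting number
    fn -> 3 end,
    # 2. Next: emit the current number and pass on the new state, or halt at zero
    fn
      0 -> {:halt, 0}
      n -> {[n], n - 1}
    end,
    # 3. Cleanup: called once when the stream finishes
    fn _n -> :ok end
  )

result = Enum.to_list(countdown)
IO.inspect(result)
# prints [3, 2, 1]
```

The second function always returns either `{list_of_items, new_state}` or `{:halt, state}`, which is the same contract the download functions below follow.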
To download a file, we do something similar:
- Open the connection
- Stream parts of file
- Close connection
def stream_url(url) do
  Stream.resource(
    fn -> begin_download(url) end,
    &continue_download/1,
    &finish_download/1
  )
end
2. Open connection
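defp begin_download(url) do
  # open the connection; the body is not read yet
  {:ok, _status, headers, client} = :hackney.get(url)
  headers = Enum.into(headers, %{})
  # assumes the server sends a "Content-Length" header with this exact casing
  total_size = headers["Content-Length"] |> String.to_integer()
  {client, total_size, 0} # 0 is the current downloaded size
end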
Here we:
- Use :hackney.get to open a connection to the server
- Extract the content length from the headers, which is useful for verifying the length later
- Return a tuple of {client, total_size, current_download_size}; this data is used to stream the content in the next function
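HTTP header names are case-insensitive, and some responses (for example chunked ones) carry no content length at all, so the lookup above can fail on servers other than the one I tested. Below is a minimal sketch of a more tolerant lookup. `HeaderSketch` and `content_length/1` are hypothetical names for illustration, not part of hackney; the only assumption about hackney is that headers arrive as a list of `{name, value}` binaries.

```elixir
defmodule HeaderSketch do
  # Hypothetical helper: finds Content-Length regardless of letter case.
  # Returns the size as an integer, or nil when the header is missing
  # or its value is not a plain integer.
  def content_length(headers) do
    Enum.find_value(headers, fn {name, value} ->
      if String.downcase(name) == "content-length" do
        case Integer.parse(value) do
          {size, ""} -> size
          _ -> nil
        end
      end
    end)
  end
end

IO.inspect(HeaderSketch.content_length([{"content-length", "1024"}]))
IO.inspect(HeaderSketch.content_length([{"Content-Type", "text/csv"}]))
```

With a helper like this, begin_download could keep total_size as nil when the size is unknown instead of crashing on String.to_integer.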
3. Stream chunks
defp continue_download({client, total_size, size}) do
  case :hackney.stream_body(client) do
    {:ok, data} ->
      # update downloaded size
      new_size = size + byte_size(data)
      {[data], {client, total_size, new_size}}
    :done ->
      # no more data, tell the stream to close
      # and move on to the third function
      {:halt, {client, total_size, size}}
    {:error, reason} ->
      # reason is not an exception, so wrap it in a message
      raise "download failed: #{inspect(reason)}"
  end
end
Here we use :hackney.stream_body to read data from the connection chunk by chunk.
4. Close connection
defp finish_download({client, total_size, size}) do
  :hackney.close(client)
  # needs `require Logger` at the top of the module
  Logger.debug("Complete download #{size} / #{total_size} bytes")
end
Here we simply close the connection
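One nice property of Stream.resource is that the third function also runs when the consumer stops early, for example with Enum.take. The sketch below shows this with a made-up infinite counter instead of a real connection; the `:closed` message is just a stand-in for closing the client.

```elixir
parent = self()

# An endless counter whose cleanup function reports back to the caller.
stream =
  Stream.resource(
    fn -> 1 end,
    fn n -> {[n], n + 1} end,
    # stand-in for :hackney.close(client)
    fn _n -> send(parent, :closed) end
  )

# Only two items are consumed, then the stream is halted.
taken = Enum.take(stream, 2)

# Check whether the cleanup function ran.
closed? =
  receive do
    :closed -> true
  after
    0 -> false
  end

IO.inspect({taken, closed?})
# prints {[1, 2], true}
```

For the downloader, this means the connection should be closed even if you only read the first few chunks of the file.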
5. Save to the file
In the above steps, we built a stream of data. Now we save it to a file:
def download(url, save_path) do
  stream_url(url)
  |> Stream.into(File.stream!(save_path))
  |> Stream.run()
end
Remember to invoke Stream.run to actually run the stream.
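Streams are lazy, so building the pipeline does no work until something runs it. Here is a small sketch of the same save step using an in-memory list of chunks as a stand-in for stream_url/1; the chunk contents and the temp file name are made up for the demo.

```elixir
# Stand-in for stream_url(url): any enumerable of binaries works here.
chunks = ["id,name\n", "1,foo\n", "2,bar\n"]
path = Path.join(System.tmp_dir!(), "stream_run_demo.csv")

# This only describes the work; nothing is consumed at this point.
pipeline = Stream.into(chunks, File.stream!(path))

# Stream.run/1 consumes the stream and writes every chunk to the file.
Stream.run(pipeline)

content = File.read!(path)
File.rm!(path)
IO.inspect(content)
```

If you forget Stream.run, the function returns a stream struct and the file is never filled.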
6. Stream by line
In our case, we don't want to store the file on our server because we only use it once, so we stream and process the file content on the fly. We use the csv library to decode the CSV content because it supports streams, but it only accepts a stream of lines.
So here we transform the stream of chunks into a stream of lines:
def stream_url(url, :line) do
  stream_url(url)
  |> Stream.concat([:end]) # marker so we know when the stream ends
  |> Stream.transform("", fn
    :end, prev ->
      # flush whatever is left as the final line
      {[prev], ""}
    chunk, prev ->
      # the last piece may be an incomplete line, so carry it over
      [last_line | lines] =
        String.split(prev <> chunk, "\n")
        |> Enum.reverse()
      {Enum.reverse(lines), last_line}
  end)
end
For details on why it is split like this, read the post from poeticoding.com.
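To make the splitting easier to follow, here is a sketch of the same transform applied to a hand-written list of chunks instead of a real download. `LineSketch` is a hypothetical module name, and the chunks are cut in the middle of lines on purpose.

```elixir
defmodule LineSketch do
  # Same idea as stream_url(url, :line), but for any enumerable of chunks.
  def lines(chunks) do
    chunks
    |> Stream.concat([:end])
    |> Stream.transform("", fn
      :end, prev ->
        # end of input: emit the carried-over remainder as the last line
        {[prev], ""}

      chunk, prev ->
        # everything before the last "\n" is complete; the rest is carried over
        [last_line | lines] = (prev <> chunk) |> String.split("\n") |> Enum.reverse()
        {Enum.reverse(lines), last_line}
    end)
  end
end

result = LineSketch.lines(["id,na", "me\n1,fo", "o\n2,bar"]) |> Enum.to_list()
IO.inspect(result)
# prints ["id,name", "1,foo", "2,bar"]
```

Note that the emitted lines no longer contain the trailing "\n", and if the input ends with a newline the final element is an empty string, so you may want to filter it out before decoding.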
References
Thanks to https://gist.github.com/avdi/7990684 and poeticoding.com for the ideas that helped me solve my problem.
Thanks for reading. Your feedback is warmly welcome.