DEV Community

Dávid Mikuš for FlowUp


Using io.Reader/io.Writer in Go to stream data

Let's jump right into the task:

You have to ZIP files that are stored in the cloud (e.g. an AWS S3 bucket).
The first solution you think of probably downloads each file, processes it (adds it to the ZIP), deletes the local copy, and afterwards uploads the final ZIP to the cloud. Simple, right?

Naive Solution

Here is sample code that implements this solution:

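import (
    "archive/zip"
    "io"
    "os"
    "path"

    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

var (
    downloader *s3manager.Downloader
    uploader *s3manager.Uploader
)

func zipS3Files(in []*s3.GetObjectInput, result *s3manager.UploadInput) error {
    filesToZip := make([]string, 0, len(in))
    // Download each file to local disk
    for _, file := range in {
        pathToFile := os.TempDir() + "/" + path.Base(*file.Key)
        f, err := os.Create(pathToFile)
        if err != nil {
            return err
        }
        _, err = downloader.Download(f, file)
        f.Close()
        if err != nil {
            return err
        }
        filesToZip = append(filesToZip, pathToFile)
    }
    // Create the file for the ZIP
    zipFile := os.TempDir() + "/" + path.Base(*result.Key)
    f, err := os.Create(zipFile)
    if err != nil {
        return err
    }
    defer f.Close()
    zipWriter := zip.NewWriter(f)
    for _, file := range filesToZip {
        // Create a writer for the entry inside the ZIP (entry names must be relative)
        w, err := zipWriter.Create(path.Base(file))
        if err != nil {
            return err
        }
        // Open the file which will be zipped
        inFile, err := os.Open(file)
        if err != nil {
            return err
        }
        // Actually process (zip) the file
        _, err = io.Copy(w, inFile)
        inFile.Close()
        if err != nil {
            return err
        }
    }
    // Close writes the ZIP's central directory, so its error matters
    if err := zipWriter.Close(); err != nil {
        return err
    }
    // Seek back to the beginning so the upload reads the whole ZIP
    if _, err := f.Seek(0, io.SeekStart); err != nil {
        return err
    }
    // Upload the ZIP
    result.Body = f
    _, err = uploader.Upload(result)
    return err
}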

I'm not going to discuss this solution further, as I think it's pretty straightforward. It iterates over the files, downloads each one to disk, creates the ZIP, iterates over the downloaded files again to add them to the ZIP, and finally uploads the ZIP to the bucket.

Great, it works, right? We solved the task. But maybe we can improve it. For example, we could use DownloadWithIterator, but that's out of the scope of this article.

But why do we need to store the files and the ZIP on disk? That wasn't in the requirements. What if we wanted to run this code inside AWS Lambda? At the time of writing, we are limited to 512MB of storage in /tmp. You might think: okay, don't use the disk, use memory. But memory is limited too (at the time of writing, at most 3GB in Lambda), so we would just hit another limit.

Stream solution

What if we could create a pipe that streams data from the bucket to a zip.Writer and then streams the result back to a bucket? No disk involved. How? By using the great and simple interfaces io.Reader and io.Writer together with io.Pipe. Let's rewrite the code.

Zip

First, we create a pipe that will carry the output of the zip.Writer to the uploader.

pr, pw := io.Pipe()

Then we create a zip.Writer from the pipe writer.

zipWriter := zip.NewWriter(pw)

Now anything that zipWriter writes goes through the pipe.

Now we iterate over the files and create a writer for each one in the zip.Writer:
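To see the pipe and the zip.Writer working together without any AWS dependency, here is a minimal sketch using only the standard library. The names `entry`, `streamZip`, `readZipNames` and `demo` are made up for this illustration, and in-memory readers stand in for the S3 downloads.

```go
package main

import (
	"archive/zip"
	"bytes"
	"fmt"
	"io"
	"strings"
)

// entry is a hypothetical pair of ZIP entry name and data source.
type entry struct {
	name string
	src  io.Reader
}

// streamZip writes the entries into a ZIP archive in a goroutine and
// returns the read end of the pipe that the archive is streamed through.
func streamZip(entries []entry) *io.PipeReader {
	pr, pw := io.Pipe()
	go func() {
		zw := zip.NewWriter(pw)
		for _, e := range entries {
			w, err := zw.Create(e.name)
			if err != nil {
				pw.CloseWithError(err)
				return
			}
			if _, err := io.Copy(w, e.src); err != nil {
				pw.CloseWithError(err)
				return
			}
		}
		// zw.Close writes the central directory. A nil error here
		// makes the reader see a normal EOF.
		pw.CloseWithError(zw.Close())
	}()
	return pr
}

// readZipNames consumes the whole stream and lists the entry names.
// It buffers the archive in memory only so the result can be verified.
func readZipNames(r io.Reader) ([]string, error) {
	data, err := io.ReadAll(r)
	if err != nil {
		return nil, err
	}
	zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(zr.File))
	for _, f := range zr.File {
		names = append(names, f.Name)
	}
	return names, nil
}

// demo zips two in-memory "files" through the pipe.
func demo() ([]string, error) {
	pr := streamZip([]entry{
		{"a.txt", strings.NewReader("hello")},
		{"b.txt", strings.NewReader("world")},
	})
	return readZipNames(pr)
}

func main() {
	names, err := demo()
	fmt.Println(names, err) // [a.txt b.txt] <nil>
}
```

The consumer here collects everything in memory just to check the result. In the S3 case the read end would be handed to the uploader instead.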

for _, file := range in {
    w, _ := zipWriter.Create(path.Base(*file.Key))

Let's look at the function signature of Download.

func (d Downloader) Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error)

It requires an io.WriterAt. os.File satisfies this interface, but zipWriter.Create returns an io.Writer. The AWS SDK uses io.WriterAt for concurrent downloads. We can disable this feature by setting

downloader.Concurrency = 1

We create our own struct that provides a WriteAt method, so it satisfies the io.WriterAt interface. It ignores the offset and therefore works just like an io.Writer. The AWS SDK uses io.WriterAt so that concurrent downloads can write at an offset (e.g. in the middle of a file). With concurrent downloads disabled, the data arrives sequentially, so we can safely ignore the offset argument.

type FakeWriterAt struct {
    w io.Writer
}

func (fw FakeWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
    // ignore 'offset' because we forced sequential downloads
    return fw.w.Write(p)
}

Credits to Stack Overflow user CoolAJ86

Now we can download a file by wrapping the writer in our FakeWriterAt.
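Ignoring the offset is only safe while writes really do arrive in order. As a defensive sketch, the wrapper could track the offset it expects and fail loudly otherwise. The type `sequentialWriterAt` below is hypothetical, not part of the AWS SDK, and it assumes the downloader starts each download at offset 0 and advances it by the number of bytes written.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// sequentialWriterAt is a hypothetical, stricter variant of FakeWriterAt:
// it forwards writes to w but rejects any write that is out of order.
type sequentialWriterAt struct {
	w    io.Writer
	next int64 // offset expected by the next WriteAt call
}

func (s *sequentialWriterAt) WriteAt(p []byte, offset int64) (int, error) {
	if offset != s.next {
		return 0, fmt.Errorf("non-sequential write: got offset %d, want %d", offset, s.next)
	}
	n, err := s.w.Write(p)
	s.next += int64(n)
	return n, err
}

// Compile-time check that the type satisfies io.WriterAt.
var _ io.WriterAt = (*sequentialWriterAt)(nil)

// demo performs two in-order writes followed by one out-of-order write.
func demo() (string, error) {
	var buf bytes.Buffer
	w := &sequentialWriterAt{w: &buf}
	w.WriteAt([]byte("hello "), 0)
	w.WriteAt([]byte("world"), 6)
	_, err := w.WriteAt([]byte("!"), 100) // rejected: expected offset 11
	return buf.String(), err
}

func main() {
	out, err := demo()
	fmt.Printf("%q %v\n", out, err)
	// "hello world" non-sequential write: got offset 100, want 11
}
```

Because the expected offset is state, a fresh wrapper would be needed for every file, and it has to be passed as a pointer.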

downloader.Download(FakeWriterAt{w}, file)

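After all files are downloaded, we need to close our writers. zipWriter has to be closed first, so that the end of the ZIP is written to the pipe before the pipe itself is closed: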

zipWriter.Close()
pw.Close()

This way, each file is downloaded into the writer inside zipWriter, where it is processed and then written to the pipe writer.

Now we need to upload the ZIP back into the bucket. We are writing to the pipe, but nothing is reading from it yet. We set the body of UploadInput to the pipe reader.

result.Body = pr
uploader.Upload(result)
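It helps to know that io.Pipe has no internal buffer: a Write blocks until a reader has consumed the data. The following small sketch (the function name `pipeDemo` is made up for this illustration) shows that the writer cannot finish before something reads from the other end.

```go
package main

import (
	"fmt"
	"io"
)

// pipeDemo reports whether the writer was still blocked before anything
// read from the pipe, and what the reader eventually received.
func pipeDemo() (blockedBeforeRead bool, received string) {
	pr, pw := io.Pipe()
	done := make(chan struct{})

	go func() {
		defer close(done)
		// Blocks until the reader has consumed every byte.
		pw.Write([]byte("chunk"))
		pw.Close()
	}()

	// Nothing has read from pr yet, so the Write above cannot have
	// returned and done cannot be closed.
	select {
	case <-done:
		blockedBeforeRead = false
	default:
		blockedBeforeRead = true
	}

	data, _ := io.ReadAll(pr) // reading unblocks the writer
	<-done
	return blockedBeforeRead, string(data)
}

func main() {
	fmt.Println(pipeDemo()) // true chunk
}
```

This is why writing and reading on the same goroutine would deadlock, and why the producer and the consumer have to run concurrently.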

The last step is to run the download and the upload in parallel, so that as soon as a chunk of data is downloaded and processed, it can be uploaded. We run these two steps in parallel using go func() and synchronize them with a sync.WaitGroup.

This is the final code:

func zipS3Files(in []*s3.GetObjectInput, result *s3manager.UploadInput) error {
    // Create the pipe
    pr, pw := io.Pipe()
    // Create a zip.Writer which writes to the pipe
    zipWriter := zip.NewWriter(pw)
    wg := sync.WaitGroup{}
    // Wait for the downloader and the uploader
    wg.Add(2)
    // Run the 'downloader'
    go func() {
        // We need to close our zip.Writer and also the pipe writer,
        // because zip.Writer doesn't close the underlying writer.
        // Deferred calls run in reverse order: zipWriter.Close,
        // then pw.Close, then wg.Done.
        defer wg.Done()
        defer pw.Close()
        defer zipWriter.Close()
        for _, file := range in {
            // Sequentially download each file into a writer from zip.Writer
            w, err := zipWriter.Create(path.Base(*file.Key))
            if err != nil {
                fmt.Println(err)
            }
            // FakeWriterAt requires downloader.Concurrency = 1
            _, err = downloader.Download(FakeWriterAt{w}, file)
            if err != nil {
                fmt.Println(err)
            }
        }
    }()
    // Run the 'uploader'
    go func() {
        defer wg.Done()
        // Upload the file, the body is the `io.Reader` end of the pipe
        result.Body = pr
        _, err := uploader.Upload(result)
        if err != nil {
            fmt.Println(err)
        }
    }()
    wg.Wait()
    return nil
}

As you can see, there is no proper error handling, which is out of scope of this article. But what happens when downloader.Download fails? We want the upload to fail as well. This is a great use case for context.
We can create a context, e.g. with a timeout.

ctx, cancel := context.WithTimeout(context.Background(), time.Minute * 4)

The context can be passed to both the download and the upload (s3manager provides DownloadWithContext and UploadWithContext for this). If the download fails, we can call cancel() and the upload is cancelled.
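As a rough sketch of that idea without the AWS SDK, the example below lets a failing producer cancel the context and also close the pipe with its error, so that a consumer blocked on a read stops immediately. Note that closing the pipe with `CloseWithError` is an addition on top of the context approach described above. The functions `produce`, `consume` and `pipeline` and the error `errDownload` are hypothetical stand-ins for the download and upload steps.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
)

var errDownload = errors.New("download failed")

// produce stands in for the download+zip step. It writes `chunks` chunks
// of 4 bytes and fails when it reaches index failAt (use -1 to never fail).
func produce(ctx context.Context, w io.Writer, chunks, failAt int) error {
	for i := 0; i < chunks; i++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		if i == failAt {
			return errDownload
		}
		if _, err := w.Write([]byte("data")); err != nil {
			return err
		}
	}
	return nil
}

// consume stands in for the upload step; it just counts the bytes.
// A real uploader would also receive ctx.
func consume(r io.Reader) (int64, error) {
	return io.Copy(io.Discard, r)
}

// pipeline connects producer and consumer through a pipe and returns
// the number of bytes consumed and the first error.
func pipeline(chunks, failAt int) (int64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Minute)
	defer cancel()

	pr, pw := io.Pipe()
	go func() {
		err := produce(ctx, pw, chunks, failAt)
		if err != nil {
			cancel() // stop any other context-aware work
		}
		// The reader sees err, or a normal EOF when err is nil.
		pw.CloseWithError(err)
	}()
	return consume(pr)
}

func main() {
	fmt.Println(pipeline(3, -1)) // 12 <nil>
	fmt.Println(pipeline(3, 2))  // 8 download failed
}
```

With this shape the function can return the error to its caller instead of only printing it.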

Conclusion

Regarding time performance, I tried processing a 20MB and a 5MB file. With this approach it took 5s, and with the first, simple solution it took 7s. And since no disk is involved, you can use this in AWS Lambda, although you are still limited by the Lambda runtime limit (5 minutes at the time of writing).

In the end, I just wanted to point out how well the standard library is designed. Stick to simple interfaces.

Top comments (1)

Nguyen Duy Linh

Does this streaming solution download all files into memory and then upload them? I hope it downloads and uploads the file data one by one.