Let's jump right into the task:
You have to ZIP files that are stored in the cloud (e.g., an AWS S3 bucket).
The first solution you might think of: download each file, process it (add it to the ZIP), delete the local copy, and finally upload the resulting ZIP back to the cloud. Simple, right?
Naive Solution
Here is sample code that implements this approach:
var (
	downloader *s3manager.Downloader
	uploader   *s3manager.Uploader
)

func zipS3Files(in []*s3.GetObjectInput, result *s3manager.UploadInput) error {
	filesToZip := make([]string, 0, len(in))
	// Download each file to local disk
	for _, file := range in {
		pathToFile := os.TempDir() + "/" + path.Base(*file.Key)
		f, _ := os.Create(pathToFile)
		downloader.Download(f, file)
		f.Close()
		filesToZip = append(filesToZip, pathToFile)
	}
	// Create a file for the ZIP
	zipFile := os.TempDir() + "/" + path.Base(*result.Key)
	f, _ := os.Create(zipFile)
	defer f.Close()
	zipWriter := zip.NewWriter(f)
	for _, file := range filesToZip {
		// Create a writer for the file inside the ZIP
		w, _ := zipWriter.Create(path.Base(file))
		// Open the file which will be zipped
		inFile, _ := os.Open(file)
		// Actually process (zip) the file
		io.Copy(w, inFile)
		inFile.Close()
	}
	zipWriter.Close()
	// Seek back to the beginning of the ZIP file
	f.Seek(0, 0)
	// Upload the ZIP
	result.Body = f
	_, err := uploader.Upload(result)
	return err
}
I'm not going to discuss this solution further, as I think it's pretty straightforward: iterate over the files, download each one to disk, create the ZIP, iterate over the downloaded files again adding each to the ZIP, and finally upload the result to the bucket.
Great, it works, right? We solved the task. But maybe we can improve it, for example by using DownloadWithIterator, although that's out of the scope of this article.
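For the curious, here is a rough sketch of what that could look like; the iterator wiring below is my own illustration (with the same minimal error handling as above), not code from the article:
// Hypothetical sketch: download every object with a single DownloadWithIterator
// call instead of calling Download in a loop. Files still land on disk.
objects := make([]s3manager.BatchDownloadObject, 0, len(in))
for _, file := range in {
	f, err := os.Create(os.TempDir() + "/" + path.Base(*file.Key))
	if err != nil {
		return err
	}
	objects = append(objects, s3manager.BatchDownloadObject{
		Object: file,
		Writer: f,
		After:  f.Close, // close each file once its download finishes
	})
}
iter := &s3manager.DownloadObjectsIterator{Objects: objects}
if err := downloader.DownloadWithIterator(aws.BackgroundContext(), iter); err != nil {
	return err
}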
But why do we need to store the files and the ZIP on disk at all? It wasn't in the requirements, and we are using the disk even though we don't need to. What if we wanted to run this code inside AWS Lambda? There we are limited to 512 MB of storage in /tmp. You might think: okay, don't use the disk, use memory. Sure, but then we're limited by memory instead, which in Lambda maxes out at around 3 GB, so we just hit another limit.
Stream Solution
What if we could create a pipe that streams data from the bucket through zip.Writer and back into a bucket? No disk involved. How? By using the great and simple interfaces io.Reader and io.Writer together with io.Pipe. Let's rewrite the code.
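Before rewriting, here is a tiny self-contained illustration of io.Pipe on its own (not part of the S3 code): whatever is written to the write end becomes readable from the read end, with no file or buffer in between.
package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	pr, pw := io.Pipe()
	go func() {
		defer pw.Close() // closing signals EOF to the reader
		fmt.Fprintln(pw, "hello through the pipe")
	}()
	io.Copy(os.Stdout, pr) // blocks until the writer side is closed
}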
First, we create a pipe which will be used to pass data through the zip.Writer and on to the uploader:
pr, pw := io.Pipe()
Then we create a zip.Writer from the pipe writer:
zipWriter := zip.NewWriter(pw)
Now anything the zip.Writer writes will go through the pipe.
Next, we iterate over the files and create a writer inside the zip.Writer for each of them:
for _, file := range in {
	w, _ := zipWriter.Create(path.Base(*file.Key))
Let's look at the function signature of Download:
func (d Downloader) Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error)
It requires an io.WriterAt; os.File satisfies this interface, but zipWriter.Create returns an io.Writer. The AWS SDK uses io.WriterAt for concurrent downloads. We can disable this feature by setting
downloader.Concurrency = 1
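As a side note, the same thing can be done when constructing the downloader (the sess variable here is an assumption, not part of the article's code):
// Assumes an existing *session.Session named sess.
downloader = s3manager.NewDownloader(sess, func(d *s3manager.Downloader) {
	d.Concurrency = 1 // sequential parts, so WriteAt offsets only ever grow
})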
We create our own struct which provides a WriteAt method so it satisfies the io.WriterAt interface. It ignores the offset and therefore behaves just like an io.Writer. The AWS SDK uses io.WriterAt because of concurrent downloads, so it can write at an offset position (e.g. in the middle of a file). By disabling concurrent downloads we can safely ignore the offset argument, because the data will be written sequentially.
type FakeWriterAt struct {
	w io.Writer
}

func (fw FakeWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
	// Ignore 'offset' because we forced sequential downloads.
	return fw.w.Write(p)
}
Credits to Stack Overflow user CoolAJ86.
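To see the trick in isolation, here is a tiny standalone check (mine, not from the original article) showing that FakeWriterAt simply forwards sequential writes to any io.Writer:
var buf bytes.Buffer
wa := FakeWriterAt{w: &buf}
wa.WriteAt([]byte("part1-"), 0) // the offset is ignored...
wa.WriteAt([]byte("part2"), 6)  // ...the writes simply land in order
fmt.Println(buf.String())       // prints: part1-part2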
Now we can download a file by wrapping the writer in our FakeWriterAt:
downloader.Download(FakeWriterAt{w}, file)
After every file has been downloaded, we need to close our writers:
zipWriter.Close()
pw.Close()
This way each file is downloaded into the writer obtained from zipWriter, compressed, and then written to the pipe writer.
Now we need to upload the ZIP back into the bucket. We are writing to the pipe, but nothing is reading from it yet. We set the Body of the UploadInput to the pipe reader:
result.Body = pr
uploader.Upload(result)
The last step is to run the download and the upload in parallel, so that as soon as a chunk of data is downloaded and compressed it can be uploaded. We run these two steps concurrently using go func() and synchronize them with a wait group.
This is the final code:
func zipS3Files(in []*s3.GetObjectInput, result *s3manager.UploadInput) error {
	// Create the pipe
	pr, pw := io.Pipe()
	// Create a zip.Writer which writes to the pipe
	zipWriter := zip.NewWriter(pw)
	wg := sync.WaitGroup{}
	// Wait for the downloader and the uploader
	wg.Add(2)
	// Run the 'downloader'
	go func() {
		// We need to close our zip.Writer and also the pipe writer;
		// zip.Writer doesn't close the underlying writer.
		defer func() {
			zipWriter.Close()
			pw.Close()
			wg.Done()
		}()
		for _, file := range in {
			// Sequentially download each file into a writer from zip.Writer
			w, err := zipWriter.Create(path.Base(*file.Key))
			if err != nil {
				fmt.Println(err)
			}
			_, err = downloader.Download(FakeWriterAt{w}, file)
			if err != nil {
				fmt.Println(err)
			}
		}
	}()
	go func() {
		defer wg.Done()
		// Upload the file, the body is the `io.Reader` end of the pipe
		result.Body = pr
		_, err := uploader.Upload(result)
		if err != nil {
			fmt.Println(err)
		}
	}()
	wg.Wait()
	return nil
}
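For completeness, here is one hypothetical way the function could be wired up and called; the region, bucket and key names below are made up for illustration:
sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("eu-west-1")}))
downloader = s3manager.NewDownloader(sess, func(d *s3manager.Downloader) { d.Concurrency = 1 })
uploader = s3manager.NewUploader(sess)

in := []*s3.GetObjectInput{
	{Bucket: aws.String("my-bucket"), Key: aws.String("reports/a.csv")},
	{Bucket: aws.String("my-bucket"), Key: aws.String("reports/b.csv")},
}
result := &s3manager.UploadInput{
	Bucket: aws.String("my-bucket"),
	Key:    aws.String("archives/reports.zip"),
}
if err := zipS3Files(in, result); err != nil {
	log.Fatal(err)
}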
As you can see, there is no proper error handling, which is out of the scope of this article. But what happens when downloader.Download fails? We want the upload to fail as well. This is a great use case for context.
We can create a context, e.g. with a timeout:
ctx, cancel := context.WithTimeout(context.Background(), time.Minute * 4)
The context can then be passed to both the download and the upload. If the download fails, we call cancel() and the upload is cancelled as well.
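Here is a rough sketch of how the two goroutines could use that context; this is my adaptation of the code above, not the article's final version:
go func() {
	defer func() {
		zipWriter.Close()
		pw.Close()
		wg.Done()
	}()
	for _, file := range in {
		w, err := zipWriter.Create(path.Base(*file.Key))
		if err != nil {
			cancel() // abort the whole operation
			return
		}
		if _, err := downloader.DownloadWithContext(ctx, FakeWriterAt{w}, file); err != nil {
			cancel() // the uploader sees the cancelled context and fails too
			return
		}
	}
}()

go func() {
	defer wg.Done()
	result.Body = pr
	if _, err := uploader.UploadWithContext(ctx, result); err != nil {
		cancel()
	}
}()
An even stricter variant could call pw.CloseWithError(err) when the download fails, so the pipe reader reports the error to the uploader directly.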
Conclusion
Regarding time performance, I tried processing 20 MB of data in 5 MB files. With this approach it took 5 s, while the first, simple solution took 7 s. But there is no disk involved, so you can use this inside AWS Lambda; still, you are limited to the 5-minute runtime.
In the end, I just wanted to point out how well designed the standard library is. Stick to simple interfaces.