Mihai Chiorean

Querying CSV files in AWS S3 from Go, using SQL

tl;dr

S3 Select lets you query the data in a CSV or JSON file stored in S3 using SQL.

I'm going to present an example of how I did this in Go, since I had a hard time finding clear examples while trying to work on it.

Update: now with command line tool

Benefits

  • only the selected records need to come over the wire, not the whole file
  • no need to keep some datastore in sync with the latest csv to query the information
  • reduction in code complexity on the consumer end
  • can arguably be done synchronously, just like calling any other external API

S3 Select in Golang

For this example I used a simple CSV I found online.
I've structured it the way I would in an application: a package that wraps the S3-specific code and exposes an interface.

.
├── FL_insurance_sample.csv
├── README.md
├── client
│   └── csvdb
│       ├── client.go
│       └── client_test.go
├── config.yaml
├── go.mod
├── go.sum
├── main.go
└── vendor

First, the imports:

import (
    ....

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3iface"
    ....
)


You'll notice this uses the first version of the Go AWS SDK. There is a v2; however, at the time of writing it was still at v0.x.x.

Next, I put the client struct together. It's fairly basic: it wraps the S3 API interface and holds the bucket and key of the resource it will access.

// Client represents the struct used to make queries to an s3 csv
type Client struct {
    s3iface.S3API
    // S3 path - prefix + file/resource name
    key    string
    bucket string
}

// NewClient instantiates a new client struct
func NewClient(s s3iface.S3API, bucket, key string) *Client {
    return &Client{
        S3API:  s,
        bucket: bucket,
        key:    key,
    }
}

Now let's move on to the interesting bits. First, we need to create the input for the S3 query:

    req := &s3.SelectObjectContentInput{
        Bucket:         aws.String(c.bucket),
        Key:            aws.String(c.key),
        Expression:     aws.String(q),
        ExpressionType: aws.String("SQL"),
        InputSerialization: &s3.InputSerialization{
            CSV: &s3.CSVInput{
                // query using header names. This is a choice for this example
                // many csv files do not have a header row; In that case,
                // this property would not be needed and the "filters" would be
                // on the column index (e.g. _1, _2, _3...)
                FileHeaderInfo: aws.String("Use"),
            },
        },
    }

A few things here:

  • Expression is the actual SQL query, which looks something like SELECT * FROM s3object s WHERE s.column = 'value' (read more in the AWS S3 Select SQL reference).
  • InputSerialization can be CSV or JSON. I'm focusing on CSV for this example.
  • FileHeaderInfo should be set to "Use" if you plan on using the column names from the header row of your CSV. Otherwise you have to use s._1, s._2 and so on in your query, addressing each column by its index.
  • The AWS Go SDK uses *string a lot, and it provides a convenience function to deal with that. Hence the use of aws.String(...).
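
To make the difference between header names and column positions concrete, here is a small sketch of a helper that builds the Expression string either way. The names columnRef and equalsQuery are hypothetical and not part of the example repo. The sketch assumes statecode is the second column of the CSV, and it assumes that doubling single quotes (the standard SQL way to escape them) is what S3 Select expects.

```go
package main

import (
	"fmt"
	"strings"
)

// columnRef returns how a column is addressed in the query.
// With FileHeaderInfo set to "Use" the header name works (s.statecode);
// without a header row the 1-based position is used instead (s._2).
func columnRef(useHeaders bool, name string, position int) string {
	if useHeaders {
		return "s." + name
	}
	return fmt.Sprintf("s._%d", position)
}

// equalsQuery builds a SELECT with a single equality filter.
// Single quotes in the value are doubled (assumed escaping rule).
func equalsQuery(col, value string) string {
	escaped := strings.ReplaceAll(value, "'", "''")
	return fmt.Sprintf("SELECT * FROM s3object s WHERE %s = '%s'", col, escaped)
}

func main() {
	// Same filter, once by header name and once by position.
	fmt.Println(equalsQuery(columnRef(true, "statecode", 2), "FL"))
	fmt.Println(equalsQuery(columnRef(false, "statecode", 2), "FL"))
}
```

The resulting string is what would be passed to aws.String(...) for the Expression field.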

Next, let's set up the response serialization for this request:

    req = req.SetOutputSerialization(&s3.OutputSerialization{
        JSON: &s3.JSONOutput{},
    })

Similar to the input, both CSV and JSON can be used. I've used JSON here because it's easy to parse into a Go struct. To be honest, I haven't done any CSV parsing in Go.
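
If you do pick CSV output instead, the standard library's encoding/csv package handles the parsing. Below is a minimal sketch, assuming the payload holds complete comma-separated records in the column order policyID, statecode, county. The function name parseCSVPayload and the sample data are made up for illustration.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
)

// Row mirrors the three columns selected in this sketch.
type Row struct {
	ID        string
	StateCode string
	County    string
}

// parseCSVPayload turns CSV bytes into rows, mapping columns by position.
func parseCSVPayload(payload []byte) ([]Row, error) {
	r := csv.NewReader(bytes.NewReader(payload))
	r.FieldsPerRecord = 3 // reject records with a different column count
	records, err := r.ReadAll()
	if err != nil {
		return nil, err
	}
	rows := make([]Row, 0, len(records))
	for _, rec := range records {
		rows = append(rows, Row{ID: rec[0], StateCode: rec[1], County: rec[2]})
	}
	return rows, nil
}

func main() {
	// Made-up sample payload with two records.
	payload := []byte("119736,FL,CLAY COUNTY\n448094,FL,CLAY COUNTY\n")
	rows, err := parseCSVPayload(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(rows), rows[0].County)
}
```

Compared with JSON output, the mapping from column to struct field is positional here, so it breaks silently if the SELECT list changes order.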

Ready to call S3 now.

    out, err := c.SelectObjectContentWithContext(ctx, req)
    if err != nil {
        return nil, err
    }

    stream := out.GetEventStream()
    defer stream.Close()

The output of this call exposes a stream of events that we need to read the resulting data. This stream is implemented using a channel and sends some extra data apart from the actual rows themselves. The types of events exposed are:

  • ContinuationEvent - sent periodically by S3 to keep the TCP connection open; it carries no row data
  • EndEvent - indicates no more messages will be sent; request is completed
  • ProgressEvent - data about the progress of an operation
  • RecordsEvent - the actual rows
  • StatsEvent - stats about the operation: total bytes, bytes processed etc

The AWS API reference for SelectObjectContent describes these events in more detail.

I'm only going to use the RecordsEvent here, to keep this on point. However, you might want to look at the other events too (for example the EndEvent) when building your application.

    rows := []*Row{}
    for v := range stream.Events() {
        if err := stream.Err(); err != nil {
            return nil, err
        }

        switch rec := v.(type) {
        case *s3.RecordsEvent:
            var row Row
            if err := json.Unmarshal(rec.Payload, &row); err != nil {
                return nil, errors.Wrapf(err, "unable to parse json: %s", string(rec.Payload))
            }
            rows = append(rows, &row)
        default:
        }
    }

As events are read from the channel, check the stream for errors and return early if there is one. Otherwise, switch on the underlying event type. We're only looking at records here, so there is just one case: convert the interface value to the underlying RecordsEvent type and parse its payload.
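
One caveat about the loop above: as far as I can tell from the S3 Select documentation, a single RecordsEvent payload is not guaranteed to hold exactly one record. It may carry several newline-delimited records, or only part of one. A more defensive approach, sketched here, is to append every payload to a buffer and decode the records one JSON object at a time. The payloads are simulated byte slices and decodeRows is a hypothetical helper, not code from the repo.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// Row is a subset of the CSV columns, tagged with the column names.
type Row struct {
	ID        string `json:"policyID"`
	StateCode string `json:"statecode"`
	County    string `json:"county"`
}

// decodeRows joins the payload chunks and decodes one JSON object at a time.
func decodeRows(payloads [][]byte) ([]*Row, error) {
	var buf bytes.Buffer
	for _, p := range payloads {
		buf.Write(p)
	}
	dec := json.NewDecoder(&buf)
	rows := []*Row{}
	for {
		var row Row
		err := dec.Decode(&row)
		if err == io.EOF {
			return rows, nil // clean end of input
		}
		if err != nil {
			return nil, err // malformed or truncated record
		}
		rows = append(rows, &row)
	}
}

func main() {
	// Two simulated payloads: the second record is split across them.
	payloads := [][]byte{
		[]byte(`{"policyID":"1","statecode":"FL","county":"CLAY COUNTY"}` + "\n" + `{"policyID":"2",`),
		[]byte(`"statecode":"FL","county":"NASSAU COUNTY"}` + "\n"),
	}
	rows, err := decodeRows(payloads)
	if err != nil {
		panic(err)
	}
	for _, r := range rows {
		fmt.Println(r.ID, r.StateCode, r.County)
	}
}
```

Buffering keeps the sketch short but holds the whole result in memory. For large results, writing the payloads into an io.Pipe and decoding from the other end would be an alternative.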

Here's what the Row type looks like - a subset of fields (columns) from the CSV with the column name as their json tag.

type Row struct {
    ID        string `json:"policyID"`
    StateCode string `json:"statecode"`
    County    string `json:"county"`
}

Testing considerations

When unit testing this example, I ran into some issues.
My intuition told me I should create a mock of s3iface.S3API and mock the response from the call to SelectObjectContentWithContext. However this didn't work as expected.

The s3.SelectObjectContentOutput contains a SelectObjectContentEventStream which does not have a factory method that I could find. It has some private fields. Instantiating it when mocking proved to be problematic because those private fields are nil and are used in the calls .Err() and .Close().

The route I took was to use net/http/httptest to "mock" the S3 backend instead. I created an HTTP test server with a generic handler and pointed the AWS/S3 session at it, so that the client's requests go to the test server.

Update

After finishing this post, I realized it might be useful to have an easy way to try queries against a specific CSV using this example. The repo now has a main package, and you can build a command line tool to run some queries.

Install

git clone https://github.com/mihai-chiorean/s3-select-example.git && cd s3-select-example/cmd/s3ql

go install

Usage

~/w/s3-select-example ❯❯❯ s3ql --help
A small cli to run/test sql queries against a CSV in AWS S3

Usage:
  s3ql [flags]

Flags:
  -b, --bucket string   S3 bucket where the data resides
  -h, --help            help for s3ql
  -k, --key string      The S3 resource - prefix/key - of the CSV file
  -r, --region string   The AWS region where the bucket is
  -t, --use-headers     Tells S3 Select that the csv has a header row and to use it in the query. (default true)
s3ql --bucket <bucket> --key FL_insurance_sample.csv --region us-east-2 select \* from s3object s where s.statecode = \'FL\'

This prints, line by line, the JSON output of each matched record in the CSV.

