DEV Community

Mario Carrion
Mario Carrion

Posted on • Originally published at mariocarrion.com on

Complex Pipelines in Go (Part 5): Putting it all together

Putting it All Together

It's time to connect all the dots and build the final tool. The most important components were already covered in the previous posts but are missing something else: the initial HTTP request meant to be used for downloading the gzip file.

Let's work on that first and then we can put everything together.

Minimum Requirements

All the code relevant to this post is on Github, feel free to explore it for more details, the following is the minimum required for running the example:

  • Go 1.14
  • PostgreSQL 12.3: in theory any recent version should work, the README.md includes specific instructions for running it with Docker.

Downloading the file

To download the file via HTTP we have to use the standard library, specifically the types net/http.Client and compress/gzip.Reader, this is because the file we expect to download is a gzipped one.

For both requirements the following short snippet should cover that:

 // XXX omitting error handling to keep code short
req, _ := http.NewRequest(http.MethodGet, "https://datasets.imdbws.com/name.basics.tsv.gz", nil)

client := &http.Client{
    Timeout: 10 * time.Minute, // XXX: use something reasonable
}

resp, _ := client.Do(req)
 defer resp.Body.Close()

gr, _ := gzip.NewReader(resp.Body)
defer gr.Close()

for {
    line, err := cr.ReadString('\n')
    if err == io.EOF {
        return
    }

   // XXX: do something with the read value!
}
Enter fullscreen mode Exit fullscreen mode

Connecting all the dots

The biggest and most important thing to consider during this integration (of all the previous posts, that is) is how we should handle downstream errors coming from PostgreSQL, specifically the change in batcher, which in the end consolidates a type introduced in part 2 called copyFromSourceMediator this is with the idea of handling errors more closely.

The reason being of this change is the delay between the actual pgx calls (and therefore PostgreSQL) and ours, in practice what this means is that we require sync.Mutex for synchronizing the two goroutines handling sending messages to PostgreSQL and receiving messages from upstream.

See:

func (b *batcher) Copy(ctx context.Context, namesC <-chan name) <-chan error {
    outErrC := make(chan error)
    var mutex sync.Mutex
    var copyFromErr error

    copyFrom := func(batchNamesC <-chan name, batchErrC <-chan error) <-chan error {
        cpOutErrorC := make(chan error)

        go func() {
            defer close(cpOutErrorC)

            copier := newCopyFromSource(batchNamesC, batchErrC)

            _, err := b.conn.CopyFrom(ctx,
                pgx.Identifier{"names"},
                []string{
                    "nconst",
                    "primary_name",
                    "birth_year",
                    "death_year",
                    "primary_professions",
                    "known_for_titles",
                },
                copier)

            if err != nil {
                mutex.Lock()
                copyFromErr = err
                mutex.Unlock()
            }
        }()

        return cpOutErrorC
    }

    go func() {
        batchErrC := make(chan error)
        batchNameC := make(chan name)

        cpOutErrorC := copyFrom(batchNameC, batchErrC)

        defer func() {
            close(batchErrC)
            close(batchNameC)
            close(outErrC)
        }()

        var index int64

        for {
            select {
            case n, open := <-namesC:
                if !open {
                    return
                }

                mutex.Lock()
                if copyFromErr != nil {
                    namesC = nil
                    mutex.Unlock()
                    outErrC <- copyFromErr
                    return
                }
                mutex.Unlock()

                batchNameC <- n

                index++

                if index == b.size {
                    close(batchErrC)
                    close(batchNameC)

                    if err := <-cpOutErrorC; err != nil {
                        outErrC <- err
                        return
                    }

                    batchErrC = make(chan error)
                    batchNameC = make(chan name)

                    cpOutErrorC = copyFrom(batchNameC, batchErrC)
                    index = 0
                }
            case <-ctx.Done():
                if err := ctx.Err(); err != nil {
                    batchErrC <- err
                    outErrC <- err
                    return
                }
            }
        }
    }()

    return outErrC
}
Enter fullscreen mode Exit fullscreen mode

The code looks like a lot but really the important bits are in the variable/function copyFrom which still uses copyFromSource for dealing with events coming from upstream and also uses a mutex to set errors coming from pgx's CopyFrom.

What's next?

This is the last post of the series but I don't think this is the end, I will follow up and improve the existing code in the future. I will answer (at least) the two following questions:

  1. How to provide processing status? How many events left? How many events processed?
  2. How to resume processing events in case of failures?

I will improve this implementation, wait for it.

Top comments (0)