Mario Carrion
Originally published at mariocarrion.com

Complex Pipelines in Go (Part 2): Storing Values in Batches

Storing Values in Batches

In part 1 we introduced the 3 processes meant to handle everything required for our final tool to work. In this post we will focus on the Persistent Storage Process, specifically a component used for Storing Values in Batches in PostgreSQL.

The idea behind working in batches is to speed up the loading of records while avoiding overwhelming our primary database, especially when inserting a large number of values. Deciding how many records to batch before copying them over depends on several factors that should be thought through carefully, such as record size, memory available on the instance processing the events, and network capacity allocated for data transfer.

For our example we will use some arbitrary numbers for default values, but consider measuring your events before making this decision in production.

Minimum Requirements

All the code relevant to this post is on GitHub; feel free to explore it for more details. The following is the minimum required for running the example:

  • PostgreSQL 12.3: in theory any recent version should work; the README.md includes specific instructions for running it with Docker, and
  • Go 1.14

And the following Go Packages:

name is a Go struct type equivalent to the data structure we decided to use in Part 1, it is defined as:

type name struct {
    NConst             string
    PrimaryName        string
    BirthYear          string
    DeathYear          string
    PrimaryProfessions []string
    KnownForTitles     []string
}

Batching in PostgreSQL

PostgreSQL has a SQL command called COPY, which is generally the most efficient way to insert a large number of records into a table. For our Go code we will use the pgx implementation of this command via the Conn type, specifically the CopyFrom method:

func (c *Conn) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error)

This method receives a value, rowSrc, that implements the CopyFromSource interface type. We will implement a new type that satisfies this interface so that we avoid buffering all the data in memory. Recall that our end goal is to build a pipeline that accomplishes all our steps as a stream of data flowing from one step to the next. We will call this type copyFromSource.
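For reference, pgx declares CopyFromSource with three methods: Next, Values and Err. The sketch below mirrors that method set in a locally declared interface so it runs without pgx or a database. The names rowSource, sliceSource and drain are hypothetical; drain only approximates the loop CopyFrom runs over its source, and the row values are placeholders.

```go
package main

import "fmt"

// rowSource mirrors the method set of pgx.CopyFromSource. It is declared
// locally (hypothetical name) so this sketch has no external dependencies.
type rowSource interface {
    Next() bool
    Values() ([]interface{}, error)
    Err() error
}

// sliceSource is a hypothetical in-memory implementation backed by a slice.
type sliceSource struct {
    rows [][]interface{}
    idx  int
}

// Next advances to the following row and reports whether one exists.
func (s *sliceSource) Next() bool {
    if s.idx >= len(s.rows) {
        return false
    }
    s.idx++
    return true
}

// Values returns the row selected by the last successful call to Next.
func (s *sliceSource) Values() ([]interface{}, error) {
    return s.rows[s.idx-1], nil
}

// Err reports any error found while iterating; a slice never fails.
func (s *sliceSource) Err() error { return nil }

// drain stands in for the consumer side: it pulls rows until Next returns
// false, then checks Err. It returns the number of rows seen.
func drain(src rowSource) (int64, error) {
    var n int64
    for src.Next() {
        if _, err := src.Values(); err != nil {
            return n, err
        }
        n++
    }
    return n, src.Err()
}

func main() {
    src := &sliceSource{rows: [][]interface{}{
        {"id-1", "First Person"},
        {"id-2", "Second Person"},
    }}
    n, err := drain(src)
    fmt.Println(n, err) // prints "2 <nil>"
}
```

A slice-backed source needs every row in memory up front, which is exactly what the channel-backed copyFromSource is meant to avoid.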

Introducing "copyFromSource"

This type is defined as:

type copyFromSource struct {
    errorC  <-chan error
    namesC  <-chan name
    err     error
    closed  bool
    current name
}

The two most important things to understand about this type are the two fields that use receive-only channels. Both are used for communicating with the upstream process, which is in charge of streaming records downstream to us (to this type, copyFromSource):

  • errorC <-chan error: used to indicate when an error happened, and
  • namesC <-chan name: used to receive the events to eventually copy into the database.

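With that in mind, the other important thing to understand is the implementation of Next, specifically the select block, which blocks until a value arrives on either channel. If the value is an error, open keeps its zero value (false), so Next records the error, marks the source as closed, and returns false: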

func (c *copyFromSource) Next() bool {
    if c.closed {
        return false
    }

    var open bool

    select {
    case c.current, open = <-c.namesC:
    case c.err = <-c.errorC:
    }

    if !open {
        c.closed = true
        return false
    }

    if c.err != nil {
        return false
    }

    return true
}

In the end copyFromSource is implemented as a building block to accomplish two things:

  1. To satisfy the pgx.CopyFromSource interface type, and
  2. To be used in conjunction with another type, copyFromSourceMediator, which properly coordinates this batching.

Introducing "copyFromSourceMediator"

This type is defined as:

type copyFromSourceMediator struct {
    namesC chan name
    errorC chan error
    copier *copyFromSource
}

Similarly, this type holds two channels. The biggest difference is that copyFromSourceMediator uses both channels to send values to copyFromSource, which in the end is the type we are mediating here. All of this is much clearer if we look at the constructor:

func newCopyFromSourceMediator(conn *pgx.Conn) (*copyFromSourceMediator, <-chan error) {
    errorC := make(chan error)
    namesC := make(chan name)

    copier := newCopyFromSource(namesC, errorC)

    res := copyFromSourceMediator{
        namesC: namesC,
        errorC: errorC,
        copier: copier,
    }

    outErrorC := make(chan error)

    go func() {
        defer close(outErrorC)

        _, err := conn.CopyFrom(context.Background(),
            pgx.Identifier{"names"},
            []string{
                "nconst",
                "primary_name",
                "birth_year",
                "death_year",
                "primary_professions",
                "known_for_titles",
            },
            copier)

        outErrorC <- err
    }()

    return &res, outErrorC
}

This constructor is the one that actually interacts with the database to copy all the received values. So how do we indicate the actual batch size? That logic is handled by our last type, batcher.

Introducing "batcher"

This type is defined as:

type batcher struct {
    conn *pgx.Conn
    size int
}

In the end, it is the one in charge of using the other two types behind the scenes to accomplish our goal for this process. The meat of batcher is in the Copy method:

func (b *batcher) Copy(ctx context.Context, namesC <-chan name) <-chan error {
    outErrC := make(chan error)

    go func() {
        mediator, errorC := newCopyFromSourceMediator(b.conn)

        copyAll := func(m *copyFromSourceMediator, c <-chan error) error {
            m.CopyAll()
            return <-c
        }

        defer func() {
            if err := copyAll(mediator, errorC); err != nil {
                outErrC <- err
            }

            close(outErrC)
        }()

        var index int

        for {
            select {
            case name, open := <-namesC:
                if !open {
                    return
                }

                mediator.Batch(name)
                index++

                if index == b.size {
                    if err := copyAll(mediator, errorC); err != nil {
                        outErrC <- err
                    }

                    mediator, errorC = newCopyFromSourceMediator(b.conn)
                    index = 0
                }
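            case err := <-errorC:
                // CopyFrom only finishes before the batch is flushed when it
                // fails: report the error and stop.
                outErrC <- err
                return
            case <-ctx.Done():
                if err := ctx.Err(); err != nil {
                    mediator.Err(err)
                    outErrC <- err
                }
                // Stop here; otherwise the loop keeps selecting the
                // already-closed Done channel.
                return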
            }
        }
    }()

    return outErrC
}

As with the other two types, Copy uses a goroutine, channels and a select block to coordinate all the messages we receive and to decide when it is time to copy a batch of records.
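The size-based flushing can be looked at in isolation with a simplified sketch. Unlike batcher, which streams each record straight into an open CopyFrom call, this hypothetical batchBy function accumulates records in a slice and hands each full batch to a flush callback. It only illustrates the counting logic, including the final partial batch, and is not a replacement for the code above.

```go
package main

import "fmt"

// batchBy (hypothetical) groups the values received from in into slices of
// at most size elements and passes each one to flush. The remaining values
// are flushed when in is closed. size must be greater than zero.
func batchBy(in <-chan string, size int, flush func([]string)) {
    batch := make([]string, 0, size)

    for v := range in {
        batch = append(batch, v)

        if len(batch) == size {
            flush(batch)
            // Start a fresh slice so the flushed one is not overwritten.
            batch = make([]string, 0, size)
        }
    }

    if len(batch) > 0 {
        flush(batch)
    }
}

func main() {
    in := make(chan string)

    go func() {
        defer close(in)
        for i := 1; i <= 5; i++ {
            in <- fmt.Sprintf("id-%d", i)
        }
    }()

    batchBy(in, 2, func(b []string) {
        fmt.Println(len(b), b)
    })
    // Prints:
    // 2 [id-1 id-2]
    // 2 [id-3 id-4]
    // 1 [id-5]
}
```

The trade-off is memory: this version holds up to size records at once, whereas the streaming approach in Copy holds only the current one.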

What's next?

The next blog post will cover the implementation of the TSV parser, and as we progress in the series we will continuously connect all the pieces together to eventually complete our final tool.