DEV Community

Viacheslav Poturaev
Viacheslav Poturaev

Posted on • Updated on

Streaming generated data as io.Reader at high speed in Go

When you want to benchmark a piece of code that processes io.Reader, it is common to use io.Pipe to stream generated data with an extra io.Writer.

Take a look at this trivial example benchmark.

func Benchmark_ioPipe(b *testing.B) {
    r, w := io.Pipe()
    i := 0

    go func() {
        for {
            i++

            if i == b.N {
                _ = w.Close()

                return
            }

            _, _ = w.Write([]byte("<this might be a dynamic piece of generated data>\n"))
        }
    }()

    b.ReportAllocs()

    // io.Copy acts as a dummy processor here, in real-world 
    // scenario you'd have an actual io.Reader consumer.
    if _, err := io.Copy(io.Discard, r); err != nil {
        b.Fatal(err.Error())
    }
}
Enter fullscreen mode Exit fullscreen mode
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
Benchmark_ioPipe-12      1655488           729.8 ns/op        64 B/op          1 allocs/op
Enter fullscreen mode Exit fullscreen mode

Performance of such feed is not bad, but not very impressive either. Some speed is sacrificed to enable data safety under concurrency, both reads and writes can be called in parallel with synchronization happening in the pipe.

Let's try to improve performance for a particular case of generated feed exposed as io.Reader, that does not need external synchronization.

type pagesReader struct {
    // next returns contents of the next page, 
    // io.EOF error indicates last page.
    next func() ([]byte, error)

    // buf keeps the data to be read.
    buf []byte
}

func (r *pagesReader) Read(p []byte) (n int, err error) {
    // Fill the reader buffer with pages 
    // until it exceeds the incoming buffer 
    // or reaches the end of pages.
    for len(r.buf) < len(p) {
        page, err := r.next()
        r.buf = append(r.buf, page...)
        if err != nil {
            copy(p, r.buf)
            return len(r.buf), err
        }
    }

    // Put head of reader buffer in the incoming buffer.
    copy(p, r.buf)

    // Move remaining tail into the head of reader buffer.
    remaining := r.buf[len(p):]
    r.buf = r.buf[:len(remaining)]
    copy(r.buf, remaining)

    return len(p), nil
}
Enter fullscreen mode Exit fullscreen mode

pagesReader implements io.Reader that feeds data from user-defined callback next func() ([]byte, error).

Now, let's update the previous benchmark to use pagesReader instead of io.Pipe.

func Benchmark_pagesReader(b *testing.B) {
    i := 0
    r := &pagesReader{
        next: func() ([]byte, error) {
            i++

            if i == b.N {
                return nil, io.EOF
            }

            return []byte("<this might be a dynamic piece of generated data>\n"), nil
        },
    }

    b.ReportAllocs()

    // io.Copy acts as a dummy processor here, in real-world
    // scenario you'd have an actual io.Reader consumer.
    if _, err := io.Copy(io.Discard, r); err != nil {
        b.Fatal(err.Error())
    }
}
Enter fullscreen mode Exit fullscreen mode
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
Benchmark_pagesReader-12        21766588            52.38 ns/op       64 B/op          1 allocs/op
Enter fullscreen mode Exit fullscreen mode

The unsynchronized implementation is much faster (about 14x speed) and might be more suitable for benchmarks for lower impact on overall result.

When synchronization is needed, it can be managed with a mutex in the next function itself.

Top comments (0)