In my last post, I wanted to dig into the remote write code to learn more about how it worked and see if you could use it to simulate a push based agent. This is all just experimental to learn how Prometheus works. I think you can, and I'm working on testing it out, but before I get there, I wanted to explore the other side: that is the prometheus remote-write receiver.
I came up with a few questions I want to answer while exploring the remote write API:
- Where does the code path between scraping a prometheus endpoint and remote write differ?
- In what format does Prometheus write data to disk?
- What exactly in the code makes Prometheus pull based over push based. Can I identify the specific code blocks where algorithms are implemented that will optimize for one over the other?
Web API write endpoint
// Line 364
r.Post("/write", api.ready(api.remoteWrite))
The remote write handler (api.remoteWrite) is created by the following code block.
storage/remote/write_handler.go
// NewWriteHandler creates a http.Handler that accepts remote write requests and
// writes them to the provided appendable.
func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
return &writeHandler{
logger: logger,
appendable: appendable
}
}
I guess I'm surprised how quickly we get from remote write endpoint to the appender, it seems like there's no real special buffering on remote write to make it different from the scraper? In the previous article, we traced remote write back to an appender, so write is writing to an appender which may then re-write to another upstream pretty directly. I don't know why Prometheus writing would be more complicated than this, but as an SRE who doesn't write low level code, I'm curious to go back to the question "What makes us say that Prometheus isn't push based" Seems like you could write a push agent pretty easily.
I traced this back quite a bit, but this appendable is injected from the prometheus command code here (cfg.web.storage is passed) fanout has primary and secondary storages. The local storage configured below is the 'primary' storage and remote storage is the 'secondary' storage. Ready Storage is a struct.
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)
// Line 1287
type readyStorage struct {
mtx sync.RWMutex
db storage.Storage
startTimeMargin int64
stats *tsdb.DBStats
}
https://github.com/prometheus/prometheus/blob/main/cmd/prometheus/main.go#L566
type Appendable interface {
// Appender returns a new appender for the storage. The implementation
// can choose whether or not to use the context, for deadlines or to check
// for errors.
Appender(ctx context.Context) Appender
}
https://github.com/prometheus/prometheus/blob/main/storage/remote/write_handler.go#L86
type Appender interface {
// Append adds a sample pair for the given series.
// An optional series reference can be provided to accelerate calls.
// A series reference number is returned which can be used to add further
// samples to the given series in the same or later transactions.
// Returned reference numbers are ephemeral and may be rejected in calls
// to Append() at any point. Adding the sample via Append() returns a new
// reference number.
// If the reference is 0 it must not be used for caching.
Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)
// Commit submits the collected samples and purges the batch. If Commit
// returns a non-nil error, it also rolls back all modifications made in
// the appender so far, as Rollback would do. In any case, an Appender
// must not be used anymore after Commit has been called.
Commit() error
// Rollback rolls back all modifications made in the appender so far.
// Appender has to be discarded after rollback.
Rollback() error
ExemplarAppender
HistogramAppender
MetadataUpdater
}
Scrape Code Path:
Prometheus works by scraping endpoints periodically according to a scrape interval. This process is a loop, defined very clearly by the process entitied "mainLoop":
mainLoop:
for {
...
}
Alot of these are really long functions so I'm going to pull out the key pieces.
Calls "Scrape and Report":
https://github.com/prometheus/prometheus/blob/6fc5305ce904a75f56ca762281c7a1b052f19092/scrape/scrape.go#L1264
last = sl.scrapeAndReport(last, scrapeTime, errc)
Calls "Scrape":
https://github.com/prometheus/prometheus/blob/6fc5305ce904a75f56ca762281c7a1b052f19092/scrape/scrape.go#L1340
scrapeCtx, cancel := context.WithTimeout(sl.parentCtx, sl.timeout)
contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)
#... Line 1364
total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
This function is what actually makes the HTTP request to Scrape a Destination:
https://github.com/prometheus/prometheus/blob/6fc5305ce904a75f56ca762281c7a1b052f19092/scrape/scrape.go#L792
func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
if s.req == nil {
req, err := http.NewRequest("GET", s.URL().String(), nil)
if err != nil {
return "", err
}
req.Header.Add("Accept", s.acceptHeader)
req.Header.Add("Accept-Encoding", "gzip")
req.Header.Set("User-Agent", UserAgent)
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(s.timeout.Seconds(), 'f', -1, 64))
s.req = req
}
resp, err := s.client.Do(s.req.WithContext(ctx))
if err != nil {
return "", err
}
defer func() {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()
if resp.StatusCode != http.StatusOK {
return "", errors.Errorf("server returned HTTP status %s", resp.Status)
}
if s.bodySizeLimit <= 0 {
s.bodySizeLimit = math.MaxInt64
}
if resp.Header.Get("Content-Encoding") != "gzip" {
n, err := io.Copy(w, io.LimitReader(resp.Body, s.bodySizeLimit))
if err != nil {
return "", err
}
if n >= s.bodySizeLimit {
targetScrapeExceededBodySizeLimit.Inc()
return "", errBodySizeLimit
}
return resp.Header.Get("Content-Type"), nil
}
if s.gzipr == nil {
s.buf = bufio.NewReader(resp.Body)
s.gzipr, err = gzip.NewReader(s.buf)
if err != nil {
return "", err
}
} else {
s.buf.Reset(resp.Body)
if err = s.gzipr.Reset(s.buf); err != nil {
return "", err
}
}
n, err := io.Copy(w, io.LimitReader(s.gzipr, s.bodySizeLimit))
s.gzipr.Close()
if err != nil {
return "", err
}
if n >= s.bodySizeLimit {
targetScrapeExceededBodySizeLimit.Inc()
return "", errBodySizeLimit
}
return resp.Header.Get("Content-Type"), nil
}
Sets s.buf and s.gzipr:
https://github.com/prometheus/prometheus/blob/6fc5305ce904a75f56ca762281c7a1b052f19092/scrape/scrape.go#L835
Loop through and switch based on datatype:
https://github.com/prometheus/prometheus/blob/6fc5305ce904a75f56ca762281c7a1b052f19092/scrape/scrape.go#L1535
### For example, this switches on whether or not to append a histogram or just append:
if isHistogram {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h)
}
} else {
ref, err = app.Append(ref, lset, t, val)
}
This is where it gets back to the same 'Append' function as before, there are a few different types of Appenders, the one mentioned above seems to be a base struct. The appender is assigned by the following function in scrape.go.
// appender returns an appender for ingested samples from the target.
func appender(app storage.Appender, limit int) storage.Appender {
app = &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
}
// The limit is applied after metrics are potentially dropped via relabeling.
if limit > 0 {
app = &limitAppender{
Appender: app,
limit: limit,
}
}
return app
}
So, where does it go from here?
The /tsdb folder contains the code that we use to write Prometheus data to disk. Specifically blockwriter.go has a method called flush
tsdb/blockwriter.go
// Flush implements the Writer interface. This is where actual block writing
// happens. After flush completes, no writes can be done.
func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
mint := w.head.MinTime()
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
maxt := w.head.MaxTime() + 1
level.Info(w.logger).Log("msg", "flushing", "series_count", w.head.NumSeries(), "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
compactor, err := NewLeveledCompactor(ctx, nil, w.logger []int64{w.blockSize}, chunkenc.NewPool(), nil)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
}
id, err := compactor.Write(w.destinationDir, w.head, mint, maxt, nil)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "compactor write")
}
return id, nil
}
This process takes us to the 'LeveledCompactor` which has a write method:
`
I think I'm close now but oddly can't find the specific method that writes the data files. This block here writes the metadata file, the tombstones and I assume the samples as well:
`go
The above block is very long, but for example, ehre is where it writes metadata and tombesones
if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
// Create an empty tombstones file.
if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}
`
And that's about it for our overview of the different scrape types. Let's look back at the questions we had before exploring this API:
Where does the code path between scraping a prometheus endpoint and remote write differ?
**
**In what format does Prometheus write data to disk?
According to this, it's the 'tsdb' format: https://github.com/prometheus/prometheus/blob/release-2.41/tsdb/docs/format/README.md
-
What exactly in the code makes Prometheus pull based over push based. Can I identify the specific code blocks where algorithms are implemented that will optimize for one over the other?
- I'm going to save the details of this for the next post, but I think the answer might be... Nothing.
In the next blog post, I'll show my demo of a push-based agent, and summarize my findings from all the posts.
Top comments (0)