In this article, we'll explore how to manage the state of jobs in a Worker Pool in Go while leveraging the power of Prometheus and Grafana for monitoring and visualization.
Introduction
Worker Pools are a common concurrency pattern used in software development to efficiently process tasks or jobs concurrently. Monitoring the state of these worker pools is crucial for ensuring the health and performance of your application. Prometheus and Grafana are excellent tools for this purpose.
Prerequisites
Before we dive into the code, make sure you have Prometheus and Grafana set up and running. If you haven't already, follow the official documentation for installation and configuration.
Setting Up Prometheus Metrics
To begin, let's instrument our Go application to expose Prometheus metrics. We'll create a simple worker pool and monitor the number of jobs in progress.
// Import necessary packages
import (
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Define Prometheus metrics
var (
jobQueue = promauto.NewGauge(prometheus.GaugeOpts{Name: "workerpool_jobs_queued", Help: "Number of jobs currently in the queue"})
activeWorkers = promauto.NewGauge(prometheus.GaugeOpts{Name: "workerpool_active_workers", Help: "Number of currently active workers"})
jobState = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "job_state", Help: "The state of the job"}, []string{"jobID", "state"})
jobProcessingDuration = promauto.NewHistogram(prometheus.HistogramOpts{Name: "job_processing_duration_seconds", Help: "The duration it takes to process a job", Buckets: prometheus.LinearBuckets(1, 1, 10)})
)
// Job represents a unit of work
type Job struct {
ID string
Data string
State string // states can be "Queued", "Processing", "Completed"
}
func main() {
// Register Prometheus metrics
prometheus.MustRegister(jobsInProgress)
<span class="c">// Start a web server to expose metrics</span>
<span class="n">http</span><span class="o">.</span><span class="n">Handle</span><span class="p">(</span><span class="s">"/metrics"</span><span class="p">,</span> <span class="n">promhttp</span><span class="o">.</span><span class="n">Handler</span><span class="p">())</span>
<span class="k">go</span> <span class="k">func</span><span class="p">()</span> <span class="p">{</span>
<span class="n">http</span><span class="o">.</span><span class="n">ListenAndServe</span><span class="p">(</span><span class="s">":8080"</span><span class="p">,</span> <span class="no">nil</span><span class="p">)</span>
<span class="p">}()</span>
<span class="c">// Your worker pool logic goes here</span>
<span class="c">// Don't forget to update jobsInProgress as jobs are processed</span>
<span class="c">// Example:</span>
<span class="c">// Assign jobs using goroutines</span>
<span class="k">for</span> <span class="n">i</span> <span class="o">:=</span> <span class="m">1</span><span class="p">;</span> <span class="n">i</span> <span class="o"><=</span> <span class="n">numJobs</span><span class="p">;</span> <span class="n">i</span><span class="o">++</span> <span class="p">{</span>
<span class="k">go</span> <span class="n">assignJob</span><span class="p">(</span><span class="n">i</span><span class="p">,</span> <span class="n">jobs</span><span class="p">,</span> <span class="n">done</span><span class="p">,</span> <span class="o">&</span><span class="n">wg</span><span class="p">)</span>
<span class="p">}</span>
}
// assignJob creates a new job, sends it for processing, and waits for it to complete
func assignJob(i int, jobs chan<- Job, done <-chan Job, wg *sync.WaitGroup) {
defer wg.Done()
<span class="n">jobID</span> <span class="o">:=</span> <span class="n">fmt</span><span class="o">.</span><span class="n">Sprintf</span><span class="p">(</span><span class="s">"%s-%d"</span><span class="p">,</span> <span class="n">uuid</span><span class="o">.</span><span class="n">New</span><span class="p">()</span><span class="o">.</span><span class="n">String</span><span class="p">(),</span> <span class="n">i</span><span class="p">)</span>
<span class="n">job</span> <span class="o">:=</span> <span class="o">&</span><span class="n">Job</span><span class="p">{</span><span class="n">ID</span><span class="o">:</span> <span class="n">jobID</span><span class="p">,</span> <span class="n">Data</span><span class="o">:</span> <span class="n">fmt</span><span class="o">.</span><span class="n">Sprintf</span><span class="p">(</span><span class="s">"Data_%d"</span><span class="p">,</span> <span class="n">i</span><span class="p">),</span> <span class="n">State</span><span class="o">:</span> <span class="s">"Queued"</span><span class="p">}</span>
<span class="n">updateJobState</span><span class="p">(</span><span class="n">job</span><span class="p">,</span> <span class="s">"Queued"</span><span class="p">)</span>
<span class="n">jobQueue</span><span class="o">.</span><span class="n">Inc</span><span class="p">()</span>
<span class="n">jobs</span> <span class="o"><-</span> <span class="n">job</span>
<span class="o"><-</span><span class="n">done</span>
<span class="n">jobQueue</span><span class="o">.</span><span class="n">Dec</span><span class="p">()</span>
}
Configuring Grafana Dashboards
With Prometheus metrics exposed, we can now create Grafana dashboards to visualize the state of our worker pool. Configure Grafana to use Prometheus as a data source and create panels that display the job_state
metric.
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 2,
"links": [],
"liveNow": false,
"panels": [
{
"datasource": {},
"fieldConfig": {
"defaults": {
"color": {
"fixedColor": "transparent",
"mode": "fixed"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 6,
"x": 0,
"y": 0
},
"id": 5,
"options": {
"colorMode": "background",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"textMode": "auto"
},
"pluginVersion": "10.1.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "e294f615-a82c-43ef-a960-86ba0e5b5b11"
},
"editorMode": "code",
"expr": "count(job_state{state=\"Queued\"})",
"instant": false,
"legendFormat": "auto",
"range": true,
"refId": "A"
}
],
"title": "Total Jobs",
"type": "stat"
},
{
"datasource": {},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 18,
"x": 6,
"y": 0
},
"id": 3,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"textMode": "auto"
},
"pluginVersion": "10.1.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "eba8c540-ab46-4987-9347-6112f4e0a081"
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "avg(rate(job_processing_duration_seconds_sum[5m]) / rate(job_processing_duration_seconds_count[5m]))",
"fullMetaSearch": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "Time average",
"type": "stat"
},
{
"datasource": {
"type": "prometheus",
"uid": "e6f319b5-9c5c-48df-96f7-40fc9b1c189c"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "count(job_state{state=\"Completed\"})"
},
"properties": [
{
"id": "displayName",
"value": "Complete"
}
]
},
{
"matcher": {
"id": "byName",
"options": "count(job_state{state=\"Processing\"} unless on(jobID) (job_state{state=\"Completed\"})) or\nvector(0)"
},
"properties": [
{
"id": "displayName",
"value": "Processing"
},
{
"id": "color",
"value": {
"fixedColor": "blue",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "count(job_state{state=\"Queued\"} unless on(jobID) (job_state{state=\"Processing\"} or job_state{state=\"Completed\"})) or\nvector(0)"
},
"properties": [
{
"id": "displayName",
"value": "Queue"
},
{
"id": "color",
"value": {
"fixedColor": "yellow",
"mode": "fixed"
}
}
]
}
]
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 8
},
"id": 4,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"textMode": "auto"
},
"pluginVersion": "10.1.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "a93b3831-a11d-4ef3-b5e7-54bc557e33c0"
},
"editorMode": "code",
"expr": "count(job_state{state=\"Queued\"} unless on(jobID) (job_state{state=\"Processing\"} or job_state{state=\"Completed\"})) or\nvector(0)",
"hide": false,
"instant": false,
"legendFormat": "auto",
"range": true,
"refId": "Queue"
},
{
"datasource": {
"type": "prometheus",
"uid": "eba8c540-ab46-4987-9347-6112f4e0a081"
},
"editorMode": "code",
"exemplar": false,
"expr": "count(job_state{state=\"Processing\"} unless on(jobID) (job_state{state=\"Completed\"})) or\nvector(0)",
"hide": false,
"instant": false,
"legendFormat": "auto",
"range": true,
"refId": "C"
},
{
"datasource": {
"type": "prometheus",
"uid": "eba8c540-ab46-4987-9347-6112f4e0a081"
},
"editorMode": "code",
"expr": "count(job_state{state=\"Completed\"})",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "State Jobs",
"type": "stat"
}
],
"refresh": "5s",
"schemaVersion": 38,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-5m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "New dashboard Copy",
"uid": "ee9cb0d1-2e66-4f16-8dae-166000f285cd",
"version": 1,
"weekStart": ""
}
Logging Worker Pool State
While Prometheus and Grafana are excellent for monitoring, it's often helpful to log the state of your worker pool for debugging and record-keeping purposes. We can use Go's standard logging package for this.
// processJob simulates processing a job by a worker
func worker(id int, jobs <-chan Job, done chan<- Job) {
for job := range jobs {
startTime := time.Now()
<span class="n">activeWorkers</span><span class="o">.</span><span class="n">Inc</span><span class="p">()</span>
<span class="n">updateJobState</span><span class="p">(</span><span class="n">job</span><span class="p">,</span> <span class="s">"Processing"</span><span class="p">)</span>
<span class="c">// Simulating job processing</span>
<span class="n">time</span><span class="o">.</span><span class="n">Sleep</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">Duration</span><span class="p">(</span><span class="n">rand</span><span class="o">.</span><span class="n">Intn</span><span class="p">(</span><span class="m">10000</span><span class="p">)</span><span class="o">+</span><span class="m">100</span><span class="p">)</span> <span class="o">*</span> <span class="n">time</span><span class="o">.</span><span class="n">Millisecond</span><span class="p">)</span>
<span class="n">activeWorkers</span><span class="o">.</span><span class="n">Dec</span><span class="p">()</span>
<span class="n">jobProcessingDuration</span><span class="o">.</span><span class="n">Observe</span><span class="p">(</span><span class="n">time</span><span class="o">.</span><span class="n">Since</span><span class="p">(</span><span class="n">startTime</span><span class="p">)</span><span class="o">.</span><span class="n">Seconds</span><span class="p">())</span>
<span class="n">updateJobState</span><span class="p">(</span><span class="n">job</span><span class="p">,</span> <span class="s">"Completed"</span><span class="p">)</span>
<span class="n">done</span> <span class="o"><-</span> <span class="n">job</span>
<span class="p">}</span>
}
Result
Conclusion
In this article, we've learned how to manage the state of jobs in a Worker Pool in Go while integrating Prometheus and Grafana for monitoring. We've also seen how to log the state of the worker pool for debugging purposes. Monitoring and logging are crucial for maintaining the reliability and performance of your applications, and these tools make the process much more accessible.
Now, go ahead and apply these techniques to your Go applications to ensure they run smoothly and efficiently, even under heavy workloads.
Happy coding!
repo: https://github.com/ThanhPhucHuynh/go-grafana-prometheus-worker-pool
Top comments (0)