Just before we start I'd like to tell you this article contains the info I was able to understand about go channels and how you can use it in a good way. so if you find any mistake please leave a comment with it :).
What is Go Channels?
Golang docs say:
Channels are the pipes that connect concurrent goroutines. You can send values into channels from one goroutine and receive those values into another goroutine.
so it's like a simple pipe. you send to it from one side and receive from it from its other side. and with goroutines, you can make it async as I will show you the email queue next.
How to use Channels in Go
package main
import (
"fmt"
)
func main() {
// Create the channel
messages := make(chan string)
// run a go routine to send the message
go func() { messages <- "ping" }()
// when you receive the message store it in msg variable
msg := <-messages
// print received message
fmt.Println(msg)
}
- Line 1: we created the channel using
make
- Line 2: we send the message through a goroutine
- Line 3: we receive the messages and store it in
msg
variable - Line 4: we print the message
so at line 2, we used a goroutine, without it, the code won't work as there are no listeners and we call that a deadlock
messages := make(chan string)
messages <- "ping"
msg := <-messages
fmt.Println(msg)
Output
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/tmp/sandbox676902258/prog.go:11 +0x60
that happens because it's not clear for Go to understand where it should store the message if there are no listeners to send the message directly to them. so a simple solution is to use Buffers
Chanel Buffer
go allows us to set the buffer to a channel by using make(chan string,1)
, the second parameter means that we tell go to create the channel with a buffer of 1 string
messages := make(chan string,1)
messages <- "ping"
msg := <-messages
fmt.Println(msg)
Now if we try to send 2 messages before the initializing the listener it will throw a deadlock, because for the second message there is no place to store it, also we don't have a listener to send it directly
messages <- "ping"
messages <- "pong"
msg := <-messages
fmt.Println(msg)
Best design to use channels for background workers
so how we deal with channels? do we send all messages with goroutines? what about dropping all channels and just open a goroutine with every job?
all these questions we are going to answer them in the next points
a simple queue worker diagram can be like
simply we need to create a Dispatcher
, a dispatcher opens X number of workers based on the requirements and these workers will receive the messages and handles them as we will see.
by Workers I mean channel listeners
Create a Dispatcher
//JobQueue ... a buffered channel that we can send work requests on.
var JobQueue chan Queuable
//Queuable ... interface of Queuable Job
type Queuable interface {
Handle() error
}
//Dispatcher ... worker dispatcher
type Dispatcher struct {
maxWorkers int
WorkerPool chan chan Queuable
Workers []Worker
}
The Dispatcher has 3 main properties
- Max worker: how many workers this dispatcher owns
- Worker Pool: we register all workers in that pool so the dispatcher can pull one of them every time to send the message to it
- Workers: contains all workers to talk to anyone of them like close a specific
Now let's create the dispatcher creator
//NewDispatcher ... creates new queue dispatcher
func NewDispatcher(maxWorkers int) *Dispatcher {
// make job bucket
if JobQueue == nil {
JobQueue = make(chan Queuable, 10)
}
pool := make(chan chan Queuable, maxWorkers)
return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}
The dispatcher now needs Run method to start his job
//Run ... starts work of dispatcher and creates the workers
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
// register in dispatcher's workers
d.Workers = append(d.Workers, worker)
}
go d.dispatch()
}
the end of Run method starts dispatch of the Dispatcher
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Queuable) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
dispatch runs an infinite loop, with every new message(job) it will pull one of the workers and send the message to that worker to handle it.
Create Worker
//Worker … simple worker that handles queueable tasks
type Worker struct {
Name string
WorkerPool chan chan Queuable
JobChannel chan Queuable
quit chan bool
}
- Name: worker name so we can see it in logs
- WorkerPool: the pool of the dispatcher to register itself to it
- JobChannel: The messages(jobs) the worker receives from the dispatcher
- quit: to close the worker. it helps for autoscale.
So how worker runs after the dispatcher sends the messages to it
//Start ... initiate worker to start listening for upcoming queueable jobs
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Handle(); err != nil {
fmt.Printf("Error in job: %s\n", err.Error())
}
}
}
}()
}
so Start method starts an infinite loop, the first thing the worker registers itself to the WorkerPool of the dispatcher. then it listens for any message within the select
scope. once it receives the message it will call handle()
method to process this channel.
a simple scenario
- Dispatcher started with 2 workers
- The first worker registers itself and waiting for any messages
- The second worker registers itself and waiting for any messages
- The dispatcher receives a message and pulls the first worker from the pool as it was the first registering itself.
- Now the pool has the second worker only. because the first one has been pulled already to serve the first message.
- a new message came to the dispatcher. dispatcher pulls from the pool the second worker to server the message.
- now let's say the second worker finished handling and the first one still handling the job for any reason.
- the second worker registers itself again to the pool.
- the first worker finished handling and register itself again to the pool.
- Now the pool has 2 workers and will do the same at step 4.
Implement the logic with an email service
Let's say in our application we want to send welcome emails to new users.
type Email struct {
To string `json:"to"`
From string `json:"from"`
Subject string `json:"subject"`
Content string `json:"content"`
}
func (e Email) Handle() error {
r := rand.Intn(200)
time.Sleep(time.Duration(r) * time.Millisecond)
return nil
}
let's assume the handle method will call a third-party like SendGrid to send the email.
//EmailService ... email service
type EmailService struct {
Queue chan queue.Queuable
}
//NewEmailService ... returns email service to send emails :D
func NewEmailService(q chan queue.Queuable) *EmailService {
service := &EmailService{
Queue: q,
}
return service
}
func (s EmailService) Send(e Email) {
s.Queue <- e
}
The EmailService now has a queueable channel and Send method to send the email
Now our main.go file can be something like
var QueueDispatcher *Dispatcher
func main() {
QueueDispatcher = NewDispatcher(4)
QueueDispatcher.Run()
mailService = emails.NewEmailService(JobQueue)
r := gin.Default()
r.GET("/email", sendEmailHandler)
return r
}
func sendEmailHandler(c *gin.Context) {
emailTo := c.Query("to")
emailFrom := c.Query("from")
emailSubject := c.Query("subject")
emailContent := c.Query("content")
email := emails.Email{
To: emailTo,
From: emailFrom,
Subject: emailSubject,
Content: emailContent,
}
mailService.Send(email)
c.String(200, "Email will be sent soon :)")
}
In the next chapter (during next 2 days), we will use Grafana & Prometheus to monitor workers and processing time and get some beautiful metrics for the application.
Top comments (0)