DEV Community

Cover image for Concurrencia en Golang IV
Tomas Francisco Lingotti
Tomas Francisco Lingotti

Posted on • Updated on

Concurrencia en Golang IV

|1. | Modelo de concurrencia
|2. | Goroutines y canales
|3. | Wait Groups
|4. | Select y Worker pools
|5. | ErrGroup (error groups)

Para qué sirve select en Go?

Bueno llegamos a la cuarta entrega, en este caso vamos a esar hablando de la palabra reservada select y una pequeña implementación en un worker pool (vamos a explicar que es un worker pool).

Select es un bloque, tiene su sintaxis y su scope comprende todo lo que se ocupa dentro de sus llaves. Su función es el de poner en espera (o wait) a múltiples comunicacions a la goroutine que se está ejecutando, se va a liberar cuando se cumpla alguna de las condiciones que planteamos dentro de nuestro bloque. En otras palabras, en parte de nuestro código que podemos recibir mas de una rutina, es dónde vamos a usar el select.
La sintaxis básica es:

//...
select {
   //...
}
Enter fullscreen mode Exit fullscreen mode

Nota importate: El select bloquea la rutina que lo contiene, muchas veces vamos a ver que nuevas goroutines lanzan un select, tengan esto muy en cuenta y hagan las pruebas necesarias para saber que no van a bloquear la rutina principal.
Nota importante II: No usen select vacios.

Ahora para agregarle algo interesante a nuestro select, dentro del bloque vamos a estar esperando por mensajes que lleguen a los canales que invoquemos. Funciona parecido a la cláusula switch, pero la condición es quien reciba el mensaje, no hace ningún tipo de pattern matching.

Vamos a una impl real

c1 := make(chan string)
c2 := make(chan string)

// enviamos el primer mensaje.
go func() {
    time.Sleep(1 * time.Second)
    c1 <- "one"
}()
// enviamos el primer mensaje.
go func() {
    time.Sleep(2 * time.Second)
    c2 <- "two"
}()
// con este select esperamos dos veces, uno por cada
// canal. Otra práctica común es tener un for infinito
// en una rutina separada para asi no bloquear a la
// principal.
for i := 0; i < 2; i++ {
    select {
    case msg1 := <-c1:
        fmt.Println("received", msg1)
    case msg2 := <-c2:
        fmt.Println("received", msg2)
    }
}
Enter fullscreen mode Exit fullscreen mode

Aca vemos como en cada case, esperamos leer un mensaje de cada canal. Una vez que ocurra, vamos a salir del bloque.

Ahora, otro ejemplo que nos sirve como un daemon en nuestro servicio, que corra cada X tiempo y eternamente mientras la app este en ejecución.

import "time"
//
//
go func() {
        ticker := time.NewTicker(1 * time.Hour)
        for {
            select {
                case <-ticker.C:
                    //doSomething()
            }
        }
    }()

Enter fullscreen mode Exit fullscreen mode

Este código nos asegura que cada una hora vamos a invocar a lo que nosotros le indiquemos. El ticker es un countdown que nos provee el paquete time, muy útil para estos casos.

Resumen del select

  • Bloquea a la rutina que lo esta ejecutando.
  • Cuando entra en un case, va a anular a los otros hasta que vuelva a ejecutarse el select (por eso es tan comun verlo en un for{} infinito).
  • Suelen ir dentro de una función anónima lanzada en una goroutine.

Worker pools, que son?

sabemos que un pool es una reserva, en este caso de workers. Ahora en nuestro código tenemos que identificar 3 cosas claras. 1) El pool 2) El worker y 3) El trabajo a realizar.
El objetivo va a ser que podamos procesar trabajo de forma concurrente, los worker que terminan su trabajo, van de nuevo al pool y así tratar de ahorrar recursos, además de tratar de trabajar más eficientemente.

Vamos a un poco de go.

type Unit interface {
    Job() error
}
Enter fullscreen mode Exit fullscreen mode

Definimos lo que va a ser nuestra unidad de trabajo. Podríamos crear otras interfaces más y hacerlo mas polimórfico. No nos preocupemos por quien lo implementa, sera trabajo del cliente.

var jobQueue chan Unit // cola de trabajo compartida

type worker struct {
    pool  chan chan Unit
    jobCh chan Unit
}

func newWorker(pool chan chan Unit) *worker {
    return &worker{
        jobCh: make(chan Unit),
        pool:  pool,
    }
}

Enter fullscreen mode Exit fullscreen mode

Acá ya tenemos un worker que va a pertener a un pool y escuchar a un canal de unidades de trabajo. El doble channel para el pool es para el registro y otro para activarlo.
La otra función, simplemete actua de constructor.

type pool struct {
    pool    chan chan Unit
    workers int
}

func NewPool(maxWorkers int) *pool {
    return &pool{
        pool:    make(chan chan Unit, maxWorkers),
        workers: maxWorkers,
    }
}
Enter fullscreen mode Exit fullscreen mode

Ahora tenemos la struct de pool, que contiene el mismo canal que el worker y cuantos workers queremos crear, acompañado de su constructor.

Tenemos el esqueleto, ahora pasamos a la acción con los métodos.

func (d *pool) Run() {
    for i := 0; i < d.workers; i++ {
        w := newWorker(d.pool)
        w.start()
    }

    go d.dispatch()
}
Enter fullscreen mode Exit fullscreen mode

Ya se ve algo interesante, por cada worker en que indicamos, vamos a crear uno, asignale el pool que ya teníamos y arrancarlo, veamos de que se trata.

func (w *worker) start() {
    go func() {
        for {
        // register the actual worker in the queue.
            w.pool <- w.jobCh
            select {
            case job := <-w.jobCh:
                // do the actual job here
                err := job.Job()

                if err != nil {
                    log.Println(err.Error())
                }
            }
        }
    }()
}
Enter fullscreen mode Exit fullscreen mode

Para el método start, vamos a registrar el worker y luego con la cláusula select, vamos a esperar a que llegue un trabajo y realizarlo (sin saber que es, pero no nos importa). Todo esto en una nueva rutina por worker.

func (d *pool) dispatch() {
    go func() {
        for {
            select {
            case job, ok := <-jobQueue:
                if ok {
                    jobChannel := <-d.pool
                    jobChannel <- job
                }
            }
        }
    }()
}
Enter fullscreen mode Exit fullscreen mode

Por último, el dispatcher que va a distribuir la carga de trabajo, cada vez que de la jobQueue, se toma uno disponible del pool y a ese worker (implícito en el job channel) se le asigna el trabajo. La job queue es un recurso compartido.

Código en el cliente

func main() {
    // init the shared data structure.
    splanner.InitQueue(20)

    // init the dispatcher & keep it listening.
    splanner.NewPool(15).Run()

    s := http.Server{}
    s.Addr = ":8080"
    http.HandleFunc("/jobs", JobHandler)
    log.Fatal(s.ListenAndServe())
}

func JobHandler(w http.ResponseWriter, r *http.Request) {
    log.Println("getting job...")
    q := r.URL.Query().Get("q")

    if q == "" {
        q = "default"
    }

    for a := 0; a < 100; a++ {
        work := HeavyWork{Name: q, number: a}
        splanner.AddUnit(&work)
        _, _ = w.Write([]byte(fmt.Sprintf("job %s %d done", work.Name, a)))
    }
}

type HeavyWork struct {
    Name   string `json:"name"`
    number int
}

func (p *HeavyWork) Job() error {
    time.Sleep(500 * time.Millisecond)
    fmt.Println(fmt.Sprintf("heavy job is running %d", p.number))
    return nil
}

Enter fullscreen mode Exit fullscreen mode

Así de simple queda par ael cliente, tiene que implementar la interfaz Unit, con la funcion Job() y agregar la unidad de trabajo con una funcion exportada AddUnit que encola el trabajo. En main vemos el setup es extremadamente trivial.

El codigo completo, disponible en github

Conclusiones

Vemos el poder y flexibilidad que nos da el select para orquestar los workers y combinado con el pool, poder despachar rutinas, obtener métricas, etc etc.
No nos olvidemos que hay que estudiarlo, usarlo debidamente y sobre todo, probarlo en muchos escenarios para no llevarnos sosrpresas no gratas en producción.

Ya saben, si quieren sponsorearme, pueden hacerlo acá!

Top comments (5)

Collapse
 
scovl profile image
Vitor Lobo

¡Excelente artículo! Me gustaría felicitar al autor por abordar este tema de manera tan completa y detallada. Las secciones presentadas, desde el modelo de concurrencia hasta los grupos de error, proporcionan una visión muy completa de cómo trabajar con Goroutines y canales en Go.

Collapse
 
tomaslingotti profile image
Tomas Francisco Lingotti

Muchas gracias! dentro de poco la ultima entrega

Collapse
 
gabrielep profile image
Gabriel • Edited

Muy buen artículo, bien detallado para algo tan útil como el select, para poder manejar de forma más ordenada el flujo de nuestras goroutinas con los channels.

Collapse
 
tomaslingotti profile image
Tomas Francisco Lingotti

Muchas gracias gabi!

Collapse
 
vickq profile image
Victoria

Gracias Tomi, muy bueno!