DEV Community

sai teja
sai teja

Posted on


Pub Sub model in golang

I created a basic pub-sub model in Go.
Please go through it and let me know about any mistakes or suggestions.

package main

import (
    "context"
    "fmt"
    "time"
)

// Subscribers maps a subscriber name to the channel that subscriber reads
// from. It is initialised at declaration so it is usable before any other
// code runs; writing to a nil map would panic.
//
// The map has no locking of its own: callers must not register clients
// concurrently with publishing.
var Subscribers = make(map[string]chan interface{})

// SubscriberClient is the handle a subscriber holds after registering.
type SubscriberClient struct {
    // Name identifies the subscriber; it is also its key in Subscribers.
    Name string
    // Ch is the channel on which published values arrive.
    Ch chan interface{}
}

func main() {

    // register clients to subscribe
    subscriberOne := RegisterClient("one")
    subscriberTwo := RegisterClient("two")

    ctx := context.Background()

    // initiate subscribers 

    // set timeout to exit the subscriber 1
    ctx, cfunc := context.WithTimeout(ctx, time.Second*12)

    // it will send the signal to subscriber to exit
    defer cfunc()
    go subscriberOne.sub(ctx)

    // set timeout to exit the subscriber 2
    ctx, cfunc2 := context.WithTimeout(ctx, time.Second*8)
    defer cfunc2()
    go subscriberTwo.sub(ctx)

    // initiate publisher

    // set timeout to exit the publisher
    ctx, cfunc3 := context.WithTimeout(ctx, time.Second*6)
    defer cfunc3()
    go pub(ctx)

    // i am just keeping ticker to send events only for 6 seconds after that it will exit the main
    // else based on use case publisher can handle this . if you are implementing this in
    // API's then no need for ticker because publisher will run until program exists
    select {
    case <-time.Tick(time.Second * 13):
        fmt.Println("main exiting")

// it will return subscriber client with name and channel to read
func RegisterClient(name string) *SubscriberClient {
    // for async we  use buffered channels
    // for sync we use unbuffered channels
    ch := make(chan interface{}, 4)

    // store it channel in our global producer list
    Subscribers[name] = ch

    // return the channel to subscriber so they can read from it
    return &SubscriberClient{Name: name, Ch: ch}

func pub(ctx context.Context) {
    for {
        select {
        //when to stop condition (publisher need to handle this ...)
        case <-ctx.Done():
            fmt.Println("publisher exiting")

        // it will publish messages every 2 seconds
        // other way we can keep constraints when to publish
        case <-time.Tick(time.Second * 2):

// subscriber
func (s *SubscriberClient) sub(ctx context.Context) {
    for {
        select {
        //when to stop condition (subscriber need to handle this ...)
        case <-ctx.Done():
            fmt.Println(s.Name, "exiting")
        case x := <-s.Ch:
            fmt.Println(x, " value from ", s.Name, " subscriber")

// it will publish all registered subscribers (implemented TC:-O(N)// we can enhance it further....)
func publishToAllConsumers(data interface{}) {
    for k, v := range Subscribers {
        fmt.Println("sending data to ", k)
        v <- data

Enter fullscreen mode Exit fullscreen mode

Please follow me on Medium for more interesting posts.

Sai Teja – Medium

Read writing from Sai Teja on Medium. Software Engineer | Fitness trainer | Random Thinker. Every day, Sai Teja and thousands of other voices read, write, and share important stories on Medium.


Top comments (0)

Advice For Junior Developers

Advice from a career of 15+ years for new and beginner developers just getting started on their journey.