Golang channel with multiple receivers [SOLVED]

Introduction

Reading from a single channel that is shared by multiple goroutines is concurrency in action. In Go, concurrency means that goroutines (functions started with the go keyword) can run independently of each other. A goroutine is a lightweight thread managed by the Go runtime. Reading from a single channel with multiple goroutines requires a good understanding of goroutines and of how they use channels to communicate with one another. In this article we discuss the following topics:

  1. Sending multiple values into a channel
  2. Reading from a single channel shared by multiple goroutines
  3. Reading from a single channel shared by multiple goroutines while throttling the number of goroutines

 

Different Go Channel types

There are two types of channels in Go: buffered and unbuffered. An unbuffered channel is synchronous: a send blocks until a receiver is ready to take the value. A buffered channel allows asynchronous communication up to its capacity: a send blocks only when the buffer is full.

Example

buffered := make(chan string, 5) // Buffered channel of string type
unBuffered := make(chan string) // Unbuffered channel of string type
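
To make the difference concrete, here is a minimal sketch. The helper names bufferedDemo and unbufferedDemo are made up for illustration and are not part of the article's examples. It shows that a send on a buffered channel completes with no receiver running as long as the buffer has room, while a send on an unbuffered channel completes only once a receiver takes the value.

```go
package main

import "fmt"

// bufferedDemo (hypothetical helper) sends into a buffered channel while no
// receiver is running, then reports how many values are queued and the
// channel's capacity.
func bufferedDemo() (queued, capacity int) {
	buffered := make(chan string, 5)
	buffered <- "a" // does not block: the buffer has free slots
	buffered <- "b"
	return len(buffered), cap(buffered)
}

// unbufferedDemo (hypothetical helper) sends on an unbuffered channel from a
// separate goroutine, because the send cannot complete until a receiver
// takes the value.
func unbufferedDemo() string {
	unBuffered := make(chan string)
	go func() {
		unBuffered <- "hello" // blocks until the receive below happens
	}()
	return <-unBuffered
}

func main() {
	queued, capacity := bufferedDemo()
	fmt.Println(queued, capacity) // prints: 2 5
	fmt.Println(unbufferedDemo()) // prints: hello
}
```

If the send in unbufferedDemo were made directly in the calling goroutine, with no other goroutine to receive, the program would block forever on that send.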

 

Sending multiple values into a channel

An unbuffered channel places no limit on how many values can be sent through it over time, so a for loop can keep sending values one after another. Each send blocks until a receiver takes the value, which means the sending goroutine continues only as long as the receiver keeps receiving. The receiving goroutine can likewise use a for loop to continuously pull data from the channel. In the next example, we use a channel as a queue and put some data (tasks) into it.

Example

package main
 
import "fmt"
 
type Task struct {
   Name string
}
 
func main() {
   queue := make(chan Task)
 
   go sender(queue)
 
   for task := range queue {
       fmt.Println(task.Name)
   }
 
}
 
func sender(ch chan Task) {
   defer close(ch) // Sender closing a channel
   tasks := []Task{
       {Name: "Review Go Code"},
       {Name: "Write code documentation"},
       {Name: "Meet my mentor"},
       {Name: "Learn Kubernetes"},
       {Name: "Attend Go conference"},
   }
 
   for _, task := range tasks {
       ch <- task
   }
 
}

Output

$ go run main.go
Review Go Code
Write code documentation
Meet my mentor
Learn Kubernetes
Attend Go conference

Explanation

The above example mirrors an everyday situation: a producer puts work on a queue and a consumer takes it off. We define a sender goroutine that sends multiple tasks to the queue. The main goroutine pulls tasks from the queue using a for range loop. Remember that ranging over a channel keeps receiving until the sender closes the channel, which is why sender() defers close(ch).
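
As a rough illustration of what ranging over a channel does, the sketch below spells out the same loop with the two-value receive form v, ok := <-ch, where ok becomes false once the channel is closed and empty. The function name drain is an assumption introduced for this sketch only.

```go
package main

import "fmt"

// drain (hypothetical helper) receives until the channel is closed, using
// the two-value receive form that a for range loop over a channel performs
// implicitly.
func drain(ch <-chan string) []string {
	var got []string
	for {
		v, ok := <-ch
		if !ok { // the channel is closed and has no values left
			return got
		}
		got = append(got, v)
	}
}

func main() {
	ch := make(chan string)
	go func() {
		defer close(ch) // the sender closes the channel when it is done
		for _, name := range []string{"Review Go Code", "Meet my mentor"} {
			ch <- name
		}
	}()
	for _, name := range drain(ch) {
		fmt.Println(name)
	}
}
```

If the sender never closed the channel, both this loop and a for range loop would block forever after the last value.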

Reading from a single channel shared by multiple goroutines 

This section demonstrates how tasks read from a single channel can be handled by multiple goroutines.

Example

package main

import (
   "fmt"
   "sync"
)

type Task struct {
   Name string
}

func main() {
   var wg sync.WaitGroup
   queue := make(chan Task)

   go sender(queue)

   for task := range queue {
       wg.Add(1) // one handler goroutine per task
       go taskHandler(task, &wg)
   }

   // Without this wait, main could return before every handler has printed
   wg.Wait()
}

func sender(ch chan Task) {
   defer close(ch) // Sender closing a channel
   tasks := []Task{
       {Name: "Review Go Code"},
       {Name: "Write code documentation"},
       {Name: "Meet my mentor"},
       {Name: "Learn Kubernetes"},
       {Name: "Attend Go conference"},
   }

   for _, task := range tasks {
       ch <- task
   }
}

func taskHandler(t Task, wg *sync.WaitGroup) {
   defer wg.Done() // signal main that this handler has finished
   size := len(t.Name)
   msg := fmt.Sprintf("Task name size is %d characters", size)
   fmt.Println(msg)
}

Output

$ go run main.go
Task name size is 24 characters
Task name size is 14 characters
Task name size is 16 characters
Task name size is 14 characters
Task name size is 20 characters

The handlers run concurrently, so the order of the lines can differ between runs.

Explanation

In the above example, we define two functions, sender() and taskHandler(), and run both as goroutines. sender() takes as an argument the channel it will send data into, loops over a slice of tasks with a for range loop, and sends each task to the channel.

taskHandler(), as the name suggests, handles a task that has been passed to it: it computes the length of the task name and prints a message with that length to the console. Note that main() is the only goroutine that receives from the channel here; it starts one taskHandler() goroutine for each task it receives.

 

Reading from a single channel shared by multiple goroutines while throttling the number of goroutines

In the above example, the number of goroutines is determined by the number of tasks in the queue: one goroutine is started per task. This does not scale well, because a large number of tasks starts an equally large number of goroutines and can exhaust memory. To solve this, we limit the number of workers (throttling). In the next example, 3 workers consume all the tasks in the queue channel.

Example

package main
 
import (
   "fmt"
   "sync"
   "time"
)
 
type Task struct {
   Name string
}
 
func main() {
   var wg sync.WaitGroup
   queue := make(chan Task)
   workers := 3
   tasks := []Task{
       {Name: "Review Go Code"},
       {Name: "Write code documentation"},
       {Name: "Meet my mentor"},
       {Name: "Learn Kubernetes"},
       {Name: "Attend Go conference"},
   }
   wg.Add(len(tasks))
 
   go taskGenerator(queue, tasks)
 
   for i := 0; i < workers; i++ {
       go taskHandler(queue, &wg)
   }
 
   wg.Wait()
}
 
func taskGenerator(ch chan Task, tasks []Task) {
   defer close(ch)
   for _, t := range tasks {
       msg := fmt.Sprintf("Start [ %s ] ", t.Name)
       fmt.Println(msg)
       ch <- t
   }
}
 
func taskHandler(ch chan Task, wg *sync.WaitGroup) {
   for t := range ch {
       fmt.Printf("Done [ %s ] \n", t.Name)
       time.Sleep(time.Second)
       wg.Done()
   }
}

Output

$ go run main.go 
Start [ Review Go Code ] 
Start [ Write code documentation ] 
Done [ Review Go Code ] 
Done [ Write code documentation ] 
Start [ Meet my mentor ] 
Done [ Meet my mentor ] 
Start [ Learn Kubernetes ] 
Done [ Learn Kubernetes ] 
Start [ Attend Go conference ] 
Done [ Attend Go conference ]

Explanation

The above example uses sync.WaitGroup, which lets the main() goroutine keep track of the number of tasks still running and wait for them to finish before the program exits. In the main function, we declare a sync.WaitGroup with var wg sync.WaitGroup and a channel of tasks with queue := make(chan Task).

We then define the number of workers with workers := 3 and a slice of tasks. wg.Add(len(tasks)) sets the number of tasks that must complete before wg.Wait() returns; each call to wg.Done() in taskHandler() decrements that count. The next statement, go taskGenerator(queue, tasks), adds tasks to the queue channel by looping through all the tasks and sending them one after the other. taskGenerator() is also responsible for closing the channel after all tasks have been sent.
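
The example counts tasks with wg.Add(len(tasks)). A common alternative, sketched below under the assumption that the caller only needs to know when all workers have exited, is to count workers instead: call wg.Add(1) per worker and wg.Done() once when that worker's range loop ends. The function name process and the handled counter are made up for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// process (hypothetical helper) fans the given names out to `workers`
// goroutines and returns how many names were handled in total.
// The WaitGroup tracks workers here, not individual tasks.
func process(names []string, workers int) int {
	queue := make(chan string)
	var wg sync.WaitGroup
	var mu sync.Mutex // protects handled
	handled := 0

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done() // runs once, when this worker's range loop ends
			for name := range queue {
				fmt.Printf("worker %d got %q\n", id, name)
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}(i)
	}

	for _, n := range names {
		queue <- n
	}
	close(queue) // lets every worker's range loop finish
	wg.Wait()
	return handled
}

func main() {
	names := []string{"Review Go Code", "Write code documentation", "Meet my mentor"}
	fmt.Println("handled:", process(names, 3))
}
```

Counting workers means the total number of tasks does not have to be known in advance, which can help when tasks arrive from a stream rather than a fixed slice. Which worker receives which name is not deterministic.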

The next lines are our main focus. We use a for loop to start one goroutine per worker with go taskHandler(queue, &wg). Inside taskHandler(), each worker ranges over the channel that is passed to it. This causes the worker goroutines to compete for tasks until the channel is closed and drained; each task is delivered to exactly one worker.
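
A fixed pool of workers is one way to throttle. Another approach that is often seen, and that is not used in this article's example, is to keep one goroutine per task but cap how many run at once with a buffered channel acting as a semaphore. The sketch below is a minimal version of that idea; runLimited, sem and the peak counter are names assumed for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// runLimited (hypothetical helper) starts one goroutine per name but lets at
// most `limit` of them be in flight at the same time. It returns the highest
// number of goroutines it observed running concurrently.
func runLimited(names []string, limit int) int {
	sem := make(chan struct{}, limit) // buffered channel used as a semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex // protects running and peak
	running, peak := 0, 0

	for _, name := range names {
		sem <- struct{}{} // blocks while `limit` goroutines hold a slot
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this task ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			_ = len(n) // stand-in for real work on the task

			mu.Lock()
			running--
			mu.Unlock()
		}(name)
	}

	wg.Wait()
	return peak
}

func main() {
	names := []string{"Review Go Code", "Write code documentation", "Meet my mentor",
		"Learn Kubernetes", "Attend Go conference"}
	fmt.Println("peak concurrency:", runLimited(names, 3))
}
```

The reported peak never exceeds the limit, because a goroutine is started only after a slot has been acquired and the slot is released only after the goroutine has finished its work.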

 

Summary

Concurrency in Go lets developers structure programs with easy-to-use patterns. Reading from a single channel using multiple goroutines is one such pattern, called fan-out. In this article, we learned about goroutines and channels, reading from a single channel shared by multiple goroutines, and doing the same while throttling the number of goroutines.

 

References

https://go.dev/blog/pipelines
https://www.oreilly.com/library/view/concurrency-in-go/9781491941294/ch04.html

 
