Introduction
Reading from a single channel shared by multiple goroutines is concurrency in action. Concurrency in Go is the ability of goroutines (independently running functions) to make progress at the same time. A goroutine is simply a lightweight thread of execution managed by the Go runtime; a minimal sketch of launching one appears right after the topic list below. Reading from a single channel with multiple goroutines requires a good understanding of goroutines and how they use channels to communicate with one another. In this article we discuss the following topics:
- Sending multiple values into a channel
- Reading from a single channel shared by multiple goroutines
- Reading from a single channel shared by multiple goroutines while throttling the number of goroutines.
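Before we dive in, here is a minimal, self-contained sketch (not taken from the examples that follow) showing a single goroutine communicating with main over an unbuffered channel:
package main

import "fmt"

func main() {
    messages := make(chan string) // unbuffered channel of string type

    // Launch a goroutine that sends one value and returns.
    go func() {
        messages <- "hello from a goroutine"
    }()

    // The receive blocks until the goroutine above has sent its value.
    fmt.Println(<-messages)
}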
Different Go channel types
There are two types of channels in Go, namely buffered and unbuffered channels. Buffered channels are used for asynchronous communication, while unbuffered channels are used for synchronous communication: a send on an unbuffered channel blocks until a receiver is ready, whereas a send on a buffered channel only blocks once the buffer is full.
Example
buffered := make(chan string, 5) // Buffered channel of string type
unBuffered := make(chan string) // Unbuffered channel of string type
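To make the difference concrete, the short sketch below (an illustration added here, reusing the same variable names as the declarations above) shows that a buffered channel accepts sends while its buffer has room, whereas an unbuffered send must be paired with a ready receiver:
package main

import "fmt"

func main() {
    // A send on a buffered channel does not block while the buffer has room,
    // so main can send twice before anyone receives.
    buffered := make(chan string, 2)
    buffered <- "first"
    buffered <- "second"
    fmt.Println(<-buffered, <-buffered)

    // A send on an unbuffered channel blocks until a receiver is ready,
    // so the send has to happen in another goroutine while main receives.
    unBuffered := make(chan string)
    go func() {
        unBuffered <- "synchronous handshake" // blocks until main receives below
    }()
    fmt.Println(<-unBuffered)
}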
Sending multiple values into a channel
An unbuffered channel can be used to send an unlimited number of values using a for loop. As long as the receiver is ready to receive, the sending goroutine will keep sending data. The receiving goroutine can also use a for loop to continuously pull data from the channel. In the next example, we use a queue channel to deliver some data (tasks) to the main goroutine.
Example
package main

import "fmt"

type Task struct {
    Name string
}

func main() {
    queue := make(chan Task)
    go sender(queue)
    for task := range queue {
        fmt.Println(task.Name)
    }
}

func sender(ch chan Task) {
    defer close(ch) // Sender closing a channel
    tasks := []Task{
        {Name: "Review Go Code"},
        {Name: "Write code documentation"},
        {Name: "Meet my mentor"},
        {Name: "Learn Kubernetes"},
        {Name: "Attend Go conference"},
    }
    for _, task := range tasks {
        ch <- task
    }
}
Output
$ go run main.go
Review Go Code
Write code documentation
Meet my mentor
Learn Kubernetes
Attend Go conference
Explanation
The above example is interesting because it models a familiar producer/consumer situation. We define a sender goroutine that sends multiple tasks to a queue channel. The main goroutine is responsible for pulling data from the queue using the range statement. Remember that range will keep receiving values until the sender closes the channel.
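If you are curious what the range statement does for us, the following sketch is a hand-written equivalent of the loop in main using the comma-ok receive idiom; it assumes the same package, import "fmt", Task type, and sender function as the example above:
func main() {
    queue := make(chan Task)
    go sender(queue)
    for {
        task, ok := <-queue // ok becomes false once the channel is closed and drained
        if !ok {
            break // stop receiving: no more tasks will arrive
        }
        fmt.Println(task.Name)
    }
}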
Reading from a single channel shared by multiple goroutines
This section demonstrates how we can read tasks from a single channel using multiple goroutines.
Example
package main

import "fmt"

type Task struct {
    Name string
}

func main() {
    queue := make(chan Task)
    go sender(queue)
    for task := range queue {
        go taskHandler(task)
    }
}

func sender(ch chan Task) {
    defer close(ch) // Sender closing a channel
    tasks := []Task{
        {Name: "Review Go Code"},
        {Name: "Write code documentation"},
        {Name: "Meet my mentor"},
        {Name: "Learn Kubernetes"},
        {Name: "Attend Go conference"},
    }
    for _, task := range tasks {
        ch <- task
    }
}

func taskHandler(t Task) {
    size := len(t.Name)
    msg := fmt.Sprintf("Task name size is %d characters", size)
    fmt.Println(msg)
}
Output
$ go run main.go
Task name size is 24 characters
Task name size is 14 characters
Task name size is 16 characters
Task name size is 14 characters
Explanation
In the above example, we define two functions that run as goroutines, namely sender() and taskHandler(). The sender() goroutine takes as an argument the channel it will send data into, loops through a slice of tasks with a for range loop, and sends each task to the channel. The taskHandler() goroutine, as the name suggests, handles a task that has been passed to it: it computes the length of the task name and prints a message to the console indicating that size.
Note that main() starts a new taskHandler() goroutine for every task it receives and exits as soon as its range loop ends, so any handler that has not finished by then is cut off. That is why the sample output above shows only four of the five messages. The next example addresses this by waiting on a sync.WaitGroup.
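If you do need every handler to finish, one minimal approach (a sketch, not part of the original example; it still starts one goroutine per task, so there is no throttling yet) is to have main wait on a sync.WaitGroup. It reuses Task, sender(), and taskHandler() from above and additionally imports "sync":
func main() {
    var wg sync.WaitGroup
    queue := make(chan Task)
    go sender(queue)
    for task := range queue {
        wg.Add(1)
        go func(t Task) {
            defer wg.Done() // mark this handler as finished
            taskHandler(t)
        }(task)
    }
    wg.Wait() // block until every spawned handler has finished
}
Passing the task as the parameter t also avoids the classic loop-variable capture pitfall in Go versions before 1.22.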
Reading from a single channel shared by multiple goroutines while throttling the number of goroutines
In the above example, the number of goroutines running is determined by the number of tasks in the queue: one handler goroutine is started per task, so the goroutine count grows in direct proportion to the queue. This is not ideal, because memory and scheduler usage grow without bound when there are many tasks. To solve this we limit the number of workers (throttling). In the next example, we limit the pool to 3 workers that together consume all the tasks in the queue channel.
Example
package main

import (
    "fmt"
    "sync"
    "time"
)

type Task struct {
    Name string
}

func main() {
    var wg sync.WaitGroup
    queue := make(chan Task)
    workers := 3

    tasks := []Task{
        {Name: "Review Go Code"},
        {Name: "Write code documentation"},
        {Name: "Meet my mentor"},
        {Name: "Learn Kubernetes"},
        {Name: "Attend Go conference"},
    }

    wg.Add(len(tasks))
    go taskGenerator(queue, tasks)
    for i := 0; i < workers; i++ {
        go taskHandler(queue, &wg)
    }
    wg.Wait()
}

func taskGenerator(ch chan Task, tasks []Task) {
    defer close(ch)
    for _, t := range tasks {
        msg := fmt.Sprintf("Start [ %s ] ", t.Name)
        fmt.Println(msg)
        ch <- t
    }
}

func taskHandler(ch chan Task, wg *sync.WaitGroup) {
    for t := range ch {
        fmt.Printf("Done [ %s ] \n", t.Name)
        time.Sleep(time.Second)
        wg.Done()
    }
}
Output
$ go run main.go
Start [ Review Go Code ]
Start [ Write code documentation ]
Done [ Review Go Code ]
Done [ Write code documentation ]
Start [ Meet my mentor ]
Done [ Meet my mentor ]
Start [ Learn Kubernetes ]
Done [ Learn Kubernetes ]
Start [ Attend Go conference ]
Done [ Attend Go conference ]
Explanation
The above example is really interesting because it uses sync.WaitGroup. A sync.WaitGroup helps the main() goroutine keep track of how many tasks are still running and wait for them to finish before the program exits. In the main function, we declare a sync.WaitGroup with var wg sync.WaitGroup and a channel of tasks with queue := make(chan Task).
We then define the number of workers with workers := 3 and a slice of tasks. wg.Add(len(tasks)) tells the wait group how many tasks must complete before wg.Wait() returns. The next statement, go taskGenerator(queue, tasks), is responsible for adding tasks to the queue channel by looping through all the tasks and sending them one after the other. taskGenerator() is also responsible for closing the channel after all tasks have been sent.
The next lines are our main focus. We use a for loop over the number of workers, and for each worker we start a goroutine with go taskHandler(queue, &wg). Inside taskHandler(), we range over the tasks in the channel passed to it. This causes the three worker goroutines to compete for tasks from the same channel until the queue is drained and closed.
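A common variant of the same worker-pool pattern (again a sketch, not the article's code) adds one wait-group counter per worker instead of per task: each worker calls wg.Done() once the channel is closed and its range loop ends, so the number of tasks does not need to be known up front. It reuses the Task type and taskGenerator() from the example above and imports "fmt" and "sync":
func main() {
    var wg sync.WaitGroup
    queue := make(chan Task)
    workers := 3

    tasks := []Task{
        {Name: "Review Go Code"},
        {Name: "Write code documentation"},
        {Name: "Meet my mentor"},
    }

    go taskGenerator(queue, tasks)

    for i := 0; i < workers; i++ {
        wg.Add(1) // one counter per worker, not per task
        go func() {
            defer wg.Done() // worker signals completion once the channel is drained and closed
            for t := range queue {
                fmt.Printf("Done [ %s ] \n", t.Name)
            }
        }()
    }
    wg.Wait()
}
Counting workers keeps wg.Add() out of the send loop and works even when the producer decides at runtime how many tasks to send.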
Summary
Concurrency in Go lets developers structure their programs using easy-to-use patterns. Reading from a single channel with multiple goroutines is one such pattern, often called fan-out. In this article, we learned about goroutines and channels, reading from a single channel shared by multiple goroutines, and doing the same while throttling the number of goroutines.