Introduction to Go Fan Out Fan In
Go Fan In and Fan Out is a concurrency paradigm where inputs from several sources are converged (multiplexed) into a single stream (fan in), or inputs from one source are distributed to multiple pipelines (fan out). In simple words, this paradigm can be thought of as a producer and consumer architecture, where multiple producers send input to a single consumer or a single producer sends input to multiple consumers.
This article introduces you to Go Fan In Fan Out using goroutines and channels. To make good sense of this article, you should be familiar with goroutines and Go channels. To learn more about them, read the Goroutines and Go channels articles respectively.
Before we get started, let us refresh our minds on what channels are.
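- Channels facilitate communication between goroutines.
- On an unbuffered channel, a send blocks until a receiver is ready and a receive blocks until a sender is ready. On a buffered channel, a send blocks only when the buffer is full and a receive blocks only when it is empty.
- Data cannot be sent to a closed channel; attempting to do so causes a panic.
- The comma ok idiom (v, ok := <-ch) tells you whether a channel has been closed: ok is false once the channel is closed and all buffered values have been received, in which case v is the zero value of the element type.
- A channel can have more than one sender and more than one receiver.
- A for range loop over a channel stops only when the channel is closed and its remaining values have been received.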
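The properties listed above can be checked with a small program. This is a minimal sketch; the function names channelBasics and sendOnClosed are illustrative only. A buffered channel is used so the three sends succeed without a separate receiving goroutine.

```go
package main

import "fmt"

// channelBasics sends three values on a buffered channel, closes it,
// ranges over it, and then performs a comma-ok receive on the closed channel.
func channelBasics() (ranged []int, v int, ok bool) {
	ch := make(chan int, 3) // buffered: up to 3 sends succeed without a receiver
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch) // no more sends are allowed after this point

	// The range loop receives the buffered values and stops because ch is closed.
	for n := range ch {
		ranged = append(ranged, n)
	}

	// Comma ok on a closed, drained channel: v is 0 (zero value), ok is false.
	v, ok = <-ch
	return ranged, v, ok
}

// sendOnClosed reports whether sending on a closed channel panicked.
func sendOnClosed() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panics: send on closed channel
	return false
}

func main() {
	ranged, v, ok := channelBasics()
	fmt.Println(ranged)         // [1 2 3]
	fmt.Println(v, ok)          // 0 false
	fmt.Println(sendOnClosed()) // true
}
```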
The Fan Out Fan In concurrency paradigm is built on a pipeline, which is a series of stages connected by Go channels, where each stage is a group of goroutines running the same function. Each stage receives inputs via inbound channels, transforms them, and sends the results via outbound channels.
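To make the idea of stages concrete, here is a sketch of a pipeline in which every stage has the same shape: it takes an inbound channel and returns an outbound one. The Stage type and the source, mapStage and runPipeline helpers are hypothetical names introduced for this illustration, not part of any library. Each stage closes its outbound channel when its inbound channel is exhausted, which is what lets the next stage's range loop end.

```go
package main

import "fmt"

// Stage is a hypothetical type for one pipeline stage:
// it reads from an inbound channel and returns an outbound channel.
type Stage func(in <-chan int) <-chan int

// source emits nums on its outbound channel and then closes it.
func source(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// mapStage builds a Stage that applies f to every inbound value.
func mapStage(f func(int) int) Stage {
	return func(in <-chan int) <-chan int {
		out := make(chan int)
		go func() {
			defer close(out) // lets the downstream range loop terminate
			for n := range in {
				out <- f(n)
			}
		}()
		return out
	}
}

// runPipeline connects the stages in order and collects the final output.
func runPipeline(in <-chan int, stages ...Stage) []int {
	for _, s := range stages {
		in = s(in)
	}
	var results []int
	for n := range in {
		results = append(results, n)
	}
	return results
}

func main() {
	double := mapStage(func(n int) int { return n * 2 })
	addOne := mapStage(func(n int) int { return n + 1 })
	// One goroutine per stage keeps the order: prints [3 5 7].
	fmt.Println(runPipeline(source(1, 2, 3), double, addOne))
}
```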
Fan Out
Fan out is used when multiple functions read from the same channel. Each value sent on the channel is received by exactly one of the readers, and the reading stops only when the channel is closed. This characteristic is often used to distribute work among a group of workers to parallelize CPU and I/O work. Consider a function called generator that starts a goroutine which iterates through the integers passed to it and sends each one on a channel. The generator function returns that channel to the caller, which receives the numbers as they are sent.
Example
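package main

import (
	"fmt"
	"time"
)

// generator sends each of nums on the returned channel from a background
// goroutine and closes the channel once every value has been sent.
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // closing lets receivers' range loops terminate
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func main() {
	fmt.Println("Start Fan Out")
	c1 := generator(1, 2, 3)
	c2 := generator(4, 5, 6)
	// One consumer goroutine per channel.
	go func() {
		for num := range c1 {
			fmt.Println(num)
		}
	}()
	go func() {
		for num := range c2 {
			fmt.Println(num)
		}
	}()
	// Crude wait so main does not exit before the goroutines print.
	time.Sleep(time.Second * 2)
}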
Explanation
In the preceding example, we define a function called generator() that declares a Go channel called out of type int. It then starts a goroutine that loops through the integers passed in and, for each iteration, sends the current integer on the out channel. Because that loop runs in the background, generator() returns out to the caller immediately as a receive-only channel (<-chan int). In the main function, we call generator() twice with different integer values and start one goroutine per returned channel; each goroutine ranges over its channel and prints the values. Finally, we make the main goroutine wait for 2 seconds by calling time.Sleep(time.Second * 2). Without this wait, main would return and the program would exit before the goroutines had finished printing. A fixed sleep is only suitable for a demonstration; a sync.WaitGroup waits exactly as long as needed. Because the two goroutines run concurrently, the values from c1 and c2 may be interleaved differently from one run to the next.
Output
$ go run main.go
Start Fan Out
1
2
3
4
5
6
Data transformation
In terms of the producer and consumer architecture, our example passes the data through a transformation step before sending the output to the consumer. The transformation is not a required part of the pattern; we include it to demonstrate a common use case when working with the Fan In Fan Out pattern. The flow of input will be generator() → cube() → consumer. Please note that in our case the generator() function is our producer and the main() function is our consumer. The cube() function is responsible for data transformation.
Example
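// cube receives values from in, sends their cubes on out, and closes out
// once in has been closed and drained.
func cube(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // lets the next stage's range loop terminate
		for n := range in {
			out <- n * n * n
		}
	}()
	return out
}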
Explanation
In the preceding example, we define a function called cube(). The cube() function takes a single receive-only channel, in, as its input parameter and declares an out channel that will pass the transformed data to the consumer. It then spins a goroutine that iterates over the values received from the in channel. For each iteration, we calculate the cube (n * n * n) and send the result on the out channel. As with generator(), cube() returns out immediately while the goroutine does the work in the background.
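Before adding fan in, the two stages can be chained directly. The following sketch repeats generator() and cube(), each closing its out channel when done, feeds generator() straight into cube(), and lets the loop in main act as the consumer. With only one goroutine per stage, values come out in the order they went in.

```go
package main

import "fmt"

// generator sends each of nums on out and closes out when done.
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// cube sends n*n*n for every n received from in and closes out when in is drained.
func cube(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n * n
		}
	}()
	return out
}

func main() {
	// generator() -> cube() -> consumer (this loop). Prints 8, then 27.
	for n := range cube(generator(2, 3)) {
		fmt.Println(n)
	}
}
```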
Fan In
Fan in is used when a single function reads from multiple inputs and proceeds until all of them are closed. This is made possible by multiplexing the inputs onto a single channel.
In the next example, we write code that merges (multiplexes) inputs from several channels into one channel.
Example
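// merge fans in: it forwards every value from each channel in `in` onto a
// single out channel and closes out after all input channels are drained.
func merge(in ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	// output copies values from one input channel to out.
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}
	wg.Add(len(in))
	for _, c := range in {
		go output(c)
	}
	// Close out once every output goroutine has returned, so that
	// consumers ranging over out can finish.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}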
Explanation
The merge() function is responsible for fanning in inputs from the cube() function. It takes a variadic list of receive-only channels of type int (in ...<-chan int). In the merge() function, we declare a wait group using var wg sync.WaitGroup. The wait group keeps track of how many forwarding goroutines are still running. We also create a channel out of type int that will receive all the inputs from the different channels; this is where multiplexing (converging inputs) takes place. We then define an anonymous function, output, that iterates over the inputs from a single channel and passes them into the out channel.
We then loop through all the channels passed into merge() and spin an output() goroutine for each one. At this point, all the inputs from all the channels are being converged into the out channel. Finally, we spin a separate goroutine that waits for all the output() goroutines to return, which is the point at which no more values can arrive and out can be closed, and merge() returns the out channel to the caller immediately.
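To see merge() in isolation, here is a small self-contained program. It includes a merge() that closes out after wg.Wait() returns; without that close, the range loop in collectSorted would never finish. The from and collectSorted helpers are hypothetical names for this sketch. Because fan in does not preserve any order across its inputs, the collected values are sorted before printing so the result is deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// from (hypothetical helper) sends nums on a channel and then closes it.
func from(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// merge forwards values from every input channel onto out and closes out
// once all inputs are drained.
func merge(in ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}
	wg.Add(len(in))
	for _, c := range in {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// collectSorted drains ch until it is closed and returns the values sorted.
func collectSorted(ch <-chan int) []int {
	var vals []int
	for n := range ch {
		vals = append(vals, n)
	}
	sort.Ints(vals)
	return vals
}

func main() {
	// Prints [1 8 27 64 125 216].
	fmt.Println(collectSorted(merge(from(1, 8, 27), from(64, 125, 216))))
}
```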
We can now bring everything together in the main function. The generator() function is the producer: it generates multiple numbers and sends them on its out channel. The cube() function receives the inputs from the generator() function's out channel, transforms each one by computing its cube (n * n * n), and sends the transformed values on another out channel. The merge() function merges the inputs from the channels returned by the cube() calls into a single out channel.
Example
package main

import (
	"fmt"
	"sync"
)

// generator sends each of nums on out and closes out when done.
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// cube sends the cube of every value received from in and closes out
// when in is drained.
func cube(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n * n
		}
	}()
	return out
}

// merge forwards values from all input channels onto out and closes out
// after every input channel has been drained.
func merge(in ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}
	wg.Add(len(in))
	for _, c := range in {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	fmt.Println("Start Fan Out")
	// Producer: Fan Out
	c1 := generator(1, 2, 3)
	c2 := generator(4, 5, 6)
	// Transformer
	cube1 := cube(c1)
	cube2 := cube(c2)
	// Fan In
	out := merge(cube1, cube2)
	// Consumer: the loop ends when merge closes out, so no time.Sleep is needed.
	for n := range out {
		fmt.Println(n)
	}
}
Output
$ go run main.go
Start Fan Out
1
8
64
125
27
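216
The values from each transformer arrive in order (1, 8, 27 and 64, 125, 216), but how the two sequences interleave depends on goroutine scheduling and may differ between runs.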
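When there are exactly two input channels, fan in can also be written with a single goroutine and a select statement instead of a WaitGroup. This is a different technique from the merge() used in this article, shown here only for comparison; merge2 and from are hypothetical names. Setting a channel variable to nil after it is closed disables its select case, because receiving from a nil channel blocks forever.

```go
package main

import "fmt"

// from (hypothetical helper) sends nums on a channel and then closes it.
func from(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// merge2 is a select-based fan in for exactly two channels. It closes out
// after both a and b have been closed and drained.
func merge2(a, b <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for a != nil || b != nil {
			select {
			case n, ok := <-a:
				if !ok {
					a = nil // a is closed; stop selecting on it
					continue
				}
				out <- n
			case n, ok := <-b:
				if !ok {
					b = nil // b is closed; stop selecting on it
					continue
				}
				out <- n
			}
		}
	}()
	return out
}

func main() {
	sum := 0
	for n := range merge2(from(1, 8, 27), from(64, 125, 216)) {
		fmt.Println(n) // order varies between runs
		sum += n
	}
	fmt.Println("sum:", sum) // sum: 441
}
```

The WaitGroup-based merge() scales to any number of inputs, while this select version keeps all forwarding in one goroutine but needs one case per input channel.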
Summary
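In this article, we learned about the Go Fan In Fan Out concurrency pattern. This pattern consists of a pipeline, which is a series of stages (groups of goroutines) connected by channels. Each stage can have any number of inbound and outbound channels. Fan out distributes values from one channel among several goroutines, and fan in merges values from several channels into one.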