Pipeline Pattern Using Goroutines and Channels

In the previous article, I discussed goroutines and how concurrency works under the hood in Golang. Do check it out if you are new to concurrency and goroutines in Golang. Now let's take that understanding further with an interesting pattern: pipelines built from goroutines and channels.

Most of us have come across the word pipeline, maybe in DevOps (CI/CD pipelines) or Big Data (data processing/ETL pipelines). Informally, a pipeline consists of a series of stages, where each stage processes its input and passes its output to the next stage for further processing. We can modify each stage independently of the others, rate limit individual stages, and so on.

Let's dive in and see how to use goroutines and channels to implement the pipeline pattern with concurrent behaviour -

Naive Pipeline (Synchronous) :-

Consider these two functions -

// multiply returns value scaled by multiplier.
func multiply(value, multiplier int) int {
	return value * multiplier
}

// addFive returns value plus 5.
func addFive(value int) int {
	return value + 5
}

These functions each perform one arithmetic operation on a number and return the result. You can think of them as the “stages” of a pipeline. To complete the pipeline, we can just combine both stages -

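func main() {
	data := []int{5, 6, 7, 8}

	// Each element goes through addFive first, then multiply.
	// Prints 100, 110, 120, 130 (one per line); needs the fmt package imported.
	for _, ele := range data {
		fmt.Println(multiply(addFive(ele), 10))
	}
}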

This simplistic model can now be extended to utilize Golang’s channels and goroutines to perform the processing of the stages concurrently.

Concurrent Pipeline :-

In our dummy example, we want to process a collection of integers by adding 5 to each integer and then multiplying the result by some multiplier. So, let's define the stages of our pipeline -

  • sliceToStream: The generator function, responsible for producing the stream of input for the pipeline. Takes an integer slice as input and returns a channel on which the data is streamed.
  • addFive: Stage that adds 5 to each number. Takes the channel returned by stage one as input and returns a new channel on which the updated values are streamed.
  • multiply: Stage that multiplies each number by some multiplier. Takes the channel returned by the addFive stage as input and returns the channel that carries the final output.

Let’s start with the Generator function i.e. Stage One -

// Stage One
func sliceToStream(data []int) <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for _, ele := range data {
			out <- ele
		}

	}()

	return out
}

Extending the same idea, we can rewrite the addFive and multiply functions to do their processing concurrently.

// Stage Two: adds 5 to every value received on in.
func addFive(in <-chan int) <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for i := range in {
			out <- i + 5
		}
	}()

	return out
}

// Stage Three: multiplies every value received on in by multiplier.
func multiply(in <-chan int, multiplier int) <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for i := range in {
			out <- i * multiplier
		}
	}()

	return out
}

Note: None of these stage functions blocks until all the data has been processed. The processing happens inside goroutines, so each function returns almost immediately, handing back the channel on which we can expect its output.
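To see the point of this note in action, here is a small sketch. The names source, double and demo, and the delivered counter, are made up for this illustration and are not part of the pipeline above. The counter only goes up after a consumer has actually received a value, so it is still zero right after the pipeline is wired up, even though both stage functions have already returned.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// source is a hypothetical generator stage, same shape as sliceToStream.
func source(data []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range data {
			out <- v
		}
	}()
	return out
}

// double is a hypothetical stage that doubles each value. It bumps
// delivered only after a consumer has received the value, because a
// send on an unbuffered channel blocks until someone reads it.
func double(in <-chan int, delivered *int32) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * 2
			atomic.AddInt32(delivered, 1)
		}
	}()
	return out
}

// demo reports the counter right after wiring the pipeline and again
// after draining it.
func demo(data []int) (before, after int32) {
	var delivered int32

	stream := double(source(data), &delivered) // returns immediately
	before = atomic.LoadInt32(&delivered)      // nothing consumed yet

	for range stream { // drain the pipeline
	}
	after = atomic.LoadInt32(&delivered)
	return before, after
}

func main() {
	before, after := demo([]int{1, 2, 3, 4})
	fmt.Println(before, after) // prints: 0 4
}
```

The work only moves forward as the consumer reads, which is why the stage functions themselves can return right away.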

Executing the pipeline -

func main() {
	dataSlice := []int{1, 2, 3, 4}

	// Pipeline
	intStream := sliceToStream(dataSlice)
	addedChan := addFive(intStream)
	finalChan := multiply(addedChan, 10)

	// Printing the final output
	for i := range finalChan {
		fmt.Println(i)
	}
}

This code runs the stages concurrently, with each stage passing its results to the next one over a channel. We get the final results by reading from the last stage’s channel.

Output -

$ go run pipeline.go
60
70
80
90

To handle errors, or to stop processing when some condition is met, we can use another channel (call it "doneChannel", say) and add a select statement inside each stage's for loop, so that the stages can be signalled to stop executing.
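The done-channel idea can be sketched roughly like this. It is only one possible shape, not the definitive way to do it: the names sliceToStreamWithDone, addFiveWithDone and takeFirst are invented for this example, and the done channel plays the role of the "doneChannel" mentioned above. Each stage uses select so that it either sends its value or notices that done has been closed and returns.

```go
package main

import "fmt"

// sliceToStreamWithDone streams data until it runs out or done is closed.
func sliceToStreamWithDone(done <-chan struct{}, data []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range data {
			select {
			case out <- v:
			case <-done:
				return // asked to stop early
			}
		}
	}()
	return out
}

// addFiveWithDone adds 5 to each value until in is closed or done is closed.
func addFiveWithDone(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case out <- v + 5:
			case <-done:
				return // asked to stop early
			}
		}
	}()
	return out
}

// takeFirst reads at most n results and then closes done, which lets
// every stage goroutine exit instead of blocking forever on a send.
func takeFirst(n int, data []int) []int {
	done := make(chan struct{})
	defer close(done)

	stream := addFiveWithDone(done, sliceToStreamWithDone(done, data))

	var result []int
	for len(result) < n {
		v, ok := <-stream
		if !ok {
			break // pipeline finished before n values arrived
		}
		result = append(result, v)
	}
	return result
}

func main() {
	fmt.Println(takeFirst(2, []int{1, 2, 3, 4})) // prints: [6 7]
}
```

Closing done (rather than sending a value on it) is what makes this work for any number of stages, since every goroutine waiting on a closed channel is released at once.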

Now consider this pattern with a more realistic example: reading data from files or databases, sanitizing it, applying some business logic to the sanitized data, and finally passing the results on to the respective stakeholder services or clients. Go's concurrency makes this behaviour easy to implement.
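As a rough sketch of that kind of pipeline, the example below uses an in-memory slice of strings as a stand-in for a file or database, and a trivial formatting step as a stand-in for business logic. All the names here (emitRecords, sanitize, describe, runPipeline) are hypothetical and chosen just for this illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// emitRecords stands in for reading records from a file or database.
func emitRecords(records []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, r := range records {
			out <- r
		}
	}()
	return out
}

// sanitize trims whitespace, lowercases, and drops empty records.
func sanitize(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for s := range in {
			s = strings.ToLower(strings.TrimSpace(s))
			if s == "" {
				continue // nothing left after cleaning
			}
			out <- s
		}
	}()
	return out
}

// describe is placeholder business logic: it annotates each record
// with its length in bytes.
func describe(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for s := range in {
			out <- fmt.Sprintf("%s (%d bytes)", s, len(s))
		}
	}()
	return out
}

// runPipeline wires the stages together and collects the final output.
func runPipeline(records []string) []string {
	var results []string
	for r := range describe(sanitize(emitRecords(records))) {
		results = append(results, r)
	}
	return results
}

func main() {
	for _, r := range runPipeline([]string{"  Alice ", "", "BOB", "   "}) {
		fmt.Println(r)
	}
	// prints:
	// alice (5 bytes)
	// bob (3 bytes)
}
```

In a real service, the last stage would hand the results to a client or another service instead of collecting them into a slice.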

That's it! This article was just meant to get you started with the concurrent pipeline pattern in Go. To level up further, go through this nice article on the official Go Blog, which covers Fan-Out/Fan-In implementations, explicit cancellation of goroutines, bounded parallelism, and more.

Thanks for sticking till the end!