Go并发模式

2/26/2023 Go基础

Go语言推荐CSP模型进行并发处理,而非通过共享内存。这里总结一些并发模式或者称作范式,去管理控制协程。

# Go并发模式

我们都知道Go语言开启协程只需要go func(){ },有时候需要与goroutine之间建立联系,方便进一步控制与处理。 因此总结一些范式,优雅的管理goroutine。

# 退出模式

使用关键字go很容易启动goroutine,这样创建的协程和当前协程已经分离,我们并不知道

# 等待指定协程退出















 
















package main

import (
	"fmt"
	"time"
)

type signal struct{}

func worker() {
	println("worker is working...")
	time.Sleep(1 * time.Second)
}

func spawn(f func()) <-chan signal {
	c := make(chan signal)
	go func() {
		println("worker start to work...")
		f()
		c <- signal(struct{}{})
	}()
	return c
}

func main() {
	println("start a worker...")
	c := spawn(worker)
	<-c
	fmt.Println("worker work done!")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

spawn函数创建的新goroutine与调用spawn函数的goroutine(main goroutine)之间通过channel建立了联系。

# 等待多个协程退出



















 
























package main

import (
	"fmt"
	"sync"
	"time"
)

type signal struct{}

func worker(i int) {
	fmt.Printf("worker %d: is working...\n", i)
	time.Sleep(1 * time.Second)
	fmt.Printf("worker %d: works done\n", i)
}

func spawnGroup(f func(i int), num int) <-chan signal {
	c := make(chan signal)
	var wg sync.WaitGroup

	for i := 0; i < num; i++ {
		wg.Add(1)
		go func(i int) {
			fmt.Printf("worker %d: start to work...\n", i)
			f(i)
			wg.Done()
		}(i + 1)
	}

	go func() {
		wg.Wait()
		c <- signal(struct{}{})
	}()
	return c
}

func main() {
	fmt.Println("the group of workers start to work...")
	c := spawnGroup(worker, 5)
	<-c
	fmt.Println("the group of workers work done!")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

通过sync.WaitGroup,对于每一个协程处理前进行wg.Add(1),退出时执行wg.Done(),并等待所有的协程退出wg.Wait()。保障所有的协程都会结束。

# 管道模式-pipeline

管道是Unix/Linux上一种典型的并发程序设计模式,例如执行:

ls -l | grep "\.go"
1

就可以利用管道机制过滤出当前路径下以".go"结尾的文件列表。

ls -l的输出作为grep "\.go"的输入很容易将两个功能模块串在一起。

   ┌──────────────┐                                      ┌───────────────┐                                   ┌──────────────┐
   │    channel   │  ───────────►  goroutines ──────────►│    channel    │ ──────────► goroutines ─────────► │   channel    │
   └──────────────┘                                      └───────────────┘                                   └──────────────┘
1
2
3

Go通常使用channel原语构建管道模式

package main

import "fmt"

func echo(nums []int) <-chan int {
	out := make(chan int, len(nums))
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

func odd(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			if n%2 != 0 {
				out <- n
			}
		}
		close(out)
	}()
	return out
}

func main() {
	var nums = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	for n := range square(odd(echo(nums))) {
		fmt.Println(n)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

echo的结果作为odd的入参,odd的结果作为square的入参。这样我们很容易扩展其他功能函数,例如:sum

参考资料:

Go Concurrency Patterns: Pipelines and cancellation (opens new window)

GO编程模式:PIPELINE (opens new window)

# 扇出,扇入模式

扇出模式,即多个相同goroutine从同一个channel读取数据并处理


                                                 ┌──────────────┐
               ┌──────────► goroutines ────────► │    channel   │
               │                                 └──────────────┘
               │
               │
  ┌────────────┴─┐                               ┌──────────────┐
  │    channel   ├────────► goroutines ────────► │    channel   │
  └────────────┬─┘                               └──────────────┘
               │
               │
               │                                 ┌──────────────┐
               └──────────► goroutines─────────► │    channel   │
                                                 └──────────────┘


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

扇入模式,从多个channel读取数据并处理,将结果放入一个channel


 ┌──────────────┐
 │    channel   ├─────────┐
 └──────────────┘         │
                          │
                          │
 ┌──────────────┐         ▼                ┌──────────────┐
 │    channel   ├───────►goroutines───────►│    channel   │
 └──────────────┘         ▲                └──────────────┘
                          │
                          │
 ┌──────────────┐         │
 │    channel   ├─────────┘
 └──────────────┘

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
	"fmt"
	"sync"
)

func echo(nums []int) <-chan int {
	out := make(chan int, len(nums))
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	var nums = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	in := echo(nums)
	c1 := square(in)
	c2 := square(in)

	for n := range merge(c1, c2) {
		fmt.Println(n)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

其中:

  • c1、c2是两个相同的goroutine从同一个channel取数据进行平方计算,这就是扇出;
  • merge从c1、c2两个channel里取数据合并到一个channel里,这就是扇入;

# 超时与取消模式

最近更新时间: 7/26/2023, 6:37:16 AM
什么鸟日子
蒙太奇