Go并发简介

Go并发简介

Go并发编程的困难

  1. 在面对并发难题时,感觉无从下手,不知道该用什么并发原语来解决问题
  1. 如果多个并发原语都可以解决问题,那么,究竟哪个是最优解呢?比如说是用互斥锁,还是

用 Channel。

  1. 不知道如何编排并发任务。并发编程不像是传统的串行编程,程序的运行存在着很大的不确

定性。这个时候,就会面临一个问题,怎么才能让相应的任务按照你设想的流程运行呢

  1. 有时候,按照正常理解的并发方式去实现的程序,结果莫名其妙就 panic 或者死锁了,

查起来非常困难

  1. 已知的并发原语都不能解决并发问题,程序写起来异常复杂,而且代码混乱,容易出错。

Go并发编程能力

两条主线:

  • 知识主线
  • 学习主线

image-20230224101635312

基本并发元语:

Mutex、RWMutex、Waitgroup、Cond、Pool、Context 等标准库中的并发原语

原子操作:

Go 标准库中提供的原子操作

Channel:

Channel 类型是 Go 语言独特的类型,因为比较新,所以难以掌握。

扩展并发元语:

Go 开发组不准备在标准库中扩充并发原语了,但是还有一些并发原语应用广泛,比如信号量、SingleFlight、循环栅栏、ErrGroup 等。掌握了它们,就可以在处理一些并发问题时,取得事半功倍的效果。

分布式并发元语:

etcd 实现的一些分布式并发原语,比如 Leader 选举、分布式互斥锁、分布式读写锁、分布式队列等,在处理分布式场景的并发问题时,特别有用。

资源并发访问问题:

进程和线程:

进程是资源分配的最小单位,线程是CPU调度的最小单位。

image-20230224103234848

进程=火车,线程=车厢

  • 线程在进程下行进(单纯的车厢无法运行)
  • 一个进程可以包含多个线程(一辆火车可以有多个车厢)
  • 不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)
  • 同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)
  • 进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)
  • 进程间不会相互影响,一个线程挂掉将导致整个进程挂掉(一列火车不会影响到另外一列火车,但是如果一列火车上中间的一节车厢着火了,将影响到所有车厢)
  • 进程可以拓展到多机,进程最多适合多核(不同火车可以开在多个轨道上,同一火车的车厢不能在行进的不同的轨道上)
  • 进程使用的内存地址可以上锁,即一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。(比如火车上的洗手间)-”互斥锁”
  • 进程使用的内存地址可以限定使用量(比如火车上的餐厅,最多只允许多少人进入,如果满了需要在门口等,等有人出来了才能进去)-“信号量”

Goroutine

在Java和C++开发中,实现并发编程需要自己维护一个线程池,需要自己包装任务,还需要自己去调用线程执行任务并且切换上下文,非常痛苦!

goroutine就是这种机制,类似于线程,由runtime调度和管理。

使用goroutine

在调用函数前加上go关键字

启动单个goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import "fmt"

func hello() {
fmt.Println("hello goroutine")
}

func main() {
hello()
fmt.Println("main go routine done!")
}

顺序执行的结果:

image-20230224111809045

在hello前加上go关键字

1
go hello()

执行结果:

image-20230224111904021

没有打印hello goroutine

Why?

程序启动时,会为main()函数创建一个默认的goroutine, main()函数返回时,该goroutine结束,所有main()中启动的goroutine()会一同结束。

能不能想办法,让main函数等等hello

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
"time"
)

func hello() {
fmt.Println("hello goroutine")
}

func main() {
go hello()
fmt.Println("main go routine done!")
time.Sleep(time.Second)
}

image-20230224114542872

此时,创建新的goroutine需要花费一定时间,main中的goroutine会继续执行

启动多个goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"sync"
)

var wg sync.WaitGroup

func hello2(i int) {
defer wg.Done()
fmt.Println("Hello Goroutine!", i)
}

func main() {
for i := 0; i < 10; i++ {
wg.Add(1)
go hello2(i)
}
wg.Wait()
}

执行结果:多次执行的结果并不相同

image-20230224172920481

image-20230224173005952

因为10个goroutine是并发执行的,而goroutine的调度是随机的。

主协程退出后,其他任务是否执行?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"time"
)

func main() {
// 合起来写
go func() {
i := 0
for {
i++
fmt.Printf("new goroutine: i = %d\n", i)
time.Sleep(time.Second)
}
}()
i := 0
for {
i++
fmt.Printf("main goroutine: i = %d\n", i)
time.Sleep(time.Second)
if i == 2 {
break
}
}
}

image-20230224173334444

runtime包

runtime.Gosched()

这个函数的作用是让当前goroutine让出CPU,好让其它的goroutine获得执行的机会。同时,当前的goroutine也会在未来的某个时间点继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
)

func main() {
go func(s string) {
for i := 0; i < 2; i++ {
fmt.Println(s)
}
}("world")

// 主协程
for i := 0; i < 2; i++ {
//runtime.Gosched()
fmt.Println("hello")
}
}

runtime.Goexit()

runtime.Goexit函数在终止调用它的Goroutine的运行之前会先执行该Groution中还没有执行的defer语句。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"runtime"
)

func main() {
go func() {
defer fmt.Println("A.defer")
func() {
defer fmt.Println("B.defer")
// 结束协程
runtime.Goexit()
defer fmt.Println("C.defer")
fmt.Println("B")
}()
fmt.Println("A")
}()
for {
}
}

runtime.GoMAXPROCS()

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。

Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"runtime"
"time"
)

func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}

func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}

func main() {
runtime.GOMAXPROCS(2)
go a()
go b()
time.Sleep(time.Second)
}

此时使用的CPU核心为1核,任务会挨个完成。

Channel

单纯并发无意义,出现函数与函数的数据交换时,才能体现并发执行函数的意义。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

channel类型

1
2
3
var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道

创建channel

1
2
var ch chan int
fmt.Println(ch) //结果为nil

通道声明之后需要使用make函数初始化之后才可以使用。

1
2
3
4
5
make(chan 元素类型, [缓冲大小]) // 缓冲可以自选

ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)

channel操作

通道有发送(send)、接收(receive)和关闭(close)三种操作。

发送和接收都使用<-符号。

初始化一个通道

1
ch := make(chan int)

发送

1
ch <- 10

接受

1
2
x := <-ch //从通道内接受值并赋给变量x
<-ch //从ch中接受值,忽略结果

关闭

1
close(ch)

只有在goroutine所有数据都发送完毕时,才能关闭通道。

通道是可以被垃圾回收机制回收的,所以不一定需要关闭,但是文件操作是一定要关闭通道的。

关闭通道后,会有以下注意点:

  • 关闭后的通道再进行发送,会panic
  • 关闭后的通道可以继续接受值,知道通道为空
  • 关闭的没有值的通道接受到的为类型对应的零值
  • 关闭一个已经关闭的通道,会panic

无缓冲通道

无缓冲通道又称为阻塞通道

1
2
3
4
5
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}

此段代码编译可以通过,但是执行时会出现以下错误

image-20230224212700464

这是死锁错误,因为我们使用的是ch := make(chan int),建立的是无缓冲通道,只有在有人接收值的时候才能够发送值。

上述代码ch <- 10会造成死锁。

如何解决?只有在接收的时候,才能发送,那我们就启动一个协程去接收值。

1
2
3
4
5
6
7
8
9
10
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用goroutine从通道接收值
ch <- 10
fmt.Println("发送成功")
}

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。

因此,无缓冲通道也被称为同步通道

有缓冲的通道

使用make初始化通道时,指定通道容量

1
2
3
4
5
6
7
func main(){
ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
ch <- 10
fmt.Println("发送成功")
fmt.Println(len(ch))
fmt.Println(cap(ch))
}

Close()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import "fmt"

func main() {
c := make(chan int)
go func() {
for i := 0; i < 5; i++ {
c <- i
}
close(c)
}()
for {
if data, ok := <-c; ok {
fmt.Println(data)
} else {
break
}
}
fmt.Println("main结束")
}

如何优雅的从通道循环取值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
// 开启goroutine将0~100的数发送到ch1中
go func() {
for i := 0; i < 100; i++ {
ch1 <- i
}
close(ch1)
}()
// 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
go func() {
for {
i, ok := <-ch1 // 通道关闭后再取值ok=false
if !ok {
break
}
ch2 <- i * i
}
close(ch2)
}()
// 在主goroutine中从ch2中接收值打印
for i := range ch2 { // 通道关闭后会退出for range循环
fmt.Println(i)
}
}

单向通道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}

func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}

func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
}

Select

select多路复用

试想,现有多个通道需要接收数据,可以怎么写?

1
2
3
4
5
//使用遍历
for{
data,ok := <-ch1
data,ok := <-ch2
}

这种方法可以实现多通道接受数据的请求,但是执行效率太低,go自带了select语句,可以同时响应多个通道的操作。

1
2
3
4
5
6
7
8
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
  • select可以同时监听一个或多个channel,直到其中一个channel ready
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"time"
)

func test1(ch chan string) {
time.Sleep(time.Second * 5)
ch <- "test1"
}
func test2(ch chan string) {
time.Sleep(time.Second * 2)
ch <- "test2"
}
func main() {
// 建立两个通道
output1 := make(chan string)
output2 := make(chan string)

go test1(output1)
go test2(output2)
select {
case s1 := <-output1:
fmt.Println("s1 = ", s1)
case s2 := <-output2:
fmt.Println("s2 = ", s2)

}
}

  • 如果多个channel同时ready,则随机选择一个执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
)

func main() {
// 创建2个管道
int_chan := make(chan int, 1)
string_chan := make(chan string, 1)
go func() {
//time.Sleep(2 * time.Second)
int_chan <- 1
}()
go func() {
string_chan <- "hello"
}()
select {
case value := <-int_chan:
fmt.Println("int:", value)
case value := <-string_chan:
fmt.Println("string:", value)
}
fmt.Println("main结束")
}
  • 可以用于判断管道是否存满
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"time"
)

// 判断管道有没有存满
func main() {
// 创建管道
output1 := make(chan string, 10)
// 子协程写数据
go write(output1)
// 取数据
for s := range output1 {
fmt.Println("res:", s)
time.Sleep(time.Second)
}
}

func write(ch chan string) {
for {
select {
// 写数据
case ch <- "hello":
fmt.Println("write hello")
default:
fmt.Println("channel full")
}
time.Sleep(time.Millisecond * 500)
}
}

并发安全和锁

竞态问题

多个goroutine同时操作同一资源,存在数据竞争,结果与预期不符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var x int64
var wg sync.WaitGroup

func add() {
for i := 0; i < 5000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // 加锁
x = x + 1
lock.Unlock() // 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

读写互斥锁

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

读写锁分为两种:读锁和写锁。

当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)

func write() {
// lock.Lock() // 加互斥锁
rwlock.Lock() // 加写锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
rwlock.Unlock() // 解写锁
// lock.Unlock() // 解互斥锁
wg.Done()
}

func read() {
// lock.Lock() // 加互斥锁
rwlock.RLock() // 加读锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
rwlock.RUnlock() // 解读锁
// lock.Unlock() // 解互斥锁
wg.Done()
}

func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}

for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}

wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}


Go并发简介
http://example.com/2023/02/24/Go并发简介/
Author
WYX
Posted on
February 24, 2023
Licensed under