Go并发简介 Go并发编程的困难
在面对并发难题时,感觉无从下手,不知道该用什么并发原语来解决问题 。
如果多个并发原语都可以解决问题,那么,究竟哪个是最优解呢 ?比如说是用互斥锁,还是
用 Channel。
不知道如何编排并发任务 。并发编程不像是传统的串行编程,程序的运行存在着很大的不确
定性。这个时候,就会面临一个问题,怎么才能让相应的任务按照你设想的流程运行呢 ?
有时候,按照正常理解的并发方式去实现的程序,结果莫名其妙就 panic 或者死锁了,排
查起来非常困难 。
已知的并发原语都不能解决并发问题 ,程序写起来异常复杂,而且代码混乱,容易出错。
Go并发编程能力 两条主线:
基本并发元语: Mutex、RWMutex、Waitgroup、Cond、Pool、Context 等标准库中的并发原语
原子操作: Go 标准库中提供的原子操作
Channel: Channel 类型是 Go 语言独特的类型,因为比较新,所以难以掌握。
扩展并发元语: Go 开发组不准备在标准库中扩充并发原语了,但是还有一些并发原语应用广泛,比如信号量、SingleFlight、循环栅栏、ErrGroup 等。掌握了它们,就可以在处理一些并发问题时,取得事半功倍的效果。
分布式并发元语: etcd 实现的一些分布式并发原语,比如 Leader 选举、分布式互斥锁、分布式读写锁、分布式队列等,在处理分布式场景的并发问题时,特别有用。
资源并发访问问题: 进程和线程: 进程是资源分配的最小单位,线程是CPU调度的最小单位。
进程=火车,线程=车厢
线程在进程下行进(单纯的车厢无法运行)
一个进程可以包含多个线程(一辆火车可以有多个车厢)
不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)
同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)
进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)
进程间不会相互影响,一个线程挂掉将导致整个进程挂掉(一列火车不会影响到另外一列火车,但是如果一列火车上中间的一节车厢着火了,将影响到所有车厢)
进程可以拓展到多机,进程最多适合多核(不同火车可以开在多个轨道上,同一火车的车厢不能在行进的不同的轨道上)
进程使用的内存地址可以上锁,即一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。(比如火车上的洗手间)-”互斥锁”
进程使用的内存地址可以限定使用量(比如火车上的餐厅,最多只允许多少人进入,如果满了需要在门口等,等有人出来了才能进去)-“信号量”
Goroutine 在Java和C++开发中,实现并发编程需要自己维护一个线程池,需要自己包装任务,还需要自己去调用线程执行任务并且切换上下文,非常痛苦!
goroutine就是这种机制,类似于线程,由runtime调度和管理。
使用goroutine 在调用函数前加上go关键字
启动单个goroutine 1 2 3 4 5 6 7 8 9 10 11 12 13 package mainimport "fmt" func hello () { fmt.Println("hello goroutine" ) }func main () { hello() fmt.Println("main go routine done!" ) }
顺序执行的结果:
在hello前加上go关键字
执行结果:
没有打印hello goroutine
Why?
程序启动时,会为main()
函数创建一个默认的goroutine
, main()
函数返回时,该goroutine
结束,所有main()
中启动的goroutine()
会一同结束。
能不能想办法,让main函数等等hello
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package mainimport ( "fmt" "time" )func hello () { fmt.Println("hello goroutine" ) }func main () { go hello() fmt.Println("main go routine done!" ) time.Sleep(time.Second) }
此时,创建新的goroutine需要花费一定时间,main中的goroutine会继续执行
启动多个goroutine 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package mainimport ( "fmt" "sync" )var wg sync.WaitGroupfunc hello2 (i int ) { defer wg.Done() fmt.Println("Hello Goroutine!" , i) }func main () { for i := 0 ; i < 10 ; i++ { wg.Add(1 ) go hello2(i) } wg.Wait() }
执行结果:多次执行的结果并不相同
因为10个goroutine是并发执行的,而goroutine的调度是随机的。
主协程退出后,其他任务是否执行?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package mainimport ( "fmt" "time" )func main () { go func () { i := 0 for { i++ fmt.Printf("new goroutine: i = %d\n" , i) time.Sleep(time.Second) } }() i := 0 for { i++ fmt.Printf("main goroutine: i = %d\n" , i) time.Sleep(time.Second) if i == 2 { break } } }
runtime包 runtime.Gosched() 这个函数的作用是让当前goroutine让出CPU,好让其它的goroutine获得执行的机会。同时,当前的goroutine也会在未来的某个时间点继续运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package mainimport ( "fmt" )func main () { go func (s string ) { for i := 0 ; i < 2 ; i++ { fmt.Println(s) } }("world" ) for i := 0 ; i < 2 ; i++ { fmt.Println("hello" ) } }
runtime.Goexit() runtime.Goexit函数在终止调用它的Goroutine的运行之前会先执行该Groution中还没有执行的defer语句。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package mainimport ( "fmt" "runtime" )func main () { go func () { defer fmt.Println("A.defer" ) func () { defer fmt.Println("B.defer" ) runtime.Goexit() defer fmt.Println("C.defer" ) fmt.Println("B" ) }() fmt.Println("A" ) }() for { } }
runtime.GoMAXPROCS() Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。
Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package mainimport ( "fmt" "runtime" "time" )func a () { for i := 1 ; i < 10 ; i++ { fmt.Println("A:" , i) } }func b () { for i := 1 ; i < 10 ; i++ { fmt.Println("B:" , i) } }func main () { runtime.GOMAXPROCS(2 ) go a() go b() time.Sleep(time.Second) }
此时使用的CPU核心为1核,任务会挨个完成。
Channel 单纯并发无意义,出现函数与函数的数据交换时,才能体现并发执行函数的意义。
如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列 ,总是遵循先入先出(First In First Out)的规则 ,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
channel类型 1 2 3 var ch1 chan int var ch2 chan bool var ch3 chan []int
创建channel 1 2 var ch chan int fmt.Println(ch)
通道声明之后需要使用make函数初始化之后才可以使用。
1 2 3 4 5 make (chan 元素类型, [缓冲大小]) ch4 := make (chan int ) ch5 := make (chan bool ) ch6 := make (chan []int )
channel操作 通道有发送(send)、接收(receive)和关闭(close) 三种操作。
发送和接收都使用<-
符号。
初始化一个通道
发送
接受
关闭
只有在goroutine所有数据都发送完毕时,才能关闭通道。
通道是可以被垃圾回收机制回收的,所以不一定需要关闭,但是文件操作是一定要关闭通道的。
关闭通道后,会有以下注意点:
关闭后的通道再进行发送,会panic
关闭后的通道可以继续接受值,知道通道为空
关闭的没有值的通道接受到的为类型对应的零值
关闭一个已经关闭的通道,会panic
无缓冲通道 无缓冲通道又称为阻塞通道
1 2 3 4 5 func main () { ch := make (chan int ) ch <- 10 fmt.Println("发送成功" ) }
此段代码编译可以通过,但是执行时会出现以下错误
这是死锁错误,因为我们使用的是ch := make(chan int)
,建立的是无缓冲通道,只有在有人接收值的时候才能够发送值。
上述代码ch <- 10
会造成死锁。
如何解决?只有在接收的时候,才能发送,那我们就启动一个协程去接收值。
1 2 3 4 5 6 7 8 9 10 func recv (c chan int ) { ret := <-c fmt.Println("接收成功" , ret) }func main () { ch := make (chan int ) go recv(ch) ch <- 10 fmt.Println("发送成功" ) }
无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。
因此,无缓冲通道也被称为同步通道 。
有缓冲的通道 使用make初始化通道时,指定通道容量
1 2 3 4 5 6 7 func main () { ch := make (chan int , 1 ) ch <- 10 fmt.Println("发送成功" ) fmt.Println(len (ch)) fmt.Println(cap (ch)) }
Close() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package mainimport "fmt" func main () { c := make (chan int ) go func () { for i := 0 ; i < 5 ; i++ { c <- i } close (c) }() for { if data, ok := <-c; ok { fmt.Println(data) } else { break } } fmt.Println("main结束" ) }
如何优雅的从通道循环取值 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func main () { ch1 := make (chan int ) ch2 := make (chan int ) go func () { for i := 0 ; i < 100 ; i++ { ch1 <- i } close (ch1) }() go func () { for { i, ok := <-ch1 if !ok { break } ch2 <- i * i } close (ch2) }() for i := range ch2 { fmt.Println(i) } }
单向通道 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func counter (out chan <- int ) { for i := 0 ; i < 100 ; i++ { out <- i } close (out) }func squarer (out chan <- int , in <-chan int ) { for i := range in { out <- i * i } close (out) }func printer (in <-chan int ) { for i := range in { fmt.Println(i) } }func main () { ch1 := make (chan int ) ch2 := make (chan int ) go counter(ch1) go squarer(ch2, ch1) printer(ch2) }
Select select多路复用 试想,现有多个通道需要接收数据,可以怎么写?
1 2 3 4 5 for { data,ok := <-ch1 data,ok := <-ch2 }
这种方法可以实现多通道接受数据的请求,但是执行效率太低,go自带了select语句,可以同时响应多个通道的操作。
1 2 3 4 5 6 7 8 select { case <-chan1: case chan2 <- 1 : default : }
select可以同时监听一个或多个channel,直到其中一个channel ready
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package mainimport ( "fmt" "time" )func test1 (ch chan string ) { time.Sleep(time.Second * 5 ) ch <- "test1" }func test2 (ch chan string ) { time.Sleep(time.Second * 2 ) ch <- "test2" }func main () { output1 := make (chan string ) output2 := make (chan string ) go test1(output1) go test2(output2) select { case s1 := <-output1: fmt.Println("s1 = " , s1) case s2 := <-output2: fmt.Println("s2 = " , s2) } }
如果多个channel同时ready,则随机选择一个执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package mainimport ( "fmt" )func main () { int_chan := make (chan int , 1 ) string_chan := make (chan string , 1 ) go func () { int_chan <- 1 }() go func () { string_chan <- "hello" }() select { case value := <-int_chan: fmt.Println("int:" , value) case value := <-string_chan: fmt.Println("string:" , value) } fmt.Println("main结束" ) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package mainimport ( "fmt" "time" )func main () { output1 := make (chan string , 10 ) go write(output1) for s := range output1 { fmt.Println("res:" , s) time.Sleep(time.Second) } }func write (ch chan string ) { for { select { case ch <- "hello" : fmt.Println("write hello" ) default : fmt.Println("channel full" ) } time.Sleep(time.Millisecond * 500 ) } }
并发安全和锁 竞态问题 多个goroutine同时操作同一资源,存在数据竞争,结果与预期不符
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 var x int64 var wg sync.WaitGroupfunc add () { for i := 0 ; i < 5000 ; i++ { x = x + 1 } wg.Done() }func main () { wg.Add(2 ) go add() go add() wg.Wait() fmt.Println(x) }
互斥锁 互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 var x int64 var wg sync.WaitGroupvar lock sync.Mutexfunc add () { for i := 0 ; i < 5000 ; i++ { lock.Lock() x = x + 1 lock.Unlock() } wg.Done() }func main () { wg.Add(2 ) go add() go add() wg.Wait() fmt.Println(x) }
使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。
读写互斥锁 互斥锁是完全互斥的 ,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的 ,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。
读写锁分为两种:读锁和写锁。
当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 var ( x int64 wg sync.WaitGroup lock sync.Mutex rwlock sync.RWMutex )func write () { rwlock.Lock() x = x + 1 time.Sleep(10 * time.Millisecond) rwlock.Unlock() wg.Done() }func read () { rwlock.RLock() time.Sleep(time.Millisecond) rwlock.RUnlock() wg.Done() }func main () { start := time.Now() for i := 0 ; i < 10 ; i++ { wg.Add(1 ) go write() } for i := 0 ; i < 1000 ; i++ { wg.Add(1 ) go read() } wg.Wait() end := time.Now() fmt.Println(end.Sub(start)) }