通常会说程序是并发设计的,也就是说程序允许多个任务同时执行,但实际上并不一定会在同一时刻发生。在单核处理器上,程序能以间隔方式切换执行。并行则依赖多核处理器等物理设备,让多个任务真正在同一时刻执行,它代表了当前程序运行状态。
并行是并发设计的理想执行模式。
多线程或多进程是并行的基本条件,但单线程也可用协程(coroutine)做到并发。尽管协程在单个线程上通过主动切换来实现多任务并发。
协程优点:
将因阻塞而浪费的时间找回来免去线程切换的开销不错的执行效率多个任务本质上依旧是串行的,加上可控自主调度,所以并不需要做同步处理。通常情况下,多进程来实现分布式和负载均衡,减轻单进程垃圾回收压力;用多线程(LWP)抢夺更多的处理器资源;用协程来提高处理器时间片利用率。
只须在函数用前添加go关键字即可创建并发任务。
go println("hello world") go func(s string){ println(s) }("hello world")关键字go并非执行并发操作,而是创建一个并发任务单元。新建任务被放置在系统队列中,等待调度器安排合适系统线程去获取执行权。当前流畅不会阻塞,不会等待该任务启动,且运行时也不保证并发任务的执行次序。
每个任务单元出保存函数指针,调用参数外,还会分配执行所需的栈内存空间。相比系统默认MB级别的线程栈,goroutine自定义栈初始仅需2KB,所以才能创建成千上万的并发任务。自定义栈采取按需分配策略,在需要时进行扩容,最大能到GB规模。
与defer一样,goroutine也会因“延迟执行”而立即计算并复制执行参数。
import "time" var c int func counter() int { c++ return c } func main() { a := 110 go func(x, y int) { time.Sleep(time.Second) println("go", x, y) }(a, counter()) a += 100 println("main", a, counter()) time.Sleep(time.Second * 3) }进程退出时不会等待并发任务结束,可用通道(channel)阻塞,然后发出信号。
import "time" func main() { exit := make(chan struct{}) go func() { time.Sleep(time.Second) println("goroutine done") close(exit) //关闭通道 }() println("main...") <- exit // 通道关闭,立即解除阻塞 println("main exit") }如果要等得多个任务结束,推荐使用sync.WaitGroup.通过设定计数器,让每个goroutine在退出前递减,直至归零时解除阻塞。
import ( "sync" "time" ) func main() { var wg sync.WaitGroup for i :=0;i<10;i++ { wg.Add(1) go func(id int) { defer wg.Done() time.Sleep(time.Second) println("goroutine ",id,"done") }(i) } println("main..") wg.Wait() println("main exit") }尽管WaitGroup.Add实现了原子操作,但建议在gorutine外累加计数器,以免Add尚未执行,Wait已经退出。
可在多出使用Wait阻塞,都能接收到通知,等待归零,解除阻塞。
运行时可能会创建很多线程,但任何时候仅有限的几个线程参与并发任务执行。该数量默认与处理器核数相等,可用runtime.GOMAXPROCS函数(环境变量)修改。参数小于1,返回当前系统的值,不做任何设置。
runtime.NumCPU()获取当前cpu数
与线程不同,goroutine任务无法设置优先级,无法获取编号,没有局部存储(TLS),甚至连返回值都会被抛弃。但除了优先级外,其他功能都很容易实现。
暂停,释放线程去执行其他任务,当前任务被放回队列,等待下次调度时恢复执行。
runtime.Gosched()Goexit立即终止当前任务,运行时确保所有已注册延迟调用被执行。该函数不会影响其他并发任务,不会引发panic,自然无法捕获。
runtime.Goexit()如果在mian.mian调用Goexit,它会等待其他任务结束,然后让进城直接崩溃。
无论身处那一层,Goexit都能立即终止整个调用堆栈,这与return仅退出当前函数不同。
Go并未实现严格的并发安全。
允许全局变量,指针,引用类型这些非安全内存共享操作,就需要开发人员自行维护数据一致性和完成性。Go鼓励使用CSP通道,以通道来代替内存共享,实现并发安全。
通过消息来避免竞态的模型处理CSP还有Actor。两者有着较大区别。
作为CSP核心,通道(channl)显示的,要求操作双方必须知道数据类型和具体通道,并不关心另一端操作者身份和数量。可如果另一端为准备妥当,或消息未能及时处理时,会阻塞当前端。
Actor是透明的,不在乎数据类型及通道,只要知道接收信箱即可。默认就是异步方式,发送方对消息是否被接收和处理并不关心。
从底层实现上来说,通道只是一个队列。同步模式下,发送和接收双方配对,然后直接复制数据给对方。如配对失败,则置入等待对垒,直到另一方出现后才被唤醒。异步模型抢夺的则是数据缓冲槽。发送发要求有空槽可供写入,而接收方则要求有缓冲数据可读。需求不符合时,同样加入等待队列,直到有另一方数据或腾出空槽后被唤醒。
通道还被用作事件通知。
func main() { done := make(chan struct{}) // 结束事件 c := make(chan string) // 数据传输通道 go func() { s := <-c // 接收消息 println(s) close(done) // 关闭通道,作为结束通知 }() c <- "zzz" //发送消息 <-done // 阻塞,直到有数据或管道关闭 }同步模式必须有配对操作的goroutine出现,否则会一致阻塞。而异步模式在缓冲区未满或数据为读完前,不会阻塞。
func main() { c := make(chan int, 3) //创建3个缓冲槽的异步通道 c <- 110 // 缓冲区未满,不会阻塞 c <- 120 println(<-c) // 缓冲区尚有数据,不会阻塞 println(<-c) }缓冲区大小仅是内部属性,不属于类型组成部分。另外通道变量本身就是指针可用相等操作符判断是否为统一对象或nil。
func main() { var a ,b chan int = make(chan int,3),make(chan int) var c chan bool println(a ==b) println(c ==nil) fmt.Printf("%p,%d\n",a,unsafe.Sizeof(a)) }内置函数cap和len返回缓冲区大小和当前已缓冲数量;而对于同步通道则都返回0,据此可判断通道类型是同步还是异步。
除使用简单的发送和接收操作符外,还可用ok-idom 或者range模式处理数据。
func main() { done := make(chan struct{}) c:=make(chan int) go func() { defer close(done) for { x,ok :=<-c if !ok{ // 判断通道是否关闭 return } println(x) } }() c<-110 c<-120 c<-119 close(c) }对于循环接收数据,range模式更简洁
package main func main() { done := make(chan struct{}) c := make(chan int) go func() { defer close(done) for x := range c { //循环获取消息,直到通道被关闭 println(x) } }() c <- 1 c <- 2 c <- 3 <-done }及时用close函数关闭通道引发结束通知,否者可能会导致死锁。
通知可以是群体性的,也未必就是通知结束,可以是人很需要表达的事件。
对于closed或nil通道,发送和接收操作都有相应规则。
向已关闭通道发送数据,引发panic从已关闭接收数据,返回已缓冲数据或零值无论收发,nil通道都会阻塞重复关闭,或者关闭nil通道都会引发panic错误。
通道默认是双向的,并不区分发送和接收端。但某些时候,可限制收发操作的方向来获得更严谨的逻辑操作。
尽管用make创建单向通道,没有任何意义,通常使用类型转换来获取单向通道,并分别赋予操作双方。
package main import ( "sync" ) func main() { var wg sync.WaitGroup wg.Add(2) c := make(chan int) var send chan<- int = c //声明单向通道 发送方 var recv <-chan int = c //声明单向通道 接收方 go func() { defer wg.Done() for i := range recv { println(i) } }() go func() { defer wg.Done() defer close(c) for i := 0; i < 3; i++ { send <- i } }() wg.Wait() }不能再单向通道上做逆向操作。close不能用于接收端。无法将单向通道重新给转换回去。
如果同时处理多个通道,可选用select语句。它会随机选择一个可用通道做收发选择。
package main import "sync" func main() { var wg sync.WaitGroup wg.Add(2) a, b := make(chan int), make(chan int) go func() { defer wg.Done() for { var ( name string x int ok bool ) select { case x, ok = <-a: name = "a" case x, ok = <-b: name = "b" } if !ok { // 如果任一通道关闭,则终止接收 return } println(name, x) } }() go func() { defer wg.Done() defer close(a) defer close(b) for i := 0; i < 10; i++ { select { //随机选择发送channl case a <- i: case b <- i * 10: } } }() wg.Wait() }如果等全部通道消息处理结束(closed),可竞已完成通道设置为nil,这样就会被阻塞,不在被select选中。
unc main() { var wg sync.WaitGroup wg.Add(3) a, b := make(chan int), make(chan int) go func() { //接收端 defer wg.Done() for { select { case x, ok := <-a: if !ok { a = nil break } println("a", x) case x, ok := <-b: if !ok { b = nil break } println("b", x) } if a == nil && b == nil { return } } }() go func() { // 发送端a defer wg.Done() defer close(a) for i := 0; i < 4; i++ { a <- i } }() go func() { // 发送端b defer wg.Done() defer close(b) for i := 0; i < 5; i++ { b <- i * 110 } }() wg.Wait() }即便是同一通道,也会随机选择case执行。
当所有通道都不可用湿,select会执行default语句。如此可避开select阻塞,但需注意处理外层循环,以免陷入空耗。
package main import ( "fmt" "time" ) func main() { done := make(chan struct{}) c := make(chan int) go func() { defer close(done) for { select { case x, ok := <-c: if !ok { return } fmt.Println("data", x) default: fmt.Println("执行默认逻辑") } fmt.Println(time.Now()) time.Sleep(time.Second) } }() time.Sleep(time.Second * 3) c <- 100 close(c) <-done }也可用default处理一些默认逻辑
package main func main() { done := make(chan struct{}) data := []chan int{ // 数据缓冲区 make(chan int, 3), } go func() { defer close(done) for i := 0; i < 10; i++ { select { case data[len(data)-1] <- i: //生产数据 default: // 当前通道已满,生成新的缓存通道 data = append(data, make(chan int, 3)) } } }() <-done for i := 0; 1 < 10; i++ { //实现所有数据 c := data[i] close(c) for x := range c { println(x) } } }通常使用工厂方法将gorutine和通道绑定
package main import ( "sync" ) type receiver struct { sync.WaitGroup data chan int } func newReceiver() *receiver { r := &receiver{ data: make(chan int), } r.Add(1) go func() { defer r.Done() for x := range r.data { println("recv", x) } }() return r } func main() { r := newReceiver() r.data <- 110 r.data <- 120 close(r.data) r.Wait() }鉴于通道本身就是一个并发安全的队列,可用做ID generator,Pool等用途。
将发往通道的数据打包,减少传输次数,可有效提升性能。从实现上来说,通道队列依旧使用锁同步机制,单词获取更多数据(批处理),可改善频繁加锁造成的性能问题。
通道可能会引发goroutine leak,确切的说,是指goroutine处于发送或接收阻塞状态,但一直未被唤醒。垃圾回收器并不回收此类资源,导致会在等待队列里长久休闲,形成资源泄露。
通道并非用来取代所得,通道倾向于解决逻辑层次的并发处理结构,而锁则用来保护局部范围内的数据问题。
标准库sync提供了互斥和读写锁,另有原子操作等。可满足日常开发需要。Mutex,RWMutex.
临界资源:指并发环境中多个进程/线程、协程共享资源。
并发本省并不复杂,但是因给乐游资源竞争的问题,会引起一些问题。
如多个groutine在访问同一个数据资源的时候,其中一个现场修改了数据,那么这个数值就被修改了,对于其他的groutine来讲,这个数值可能是不对的。
可以借助sync包下的操作。
Go并发编程中不要以共享内存的方式去通信,而要以通信的方式去共享内存。
在Go语言中并不鼓励用锁保护共享状态的方式在不同Goroutine中分享信息(以共享内存的方式去通信)。而是鼓励通过channel将共享状态的变化在各个goroutine之间传递(以通信的当时去共享内存),这样同样能向用锁一样保证在同一的时间只有一个Goroutine访问共享状态。
sync是synchronization同步这个词的缩写,所以也会被叫做同步包。
WaitGroup,同步等待组。
在类型上,WaitGroup是一个结构体。一个WaitGroup的用途是等待一个goroutine的集合执行完成。主goroutine调用了Add()方法来设置要等待goroutine的数量。然后,每个goroutine都会执行并且执行完成后调用了Done()这个方法。与此同时可以使用Wait()方法来阻塞,直到所有的goroutine都执行完成。
Add这个方法,用来设置到WaitGroup的计数器的值。每个waitgroup中都有一个计数器,用来表示这个同步等待组中要执行的goroutine的数量。
如果计数器的数值为0,表示等待时被阻塞的goroutine都被释放,如果计数器的数值为负数,那么就会抛出错误。
Done方法就是当前WaitGroup同步等待组中的某个goroutine执行完毕后,设置这个WaitGroup的counter数值减1.
Done()方法底层代码及时调用了Add()方法
Wait()方法,表示让当前的goroutine等待,进入阻塞状态,一直到WaitGroup的计数器为零。才能解除阻塞,这个goroutine才能继续执行。
Mutex是最简单的一种锁类型,互斥锁。当一个goroutine获得了Mutex后,其他goroutine就只能等到这个goroutine释放该Mutex.
每个资源都对应一个可称为“互斥锁”的标记,这个标记用来保证在任意时刻,只能有一个协程(线程)访问该资源,其他的协程只能等待。
互斥锁是传统并发模型对共享资源进行访问控制的主要手段,有标准库sync中mutex结构体类型表示。sync.Mutex类型只有两个公开的指针方法,Lock和Unlock。Lock锁定当前的共享资源,Unlock进行解锁。
在使用互斥锁是,一定要主要:对资源操作完成后,一定要解锁,否则会出现流程执行异常,死锁等问题。通常借助defer。锁定后,立即使用defer语句保证互斥锁及时解锁。
Lock()方法,锁定m,如果该锁已在使用,则调用goroutine将阻塞,直到互斥提可用。
Unlock()方法,解锁m,如果m未在要解锁的条目上锁定,则为运行时错误。
锁定的互斥体不与特定goroutine关联。允许一个goroutine锁定互斥体,然后安排另一个goroutine解锁互斥体。
RWMutex是读/写互斥锁。锁可以由任意数量的读取器和单个编写器持有。RWMutex的零值是未锁定的mutex.
如果一个goroutine持有一个RWMutex进行读取,而另一个goroutine可能调用lock,那么在释放初始读锁之前,任何goroutinr都不应该期望能够获取读取锁。特别是,者禁止递归读取锁定。这是为了确保锁最终可用;被阻止的锁调用会将新的读取器排除在获取锁之外。
同时只能有一个goroutine能够获取写锁。同时可以由任意多个goroutine获得读锁。同时只能存在写锁或者读锁。读锁,当有写锁时,无法加载读锁,当只有读锁或者没有锁时,可以加载读锁,读锁可以加载多个,所以适用于“读多写少”的场景。
读锁解锁,RUnlock 撤销单次RLock调用,它对于其它同时存在的读取器则没有效果。若rw并没有为读取而锁定,调用RUnlock就会引发一个运行时错误。
写锁,如果在添加写锁之前已经有其他的读锁和写锁,则Lock就会阻塞直到该锁可用,为确保该锁最终可用,已阻塞的Lock调用会从获得的锁中排除新的读取锁,即写锁权限高于读锁,有写锁时优先进行写锁定。
写锁解锁,如果没有进行写锁定,则就会引起一个运行时错误。
参考资料
<go语言学习笔记> 雨痕
