1. channel的使用
很多文章介紹channel的時候都和併發揉在一起,這裏我想把它當做一種數據結構來單獨介紹它的實現原理。
channel,通道。golang中用於數據傳遞的一種數據結構。是golang中一種傳遞數據的方式,也可用作事件通知。
1.1 聲明、傳值、關閉
使用chan關鍵字聲明一個通道,在使用前必須先創建,操作符 <-
用於指定通道的方向,發送或接收。如果未指定方向,則為雙向通道。
1 //聲明和創建
2 var ch chan int // 聲明一個傳遞int類型的channel
3 ch := make(chan int) // 使用內置函數make()定義一個channel
4 ch2 := make(chan interface{}) // 創建一個空接口類型的通道, 可以存放任意格式
5
6 type Equip struct{ /* 一些字段 */ }
7 ch2 := make(chan *Equip) // 創建Equip指針類型的通道, 可以存放*Equip
8
9 //傳值
10 ch <- value // 將一個數據value寫入至channel,這會導致阻塞,直到有其他goroutine從這個channel中讀取數據
11 value := <-ch // 從channel中讀取數據,如果channel之前沒有寫入數據,也會導致阻塞,直到channel中被寫入數據為止
12
13 ch := make(chan interface{}) // 創建一個空接口通道
14 ch <- 0 // 將0放入通道中
15 ch <- "hello" // 將hello字符串放入通道中
16
17 //關閉
18 close(ch) // 關閉channel
把數據往通道中發送時,如果接收方一直都沒有接收,那麼發送操作將持續阻塞。Go 程序運行時能智能地發現一些永遠無法發送成功的語句並報錯:
fatal error: all goroutines are asleep - deadlock!
//運行時發現所有的 goroutine(包括main)都處於等待 goroutine。
1.2 四種重要的通道使用方式
無緩衝通道
通道默認是無緩衝的,無緩衝通道上的發送操作將會被阻塞,直到有其他goroutine從對應的通道上執行接收操作,數據傳送完成,通道繼續工作。
package main
import (
"fmt"
"time"
)
var done chan bool
func HelloWorld() {
fmt.Println("Hello world goroutine")
time.Sleep(1*time.Second)
done <- true
}
func main() {
done = make(chan bool) // 創建一個channel
go HelloWorld()
<-done
}
1 //輸出
2 //Hello world goroutine
由於main不會等goroutine執行結束才返回,前文專門加了sleep輸出為了可以看到goroutine的輸出內容,那麼在這裏由於是阻塞的,所以無需sleep。
將代碼中”done <- true”和”<-done”,去掉再執行,沒有上面的輸出內容。
管道
通道可以用來連接goroutine,這樣一個的輸出是另一個輸入。這就叫做管道:
1 package main
2
3 import (
4 "fmt"
5 "time"
6 )
7 var echo chan string
8 var receive chan string
9
10 // 定義goroutine 1
11 func Echo() {
12 time.Sleep(1*time.Second)
13 echo <- "這是一次測試"
14 }
15
16 // 定義goroutine 2
17 func Receive() {
18 temp := <- echo // 阻塞等待echo的通道的返回
19 receive <- temp
20 }
21
22
23 func main() {
24 echo = make(chan string)
25 receive = make(chan string)
26
27 go Echo()
28 go Receive()
29
30 getStr := <-receive // 接收goroutine 2的返回
31
32 fmt.Println(getStr)
33 }
輸出字符串:”這是一次測試”。
在這裏不一定要去關閉channel,因為底層的垃圾回收機制會根據它是否可以訪問來決定是否自動回收它。(這裏不是根據channel是否關閉來決定的)
單向通道類型
1 package main
2
3 import (
4 "fmt"
5 "time"
6 )
7
8 // 定義goroutine 1
9 func Echo(out chan<- string) { // 定義輸出通道類型
10 time.Sleep(1*time.Second)
11 out <- "這又是一次測試"
12 close(out)
13 }
14
15 // 定義goroutine 2
16 func Receive(out chan<- string, in <-chan string) { // 定義輸出通道類型和輸入類型
17 temp := <-in // 阻塞等待echo的通道的返回
18 out <- temp
19 close(out)
20 }
21
22
23 func main() {
24 echo := make(chan string)
25 receive := make(chan string)
26
27 go Echo(echo)
28 go Receive(receive, echo)
29
30 getStr := <-receive // 接收goroutine 2的返回
31
32 fmt.Println(getStr)
33 }
輸出:這又是一次測試。
緩衝管道
goroutine的通道默認是是阻塞的,那麼有什麼辦法可以緩解阻塞? 答案是:加一個緩衝區。
創建一個緩衝通道:
1 ch := make(chan string, 3) // 創建了緩衝區為3的通道
2
3 //==
4 len(ch) // 長度計算
5 cap(ch) // 容量計算
緩衝通道傳遞數據示意圖:
2. 內部結構
Go語言channel是first-class的,意味着它可以被存儲到變量中,可以作為參數傳遞給函數,也可以作為函數的返回值返回。作為Go語言的核心特徵之一,雖然channel看上去很高端,但是其實channel僅僅就是一個數據結構而已,具體定義在 $GOROOT/src/runtime/chan.go
里。如下:
1 type hchan struct {
2 qcount uint // 隊列中的總數據
3 dataqsiz uint // 循環隊列的大小
4 buf unsafe.Pointer // 指向dataqsiz元素數組
5 elemsize uint16 //
6 closed uint32
7 elemtype *_type // 元素類型
8 sendx uint // 發送索引
9 recvx uint // 接收索引
10 recvq waitq // 接待員名單, 因recv而阻塞的等待隊列。
11 sendq waitq // 發送服務員列表, 因send而阻塞的等待隊列。
12 //鎖定保護hchan中的所有字段,以及幾個在此通道上阻止的sudogs中的字段。
13 //按住此鎖定時不要更改另一個G的狀態(尤其是不要準備G),因為這可能會導致死鎖堆棧縮小。
14 lock mutex
15 }
其中一個核心的部分是存放channel數據的環形隊列,由qcount和elemsize分別指定了隊列的容量和當前使用量。dataqsize是隊列的大小。elemalg是元素操作的一個Alg結構體,記錄下元素的操作,如copy函數,equal函數,hash函數等。
如果是帶緩衝區的chan,則緩衝區數據實際上是緊接着Hchan結構體中分配的。不帶緩衝的 channel ,環形隊列 size 則為 0。
1 c = (Hchan*)runtime.mal(n + hint*elem->size);
另一重要部分就是recvq和sendq兩個鏈表,一個是因讀這個通道而導致阻塞的goroutine,另一個是因為寫這個通道而阻塞的goroutine。如果一個goroutine阻塞於channel了,那麼它就被掛在recvq或sendq中。WaitQ是鏈表的定義,包含一個頭結點和一個尾結點:
1 struct WaitQ
2 {
3 SudoG* first;
4 SudoG* last;
5 };
隊列中的每個成員是一個SudoG結構體變量:
1 struct SudoG
2 {
3 G* g; // g和selgen構成
4 uint32 selgen; // 指向g的弱指針
5 SudoG* link;
6 int64 releasetime;
7 byte* elem; // 數據元素
8 };
該結構中主要的就是一個g和一個elem。elem用於存儲goroutine的數據。讀通道時,數據會從Hchan的隊列中拷貝到SudoG的elem域。寫通道時,數據則是由SudoG的elem域拷貝到Hchan的隊列中。
基本的寫channel操作,在底層運行時庫中對應的是一個runtime.chansend函數。
1 c <- v
在運行時庫中會執行:
1 void runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
其中c就是channel,ep是取變量v的地址。這裏的傳值約定是調用者負責分配好ep的空間,僅需要簡單的取變量地址就夠了。pres參數是在select中的通道操作使用的。
這個函數首先會區分是同步還是異步。同步是指chan是不帶緩衝區的,因此可能寫阻塞,而異步是指chan帶緩衝區,只有緩衝區滿才阻塞。在同步的情況下,由於channel本身是不帶數據緩存的,這時首先會查看Hchan結構體中的recvq鏈表時否為空,即是否有因為讀該管道而阻塞的goroutine。如果有則可以正常寫channel,否則操作會阻塞。
recvq不為空的情況下,將一個SudoG結構體出隊列,將傳給通道的數據(函數參數ep)拷貝到SudoG結構體中的elem域,並將SudoG中的g放到就緒隊列中,狀態置為ready,然後函數返回。如果recvq為空,否則要將當前goroutine阻塞。此時將一個SudoG結構體,掛到通道的sendq鏈表中,這個SudoG中的elem域是參數eq,SudoG中的g是當前的goroutine。當前goroutine會被設置為waiting狀態並掛到等待隊列中。
在異步的情況,如果緩衝區滿了,也是要將當前goroutine和數據一起作為SudoG結構體掛在sendq隊列中,表示因寫channel而阻塞。否則也是先看有沒有recvq鏈表是否為空,有就喚醒。跟同步不同的是在channel緩衝區不滿的情況,這裏不會阻塞寫者,而是將數據放到channel的緩衝區中,調用者返回。
讀channel的操作也是類似的,對應的函數是runtime.chansend。一個是收一個是發,基本的過程都是差不多的。
當協程嘗試從未關閉的 channel 中讀取數據時,內部的操作如下:
- 當 buf 非空時,此時 recvq 必為空,buf 彈出一個元素給讀協程,讀協程獲得數據後繼續執行,此時若 sendq 非空,則從 sendq 中彈出一個寫協程轉入 running 狀態,待寫數據入隊列 buf ,此時讀取操作
<- ch
未阻塞;
- 當 buf 為空但 sendq 非空時(不帶緩衝的 channel),則從 sendq 中彈出一個寫協程轉入 running 狀態,待寫數據直接傳遞給讀協程,讀協程繼續執行,此時讀取操作
<- ch
未阻塞;
- 當 buf 為空並且 sendq 也為空時,讀協程入隊列 recvq 並轉入 blocking 狀態,當後續有其他協程往 channel 寫數據時,讀協程才會重新轉入 running 狀態,此時讀取操作
<- ch
阻塞。
類似的,當協程嘗試往未關閉的 channel 中寫入數據時,內部的操作如下:
- 當隊列 recvq 非空時,此時隊列 buf 必為空,從 recvq 彈出一個讀協程接收待寫數據,此讀協程此時結束阻塞並轉入 running 狀態,寫協程繼續執行,此時寫入操作
ch <-
未阻塞;
- 當隊列 recvq 為空但 buf 未滿時,此時 sendq 必為空,寫協程的待寫數據入 buf 然後繼續執行,此時寫入操作
ch <-
未阻塞;
- 當隊列 recvq 為空並且 buf 為滿時,此時寫協程入隊列 sendq 並轉入 blokcing 狀態,當後續有其他協程從 channel 中讀數據時,寫協程才會重新轉入 running 狀態,此時寫入操作
ch <-
阻塞。
當關閉 non-nil channel 時,內部的操作如下:
- 當隊列 recvq 非空時,此時 buf 必為空,recvq 中的所有協程都將收到對應類型的零值然後結束阻塞狀態;
- 當隊列 sendq 非空時,此時 buf 必為滿,sendq 中的所有協程都會產生 panic ,在 buf 中數據仍然會保留直到被其他協程讀取。
空通道是指將一個channel賦值為nil,或者定義后不調用make進行初始化。按照Go語言的語言規範,讀寫空通道是永遠阻塞的。其實在函數runtime.chansend和runtime.chanrecv開頭就有判斷這類情況,如果發現參數c是空的,則直接將當前的goroutine放到等待隊列,狀態設置為waiting。
讀一個關閉的通道,永遠不會阻塞,會返回一個通道數據類型的零值。這個實現也很簡單,將零值複製到調用函數的參數ep中。寫一個關閉的通道,則會panic。關閉一個空通道,也會導致panic。
3. channel的高級用法
3.1 條件變量(condition variable)
類型於 POSIX 接口中線程通知其他線程某個事件發生的條件變量,channel 的特性也可以用來當成協程之間同步的條件變量。因為 channel 只是用來通知,所以 channel 中具體的數據類型和值並不重要,這種場景一般用 strct {}
作為 channel 的類型。
一對一通知
類似 pthread_cond_signal()
的功能,用來在一個協程中通知另個某一個協程事件發生:
1 package main
2
3 import (
4 "fmt"
5 "time"
6 )
7
8 func main() {
9 ch := make(chan struct{})
10 nums := make([]int, 100)
11
12 go func() {
13 time.Sleep(time.Second)
14 for i := 0; i < len(nums); i++ {
15 nums[i] = i
16 }
17 // send a finish signal
18 ch <- struct{}{}
19 }()
20
21 // wait for finish signal
22 <-ch
23 fmt.Println(nums)
24 }
廣播通知
類似 pthread_cond_broadcast()
的功能。利用從已關閉的 channel 讀取數據時總是非阻塞的特性,可以實現在一個協程中向其他多個協程廣播某個事件發生的通知:
1 package main
2
3 import (
4 "fmt"
5 "time"
6 )
7
8 func main() {
9 N := 10
10 exit := make(chan struct{})
11 done := make(chan struct{}, N)
12
13 // start N worker goroutines
14 for i := 0; i < N; i++ {
15 go func(n int) {
16 for {
17 select {
18 // wait for exit signal
19 case <-exit:
20 fmt.Printf("worker goroutine #%d exit\n", n)
21 done <- struct{}{}
22 return
23 case <-time.After(time.Second):
24 fmt.Printf("worker goroutine #%d is working...\n", n)
25 }
26 }
27 }(i)
28 }
29
30 time.Sleep(3 * time.Second)
31 // broadcast exit signal
32 close(exit)
33 // wait for all worker goroutines exit
34 for i := 0; i < N; i++ {
35 <-done
36 }
37 fmt.Println("main goroutine exit")
38 }
3.2 信號量
channel 的讀/寫相當於信號量的 P / V 操作,下面的示例程序中 channel 相當於信號量:
1 package main
2
3 import (
4 "log"
5 "math/rand"
6 "time"
7 )
8
9 type Seat int
10 type Bar chan Seat
11
12 func (bar Bar) ServeConsumer(customerId int) {
13 log.Print("-> consumer#", customerId, " enters the bar")
14 seat := <-bar // need a seat to drink
15 log.Print("consumer#", customerId, " drinks at seat#", seat)
16 time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
17 log.Print("<- consumer#", customerId, " frees seat#", seat)
18 bar <- seat // free the seat and leave the bar
19 }
20
21 func main() {
22 rand.Seed(time.Now().UnixNano())
23
24 bar24x7 := make(Bar, 10) // the bar has 10 seats
25 // Place seats in an bar.
26 for seatId := 0; seatId < cap(bar24x7); seatId++ {
27 bar24x7 <- Seat(seatId) // none of the sends will block
28 }
29
30 // a new consumer try to enter the bar for each second
31 for customerId := 0; ; customerId++ {
32 time.Sleep(time.Second)
33 go bar24x7.ServeConsumer(customerId)
34 }
35 }
3.3 互斥量
互斥量相當於二元信號里,所以 cap 為 1 的 channel 可以當成互斥量使用:
1 package main
2
3 import "fmt"
4
5 func main() {
6 mutex := make(chan struct{}, 1) // the capacity must be one
7
8 counter := 0
9 increase := func() {
10 mutex <- struct{}{} // lock
11 counter++
12 <-mutex // unlock
13 }
14
15 increase1000 := func(done chan<- struct{}) {
16 for i := 0; i < 1000; i++ {
17 increase()
18 }
19 done <- struct{}{}
20 }
21
22 done := make(chan struct{})
23 go increase1000(done)
24 go increase1000(done)
25 <-done; <-done
26 fmt.Println(counter) // 2000
27 }
4. 關閉 channel
關閉不再需要使用的 channel 並不是必須的。跟其他資源比如打開的文件、socket 連接不一樣,這類資源使用完后不關閉後會造成句柄泄露,channel 使用完后不關閉也沒有關係,channel 沒有被任何協程用到后最終會被 GC 回收。關閉 channel 一般是用來通知其他協程某個任務已經完成了。golang 也沒有直接提供判斷 channel 是否已經關閉的接口,雖然可以用其他不太優雅的方式自己實現一個:
1 func isClosed(ch chan int) bool {
2 select {
3 case <-ch:
4 return true
5 default:
6 }
7 return false
8 }
不過實現一個這樣的接口也沒什麼必要。因為就算通過 isClosed()
得到當前 channel 當前還未關閉,如果試圖往 channel 里寫數據,仍然可能會發生 panic ,因為在調用 isClosed()
后,其他協程可能已經把 channel 關閉了。
關閉 channel 時應該注意以下準則:
- 不要在讀取端關閉 channel ,因為寫入端無法知道 channel 是否已經關閉,往已關閉的 channel 寫數據會 panic ;
- 有多個寫入端時,不要再寫入端關閉 channle ,因為其他寫入端無法知道 channel 是否已經關閉,關閉已經關閉的 channel 會發生 panic ;
- 如果只有一個寫入端,可以在這個寫入端放心關閉 channel 。
關閉 channel 粗暴一點的做法是隨意關閉,如果產生了 panic 就用 recover 避免進程掛掉。稍好一點的方案是使用標準庫的 sync
包來做關閉 channel 時的協程同步,不過使用起來也稍微複雜些。下面介紹一種優雅些的做法。
4.1 一寫多讀
這種場景下這個唯一的寫入端可以關閉 channel 用來通知讀取端所有數據都已經寫入完成了。讀取端只需要用 for range
把 channel 中數據遍歷完就可以了,當 channel 關閉時,for range
仍然會將 channel 緩衝中的數據全部遍歷完然後再退出循環:
1 package main
2
3 import (
4 "fmt"
5 "sync"
6 )
7
8 func main() {
9 wg := &sync.WaitGroup{}
10 ch := make(chan int, 100)
11
12 send := func() {
13 for i := 0; i < 100; i++ {
14 ch <- i
15 }
16 // signal sending finish
17 close(ch)
18 }
19
20 recv := func(id int) {
21 defer wg.Done()
22 for i := range ch {
23 fmt.Printf("receiver #%d get %d\n", id, i)
24 }
25 fmt.Printf("receiver #%d exit\n", id)
26 }
27
28 wg.Add(3)
29 go recv(0)
30 go recv(1)
31 go recv(2)
32 send()
33
34 wg.Wait()
35 }
4.2 多寫一讀
這種場景下雖然可以用 sync.Once
來解決多個寫入端重複關閉 channel 的問題,但更優雅的辦法設置一個額外的 channel ,由讀取端通過關閉來通知寫入端任務完成不要再繼續再寫入數據了:
1 package main
2
3 import (
4 "fmt"
5 "sync"
6 )
7
8 func main() {
9 wg := &sync.WaitGroup{}
10 ch := make(chan int, 100)
11 done := make(chan struct{})
12
13 send := func(id int) {
14 defer wg.Done()
15 for i := 0; ; i++ {
16 select {
17 case <-done:
18 // get exit signal
19 fmt.Printf("sender #%d exit\n", id)
20 return
21 case ch <- id*1000 + i:
22 }
23 }
24 }
25
26 recv := func() {
27 count := 0
28 for i := range ch {
29 fmt.Printf("receiver get %d\n", i)
30 count++
31 if count >= 1000 {
32 // signal recving finish
33 close(done)
34 return
35 }
36 }
37 }
38
39 wg.Add(3)
40 go send(0)
41 go send(1)
42 go send(2)
43 recv()
44
45 wg.Wait()
46 }
4.2 多寫多讀
這種場景稍微複雜,和上面的例子一樣,也需要設置一個額外 channel 用來通知多個寫入端和讀取端。另外需要起一個額外的協程來通過關閉這個 channel 來廣播通知:
1 package main
2
3 import (
4 "fmt"
5 "sync"
6 "time"
7 )
8
9 func main() {
10 wg := &sync.WaitGroup{}
11 ch := make(chan int, 100)
12 done := make(chan struct{})
13
14 send := func(id int) {
15 defer wg.Done()
16 for i := 0; ; i++ {
17 select {
18 case <-done:
19 // get exit signal
20 fmt.Printf("sender #%d exit\n", id)
21 return
22 case ch <- id*1000 + i:
23 }
24 }
25 }
26
27 recv := func(id int) {
28 defer wg.Done()
29 for {
30 select {
31 case <-done:
32 // get exit signal
33 fmt.Printf("receiver #%d exit\n", id)
34 return
35 case i := <-ch:
36 fmt.Printf("receiver #%d get %d\n", id, i)
37 time.Sleep(time.Millisecond)
38 }
39 }
40 }
41
42 wg.Add(6)
43 go send(0)
44 go send(1)
45 go send(2)
46 go recv(0)
47 go recv(1)
48 go recv(2)
49
50 time.Sleep(time.Second)
51 // signal finish
52 close(done)
53 // wait all sender and receiver exit
54 wg.Wait()
55 }
channle 作為 golang 最重要的特性,用起來還是比較方便的。傳統的 C 里要實現類似的功能的話,一般需要用到 socket 或者 FIFO 來實現,另外還要考慮數據包的完整性與併發衝突的問題,channel 則屏蔽了這些底層細節,使用者只需要考慮讀寫就可以了。 channel 是引用類型,了解一下 channel 底層的機制對更好的使用 channel 還是很用必要的。雖然操作原語簡單,但涉及到阻塞的問題,使用不當可能會造成死鎖或者無限制的協程創建最終導致進程掛掉。
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※專營大陸空運台灣貨物推薦
※台灣空運大陸一條龍服務
※評比彰化搬家公司費用,南投搬家公司費用收費行情懶人包大公開
※彰化搬家費用,南投搬家費用,距離,噸數怎麼算?達人教你簡易估價知識!