天天看點

【GoLang之協程】

GoLang之協程

目前,WebServer幾種主流的并發模型:

  • 多線程,每個線程一次處理一個請求,在目前請求處理完成之前不會接收其它請求;但在高并發環境下,多線程的開銷比較大;
  • 基于回調的異步IO,如Nginx伺服器使用的epoll模型,這種模式通過事件驅動的方式使用異步IO,使伺服器持續運轉,但人的思維模式是串行的,大量回調函數會把流程分割,對于問題本身的反應不夠自然;
  • 協程,不需要搶占式排程,可以有效提高線程的任務并發性,而避免多線程的缺點;但原生支援協程的語言還很少。

協程(coroutine)是Go語言中的輕量級線程實作,由Go運作時(runtime)管理。

在一個函數調用前加上go關鍵字,這次調用就會在一個新的goroutine中并發執行。當被調用的函數傳回時,這個goroutine也自動結束。需要注意的是,如果這個函數有傳回值,那麼這個傳回值會被丢棄。

先看下面的例子:

func Add(x, y int) {
    z := x + y
    fmt.Println(z)
}

func main() {
    for i:=0; i<10; i++ {
        go Add(i, i)
    }
}      

執行上面的代碼,會發現螢幕什麼也沒列印出來,程式就退出了。

對于上面的例子,main()函數啟動了10個goroutine,然後傳回,這時程式就退出了,而被啟動的執行Add()的goroutine沒來得及執行。我們想要讓main()函數等待所有goroutine退出後再傳回,但如何知道goroutine都退出了呢?這就引出了多個goroutine之間通信的問題。

在工程上,有兩種最常見的并發通信模型:共享記憶體和消息。

來看下面的例子,10個goroutine共享了變量counter,每個goroutine執行完成後,将counter值加1.因為10個goroutine是并發執行的,是以我們還引入了鎖,也就是代碼中的lock變量。在main()函數中,使用for循環來不斷檢查counter值,當其值達到10時,說明所有goroutine都執行完畢了,這時main()傳回,程式退出。

package main
import (
    "fmt"
    "sync"
    "runtime"
)

var counter int = 0

func Count(lock *sync.Mutex) {
    lock.Lock()
    counter++
    fmt.Println("counter =", counter)
    lock.Unlock()
}


func main() {

    lock := &sync.Mutex{}

    for i:=0; i<10; i++ {
        go Count(lock)
    }

    for {
        lock.Lock()

        c := counter

        lock.Unlock()

        runtime.Gosched()    // 出讓時間片

        if c >= 10 {
            break
        }
    }
}      

上面的例子,使用了鎖變量(屬于一種共享記憶體)來同步協程,事實上Go語言主要使用消息機制(channel)來作為通信模型。

channel

消息機制認為每個并發單元是自包含的、獨立的個體,并且都有自己的變量,但在不同并發單元間這些變量不共享。每個并發單元的輸入和輸出隻有一種,那就是消息。

channel是Go語言在語言級别提供的goroutine間的通信方式,我們可以使用channel在多個goroutine之間傳遞消息。channel是程序内的通信方式,是以通過channel傳遞對象的過程和調用函數時的參數傳遞行為比較一緻,比如也可以傳遞指針等。

channel是類型相關的,一個channel隻能傳遞一種類型的值,這個類型需要在聲明channel時指定。

channel的聲明形式為:

var chanName chan ElementType

舉個例子,聲明一個傳遞int類型的channel:

var ch chan int      

使用内置函數make()定義一個channel:

ch := make(chan int)      

在channel的用法中,最常見的包括寫入和讀出:

// 将一個資料value寫入至channel,這會導緻阻塞,直到有其他goroutine從這個channel中讀取資料
ch <- value

// 從channel中讀取資料,如果channel之前沒有寫入資料,也會導緻阻塞,直到channel中被寫入資料為止
value := <-ch      

可以關閉不再使用的channel:

close(ch)      

 我們還可以建立一個帶緩沖的channel:

c := make(chan int, 1024)

// 從帶緩沖的channel中讀資料
for i:=range c {
  ...
}      

此時,建立一個大小為1024的int類型的channel,即使沒有讀取方,寫入方也可以一直往channel裡寫入,在緩沖區被填完之前都不會阻塞。

現在利用channel來重寫上面的例子:

func Count(ch chan int) {
    ch <- 1
    fmt.Println("Counting")
}

func main() {

    chs := make([] chan int, 10)

    for i:=0; i<10; i++ {
        chs[i] = make(chan int)
        go Count(chs[i])
    }

    for _, ch := range(chs) {
        <-ch
    }
}      

在這個例子中,定義了一個包含10個channel的數組,并把數組中的每個channel配置設定給10個不同的goroutine。在每個goroutine完成後,向goroutine寫入一個資料,在這個channel被讀取前,這個操作是阻塞的。在所有的goroutine啟動完成後,依次從10個channel中讀取資料,在對應的channel寫入資料前,這個操作也是阻塞的。這樣,就用channel實作了類似鎖的功能,并保證了所有goroutine完成後main()才傳回。

另外,我們在将一個channel變量傳遞到一個函數時,可以通過将其指定為單向channel變量,進而限制該函數中可以對此channel的操作。

單向channel變量的聲明:

var ch1 chan int      // 普通channel
var ch2 chan <- int    // 隻用于寫int資料
var ch3 <-chan int    // 隻用于讀int資料      

可以通過類型轉換,将一個channel轉換為單向的:

ch4 := make(chan int)
ch5 := <-chan int(ch4)   // 單向讀
ch6 := chan<- int(ch4)  //單向寫      

單向channel的作用有點類似于c++中的const關鍵字,用于遵循代碼“最小權限原則”。

例如在一個函數中使用單向讀channel:

func Parse(ch <-chan int) {
    for value := range ch {
        fmt.Println("Parsing value", value) 
    }
}      

channel作為一種原生類型,本身也可以通過channel進行傳遞,例如下面這個流式處理結構:

type PipeData struct {
    value int
    handler func(int) int
    next chan int
}

func handle(queue chan *PipeData) {
    for data := range queue {
        data.next <- data.handler(data.value)
    }
}      

select

在UNIX中,select()函數用來監控一組描述符,該機制常被用于實作高并發的socket伺服器程式。Go語言直接在語言級别支援select關鍵字,用于處理異步IO問題,大緻結構如下:

select {
    case <- chan1:
    // 如果chan1成功讀到資料
    
    case chan2 <- 1:
    // 如果成功向chan2寫入資料

    default:
    // 預設分支
}      

Go語言沒有對channel提供直接的逾時處理機制,但我們可以利用select來間接實作,例如:

timeout := make(chan bool, 1)

go func() {
    time.Sleep(1e9)
    timeout <- true
}()

switch {
    case <- ch:
    // 從ch中讀取到資料

    case <- timeout:
    // 沒有從ch中讀取到資料,但從timeout中讀取到了資料
}      

這樣使用select就可以避免永久等待的問題,因為程式會在timeout中擷取到一個資料後繼續執行,而無論對ch的讀取是否還處于等待狀态。

同步鎖

Go語言包中的sync包提供了兩種鎖類型:sync.Mutex和sync.RWMutex,前者是互斥鎖,後者是讀寫鎖。

使用鎖的經典模式:

var lck sync.Mutex
func foo() {
    lck.Lock() 
    defer lck.Unlock()
    // ...
}      

lck.Lock()會阻塞直到擷取鎖,然後利用defer語句在函數傳回時自動釋放鎖。

對于從全局角度隻需要運作一次的代碼,比如全局初始化操作,Go語言提供了一個once類型來保證全局的唯一性操作,如下:

var flag int32
var once sync.Once

func initialize() {
    flag = 3
    fmt.Println(flag)
}

func setup() {
    once.Do(initialize)
}

func main() {

    setup()    
    setup()    
}      
func CompareAndSwapUnit64(val *uint64, old, new uint64) (swapped bool)