天天看點

GO 協程協程

協程

程序, 線程, 協程概念

  • 程序:
    • 概念: 資源配置設定的基本機關
    • 通信: 程序之間的通信隻能通過程序通信的方式進行
    • 多程序: 拷貝,使用fork(),生成子程序。每個程序擁有獨立的位址空間(代碼段、堆棧段、資料段)
  • 線程:
    • 概念: 排程運作的最小機關
    • 通信: 同一程序中的線程共享資料(比如全局變量,靜态變量)
    • 多線程: 同一個程序中的線程,它們之間共享大部分資料,使用相同的位址空間。當然線程是擁有自己的局部變量和堆棧(注意不是堆)
  • 協程:
    • 概念: 非搶占式排程。使用者态模拟程序線程的切換的具體實作,并非OS核心提供的功能。由程式員主動控制協程之間的切換。
    • 通信: 不要通過共享記憶體來通信,而應該通過通信來共享記憶體。

      golang提供一種基于消息機制而非共享記憶體的通信模型。消息機制認為每個并發單元都是自包含的獨立個體,并且擁有自己的變量,但在不同并發單元間這些變量不共享。每個并發單元的輸入和輸出隻有一種,那就是消息。

GO協程

goroutine(go協程)是由Go runtime管理的輕量級線程。

說明協程是使用者态, 由Go runtime管理而非OS核心管理

例子:

package main

import (
   "fmt"
   "time"
)

func say(s string) {
   for i := 0; i < 5; i++ {
       time.Sleep(100 * time.Millisecond)
       fmt.Println(s)
   }
}

func main() {
   go say("world")  //啟動go routine
   say("hello")
}
           

channel

概述

要想了解 channel 要先知道 CSP 模型。CSP 是 Communicating Sequential Process 的簡稱,中文可以叫做通信順序程序,是一種并發程式設計模型,由 Tony Hoare 于 1977 年提出。簡單來說,CSP 模型由并發執行的實體(線程或者程序)所組成,實體之間通過發送消息進行通信,這裡發送消息時使用的就是通道,或者叫 channel。CSP 模型的關鍵是關注 channel,而不關注發送消息的實體。Go 語言實作了 CSP 部分理論,goroutine 對應 CSP 中并發執行的實體,channel 也就對應着 CSP 中的 channel。

Channel是Go中的一個核心類型,你可以把它看成一個管道,通過它并發核心單元就可以發送或者接收資料進行通訊(communication)。

channel基礎知識

  1. unBufferChan  := make(chan int) //1
    bufferChan := make(chan int, N) //2
               
  2. 上面的方式 1 建立的是無緩沖 channel,方式 2 建立的是緩沖 channel。如果使用 channel 之前沒有 make,會出現 dead lock 錯誤。至于為什麼是 dead lock,下文我們從源碼裡面看看。
    1. func main() {
          var x chan int
          go func() {
              x <- 1
          }()
          <-x
      }
                 
    2. $ go run channel1.go
      fatal error: all goroutines are asleep - deadlock!
      
      goroutine 1 [chan receive (nil chan)]:
      main.main()
          /Users/kltao/code/go/examples/channl/channel1.go:11 +0x60
      
      goroutine 4 [chan send (nil chan)]:
      main.main.func1(0x0)
                 
  3. channel讀寫操作
    1. ch := make(chan int, 10)
      
      // 讀操作
      x <- ch
      
      // 寫操作
      ch <- x
                 
  4. channel種類
    1. channel 分為無緩沖 channel 和有緩沖 channel。兩者的差別如下:
      • 無緩沖:發送和接收動作是同時發生的。如果沒有 goroutine 讀取 channel (<- channel),則發送者 (channel <-) 會一直阻塞。
      • 緩沖:緩沖 channel 類似一個有容量的隊列。當隊列滿的時候發送者會阻塞;當隊列空的時候接收者會阻塞。
  5. 關閉channel
    1. ch := make(chan int)
      
      // 關閉
      close(ch)
      
      // ok-idiom  用于區分channel中是預設值還是channel關閉了
      val, ok := <-ch
      if ok == false {
          // channel closed
      }
                 
    2. 關閉時要注意:
      • 重複關閉 channel 會導緻 panic。
      • 向關閉的 channel 發送資料會 panic。
      • 從關閉的 channel 讀資料不會 panic,讀出 channel 中已有的資料之後再讀就是 channel 類似的預設值,比如 chan int 類型的 channel 關閉之後讀取到的值為 0。

channel典型用法

  1. goroutine通信
    1. func main() {
          x := make(chan int)
          go func() {
              x <- 1
          }()
          <-x
      }
      
                 
  2. select
    1. select 一定程度上可以類比于 linux 中的 IO 多路複用中的 select。後者相當于提供了對多個 IO 事件的統一管理,而 Golang 中的 select 相當于提供了對多個 channel 的統一管理。當然這隻是 select 在 channel 上的一種使用方法。
    2. select {
          case e, ok := <-ch1:
              ...
          case e, ok := <-ch2:
              ...
          default:  
      }
                 
    3. select 會阻塞,直到條件分支中的某個可以繼續執行,這時就會執行那個條件分支。當多個都準備好的時候,會随機選擇一個。
    4. func receive(ch chan int) {
          for {
              <-ch
          }
      }
      
      func send(ch1, ch2, ch3 chan int) {
          for i := 0; i < 10; i++ {
              // sleep是為了保證所有的管道receiver都已阻塞等待資料
              time.Sleep(1000 * time.Millisecond)
              select {
              case ch1 <- i:
                  fmt.Printf("send %d to ch1\n", i)
              case ch2 <- i:
                  fmt.Printf("send %d to ch2\n", i)
              case ch3 <- i:
                  fmt.Printf("send %d to ch3\n", i)
              }
          }
      }
      
      func main() {
          ch1 := make(chan int)
          ch2 := make(chan int)
          ch3 := make(chan int)
          go receive(ch1)
          go receive(ch2)
          go receive(ch3)
          send(ch1, ch2, ch3)
      } //每次結果不一樣
                 
  3. range channel
    1. range channel 可以直接取到 channel 中的值。當我們使用 range 來操作 channel 的時候,一旦 channel 關閉,channel 内部資料讀完之後循環自動結束。
    2. func consumer(ch chan int) {  //消費者
          for x := range ch {
              fmt.Println(x)
              ...
          }
      }
      
      func producer(ch chan int) { //生産者
        for _, v := range values {
            ch <- v
        }  
      }
                 
  4. 逾時控制
    1. select {
        case <- ch:
          // get data from ch
        case <- time.After(2 * time.Second)
          // read data from ch timeout
      }
      //timeAfter可以換成其他任何異常控制流
                 
  5. 生産者-消費者模型, 如第三條顯示
  6. 單向channel
    1. 單向 channel,顧名思義隻能寫或讀的 channel。但是仔細一想,隻能寫的 channel,如果不讀其中的值有什麼用呢?其實單向 channel 主要用在函數聲明中。
    2. func send(c chan<- int) {
          fmt.Printf("send: %T\n", c)
          c <- 1
      }
      
      func recv(c <-chan int) {
          fmt.Printf("recv: %T\n", c)
          fmt.Println(<-c)
      }
      
      func main() {
          c := make(chan int)
          fmt.Printf("%T\n", c)
          go send(c)
          go recv(c)
          time.Sleep(1 * time.Second)
      }
      /**
       * output:
       *	chan int
       *      send: chan<- int
       *      recv: <-chan int
       *	1
        */
                 

同步(sync)

互斥鎖

  1. 概述:用于主動控制Mutex類型的變量或者将Mutex類型作為struct的元素的變量在同一時間隻被一個routine通路(即執行Lock()方法的代碼塊),這個Mutex帶有2個方法:Lock()和Unlock()。互斥鎖不區分讀和寫,即無論是print列印還是寫操作都是互斥的
    1. func main() {
          var mutex sync.Mutex
          fmt.Printf("%+v\n", mutex)
      
          mutex.Lock()
          fmt.Printf("%+v\n", mutex)
      
          mutex.Unlock()
          fmt.Printf("%+v\n", mutex)
      }
                 
  2. 使用

    // SafeCounter is safe to use concurrently.

    type SafeCounter struct {

    v map[string]int

    mux sync.Mutex

    }

    // Inc increments the counter for the given key.

    func (c *SafeCounter) Inc(key string, id int) {

    c.mux.Lock()

    fmt.Printf("%d. Inc lock.\n", id)

    // Lock so only one goroutine at a time can access the map c.v.

    c.v[key]++

    c.mux.Unlock()

    fmt.Printf("%d. Inc unlock.\n", id)

    }

    // Value returns the current value of the counter for the given key.

    func (c *SafeCounter) Value(key string) int {

    c.mux.Lock()

    fmt.Println(“Value lock.”)

    // Lock so only one goroutine at a time can access the map c.v.

    defer fmt.Println(“Value unlock.”)

    defer c.mux.Unlock()

    return c.v[key]

    }

    func main() {

    c := SafeCounter{v: make(map[string]int)}

    for i := 0; i < 10; i++ {

    go c.Inc(“somekey”, i)

    }

    time.Sleep(time.Second)
     fmt.Println(c.Value("somekey"))
               
    }
  3. 已經鎖定的Mutex與特定的goroutine無關聯
    1. 已經鎖定的Mutex并不與特定的goroutine相關聯,這樣可以利用一個goroutine對其加鎖,再利用其他goroutine對其解鎖

    package main

    import (

    “fmt”

    “sync”

    “time”

    )

    type MyStruct struct {

    v int

    mux sync.Mutex

    }

    func (s *MyStruct) Lock() {

    s.mux.Lock()

    }

    func (s *MyStruct) Unlock() {

    s.mux.Unlock()

    }

    func main() {

    s := MyStruct{v: 0}

    s.v = 1

    fmt.Printf("%+v\n", s)

    go s.Lock()
     time.Sleep(1 * time.Second)
     fmt.Printf("%+v\n", s)
    
     go s.Unlock()
     time.Sleep(1 * time.Second)
     fmt.Printf("%+v\n", s)
               
    }
    3. 雖然互斥鎖可以被直接的在多個Goroutine之間共享,但是我們還是強烈建議把對同一個互斥鎖的成對的鎖定和解鎖操作放在同一個層次的代碼塊中。例如,在同一個函數或方法中對某個互斥鎖的進行鎖定和解鎖。
    
               

讀寫鎖

  1. 概述: 讀寫鎖是針對于讀寫操作的互斥鎖。它與普通的互斥鎖最大的不同就是,它可以分别針對讀操作和寫操作進行鎖定和解鎖操作。
    1. 注意點:
      1. 同時隻能有一個 goroutine 能夠獲得寫鎖定。
      2. 同時可以有任意多個 gorouinte 獲得讀鎖定。
      3. 同時隻能存在寫鎖定或讀鎖定(讀和寫互斥)。
  2. 方法:
    1. func (rw *RWMutex) Lock       //寫鎖定
      func (rw *RWMutex) Unlock     //寫解鎖
      func (rw *RWMutex) RLock      //讀鎖定
      func (rw *RWMutex) RUnlock    //讀解鎖
      
      //都實作了Locker接口
      type Locker interface {
          Lock()
          Unlock()
      }
      //還有一個RLocker方法
      func (rw *RWMutex) RLocker() Locker    //傳回實作了sync.Locker接口的值
                 
    2. 這個RLocker()作用是,使用Lock()和Unlock()來進行讀鎖定和讀解鎖,而無需RLock()和RUnlock()來進行讀鎖定和讀解鎖

WaitGroup

WaitGroup用于等待一組goroutine結束, 有三個方法

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
           
  • Add 用來添加 goroutine 的個數
  • Done 執行一次數量減 1
  • Wait 用來等待結束

例子

func main() {
    var wg sync.WaitGroup
    fmt.Printf("init:             %+v\n", wg)

    for i := 1; i < 10; i++ {
        // 計數加 1
        wg.Add(1)
        go func(i int) {
            fmt.Printf("goroutine%d start: %+v\n", i, wg)
            time.Sleep(11 * time.Second)
            // 計數減 1
            wg.Done()
            fmt.Printf("goroutine%d end:   %+v\n", i, wg)
        }(i)
        time.Sleep(time.Second)
    }

    // 等待執行結束
    wg.Wait()
    fmt.Printf("over:             %+v\n", wg)
}
           

注意: wg.Add() 方法一定要在 goroutine 開始前執行

條件變量(cond)

與互斥量不同,條件變量的作用并不是保證在同一時刻僅有一個線程通路某一個共享資料,而是在對應的共享資料的狀态發生變化時,通知其他是以而被阻塞的線程。條件變量總是與互斥量組合使用。互斥量為共享資料的通路提供互斥支援,而條件變量可以就共享資料的狀态的變化向相關線程發出通知。

//聲明
lock := new(sync.Mutex)
cond := sync.NewCond(lock)

//或者
cond := sync.NewCond(new(synv.Mutex))
           

方法

cond.L.Lock()
cond.L.Unlock() 也可以使用lock.Lock()和lock.Unlock(),完全一樣,因為是指針轉遞
cond.Wait(): Unlock()->阻塞等待通知(即等待Signal()或Broadcast()的通知)->收到通知->Lock()

cond.Signal() : 通知一個Wait()了的,若沒有Wait(),也不會報錯。Signal()通知的順序是根據原來加入通知清單(Wait())的先入先出

cond.Broadcast(): 通知所有Wait()了的,若沒有Wait(),也不會報錯
           

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-pkpnhYul-1575184984806)(/home/eugeo/文檔/學習筆記/Java-golang-learning/go/協程.assets/cond_4.png)]

示例代碼

func main() {
    cond := sync.NewCond(new(sync.Mutex))
    condition := 0

    // Consumer
    go func() {
        for {
            cond.L.Lock()
            for condition == 0 {
                cond.Wait()
            }
            condition--
            fmt.Printf("Consumer: %d\n", condition)
            cond.Signal()
            cond.L.Unlock()
        }
    }()

    // Producer
    for {
        time.Sleep(time.Second)
        cond.L.Lock()
        for condition == 3 {
            cond.Wait()
        }
        condition++
        fmt.Printf("Producer: %d\n", condition)
        cond.Signal()
        cond.L.Unlock()
    }
}
           

輸出:

Producer: 1
Consumer: 0
Producer: 1
Consumer: 0
Producer: 1
Consumer: 0
Producer: 1
Consumer: 0
Producer: 1
Consumer: 0
           

該例子僅适用于單消費者和單生産者, 同時對condition的判斷隻有0和1這種布爾值狀态

實際使用, 應該先channel再鎖

func main() {
    ch := make(chan int, 3)
    v := 0

    // Consumer
    go func() {
        for {
            fmt.Printf("Consumer: %d\n", <-ch)
        }
    }()

    // Producer
    for {
        v++
        fmt.Printf("Producer: %d\n", v)
        ch <- v
        time.Sleep(time.Second)
    }
}
           

臨時對象池

堆和棧

程式會從作業系統申請一塊記憶體,而這塊記憶體也會被分成堆和棧。

func F() {
    temp := make([]int, 0, 20) //臨時變量将申請到棧上
    ...
}
           

棧可以簡單得了解成一次函數調用内部申請到的記憶體,它們會随着函數的傳回把記憶體還給系統。申請到棧記憶體好處:函數傳回直接釋放,不會引起垃圾回收,對性能沒有影響。

func F() []int{
    a := make([]int, 0, 20)
    return a
}
           

而上面這段代碼,申請的代碼一模一樣,但是申請後作為傳回值傳回了,編譯器會認為變量之後還會被使用,當函數傳回之後并不會将其記憶體歸還,那麼它就會被申請到堆上面了。申請到堆上面的記憶體才會引起垃圾回收。

func F() {
    a := make([]int, 0, 20)
    b := make([]int, 0, 20000)
    l := 20
    c := make([]int, 0, l)
}
           

a和b代碼一樣,就是申請的空間不一樣大,但是它們兩個的命運是截然相反的。a前面已經介紹過,會申請到棧上面,而b,由于申請的記憶體較大,編譯器會把這種申請記憶體較大的變量轉移到堆上面。即使是臨時變量,申請過大也會在堆上面申請。

而c,對我們而言其含義和a是一緻的,但是編譯器對于這種不定長度的申請方式,也會在堆上面申請,即使申請的長度很短。

在項目中一般都是c用法而申請記憶體變成了慢語句,解決方法就是使用臨時對象池

臨時對象池例子:
// 一個[]byte的對象池,每個對象為一個[]byte
var bytePool = sync.Pool{
    New: func() interface{} {
        b := make([]byte, 1024)
        return &b
    },
}

func main() {
    a := time.Now().Unix()
    // 不使用對象池
    for i := 0; i < 1000000000; i++ {
        obj := make([]byte, 1024)
        _ = obj
    }
    b := time.Now().Unix()
    // 使用對象池
    for i := 0; i < 1000000000; i++ {
        obj := bytePool.Get().(*[]byte)
        bytePool.Put(obj)
    }
    c := time.Now().Unix()
    fmt.Println("without pool ", b-a, "s") //20s
    fmt.Println("with    pool ", c-b, "s")  //15s
           

隻有當每個對象占用記憶體較大時候,用pool才會改善性能

  1. 當每個對象的記憶體小于一定量的時候,不使用pool的性能秒殺使用pool;當記憶體處于某個量的時候,不使用pool和使用pool性能相當;當記憶體大于某個量的時候,使用pool的優勢就顯現出來了
  2. 不使用pool,那麼對象占用記憶體越大,性能下降越厲害;使用pool,無論對象占用記憶體大還是小,性能都保持不變。可以看到pool有點像飛機,雖然起步比跑車慢,但後勁十足。
即:pool适合占用記憶體大且并發量大的場景。當記憶體小并發量少的時候,使用pool适得其反
使用場景

sync.Pool一種合适的方法是,為臨時緩沖區建立一個池,多個用戶端使用這個緩沖區來共享全局資源。另一方面,如果釋放連結清單是某個對象的一部分,并由這個對象維護,而這個對象隻由一個用戶端使用,在這個用戶端工作完成後釋放連結清單,那麼用Pool實作這個釋放連結清單是不合适的。

在Put之前重置,在Get之後重置

bytePool.Put(obj)
}
c := time.Now().Unix()
fmt.Println("without pool ", b-a, "s") //20s
fmt.Println("with    pool ", c-b, "s")  //15s
           
**隻有當每個對象占用記憶體較大時候,用pool才會改善性能**

>   1.  當每個對象的記憶體小于一定量的時候,不使用pool的性能秒殺使用pool;當記憶體處于某個量的時候,不使用pool和使用pool性能相當;當記憶體大于某個量的時候,使用pool的優勢就顯現出來了
>   2.  不使用pool,那麼對象占用記憶體越大,性能下降越厲害;使用pool,無論對象占用記憶體大還是小,性能都保持不變。可以看到pool有點像飛機,雖然起步比跑車慢,但後勁十足。
>
>   即:pool适合占用記憶體大且并發量大的場景。當記憶體小并發量少的時候,使用pool适得其反

##### 使用場景

sync.Pool一種合适的方法是,為臨時緩沖區建立一個池,多個用戶端使用這個緩沖區來共享全局資源。另一方面,如果釋放連結清單是某個對象的一部分,并由這個對象維護,而這個對象隻由一個用戶端使用,在這個用戶端工作完成後釋放連結清單,那麼用Pool實作這個釋放連結清單是不合适的。

在Put之前重置,在Get之後重置