天天看點

【Golang】十一、重點篇 --- 協程和管道!一、goroutine(協程)二、channel(管道)三、gorutine 和 channel 配合

協程

  • 一、goroutine(協程)
    • (1)程序和線程
    • (2)并發和并行
    • (3)協程應用
      • 3-1. 協程基本概念
      • 3-2. MPG模式
      • 3-3. 設定cpu數目
      • 3-4. 協程加互斥鎖(寫鎖)
  • 二、channel(管道)
    • (1)基本用法
    • (2)注意事項
    • (3)入隊 出隊
    • (4)管道關閉
    • (5)管道周遊
  • 三、gorutine 和 channel 配合
    • (1)go和chan是如何協作的?
    • (2)協程和管道配和原理分析
    • (3)WaitGroup
    • (4)案例分析(取素數)
    • (5)案例分析(作用協程)

一、goroutine(協程)

(1)程序和線程

【Golang】十一、重點篇 --- 協程和管道!一、goroutine(協程)二、channel(管道)三、gorutine 和 channel 配合

(2)并發和并行

并發:多個線程,輪訓執行

【Golang】十一、重點篇 --- 協程和管道!一、goroutine(協程)二、channel(管道)三、gorutine 和 channel 配合

并行:多個線程,運作在不同的cpu上(多核上運作),同時執行

【Golang】十一、重點篇 --- 協程和管道!一、goroutine(協程)二、channel(管道)三、gorutine 和 channel 配合

(3)協程應用

3-1. 協程基本概念

  1. 什麼是協程?

一個golang的程式下可以啟多個協程,協程可以了解為輕量級的線程(編譯器優化)

特點:

  1. 有獨立的棧空間
  2. 共享程式堆空間
  3. 排程由程式員控制
  4. 輕量級的線程
  1. 協程執行流程
    func main() {
    	go test()
    	for i := 0; i < 10; i++ {
    		fmt.Println("main()測試,這是第" + strconv.Itoa(i) + "次")
    		time.Sleep(time.Second*1)
    	}
    }
    
    func test() {
    	for i := 0; i < 100; i++ {
    		fmt.Println("test()測試,這是第" + strconv.Itoa(i) + "次")
    		time.Sleep(time.Second*1)
    	}
    }
               
    輸出:
    【Golang】十一、重點篇 --- 協程和管道!一、goroutine(協程)二、channel(管道)三、gorutine 和 channel 配合
    流程:
    【Golang】十一、重點篇 --- 協程和管道!一、goroutine(協程)二、channel(管道)三、gorutine 和 channel 配合

3-2. MPG模式

3-3. 設定cpu數目

// 檢視目前的cpu核心數
	numcpu := runtime.NumCPU()
	fmt.Println(numcpu)
	
	// 設定最大協程數。1.8以後go會自動配置設定
	maxcpu := runtime.GOMAXPROCS(3)
	fmt.Println(maxcpu)
           

3-4. 協程加互斥鎖(寫鎖)

var (
	resMap = make(map[int]uint64)
	// 互斥鎖 寫鎖
	lock sync.Mutex
)
func jiecheng(n int) {
	var res uint64 = 1
	for i := 1; i <= n; i++ {
		res *= uint64(i)
	}
	// 加鎖
	lock.Lock()
	resMap[n] = res
	// 解鎖
	lock.Unlock()
}

func main(){
	for i := 1; i < 200; i++ {
		go jiecheng(i)
	}
	// 周遊map
	i := 1
	for item := range lockChan {
		fmt.Println(item)
		if i == len(lockChan) {
			close(lockChan)
		}
		i++
	}
}
           

二、channel(管道)

channel和MQ的管道類似,是一個隊列,先進先出

(1)基本用法

  1. 建立(關鍵詞chan)

    lockChan := make(chan int, 200)

  2. 插入

    lockChan<- res

  3. 讀取

    num := <-lockChan

  4. 聲明讀寫管道

    var chann chan int

  5. 聲明隻寫管道

    var chann chan<- int

  6. 聲明隻讀管道

    var chann <-chan int

(2)注意事項

  1. channel中隻可以存放指定的資料類型
  2. 當channel申請的cap滿了就不可以在繼續存放
  3. channel取出後還可以繼續存放,不會改變cap的大小
  4. 沒有協程的情況,channel資料取完了,再取會報錯
  5. 管道記憶體空間已滿,但是隻要有讀協程,就會阻塞,等待資料取出再繼續插入,沒有讀的協程則會死鎖報錯

(3)入隊 出隊

func main() {
	// 先進先出
	chanIn := make(chan interface{}, 3)
	chanIn<- Cat{Name: "小貓咪"}
	chanIn<- 1
	chanIn<- "tet"
	
	// 先進先出 輸出的是:Cat{Name: "小貓咪"}
	fmt.Println(<-chanIn)
}
           

(4)管道關閉

channel關閉後,不可以在繼續寫入,但是可以繼續讀

  1. 寫入操作
    chan1 := make(chan int, 3)
    	chan1<- 1
    	chan1<- 2
    	close(chan1)
    	chan1<- 3 // 此處會報錯 :panic: send on closed channel
               

    chan1<- 3

    :panic: send on closed channel 意思是向一個關閉的管道發送資料
  2. 讀取操作
    chan1 := make(chan int, 3)
    	chan1<- 1
    	chan1<- 2
    	close(chan1)
    
    	x, ok := <-chan1
    	fmt.Println(x, ok)
    	// 輸出=》1, true
    	x, ok = <-chan1
    	fmt.Println(x, ok)
    	// 輸出=》2, true
    	x, ok = <-chan1
    	fmt.Println(x, ok)
    	// 輸出=》0, false
               

    x表示從通道擷取的資料,ok表示是否拿到

    如果資料取完:

    • x為聲明類型的預設值
    • ok為false

(5)管道周遊

管道周遊建議使用for-range的方式周遊

需要注意:
  1. 周遊時,如果channel沒有關閉,則會報deadlock(死鎖)的錯誤
  2. 周遊時,channel關閉,可以正常的周遊資料,周遊結束既退出
func main() {
	chan2 := AddChan()
	for i := range chan2 {
		fmt.Println(i)
	}
}

func AddChan() chan int {
	chan2 := make(chan int, 100)
	for i := 0; i < 100; i++ {
		chan2<- i
	}
	close(chan2)
	return chan2
}
           

三、gorutine 和 channel 配合

(1)go和chan是如何協作的?

  1. 主線程會有一個for循環,當管道沒有關閉時,會一直等待資料寫入
  2. 協程寫完資料需要關閉管道,管道關閉,關閉不會在繼續等待資料!!!資料全部取完

    val, ok := <-chann

    的ok則為false,則可以退出!!!
func main(){
	chann := make(chan int, 100)
	go write(chann)
	for {
		//  這裡會等待,,,隻要chann沒有關閉,chann會一直等待資料
		val, ok := <-chann
		if !ok {
			break
		}
	}
}

func  write(chann chan int){
	for i:=0;i<100;i++ {
		chann<- i
	}
	// 這裡一定要關閉管道,否則主線取資料會一直鎖死,等待資料!!!
	close(chann)
}
           

(2)協程和管道配和原理分析

  1. 有兩個管道,一個存儲資料的管道,一個存儲退出标記的管道
    // 資料管道
    intChan := make(chan int, 50)
    // 退出管道
    exitChan := make(chan bool, 1)
               
  2. 開啟寫的協程,執行完成記得關閉,否則讀取管道一直等待導緻死鎖
    func writeDate(intChan chan int)  {
    	for i := 0; i < 50; i++ {
    		intChan<- i
    		time.Sleep(time.Millisecond*1)
    	}
    	// 
    	close(intChan)
    }
               
  3. 寫資料協程讀取完所有資料,存放一個标記到

    exitChan

    管道
    func readDate(intChan chan int) {
    	for {
    		// 存一個,讀一個,隻要通道沒有關閉,讀取會一直等待
    		v, ok := <-intChan
    		if !ok {
    			break
    		}
    		fmt.Println("讀取到的資料:", v)
    	}
    	// 讀取完成
    	exitChan<- true
    	close(exitChan)
    }
               
  4. 主線程建立守護線程,保證主線程不會被銷毀,直到

    exitChan

    隊列放入資料則祝線程可以結束
    func main() {
    	// 資料管道
    	intChan := make(chan int, 50)
    	// 退出管道
    	exitChan := make(chan bool, 1)
    	
    	go writeDate(intChan)
    	go readDate(intChan, exitChan)
    
    	// 阻塞主線程,,,直到exitChan取出退出标志結束循環
    	for {
    		if <-exitChan {
    			fmt.Println("結束")
    			break
    		}
    	}
    }
               

(3)WaitGroup

go很人性化,為了避免我們寫出太多非業務代碼,為我們提供了

WaitGroup

來控制協程,去标志是否完成
var wg sync.WaitGroup
func main() {

	// 資料管道
	intChan := make(chan int, 50)

	wg.Add(1)
	go writeDate(intChan)

	wg.Add(1)
	go readDate(intChan)

	wg.Wait()
}
func writeDate(intChan chan int)  {
	defer wg.Done()
}
func readDate(intChan chan int) {
	defer wg.Done()
}
           

函數講解:

  1. 隻要開啟一個協程,就

    Add(1)

    ,表示開啟一個協程
  2. 協程執行完畢,則需要

    Done()

    ,表示從協程式等待組裡删除
  3. 主線程的

    Wait()

    相當于我上面原了解釋的(一個for循環會一直等待資料讀取完成),隻有當所有的協程都

    Done()

    後,才會繼續執行

    Wait()

    後續代碼。

需要注意: Add的協程數量,需要和Done的協程數對應,否則死鎖報錯

(4)案例分析(取素數)

  1. 開啟一個writeDate協程,向管道intChan中寫入50個資料
  2. 開啟一個readDate協程,從管道intChan中讀取寫入的資料
  3. 主線程需要等到所有協程執行完畢,之後在退出
  4. writeDate、readDate操作同一個管道

讀寫鎖

package chann
	
	import (
		"fmt"
		"time"
	)
	
	func ReadAndWrite() {
		// 資料管道
		intChan := make(chan int, 50)
		// 退出管道
		exitChan := make(chan bool, 1)
	
		for i := 0; i < 10; i++ {
			go writeDate(intChan)
		}
		go writeDate(intChan)
		go readDate(intChan, exitChan)
	
		// 阻塞主線程,,,直到exitChan取出退出标志結束循環
		for {
			if <-exitChan {
				fmt.Println("結束")
				break
			}
		}
	}
	
	func writeDate(intChan chan int)  {
		for i := 0; i < 50; i++ {
			intChan<- i
			time.Sleep(time.Millisecond*1)
		}
		close(intChan)
	}
	
	func readDate(intChan chan int, exitChan chan bool) {
		for {
			v, ok := <-intChan
			if !ok {
				break
			}
			fmt.Println("讀取到的資料:", v)
		}
		// 讀取完成
		exitChan<- true
		close(exitChan)
	}
           

WaitGroup

package chann
	
	import (
		"fmt"
		"sync"
		"time"
	)
	var wg sync.WaitGroup
	func ReadAndWrite() {
	
		// 資料管道
		intChan := make(chan int, 50)
	
		wg.Add(1)
		go writeDate(intChan)
	
		wg.Add(1)
		go readDate(intChan)
	
		// 等等待所有協程結束退出主程序
		wg.Wait()
	}
	
	func writeDate(intChan chan int)  {
		// 協程結束done
		defer wg.Done()
		for i := 0; i < 50; i++ {
			intChan<- i
			time.Sleep(time.Millisecond*1)
		}
		close(intChan)
	}

	func readDate(intChan chan int) {
		// 協程結束done
		defer wg.Done()
		for {
			v, ok := <-intChan
			if !ok {
				break
			}
			fmt.Println("讀取到的資料:", v)
		}
	}
           

(5)案例分析(作用協程)