天天看点

【Golang】十一、重点篇 --- 协程和管道!一、goroutine(协程)二、channel(管道)三、gorutine 和 channel 配合

协程

  • 一、goroutine(协程)
    • (1)进程和线程
    • (2)并发和并行
    • (3)协程应用
      • 3-1. 协程基本概念
      • 3-2. MPG模式
      • 3-3. 设置cpu数目
      • 3-4. 协程加互斥锁(写锁)
  • 二、channel(管道)
    • (1)基本用法
    • (2)注意事项
    • (3)入队 出队
    • (4)管道关闭
    • (5)管道遍历
  • 三、gorutine 和 channel 配合
    • (1)go和chan是如何协作的?
    • (2)协程和管道配和原理分析
    • (3)WaitGroup
    • (4)案例分析(取素数)
    • (5)案例分析(作用协程)

一、goroutine(协程)

(1)进程和线程

【Golang】十一、重点篇 --- 协程和管道!一、goroutine(协程)二、channel(管道)三、gorutine 和 channel 配合

(2)并发和并行

并发:多个线程,轮训执行

【Golang】十一、重点篇 --- 协程和管道!一、goroutine(协程)二、channel(管道)三、gorutine 和 channel 配合

并行:多个线程,运行在不同的cpu上(多核上运行),同时执行

【Golang】十一、重点篇 --- 协程和管道!一、goroutine(协程)二、channel(管道)三、gorutine 和 channel 配合

(3)协程应用

3-1. 协程基本概念

  1. 什么是协程?

一个golang的程序下可以启多个协程,协程可以理解为轻量级的线程(编译器优化)

特点:

  1. 有独立的栈空间
  2. 共享程序堆空间
  3. 调度由程序员控制
  4. 轻量级的线程
  1. 协程执行流程
    func main() {
    	go test()
    	for i := 0; i < 10; i++ {
    		fmt.Println("main()测试,这是第" + strconv.Itoa(i) + "次")
    		time.Sleep(time.Second*1)
    	}
    }
    
    func test() {
    	for i := 0; i < 100; i++ {
    		fmt.Println("test()测试,这是第" + strconv.Itoa(i) + "次")
    		time.Sleep(time.Second*1)
    	}
    }
               
    输出:
    【Golang】十一、重点篇 --- 协程和管道!一、goroutine(协程)二、channel(管道)三、gorutine 和 channel 配合
    流程:
    【Golang】十一、重点篇 --- 协程和管道!一、goroutine(协程)二、channel(管道)三、gorutine 和 channel 配合

3-2. MPG模式

3-3. 设置cpu数目

// 查看目前的cpu核心数
	numcpu := runtime.NumCPU()
	fmt.Println(numcpu)
	
	// 设置最大协程数。1.8以后go会自动分配
	maxcpu := runtime.GOMAXPROCS(3)
	fmt.Println(maxcpu)
           

3-4. 协程加互斥锁(写锁)

var (
	resMap = make(map[int]uint64)
	// 互斥锁 写锁
	lock sync.Mutex
)
func jiecheng(n int) {
	var res uint64 = 1
	for i := 1; i <= n; i++ {
		res *= uint64(i)
	}
	// 加锁
	lock.Lock()
	resMap[n] = res
	// 解锁
	lock.Unlock()
}

func main(){
	for i := 1; i < 200; i++ {
		go jiecheng(i)
	}
	// 遍历map
	i := 1
	for item := range lockChan {
		fmt.Println(item)
		if i == len(lockChan) {
			close(lockChan)
		}
		i++
	}
}
           

二、channel(管道)

channel和MQ的管道类似,是一个队列,先进先出

(1)基本用法

  1. 创建(关键词chan)

    lockChan := make(chan int, 200)

  2. 插入

    lockChan<- res

  3. 读取

    num := <-lockChan

  4. 声明读写管道

    var chann chan int

  5. 声明只写管道

    var chann chan<- int

  6. 声明只读管道

    var chann <-chan int

(2)注意事项

  1. channel中只可以存放指定的数据类型
  2. 当channel申请的cap满了就不可以在继续存放
  3. channel取出后还可以继续存放,不会改变cap的大小
  4. 没有协程的情况,channel数据取完了,再取会报错
  5. 管道内存空间已满,但是只要有读协程,就会阻塞,等待数据取出再继续插入,没有读的协程则会死锁报错

(3)入队 出队

func main() {
	// 先进先出
	chanIn := make(chan interface{}, 3)
	chanIn<- Cat{Name: "小猫咪"}
	chanIn<- 1
	chanIn<- "tet"
	
	// 先进先出 输出的是:Cat{Name: "小猫咪"}
	fmt.Println(<-chanIn)
}
           

(4)管道关闭

channel关闭后,不可以在继续写入,但是可以继续读

  1. 写入操作
    chan1 := make(chan int, 3)
    	chan1<- 1
    	chan1<- 2
    	close(chan1)
    	chan1<- 3 // 此处会报错 :panic: send on closed channel
               

    chan1<- 3

    :panic: send on closed channel 意思是向一个关闭的管道发送数据
  2. 读取操作
    chan1 := make(chan int, 3)
    	chan1<- 1
    	chan1<- 2
    	close(chan1)
    
    	x, ok := <-chan1
    	fmt.Println(x, ok)
    	// 输出=》1, true
    	x, ok = <-chan1
    	fmt.Println(x, ok)
    	// 输出=》2, true
    	x, ok = <-chan1
    	fmt.Println(x, ok)
    	// 输出=》0, false
               

    x表示从通道获取的数据,ok表示是否拿到

    如果数据取完:

    • x为声明类型的默认值
    • ok为false

(5)管道遍历

管道遍历建议使用for-range的方式遍历

需要注意:
  1. 遍历时,如果channel没有关闭,则会报deadlock(死锁)的错误
  2. 遍历时,channel关闭,可以正常的遍历数据,遍历结束既退出
func main() {
	chan2 := AddChan()
	for i := range chan2 {
		fmt.Println(i)
	}
}

func AddChan() chan int {
	chan2 := make(chan int, 100)
	for i := 0; i < 100; i++ {
		chan2<- i
	}
	close(chan2)
	return chan2
}
           

三、gorutine 和 channel 配合

(1)go和chan是如何协作的?

  1. 主线程会有一个for循环,当管道没有关闭时,会一直等待数据写入
  2. 协程写完数据需要关闭管道,管道关闭,关闭不会在继续等待数据!!!数据全部取完

    val, ok := <-chann

    的ok则为false,则可以退出!!!
func main(){
	chann := make(chan int, 100)
	go write(chann)
	for {
		//  这里会等待,,,只要chann没有关闭,chann会一直等待数据
		val, ok := <-chann
		if !ok {
			break
		}
	}
}

func  write(chann chan int){
	for i:=0;i<100;i++ {
		chann<- i
	}
	// 这里一定要关闭管道,否则主线取数据会一直锁死,等待数据!!!
	close(chann)
}
           

(2)协程和管道配和原理分析

  1. 有两个管道,一个存储数据的管道,一个存储退出标记的管道
    // 数据管道
    intChan := make(chan int, 50)
    // 退出管道
    exitChan := make(chan bool, 1)
               
  2. 开启写的协程,执行完成记得关闭,否则读取管道一直等待导致死锁
    func writeDate(intChan chan int)  {
    	for i := 0; i < 50; i++ {
    		intChan<- i
    		time.Sleep(time.Millisecond*1)
    	}
    	// 
    	close(intChan)
    }
               
  3. 写数据协程读取完所有数据,存放一个标记到

    exitChan

    管道
    func readDate(intChan chan int) {
    	for {
    		// 存一个,读一个,只要通道没有关闭,读取会一直等待
    		v, ok := <-intChan
    		if !ok {
    			break
    		}
    		fmt.Println("读取到的数据:", v)
    	}
    	// 读取完成
    	exitChan<- true
    	close(exitChan)
    }
               
  4. 主线程创建守护线程,保证主线程不会被销毁,直到

    exitChan

    队列放入数据则祝线程可以结束
    func main() {
    	// 数据管道
    	intChan := make(chan int, 50)
    	// 退出管道
    	exitChan := make(chan bool, 1)
    	
    	go writeDate(intChan)
    	go readDate(intChan, exitChan)
    
    	// 阻塞主线程,,,直到exitChan取出退出标志结束循环
    	for {
    		if <-exitChan {
    			fmt.Println("结束")
    			break
    		}
    	}
    }
               

(3)WaitGroup

go很人性化,为了避免我们写出太多非业务代码,为我们提供了

WaitGroup

来控制协程,去标志是否完成
var wg sync.WaitGroup
func main() {

	// 数据管道
	intChan := make(chan int, 50)

	wg.Add(1)
	go writeDate(intChan)

	wg.Add(1)
	go readDate(intChan)

	wg.Wait()
}
func writeDate(intChan chan int)  {
	defer wg.Done()
}
func readDate(intChan chan int) {
	defer wg.Done()
}
           

函数讲解:

  1. 只要开启一个协程,就

    Add(1)

    ,表示开启一个协程
  2. 协程执行完毕,则需要

    Done()

    ,表示从协程序等待组里删除
  3. 主线程的

    Wait()

    相当于我上面原理解释的(一个for循环会一直等待数据读取完成),只有当所有的协程都

    Done()

    后,才会继续执行

    Wait()

    后续代码。

需要注意: Add的协程数量,需要和Done的协程数对应,否则死锁报错

(4)案例分析(取素数)

  1. 开启一个writeDate协程,向管道intChan中写入50个数据
  2. 开启一个readDate协程,从管道intChan中读取写入的数据
  3. 主线程需要等到所有协程执行完毕,之后在退出
  4. writeDate、readDate操作同一个管道

读写锁

package chann
	
	import (
		"fmt"
		"time"
	)
	
	func ReadAndWrite() {
		// 数据管道
		intChan := make(chan int, 50)
		// 退出管道
		exitChan := make(chan bool, 1)
	
		for i := 0; i < 10; i++ {
			go writeDate(intChan)
		}
		go writeDate(intChan)
		go readDate(intChan, exitChan)
	
		// 阻塞主线程,,,直到exitChan取出退出标志结束循环
		for {
			if <-exitChan {
				fmt.Println("结束")
				break
			}
		}
	}
	
	func writeDate(intChan chan int)  {
		for i := 0; i < 50; i++ {
			intChan<- i
			time.Sleep(time.Millisecond*1)
		}
		close(intChan)
	}
	
	func readDate(intChan chan int, exitChan chan bool) {
		for {
			v, ok := <-intChan
			if !ok {
				break
			}
			fmt.Println("读取到的数据:", v)
		}
		// 读取完成
		exitChan<- true
		close(exitChan)
	}
           

WaitGroup

package chann
	
	import (
		"fmt"
		"sync"
		"time"
	)
	var wg sync.WaitGroup
	func ReadAndWrite() {
	
		// 数据管道
		intChan := make(chan int, 50)
	
		wg.Add(1)
		go writeDate(intChan)
	
		wg.Add(1)
		go readDate(intChan)
	
		// 等等待所有协程结束退出主进程
		wg.Wait()
	}
	
	func writeDate(intChan chan int)  {
		// 协程结束done
		defer wg.Done()
		for i := 0; i < 50; i++ {
			intChan<- i
			time.Sleep(time.Millisecond*1)
		}
		close(intChan)
	}

	func readDate(intChan chan int) {
		// 协程结束done
		defer wg.Done()
		for {
			v, ok := <-intChan
			if !ok {
				break
			}
			fmt.Println("读取到的数据:", v)
		}
	}
           

(5)案例分析(作用协程)