协程
- 一、goroutine(协程)
-
- (1)进程和线程
- (2)并发和并行
- (3)协程应用
-
- 3-1. 协程基本概念
- 3-2. MPG模式
- 3-3. 设置cpu数目
- 3-4. 协程加互斥锁(写锁)
- 二、channel(管道)
-
- (1)基本用法
- (2)注意事项
- (3)入队 出队
- (4)管道关闭
- (5)管道遍历
- 三、gorutine 和 channel 配合
-
- (1)go和chan是如何协作的?
- (2)协程和管道配和原理分析
- (3)WaitGroup
- (4)案例分析(取素数)
- (5)案例分析(作用协程)
一、goroutine(协程)
(1)进程和线程
(2)并发和并行
并发:多个线程,轮训执行
并行:多个线程,运行在不同的cpu上(多核上运行),同时执行
(3)协程应用
3-1. 协程基本概念
- 什么是协程?
一个golang的程序下可以启多个协程,协程可以理解为轻量级的线程(编译器优化)
特点:
- 有独立的栈空间
- 共享程序堆空间
- 调度由程序员控制
- 轻量级的线程
- 协程执行流程
输出: 流程:func main() { go test() for i := 0; i < 10; i++ { fmt.Println("main()测试,这是第" + strconv.Itoa(i) + "次") time.Sleep(time.Second*1) } } func test() { for i := 0; i < 100; i++ { fmt.Println("test()测试,这是第" + strconv.Itoa(i) + "次") time.Sleep(time.Second*1) } }
3-2. MPG模式
3-3. 设置cpu数目
// 查看目前的cpu核心数
numcpu := runtime.NumCPU()
fmt.Println(numcpu)
// 设置最大协程数。1.8以后go会自动分配
maxcpu := runtime.GOMAXPROCS(3)
fmt.Println(maxcpu)
3-4. 协程加互斥锁(写锁)
var (
resMap = make(map[int]uint64)
// 互斥锁 写锁
lock sync.Mutex
)
func jiecheng(n int) {
var res uint64 = 1
for i := 1; i <= n; i++ {
res *= uint64(i)
}
// 加锁
lock.Lock()
resMap[n] = res
// 解锁
lock.Unlock()
}
func main(){
for i := 1; i < 200; i++ {
go jiecheng(i)
}
// 遍历map
i := 1
for item := range lockChan {
fmt.Println(item)
if i == len(lockChan) {
close(lockChan)
}
i++
}
}
二、channel(管道)
channel和MQ的管道类似,是一个队列,先进先出
(1)基本用法
- 创建(关键词chan)
lockChan := make(chan int, 200)
- 插入
lockChan<- res
- 读取
num := <-lockChan
- 声明读写管道
var chann chan int
- 声明只写管道
var chann chan<- int
- 声明只读管道
var chann <-chan int
(2)注意事项
- channel中只可以存放指定的数据类型
- 当channel申请的cap满了就不可以在继续存放
- channel取出后还可以继续存放,不会改变cap的大小
- 没有协程的情况,channel数据取完了,再取会报错
- 管道内存空间已满,但是只要有读协程,就会阻塞,等待数据取出再继续插入,没有读的协程则会死锁报错
(3)入队 出队
func main() {
// 先进先出
chanIn := make(chan interface{}, 3)
chanIn<- Cat{Name: "小猫咪"}
chanIn<- 1
chanIn<- "tet"
// 先进先出 输出的是:Cat{Name: "小猫咪"}
fmt.Println(<-chanIn)
}
(4)管道关闭
channel关闭后,不可以在继续写入,但是可以继续读
- 写入操作
chan1 := make(chan int, 3) chan1<- 1 chan1<- 2 close(chan1) chan1<- 3 // 此处会报错 :panic: send on closed channel
:panic: send on closed channel 意思是向一个关闭的管道发送数据chan1<- 3
- 读取操作
chan1 := make(chan int, 3) chan1<- 1 chan1<- 2 close(chan1) x, ok := <-chan1 fmt.Println(x, ok) // 输出=》1, true x, ok = <-chan1 fmt.Println(x, ok) // 输出=》2, true x, ok = <-chan1 fmt.Println(x, ok) // 输出=》0, false
x表示从通道获取的数据,ok表示是否拿到
如果数据取完:
- x为声明类型的默认值
- ok为false
(5)管道遍历
管道遍历建议使用for-range的方式遍历
需要注意:
- 遍历时,如果channel没有关闭,则会报deadlock(死锁)的错误
- 遍历时,channel关闭,可以正常的遍历数据,遍历结束既退出
func main() {
chan2 := AddChan()
for i := range chan2 {
fmt.Println(i)
}
}
func AddChan() chan int {
chan2 := make(chan int, 100)
for i := 0; i < 100; i++ {
chan2<- i
}
close(chan2)
return chan2
}
三、gorutine 和 channel 配合
(1)go和chan是如何协作的?
- 主线程会有一个for循环,当管道没有关闭时,会一直等待数据写入
- 协程写完数据需要关闭管道,管道关闭,关闭不会在继续等待数据!!!数据全部取完
的ok则为false,则可以退出!!!
val, ok := <-chann
func main(){
chann := make(chan int, 100)
go write(chann)
for {
// 这里会等待,,,只要chann没有关闭,chann会一直等待数据
val, ok := <-chann
if !ok {
break
}
}
}
func write(chann chan int){
for i:=0;i<100;i++ {
chann<- i
}
// 这里一定要关闭管道,否则主线取数据会一直锁死,等待数据!!!
close(chann)
}
(2)协程和管道配和原理分析
- 有两个管道,一个存储数据的管道,一个存储退出标记的管道
// 数据管道 intChan := make(chan int, 50) // 退出管道 exitChan := make(chan bool, 1)
- 开启写的协程,执行完成记得关闭,否则读取管道一直等待导致死锁
func writeDate(intChan chan int) { for i := 0; i < 50; i++ { intChan<- i time.Sleep(time.Millisecond*1) } // close(intChan) }
- 写数据协程读取完所有数据,存放一个标记到
管道exitChan
func readDate(intChan chan int) { for { // 存一个,读一个,只要通道没有关闭,读取会一直等待 v, ok := <-intChan if !ok { break } fmt.Println("读取到的数据:", v) } // 读取完成 exitChan<- true close(exitChan) }
- 主线程创建守护线程,保证主线程不会被销毁,直到
队列放入数据则祝线程可以结束exitChan
func main() { // 数据管道 intChan := make(chan int, 50) // 退出管道 exitChan := make(chan bool, 1) go writeDate(intChan) go readDate(intChan, exitChan) // 阻塞主线程,,,直到exitChan取出退出标志结束循环 for { if <-exitChan { fmt.Println("结束") break } } }
(3)WaitGroup
go很人性化,为了避免我们写出太多非业务代码,为我们提供了 WaitGroup
来控制协程,去标志是否完成
var wg sync.WaitGroup
func main() {
// 数据管道
intChan := make(chan int, 50)
wg.Add(1)
go writeDate(intChan)
wg.Add(1)
go readDate(intChan)
wg.Wait()
}
func writeDate(intChan chan int) {
defer wg.Done()
}
func readDate(intChan chan int) {
defer wg.Done()
}
函数讲解:
- 只要开启一个协程,就
,表示开启一个协程Add(1)
- 协程执行完毕,则需要
,表示从协程序等待组里删除Done()
- 主线程的
相当于我上面原理解释的(一个for循环会一直等待数据读取完成),只有当所有的协程都Wait()
后,才会继续执行Done()
后续代码。Wait()
需要注意: Add的协程数量,需要和Done的协程数对应,否则死锁报错
(4)案例分析(取素数)
- 开启一个writeDate协程,向管道intChan中写入50个数据
- 开启一个readDate协程,从管道intChan中读取写入的数据
- 主线程需要等到所有协程执行完毕,之后在退出
- writeDate、readDate操作同一个管道
读写锁
package chann
import (
"fmt"
"time"
)
func ReadAndWrite() {
// 数据管道
intChan := make(chan int, 50)
// 退出管道
exitChan := make(chan bool, 1)
for i := 0; i < 10; i++ {
go writeDate(intChan)
}
go writeDate(intChan)
go readDate(intChan, exitChan)
// 阻塞主线程,,,直到exitChan取出退出标志结束循环
for {
if <-exitChan {
fmt.Println("结束")
break
}
}
}
func writeDate(intChan chan int) {
for i := 0; i < 50; i++ {
intChan<- i
time.Sleep(time.Millisecond*1)
}
close(intChan)
}
func readDate(intChan chan int, exitChan chan bool) {
for {
v, ok := <-intChan
if !ok {
break
}
fmt.Println("读取到的数据:", v)
}
// 读取完成
exitChan<- true
close(exitChan)
}
WaitGroup
package chann
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func ReadAndWrite() {
// 数据管道
intChan := make(chan int, 50)
wg.Add(1)
go writeDate(intChan)
wg.Add(1)
go readDate(intChan)
// 等等待所有协程结束退出主进程
wg.Wait()
}
func writeDate(intChan chan int) {
// 协程结束done
defer wg.Done()
for i := 0; i < 50; i++ {
intChan<- i
time.Sleep(time.Millisecond*1)
}
close(intChan)
}
func readDate(intChan chan int) {
// 协程结束done
defer wg.Done()
for {
v, ok := <-intChan
if !ok {
break
}
fmt.Println("读取到的数据:", v)
}
}