無緩沖管道 :
指在接收前沒有能力儲存任何值的通道,這種類型通道要求發送gorouutine和接收goroutine同時準備好,才能完成發送和接收操作。如果兩個goroutine沒有同時準備好,
通道會導緻先執行發送或者接收的goroutine阻塞等待,這種對通道進行發送和接收的互動行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在
模拟打羽毛球:發球 接球
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var wg sync.WaitGroup
func init(){
rand.Seed(time.Now().Unix())
}
func main(){
court := make(chan int)
wg.Add(2)
go player("發球",court)
go player("接球",court)
court <- 1
wg.Wait()
}
func player(name string,court chan int) {
defer wg.Done()
for{
ball,ok:= <-court
if !ok{
fmt.Printf("player %s won\n",name)
return
}
n:=rand.Intn(100)
if n%13 ==0{
fmt.Printf("player %s Missed\n",name)
close(court)
return
}
fmt.Printf("player %s Hit %d\n",name,ball)
ball++
court<- ball
}
}
GOROOT=D:\go #gosetup
GOPATH=D:\gospaces #gosetup
D:\go\bin\go.exe build -o C:\Users\Administrator\AppData\Local\Temp\___go_build_listen20_go.exe D:/gocode/test/listen20.go #gosetup
"D:\soft\goland2018.3\GoLand 2018.3.5\bin\runnerw64.exe" C:\Users\Administrator\AppData\Local\Temp\___go_build_listen20_go.exe #gosetup
player 接球 Hit 1
player 發球 Hit 2
player 接球 Hit 3
player 發球 Hit 4
player 接球 Hit 5
player 發球 Hit 6
player 接球 Hit 7
player 發球 Hit 8
player 接球 Hit 9
player 發球 Hit 10
player 接球 Hit 11
player 發球 Hit 12
player 接球 Hit 13
player 發球 Hit 14
player 接球 Hit 15
player 發球 Hit 16
player 接球 Hit 17
player 發球 Hit 18
player 接球 Hit 19
player 發球 Hit 20
player 接球 Hit 21
player 發球 Hit 22
player 接球 Hit 23
player 發球 Hit 24
player 接球 Missed
player 發球 won
4名跑步者圍繞賽道輪流跑 接力棒
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main(){
//建立一個無緩沖的通道
baton := make(chan int)
wg.Add(1)
//第一位跑步者持有接力棒
go Runner(baton)
//開始比賽
baton <- 1
wg.Wait()
}
//Runner 模拟接力比賽中的一位跑步者
func Runner(baton chan int){
var newRunner int
//等待接力棒
runner := <- baton
//開始繞着跑道跑步
fmt.Printf("運動員 %d 到跑到這裡來準備... \n",runner)
//建立下一位跑步者
if runner != 4{
newRunner = runner+1
fmt.Printf("運動員 %d 開始跑... \n",runner)
go Runner(baton)
}
//圍繞着跑到跑步跑100毫秒
time.Sleep(1000000*time.Microsecond)
//比賽結束了嗎?
if runner ==4 {
fmt.Printf("運動員 %d finished ,race over \n",runner)
wg.Done()
return
}
fmt.Printf("運動員 %d 跑完一圈了開始和運動員 %d 交換接力棒... \n",runner,newRunner)
baton <- newRunner
}
運動員 1 到跑到這裡來準備...
運動員 1 開始跑...
運動員 1 跑完一圈了開始和運動員 2 交換接力棒...
運動員 2 到跑到這裡來準備...
運動員 2 開始跑...
運動員 2 跑完一圈了開始和運動員 3 交換接力棒...
運動員 3 到跑到這裡來準備...
運動員 3 開始跑...
運動員 3 跑完一圈了開始和運動員 4 交換接力棒...
運動員 4 到跑到這裡來準備...
運動員 4 finished ,race over
有緩沖的通道:
有緩沖的通道是一種在被接收前能夠存儲一個或者多個值的通道,這種類型的通道并不強制要求協程之間必須同時完成發送和接收。通道會阻塞發送和接收的
動作的條件也會不同,隻有在通道中沒有要接受的值時,接收動作才會阻塞。隻有在通道沒有可用的緩沖的去容納被發送的值時,發送才會阻塞。這回導緻有緩沖
的通道和無緩沖的通道之間的一個很大的不同:無緩沖的通道保證進行發送和接收的協程會在同一時間進行資料交換;有緩沖的通道沒有這種保證

package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
const(
numberGoroutines = 4 //要使用的gotoutine的數量
taskLoad = 10 //要處理的工作的數量
)
//wg 用來等待程式完成
var wg sync.WaitGroup
//優先執行這個函數
func init(){
rand.Seed(time.Now().Unix())
}
//main 是所有go程式入口
func main(){
//建立一個有緩沖的通道來管理工作
tasks := make(chan string,taskLoad)
//啟動協程來處理工作
wg.Add(numberGoroutines)
for gr := 1;gr<=numberGoroutines;gr++ {
go worker(tasks,gr)
}
//增加一組要完成的工作
for post := 1;post<=taskLoad;post++{
tasks <- fmt.Sprintf("task :%d",post)
}
//當所有工作都處理完成是關閉管道
//以便所有goroutine推出
//很多人此處有疑問:為啥任務沒處理完就關閉了
//答:當任務關閉後 協程依然可以從通道中接收資料,但是不能像通道發送資料,能夠從已經關閉的通道接收資料這點非常非常重要
//因為這允許通道關閉後依舊能取出其中緩沖的全部值,而不丢失資料
close(tasks)
//等待所有工作完成
wg.Wait()
}
//worker 作為goroutine的啟動來處理
//從有緩沖的通道傳入的工作
func worker(tasks chan string,worker int){
//通知函數已經傳回、
defer wg.Done()
for{
//等待配置設定工作
task,ok:= <- tasks
if !ok{
//這裡意味着通道已經空了,并且已經被關閉
fmt.Printf("worker;%d:shutting down\n",worker)
return
}
//顯示我們開始工作
fmt.Printf("worker :%d:start %s\n",worker,task)
//随機等待一段時間模拟工作
sleep := rand.Int63n(100)
time.Sleep(time.Duration(sleep)*time.Microsecond)
//顯示我們完成了工作
fmt.Printf("worker:%d:completed %s\n",worker,task)
}
}
View Code
GOROOT=D:\go #gosetup
GOPATH=D:\gospaces #gosetup
D:\go\bin\go.exe build -o C:\Users\Administrator\AppData\Local\Temp\___go_build_listen20_go.exe D:/gocode/test/listen20.go #gosetup
"D:\soft\goland2018.3\GoLand 2018.3.5\bin\runnerw64.exe" C:\Users\Administrator\AppData\Local\Temp\___go_build_listen20_go.exe #gosetup
worker :4:start task :4
worker :3:start task :3
worker :1:start task :1
worker :2:start task :2
worker:3:completed task :3
worker :3:start task :5
worker:4:completed task :4
worker :4:start task :6
worker:1:completed task :1
worker :1:start task :7
worker:2:completed task :2
worker :2:start task :8
worker:2:completed task :8
worker:3:completed task :5
worker :3:start task :10
worker:4:completed task :6
worker;4:shutting down
worker :2:start task :9
worker:1:completed task :7
worker;1:shutting down
worker:3:completed task :10
worker;3:shutting down
worker:2:completed task :9
worker;2:shutting down
Process finished with exit code 0
網易:
package main
import (
"fmt"
)
func main() {
var c chan int
fmt.Printf("c=%v\n", c)
c = make(chan int, 1)
fmt.Printf("c=%v\n", c)
c <- 100
/*
data := <-c
fmt.Printf("data:%v\n", data)
*/
<-c
}
nobufChan 不帶緩沖(不帶大小的chan 無法插入資料的,隻有當有人在擷取資料時候才可以放入資料)
比如:收快遞:隻有快遞員見到你本人後,隻能寄快遞
package main
import (
"fmt"
"time"
)
func produce(c chan int) {
c <- 1000
fmt.Println("produce finished")
}
func consume(c chan int) {
data := <-c
fmt.Println(data)
}
func main() {
var c chan int
fmt.Printf("c=%v\n", c)
c = make(chan int)
go produce(c)
go consume(c)
time.Sleep(time.Second * 5)
}
goroutine_sync 模拟sleep阻塞的功能
package main
import (
"fmt"
"time"
)
func hello(c chan bool) {
time.Sleep(5 * time.Second)
fmt.Println("hello goroutine")
c <- true
}
func main() {
var exitChan chan bool
exitChan = make(chan bool)
go hello(exitChan)
fmt.Println("main thread terminate")
<-exitChan
}
隻讀 隻寫的chan
package main
import "fmt"
func sendData(sendch chan<- int) {
sendch <- 10
//<-sendch
}
func readData(sendch <-chan int) {
//sendch <- 10
data := <-sendch
fmt.Println(data)
}
func main() {
chnl := make(chan int)
go sendData(chnl)
readData(chnl)
}
判斷管道是否關閉
package main
import (
"fmt"
)
func producer(chnl chan int) {
for i := 0; i < 10; i++ {
chnl <- i
}
close(chnl)
}
func main() {
ch := make(chan int)
go producer(ch)
for {
v, ok := <-ch
if ok == false {
fmt.Println("chan is closed")
break
}
fmt.Println("Received ", v)
}
}
for-range-chan 不需要關注管道是否關閉 管道關閉後 自動退出循環
package main
import (
"fmt"
"time"
)
func producer(chnl chan int) {
for i := 0; i < 10; i++ {
chnl <- i
time.Sleep(time.Second)
}
close(chnl)
}
func main() {
ch := make(chan int)
go producer(ch)
for v := range ch {
fmt.Println("receive:", v)
}
}
待緩沖的chan(容量)
特點:當沒有往chan放入資料,直接去擷取資料就會報錯(死鎖);當超過chan容量後,繼續放入資料也會報錯(死鎖)

package main
import "fmt"
func main() {
ch := make(chan string, 2)
var s string
//s = <-ch
ch <- "hello"
ch <- "world"
ch <- "!"
//ch <- "test"
s1 := <-ch
s2 := <-ch
fmt.Println(s, s1, s2)
}
View Code
待緩沖的chan

package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v, "from ch")
time.Sleep(2 * time.Second)
}
}
View Code
長度和容量

package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
View Code
如何等待一組goroutine結束?
方法1:low版本
package main
import (
"fmt"
"time"
)
func process(i int, ch chan bool) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
ch <- true
}
func main() {
no := 3
exitChan := make(chan bool, no)
for i := 0; i < no; i++ {
go process(i, exitChan)
}
for i := 0; i < no; i++ {
<-exitChan
}
fmt.Println("All go routines finished executing")
}
方法2:sync.WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
wg.Wait()
fmt.Println("wait return")
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
workerpool的實作
woker池的實作
a,生産者,消費者模型,簡單有效
b,控制goroutine的數量,防止goroutine洩露和暴漲
c,基于goroutine和chan,建構wokerpool非常簡單
1,任務抽象程一個個job
2,使用job隊列和result隊列
3,開一個組goroutine進行實際任務計算,并把結果放回result隊列
案例:

package main
import (
"fmt"
"math/rand"
)
type Job struct {
Number int
Id int
}
type Result struct {
job *Job
sum int
}
func calc(job *Job, result chan *Result) {
var sum int
number := job.Number
for number != 0 {
tmp := number % 10
sum += tmp
number /= 10
}
r := &Result{
job: job,
sum: sum,
}
result <- r
}
func Worker(jobChan chan *Job, resultChan chan *Result) {
for job := range jobChan {
calc(job, resultChan)
}
}
func startWorkerPool(num int, jobChan chan *Job, resultChan chan *Result) {
for i := 0; i < num; i++ {
go Worker(jobChan, resultChan)
}
}
func printResult(resultChan chan *Result) {
for result := range resultChan {
fmt.Printf("job id:%v number:%v result:%d\n", result.job.Id, result.job.Number, result.sum)
}
}
func main() {
jobChan := make(chan *Job, 1000)
resultChan := make(chan *Result, 1000)
startWorkerPool(128, jobChan, resultChan)
go printResult(resultChan)
var id int
for {
id++
number := rand.Int()
job := &Job{
Id: id,
Number: number,
}
jobChan <- job
}
}
View Code
select

package main
import (
"fmt"
"time"
)
func server1(ch chan string) {
time.Sleep(time.Second * 6)
ch <- "response from server1"
}
func server2(ch chan string) {
time.Sleep(time.Second * 3)
ch <- "response from server2"
}
func main() {
output1 := make(chan string)
output2 := make(chan string)
go server1(output1)
go server2(output2)
/*
s1 := <-output1
fmt.Println("s1:", s1)
s2 := <-output2
fmt.Println("s2:", s2)
*/
select {
case s1 := <-output1:
fmt.Println("s1:", s1)
case s2 := <-output2:
fmt.Println("s2:", s2)
default:
fmt.Println("run default")
}
}
View Code

package main
import (
"fmt"
"time"
)
func write(ch chan string) {
for {
select {
case ch <- "hello":
fmt.Println("write succ")
default:
fmt.Println("channel is full")
}
time.Sleep(time.Millisecond * 500)
}
}
func main() {
//select {}
output1 := make(chan string, 10)
go write(output1)
for s := range output1 {
fmt.Println("recv:", s)
time.Sleep(time.Second)
}
}
View Code
sync.Mutex

package main
import (
"fmt"
"sync"
)
var x int
var wg sync.WaitGroup
var mutex sync.Mutex
func add() {
for i := 0; i < 5000; i++ {
mutex.Lock()
x = x + 1
mutex.Unlock()
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println("x:", x)
}
View Code

package main
import (
"fmt"
"sync"
"time"
)
var rwlock sync.RWMutex
var x int
var wg sync.WaitGroup
func write() {
rwlock.Lock()
fmt.Println("write lock")
x = x + 1
time.Sleep(10 * time.Second)
fmt.Println("write unlock")
rwlock.Unlock()
wg.Done()
}
func read(i int) {
fmt.Println("wait for rlock")
rwlock.RLock()
fmt.Printf("goroutine:%d x=%d\n", i, x)
time.Sleep(time.Second)
rwlock.RUnlock()
wg.Done()
}
func main() {
wg.Add(1)
go write()
time.Sleep(time.Millisecond * 5)
for i := 0; i < 10; i++ {
wg.Add(1)
go read(i)
}
wg.Wait()
}
讀鎖寫鎖
互斥鎖和讀寫鎖比較

package main
import (
"fmt"
"sync"
"time"
)
var rwlock sync.RWMutex
var x int
var wg sync.WaitGroup
var mutex sync.Mutex
func write() {
for i := 0; i < 100; i++ {
//rwlock.Lock()
mutex.Lock()
x = x + 1
time.Sleep(10 * time.Millisecond)
mutex.Unlock()
//rwlock.Unlock()
}
wg.Done()
}
func read(i int) {
for i := 0; i < 100; i++ {
//rwlock.RLock()
mutex.Lock()
time.Sleep(time.Millisecond)
mutex.Unlock()
//rwlock.RUnlock()
}
wg.Done()
}
func main() {
start := time.Now().UnixNano()
wg.Add(1)
go write()
for i := 0; i < 100; i++ {
wg.Add(1)
go read(i)
}
wg.Wait()
end := time.Now().UnixNano()
cost := (end - start) / 1000 / 1000
fmt.Println("cost:", cost, "ms")
}
View Code

package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var x int32
var wg sync.WaitGroup
var mutex sync.Mutex
func addMutex() {
for i := 0; i < 500; i++ {
mutex.Lock()
x = x + 1
mutex.Unlock()
}
wg.Done()
}
func add() {
for i := 0; i < 500; i++ {
//mutex.Lock()
//x = x +1
atomic.AddInt32(&x, 1)
//mutex.Unlock()
}
wg.Done()
}
func main() {
start := time.Now().UnixNano()
for i := 0; i < 10000; i++ {
wg.Add(1)
go add()
//go addMutex()
}
wg.Wait()
end := time.Now().UnixNano()
cost := (end - start) / 1000 / 1000
fmt.Println("x:", x, "cost:", cost, "ms")
}
atomic
其它案例:
先看代碼
package main
import (
"strings"
"fmt"
"time"
)
func main() {
users:=strings.Split("shenyi,zhangsan,lisi,wangwu",",")
ages:=strings.Split("19,21,25,26",",")
c1,c2:=make(chan bool),make(chan bool)
ret:=make([]string,0)
go func() {
for _,v:=range users{
<-c1
ret=append(ret,v)
time.Sleep(time.Second)
c2<-true
}
}()
go func() {
for _,v:=range ages{
<-c2
ret=append(ret,v)
c1<-true
}
}()
c1<-true
fmt.Println(ret)
}
列印:
[shenyi]
package main
import (
//_ "github.com/go-sql-driver/mysql"
"io/ioutil"
"net/http"
"fmt"
)
func main() {
url:="https://news.com/n/page/%d/"
c:=make(chan map[int][]byte)
for i:=1;i<=3;i++{
go func(index int) {
url:=fmt.Sprintf(url,index)
res,_:=http.Get(url)
cnt,_:= ioutil.ReadAll(res.Body)
c<-map[int][]byte{index:cnt}
if index==3 {
close(c)
}
}(i)
}
for getcnt:=range c{
for k,v:=range getcnt{
ioutil.WriteFile(fmt.Sprintf("./files/%d",k),v,666)
}
}
}