天天看點

go語言高并發處理

作者:幹飯人小羽
go語言高并發處理

代碼和注釋均在代碼:

代碼和注釋均在代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
	
package main
 
import (
    "fmt"
    "runtime"
    "time"
)
 
//---------------------Job-------------------------
type Job interface {
    Do()
}
// 一個資料接口,所有的資料都要實作該接口,才能被傳遞進來
//實作Job接口的一個資料執行個體,需要實作一個Do()方法,對資料的處理就在這個Do()方法中。
 
/*
Job通道:
    這裡有兩個Job通道:
    1、WorkerPool的Job channel,用于調用者把具體的資料寫入到這裡,WorkerPool讀取。
    2、Worker的Job channel,當WorkerPool讀取到Job,并拿到可用的Worker的時候,會将Job執行個體寫入該Worker的Job channel,用來直接執行Do()方法。
*/
//--------------------Worker-----------------------
type Worker struct {
    JobQueue chan Job //Worker的Job通道
}
 
func NewWorker() Worker {
    return Worker{JobQueue: make(chan Job)}
}
//每一個被初始化的worker都會在後期單獨占用一個協程
//初始化的時候會先把自己的JobQueue傳遞到Worker通道中,
//然後阻塞讀取自己的JobQueue,讀到一個Job就執行Job對象的Do()方法。
 
func (w Worker) Run(wq chan chan Job)  {
    go func() {
        for{
            wq <- w.JobQueue
            select {
            case job := <- w.JobQueue:
                job.Do()
            }
        }
    }()
}
 
//----------------WorkerPool---------------------
//工作池(WorkerPool):
type WorkerPool struct {
    workerlen int               //WorkerPool中同時 存在Worker的個數
    JobQueue chan Job           // WorkerPool的Job通道
    WorkerQueue chan chan Job
}
//初始化時會按照傳入的num,啟動num個背景協程,然後循環讀取Job通道裡面的資料,
//讀到一個資料時,再擷取一個可用的Worker,并将Job對象傳遞到該Worker的chan通道
 
func NewWorkerPool(workerlen int) *WorkerPool {
    return &WorkerPool{
        workerlen: workerlen,
        JobQueue: make(chan Job),
        WorkerQueue: make(chan chan Job,workerlen),
    }
}
 
func (wp *WorkerPool) Run()  {
    fmt.Println("初始化worker...")
    for i:=0;i<wp.workerlen;i++ {
        worker := NewWorker()
        worker.Run(wp.WorkerQueue)
    }
 
    go func() {
        for{
            select {
            case job := <-wp.JobQueue:
                worker :=<-wp.WorkerQueue
                worker <- job
            }
        }
    }()
}
 
//------------------測試--------------------
 
type Score struct {
    Num int
}
 
func (s *Score) Do()  {
    fmt.Println("num:",s.Num)
    time.Sleep(time.Second*1*1)
}
 
func main() {
    num := 100*100*20
    p := NewWorkerPool(num)
    p.Run()
    datanum := 100*100*100*100
    go func() {
        for i:=1;i<datanum;i++{
            sc := &Score{Num: i}
            p.JobQueue <- sc
        }
    }()
 
    for{
        fmt.Println("runtime.NumGoroutine():",runtime.NumGoroutine())
        time.Sleep(time.Second*2)
    }
}           

繼續閱讀