代碼和注釋均在代碼:
代碼和注釋均在代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package main
import (
"fmt"
"runtime"
"time"
)
//---------------------Job-------------------------
type Job interface {
Do()
}
// 一個資料接口,所有的資料都要實作該接口,才能被傳遞進來
//實作Job接口的一個資料執行個體,需要實作一個Do()方法,對資料的處理就在這個Do()方法中。
/*
Job通道:
這裡有兩個Job通道:
1、WorkerPool的Job channel,用于調用者把具體的資料寫入到這裡,WorkerPool讀取。
2、Worker的Job channel,當WorkerPool讀取到Job,并拿到可用的Worker的時候,會将Job執行個體寫入該Worker的Job channel,用來直接執行Do()方法。
*/
//--------------------Worker-----------------------
type Worker struct {
JobQueue chan Job //Worker的Job通道
}
func NewWorker() Worker {
return Worker{JobQueue: make(chan Job)}
}
//每一個被初始化的worker都會在後期單獨占用一個協程
//初始化的時候會先把自己的JobQueue傳遞到Worker通道中,
//然後阻塞讀取自己的JobQueue,讀到一個Job就執行Job對象的Do()方法。
func (w Worker) Run(wq chan chan Job) {
go func() {
for{
wq <- w.JobQueue
select {
case job := <- w.JobQueue:
job.Do()
}
}
}()
}
//----------------WorkerPool---------------------
//工作池(WorkerPool):
type WorkerPool struct {
workerlen int //WorkerPool中同時 存在Worker的個數
JobQueue chan Job // WorkerPool的Job通道
WorkerQueue chan chan Job
}
//初始化時會按照傳入的num,啟動num個背景協程,然後循環讀取Job通道裡面的資料,
//讀到一個資料時,再擷取一個可用的Worker,并将Job對象傳遞到該Worker的chan通道
func NewWorkerPool(workerlen int) *WorkerPool {
return &WorkerPool{
workerlen: workerlen,
JobQueue: make(chan Job),
WorkerQueue: make(chan chan Job,workerlen),
}
}
func (wp *WorkerPool) Run() {
fmt.Println("初始化worker...")
for i:=0;i<wp.workerlen;i++ {
worker := NewWorker()
worker.Run(wp.WorkerQueue)
}
go func() {
for{
select {
case job := <-wp.JobQueue:
worker :=<-wp.WorkerQueue
worker <- job
}
}
}()
}
//------------------測試--------------------
type Score struct {
Num int
}
func (s *Score) Do() {
fmt.Println("num:",s.Num)
time.Sleep(time.Second*1*1)
}
func main() {
num := 100*100*20
p := NewWorkerPool(num)
p.Run()
datanum := 100*100*100*100
go func() {
for i:=1;i<datanum;i++{
sc := &Score{Num: i}
p.JobQueue <- sc
}
}()
for{
fmt.Println("runtime.NumGoroutine():",runtime.NumGoroutine())
time.Sleep(time.Second*2)
}
}