6.824 Lab-1 MapReduce
1.實驗内容
1.1内容概述
将經典的Word Counter任務使用MapReduce程式設計範式去實作,任務整體流程如下(假設兩個Map節點和兩個Reduce節點):
每個Map Worker負責一個輸入檔案的Map處理,每個Map任務輸出N份檔案(N是Reduce Worker數目),這N份檔案會送到N個Reduce Worker處理。等待所有Map任務完成後,Reduce工作才能開始(此時所有輸入檔案才準備好),每個Reduce Worker輸出一份reduce結果。
1.2lab相關代碼概述
與本實驗相關的主要有三個包,分别是main、mrapps、mr包。main包調用mrapps包和mr包運作整個流程,mrapps包是運作和測試時使用的工具函數包,這兩個包在實驗過程中都不需要改動,自己寫的代碼都在mr包中,其中包含三個檔案coordinator.go、rpc.go和worker.go,作用如下
mr
├── coordinator.go \\Master
├── rpc.go \\處理通信
└── worker.go \\worker,包含map和reduce
2.實驗步驟
2.1定義通信内容(rpc.go)
2.1.1配置設定任務時的請求與響應
worker請求任務時,不區分map和reduce,讓coordinator根據任務完成情況來決定配置設定任務類型,這裡有個邊界情況就是所有的任務都在運作中,這個狀态既得不到任務又不能直接退出,隻能進行下一次任務。
// 節點請求任務
type ReqArgs struct {
ReqNumber int8 //占用一個位元組表示請求 為1表示申請任務
}
// Master回應任務内容
type ReqReply struct {
TypeName string // map or reduce or allinprogress
Idx int //worker idx
Content []string //file names for work content
NReduce int //reduce worker number
}
2.1.2完成任務時worker彙報任務結果給Master
彙報完成情況時,需要說明任務類型(TypeName), 任務結果(Ret), 完成的任務編号(idx)
// 報告任務完成情況
type FinishReq struct {
TypeName string //map or reduce
Ret []string //output files(map or reduce)
Idx int //worker idx (map or reduce)
}
//Master 回應worker
type FinishReply struct {
Done bool //for reply
}
2.2實作Worker(Worker.go)
2.2.1Map worker
Map worker任務包括三步,第一步讀取輸入檔案調用mapf生成key value對,第二步處理kv對,排序之後合并相同條目到同一行,第三步将結果寫入輸出檔案。
1.讀取檔案内容生成Key Value對
intermediate := []KeyValue{}
filename := reply1.Content[0]
NReduce := reply1.NReduce
//打開檔案
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
//生成KV對
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
2.處理kv對生成中間結果
sort.Sort(ByKey(intermediate))
i := 0
reduceInput := make([][]ReduceKv, NReduce)
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
key := intermediate[i].Key
//根據key配置設定到不同的reduce任務中
idx := ihash(key) % NReduce
reduceInput[idx] = append(reduceInput[idx], ReduceKv{Key: key, Value: values})
i = j
}
3.寫入結果
這個過程分為兩步,首先寫入臨時檔案,然後檢查檔案是否已經存在,如果不存在,将臨時檔案改名(原子操作),這樣做的好處是防止一個任務被配置設定給不同worker時,多個worker同時寫一個檔案。
- 寫入臨時檔案
tempFiles := make([]*os.File, NReduce) //臨時檔案命名 for i := range tempFiles { tempFiles[i], err = ioutil.TempFile(".", "out*") if err != nil { log.Fatal("creat tempfile fail") } } //對象以json格式寫入臨時檔案 for i := range reduceInput { enc := json.NewEncoder(tempFiles[i]) for _, kv := range reduceInput[i] { err := enc.Encode(&kv) if err != nil { log.Fatalf("cannot write json %v", i) } } }
- 重命名(原子操作)
//輸入檔案命名
for i := range outNames {
outNames[i] = "mr-map-out-" + strconv.Itoa(reply1.Idx) + "-" + strconv.Itoa(i)
}
//rename
for i := range tempFiles {
_, err := os.Stat(outNames[i])
if os.IsNotExist(err) {
os.Rename(tempFiles[i].Name(), outNames[i])
if i == len(tempFiles)-1 {//complete work, call finish info
args := FinishReq{TypeName: "map", Ret: outNames, Idx: reply1.Idx}
reply := FinishReply{}
call("Coordinator.HandFinishInfo", &args, &reply)
}
} else {
os.Remove(tempFiles[i].Name()) //remove tempfile
break
}
}
2.2.2Reduce Worker
reduce 操作主要是讀取map處理得到的檔案,然後處理寫入輸出檔案,寫檔案和map worker類似,先寫入臨時檔案,完成之後再重命名
使用一個map來記錄讀取到的kv對,
kvaMap := make(map[string]*ReduceKv)
1.讀取中間檔案(json格式)
for _, filename := range inputFileNames {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
dec := json.NewDecoder(file)
for {
var kv ReduceKv
if err := dec.Decode(&kv); err != nil {
break
}
if kvaMap[kv.Key] == nil {//map中沒有記錄,建立
kvaMap[kv.Key] = &kv
} else {//map中已經記錄,追加
kvaMap[kv.Key].Value = append(kvaMap[kv.Key].Value, kv.Value...)
}
}
}
2.寫入臨時檔案
// 寫入臨時檔案
for _, kv := range kvaMap {
output := reducef(kv.Key, kv.Value)
fmt.Fprintf(oTmpFile, "%v %v\n", kv.Key, output)
}
3.重命名,完成後彙報給Master
// 重命名臨時檔案
outName := "mr-out-" + strconv.Itoa(idx)
retName := []string{}
_, err := os.Stat(outName)
if os.IsNotExist(err) {
os.Rename(oTmpFile.Name(), outName)
//通知Master
args := FinishReq{TypeName: "reduce", Ret: append(retName, outName), Idx: idx}
reply := FinishReply{}
call("Coordinator.HandFinishInfo", &args, &reply)
} else {
os.Remove(oTmpFile.Name())
}
2.3實作Master (Coordinate.go)
Master負責記錄任務的狀态和配置設定任務,定義了兩個資料結構如下:
type Task struct {
inputFileName []string //輸入檔案
status TaskStatus //任務狀态
}
type Coordinator struct {
MapTask []Task
ReduceTask []Task
NReduce int//記錄reduce worker數目 關系到map輸出的檔案數
}
定義兩個鎖和兩個全局變量
var coordinateLock sync.RWMutex //用于控制Coordinate結構内變量的通路
var lockBool sync.RWMutex //控制mapDone 和 reduceDone
//辨別任務完成狀态
var mapDone bool = false
var reduceDone bool = false
定義兩個函數 IsWorkDone AssignTask
//用于判斷某一類任務是否完成
func IsWorkDone(tasks []Task) bool {
coordinateLock.RLock()
defer coordinateLock.RUnlock()
for i := range tasks {
if tasks[i].status != Completed {
return false
}
}
return true
}
//讀取任務狀态并配置設定任務,傳回值中int如果為正數則是worker id, 為-1表示該類任務完成,為-2表示所有任務都在執行
func AssignTask(tasks []Task) (*Task, int) {
coordinateLock.RLock()
defer coordinateLock.RUnlock()
for i := range tasks {
if tasks[i].status == Idle {
return &tasks[i], i
}
}
if IsWorkDone(tasks) {
return nil, -1 //work done
} else {
return nil, -2 //all in progress wait
}
}
定義Coordinate中處理兩類請求的函數,需要在任務配置設定後的10秒,檢查任務是否完成,如果沒有完成,需要重置任務狀态到空閑,便于配置設定給另外一個節點,這裡開了一個協程來完成這個事情,隻要主函數沒有結束,協程不會提前結束,也就是說調用協程的函數(不是main)結束了,協程還可以運作
處理配置設定任務的請求:
func (c *Coordinator) HandWorkerReq(args *ReqArgs, reply *ReqReply) error {
if args.ReqNumber == 1 {
lockBool.RLock()
if !mapDone {
if mapTask, mapStatus := AssignTask(c.MapTask); mapStatus >= 0 {
// lock.Lock()
reply.TypeName = "map"
reply.Content = mapTask.inputFileName
reply.Idx = mapStatus
reply.NReduce = c.NReduce
coordinateLock.Lock()
mapTask.status = InProgress
coordinateLock.Unlock()
// lock.Unlock()
go CheckStatus(mapTask)
} else if mapStatus == -1 {
lockBool.RUnlock()
lockBool.Lock()
mapDone = true
lockBool.Unlock()
lockBool.RLock()
// lock.Unlock()
} else {
// lock.Lock()
reply.TypeName = "allinprogress"
// lock.Unlock()
}
} else if !reduceDone {
if redTask, redStatus:=AssignTask(c.ReduceTask); redStatus >= 0 {
reply.TypeName = "reduce"
reply.Idx = redStatus
reply.Content = redTask.inputFileName
reply.NReduce = c.NReduce
coordinateLock.Lock()
redTask.status = InProgress
coordinateLock.Unlock()
go CheckStatus(redTask)
} else if redStatus == -1 {
lockBool.RUnlock()
lockBool.Lock()
reduceDone = true
lockBool.Unlock()
lockBool.RLock()
} else {
// lock.Lock()
reply.TypeName = "allinprogress"
// lock.Unlock()
}
} else {
// lock.Lock()
reply.TypeName = "finish"
// lock.Unlock()
}
lockBool.RUnlock()
}
return nil
}
//檢查任務狀态
func CheckStatus(t *Task) {
time.Sleep(10 * time.Second)
coordinateLock.Lock()
defer coordinateLock.Unlock()
if t.status != Completed {
t.status = Idle
}
}
處理任務完成消息:
func (c *Coordinator) HandFinishInfo(args *FinishReq, reply *FinishReply) error {
idx := args.Idx
if args.TypeName == "map" {
coordinateLock.Lock()
if c.MapTask[idx].status != Completed {
for i := range c.ReduceTask {
c.ReduceTask[i].inputFileName = append(c.ReduceTask[i].inputFileName, args.Ret[i])
}
c.MapTask[idx].status = Completed
}
coordinateLock.Unlock()
}
if args.TypeName == "reduce" {
coordinateLock.Lock()
if c.ReduceTask[idx].status != Completed {
c.ReduceTask[idx].status = Completed
}
coordinateLock.Unlock()
}
return nil
}
3實驗總結
踩坑! 代碼可以通過所有的測試,在debug的過程中,有一個版本一直過不了reduce的并行測試,後面看了測試代碼才知道,判斷并行的方式是在同1秒内有沒有多個節點在建立檔案,由于之前設定的worker會休息1秒再發送請求(指導書建議這樣做),正是請求的間隙,造成了reduce在測試中無法并行,後面不等待直接發送下一個請求就可以通過測試了。
心得! lab1的指導書真的超級詳細,基本上有問題了指導書上都可以找到建議,隻要認真做,根本不用參考别人的代碼,就是debug的時候有點困難,基本靠列印來判斷問題。
4參考資料
6.824首頁
5整體代碼
github連結