天天看點

6.824 lab1 MapReduce6.824 Lab-1 MapReduce

6.824 Lab-1 MapReduce

1.實驗内容

1.1内容概述

将經典的Word Counter任務使用MapReduce程式設計範式去實作,任務整體流程如下(假設兩個Map節點和兩個Reduce節點):

6.824 lab1 MapReduce6.824 Lab-1 MapReduce

每個Map Worker負責一個輸入檔案的Map處理,每個Map任務輸出N份檔案(N是Reduce Worker數目),這N份檔案會送到N個Reduce Worker處理。等待所有Map任務完成後,Reduce工作才能開始(此時所有輸入檔案才準備好),每個Reduce Worker輸出一份reduce結果。

1.2lab相關代碼概述

與本實驗相關的主要有三個包,分别是main、mrapps、mr包。main包調用mrapps包和mr包運作整個流程,mrapps包是運作和測試時使用的工具函數包,這兩個包在實驗過程中都不需要改動,自己寫的代碼都在mr包中,其中包含三個檔案coordinator.go、rpc.go和worker.go,作用如下

mr
├── coordinator.go \\Master
├── rpc.go		   \\處理通信
└── worker.go	   \\worker,包含map和reduce
           

2.實驗步驟

2.1定義通信内容(rpc.go)

2.1.1配置設定任務時的請求與響應

worker請求任務時,不區分map和reduce,讓coordinator根據任務完成情況來決定配置設定任務類型,這裡有個邊界情況就是所有的任務都在運作中,這個狀态既得不到任務又不能直接退出,隻能進行下一次任務。

// 節點請求任務
type ReqArgs struct {
	ReqNumber int8 //占用一個位元組表示請求 為1表示申請任務
}
// Master回應任務内容
type ReqReply struct {
	TypeName string   // map or reduce or allinprogress
	Idx      int      //worker idx
	Content  []string //file names for work content
	NReduce  int      //reduce worker number
}
           

2.1.2完成任務時worker彙報任務結果給Master

彙報完成情況時,需要說明任務類型(TypeName), 任務結果(Ret), 完成的任務編号(idx)

// 報告任務完成情況
type FinishReq struct {
	TypeName string //map or reduce
	Ret      []string //output files(map or reduce)
	Idx      int //worker idx (map or reduce)
}

//Master 回應worker
type FinishReply struct {
	Done bool //for reply
}
           

2.2實作Worker(Worker.go)

2.2.1Map worker

Map worker任務包括三步,第一步讀取輸入檔案調用mapf生成key value對,第二步處理kv對,排序之後合并相同條目到同一行,第三步将結果寫入輸出檔案。

1.讀取檔案内容生成Key Value對
intermediate := []KeyValue{}
filename := reply1.Content[0]
NReduce := reply1.NReduce

//打開檔案
file, err := os.Open(filename)
if err != nil {
	log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
	log.Fatalf("cannot read %v", filename)
}

file.Close()

//生成KV對
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
           
2.處理kv對生成中間結果
sort.Sort(ByKey(intermediate))
i := 0
reduceInput := make([][]ReduceKv, NReduce)
for i < len(intermediate) {
	j := i + 1
    for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
        j++
    }
    values := []string{}
    for k := i; k < j; k++ {
        values = append(values, intermediate[k].Value)
    }

    key := intermediate[i].Key

    //根據key配置設定到不同的reduce任務中
    idx := ihash(key) % NReduce
    reduceInput[idx] = append(reduceInput[idx], ReduceKv{Key: key, Value: values})

    i = j
}
           
3.寫入結果

這個過程分為兩步,首先寫入臨時檔案,然後檢查檔案是否已經存在,如果不存在,将臨時檔案改名(原子操作),這樣做的好處是防止一個任務被配置設定給不同worker時,多個worker同時寫一個檔案。

  1. 寫入臨時檔案
    tempFiles := make([]*os.File, NReduce)
    //臨時檔案命名
    for i := range tempFiles {
    	tempFiles[i], err = ioutil.TempFile(".", "out*")
    	if err != nil {
    		log.Fatal("creat tempfile fail")
    	}
    }
    
    //對象以json格式寫入臨時檔案
    for i := range reduceInput {
    	enc := json.NewEncoder(tempFiles[i])
    	for _, kv := range reduceInput[i] {
    		err := enc.Encode(&kv)
    		if err != nil {
    			log.Fatalf("cannot write json %v", i)
    		}
    	}
    }
               
  2. 重命名(原子操作)
//輸入檔案命名
for i := range outNames {
	outNames[i] = "mr-map-out-" + strconv.Itoa(reply1.Idx) + "-" + strconv.Itoa(i)
}
//rename
for i := range tempFiles {
	_, err := os.Stat(outNames[i])
	if os.IsNotExist(err) {
		os.Rename(tempFiles[i].Name(), outNames[i])
		if i == len(tempFiles)-1 {//complete work, call finish info
			args := FinishReq{TypeName: "map", Ret: outNames, Idx: reply1.Idx}
			reply := FinishReply{}
			call("Coordinator.HandFinishInfo", &args, &reply)
		}
	} else {
		os.Remove(tempFiles[i].Name()) //remove tempfile
		break
	}
}
           

2.2.2Reduce Worker

reduce 操作主要是讀取map處理得到的檔案,然後處理寫入輸出檔案,寫檔案和map worker類似,先寫入臨時檔案,完成之後再重命名

使用一個map來記錄讀取到的kv對,

kvaMap := make(map[string]*ReduceKv)

1.讀取中間檔案(json格式)
for _, filename := range inputFileNames {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}

	dec := json.NewDecoder(file)

	for {
		var kv ReduceKv
		if err := dec.Decode(&kv); err != nil {
			break
		}
		if kvaMap[kv.Key] == nil {//map中沒有記錄,建立
			kvaMap[kv.Key] = &kv
		} else {//map中已經記錄,追加
			kvaMap[kv.Key].Value = append(kvaMap[kv.Key].Value, kv.Value...)
		}
	}
}
           
2.寫入臨時檔案
// 寫入臨時檔案
for _, kv := range kvaMap {
   output := reducef(kv.Key, kv.Value)
   fmt.Fprintf(oTmpFile, "%v %v\n", kv.Key, output)
}
           
3.重命名,完成後彙報給Master
// 重命名臨時檔案
outName := "mr-out-" + strconv.Itoa(idx)
retName := []string{}
_, err := os.Stat(outName)
if os.IsNotExist(err) {
	os.Rename(oTmpFile.Name(), outName)
    //通知Master
	args := FinishReq{TypeName: "reduce", Ret: append(retName, outName), Idx: idx}
	reply := FinishReply{}
	call("Coordinator.HandFinishInfo", &args, &reply)
} else {
	os.Remove(oTmpFile.Name())
}
           

2.3實作Master (Coordinate.go)

Master負責記錄任務的狀态和配置設定任務,定義了兩個資料結構如下:

type Task struct {
	inputFileName []string   //輸入檔案
	status        TaskStatus //任務狀态
}

type Coordinator struct {
	MapTask    []Task
	ReduceTask []Task
	NReduce    int//記錄reduce worker數目 關系到map輸出的檔案數
}
           

定義兩個鎖和兩個全局變量

var coordinateLock sync.RWMutex //用于控制Coordinate結構内變量的通路
var lockBool sync.RWMutex       //控制mapDone 和 reduceDone

//辨別任務完成狀态
var mapDone bool = false
var reduceDone bool = false
           

定義兩個函數 IsWorkDone AssignTask

//用于判斷某一類任務是否完成
func IsWorkDone(tasks []Task) bool {
	coordinateLock.RLock()
	defer coordinateLock.RUnlock()
	for i := range tasks {
		if tasks[i].status != Completed {
			return false
		}
	}
	return true
}

//讀取任務狀态并配置設定任務,傳回值中int如果為正數則是worker id, 為-1表示該類任務完成,為-2表示所有任務都在執行
func AssignTask(tasks []Task) (*Task, int) {
	coordinateLock.RLock()
	defer coordinateLock.RUnlock()
	for i := range tasks {
		if tasks[i].status == Idle {
			return &tasks[i], i
		}
	}
	if IsWorkDone(tasks) {
		return nil, -1 //work done
	} else {
		return nil, -2 //all in progress wait
	}
}
           

定義Coordinate中處理兩類請求的函數,需要在任務配置設定後的10秒,檢查任務是否完成,如果沒有完成,需要重置任務狀态到空閑,便于配置設定給另外一個節點,這裡開了一個協程來完成這個事情,隻要主函數沒有結束,協程不會提前結束,也就是說調用協程的函數(不是main)結束了,協程還可以運作

處理配置設定任務的請求:

func (c *Coordinator) HandWorkerReq(args *ReqArgs, reply *ReqReply) error {
	if args.ReqNumber == 1 {
		lockBool.RLock()
		if !mapDone {
			if mapTask, mapStatus := AssignTask(c.MapTask); mapStatus >= 0 {
				// lock.Lock()

				reply.TypeName = "map"
				reply.Content = mapTask.inputFileName
				reply.Idx = mapStatus
				reply.NReduce = c.NReduce

				coordinateLock.Lock()
				mapTask.status = InProgress
				coordinateLock.Unlock()
				// lock.Unlock()

				go CheckStatus(mapTask)

			} else if mapStatus == -1 {
				lockBool.RUnlock()
				lockBool.Lock()
				mapDone = true
				lockBool.Unlock()
				lockBool.RLock()
				// lock.Unlock()
			} else {
				// lock.Lock()
				reply.TypeName = "allinprogress"
				// lock.Unlock()
			}
		} else if !reduceDone {
			if redTask, redStatus:=AssignTask(c.ReduceTask); redStatus >= 0 {
				reply.TypeName = "reduce"
				reply.Idx = redStatus
				reply.Content = redTask.inputFileName
				reply.NReduce = c.NReduce

				coordinateLock.Lock()
				redTask.status = InProgress
				coordinateLock.Unlock()

				go CheckStatus(redTask)
			} else if redStatus == -1 {
				lockBool.RUnlock()
				lockBool.Lock()
				reduceDone = true
				lockBool.Unlock()
				lockBool.RLock()
			} else {
				// lock.Lock()
				reply.TypeName = "allinprogress"
				// lock.Unlock()
			}
		} else {
			// lock.Lock()
			reply.TypeName = "finish"
			// lock.Unlock()
		}
		lockBool.RUnlock()
	}
	return nil
}

//檢查任務狀态
func CheckStatus(t *Task) {
	time.Sleep(10 * time.Second)

	coordinateLock.Lock()
	defer coordinateLock.Unlock()

	if t.status != Completed {
		t.status = Idle
	}
}
           

處理任務完成消息:

func (c *Coordinator) HandFinishInfo(args *FinishReq, reply *FinishReply) error {
	idx := args.Idx
	if args.TypeName == "map" {
		coordinateLock.Lock()

		if c.MapTask[idx].status != Completed {
			for i := range c.ReduceTask {
				c.ReduceTask[i].inputFileName = append(c.ReduceTask[i].inputFileName, args.Ret[i])
			}
			c.MapTask[idx].status = Completed
		}

		coordinateLock.Unlock()
	}

	if args.TypeName == "reduce" {

		coordinateLock.Lock()

		if c.ReduceTask[idx].status != Completed {
			c.ReduceTask[idx].status = Completed
		}

		coordinateLock.Unlock()
	}
	return nil
}
           

3實驗總結

6.824 lab1 MapReduce6.824 Lab-1 MapReduce

踩坑! 代碼可以通過所有的測試,在debug的過程中,有一個版本一直過不了reduce的并行測試,後面看了測試代碼才知道,判斷并行的方式是在同1秒内有沒有多個節點在建立檔案,由于之前設定的worker會休息1秒再發送請求(指導書建議這樣做),正是請求的間隙,造成了reduce在測試中無法并行,後面不等待直接發送下一個請求就可以通過測試了。

心得! lab1的指導書真的超級詳細,基本上有問題了指導書上都可以找到建議,隻要認真做,根本不用參考别人的代碼,就是debug的時候有點困難,基本靠列印來判斷問題。

4參考資料

6.824首頁

5整體代碼

github連結

繼續閱讀