这个 lab 很早之前就已经做过了,最近在填raft的坑,就干脆把这个也重新做了一下。总的来说这个lab还算是简单的,当然还是有些实现细节要注意。

Part 1

这个部分要完成两个函数,分别是doMap, doReduce。 先来看一下doMap函数和doReduce函数的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
fMap := make(map[string]*os.File)
byteContent, err := ioutil.ReadFile(inFile)
if err != nil {
        fmt.Printf("read file error: %#v\n", err)
}
kvs := mapF(inFile, string(byteContent))
for i := 0; i < nReduce; i++ {
        filename := reduceName(jobName, mapTask, i)
        f, err := os.Create(filename)
        if err != nil {
                fmt.Printf("create file error: %#v\n", err)
        }
        if _, ok := fMap[filename]; !ok {
                fMap[filename] = f
        }
}
for _, kv := range kvs {
        filename := reduceName(jobName, mapTask, ihash(kv.Key)%nReduce)
        enc := json.NewEncoder(fMap[filename])
        enc.Encode(&kv)
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
var kvMap = make(map[string][]string)
var keys []string
for i := 0; i < nMap; i++ {
        filename := reduceName(jobName, i, reduceTask)
        f, err := os.Open(filename)
        if err != nil {
                fmt.Printf("read file error: %#v\n", err)
        }
        dec := json.NewDecoder(f)
        for {
                var kv KeyValue
                err := dec.Decode(&kv)
                if err != nil {
                        break
                }
                if _, ok := kvMap[kv.Key]; !ok {
                        keys = append(keys, kv.Key)
                }
                kvMap[kv.Key] = append(kvMap[kv.Key], kv.Value)
        }
        f.Close()
}

sort.Strings(keys)

f, err := os.Create(outFile)
if err != nil {
        fmt.Printf("read file error: %#v\n", err)
}
enc := json.NewEncoder(f)
for _, k := range keys {
        enc.Encode(KeyValue{k, reduceF(k, kvMap[k])})
}
f.Close()

doMap的过程就是读取输入,调用用户定义的Map函数产生键值对,然后将这些键值对分发给doReduce, 这里是通过文件的形式分发给doReduce过程的,就是将键值对分别写入到不同的文件中,每个doReduce任务读取其中的一个文件,对文件中的 key 进行排序,对每个 key 的 value 进行聚合,然后将结果写入文件。

这里遇到的坑

这里数据处理都是通过操作文件进行的,开始实现的时候是每次循环都打开关闭一次文件,测试的时候发现卡着不动,应该是文件操作太频繁,因为我们的文件个数是reduce的数量,就直接先打开nReduce个文件,将文件名和文件描述符存在一个map里,遍历键值对的时候就不用频繁打开关闭文件了。

Part 2

Part2 要实现单词计数功能,就是在 part1 的基础上,完成我们自己的map函数和reduce函数,实现单词计数功能。这个很简单,贴一下代码吧。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func mapF(filename string, contents string) []mapreduce.KeyValue {
        // Your code here (Part II).
        keys := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
        var kvs []mapreduce.KeyValue
        for _, key := range keys {
                kv := mapreduce.KeyValue{Key: key, Value: "1"}
                kvs = append(kvs, kv)
        }
        return kvs
}

//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func reduceF(key string, values []string) string {
        // Your code here (Part II).
        sum := 0
        for _, v := range values {
                i, _ := strconv.Atoi(v)
                sum += i
        }
        return strconv.Itoa(sum)
}

Part 3

Part3 要实现任务调度,这里调度的大部分代码都已经写好了的,我们要做的就是把map任务和reduce要分配给不同的worker。 核心代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
        wg.Add(1)
        go func(TaskNumber int, jobName string, phase jobPhase, n_other int, mapFiles []string) {
                var task DoTaskArgs
                task.JobName = jobName
                task.Phase = phase
                task.NumOtherPhase = n_other
                task.File = mapFiles[TaskNumber]
                task.TaskNumber = TaskNumber
        TASK:
                worker := <-registerChan
                if ok := call(worker, "Worker.DoTask", task, nil); ok {
                        wg.Done()
                        registerChan <- worker
                } else {
                        goto TASK
                }
        }(i, jobName, phase, n_other, mapFiles)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)

Part 4

这部分就是处理网络失败的请求,其实就是 rpc 失败,我这里就是简单的利用goto, 重新从regiterChanworker分配给要执行的任务。

Part 5

这部分要实现倒排索引,和 part2 很类似。map函数和 part2 差不多,主要看一下reduce函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func reduceF(key string, values []string) string {
        // Your code here (Part V).
        sort.Strings(values)

        // suppress duplicate documents
        last := ""
        i := 0
        for _, f := range values {
                if f != last {
                        last = f
                        values[i] = f
                        i++
                }
        }
        values = values[0:i]

        return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}

这里要注意的是要对 values 进行去重,不然没法通过测试。

最后附上一张测试全部通过的截图!image