这个 lab 很早之前就已经做过了,最近在填raft
的坑,就干脆把这个也重新做了一下。总的来说这个lab
还算是简单的,当然还是有些实现细节要注意。
Part 1
这个部分要完成两个函数,分别是doMap
, doReduce
。 先来看一下doMap
函数和doReduce
函数的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
fMap := make(map[string]*os.File)
byteContent, err := ioutil.ReadFile(inFile)
if err != nil {
fmt.Printf("read file error: %#v\n", err)
}
kvs := mapF(inFile, string(byteContent))
for i := 0; i < nReduce; i++ {
filename := reduceName(jobName, mapTask, i)
f, err := os.Create(filename)
if err != nil {
fmt.Printf("create file error: %#v\n", err)
}
if _, ok := fMap[filename]; !ok {
fMap[filename] = f
}
}
for _, kv := range kvs {
filename := reduceName(jobName, mapTask, ihash(kv.Key)%nReduce)
enc := json.NewEncoder(fMap[filename])
enc.Encode(&kv)
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
var kvMap = make(map[string][]string)
var keys []string
for i := 0; i < nMap; i++ {
filename := reduceName(jobName, i, reduceTask)
f, err := os.Open(filename)
if err != nil {
fmt.Printf("read file error: %#v\n", err)
}
dec := json.NewDecoder(f)
for {
var kv KeyValue
err := dec.Decode(&kv)
if err != nil {
break
}
if _, ok := kvMap[kv.Key]; !ok {
keys = append(keys, kv.Key)
}
kvMap[kv.Key] = append(kvMap[kv.Key], kv.Value)
}
f.Close()
}
sort.Strings(keys)
f, err := os.Create(outFile)
if err != nil {
fmt.Printf("read file error: %#v\n", err)
}
enc := json.NewEncoder(f)
for _, k := range keys {
enc.Encode(KeyValue{k, reduceF(k, kvMap[k])})
}
f.Close()
|
doMap
的过程就是读取输入,调用用户定义的Map
函数产生键值对,然后将这些键值对分发给doReduce
, 这里是通过文件的形式分发给doReduce
过程的,就是将键值对分别写入到不同的文件中,每个doReduce
任务读取其中的一个文件,对文件中的 key 进行排序,对每个 key 的 value 进行聚合,然后将结果写入文件。
这里遇到的坑
这里数据处理都是通过操作文件进行的,开始实现的时候是每次循环都打开关闭一次文件,测试的时候发现卡着不动,应该是文件操作太频繁,因为我们的文件个数是reduce
的数量,就直接先打开nReduce
个文件,将文件名和文件描述符存在一个map
里,遍历键值对的时候就不用频繁打开关闭文件了。
Part 2
Part2 要实现单词计数功能,就是在 part1 的基础上,完成我们自己的map
函数和reduce
函数,实现单词计数功能。这个很简单,贴一下代码吧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func mapF(filename string, contents string) []mapreduce.KeyValue {
// Your code here (Part II).
keys := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
var kvs []mapreduce.KeyValue
for _, key := range keys {
kv := mapreduce.KeyValue{Key: key, Value: "1"}
kvs = append(kvs, kv)
}
return kvs
}
//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func reduceF(key string, values []string) string {
// Your code here (Part II).
sum := 0
for _, v := range values {
i, _ := strconv.Atoi(v)
sum += i
}
return strconv.Itoa(sum)
}
|
Part 3
Part3 要实现任务调度,这里调度的大部分代码都已经写好了的,我们要做的就是把map
任务和reduce
要分配给不同的worker
。
核心代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
wg.Add(1)
go func(TaskNumber int, jobName string, phase jobPhase, n_other int, mapFiles []string) {
var task DoTaskArgs
task.JobName = jobName
task.Phase = phase
task.NumOtherPhase = n_other
task.File = mapFiles[TaskNumber]
task.TaskNumber = TaskNumber
TASK:
worker := <-registerChan
if ok := call(worker, "Worker.DoTask", task, nil); ok {
wg.Done()
registerChan <- worker
} else {
goto TASK
}
}(i, jobName, phase, n_other, mapFiles)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)
|
Part 4
这部分就是处理网络失败的请求,其实就是 rpc 失败,我这里就是简单的利用goto
, 重新从regiterChan
拿worker
分配给要执行的任务。
Part 5
这部分要实现倒排索引,和 part2 很类似。map
函数和 part2 差不多,主要看一下reduce
函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
func reduceF(key string, values []string) string {
// Your code here (Part V).
sort.Strings(values)
// suppress duplicate documents
last := ""
i := 0
for _, f := range values {
if f != last {
last = f
values[i] = f
i++
}
}
values = values[0:i]
return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}
|
这里要注意的是要对 values 进行去重,不然没法通过测试。
最后附上一张测试全部通过的截图!image
Author
Hao
LastMod
2018-10-11
License
本文采用知识共享署名-非商业性使用 4.0 国际许可协议进行许可