MIT 6.824 Lab1 MapReduce实现思路

    科技2022-07-21  105

    关键问题

    提供RPC接口,供worker调取文件(名)确保每个文件都被worker正确消费,如果worker过了10s还没消费掉这个文件,则将这个文件给别人消费在所有文件都被消费之后,通知每个worker结束任务应该避免任务重复执行,包括map任务和reduce任务

    master实现

    在所有的任务都被完成后,等待10s,给每个前来要任务的worker发送一个特殊的reply,通知他们退出程序。master需要实现一个RPC函数,这个函数要求 上一次完成的工作,如果是第一次,则为nil。发请求总是带上上一次完成的任务,由master维护“任务被消费”的幂等性 先完成所有的map任务,当map任务全部被完成时,再给所有worker发布reduce任务。所以入参要指明是map任务还是reduce任务入参包括自己上次完成的任务的函数名和函数的入参出参包括要完成的函数名和函数的入参。exit是一个特殊的任务,表示要求worker退出通过一个map来确认是否每个任务都被正确完成了。对任务生成hash512,作为key,值为false。当任务被完成时,改为true。每当一个任务被调取之后,起一个协程,睡眠10s,然后检查这个key。如果key的value为false,则把这个任务重新加到任务队列。

    worker实现

    worker在彻底完成工作之前,应该将文件命名为标准文件名。

    事后诸葛

    功能抽象

    从给出的样板代码和要求可以看出,这是一个生产者(master)消费者(worker)模型,且生产者要保障消费者正确消费。那么理所应当可以想到用一个带ACK响应的任务队列来实现这一功能,那么展示如何实现一个简单的消息队列。

    在这个实验中,只有Map和Reduce两种函数,我们可以理解成,worker向master询问要执行什么函数,以及函数的参数。我们将这一部分做一个抽象,理解成wokrer向master要一个job,job包括了函数名,函数的参数,以及其他可能用得到的参数。

    那么基本可以确定,我们要实现一个job的任务队列,并且任务队列根据ACK来确保任务被正确消费。以下只给出结构体和函数。具体实现不提。

    package mr import ( "crypto/sha256" "encoding/json" "fmt" "log" "sync" "time" ) //**************************************** job的定义 **************************************** type Job struct { FuncName string ParamsJson []byte CreatedAt int64 } // 生成的hash作为 ack 记录任务是否被响应的key func (job *Job) Hash() [32]byte {} func (job *Job) IsEmpty() bool {} //**************************************** jobAck的定义 **************************************** // 用一个map来记录没有ack的任务 // 之所以不用sync.Map,是因为那个是读多写少的时候性能好,但是目前的场景,读写差不多 // 额外提一句,这个jobAck只是维护了key是否在map中,并不关心job是啥,甚至是不是job,他关心的只有key type jobAck struct { jobsDone map[[32]byte]bool lock sync.Mutex } func NewJobAck() *jobAck {} // map是否为空 func (ack *jobAck) IsAllAck() bool {} func (ack *jobAck) IsAck(key [32]byte) bool {} // 删除key func (ack *jobAck) Ack(key [32]byte) {} // 添加key func (ack *jobAck) WaitAck(key [32]byte) {} //**************************************** jobLine的定义 **************************************** // 一个line作为任务队列,一个ack用来维护未响应的任务 type JobLine struct { line []Job ack jobAck lock sync.Mutex } func NewJobLine() *JobLine {} func (jobLine *JobLine) Len() int {} func (jobLine *JobLine) Pop() Job {} func (jobLine *JobLine) Add(jobs ...Job) {} // jobAck只是队伍队列用来记录job的key的,根本还是需要jobLine自己把任务重新加到队列中。 func (jobLine *JobLine) Wait(job Job) { key := job.Hash() jobLine.ack.WaitAck(key) log.Printf("job: %s, %d has been in wait", job.FuncName, job.CreatedAt) go func() { time.Sleep(10 * time.Second) if !jobLine.ack.IsAck(key) { log.Printf("job: %s, %d not get ack, add it to line", job.FuncName, job.CreatedAt) jobLine.Add(job) jobLine.ack.Ack(key) } }() } // isEmpty and isWait func (jobLine *JobLine) Status() (bool, bool) {} func (jobLine *JobLine) PopAndWait() Job {} func (jobLine *JobLine) Ack(job Job) {} func (jobLine *JobLine) IsStillWait() bool {} //**************************************** 各种Job的创建函数 **************************************** func NewMapJob(fileName string, contents string) Job { mapParams := MapParams{fileName, contents} paramsJson, _ := json.Marshal(mapParams) return Job{"map", paramsJson, time.Now().Unix()} } func NewReduceJob(nreduce int, myreduce int, fileNames []string) Job { reduceParams := ReduceParams{NReduce: nreduce, MyReduce: myreduce, FileNames: fileNames} paramsJson, _ := json.Marshal(reduceParams) return Job{"reduce", paramsJson, time.Now().Unix()} } // 可能会出现任务队列为空,但是还有任务没响应的情况,这时候就要等待 func NewWaitJob() Job { return Job{"wait", nil, time.Now().Unix()} } // 通知所有的worker退出 func NewExitJob() Job { return Job{"exit", nil, time.Now().Unix()} } //**************************************** params的定义 **************************************** type MapParams struct { FileName string Contents string } type ReduceParams struct { NReduce int MyReduce int FileNames []string }

    通信接口

    从上文可以看出,master和worker之间的交流只包括

    worker向master要任务worker告诉master我做完了

    基于此,RPC接口我设计的很简单。入参就是我上次做完的job,出参就是我这次要做的job。worker只需要把上次的出参作为这次的入参即可。

    type JobRequest struct { Job Job } type JobResponse struct { Job Job }

    Master

    master的实现没有太多好说的,直接给出定义。

    type Master struct { // Your definitions here. fileNames []string nReduce int // 这里考虑到,map和reduce是要顺序执行的。 // 而生产出所有的reduce job也是要时间的,我不希望让第一个拿reduce job的worker等。 // 所以我在master创建的时候就创建好了另一个reduce job line。当map任务全部完成,只需要将jobs指向nextJobs即可。 jobs JobLine nextJobs JobLine isMapDone bool isReduceDone bool }

    Worker

    worker给出多一点的代码,方便理解各种job的行为。

    func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { request := JobRequest{} response := JobResponse{} for { //log.Println("will call master, request", request, "response", response) call("Master.GetJob", request, &response) log.Println("call master, get", response.Job.FuncName) switch response.Job.FuncName { case "exit": return case "wait": time.Sleep(time.Second) case "map": params := MapParams{} json.Unmarshal(response.Job.ParamsJson, &params) kv := myMapf(mapf, params) e := writeFile(kv, fmt.Sprintf("%s-temp", params.FileName)) if e != nil { continue } request.Job = response.Job case "reduce": params := ReduceParams{} json.Unmarshal(response.Job.ParamsJson, &params) kv, e := myReduce(reducef, params) if e != nil { continue } e = writeFile(kv, fmt.Sprintf("mr-out-%d", params.MyReduce)) if e != nil { continue } request.Job = response.Job } } }
    Processed: 0.013, SQL: 8