从给出的样板代码和要求可以看出,这是一个生产者(master)消费者(worker)模型,且生产者要保障消费者正确消费。那么理所应当可以想到用一个带ACK响应的任务队列来实现这一功能,那么展示如何实现一个简单的消息队列。
在这个实验中,只有Map和Reduce两种函数,我们可以理解成,worker向master询问要执行什么函数,以及函数的参数。我们将这一部分做一个抽象,理解成wokrer向master要一个job,job包括了函数名,函数的参数,以及其他可能用得到的参数。
那么基本可以确定,我们要实现一个job的任务队列,并且任务队列根据ACK来确保任务被正确消费。以下只给出结构体和函数。具体实现不提。
package mr import ( "crypto/sha256" "encoding/json" "fmt" "log" "sync" "time" ) //**************************************** job的定义 **************************************** type Job struct { FuncName string ParamsJson []byte CreatedAt int64 } // 生成的hash作为 ack 记录任务是否被响应的key func (job *Job) Hash() [32]byte {} func (job *Job) IsEmpty() bool {} //**************************************** jobAck的定义 **************************************** // 用一个map来记录没有ack的任务 // 之所以不用sync.Map,是因为那个是读多写少的时候性能好,但是目前的场景,读写差不多 // 额外提一句,这个jobAck只是维护了key是否在map中,并不关心job是啥,甚至是不是job,他关心的只有key type jobAck struct { jobsDone map[[32]byte]bool lock sync.Mutex } func NewJobAck() *jobAck {} // map是否为空 func (ack *jobAck) IsAllAck() bool {} func (ack *jobAck) IsAck(key [32]byte) bool {} // 删除key func (ack *jobAck) Ack(key [32]byte) {} // 添加key func (ack *jobAck) WaitAck(key [32]byte) {} //**************************************** jobLine的定义 **************************************** // 一个line作为任务队列,一个ack用来维护未响应的任务 type JobLine struct { line []Job ack jobAck lock sync.Mutex } func NewJobLine() *JobLine {} func (jobLine *JobLine) Len() int {} func (jobLine *JobLine) Pop() Job {} func (jobLine *JobLine) Add(jobs ...Job) {} // jobAck只是队伍队列用来记录job的key的,根本还是需要jobLine自己把任务重新加到队列中。 func (jobLine *JobLine) Wait(job Job) { key := job.Hash() jobLine.ack.WaitAck(key) log.Printf("job: %s, %d has been in wait", job.FuncName, job.CreatedAt) go func() { time.Sleep(10 * time.Second) if !jobLine.ack.IsAck(key) { log.Printf("job: %s, %d not get ack, add it to line", job.FuncName, job.CreatedAt) jobLine.Add(job) jobLine.ack.Ack(key) } }() } // isEmpty and isWait func (jobLine *JobLine) Status() (bool, bool) {} func (jobLine *JobLine) PopAndWait() Job {} func (jobLine *JobLine) Ack(job Job) {} func (jobLine *JobLine) IsStillWait() bool {} //**************************************** 各种Job的创建函数 **************************************** func NewMapJob(fileName string, contents string) Job { mapParams := MapParams{fileName, contents} paramsJson, _ := json.Marshal(mapParams) return Job{"map", paramsJson, time.Now().Unix()} } func NewReduceJob(nreduce int, myreduce int, fileNames []string) Job { reduceParams := ReduceParams{NReduce: nreduce, MyReduce: myreduce, FileNames: fileNames} paramsJson, _ := json.Marshal(reduceParams) return Job{"reduce", paramsJson, time.Now().Unix()} } // 可能会出现任务队列为空,但是还有任务没响应的情况,这时候就要等待 func NewWaitJob() Job { return Job{"wait", nil, time.Now().Unix()} } // 通知所有的worker退出 func NewExitJob() Job { return Job{"exit", nil, time.Now().Unix()} } //**************************************** params的定义 **************************************** type MapParams struct { FileName string Contents string } type ReduceParams struct { NReduce int MyReduce int FileNames []string }从上文可以看出,master和worker之间的交流只包括
worker向master要任务worker告诉master我做完了基于此,RPC接口我设计的很简单。入参就是我上次做完的job,出参就是我这次要做的job。worker只需要把上次的出参作为这次的入参即可。
type JobRequest struct { Job Job } type JobResponse struct { Job Job }master的实现没有太多好说的,直接给出定义。
type Master struct { // Your definitions here. fileNames []string nReduce int // 这里考虑到,map和reduce是要顺序执行的。 // 而生产出所有的reduce job也是要时间的,我不希望让第一个拿reduce job的worker等。 // 所以我在master创建的时候就创建好了另一个reduce job line。当map任务全部完成,只需要将jobs指向nextJobs即可。 jobs JobLine nextJobs JobLine isMapDone bool isReduceDone bool }worker给出多一点的代码,方便理解各种job的行为。
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { request := JobRequest{} response := JobResponse{} for { //log.Println("will call master, request", request, "response", response) call("Master.GetJob", request, &response) log.Println("call master, get", response.Job.FuncName) switch response.Job.FuncName { case "exit": return case "wait": time.Sleep(time.Second) case "map": params := MapParams{} json.Unmarshal(response.Job.ParamsJson, ¶ms) kv := myMapf(mapf, params) e := writeFile(kv, fmt.Sprintf("%s-temp", params.FileName)) if e != nil { continue } request.Job = response.Job case "reduce": params := ReduceParams{} json.Unmarshal(response.Job.ParamsJson, ¶ms) kv, e := myReduce(reducef, params) if e != nil { continue } e = writeFile(kv, fmt.Sprintf("mr-out-%d", params.MyReduce)) if e != nil { continue } request.Job = response.Job } } }