From a60b6dfad139192b2dcbe8d9e61d63da1ed4797d Mon Sep 17 00:00:00 2001 From: Fedor Korotkiy Date: Sun, 29 Mar 2020 00:34:09 +0300 Subject: [PATCH] Add scheduler --- distbuild/disttest/fixture.go | 18 +- distbuild/pkg/build/graph.go | 41 ---- distbuild/pkg/build/id.go | 52 ++++ distbuild/pkg/build/top_sort.go | 31 +++ distbuild/pkg/build/top_sort_test.go | 29 +++ distbuild/pkg/dist/build.go | 36 +++ distbuild/pkg/dist/coordinator.go | 73 +++--- distbuild/pkg/dist/schedule.go | 68 ------ distbuild/pkg/dist/state.go | 15 -- distbuild/pkg/proto/build.go | 7 +- distbuild/pkg/proto/heartbeat.go | 4 + distbuild/pkg/scheduler/scheduler.go | 276 ++++++++++++++++++++++ distbuild/pkg/scheduler/scheduler_test.go | 113 +++++++++ distbuild/pkg/worker/job.go | 4 +- distbuild/pkg/worker/state.go | 5 +- distbuild/pkg/worker/worker.go | 22 +- go.mod | 1 + go.sum | 2 + 18 files changed, 614 insertions(+), 183 deletions(-) create mode 100644 distbuild/pkg/build/id.go create mode 100644 distbuild/pkg/build/top_sort.go create mode 100644 distbuild/pkg/build/top_sort_test.go create mode 100644 distbuild/pkg/dist/build.go delete mode 100644 distbuild/pkg/dist/schedule.go delete mode 100644 distbuild/pkg/dist/state.go create mode 100644 distbuild/pkg/scheduler/scheduler.go create mode 100644 distbuild/pkg/scheduler/scheduler_test.go diff --git a/distbuild/disttest/fixture.go b/distbuild/disttest/fixture.go index 96e8ab4..46a2e32 100644 --- a/distbuild/disttest/fixture.go +++ b/distbuild/disttest/fixture.go @@ -15,6 +15,7 @@ import ( "gitlab.com/slon/shad-go/distbuild/pkg/client" "gitlab.com/slon/shad-go/distbuild/pkg/dist" "gitlab.com/slon/shad-go/distbuild/pkg/filecache" + "gitlab.com/slon/shad-go/distbuild/pkg/proto" "gitlab.com/slon/shad-go/distbuild/pkg/worker" "gitlab.com/slon/shad-go/tools/testtool" @@ -80,6 +81,9 @@ func newEnv(t *testing.T) (e *env, cancel func()) { coordinatorCache, ) + router := http.NewServeMux() + router.Handle("/coordinator/", http.StripPrefix("/coordinator", env.Coordinator)) + for i := 0; i < nWorkers; i++ { workerName := fmt.Sprintf("worker%d", i) workerDir := filepath.Join(env.RootDir, workerName) @@ -92,7 +96,11 @@ func newEnv(t *testing.T) (e *env, cancel func()) { artifacts, err = artifact.NewCache(filepath.Join(workerDir, "artifacts")) require.NoError(t, err) + workerPrefix := fmt.Sprintf("/worker/%d", i) + workerID := proto.WorkerID("http://" + addr + workerPrefix) + w := worker.New( + workerID, coordinatorEndpoint, env.Logger.Named(workerName), fileCache, @@ -100,19 +108,13 @@ func newEnv(t *testing.T) (e *env, cancel func()) { ) env.Workers = append(env.Workers, w) - } - mux := http.NewServeMux() - mux.Handle("/coordinator/", http.StripPrefix("/coordinator", env.Coordinator)) - - for i, w := range env.Workers { - workerPrefix := fmt.Sprintf("/worker/%d", i) - mux.Handle(workerPrefix+"/", http.StripPrefix(workerPrefix, w)) + router.Handle(workerPrefix+"/", http.StripPrefix(workerPrefix, w)) } env.HTTP = &http.Server{ Addr: addr, - Handler: mux, + Handler: router, } lsn, err := net.Listen("tcp", env.HTTP.Addr) diff --git a/distbuild/pkg/build/graph.go b/distbuild/pkg/build/graph.go index 860d973..9538ac7 100644 --- a/distbuild/pkg/build/graph.go +++ b/distbuild/pkg/build/graph.go @@ -1,46 +1,5 @@ package build -import ( - "crypto/sha1" - "encoding" - "encoding/hex" - "fmt" - "path/filepath" -) - -type ID [sha1.Size]byte - -var ( - _ = encoding.TextMarshaler(ID{}) - _ = encoding.TextUnmarshaler(&ID{}) -) - -func (id ID) String() string { - return hex.EncodeToString(id[:]) -} - -func (id ID) Path() string { - return filepath.Join(hex.EncodeToString(id[:1]), hex.EncodeToString(id[:])) -} - -func (id ID) MarshalText() ([]byte, error) { - return []byte(hex.EncodeToString(id[:])), nil -} - -func (id *ID) UnmarshalText(b []byte) error { - raw, err := hex.DecodeString(string(b)) - if err != nil { - return err - } - - if len(raw) != len(id) { - return fmt.Errorf("invalid id size: %q", b) - } - - copy(id[:], raw) - return nil -} - // Job описывает одну вершину графа сборки. type Job struct { // ID задаёт уникальный идентификатор джоба. diff --git a/distbuild/pkg/build/id.go b/distbuild/pkg/build/id.go new file mode 100644 index 0000000..bbad3c4 --- /dev/null +++ b/distbuild/pkg/build/id.go @@ -0,0 +1,52 @@ +package build + +import ( + "crypto/rand" + "crypto/sha1" + "encoding" + "encoding/hex" + "fmt" + "path/filepath" +) + +type ID [sha1.Size]byte + +var ( + _ = encoding.TextMarshaler(ID{}) + _ = encoding.TextUnmarshaler(&ID{}) +) + +func (id ID) String() string { + return hex.EncodeToString(id[:]) +} + +func (id ID) Path() string { + return filepath.Join(hex.EncodeToString(id[:1]), hex.EncodeToString(id[:])) +} + +func (id ID) MarshalText() ([]byte, error) { + return []byte(hex.EncodeToString(id[:])), nil +} + +func (id *ID) UnmarshalText(b []byte) error { + raw, err := hex.DecodeString(string(b)) + if err != nil { + return err + } + + if len(raw) != len(id) { + return fmt.Errorf("invalid id size: %q", b) + } + + copy(id[:], raw) + return nil +} + +func NewID() ID { + var id ID + _, err := rand.Read(id[:]) + if err != nil { + panic(fmt.Sprintf("crypto/rand is unavailable: %v", err)) + } + return id +} diff --git a/distbuild/pkg/build/top_sort.go b/distbuild/pkg/build/top_sort.go new file mode 100644 index 0000000..78ddc98 --- /dev/null +++ b/distbuild/pkg/build/top_sort.go @@ -0,0 +1,31 @@ +package build + +// TopSort sorts jobs in topological order assuming dependency graph contains no cycles. +func TopSort(jobs []Job) []Job { + var sorted []Job + visited := make([]bool, len(jobs)) + + jobIDIndex := map[ID]int{} + for i, j := range jobs { + jobIDIndex[j.ID] = i + } + + var visit func(jobIndex int) + visit = func(jobIndex int) { + if visited[jobIndex] { + return + } + + visited[jobIndex] = true + for _, dep := range jobs[jobIndex].Deps { + visit(jobIDIndex[dep]) + } + sorted = append(sorted, jobs[jobIndex]) + } + + for i := range jobs { + visit(i) + } + + return sorted +} diff --git a/distbuild/pkg/build/top_sort_test.go b/distbuild/pkg/build/top_sort_test.go new file mode 100644 index 0000000..9133068 --- /dev/null +++ b/distbuild/pkg/build/top_sort_test.go @@ -0,0 +1,29 @@ +package build + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTopSort(t *testing.T) { + jobs := []Job{ + { + ID: ID{'a'}, + Deps: []ID{{'b'}}, + }, + { + ID: ID{'b'}, + Deps: []ID{{'c'}}, + }, + { + ID: ID{'c'}, + }, + } + + sorted := TopSort(jobs) + require.Equal(t, 3, len(sorted)) + require.Equal(t, ID{'c'}, sorted[0].ID) + require.Equal(t, ID{'b'}, sorted[1].ID) + require.Equal(t, ID{'a'}, sorted[2].ID) +} diff --git a/distbuild/pkg/dist/build.go b/distbuild/pkg/dist/build.go new file mode 100644 index 0000000..23c3fbc --- /dev/null +++ b/distbuild/pkg/dist/build.go @@ -0,0 +1,36 @@ +package dist + +import ( + "context" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" + "gitlab.com/slon/shad-go/distbuild/pkg/proto" +) + +type Build struct { + ID build.ID + Graph *build.Graph + + coordinator *Coordinator + uploadComplete chan struct{} +} + +func NewBuild(graph *build.Graph, coordinator *Coordinator) *Build { + id := build.NewID() + + return &Build{ + ID: id, + Graph: graph, + + coordinator: coordinator, + uploadComplete: make(chan struct{}), + } +} + +func (b *Build) Run(ctx context.Context, onStatusUpdate func(update proto.StatusUpdate) error) error { + panic("implement me") +} + +func (b *Build) UploadComplete() { + close(b.uploadComplete) +} diff --git a/distbuild/pkg/dist/coordinator.go b/distbuild/pkg/dist/coordinator.go index d4779fe..8c7850c 100644 --- a/distbuild/pkg/dist/coordinator.go +++ b/distbuild/pkg/dist/coordinator.go @@ -13,19 +13,22 @@ import ( "gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/filecache" "gitlab.com/slon/shad-go/distbuild/pkg/proto" + "gitlab.com/slon/shad-go/distbuild/pkg/scheduler" ) -type Build struct { -} - type Coordinator struct { log *zap.Logger mux *http.ServeMux fileCache *filecache.Cache - mu sync.Mutex - scheduledJobs map[build.ID]*scheduledJob - queue []*scheduledJob + mu sync.Mutex + builds map[build.ID]*Build + scheduler *scheduler.Scheduler +} + +var defaultConfig = scheduler.Config{ + CacheTimeout: time.Millisecond * 10, + DepsTimeout: time.Millisecond * 100, } func NewCoordinator( @@ -37,10 +40,12 @@ func NewCoordinator( mux: http.NewServeMux(), fileCache: fileCache, - scheduledJobs: make(map[build.ID]*scheduledJob), + builds: make(map[build.ID]*Build), + scheduler: scheduler.NewScheduler(log, defaultConfig), } c.mux.HandleFunc("/build", c.Build) + c.mux.HandleFunc("/signal", c.Signal) c.mux.HandleFunc("/heartbeat", c.Heartbeat) return c } @@ -69,12 +74,17 @@ func (c *Coordinator) doBuild(w http.ResponseWriter, r *http.Request) error { for _, job := range g.Jobs { job := job - s := c.scheduleJob(&job) - <-s.done + s := c.scheduler.ScheduleJob(&job) + + select { + case <-r.Context().Done(): + return r.Context().Err() + case <-s.Finished: + } c.log.Debug("job finished", zap.String("job_id", job.ID.String())) - update := proto.StatusUpdate{JobFinished: s.finished} + update := proto.StatusUpdate{JobFinished: s.Result} if err := enc.Encode(update); err != nil { return err } @@ -84,6 +94,18 @@ func (c *Coordinator) doBuild(w http.ResponseWriter, r *http.Request) error { return enc.Encode(update) } +func (c *Coordinator) Signal(w http.ResponseWriter, r *http.Request) { + c.log.Debug("build signal started") + if err := c.doHeartbeat(w, r); err != nil { + c.log.Error("build signal failed", zap.Error(err)) + + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(err.Error())) + return + } + c.log.Debug("build signal finished") +} + func (c *Coordinator) Build(w http.ResponseWriter, r *http.Request) { if err := c.doBuild(w, r); err != nil { c.log.Error("build failed", zap.Error(err)) @@ -100,34 +122,21 @@ func (c *Coordinator) doHeartbeat(w http.ResponseWriter, r *http.Request) error return fmt.Errorf("invalid request: %w", err) } + c.scheduler.RegisterWorker(req.WorkerID) + for _, job := range req.FinishedJob { job := job - scheduled, ok := c.lookupJob(job.ID) - if !ok { - continue - } - - c.log.Debug("job finished") - scheduled.finish(&job) + c.scheduler.OnJobComplete(req.WorkerID, job.ID, &job) } - var rsp proto.HeartbeatResponse + rsp := proto.HeartbeatResponse{ + JobsToRun: map[build.ID]proto.JobSpec{}, + } - var job *build.Job - for i := 0; i < 10; i++ { - var ok bool - job, ok = c.pickJob() - - if ok { - rsp.JobsToRun = map[build.ID]proto.JobSpec{ - job.ID: {Job: *job}, - } - - break - } - - time.Sleep(time.Millisecond) + job := c.scheduler.PickJob(req.WorkerID, r.Context().Done()) + if job != nil { + rsp.JobsToRun[job.Job.ID] = proto.JobSpec{Job: *job.Job} } if err := json.NewEncoder(w).Encode(rsp); err != nil { diff --git a/distbuild/pkg/dist/schedule.go b/distbuild/pkg/dist/schedule.go deleted file mode 100644 index 16ccf28..0000000 --- a/distbuild/pkg/dist/schedule.go +++ /dev/null @@ -1,68 +0,0 @@ -package dist - -import ( - "sync" - - "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" -) - -type scheduledJob struct { - job *build.Job - finished *proto.JobResult - - mu sync.Mutex - done chan struct{} -} - -func newScheduledJob(job *build.Job) *scheduledJob { - return &scheduledJob{ - job: job, - done: make(chan struct{}), - } -} - -func (s *scheduledJob) finish(f *proto.JobResult) { - s.mu.Lock() - defer s.mu.Unlock() - - if s.finished == nil { - s.finished = f - close(s.done) - } -} - -func (c *Coordinator) scheduleJob(job *build.Job) *scheduledJob { - c.mu.Lock() - defer c.mu.Unlock() - - if scheduled, ok := c.scheduledJobs[job.ID]; ok { - return scheduled - } - - scheduled := newScheduledJob(job) - c.scheduledJobs[job.ID] = scheduled - c.queue = append(c.queue, scheduled) - return scheduled -} - -func (c *Coordinator) pickJob() (*build.Job, bool) { - c.mu.Lock() - defer c.mu.Unlock() - - if len(c.queue) == 0 { - return nil, false - } - - job := c.queue[0].job - c.queue = c.queue[1:] - return job, true -} - -func (c *Coordinator) lookupJob(id build.ID) (*scheduledJob, bool) { - c.mu.Lock() - defer c.mu.Unlock() - - scheduled, ok := c.scheduledJobs[id] - return scheduled, ok -} diff --git a/distbuild/pkg/dist/state.go b/distbuild/pkg/dist/state.go deleted file mode 100644 index c5a0323..0000000 --- a/distbuild/pkg/dist/state.go +++ /dev/null @@ -1,15 +0,0 @@ -package dist - -import ( - "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" -) - -type Cluster struct { - sourceFiles map[build.ID]map[proto.WorkerID]struct{} - artifacts map[build.ID]map[proto.WorkerID]struct{} -} - -func (c *Cluster) FindOptimalWorkers(task build.ID, sources, deps []build.ID) []proto.WorkerID { - panic("implement me") -} diff --git a/distbuild/pkg/proto/build.go b/distbuild/pkg/proto/build.go index 2b5bb7e..918914a 100644 --- a/distbuild/pkg/proto/build.go +++ b/distbuild/pkg/proto/build.go @@ -9,9 +9,10 @@ type MissingSources struct { } type StatusUpdate struct { - JobFinished *JobResult - BuildFailed *BuildFailed - BuildFinished *BuildFinished + SourcesMissing *MissingSources + JobFinished *JobResult + BuildFailed *BuildFailed + BuildFinished *BuildFinished } type BuildFailed struct { diff --git a/distbuild/pkg/proto/heartbeat.go b/distbuild/pkg/proto/heartbeat.go index d0f123d..fb1c7c2 100644 --- a/distbuild/pkg/proto/heartbeat.go +++ b/distbuild/pkg/proto/heartbeat.go @@ -20,6 +20,10 @@ type JobResult struct { type WorkerID string +func (w WorkerID) String() string { + return string(w) +} + type HeartbeatRequest struct { // WorkerID задаёт персистентный идентификатор данного воркера. // diff --git a/distbuild/pkg/scheduler/scheduler.go b/distbuild/pkg/scheduler/scheduler.go new file mode 100644 index 0000000..be817c4 --- /dev/null +++ b/distbuild/pkg/scheduler/scheduler.go @@ -0,0 +1,276 @@ +package scheduler + +import ( + "sync" + "time" + + "go.uber.org/zap" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" + "gitlab.com/slon/shad-go/distbuild/pkg/proto" +) + +type PendingJob struct { + Job *build.Job + Result *proto.JobResult + Finished chan struct{} + + mu sync.Mutex + pickedUp chan struct{} +} + +func (p *PendingJob) finish(res *proto.JobResult) { + p.Result = res + close(p.Finished) +} + +func (p *PendingJob) pickUp() bool { + p.mu.Lock() + defer p.mu.Unlock() + + select { + case <-p.pickedUp: + return false + default: + close(p.pickedUp) + return true + } +} + +type jobQueue struct { + mu sync.Mutex + jobs []*PendingJob +} + +func (q *jobQueue) put(job *PendingJob) { + q.mu.Lock() + defer q.mu.Unlock() + + q.jobs = append(q.jobs, job) +} + +func (q *jobQueue) pop() *PendingJob { + q.mu.Lock() + defer q.mu.Unlock() + + if len(q.jobs) == 0 { + return nil + } + + job := q.jobs[0] + q.jobs = q.jobs[1:] + return job +} + +type Config struct { + CacheTimeout time.Duration + DepsTimeout time.Duration +} + +type Scheduler struct { + l *zap.Logger + config Config + + mu sync.Mutex + + cachedJobs map[build.ID]map[proto.WorkerID]struct{} + pendingJobs map[build.ID]*PendingJob + + cacheLocalQueue map[proto.WorkerID]*jobQueue + depLocalQueue map[proto.WorkerID]*jobQueue + globalQueue chan *PendingJob +} + +func NewScheduler(l *zap.Logger, config Config) *Scheduler { + return &Scheduler{ + l: l, + config: config, + + cachedJobs: make(map[build.ID]map[proto.WorkerID]struct{}), + pendingJobs: make(map[build.ID]*PendingJob), + + cacheLocalQueue: make(map[proto.WorkerID]*jobQueue), + depLocalQueue: make(map[proto.WorkerID]*jobQueue), + globalQueue: make(chan *PendingJob), + } +} + +func (c *Scheduler) RegisterWorker(workerID proto.WorkerID) { + c.mu.Lock() + defer c.mu.Unlock() + + _, ok := c.cacheLocalQueue[workerID] + if ok { + return + } + + c.cacheLocalQueue[workerID] = new(jobQueue) + c.depLocalQueue[workerID] = new(jobQueue) +} + +func (c *Scheduler) OnJobComplete(workerID proto.WorkerID, jobID build.ID, res *proto.JobResult) bool { + c.l.Debug("job completed", zap.String("worker_id", workerID.String()), zap.String("job_id", jobID.String())) + + c.mu.Lock() + pendingJob, pendingFound := c.pendingJobs[jobID] + if pendingFound { + delete(c.pendingJobs, jobID) + } + + job, ok := c.cachedJobs[jobID] + if !ok { + job = make(map[proto.WorkerID]struct{}) + c.cachedJobs[jobID] = job + } + job[workerID] = struct{}{} + + c.mu.Unlock() + + if !pendingFound { + return false + } + + c.l.Debug("finishing pending job", zap.String("job_id", jobID.String())) + pendingJob.finish(res) + return true +} + +func (c *Scheduler) findOptimalWorkers(jobID build.ID, deps []build.ID) (cacheLocal, depLocal []proto.WorkerID) { + depLocalSet := map[proto.WorkerID]struct{}{} + + c.mu.Lock() + defer c.mu.Unlock() + + for workerID := range c.cachedJobs[jobID] { + cacheLocal = append(cacheLocal, workerID) + } + + for _, dep := range deps { + for workerID := range c.cachedJobs[dep] { + if _, ok := depLocalSet[workerID]; !ok { + depLocal = append(depLocal, workerID) + depLocalSet[workerID] = struct{}{} + } + } + } + + return +} + +var timeAfter = time.After + +func (c *Scheduler) doScheduleJob(job *PendingJob) { + cacheLocal, depLocal := c.findOptimalWorkers(job.Job.ID, job.Job.Deps) + + if len(cacheLocal) != 0 { + c.mu.Lock() + for _, workerID := range cacheLocal { + c.cacheLocalQueue[workerID].put(job) + } + c.mu.Unlock() + + c.l.Debug("job is put into cache-local queues", zap.String("job_id", job.Job.ID.String())) + select { + case <-job.pickedUp: + c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) + return + case <-timeAfter(c.config.CacheTimeout): + } + } + + if len(depLocal) != 0 { + c.mu.Lock() + for _, workerID := range depLocal { + c.depLocalQueue[workerID].put(job) + } + c.mu.Unlock() + + c.l.Debug("job is put into dep-local queues", zap.String("job_id", job.Job.ID.String())) + select { + case <-job.pickedUp: + c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) + return + case <-timeAfter(c.config.DepsTimeout): + } + } + + c.l.Debug("job is put into global queue", zap.String("job_id", job.Job.ID.String())) + select { + case c.globalQueue <- job: + case <-job.pickedUp: + } + c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) +} + +func (c *Scheduler) ScheduleJob(job *build.Job) *PendingJob { + c.mu.Lock() + pendingJob, running := c.pendingJobs[job.ID] + if !running { + pendingJob = &PendingJob{ + Job: job, + Finished: make(chan struct{}), + + pickedUp: make(chan struct{}), + } + + c.pendingJobs[job.ID] = pendingJob + } + c.mu.Unlock() + + if !running { + c.l.Debug("job is scheduled", zap.String("job_id", job.ID.String())) + go c.doScheduleJob(pendingJob) + } else { + c.l.Debug("job is pending", zap.String("job_id", job.ID.String())) + } + + return pendingJob +} + +func (c *Scheduler) PickJob(workerID proto.WorkerID, canceled <-chan struct{}) *PendingJob { + c.l.Debug("picking next job", zap.String("worker_id", workerID.String())) + + var cacheLocal, depLocal *jobQueue + + c.mu.Lock() + cacheLocal = c.cacheLocalQueue[workerID] + depLocal = c.depLocalQueue[workerID] + c.mu.Unlock() + + for { + job := cacheLocal.pop() + if job == nil { + break + } + + if job.pickUp() { + c.l.Debug("picked job from cache-local queue", zap.String("worker_id", workerID.String()), zap.String("job_id", job.Job.ID.String())) + return job + } + } + + for { + job := depLocal.pop() + if job == nil { + break + } + + if job.pickUp() { + c.l.Debug("picked job from dep-local queue", zap.String("worker_id", workerID.String()), zap.String("job_id", job.Job.ID.String())) + return job + } + } + + for { + select { + case job := <-c.globalQueue: + if job.pickUp() { + c.l.Debug("picked job from global queue", zap.String("worker_id", workerID.String()), zap.String("job_id", job.Job.ID.String())) + return job + } + + case <-canceled: + return nil + } + } +} diff --git a/distbuild/pkg/scheduler/scheduler_test.go b/distbuild/pkg/scheduler/scheduler_test.go new file mode 100644 index 0000000..11bcc37 --- /dev/null +++ b/distbuild/pkg/scheduler/scheduler_test.go @@ -0,0 +1,113 @@ +package scheduler + +import ( + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "go.uber.org/zap/zaptest" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" + "gitlab.com/slon/shad-go/distbuild/pkg/proto" +) + +const ( + workerID0 proto.WorkerID = "w0" +) + +func TestScheduler(t *testing.T) { + defer goleak.VerifyNone(t) + + clock := clockwork.NewFakeClock() + timeAfter = clock.After + defer func() { timeAfter = time.After }() + + config := Config{ + CacheTimeout: time.Second, + DepsTimeout: time.Minute, + } + + t.Run("SingleJob", func(t *testing.T) { + s := NewScheduler(zaptest.NewLogger(t), config) + + job0 := &build.Job{ID: build.NewID()} + pendingJob0 := s.ScheduleJob(job0) + + s.RegisterWorker(workerID0) + pickerJob := s.PickJob(workerID0, nil) + + require.Equal(t, pendingJob0, pickerJob) + + result := &proto.JobResult{ID: job0.ID, ExitCode: 0} + s.OnJobComplete(workerID0, job0.ID, result) + + select { + case <-pendingJob0.Finished: + require.Equal(t, pendingJob0.Result, result) + + default: + t.Fatalf("job0 is not finished") + } + }) + + t.Run("PickJobTimeout", func(t *testing.T) { + s := NewScheduler(zaptest.NewLogger(t), config) + + canceled := make(chan struct{}) + close(canceled) + + s.RegisterWorker(workerID0) + require.Nil(t, s.PickJob(workerID0, canceled)) + }) + + t.Run("CacheLocalScheduling", func(t *testing.T) { + s := NewScheduler(zaptest.NewLogger(t), config) + + job0 := &build.Job{ID: build.NewID()} + job1 := &build.Job{ID: build.NewID()} + + s.RegisterWorker(workerID0) + s.OnJobComplete(workerID0, job0.ID, &proto.JobResult{}) + + pendingJob1 := s.ScheduleJob(job1) + pendingJob0 := s.ScheduleJob(job0) + + // job0 scheduling should be blocked on CacheTimeout + clock.BlockUntil(1) + + pickedJob := s.PickJob(workerID0, nil) + require.Equal(t, pendingJob0, pickedJob) + + pickedJob = s.PickJob(workerID0, nil) + require.Equal(t, pendingJob1, pickedJob) + + clock.Advance(time.Hour) + }) + + t.Run("DependencyLocalScheduling", func(t *testing.T) { + s := NewScheduler(zaptest.NewLogger(t), config) + + job0 := &build.Job{ID: build.NewID()} + job1 := &build.Job{ID: build.NewID(), Deps: []build.ID{job0.ID}} + job2 := &build.Job{ID: build.NewID()} + + s.RegisterWorker(workerID0) + s.OnJobComplete(workerID0, job0.ID, &proto.JobResult{}) + + pendingJob2 := s.ScheduleJob(job2) + pendingJob1 := s.ScheduleJob(job1) + + // job1 should be blocked on DepsTimeout + clock.BlockUntil(1) + + pickedJob := s.PickJob(workerID0, nil) + require.Equal(t, pendingJob1, pickedJob) + + pickedJob = s.PickJob(workerID0, nil) + require.Equal(t, pendingJob2, pickedJob) + + clock.Advance(time.Hour) + }) +} diff --git a/distbuild/pkg/worker/job.go b/distbuild/pkg/worker/job.go index d5f1de1..f12b126 100644 --- a/distbuild/pkg/worker/job.go +++ b/distbuild/pkg/worker/job.go @@ -32,7 +32,9 @@ func (w *Worker) getJobFromCache(jobID build.ID) (*proto.JobResult, error) { } defer unlock() - res := &proto.JobResult{} + res := &proto.JobResult{ + ID: jobID, + } exitCodeStr, err := ioutil.ReadFile(filepath.Join(aRoot, exitCodeFileName)) if err != nil { diff --git a/distbuild/pkg/worker/state.go b/distbuild/pkg/worker/state.go index 528c4ec..5722c7d 100644 --- a/distbuild/pkg/worker/state.go +++ b/distbuild/pkg/worker/state.go @@ -1,8 +1,6 @@ package worker import ( - "go.uber.org/zap" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" ) @@ -11,6 +9,7 @@ func (w *Worker) buildHeartbeat() *proto.HeartbeatRequest { defer w.mu.Unlock() req := &proto.HeartbeatRequest{ + WorkerID: w.id, FinishedJob: w.finishedJobs, } @@ -19,8 +18,6 @@ func (w *Worker) buildHeartbeat() *proto.HeartbeatRequest { } func (w *Worker) jobFinished(job *proto.JobResult) { - w.log.Debug("job finished", zap.String("job_id", job.ID.String())) - w.mu.Lock() defer w.mu.Unlock() diff --git a/distbuild/pkg/worker/worker.go b/distbuild/pkg/worker/worker.go index e55795c..635162e 100644 --- a/distbuild/pkg/worker/worker.go +++ b/distbuild/pkg/worker/worker.go @@ -18,6 +18,7 @@ import ( ) type Worker struct { + id proto.WorkerID coordinatorEndpoint string log *zap.Logger @@ -34,12 +35,14 @@ type Worker struct { } func New( + workerID proto.WorkerID, coordinatorEndpoint string, log *zap.Logger, fileCache *filecache.Cache, artifacts *artifact.Cache, ) *Worker { return &Worker{ + id: workerID, coordinatorEndpoint: coordinatorEndpoint, log: log, fileCache: fileCache, @@ -54,27 +57,19 @@ func (w *Worker) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } func (w *Worker) recover() error { - //err := w.fileCache.Range(func(file build.ID) error { - // w.newSources = append(w.newSources, file) - // return nil - //}) - //if err != nil { - // return err - //} - return w.artifacts.Range(func(file build.ID) error { w.newArtifacts = append(w.newArtifacts, file) return nil }) } -func (w *Worker) sendHeartbeat(req *proto.HeartbeatRequest) (*proto.HeartbeatResponse, error) { +func (w *Worker) sendHeartbeat(ctx context.Context, req *proto.HeartbeatRequest) (*proto.HeartbeatResponse, error) { reqJS, err := json.Marshal(req) if err != nil { return nil, err } - httpReq, err := http.NewRequest("POST", w.coordinatorEndpoint+"/heartbeat", bytes.NewBuffer(reqJS)) + httpReq, err := http.NewRequestWithContext(ctx, "POST", w.coordinatorEndpoint+"/heartbeat", bytes.NewBuffer(reqJS)) if err != nil { return nil, err } @@ -104,7 +99,7 @@ func (w *Worker) Run(ctx context.Context) error { for { w.log.Debug("sending heartbeat request") - rsp, err := w.sendHeartbeat(w.buildHeartbeat()) + rsp, err := w.sendHeartbeat(ctx, w.buildHeartbeat()) if err != nil { if ctx.Err() != nil { return ctx.Err() @@ -118,13 +113,18 @@ func (w *Worker) Run(ctx context.Context) error { for _, spec := range rsp.JobsToRun { spec := spec + + w.log.Debug("running job", zap.String("job_id", spec.Job.ID.String())) result, err := w.runJob(ctx, &spec) if err != nil { errStr := fmt.Sprintf("job %s failed: %v", spec.Job.ID, err) + + w.log.Debug("job failed", zap.String("job_id", spec.Job.ID.String()), zap.Error(err)) w.jobFinished(&proto.JobResult{ID: spec.Job.ID, Error: &errStr}) continue } + w.log.Debug("job finished", zap.String("job_id", spec.Job.ID.String())) w.jobFinished(result) } } diff --git a/go.mod b/go.mod index d28eb04..6a78e46 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang/mock v1.4.1 github.com/gorilla/handlers v1.4.2 github.com/gorilla/mux v1.7.4 + github.com/jonboulle/clockwork v0.1.0 github.com/spf13/cobra v0.0.5 github.com/stretchr/testify v1.4.0 go.uber.org/goleak v1.0.0 diff --git a/go.sum b/go.sum index 21e1d52..20e5cb7 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= +github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=