diff --git a/distbuild/pkg/api/heartbeat.go b/distbuild/pkg/api/heartbeat.go index 09b4da2..228573e 100644 --- a/distbuild/pkg/api/heartbeat.go +++ b/distbuild/pkg/api/heartbeat.go @@ -38,10 +38,6 @@ type HeartbeatRequest struct { // в данный момент. RunningJobs []build.ID - DownloadingSources []build.ID - - DownloadingArtifacts []build.ID - // FreeSlots сообщаяет, сколько еще процессов можно запустить на этом воркере. FreeSlots int @@ -51,9 +47,6 @@ type HeartbeatRequest struct { // AddedArtifacts говорит, какие артефакты появились в кеше на этой итерации цикла. AddedArtifacts []build.ID - - // AddedSourceFiles говорит, какие файлы появились в кеше на этой итерации цикла. - AddedSourceFiles []build.ID } // JobSpec описывает джоб, который нужно запустить. diff --git a/distbuild/pkg/dist/coordinator.go b/distbuild/pkg/dist/coordinator.go index 352d453..46e69c0 100644 --- a/distbuild/pkg/dist/coordinator.go +++ b/distbuild/pkg/dist/coordinator.go @@ -111,7 +111,7 @@ func (c *Coordinator) Heartbeat(ctx context.Context, req *api.HeartbeatRequest) JobsToRun: map[build.ID]api.JobSpec{}, } - job := c.scheduler.PickJob(req.WorkerID, ctx.Done()) + job := c.scheduler.PickJob(ctx, req.WorkerID) if job != nil { rsp.JobsToRun[job.Job.ID] = *job.Job } diff --git a/distbuild/pkg/scheduler/README.md b/distbuild/pkg/scheduler/README.md new file mode 100644 index 0000000..e989df6 --- /dev/null +++ b/distbuild/pkg/scheduler/README.md @@ -0,0 +1,41 @@ +# scheduler + +Пакет `scheduler` реализует планировщик системы. `scheduler.Scheduler` хранит полное состояние кластера +и принимает решение на каком воркере и какой джоб нужно запустить. + +Шедулер является точкой координации между воркерами и билдами. Бегущие билды обращаются к шедулеру, +передавая джобы в функцию `ScheduleJob`. Воркеры забирают джобы из шедулера вызывая функцию `PickJob`. + +Вы можете отложить реализацию полной версии шедулера на последний шаг, и реализовать упрощённую версию +на одном глобальном канале. Такой реализации будет достаточно, чтобы работали все интеграционные тесты с одним +воркером. + +## Алгоритм планирования + +Планировщик поддерживает множество очередей: + 1. Одна глобальная очередь + 2. По две локальные очереди на воркер. + +При запросе нового джоба воркер выбирает случайную джобу из трех очередей - глобальной, и двух локальных относящихся +к этому воркеру. Случайная очередь выбирается одним вызовом `select {}`. + +Ожидающий исполнения джоб всегда находится в первой локальной очереди воркеров, на которых есть +результаты работы этого джоба. + +Если джоб ждёт выполнения дольше `CacheTimeout` или если в момент `SchedulerJob` джоба небыло в кеше ни на одном +из воркеров, то он находится во всех вторых локальных очередях воркеров, на которых есть хотябы один артефакт +из множества зависимостей этого джоба. + +Определения первой и второй локальной очереди не зависят от того, в каком порядке в шедулер пришли джобы +и информация о кеше артефактов. Тоесть, если джоб уже находится в глобальной очереди, и в этот момент приходит +информация, что этот джоб находится в кеше на `W0`, то джоб должен быть добавлен +в первую локальную очередь `W0`. + +Если джоб ждёт выполнения дольше `DepTimeout`, то он помещается в глобальную очередь. + +## Тестирование + +Вместо реального времени, юниттесты шедулера используют библиотеку `clockwork`. Это накладывает ограничения +на детали вашей реализации. Ожидание `CacheTimeout` и `DepTimeout` должно быть реализовано как `select {}` на +канале, который вернула функция `timeAfter`. Мы считаем что `CacheTimeout > DepTimeout`, и ожидание этих +таймаутов происходит последовательно в одной горутине. \ No newline at end of file diff --git a/distbuild/pkg/scheduler/export_test.go b/distbuild/pkg/scheduler/export_test.go new file mode 100644 index 0000000..9675c6f --- /dev/null +++ b/distbuild/pkg/scheduler/export_test.go @@ -0,0 +1,3 @@ +package scheduler + +var TimeAfter = &timeAfter diff --git a/distbuild/pkg/scheduler/scheduler.go b/distbuild/pkg/scheduler/scheduler.go index a5c65db..87921b9 100644 --- a/distbuild/pkg/scheduler/scheduler.go +++ b/distbuild/pkg/scheduler/scheduler.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "sync" "time" @@ -12,8 +13,8 @@ import ( type PendingJob struct { Job *api.JobSpec - Result *api.JobResult Finished chan struct{} + Result *api.JobResult mu sync.Mutex pickedUp chan struct{} @@ -37,29 +38,16 @@ func (p *PendingJob) pickUp() bool { } } -type jobQueue struct { - mu sync.Mutex - jobs []*PendingJob -} - -func (q *jobQueue) put(job *PendingJob) { - q.mu.Lock() - defer q.mu.Unlock() - - q.jobs = append(q.jobs, job) -} - -func (q *jobQueue) pop() *PendingJob { - q.mu.Lock() - defer q.mu.Unlock() - - if len(q.jobs) == 0 { - return nil +func (p *PendingJob) enqueue(q chan *PendingJob) { + select { + case q <- p: + case <-p.pickedUp: } +} - job := q.jobs[0] - q.jobs = q.jobs[1:] - return job +type workerQueue struct { + cacheQueue chan *PendingJob + depQueue chan *PendingJob } type Config struct { @@ -73,12 +61,13 @@ type Scheduler struct { mu sync.Mutex - cachedJobs map[build.ID]map[api.WorkerID]struct{} - pendingJobs map[build.ID]*PendingJob + cachedJobs map[build.ID]map[api.WorkerID]struct{} - cacheLocalQueue map[api.WorkerID]*jobQueue - depLocalQueue map[api.WorkerID]*jobQueue - globalQueue chan *PendingJob + pendingJobs map[build.ID]*PendingJob + pendingJobDeps map[build.ID]map[*PendingJob]struct{} + + workerQueue map[api.WorkerID]*workerQueue + globalQueue chan *PendingJob } func NewScheduler(l *zap.Logger, config Config) *Scheduler { @@ -86,12 +75,12 @@ func NewScheduler(l *zap.Logger, config Config) *Scheduler { l: l, config: config, - cachedJobs: make(map[build.ID]map[api.WorkerID]struct{}), - pendingJobs: make(map[build.ID]*PendingJob), + cachedJobs: make(map[build.ID]map[api.WorkerID]struct{}), + pendingJobs: make(map[build.ID]*PendingJob), + pendingJobDeps: make(map[build.ID]map[*PendingJob]struct{}), - cacheLocalQueue: make(map[api.WorkerID]*jobQueue), - depLocalQueue: make(map[api.WorkerID]*jobQueue), - globalQueue: make(chan *PendingJob), + workerQueue: make(map[api.WorkerID]*workerQueue), + globalQueue: make(chan *PendingJob), } } @@ -99,13 +88,15 @@ func (c *Scheduler) RegisterWorker(workerID api.WorkerID) { c.mu.Lock() defer c.mu.Unlock() - _, ok := c.cacheLocalQueue[workerID] + _, ok := c.workerQueue[workerID] if ok { return } - c.cacheLocalQueue[workerID] = new(jobQueue) - c.depLocalQueue[workerID] = new(jobQueue) + c.workerQueue[workerID] = &workerQueue{ + cacheQueue: make(chan *PendingJob), + depQueue: make(chan *PendingJob), + } } func (c *Scheduler) OnJobComplete(workerID api.WorkerID, jobID build.ID, res *api.JobResult) bool { @@ -124,6 +115,11 @@ func (c *Scheduler) OnJobComplete(workerID api.WorkerID, jobID build.ID, res *ap } job[workerID] = struct{}{} + workerQueue := c.workerQueue[workerID] + for waiter := range c.pendingJobDeps[jobID] { + go waiter.enqueue(workerQueue.depQueue) + } + c.mu.Unlock() if !pendingFound { @@ -135,41 +131,38 @@ func (c *Scheduler) OnJobComplete(workerID api.WorkerID, jobID build.ID, res *ap return true } -func (c *Scheduler) findOptimalWorkers(jobID build.ID, deps []build.ID) (cacheLocal, depLocal []api.WorkerID) { - depLocalSet := map[api.WorkerID]struct{}{} +func (c *Scheduler) enqueueCacheLocal(job *PendingJob) bool { + cached := false - c.mu.Lock() - defer c.mu.Unlock() - - for workerID := range c.cachedJobs[jobID] { - cacheLocal = append(cacheLocal, workerID) + for workerID := range c.cachedJobs[job.Job.ID] { + cached = true + go job.enqueue(c.workerQueue[workerID].cacheQueue) } - for _, dep := range deps { - for workerID := range c.cachedJobs[dep] { - if _, ok := depLocalSet[workerID]; !ok { - depLocal = append(depLocal, workerID) - depLocalSet[workerID] = struct{}{} - } - } - } - - return + return cached } var timeAfter = time.After -func (c *Scheduler) doScheduleJob(job *PendingJob) { - cacheLocal, depLocal := c.findOptimalWorkers(job.Job.ID, job.Job.Deps) +func (c *Scheduler) putDepQueue(job *PendingJob, dep build.ID) { + depJobs, ok := c.pendingJobDeps[dep] + if !ok { + depJobs = make(map[*PendingJob]struct{}) + c.pendingJobDeps[dep] = depJobs + } + depJobs[job] = struct{}{} +} - if len(cacheLocal) != 0 { - c.mu.Lock() - for _, workerID := range cacheLocal { - c.cacheLocalQueue[workerID].put(job) - } - c.mu.Unlock() +func (c *Scheduler) deleteDepQueue(job *PendingJob, dep build.ID) { + depJobs := c.pendingJobDeps[dep] + delete(depJobs, job) + if len(depJobs) == 0 { + delete(c.pendingJobDeps, dep) + } +} - c.l.Debug("job is put into cache-local queues", zap.String("job_id", job.Job.ID.String())) +func (c *Scheduler) doScheduleJob(job *PendingJob, cached bool) { + if cached { select { case <-job.pickedUp: c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) @@ -178,31 +171,51 @@ func (c *Scheduler) doScheduleJob(job *PendingJob) { } } - if len(depLocal) != 0 { + c.mu.Lock() + workers := make(map[api.WorkerID]struct{}) + + for _, dep := range job.Job.Deps { + c.putDepQueue(job, dep) + + for workerID := range c.cachedJobs[dep] { + if _, ok := workers[workerID]; ok { + return + } + + go job.enqueue(c.workerQueue[workerID].depQueue) + workers[workerID] = struct{}{} + } + } + c.mu.Unlock() + + defer func() { c.mu.Lock() - for _, workerID := range depLocal { - c.depLocalQueue[workerID].put(job) - } - c.mu.Unlock() + defer c.mu.Unlock() - c.l.Debug("job is put into dep-local queues", zap.String("job_id", job.Job.ID.String())) - select { - case <-job.pickedUp: - c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) - return - case <-timeAfter(c.config.DepsTimeout): + for _, dep := range job.Job.Deps { + c.deleteDepQueue(job, dep) } - } + }() + + c.l.Debug("job is put into dep-local queues", zap.String("job_id", job.Job.ID.String())) - c.l.Debug("job is put into global queue", zap.String("job_id", job.Job.ID.String())) select { - case c.globalQueue <- job: case <-job.pickedUp: + c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) + return + case <-timeAfter(c.config.DepsTimeout): } + + go job.enqueue(c.globalQueue) + c.l.Debug("job is put into global queue", zap.String("job_id", job.Job.ID.String())) + + <-job.pickedUp c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) } func (c *Scheduler) ScheduleJob(job *api.JobSpec) *PendingJob { + var cached bool + c.mu.Lock() pendingJob, running := c.pendingJobs[job.ID] if !running { @@ -214,12 +227,13 @@ func (c *Scheduler) ScheduleJob(job *api.JobSpec) *PendingJob { } c.pendingJobs[job.ID] = pendingJob + cached = c.enqueueCacheLocal(pendingJob) } c.mu.Unlock() if !running { c.l.Debug("job is scheduled", zap.String("job_id", job.ID.String())) - go c.doScheduleJob(pendingJob) + go c.doScheduleJob(pendingJob, cached) } else { c.l.Debug("job is pending", zap.String("job_id", job.ID.String())) } @@ -227,50 +241,37 @@ func (c *Scheduler) ScheduleJob(job *api.JobSpec) *PendingJob { return pendingJob } -func (c *Scheduler) PickJob(workerID api.WorkerID, canceled <-chan struct{}) *PendingJob { +func (c *Scheduler) PickJob(ctx context.Context, workerID api.WorkerID) *PendingJob { c.l.Debug("picking next job", zap.String("worker_id", workerID.String())) - var cacheLocal, depLocal *jobQueue - c.mu.Lock() - cacheLocal = c.cacheLocalQueue[workerID] - depLocal = c.depLocalQueue[workerID] + local := c.workerQueue[workerID] c.mu.Unlock() - for { - job := cacheLocal.pop() - if job == nil { - break - } - - if job.pickUp() { - c.l.Debug("picked job from cache-local queue", zap.String("worker_id", workerID.String()), zap.String("job_id", job.Job.ID.String())) - return job - } - } - - for { - job := depLocal.pop() - if job == nil { - break - } - - if job.pickUp() { - c.l.Debug("picked job from dep-local queue", zap.String("worker_id", workerID.String()), zap.String("job_id", job.Job.ID.String())) - return job - } - } + var pg *PendingJob + var queue string for { select { - case job := <-c.globalQueue: - if job.pickUp() { - c.l.Debug("picked job from global queue", zap.String("worker_id", workerID.String()), zap.String("job_id", job.Job.ID.String())) - return job - } - - case <-canceled: + case pg = <-c.globalQueue: + queue = "global" + case pg = <-local.depQueue: + queue = "dep" + case pg = <-local.cacheQueue: + queue = "cache" + case <-ctx.Done(): return nil } + + if pg.pickUp() { + break + } } + + c.l.Debug("picked job", + zap.String("worker_id", workerID.String()), + zap.String("job_id", pg.Job.ID.String()), + zap.String("queue", queue)) + + return pg } diff --git a/distbuild/pkg/scheduler/scheduler_test.go b/distbuild/pkg/scheduler/scheduler_test.go index 205dfb0..9ccca3b 100644 --- a/distbuild/pkg/scheduler/scheduler_test.go +++ b/distbuild/pkg/scheduler/scheduler_test.go @@ -1,113 +1,137 @@ -package scheduler +package scheduler_test import ( + "context" "testing" "time" "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" "go.uber.org/zap/zaptest" "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/build" + "gitlab.com/slon/shad-go/distbuild/pkg/scheduler" ) const ( workerID0 api.WorkerID = "w0" ) -func TestScheduler(t *testing.T) { - defer goleak.VerifyNone(t) - - clock := clockwork.NewFakeClock() - timeAfter = clock.After - defer func() { timeAfter = time.After }() - - config := Config{ +var ( + config = scheduler.Config{ CacheTimeout: time.Second, DepsTimeout: time.Minute, } +) - t.Run("SingleJob", func(t *testing.T) { - s := NewScheduler(zaptest.NewLogger(t), config) - - job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} - pendingJob0 := s.ScheduleJob(job0) - - s.RegisterWorker(workerID0) - pickerJob := s.PickJob(workerID0, nil) - - require.Equal(t, pendingJob0, pickerJob) - - result := &api.JobResult{ID: job0.ID, ExitCode: 0} - s.OnJobComplete(workerID0, job0.ID, result) - - select { - case <-pendingJob0.Finished: - require.Equal(t, pendingJob0.Result, result) - - default: - t.Fatalf("job0 is not finished") - } - }) - - t.Run("PickJobTimeout", func(t *testing.T) { - s := NewScheduler(zaptest.NewLogger(t), config) - - canceled := make(chan struct{}) - close(canceled) - - s.RegisterWorker(workerID0) - require.Nil(t, s.PickJob(workerID0, canceled)) - }) - - t.Run("CacheLocalScheduling", func(t *testing.T) { - s := NewScheduler(zaptest.NewLogger(t), config) - - job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} - job1 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} - - s.RegisterWorker(workerID0) - s.OnJobComplete(workerID0, job0.ID, &api.JobResult{}) - - pendingJob1 := s.ScheduleJob(job1) - pendingJob0 := s.ScheduleJob(job0) - - // job0 scheduling should be blocked on CacheTimeout - clock.BlockUntil(1) - - pickedJob := s.PickJob(workerID0, nil) - require.Equal(t, pendingJob0, pickedJob) - - pickedJob = s.PickJob(workerID0, nil) - require.Equal(t, pendingJob1, pickedJob) - - clock.Advance(time.Hour) - }) - - t.Run("DependencyLocalScheduling", func(t *testing.T) { - s := NewScheduler(zaptest.NewLogger(t), config) - - job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} - job1 := &api.JobSpec{Job: build.Job{ID: build.NewID(), Deps: []build.ID{job0.ID}}} - job2 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} - - s.RegisterWorker(workerID0) - s.OnJobComplete(workerID0, job0.ID, &api.JobResult{}) - - pendingJob2 := s.ScheduleJob(job2) - pendingJob1 := s.ScheduleJob(job1) - - // job1 should be blocked on DepsTimeout - clock.BlockUntil(1) - - pickedJob := s.PickJob(workerID0, nil) - require.Equal(t, pendingJob1, pickedJob) - - pickedJob = s.PickJob(workerID0, nil) - require.Equal(t, pendingJob2, pickedJob) - - clock.Advance(time.Hour) - }) +type testScheduler struct { + *scheduler.Scheduler + clockwork.FakeClock +} + +func newTestScheduler(t *testing.T) *testScheduler { + log := zaptest.NewLogger(t) + + s := &testScheduler{ + FakeClock: clockwork.NewFakeClock(), + Scheduler: scheduler.NewScheduler(log, config), + } + + *scheduler.TimeAfter = s.FakeClock.After + return s +} + +func (s *testScheduler) stop(t *testing.T) { + *scheduler.TimeAfter = time.After + goleak.VerifyNone(t) +} + +func TestScheduler_SingleJob(t *testing.T) { + s := newTestScheduler(t) + defer s.stop(t) + + job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} + pendingJob0 := s.ScheduleJob(job0) + + s.BlockUntil(1) + s.Advance(config.DepsTimeout) // At this point job must be in global queue. + + s.RegisterWorker(workerID0) + pickerJob := s.PickJob(context.Background(), workerID0) + + require.Equal(t, pendingJob0, pickerJob) + + result := &api.JobResult{ID: job0.ID, ExitCode: 0} + s.OnJobComplete(workerID0, job0.ID, result) + + select { + case <-pendingJob0.Finished: + require.Equal(t, pendingJob0.Result, result) + + default: + t.Fatalf("job0 is not finished") + } +} + +func TestScheduler_PickJobCancelation(t *testing.T) { + s := newTestScheduler(t) + defer s.stop(t) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + s.RegisterWorker(workerID0) + require.Nil(t, s.PickJob(ctx, workerID0)) +} + +func TestScheduler_CacheLocalScheduling(t *testing.T) { + s := newTestScheduler(t) + defer s.stop(t) + + cachedJob := &api.JobSpec{Job: build.Job{ID: build.NewID()}} + uncachedJob := &api.JobSpec{Job: build.Job{ID: build.NewID()}} + + s.RegisterWorker(workerID0) + s.OnJobComplete(workerID0, cachedJob.ID, &api.JobResult{}) + + pendingUncachedJob := s.ScheduleJob(uncachedJob) + pendingCachedJob := s.ScheduleJob(cachedJob) + + s.BlockUntil(2) // both jobs should be blocked + + firstPickedJob := s.PickJob(context.Background(), workerID0) + assert.Equal(t, pendingCachedJob, firstPickedJob) + + s.Advance(config.DepsTimeout) // At this point uncachedJob is put into global queue. + + secondPickedJob := s.PickJob(context.Background(), workerID0) + assert.Equal(t, pendingUncachedJob, secondPickedJob) +} + +func TestScheduler_DependencyLocalScheduling(t *testing.T) { + s := newTestScheduler(t) + defer s.stop(t) + + job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} + s.RegisterWorker(workerID0) + s.OnJobComplete(workerID0, job0.ID, &api.JobResult{}) + + job1 := &api.JobSpec{Job: build.Job{ID: build.NewID(), Deps: []build.ID{job0.ID}}} + job2 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} + + pendingJob2 := s.ScheduleJob(job2) + pendingJob1 := s.ScheduleJob(job1) + + s.BlockUntil(2) // both jobs should be blocked on DepsTimeout + + firstPickedJob := s.PickJob(context.Background(), workerID0) + require.Equal(t, pendingJob1, firstPickedJob) + + s.Advance(config.DepsTimeout) // At this point job2 is put into global queue. + + secondPickedJob := s.PickJob(context.Background(), workerID0) + require.Equal(t, pendingJob2, secondPickedJob) } diff --git a/distbuild/pkg/worker/download.go b/distbuild/pkg/worker/download.go index aaa73b2..bd27b99 100644 --- a/distbuild/pkg/worker/download.go +++ b/distbuild/pkg/worker/download.go @@ -4,11 +4,13 @@ import ( "context" "errors" + "gitlab.com/slon/shad-go/distbuild/pkg/api" + "gitlab.com/slon/shad-go/distbuild/pkg/artifact" "gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/filecache" ) -func (w *Worker) pullFiles(ctx context.Context, files map[build.ID]string) error { +func (w *Worker) downloadFiles(ctx context.Context, files map[build.ID]string) error { for id := range files { _, unlock, err := w.fileCache.Get(id) if errors.Is(err, filecache.ErrNotFound) { @@ -24,3 +26,20 @@ func (w *Worker) pullFiles(ctx context.Context, files map[build.ID]string) error return nil } + +func (w *Worker) downloadArtifacts(ctx context.Context, artifacts map[build.ID]api.WorkerID) error { + for id, worker := range artifacts { + _, unlock, err := w.artifacts.Get(id) + if errors.Is(err, artifact.ErrNotFound) { + if err = artifact.Download(ctx, worker.String(), w.artifacts, id); err != nil { + return err + } + } else if err != nil { + return err + } else { + unlock() + } + } + + return nil +} diff --git a/distbuild/pkg/worker/job.go b/distbuild/pkg/worker/job.go index 8c2334c..c8f2b48 100644 --- a/distbuild/pkg/worker/job.go +++ b/distbuild/pkg/worker/job.go @@ -165,7 +165,11 @@ func (w *Worker) runJob(ctx context.Context, spec *api.JobSpec) (*api.JobResult, return res, nil } - if err = w.pullFiles(ctx, spec.SourceFiles); err != nil { + if err = w.downloadFiles(ctx, spec.SourceFiles); err != nil { + return nil, err + } + + if err := w.downloadArtifacts(ctx, spec.Artifacts); err != nil { return nil, err }