shad-go/smartsched/scheduler_test.go
2023-10-03 20:25:41 +03:00

147 lines
3.6 KiB
Go

package scheduler_test
import (
"context"
"testing"
"time"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap/zaptest"
"gitlab.com/manytask/itmo-go/public/distbuild/pkg/api"
"gitlab.com/manytask/itmo-go/public/distbuild/pkg/build"
"gitlab.com/manytask/itmo-go/public/distbuild/pkg/scheduler"
)
const (
workerID0 api.WorkerID = "w0"
)
var config = scheduler.Config{
CacheTimeout: time.Second,
DepsTimeout: time.Minute,
}
type testScheduler struct {
*scheduler.Scheduler
clockwork.FakeClock
reset chan struct{}
}
func newTestScheduler(t *testing.T) *testScheduler {
log := zaptest.NewLogger(t)
s := &testScheduler{
FakeClock: clockwork.NewFakeClock(),
Scheduler: scheduler.NewScheduler(log, config),
reset: make(chan struct{}),
}
go func() {
select {
case <-time.After(time.Second * 5):
panic("test hang")
case <-s.reset:
return
}
}()
scheduler.TimeAfter = s.FakeClock.After
return s
}
func (s *testScheduler) stop(t *testing.T) {
close(s.reset)
scheduler.TimeAfter = time.After
goleak.VerifyNone(t)
}
func TestScheduler_SingleJob(t *testing.T) {
s := newTestScheduler(t)
defer s.stop(t)
job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}}
pendingJob0 := s.ScheduleJob(job0)
s.BlockUntil(1)
s.Advance(config.DepsTimeout) // At this point job must be in global queue.
s.RegisterWorker(workerID0)
pickerJob := s.PickJob(context.Background(), workerID0)
require.Equal(t, pendingJob0, pickerJob)
result := &api.JobResult{ID: job0.ID, ExitCode: 0}
s.OnJobComplete(workerID0, job0.ID, result)
select {
case <-pendingJob0.Finished:
require.Equal(t, pendingJob0.Result, result)
default:
t.Fatalf("job0 is not finished")
}
}
func TestScheduler_PickJobCancelation(t *testing.T) {
s := newTestScheduler(t)
defer s.stop(t)
ctx, cancel := context.WithCancel(context.Background())
cancel()
s.RegisterWorker(workerID0)
require.Nil(t, s.PickJob(ctx, workerID0))
}
func TestScheduler_CacheLocalScheduling(t *testing.T) {
s := newTestScheduler(t)
defer s.stop(t)
cachedJob := &api.JobSpec{Job: build.Job{ID: build.NewID()}}
uncachedJob := &api.JobSpec{Job: build.Job{ID: build.NewID()}}
s.RegisterWorker(workerID0)
s.OnJobComplete(workerID0, cachedJob.ID, &api.JobResult{})
pendingUncachedJob := s.ScheduleJob(uncachedJob)
pendingCachedJob := s.ScheduleJob(cachedJob)
s.BlockUntil(2) // both jobs should be blocked
firstPickedJob := s.PickJob(context.Background(), workerID0)
assert.Equal(t, pendingCachedJob, firstPickedJob)
s.Advance(config.DepsTimeout) // At this point uncachedJob is put into global queue.
secondPickedJob := s.PickJob(context.Background(), workerID0)
assert.Equal(t, pendingUncachedJob, secondPickedJob)
}
func TestScheduler_DependencyLocalScheduling(t *testing.T) {
s := newTestScheduler(t)
defer s.stop(t)
job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}}
s.RegisterWorker(workerID0)
s.OnJobComplete(workerID0, job0.ID, &api.JobResult{})
job1 := &api.JobSpec{Job: build.Job{ID: build.NewID(), Deps: []build.ID{job0.ID}}}
job2 := &api.JobSpec{Job: build.Job{ID: build.NewID()}}
pendingJob2 := s.ScheduleJob(job2)
pendingJob1 := s.ScheduleJob(job1)
s.BlockUntil(2) // both jobs should be blocked on DepsTimeout
firstPickedJob := s.PickJob(context.Background(), workerID0)
require.Equal(t, pendingJob1, firstPickedJob)
s.Advance(config.DepsTimeout) // At this point job2 is put into global queue.
secondPickedJob := s.PickJob(context.Background(), workerID0)
require.Equal(t, pendingJob2, secondPickedJob)
}