From a28ac30f2ed8e8cc44b8dd8981fba9d4e985b84d Mon Sep 17 00:00:00 2001 From: Fedor Korotkiy Date: Sun, 5 Apr 2020 15:00:33 +0300 Subject: [PATCH] Add artifact transfer test --- distbuild/disttest/fixture.go | 9 ++-- distbuild/disttest/single_worker_test.go | 10 ++-- distbuild/disttest/three_workers_test.go | 64 ++++++++++++++++++++++++ distbuild/pkg/dist/build.go | 16 +++++- distbuild/pkg/scheduler/scheduler.go | 11 ++++ distbuild/pkg/worker/worker.go | 5 +- 6 files changed, 106 insertions(+), 9 deletions(-) create mode 100644 distbuild/disttest/three_workers_test.go diff --git a/distbuild/disttest/fixture.go b/distbuild/disttest/fixture.go index 697109f..7c4ae8f 100644 --- a/distbuild/disttest/fixture.go +++ b/distbuild/disttest/fixture.go @@ -37,10 +37,13 @@ type env struct { const ( logToStderr = true - nWorkers = 1 ) -func newEnv(t *testing.T) (e *env, cancel func()) { +type Config struct { + WorkerCount int +} + +func newEnv(t *testing.T, config *Config) (e *env, cancel func()) { cwd, err := os.Getwd() require.NoError(t, err) @@ -91,7 +94,7 @@ func newEnv(t *testing.T) (e *env, cancel func()) { router := http.NewServeMux() router.Handle("/coordinator/", http.StripPrefix("/coordinator", env.Coordinator)) - for i := 0; i < nWorkers; i++ { + for i := 0; i < config.WorkerCount; i++ { workerName := fmt.Sprintf("worker%d", i) workerDir := filepath.Join(env.RootDir, workerName) diff --git a/distbuild/disttest/single_worker_test.go b/distbuild/disttest/single_worker_test.go index 1aad054..0906060 100644 --- a/distbuild/disttest/single_worker_test.go +++ b/distbuild/disttest/single_worker_test.go @@ -11,6 +11,8 @@ import ( "gitlab.com/slon/shad-go/distbuild/pkg/build" ) +var singleWorkerConfig = &Config{WorkerCount: 1} + var echoGraph = build.Graph{ Jobs: []build.Job{ { @@ -24,7 +26,7 @@ var echoGraph = build.Graph{ } func TestSingleCommand(t *testing.T) { - env, cancel := newEnv(t) + env, cancel := newEnv(t, singleWorkerConfig) defer cancel() recorder := NewRecorder() @@ -35,7 +37,7 @@ func TestSingleCommand(t *testing.T) { } func TestJobCaching(t *testing.T) { - env, cancel := newEnv(t) + env, cancel := newEnv(t, singleWorkerConfig) defer cancel() tmpFile, err := ioutil.TempFile("", "") @@ -92,7 +94,7 @@ var sourceFilesGraph = build.Graph{ } func TestSourceFiles(t *testing.T) { - env, cancel := newEnv(t) + env, cancel := newEnv(t, singleWorkerConfig) defer cancel() recorder := NewRecorder() @@ -123,7 +125,7 @@ var artifactTransferGraph = build.Graph{ } func TestArtifactTransferBetweenJobs(t *testing.T) { - env, cancel := newEnv(t) + env, cancel := newEnv(t, singleWorkerConfig) defer cancel() recorder := NewRecorder() diff --git a/distbuild/disttest/three_workers_test.go b/distbuild/disttest/three_workers_test.go new file mode 100644 index 0000000..42dddf0 --- /dev/null +++ b/distbuild/disttest/three_workers_test.go @@ -0,0 +1,64 @@ +package disttest + +import ( + "fmt" + "os" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" +) + +var threeWorkerConfig = &Config{WorkerCount: 3} + +func TestArtifactTransferBetweenWorkers(t *testing.T) { + env, cancel := newEnv(t, threeWorkerConfig) + defer cancel() + + baseJob := build.Job{ + ID: build.ID{'a'}, + Name: "write", + Cmds: []build.Cmd{ + {CatTemplate: "OK", CatOutput: "{{.OutputDir}}/out.txt"}, + }, + } + + var wg sync.WaitGroup + wg.Add(3) + + startTime := time.Now() + + for i := 0; i < 3; i++ { + depJobID := build.ID{'b', byte(i)} + depJob := build.Job{ + ID: depJobID, + Name: "cat", + Cmds: []build.Cmd{ + {Exec: []string{"cat", fmt.Sprintf("{{index .Deps %q}}/out.txt", build.ID{'a'})}}, + {Exec: []string{"sleep", "1"}, Environ: os.Environ()}, // DepTimeout is 100ms. + }, + Deps: []build.ID{{'a'}}, + } + + graph := build.Graph{Jobs: []build.Job{baseJob, depJob}} + go func() { + defer wg.Done() + + recorder := NewRecorder() + if !assert.NoError(t, env.Client.Build(env.Ctx, graph, recorder)) { + return + } + + assert.Len(t, recorder.Jobs, 2) + assert.Equal(t, &JobResult{Stdout: "OK", Code: new(int)}, recorder.Jobs[depJobID]) + }() + } + + wg.Wait() + + testDuration := time.Since(startTime) + assert.True(t, testDuration < time.Second*2, "test duration should be less than 2 seconds") +} diff --git a/distbuild/pkg/dist/build.go b/distbuild/pkg/dist/build.go index 933f638..9468f12 100644 --- a/distbuild/pkg/dist/build.go +++ b/distbuild/pkg/dist/build.go @@ -62,11 +62,25 @@ func (b *Build) Run(ctx context.Context, w api.StatusWriter) error { b.l.Debug("file upload completed") for _, job := range b.Graph.Jobs { - spec := api.JobSpec{Job: job, SourceFiles: make(map[build.ID]string)} + spec := api.JobSpec{ + Job: job, + SourceFiles: make(map[build.ID]string), + Artifacts: make(map[build.ID]api.WorkerID), + } + for _, file := range job.Inputs { spec.SourceFiles[b.reverseFiles[file]] = file } + for _, id := range job.Deps { + workerID, ok := b.c.scheduler.LocateArtifact(id) + if !ok { + return fmt.Errorf("artifact %q is missing in cache", id) + } + + spec.Artifacts[id] = workerID + } + s := b.c.scheduler.ScheduleJob(&spec) select { diff --git a/distbuild/pkg/scheduler/scheduler.go b/distbuild/pkg/scheduler/scheduler.go index 87921b9..53cb95e 100644 --- a/distbuild/pkg/scheduler/scheduler.go +++ b/distbuild/pkg/scheduler/scheduler.go @@ -84,6 +84,17 @@ func NewScheduler(l *zap.Logger, config Config) *Scheduler { } } +func (c *Scheduler) LocateArtifact(id build.ID) (api.WorkerID, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + for id := range c.cachedJobs[id] { + return id, true + } + + return "", false +} + func (c *Scheduler) RegisterWorker(workerID api.WorkerID) { c.mu.Lock() defer c.mu.Unlock() diff --git a/distbuild/pkg/worker/worker.go b/distbuild/pkg/worker/worker.go index 6805177..4b990f2 100644 --- a/distbuild/pkg/worker/worker.go +++ b/distbuild/pkg/worker/worker.go @@ -43,7 +43,7 @@ func New( fileCache *filecache.Cache, artifacts *artifact.Cache, ) *Worker { - return &Worker{ + w := &Worker{ id: workerID, coordinatorEndpoint: coordinatorEndpoint, log: log, @@ -56,6 +56,9 @@ func New( mux: http.NewServeMux(), } + + artifact.NewHandler(w.log, w.artifacts).Register(w.mux) + return w } func (w *Worker) ServeHTTP(rw http.ResponseWriter, r *http.Request) {