Add artifact transfer test

Fedor Korotkiy 2020-04-05 15:00:33 +03:00
parent e56f1df9ba
commit a28ac30f2e
6 changed files with 106 additions and 9 deletions

@@ -37,10 +37,13 @@ type env struct {
 const (
 	logToStderr = true
-	nWorkers    = 1
 )
 
-func newEnv(t *testing.T) (e *env, cancel func()) {
+type Config struct {
+	WorkerCount int
+}
+
+func newEnv(t *testing.T, config *Config) (e *env, cancel func()) {
 	cwd, err := os.Getwd()
 	require.NoError(t, err)
@@ -91,7 +94,7 @@ func newEnv(t *testing.T) (e *env, cancel func()) {
 	router := http.NewServeMux()
 	router.Handle("/coordinator/", http.StripPrefix("/coordinator", env.Coordinator))
 
-	for i := 0; i < nWorkers; i++ {
+	for i := 0; i < config.WorkerCount; i++ {
 		workerName := fmt.Sprintf("worker%d", i)
 		workerDir := filepath.Join(env.RootDir, workerName)

@@ -11,6 +11,8 @@ import (
 	"gitlab.com/slon/shad-go/distbuild/pkg/build"
 )
 
+var singleWorkerConfig = &Config{WorkerCount: 1}
+
 var echoGraph = build.Graph{
 	Jobs: []build.Job{
 		{
@@ -24,7 +26,7 @@ var echoGraph = build.Graph{
 }
 
 func TestSingleCommand(t *testing.T) {
-	env, cancel := newEnv(t)
+	env, cancel := newEnv(t, singleWorkerConfig)
 	defer cancel()
 
 	recorder := NewRecorder()
@@ -35,7 +37,7 @@ func TestSingleCommand(t *testing.T) {
 }
 
 func TestJobCaching(t *testing.T) {
-	env, cancel := newEnv(t)
+	env, cancel := newEnv(t, singleWorkerConfig)
 	defer cancel()
 
 	tmpFile, err := ioutil.TempFile("", "")
@@ -92,7 +94,7 @@ var sourceFilesGraph = build.Graph{
 }
 
 func TestSourceFiles(t *testing.T) {
-	env, cancel := newEnv(t)
+	env, cancel := newEnv(t, singleWorkerConfig)
 	defer cancel()
 
 	recorder := NewRecorder()
@@ -123,7 +125,7 @@ var artifactTransferGraph = build.Graph{
 }
 
 func TestArtifactTransferBetweenJobs(t *testing.T) {
-	env, cancel := newEnv(t)
+	env, cancel := newEnv(t, singleWorkerConfig)
 	defer cancel()
 
 	recorder := NewRecorder()

@@ -0,0 +1,64 @@
+package disttest
+
+import (
+	"fmt"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"gitlab.com/slon/shad-go/distbuild/pkg/build"
+)
+
+var threeWorkerConfig = &Config{WorkerCount: 3}
+
+func TestArtifactTransferBetweenWorkers(t *testing.T) {
+	env, cancel := newEnv(t, threeWorkerConfig)
+	defer cancel()
+
+	baseJob := build.Job{
+		ID:   build.ID{'a'},
+		Name: "write",
+		Cmds: []build.Cmd{
+			{CatTemplate: "OK", CatOutput: "{{.OutputDir}}/out.txt"},
+		},
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(3)
+
+	startTime := time.Now()
+	for i := 0; i < 3; i++ {
+		depJobID := build.ID{'b', byte(i)}
+		depJob := build.Job{
+			ID:   depJobID,
+			Name: "cat",
+			Cmds: []build.Cmd{
+				{Exec: []string{"cat", fmt.Sprintf("{{index .Deps %q}}/out.txt", build.ID{'a'})}},
+				{Exec: []string{"sleep", "1"}, Environ: os.Environ()}, // DepTimeout is 100ms.
+			},
+			Deps: []build.ID{{'a'}},
+		}
+
+		graph := build.Graph{Jobs: []build.Job{baseJob, depJob}}
+
+		go func() {
+			defer wg.Done()
+
+			recorder := NewRecorder()
+			if !assert.NoError(t, env.Client.Build(env.Ctx, graph, recorder)) {
+				return
+			}
+
+			assert.Len(t, recorder.Jobs, 2)
+			assert.Equal(t, &JobResult{Stdout: "OK", Code: new(int)}, recorder.Jobs[depJobID])
+		}()
+	}
+
+	wg.Wait()
+	testDuration := time.Since(startTime)
+	assert.True(t, testDuration < time.Second*2, "test duration should be less than 2 seconds")
+}

@@ -62,11 +62,25 @@ func (b *Build) Run(ctx context.Context, w api.StatusWriter) error {
 	b.l.Debug("file upload completed")
 
 	for _, job := range b.Graph.Jobs {
-		spec := api.JobSpec{Job: job, SourceFiles: make(map[build.ID]string)}
+		spec := api.JobSpec{
+			Job:         job,
+			SourceFiles: make(map[build.ID]string),
+			Artifacts:   make(map[build.ID]api.WorkerID),
+		}
+
 		for _, file := range job.Inputs {
 			spec.SourceFiles[b.reverseFiles[file]] = file
 		}
 
+		for _, id := range job.Deps {
+			workerID, ok := b.c.scheduler.LocateArtifact(id)
+			if !ok {
+				return fmt.Errorf("artifact %q is missing in cache", id)
+			}
+
+			spec.Artifacts[id] = workerID
+		}
+
 		s := b.c.scheduler.ScheduleJob(&spec)
 
 		select {

@@ -84,6 +84,17 @@ func NewScheduler(l *zap.Logger, config Config) *Scheduler {
 	}
 }
 
+func (c *Scheduler) LocateArtifact(id build.ID) (api.WorkerID, bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for id := range c.cachedJobs[id] {
+		return id, true
+	}
+
+	return "", false
+}
+
 func (c *Scheduler) RegisterWorker(workerID api.WorkerID) {
 	c.mu.Lock()
 	defer c.mu.Unlock()

@@ -43,7 +43,7 @@ func New(
 	fileCache *filecache.Cache,
 	artifacts *artifact.Cache,
 ) *Worker {
-	return &Worker{
+	w := &Worker{
 		id:                  workerID,
 		coordinatorEndpoint: coordinatorEndpoint,
 		log:                 log,
@@ -56,6 +56,9 @@ func New(
 		mux: http.NewServeMux(),
 	}
+
+	artifact.NewHandler(w.log, w.artifacts).Register(w.mux)
+	return w
 }
 
 func (w *Worker) ServeHTTP(rw http.ResponseWriter, r *http.Request) {