diff --git a/distbuild/disttest/fixture.go b/distbuild/disttest/fixture.go
index cc6c321..610c9c0 100644
--- a/distbuild/disttest/fixture.go
+++ b/distbuild/disttest/fixture.go
@@ -34,7 +34,7 @@ type env struct {
     HTTP *http.Server
 }
 
-const nWorkers = 4
+const nWorkers = 1
 
 func newEnv(t *testing.T) (e *env, cancel func()) {
     cwd, err := os.Getwd()
diff --git a/distbuild/disttest/simple_test.go b/distbuild/disttest/simple_test.go
deleted file mode 100644
index 12b2d18..0000000
--- a/distbuild/disttest/simple_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package disttest
-
-import (
-    "testing"
-
-    "github.com/stretchr/testify/assert"
-    "github.com/stretchr/testify/require"
-
-    "gitlab.com/slon/shad-go/distbuild/pkg/build"
-)
-
-var echoGraph = build.Graph{
-    Jobs: []build.Job{
-        {
-            ID:   build.ID{'a'},
-            Name: "echo",
-            Cmds: []build.Cmd{
-                {Exec: []string{"echo", "OK"}},
-            },
-        },
-    },
-}
-
-func TestSingleCommand(t *testing.T) {
-    env, cancel := newEnv(t)
-    defer cancel()
-
-    recorder := NewRecorder()
-    require.NoError(t, env.Client.Build(env.Ctx, echoGraph, recorder))
-
-    assert.Len(t, recorder.Jobs, 1)
-    assert.Equal(t, &JobResult{Stdout: "OK\n", Code: new(int)}, recorder.Jobs[build.ID{'a'}])
-}
diff --git a/distbuild/disttest/single_worker_test.go b/distbuild/disttest/single_worker_test.go
new file mode 100644
index 0000000..35cb6bc
--- /dev/null
+++ b/distbuild/disttest/single_worker_test.go
@@ -0,0 +1,72 @@
+package disttest
+
+import (
+    "io/ioutil"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+
+    "gitlab.com/slon/shad-go/distbuild/pkg/build"
+)
+
+var echoGraph = build.Graph{
+    Jobs: []build.Job{
+        {
+            ID:   build.ID{'a'},
+            Name: "echo",
+            Cmds: []build.Cmd{
+                {Exec: []string{"echo", "OK"}},
+            },
+        },
+    },
+}
+
+func TestSingleCommand(t *testing.T) {
+    env, cancel := newEnv(t)
+    defer cancel()
+
+    recorder := NewRecorder()
+    require.NoError(t, env.Client.Build(env.Ctx, echoGraph, recorder))
+
+    assert.Len(t, recorder.Jobs, 1)
+    assert.Equal(t, &JobResult{Stdout: "OK\n", Code: new(int)}, recorder.Jobs[build.ID{'a'}])
+}
+
+func TestJobCaching(t *testing.T) {
+    env, cancel := newEnv(t)
+    defer cancel()
+
+    tmpFile, err := ioutil.TempFile("", "")
+    require.NoError(t, err)
+
+    graph := build.Graph{
+        Jobs: []build.Job{
+            {
+                ID:   build.ID{'a'},
+                Name: "echo",
+                Cmds: []build.Cmd{
+                    {CatTemplate: "OK\n", CatOutput: tmpFile.Name()}, // Non-hermetic, for testing purposes.
+                    {Exec: []string{"echo", "OK"}},
+                },
+            },
+        },
+    }
+
+    recorder := NewRecorder()
+    require.NoError(t, env.Client.Build(env.Ctx, graph, recorder))
+
+    assert.Len(t, recorder.Jobs, 1)
+    assert.Equal(t, &JobResult{Stdout: "OK\n", Code: new(int)}, recorder.Jobs[build.ID{'a'}])
+
+    // Second build must get results from cache.
+    require.NoError(t, env.Client.Build(env.Ctx, graph, NewRecorder()))
+
+    output, err := ioutil.ReadAll(tmpFile)
+    require.NoError(t, err)
+    require.Equal(t, []byte("OK\n"), output)
+}
+
+func TestSourceFiles(t *testing.T) {
+
+}
diff --git a/distbuild/pkg/dist/state.go b/distbuild/pkg/dist/state.go
index 32fc789..c5a0323 100644
--- a/distbuild/pkg/dist/state.go
+++ b/distbuild/pkg/dist/state.go
@@ -11,5 +11,5 @@ type Cluster struct {
 }
 
 func (c *Cluster) FindOptimalWorkers(task build.ID, sources, deps []build.ID) []proto.WorkerID {
-
+    panic("implement me")
 }
diff --git a/distbuild/pkg/worker/job.go b/distbuild/pkg/worker/job.go
new file mode 100644
index 0000000..2a03cee
--- /dev/null
+++ b/distbuild/pkg/worker/job.go
@@ -0,0 +1,271 @@
+package worker
+
+import (
+    "bytes"
+    "context"
+    "errors"
+    "io/ioutil"
+    "os"
+    "os/exec"
+    "path/filepath"
+    "strconv"
+
+    "go.uber.org/zap"
+
+    "gitlab.com/slon/shad-go/distbuild/pkg/artifact"
+    "gitlab.com/slon/shad-go/distbuild/pkg/build"
+    "gitlab.com/slon/shad-go/distbuild/pkg/proto"
+)
+
+const (
+    outputDirName    = "output"
+    srcDirName       = "src"
+    exitCodeFileName = "exit_code"
+    stdoutFileName   = "stdout"
+    stderrFileName   = "stderr"
+)
+
+func (w *Worker) getJobFromCache(jobID build.ID) (*proto.JobResult, error) {
+    aRoot, unlock, err := w.artifacts.Get(jobID)
+    if err != nil {
+        return nil, err
+    }
+    defer unlock()
+
+    res := &proto.JobResult{}
+
+    exitCodeStr, err := ioutil.ReadFile(filepath.Join(aRoot, exitCodeFileName))
+    if err != nil {
+        return nil, err
+    }
+
+    res.ExitCode, err = strconv.Atoi(string(exitCodeStr))
+    if err != nil {
+        return nil, err
+    }
+
+    res.Stdout, err = ioutil.ReadFile(filepath.Join(aRoot, stdoutFileName))
+    if err != nil {
+        return nil, err
+    }
+
+    res.Stderr, err = ioutil.ReadFile(filepath.Join(aRoot, stderrFileName))
+    if err != nil {
+        return nil, err
+    }
+
+    return res, nil
+}
+
+func executeCmd(ctx context.Context, cmd *build.Cmd) (stdout, stderr []byte, exitCode int, err error) {
+    var stdoutBuf, stderrBuf bytes.Buffer
+
+    if cmd.CatOutput != "" {
+        err = ioutil.WriteFile(cmd.CatOutput, []byte(cmd.CatTemplate), 0666)
+        return
+    } else {
+        p := exec.CommandContext(ctx, cmd.Exec[0], cmd.Exec[1:]...)
+        p.Dir = cmd.WorkingDirectory
+        p.Env = cmd.Environ
+        p.Stdout = &stdoutBuf
+        p.Stderr = &stderrBuf
+
+        err = p.Run()
+
+        stdout = stdoutBuf.Bytes()
+        stderr = stderrBuf.Bytes()
+
+        if err != nil {
+            var exitErr *exec.ExitError
+            if errors.As(err, &exitErr) {
+                exitCode = exitErr.ExitCode()
+                err = nil
+            }
+        }
+        return
+    }
+}
+
+func (w *Worker) prepareSourceDir(sourceDir string, sourceFiles map[build.ID]string) (unlock func(), err error) {
+    var unlocks []func()
+    doUnlock := func() {
+        for _, u := range unlocks {
+            u()
+        }
+    }
+
+    defer func() {
+        if doUnlock != nil {
+            doUnlock()
+        }
+    }()
+
+    for id, path := range sourceFiles {
+        dir, _ := filepath.Split(path)
+        if dir != "" {
+            if err := os.MkdirAll(filepath.Join(sourceDir, dir), 0777); err != nil {
+                return nil, err
+            }
+        }
+
+        cached, unlock, err := w.fileCache.Get(id)
+        if err != nil {
+            return nil, err
+        }
+        unlocks = append(unlocks, unlock)
+
+        if err := os.Link(cached, filepath.Join(sourceDir, path)); err != nil {
+            return nil, err
+        }
+    }
+
+    unlock = doUnlock
+    doUnlock = nil
+    return
+}
+
+func (w *Worker) lockDeps(deps []build.ID) (paths map[build.ID]string, unlock func(), err error) {
+    var unlocks []func()
+    doUnlock := func() {
+        for _, u := range unlocks {
+            u()
+        }
+    }
+
+    defer func() {
+        if doUnlock != nil {
+            doUnlock()
+        }
+    }()
+
+    paths = make(map[build.ID]string)
+
+    for _, id := range deps {
+        path, unlock, err := w.artifacts.Get(id)
+        if err != nil {
+            return nil, nil, err
+        }
+        unlocks = append(unlocks, unlock)
+
+        paths[id] = filepath.Join(path, outputDirName)
+    }
+
+    unlock = doUnlock
+    doUnlock = nil
+    return
+}
+
+func (w *Worker) runJob(ctx context.Context, spec *proto.JobSpec) (*proto.JobResult, error) {
+    res, err := w.getJobFromCache(spec.Job.ID)
+    if err != nil && !errors.Is(err, artifact.ErrNotFound) {
+        return nil, err
+    } else if err == nil {
+        return res, nil
+    }
+
+    aRoot, commit, abort, err := w.artifacts.Create(spec.Job.ID)
+    if err != nil {
+        return nil, err
+    }
+
+    defer func() {
+        if abort == nil {
+            return
+        }
+
+        if err := abort(); err != nil {
+            w.log.Warn("error aborting job", zap.Any("job_id", spec.Job.ID), zap.Error(err))
+        }
+    }()
+
+    outputDir := filepath.Join(aRoot, outputDirName)
+    if err := os.Mkdir(outputDir, 0777); err != nil {
+        return nil, err
+    }
+
+    sourceDir := filepath.Join(aRoot, srcDirName)
+    if err := os.Mkdir(sourceDir, 0777); err != nil {
+        return nil, err
+    }
+
+    stdoutFile, err := os.Create(filepath.Join(aRoot, stdoutFileName))
+    if err != nil {
+        return nil, err
+    }
+    defer stdoutFile.Close()
+
+    stderrFile, err := os.Create(filepath.Join(aRoot, stderrFileName))
+    if err != nil {
+        return nil, err
+    }
+    defer stderrFile.Close()
+
+    jobContext := build.JobContext{
+        OutputDir: outputDir,
+        SourceDir: sourceDir,
+    }
+
+    var unlock []func()
+    defer func() {
+        for _, u := range unlock {
+            u()
+        }
+    }()
+
+    unlockSourceFiles, err := w.prepareSourceDir(sourceDir, spec.SourceFiles)
+    if err != nil {
+        return nil, err
+    }
+    unlock = append(unlock, unlockSourceFiles)
+
+    deps, unlockDeps, err := w.lockDeps(spec.Job.Deps)
+    if err != nil {
+        return nil, err
+    }
+    unlock = append(unlock, unlockDeps)
+    jobContext.Deps = deps
+
+    res = &proto.JobResult{
+        ID: spec.Job.ID,
+    }
+
+    for _, cmd := range spec.Job.Cmds {
+        cmd, err := cmd.Render(jobContext)
+        if err != nil {
+            return nil, err
+        }
+
+        stdout, stderr, exitCode, err := executeCmd(ctx, cmd)
+        if err != nil {
+            return nil, err
+        }
+
+        res.Stdout = append(res.Stdout, stdout...)
+        _, err = stdoutFile.Write(stdout)
+        if err != nil {
+            return nil, err
+        }
+
+        res.Stderr = append(res.Stderr, stderr...)
+        _, err = stderrFile.Write(stderr)
+        if err != nil {
+            return nil, err
+        }
+
+        if exitCode != 0 {
+            res.ExitCode = exitCode
+            break
+        }
+    }
+
+    if err := ioutil.WriteFile(filepath.Join(aRoot, exitCodeFileName), []byte(strconv.Itoa(res.ExitCode)), 0666); err != nil {
+        return nil, err
+    }
+
+    abort = nil
+    if err := commit(); err != nil {
+        return nil, err
+    }
+
+    return res, nil
+}
diff --git a/distbuild/pkg/worker/worker.go b/distbuild/pkg/worker/worker.go
index df42e20..91a5c9b 100644
--- a/distbuild/pkg/worker/worker.go
+++ b/distbuild/pkg/worker/worker.go
@@ -7,7 +7,6 @@ import (
     "fmt"
     "io/ioutil"
     "net/http"
-    "os/exec"
     "sync"
 
     "go.uber.org/zap"
@@ -117,29 +116,15 @@ func (w *Worker) Run(ctx context.Context) error {
 
         w.log.Debug("received heartbeat response", zap.Int("num_jobs", len(rsp.JobsToRun)))
 
-        for _, job := range rsp.JobsToRun {
-            var finished proto.JobResult
-            finished.ID = job.Job.ID
-
-            var stdout bytes.Buffer
-            var stderr bytes.Buffer
-
-            for _, jobCmd := range job.Job.Cmds {
-                cmd := exec.Command(jobCmd.Exec[0], jobCmd.Exec[1:]...)
-                cmd.Stdout = &stdout
-                cmd.Stderr = &stderr
-
-                if err := cmd.Run(); err != nil {
-                    errorString := err.Error()
-                    finished.Error = &errorString
-                    finished.ExitCode = cmd.ProcessState.ExitCode()
-                    break
-                }
+        for _, spec := range rsp.JobsToRun {
+            result, err := w.runJob(ctx, &spec)
+            if err != nil {
+                errStr := err.Error()
+                w.jobFinished(&proto.JobResult{ID: spec.Job.ID, Error: &errStr})
+                continue
             }
 
-            finished.Stdout = stdout.Bytes()
-            finished.Stderr = stderr.Bytes()
-            w.jobFinished(&finished)
+            w.jobFinished(result)
         }
     }
 }