From 5f0bb90e2cf5e2e6c12cf5a6f715d070c105ca74 Mon Sep 17 00:00:00 2001 From: Fedor Korotkiy Date: Sat, 14 Mar 2020 13:24:44 +0300 Subject: [PATCH] One command running --- distbuild/disttest/fixture.go | 17 +++++- distbuild/disttest/simple_test.go | 10 ++-- distbuild/pkg/client/build.go | 3 ++ distbuild/pkg/dist/coordinator.go | 81 ++++++++++++++++++++++++++++- distbuild/pkg/dist/schedule.go | 68 ++++++++++++++++++++++++ distbuild/pkg/proto/build.go | 8 ++- distbuild/pkg/worker/state.go | 28 ++++++++++ distbuild/pkg/worker/worker.go | 86 ++++++++++++++++++++++++++++--- 8 files changed, 285 insertions(+), 16 deletions(-) create mode 100644 distbuild/pkg/dist/schedule.go create mode 100644 distbuild/pkg/worker/state.go diff --git a/distbuild/disttest/fixture.go b/distbuild/disttest/fixture.go index beba2bb..cc6c321 100644 --- a/distbuild/disttest/fixture.go +++ b/distbuild/disttest/fixture.go @@ -2,6 +2,7 @@ package disttest import ( "context" + "errors" "fmt" "net" "net/http" @@ -116,9 +117,23 @@ func newEnv(t *testing.T) (e *env, cancel func()) { require.NoError(t, err) go func() { - env.Logger.Error("http server stopped", zap.Error(env.HTTP.Serve(lsn))) + err := env.HTTP.Serve(lsn) + if err != http.ErrServerClosed { + env.Logger.Fatal("http server stopped", zap.Error(err)) + } }() + for _, w := range env.Workers { + go func(w *worker.Worker) { + err := w.Run(env.Ctx) + if errors.Is(err, context.Canceled) { + return + } + + env.Logger.Fatal("worker stopped", zap.Error(err)) + }(w) + } + return env, func() { cancelRootContext() _ = env.HTTP.Shutdown(context.Background()) diff --git a/distbuild/disttest/simple_test.go b/distbuild/disttest/simple_test.go index 834d3a7..12b2d18 100644 --- a/distbuild/disttest/simple_test.go +++ b/distbuild/disttest/simple_test.go @@ -15,7 +15,7 @@ var echoGraph = build.Graph{ ID: build.ID{'a'}, Name: "echo", Cmds: []build.Cmd{ - {Exec: []string{"echo", "-n", "OK"}}, + {Exec: []string{"echo", "OK"}}, }, }, }, @@ -25,9 +25,9 @@ func TestSingleCommand(t *testing.T) { env, cancel := newEnv(t) defer cancel() - var recorder Recorder - require.NoError(t, env.Client.Build(env.Ctx, echoGraph, &recorder)) + recorder := NewRecorder() + require.NoError(t, env.Client.Build(env.Ctx, echoGraph, recorder)) - assert.Len(t, len(recorder.Jobs), 1) - assert.Equal(t, &JobResult{Stdout: "OK", Code: new(int)}, recorder.Jobs[build.ID{'a'}]) + assert.Len(t, recorder.Jobs, 1) + assert.Equal(t, &JobResult{Stdout: "OK\n", Code: new(int)}, recorder.Jobs[build.ID{'a'}]) } diff --git a/distbuild/pkg/client/build.go b/distbuild/pkg/client/build.go index 3dc9101..f13c985 100644 --- a/distbuild/pkg/client/build.go +++ b/distbuild/pkg/client/build.go @@ -79,6 +79,9 @@ func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener case update.BuildFailed != nil: return fmt.Errorf("build failed: %s", update.BuildFailed.Error) + case update.BuildFinished != nil: + return nil + case update.JobFinished != nil: jf := update.JobFinished diff --git a/distbuild/pkg/dist/coordinator.go b/distbuild/pkg/dist/coordinator.go index 43a4bf3..d4779fe 100644 --- a/distbuild/pkg/dist/coordinator.go +++ b/distbuild/pkg/dist/coordinator.go @@ -5,6 +5,8 @@ import ( "fmt" "io/ioutil" "net/http" + "sync" + "time" "go.uber.org/zap" @@ -20,6 +22,10 @@ type Coordinator struct { log *zap.Logger mux *http.ServeMux fileCache *filecache.Cache + + mu sync.Mutex + scheduledJobs map[build.ID]*scheduledJob + queue []*scheduledJob } func NewCoordinator( @@ -30,9 +36,12 @@ func NewCoordinator( log: log, mux: http.NewServeMux(), fileCache: fileCache, + + scheduledJobs: make(map[build.ID]*scheduledJob), } c.mux.HandleFunc("/build", c.Build) + c.mux.HandleFunc("/heartbeat", c.Heartbeat) return c } @@ -57,7 +66,22 @@ func (c *Coordinator) doBuild(w http.ResponseWriter, r *http.Request) error { return err } - return fmt.Errorf("coordinator not implemented") + for _, job := range g.Jobs { + job := job + + s := c.scheduleJob(&job) + <-s.done + + c.log.Debug("job finished", zap.String("job_id", job.ID.String())) + + update := proto.StatusUpdate{JobFinished: s.finished} + if err := enc.Encode(update); err != nil { + return err + } + } + + update := proto.StatusUpdate{BuildFinished: &proto.BuildFinished{}} + return enc.Encode(update) } func (c *Coordinator) Build(w http.ResponseWriter, r *http.Request) { @@ -69,3 +93,58 @@ func (c *Coordinator) Build(w http.ResponseWriter, r *http.Request) { _, _ = w.Write(errorJS) } } + +func (c *Coordinator) doHeartbeat(w http.ResponseWriter, r *http.Request) error { + var req proto.HeartbeatRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return fmt.Errorf("invalid request: %w", err) + } + + for _, job := range req.FinishedJob { + job := job + + scheduled, ok := c.lookupJob(job.ID) + if !ok { + continue + } + + c.log.Debug("job finished") + scheduled.finish(&job) + } + + var rsp proto.HeartbeatResponse + + var job *build.Job + for i := 0; i < 10; i++ { + var ok bool + job, ok = c.pickJob() + + if ok { + rsp.JobsToRun = map[build.ID]proto.JobSpec{ + job.ID: {Job: *job}, + } + + break + } + + time.Sleep(time.Millisecond) + } + + if err := json.NewEncoder(w).Encode(rsp); err != nil { + return err + } + + return nil +} + +func (c *Coordinator) Heartbeat(w http.ResponseWriter, r *http.Request) { + c.log.Debug("heartbeat started") + if err := c.doHeartbeat(w, r); err != nil { + c.log.Error("heartbeat failed", zap.Error(err)) + + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(err.Error())) + return + } + c.log.Debug("heartbeat finished") +} diff --git a/distbuild/pkg/dist/schedule.go b/distbuild/pkg/dist/schedule.go new file mode 100644 index 0000000..efaee15 --- /dev/null +++ b/distbuild/pkg/dist/schedule.go @@ -0,0 +1,68 @@ +package dist + +import ( + "sync" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" + "gitlab.com/slon/shad-go/distbuild/pkg/proto" +) + +type scheduledJob struct { + job *build.Job + finished *proto.FinishedJob + + mu sync.Mutex + done chan struct{} +} + +func newScheduledJob(job *build.Job) *scheduledJob { + return &scheduledJob{ + job: job, + done: make(chan struct{}), + } +} + +func (s *scheduledJob) finish(f *proto.FinishedJob) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.finished == nil { + s.finished = f + close(s.done) + } +} + +func (c *Coordinator) scheduleJob(job *build.Job) *scheduledJob { + c.mu.Lock() + defer c.mu.Unlock() + + if scheduled, ok := c.scheduledJobs[job.ID]; ok { + return scheduled + } else { + scheduled = newScheduledJob(job) + c.scheduledJobs[job.ID] = scheduled + c.queue = append(c.queue, scheduled) + return scheduled + } +} + +func (c *Coordinator) pickJob() (*build.Job, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + if len(c.queue) == 0 { + return nil, false + } + + job := c.queue[0].job + c.queue = c.queue[1:] + return job, true +} + +func (c *Coordinator) lookupJob(id build.ID) (*scheduledJob, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + scheduled, ok := c.scheduledJobs[id] + return scheduled, ok +} diff --git a/distbuild/pkg/proto/build.go b/distbuild/pkg/proto/build.go index 9517aa2..163ddd5 100644 --- a/distbuild/pkg/proto/build.go +++ b/distbuild/pkg/proto/build.go @@ -9,10 +9,14 @@ type MissingSources struct { } type StatusUpdate struct { - JobFinished *FinishedJob - BuildFailed *BuildFailed + JobFinished *FinishedJob + BuildFailed *BuildFailed + BuildFinished *BuildFinished } type BuildFailed struct { Error string } + +type BuildFinished struct { +} diff --git a/distbuild/pkg/worker/state.go b/distbuild/pkg/worker/state.go new file mode 100644 index 0000000..1ac0042 --- /dev/null +++ b/distbuild/pkg/worker/state.go @@ -0,0 +1,28 @@ +package worker + +import ( + "go.uber.org/zap" + + "gitlab.com/slon/shad-go/distbuild/pkg/proto" +) + +func (w *Worker) buildHeartbeat() *proto.HeartbeatRequest { + w.mu.Lock() + defer w.mu.Unlock() + + req := &proto.HeartbeatRequest{ + FinishedJob: w.finishedJobs, + } + + w.finishedJobs = nil + return req +} + +func (w *Worker) jobFinished(job *proto.FinishedJob) { + w.log.Debug("job finished", zap.String("job_id", job.ID.String())) + + w.mu.Lock() + defer w.mu.Unlock() + + w.finishedJobs = append(w.finishedJobs, *job) +} diff --git a/distbuild/pkg/worker/worker.go b/distbuild/pkg/worker/worker.go index d0fa0d0..c27c17d 100644 --- a/distbuild/pkg/worker/worker.go +++ b/distbuild/pkg/worker/worker.go @@ -1,8 +1,13 @@ package worker import ( + "bytes" "context" + "encoding/json" + "fmt" + "io/ioutil" "net/http" + "os/exec" "sync" "go.uber.org/zap" @@ -10,6 +15,7 @@ import ( "gitlab.com/slon/shad-go/distbuild/pkg/artifact" "gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/filecache" + "gitlab.com/slon/shad-go/distbuild/pkg/proto" ) type Worker struct { @@ -25,6 +31,7 @@ type Worker struct { mu sync.Mutex newArtifacts []build.ID newSources []build.ID + finishedJobs []proto.FinishedJob } func New( @@ -48,13 +55,13 @@ func (w *Worker) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } func (w *Worker) recover() error { - err := w.fileCache.Range(func(file build.ID) error { - w.newSources = append(w.newSources, file) - return nil - }) - if err != nil { - return err - } + //err := w.fileCache.Range(func(file build.ID) error { + // w.newSources = append(w.newSources, file) + // return nil + //}) + //if err != nil { + // return err + //} return w.artifacts.Range(func(file build.ID) error { w.newArtifacts = append(w.newArtifacts, file) @@ -62,12 +69,77 @@ func (w *Worker) recover() error { }) } +func (w *Worker) sendHeartbeat(req *proto.HeartbeatRequest) (*proto.HeartbeatResponse, error) { + reqJS, err := json.Marshal(req) + if err != nil { + return nil, err + } + + httpReq, err := http.NewRequest("POST", w.coordinatorEndpoint+"/heartbeat", bytes.NewBuffer(reqJS)) + if err != nil { + return nil, err + } + + httpRsp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return nil, err + } + + if httpRsp.StatusCode != http.StatusOK { + errorString, _ := ioutil.ReadAll(httpRsp.Body) + return nil, fmt.Errorf("heartbeat failed: %s", errorString) + } + + var rsp proto.HeartbeatResponse + if err := json.NewDecoder(httpRsp.Body).Decode(&rsp); err != nil { + return nil, err + } + + return &rsp, nil +} + func (w *Worker) Run(ctx context.Context) error { if err := w.recover(); err != nil { return err } for { + w.log.Debug("sending heartbeat request") + rsp, err := w.sendHeartbeat(w.buildHeartbeat()) + if err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + w.log.DPanic("heartbeat failed", zap.Error(err)) + continue + } + w.log.Debug("received heartbeat response", + zap.Int("num_jobs", len(rsp.JobsToRun))) + + for _, job := range rsp.JobsToRun { + var finished proto.FinishedJob + finished.ID = job.Job.ID + + var stdout bytes.Buffer + var stderr bytes.Buffer + + for _, jobCmd := range job.Job.Cmds { + cmd := exec.Command(jobCmd.Exec[0], jobCmd.Exec[1:]...) + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + errorString := err.Error() + finished.Error = &errorString + finished.ExitCode = cmd.ProcessState.ExitCode() + break + } + } + + finished.Stdout = stdout.Bytes() + finished.Stderr = stderr.Bytes() + w.jobFinished(&finished) + } } }