One command running

This commit is contained in:
Fedor Korotkiy 2020-03-14 13:24:44 +03:00
parent 110b00a017
commit 5f0bb90e2c
8 changed files with 285 additions and 16 deletions

View file

@ -2,6 +2,7 @@ package disttest
import (
"context"
"errors"
"fmt"
"net"
"net/http"
@ -116,9 +117,23 @@ func newEnv(t *testing.T) (e *env, cancel func()) {
require.NoError(t, err)
go func() {
env.Logger.Error("http server stopped", zap.Error(env.HTTP.Serve(lsn)))
err := env.HTTP.Serve(lsn)
if err != http.ErrServerClosed {
env.Logger.Fatal("http server stopped", zap.Error(err))
}
}()
for _, w := range env.Workers {
go func(w *worker.Worker) {
err := w.Run(env.Ctx)
if errors.Is(err, context.Canceled) {
return
}
env.Logger.Fatal("worker stopped", zap.Error(err))
}(w)
}
return env, func() {
cancelRootContext()
_ = env.HTTP.Shutdown(context.Background())

View file

@ -15,7 +15,7 @@ var echoGraph = build.Graph{
ID: build.ID{'a'},
Name: "echo",
Cmds: []build.Cmd{
{Exec: []string{"echo", "-n", "OK"}},
{Exec: []string{"echo", "OK"}},
},
},
},
@ -25,9 +25,9 @@ func TestSingleCommand(t *testing.T) {
env, cancel := newEnv(t)
defer cancel()
var recorder Recorder
require.NoError(t, env.Client.Build(env.Ctx, echoGraph, &recorder))
recorder := NewRecorder()
require.NoError(t, env.Client.Build(env.Ctx, echoGraph, recorder))
assert.Len(t, len(recorder.Jobs), 1)
assert.Equal(t, &JobResult{Stdout: "OK", Code: new(int)}, recorder.Jobs[build.ID{'a'}])
assert.Len(t, recorder.Jobs, 1)
assert.Equal(t, &JobResult{Stdout: "OK\n", Code: new(int)}, recorder.Jobs[build.ID{'a'}])
}

View file

@ -79,6 +79,9 @@ func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener
case update.BuildFailed != nil:
return fmt.Errorf("build failed: %s", update.BuildFailed.Error)
case update.BuildFinished != nil:
return nil
case update.JobFinished != nil:
jf := update.JobFinished

View file

@ -5,6 +5,8 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"go.uber.org/zap"
@ -20,6 +22,10 @@ type Coordinator struct {
log *zap.Logger
mux *http.ServeMux
fileCache *filecache.Cache
mu sync.Mutex
scheduledJobs map[build.ID]*scheduledJob
queue []*scheduledJob
}
func NewCoordinator(
@ -30,9 +36,12 @@ func NewCoordinator(
log: log,
mux: http.NewServeMux(),
fileCache: fileCache,
scheduledJobs: make(map[build.ID]*scheduledJob),
}
c.mux.HandleFunc("/build", c.Build)
c.mux.HandleFunc("/heartbeat", c.Heartbeat)
return c
}
@ -57,7 +66,22 @@ func (c *Coordinator) doBuild(w http.ResponseWriter, r *http.Request) error {
return err
}
return fmt.Errorf("coordinator not implemented")
for _, job := range g.Jobs {
job := job
s := c.scheduleJob(&job)
<-s.done
c.log.Debug("job finished", zap.String("job_id", job.ID.String()))
update := proto.StatusUpdate{JobFinished: s.finished}
if err := enc.Encode(update); err != nil {
return err
}
}
update := proto.StatusUpdate{BuildFinished: &proto.BuildFinished{}}
return enc.Encode(update)
}
func (c *Coordinator) Build(w http.ResponseWriter, r *http.Request) {
@ -69,3 +93,58 @@ func (c *Coordinator) Build(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write(errorJS)
}
}
// doHeartbeat processes a single worker heartbeat.
//
// It decodes a proto.HeartbeatRequest from the request body, marks every
// reported job that the coordinator knows about as finished, and then polls
// the job queue (up to 10 attempts, sleeping 1ms after each miss) for a job
// to hand back to the worker. At most one job is returned per heartbeat.
// The proto.HeartbeatResponse is written to w as JSON; a decoding or
// encoding failure is returned to the caller.
func (c *Coordinator) doHeartbeat(w http.ResponseWriter, r *http.Request) error {
	var hb proto.HeartbeatRequest
	if err := json.NewDecoder(r.Body).Decode(&hb); err != nil {
		return fmt.Errorf("invalid request: %w", err)
	}

	for i := range hb.FinishedJob {
		// Work on a private copy: finish stores the pointer it is given.
		result := hb.FinishedJob[i]
		if s, known := c.lookupJob(result.ID); known {
			c.log.Debug("job finished")
			s.finish(&result)
		}
	}

	var rsp proto.HeartbeatResponse
	for attempt := 0; attempt < 10; attempt++ {
		next, ok := c.pickJob()
		if !ok {
			// Nothing queued yet; give builds a moment to schedule something.
			time.Sleep(time.Millisecond)
			continue
		}

		rsp.JobsToRun = map[build.ID]proto.JobSpec{
			next.ID: {Job: *next},
		}
		break
	}

	return json.NewEncoder(w).Encode(rsp)
}
// Heartbeat is the HTTP handler registered for "/heartbeat". It delegates to
// doHeartbeat and, when that fails, logs the error and replies with
// 400 Bad Request carrying the error text as the body.
func (c *Coordinator) Heartbeat(w http.ResponseWriter, r *http.Request) {
	c.log.Debug("heartbeat started")

	err := c.doHeartbeat(w, r)
	if err == nil {
		c.log.Debug("heartbeat finished")
		return
	}

	c.log.Error("heartbeat failed", zap.Error(err))
	w.WriteHeader(http.StatusBadRequest)
	_, _ = w.Write([]byte(err.Error()))
}

68
distbuild/pkg/dist/schedule.go vendored Normal file
View file

@ -0,0 +1,68 @@
package dist
import (
"sync"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/proto"
)
// scheduledJob tracks one job from the moment it is queued until a worker
// reports its result.
type scheduledJob struct {
	// job is the job to execute; set once at construction.
	job *build.Job

	// done is closed by finish when the first result is recorded.
	done chan struct{}

	// mu guards finished.
	mu sync.Mutex
	// finished holds the recorded result, or nil while the job is pending.
	finished *proto.FinishedJob
}
// newScheduledJob wraps job in a pending scheduledJob whose done channel is
// still open.
func newScheduledJob(job *build.Job) *scheduledJob {
	s := new(scheduledJob)
	s.job = job
	s.done = make(chan struct{})
	return s
}
// finish records f as the job's result and closes done. Only the first call
// has any effect; later calls leave the stored result untouched.
func (s *scheduledJob) finish(f *proto.FinishedJob) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.finished != nil {
		// A result was already recorded and done is already closed.
		return
	}

	s.finished = f
	close(s.done)
}
// scheduleJob returns the scheduledJob tracking job. If a job with the same
// ID has already been scheduled, the existing entry is returned; otherwise a
// new entry is registered and appended to the queue for workers to pick up.
func (c *Coordinator) scheduleJob(job *build.Job) *scheduledJob {
	c.mu.Lock()
	defer c.mu.Unlock()

	if scheduled, ok := c.scheduledJobs[job.ID]; ok {
		return scheduled
	}

	scheduled := newScheduledJob(job)
	c.scheduledJobs[job.ID] = scheduled
	c.queue = append(c.queue, scheduled)
	return scheduled
}
// pickJob removes the oldest entry from the queue and returns its job.
// The boolean is false (and the job nil) when the queue is empty.
func (c *Coordinator) pickJob() (*build.Job, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if len(c.queue) > 0 {
		head := c.queue[0]
		c.queue = c.queue[1:]
		return head.job, true
	}

	return nil, false
}
// lookupJob returns the scheduledJob registered under id, and whether such
// an entry exists.
func (c *Coordinator) lookupJob(id build.ID) (*scheduledJob, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	entry, found := c.scheduledJobs[id]
	return entry, found
}

View file

@ -9,10 +9,14 @@ type MissingSources struct {
}
type StatusUpdate struct {
JobFinished *FinishedJob
BuildFailed *BuildFailed
JobFinished *FinishedJob
BuildFailed *BuildFailed
BuildFinished *BuildFinished
}
// BuildFailed is the status update payload reporting that a build failed.
type BuildFailed struct {
	// Error is the human-readable failure description.
	Error string
}
// BuildFinished is the status update payload reporting that a build ran to
// completion. It carries no data; its presence is the signal.
type BuildFinished struct{}

View file

@ -0,0 +1,28 @@
package worker
import (
"go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/proto"
)
// buildHeartbeat assembles the next heartbeat request from the finished jobs
// accumulated so far and resets that list, so each result is included in
// exactly one request.
func (w *Worker) buildHeartbeat() *proto.HeartbeatRequest {
	w.mu.Lock()
	defer w.mu.Unlock()

	pending := w.finishedJobs
	w.finishedJobs = nil

	return &proto.HeartbeatRequest{FinishedJob: pending}
}
// jobFinished logs the completion of job and appends a copy of it to the
// list of results waiting to be reported in the next heartbeat.
func (w *Worker) jobFinished(job *proto.FinishedJob) {
	w.log.Debug("job finished", zap.String("job_id", job.ID.String()))

	w.mu.Lock()
	w.finishedJobs = append(w.finishedJobs, *job)
	w.mu.Unlock()
}

View file

@ -1,8 +1,13 @@
package worker
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"sync"
"go.uber.org/zap"
@ -10,6 +15,7 @@ import (
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache"
"gitlab.com/slon/shad-go/distbuild/pkg/proto"
)
type Worker struct {
@ -25,6 +31,7 @@ type Worker struct {
mu sync.Mutex
newArtifacts []build.ID
newSources []build.ID
finishedJobs []proto.FinishedJob
}
func New(
@ -48,13 +55,13 @@ func (w *Worker) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}
func (w *Worker) recover() error {
err := w.fileCache.Range(func(file build.ID) error {
w.newSources = append(w.newSources, file)
return nil
})
if err != nil {
return err
}
//err := w.fileCache.Range(func(file build.ID) error {
// w.newSources = append(w.newSources, file)
// return nil
//})
//if err != nil {
// return err
//}
return w.artifacts.Range(func(file build.ID) error {
w.newArtifacts = append(w.newArtifacts, file)
@ -62,12 +69,77 @@ func (w *Worker) recover() error {
})
}
// sendHeartbeat POSTs req as JSON to the coordinator's /heartbeat endpoint
// and decodes the JSON reply.
//
// It returns an error if the request cannot be built or sent, if the
// coordinator answers with a status other than 200 OK (the response body is
// included in the error text), or if the reply cannot be decoded.
func (w *Worker) sendHeartbeat(req *proto.HeartbeatRequest) (*proto.HeartbeatResponse, error) {
	reqJS, err := json.Marshal(req)
	if err != nil {
		return nil, err
	}

	httpReq, err := http.NewRequest("POST", w.coordinatorEndpoint+"/heartbeat", bytes.NewBuffer(reqJS))
	if err != nil {
		return nil, err
	}

	httpRsp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return nil, err
	}
	// The body must be closed on every path; otherwise each heartbeat leaks
	// its underlying connection instead of returning it to the transport.
	defer httpRsp.Body.Close()

	if httpRsp.StatusCode != http.StatusOK {
		// Best effort: a read failure just yields a shorter error message.
		errorString, _ := ioutil.ReadAll(httpRsp.Body)
		return nil, fmt.Errorf("heartbeat failed: %s", errorString)
	}

	var rsp proto.HeartbeatResponse
	if err := json.NewDecoder(httpRsp.Body).Decode(&rsp); err != nil {
		return nil, err
	}

	return &rsp, nil
}
// Run is the worker's main loop. After recovering local state it repeatedly
// sends a heartbeat to the coordinator, executes every job returned in the
// response, and queues the results for the next heartbeat.
//
// Run returns the error from recover if recovery fails, and ctx.Err() once
// ctx is cancelled. A failed heartbeat is logged and retried while ctx is
// still live.
func (w *Worker) Run(ctx context.Context) error {
	if err := w.recover(); err != nil {
		return err
	}

	for {
		// Stop promptly on cancellation even while heartbeats keep succeeding.
		if err := ctx.Err(); err != nil {
			return err
		}

		w.log.Debug("sending heartbeat request")
		req := w.buildHeartbeat()
		rsp, err := w.sendHeartbeat(req)
		if err != nil {
			if ctx.Err() != nil {
				return ctx.Err()
			}

			// buildHeartbeat already drained the finished jobs into req. Put
			// them back so the results are re-sent by the next heartbeat
			// instead of being lost with the failed request.
			w.mu.Lock()
			w.finishedJobs = append(req.FinishedJob, w.finishedJobs...)
			w.mu.Unlock()

			w.log.DPanic("heartbeat failed", zap.Error(err))
			continue
		}

		w.log.Debug("received heartbeat response",
			zap.Int("num_jobs", len(rsp.JobsToRun)))

		for _, job := range rsp.JobsToRun {
			var finished proto.FinishedJob
			finished.ID = job.Job.ID

			// Output of all commands of a job is accumulated in order.
			var stdout bytes.Buffer
			var stderr bytes.Buffer

			for _, jobCmd := range job.Job.Cmds {
				if len(jobCmd.Exec) == 0 {
					// Report a malformed command as a job failure rather than
					// panicking on Exec[0]. -1 matches what
					// ProcessState.ExitCode reports for a process that never
					// started.
					errorString := "empty command"
					finished.Error = &errorString
					finished.ExitCode = -1
					break
				}

				// Bind the child process to ctx so cancellation stops it.
				cmd := exec.CommandContext(ctx, jobCmd.Exec[0], jobCmd.Exec[1:]...)
				cmd.Stdout = &stdout
				cmd.Stderr = &stderr

				if err := cmd.Run(); err != nil {
					errorString := err.Error()
					finished.Error = &errorString
					finished.ExitCode = cmd.ProcessState.ExitCode()
					// The first failing command aborts the rest of the job.
					break
				}
			}

			finished.Stdout = stdout.Bytes()
			finished.Stderr = stderr.Bytes()

			w.jobFinished(&finished)
		}
	}
}