From f3d73b97ad4ac4e3562cf3290416f96083d90bc3 Mon Sep 17 00:00:00 2001 From: Fedor Korotkiy Date: Sun, 5 Apr 2020 00:49:25 +0300 Subject: [PATCH] File transfer complete --- distbuild/pkg/api/heartbeat.go | 2 +- distbuild/pkg/dist/build.go | 7 ++++-- distbuild/pkg/dist/coordinator.go | 2 +- distbuild/pkg/scheduler/scheduler.go | 4 ++-- distbuild/pkg/scheduler/scheduler_test.go | 12 +++++------ distbuild/pkg/worker/download.go | 26 +++++++++++++++++++++++ distbuild/pkg/worker/job.go | 4 ++++ distbuild/pkg/worker/worker.go | 12 ++++++----- 8 files changed, 52 insertions(+), 17 deletions(-) create mode 100644 distbuild/pkg/worker/download.go diff --git a/distbuild/pkg/api/heartbeat.go b/distbuild/pkg/api/heartbeat.go index 3210406..09b4da2 100644 --- a/distbuild/pkg/api/heartbeat.go +++ b/distbuild/pkg/api/heartbeat.go @@ -64,7 +64,7 @@ type JobSpec struct { // Artifacts задаёт воркеров, с которых можно скачать артефакты необходимые этом джобу. Artifacts map[build.ID]WorkerID - Job build.Job + build.Job } type HeartbeatResponse struct { diff --git a/distbuild/pkg/dist/build.go b/distbuild/pkg/dist/build.go index abc67e7..933f638 100644 --- a/distbuild/pkg/dist/build.go +++ b/distbuild/pkg/dist/build.go @@ -62,9 +62,12 @@ func (b *Build) Run(ctx context.Context, w api.StatusWriter) error { b.l.Debug("file upload completed") for _, job := range b.Graph.Jobs { - job := job + spec := api.JobSpec{Job: job, SourceFiles: make(map[build.ID]string)} + for _, file := range job.Inputs { + spec.SourceFiles[b.reverseFiles[file]] = file + } - s := b.c.scheduler.ScheduleJob(&job) + s := b.c.scheduler.ScheduleJob(&spec) select { case <-ctx.Done(): diff --git a/distbuild/pkg/dist/coordinator.go b/distbuild/pkg/dist/coordinator.go index 25add14..352d453 100644 --- a/distbuild/pkg/dist/coordinator.go +++ b/distbuild/pkg/dist/coordinator.go @@ -113,7 +113,7 @@ func (c *Coordinator) Heartbeat(ctx context.Context, req *api.HeartbeatRequest) job := c.scheduler.PickJob(req.WorkerID, ctx.Done()) if job != nil { - rsp.JobsToRun[job.Job.ID] = api.JobSpec{Job: *job.Job} + rsp.JobsToRun[job.Job.ID] = *job.Job } return rsp, nil diff --git a/distbuild/pkg/scheduler/scheduler.go b/distbuild/pkg/scheduler/scheduler.go index 6026a97..a5c65db 100644 --- a/distbuild/pkg/scheduler/scheduler.go +++ b/distbuild/pkg/scheduler/scheduler.go @@ -11,7 +11,7 @@ import ( ) type PendingJob struct { - Job *build.Job + Job *api.JobSpec Result *api.JobResult Finished chan struct{} @@ -202,7 +202,7 @@ func (c *Scheduler) doScheduleJob(job *PendingJob) { c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) } -func (c *Scheduler) ScheduleJob(job *build.Job) *PendingJob { +func (c *Scheduler) ScheduleJob(job *api.JobSpec) *PendingJob { c.mu.Lock() pendingJob, running := c.pendingJobs[job.ID] if !running { diff --git a/distbuild/pkg/scheduler/scheduler_test.go b/distbuild/pkg/scheduler/scheduler_test.go index ead8492..205dfb0 100644 --- a/distbuild/pkg/scheduler/scheduler_test.go +++ b/distbuild/pkg/scheduler/scheduler_test.go @@ -32,7 +32,7 @@ func TestScheduler(t *testing.T) { t.Run("SingleJob", func(t *testing.T) { s := NewScheduler(zaptest.NewLogger(t), config) - job0 := &build.Job{ID: build.NewID()} + job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} pendingJob0 := s.ScheduleJob(job0) s.RegisterWorker(workerID0) @@ -65,8 +65,8 @@ func TestScheduler(t *testing.T) { t.Run("CacheLocalScheduling", func(t *testing.T) { s := NewScheduler(zaptest.NewLogger(t), config) - job0 := &build.Job{ID: build.NewID()} - job1 := &build.Job{ID: build.NewID()} + job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} + job1 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} s.RegisterWorker(workerID0) s.OnJobComplete(workerID0, job0.ID, &api.JobResult{}) @@ -89,9 +89,9 @@ func TestScheduler(t *testing.T) { t.Run("DependencyLocalScheduling", func(t *testing.T) { s := NewScheduler(zaptest.NewLogger(t), config) - job0 := &build.Job{ID: build.NewID()} - job1 := &build.Job{ID: build.NewID(), Deps: []build.ID{job0.ID}} - job2 := &build.Job{ID: build.NewID()} + job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} + job1 := &api.JobSpec{Job: build.Job{ID: build.NewID(), Deps: []build.ID{job0.ID}}} + job2 := &api.JobSpec{Job: build.Job{ID: build.NewID()}} s.RegisterWorker(workerID0) s.OnJobComplete(workerID0, job0.ID, &api.JobResult{}) diff --git a/distbuild/pkg/worker/download.go b/distbuild/pkg/worker/download.go new file mode 100644 index 0000000..2c2c4d6 --- /dev/null +++ b/distbuild/pkg/worker/download.go @@ -0,0 +1,26 @@ +package worker + +import ( + "context" + "errors" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" + "gitlab.com/slon/shad-go/distbuild/pkg/filecache" +) + +func (w *Worker) pullFiles(ctx context.Context, files map[build.ID]string) error { + for id := range files { + _, unlock, err := w.fileCache.Get(id) + if errors.Is(err, filecache.ErrNotFound) { + if err := w.fileClient.Download(ctx, w.fileCache, id); err != nil { + return err + } + } else if err != nil { + return err + } else { + unlock() + } + } + + return nil +} diff --git a/distbuild/pkg/worker/job.go b/distbuild/pkg/worker/job.go index 85b87bf..631b546 100644 --- a/distbuild/pkg/worker/job.go +++ b/distbuild/pkg/worker/job.go @@ -165,6 +165,10 @@ func (w *Worker) runJob(ctx context.Context, spec *api.JobSpec) (*api.JobResult, return res, nil } + if err := w.pullFiles(ctx, spec.SourceFiles); err != nil { + return nil, err + } + aRoot, commit, abort, err := w.artifacts.Create(spec.Job.ID) if err != nil { return nil, err diff --git a/distbuild/pkg/worker/worker.go b/distbuild/pkg/worker/worker.go index 230712b..6805177 100644 --- a/distbuild/pkg/worker/worker.go +++ b/distbuild/pkg/worker/worker.go @@ -7,6 +7,7 @@ import ( "sync" "go.uber.org/zap" + "golang.org/x/sync/singleflight" "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/artifact" @@ -20,13 +21,14 @@ type Worker struct { log *zap.Logger - fileCache *filecache.Cache + fileCache *filecache.Cache + fileClient *filecache.Client + fileOnce singleflight.Group + artifacts *artifact.Cache - mux *http.ServeMux - - fileClient *filecache.Client - heartbeat *api.HeartbeatClient + mux *http.ServeMux + heartbeat *api.HeartbeatClient mu sync.Mutex newArtifacts []build.ID