File transfer complete

This commit is contained in:
Fedor Korotkiy 2020-04-05 00:49:25 +03:00
parent 6224193cac
commit f3d73b97ad
8 changed files with 52 additions and 17 deletions

View file

@ -64,7 +64,7 @@ type JobSpec struct {
// Artifacts задаёт воркеров, с которых можно скачать артефакты необходимые этом джобу.
Artifacts map[build.ID]WorkerID
Job build.Job
build.Job
}
type HeartbeatResponse struct {

View file

@ -62,9 +62,12 @@ func (b *Build) Run(ctx context.Context, w api.StatusWriter) error {
b.l.Debug("file upload completed")
for _, job := range b.Graph.Jobs {
job := job
spec := api.JobSpec{Job: job, SourceFiles: make(map[build.ID]string)}
for _, file := range job.Inputs {
spec.SourceFiles[b.reverseFiles[file]] = file
}
s := b.c.scheduler.ScheduleJob(&job)
s := b.c.scheduler.ScheduleJob(&spec)
select {
case <-ctx.Done():

View file

@ -113,7 +113,7 @@ func (c *Coordinator) Heartbeat(ctx context.Context, req *api.HeartbeatRequest)
job := c.scheduler.PickJob(req.WorkerID, ctx.Done())
if job != nil {
rsp.JobsToRun[job.Job.ID] = api.JobSpec{Job: *job.Job}
rsp.JobsToRun[job.Job.ID] = *job.Job
}
return rsp, nil

View file

@ -11,7 +11,7 @@ import (
)
type PendingJob struct {
Job *build.Job
Job *api.JobSpec
Result *api.JobResult
Finished chan struct{}
@ -202,7 +202,7 @@ func (c *Scheduler) doScheduleJob(job *PendingJob) {
c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String()))
}
func (c *Scheduler) ScheduleJob(job *build.Job) *PendingJob {
func (c *Scheduler) ScheduleJob(job *api.JobSpec) *PendingJob {
c.mu.Lock()
pendingJob, running := c.pendingJobs[job.ID]
if !running {

View file

@ -32,7 +32,7 @@ func TestScheduler(t *testing.T) {
t.Run("SingleJob", func(t *testing.T) {
s := NewScheduler(zaptest.NewLogger(t), config)
job0 := &build.Job{ID: build.NewID()}
job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}}
pendingJob0 := s.ScheduleJob(job0)
s.RegisterWorker(workerID0)
@ -65,8 +65,8 @@ func TestScheduler(t *testing.T) {
t.Run("CacheLocalScheduling", func(t *testing.T) {
s := NewScheduler(zaptest.NewLogger(t), config)
job0 := &build.Job{ID: build.NewID()}
job1 := &build.Job{ID: build.NewID()}
job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}}
job1 := &api.JobSpec{Job: build.Job{ID: build.NewID()}}
s.RegisterWorker(workerID0)
s.OnJobComplete(workerID0, job0.ID, &api.JobResult{})
@ -89,9 +89,9 @@ func TestScheduler(t *testing.T) {
t.Run("DependencyLocalScheduling", func(t *testing.T) {
s := NewScheduler(zaptest.NewLogger(t), config)
job0 := &build.Job{ID: build.NewID()}
job1 := &build.Job{ID: build.NewID(), Deps: []build.ID{job0.ID}}
job2 := &build.Job{ID: build.NewID()}
job0 := &api.JobSpec{Job: build.Job{ID: build.NewID()}}
job1 := &api.JobSpec{Job: build.Job{ID: build.NewID(), Deps: []build.ID{job0.ID}}}
job2 := &api.JobSpec{Job: build.Job{ID: build.NewID()}}
s.RegisterWorker(workerID0)
s.OnJobComplete(workerID0, job0.ID, &api.JobResult{})

View file

@ -0,0 +1,26 @@
package worker
import (
"context"
"errors"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache"
)
func (w *Worker) pullFiles(ctx context.Context, files map[build.ID]string) error {
for id := range files {
_, unlock, err := w.fileCache.Get(id)
if errors.Is(err, filecache.ErrNotFound) {
if err := w.fileClient.Download(ctx, w.fileCache, id); err != nil {
return err
}
} else if err != nil {
return err
} else {
unlock()
}
}
return nil
}

View file

@ -165,6 +165,10 @@ func (w *Worker) runJob(ctx context.Context, spec *api.JobSpec) (*api.JobResult,
return res, nil
}
if err := w.pullFiles(ctx, spec.SourceFiles); err != nil {
return nil, err
}
aRoot, commit, abort, err := w.artifacts.Create(spec.Job.ID)
if err != nil {
return nil, err

View file

@ -7,6 +7,7 @@ import (
"sync"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
"gitlab.com/slon/shad-go/distbuild/pkg/api"
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
@ -20,13 +21,14 @@ type Worker struct {
log *zap.Logger
fileCache *filecache.Cache
fileCache *filecache.Cache
fileClient *filecache.Client
fileOnce singleflight.Group
artifacts *artifact.Cache
mux *http.ServeMux
fileClient *filecache.Client
heartbeat *api.HeartbeatClient
mux *http.ServeMux
heartbeat *api.HeartbeatClient
mu sync.Mutex
newArtifacts []build.ID