shad-go/distbuild/pkg/worker/worker.go

112 lines
2.4 KiB
Go
Raw Normal View History

2020-03-10 12:08:59 +00:00
package worker
import (
"context"
2020-03-14 10:24:44 +00:00
"fmt"
2020-03-11 22:46:45 +00:00
"net/http"
2020-03-10 12:08:59 +00:00
"sync"
2020-03-11 22:46:45 +00:00
"go.uber.org/zap"
2020-04-04 21:49:25 +00:00
"golang.org/x/sync/singleflight"
2020-03-11 22:46:45 +00:00
2020-03-29 16:03:07 +00:00
"gitlab.com/slon/shad-go/distbuild/pkg/api"
2020-03-10 12:08:59 +00:00
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache"
)
type Worker struct {
2020-03-29 16:03:07 +00:00
id api.WorkerID
2020-03-11 22:46:45 +00:00
coordinatorEndpoint string
log *zap.Logger
2020-04-04 21:49:25 +00:00
fileCache *filecache.Cache
fileClient *filecache.Client
fileOnce singleflight.Group
2020-03-10 12:08:59 +00:00
2020-04-04 21:49:25 +00:00
artifacts *artifact.Cache
2020-03-10 12:08:59 +00:00
2020-04-04 21:49:25 +00:00
mux *http.ServeMux
heartbeat *api.HeartbeatClient
2020-04-04 21:36:53 +00:00
2020-03-10 12:08:59 +00:00
mu sync.Mutex
newArtifacts []build.ID
newSources []build.ID
2020-03-29 16:03:07 +00:00
finishedJobs []api.JobResult
2020-03-10 12:08:59 +00:00
}
2020-03-11 22:46:45 +00:00
func New(
2020-03-29 16:03:07 +00:00
workerID api.WorkerID,
2020-03-11 22:46:45 +00:00
coordinatorEndpoint string,
log *zap.Logger,
fileCache *filecache.Cache,
artifacts *artifact.Cache,
) *Worker {
2020-04-05 12:00:33 +00:00
w := &Worker{
2020-03-28 21:34:09 +00:00
id: workerID,
2020-03-11 22:46:45 +00:00
coordinatorEndpoint: coordinatorEndpoint,
log: log,
2020-04-04 21:36:53 +00:00
fileCache: fileCache,
artifacts: artifacts,
fileClient: filecache.NewClient(log, coordinatorEndpoint),
heartbeat: api.NewHeartbeatClient(log, coordinatorEndpoint),
2020-03-11 22:46:45 +00:00
mux: http.NewServeMux(),
}
2020-04-05 12:00:33 +00:00
artifact.NewHandler(w.log, w.artifacts).Register(w.mux)
return w
2020-03-11 22:46:45 +00:00
}
func (w *Worker) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
w.mux.ServeHTTP(rw, r)
}
2020-03-10 12:08:59 +00:00
func (w *Worker) recover() error {
2020-03-11 22:46:45 +00:00
return w.artifacts.Range(func(file build.ID) error {
2020-03-10 12:08:59 +00:00
w.newArtifacts = append(w.newArtifacts, file)
return nil
})
}
func (w *Worker) Run(ctx context.Context) error {
if err := w.recover(); err != nil {
return err
}
for {
2020-03-14 10:24:44 +00:00
w.log.Debug("sending heartbeat request")
2020-04-04 21:36:53 +00:00
rsp, err := w.heartbeat.Heartbeat(ctx, w.buildHeartbeat())
2020-03-14 10:24:44 +00:00
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
w.log.DPanic("heartbeat failed", zap.Error(err))
continue
}
w.log.Debug("received heartbeat response",
zap.Int("num_jobs", len(rsp.JobsToRun)))
2020-03-28 13:54:43 +00:00
for _, spec := range rsp.JobsToRun {
2020-03-28 14:35:01 +00:00
spec := spec
2020-03-28 21:34:09 +00:00
w.log.Debug("running job", zap.String("job_id", spec.Job.ID.String()))
2020-03-28 13:54:43 +00:00
result, err := w.runJob(ctx, &spec)
if err != nil {
2020-03-28 14:22:24 +00:00
errStr := fmt.Sprintf("job %s failed: %v", spec.Job.ID, err)
2020-03-28 21:34:09 +00:00
w.log.Debug("job failed", zap.String("job_id", spec.Job.ID.String()), zap.Error(err))
2020-03-29 16:03:07 +00:00
w.jobFinished(&api.JobResult{ID: spec.Job.ID, Error: &errStr})
2020-03-28 13:54:43 +00:00
continue
2020-03-14 10:24:44 +00:00
}
2020-03-28 21:34:09 +00:00
w.log.Debug("job finished", zap.String("job_id", spec.Job.ID.String()))
2020-03-28 13:54:43 +00:00
w.jobFinished(result)
2020-03-14 10:24:44 +00:00
}
2020-03-10 12:08:59 +00:00
}
}