shad-go/distbuild/pkg/dist/coordinator.go

121 lines
2.5 KiB
Go
Raw Normal View History

2020-03-11 22:46:45 +00:00
package dist
import (
2020-03-29 16:24:18 +00:00
"context"
2020-03-11 22:46:45 +00:00
"fmt"
"net/http"
2020-03-14 10:24:44 +00:00
"sync"
"time"
2020-03-11 22:46:45 +00:00
"go.uber.org/zap"
2020-03-29 16:03:07 +00:00
"gitlab.com/slon/shad-go/distbuild/pkg/api"
2020-03-11 22:46:45 +00:00
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache"
2020-03-28 21:34:09 +00:00
"gitlab.com/slon/shad-go/distbuild/pkg/scheduler"
2020-03-11 22:46:45 +00:00
)
type Coordinator struct {
log *zap.Logger
mux *http.ServeMux
fileCache *filecache.Cache
2020-03-14 10:24:44 +00:00
2020-03-28 21:34:09 +00:00
mu sync.Mutex
builds map[build.ID]*Build
scheduler *scheduler.Scheduler
}
var defaultConfig = scheduler.Config{
CacheTimeout: time.Millisecond * 10,
DepsTimeout: time.Millisecond * 100,
2020-03-11 22:46:45 +00:00
}
func NewCoordinator(
log *zap.Logger,
fileCache *filecache.Cache,
) *Coordinator {
c := &Coordinator{
log: log,
mux: http.NewServeMux(),
fileCache: fileCache,
2020-03-14 10:24:44 +00:00
2020-03-28 21:34:09 +00:00
builds: make(map[build.ID]*Build),
scheduler: scheduler.NewScheduler(log, defaultConfig),
2020-03-11 22:46:45 +00:00
}
2020-04-04 21:13:45 +00:00
apiHandler := api.NewBuildService(log, c)
2020-03-29 16:24:18 +00:00
apiHandler.Register(c.mux)
2020-04-04 20:11:21 +00:00
heartbeatHandler := api.NewHeartbeatHandler(log, c)
heartbeatHandler.Register(c.mux)
2020-04-04 21:13:45 +00:00
fileHandler := filecache.NewHandler(log, c.fileCache)
fileHandler.Register(c.mux)
2020-03-11 22:46:45 +00:00
return c
}
func (c *Coordinator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c.mux.ServeHTTP(w, r)
}
2020-04-04 21:13:45 +00:00
func (c *Coordinator) addBuild(b *Build) {
c.mu.Lock()
defer c.mu.Unlock()
2020-03-11 22:46:45 +00:00
2020-04-04 21:13:45 +00:00
c.builds[b.ID] = b
}
2020-03-14 10:24:44 +00:00
2020-04-04 21:13:45 +00:00
func (c *Coordinator) removeBuild(b *Build) {
c.mu.Lock()
defer c.mu.Unlock()
2020-03-28 21:34:09 +00:00
2020-04-04 21:13:45 +00:00
delete(c.builds, b.ID)
}
2020-03-14 10:24:44 +00:00
2020-04-04 21:13:45 +00:00
func (c *Coordinator) getBuild(id build.ID) *Build {
c.mu.Lock()
defer c.mu.Unlock()
2020-03-14 10:24:44 +00:00
2020-04-04 21:13:45 +00:00
return c.builds[id]
}
func (c *Coordinator) StartBuild(ctx context.Context, req *api.BuildRequest, w api.StatusWriter) error {
b := NewBuild(&req.Graph, c)
2020-03-14 10:24:44 +00:00
2020-04-04 21:13:45 +00:00
c.addBuild(b)
defer c.removeBuild(b)
return b.Run(ctx, w)
2020-03-11 22:46:45 +00:00
}
2020-03-29 16:24:18 +00:00
func (c *Coordinator) SignalBuild(ctx context.Context, buildID build.ID, signal *api.SignalRequest) (*api.SignalResponse, error) {
2020-04-04 21:13:45 +00:00
b := c.getBuild(buildID)
if b == nil {
return nil, fmt.Errorf("build %q not found", buildID)
}
return b.Signal(ctx, signal)
2020-03-11 22:46:45 +00:00
}
2020-03-14 10:24:44 +00:00
2020-04-04 20:11:21 +00:00
func (c *Coordinator) Heartbeat(ctx context.Context, req *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
2020-03-28 21:34:09 +00:00
c.scheduler.RegisterWorker(req.WorkerID)
2020-03-14 10:24:44 +00:00
for _, job := range req.FinishedJob {
job := job
2020-03-28 21:34:09 +00:00
c.scheduler.OnJobComplete(req.WorkerID, job.ID, &job)
2020-03-14 10:24:44 +00:00
}
2020-04-04 20:11:21 +00:00
rsp := &api.HeartbeatResponse{
2020-03-29 16:03:07 +00:00
JobsToRun: map[build.ID]api.JobSpec{},
2020-03-28 21:34:09 +00:00
}
2020-03-14 10:24:44 +00:00
2020-04-04 20:11:21 +00:00
job := c.scheduler.PickJob(req.WorkerID, ctx.Done())
2020-03-28 21:34:09 +00:00
if job != nil {
2020-03-29 16:03:07 +00:00
rsp.JobsToRun[job.Job.ID] = api.JobSpec{Job: *job.Job}
2020-03-14 10:24:44 +00:00
}
2020-04-04 20:11:21 +00:00
return rsp, nil
2020-03-14 10:24:44 +00:00
}