Place build tags

This commit is contained in:
Fedor Korotkiy 2020-04-05 16:24:48 +03:00
parent bfdd54bdb4
commit 6a208e7817
19 changed files with 74 additions and 1828 deletions

View file

@ -1,13 +1,9 @@
// +build !solution
package api package api
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"go.uber.org/zap" "go.uber.org/zap"
@ -15,107 +11,16 @@ import (
) )
type BuildClient struct { type BuildClient struct {
l *zap.Logger
endpoint string
} }
func NewBuildClient(l *zap.Logger, endpoint string) *BuildClient { func NewBuildClient(l *zap.Logger, endpoint string) *BuildClient {
return &BuildClient{ panic("implement me")
l: l,
endpoint: endpoint,
}
}
type statusReader struct {
r io.ReadCloser
dec *json.Decoder
}
func (r *statusReader) Close() error {
return r.r.Close()
}
func (r *statusReader) Next() (*StatusUpdate, error) {
var u StatusUpdate
if err := r.dec.Decode(&u); err != nil {
return nil, err
}
return &u, nil
} }
func (c *BuildClient) StartBuild(ctx context.Context, request *BuildRequest) (*BuildStarted, StatusReader, error) { func (c *BuildClient) StartBuild(ctx context.Context, request *BuildRequest) (*BuildStarted, StatusReader, error) {
reqJSON, err := json.Marshal(request) panic("implement me")
if err != nil {
return nil, nil, err
}
req, err := http.NewRequest("POST", c.endpoint+"/build", bytes.NewBuffer(reqJSON))
if err != nil {
return nil, nil, err
}
req.Header.Set("content-type", "application/json")
rsp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return nil, nil, err
}
defer func() {
if rsp.Body != nil {
_ = rsp.Body.Close()
}
}()
if rsp.StatusCode != 200 {
bodyStr, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return nil, nil, fmt.Errorf("build request failed: %v", err)
}
return nil, nil, fmt.Errorf("build failed: %s", bodyStr)
}
dec := json.NewDecoder(rsp.Body)
var started BuildStarted
if err := dec.Decode(&started); err != nil {
return nil, nil, err
}
r := &statusReader{r: rsp.Body, dec: dec}
rsp.Body = nil
return &started, r, nil
} }
func (c *BuildClient) SignalBuild(ctx context.Context, buildID build.ID, signal *SignalRequest) (*SignalResponse, error) { func (c *BuildClient) SignalBuild(ctx context.Context, buildID build.ID, signal *SignalRequest) (*SignalResponse, error) {
signalJSON, err := json.Marshal(signal) panic("implement me")
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", c.endpoint+"/signal?build_id="+buildID.String(), bytes.NewBuffer(signalJSON))
if err != nil {
return nil, err
}
req.Header.Set("content-type", "application/json")
rsp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}
defer rsp.Body.Close()
rspBody, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return nil, fmt.Errorf("signal request failed: %v", err)
}
if rsp.StatusCode != 200 {
return nil, fmt.Errorf("signal failed: %s", rspBody)
}
var signalRsp SignalResponse
if err = json.Unmarshal(rspBody, &rsp); err != nil {
return nil, err
}
return &signalRsp, err
} }

View file

@ -1,142 +1,20 @@
// +build !solution
package api package api
import ( import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http" "net/http"
"go.uber.org/zap" "go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
) )
func NewBuildService(l *zap.Logger, s Service) *BuildHandler { func NewBuildService(l *zap.Logger, s Service) *BuildHandler {
return &BuildHandler{ panic("implement me")
l: l,
s: s,
}
} }
type BuildHandler struct { type BuildHandler struct {
l *zap.Logger
s Service
} }
func (h *BuildHandler) Register(mux *http.ServeMux) { func (h *BuildHandler) Register(mux *http.ServeMux) {
mux.HandleFunc("/build", h.build) panic("implement me")
mux.HandleFunc("/signal", h.signal)
}
type statusWriter struct {
id build.ID
h *BuildHandler
written bool
w http.ResponseWriter
flush http.Flusher
enc *json.Encoder
}
func (w *statusWriter) Started(rsp *BuildStarted) error {
w.id = rsp.ID
w.written = true
w.h.l.Debug("build started", zap.String("build_id", w.id.String()), zap.Any("started", rsp))
w.w.Header().Set("content-type", "application/json")
w.w.WriteHeader(http.StatusOK)
defer w.flush.Flush()
return w.enc.Encode(rsp)
}
func (w *statusWriter) Updated(update *StatusUpdate) error {
w.h.l.Debug("build updated", zap.String("build_id", w.id.String()), zap.Any("update", update))
defer w.flush.Flush()
return w.enc.Encode(update)
}
func (h *BuildHandler) doBuild(w http.ResponseWriter, r *http.Request) error {
reqJSON, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
var req BuildRequest
if err = json.Unmarshal(reqJSON, &req); err != nil {
return err
}
flush, ok := w.(http.Flusher)
if !ok {
return fmt.Errorf("response writer does not implement http.Flusher")
}
sw := &statusWriter{h: h, w: w, enc: json.NewEncoder(w), flush: flush}
err = h.s.StartBuild(r.Context(), &req, sw)
if err != nil {
if sw.written {
_ = sw.Updated(&StatusUpdate{BuildFailed: &BuildFailed{Error: err.Error()}})
return nil
}
return err
}
return nil
}
func (h *BuildHandler) build(w http.ResponseWriter, r *http.Request) {
if err := h.doBuild(w, r); err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "%v", err)
}
}
func (h *BuildHandler) doSignal(w http.ResponseWriter, r *http.Request) error {
buildIDParam := r.URL.Query().Get("build_id")
if buildIDParam == "" {
return fmt.Errorf(`"build_id" parameter is missing`)
}
var buildID build.ID
if err := buildID.UnmarshalText([]byte(buildIDParam)); err != nil {
return err
}
reqJSON, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
var req SignalRequest
if err = json.Unmarshal(reqJSON, &req); err != nil {
return err
}
rsp, err := h.s.SignalBuild(r.Context(), buildID, &req)
if err != nil {
return err
}
rspJSON, err := json.Marshal(rsp)
if err != nil {
return err
}
w.Header().Set("content-type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(rspJSON)
return nil
}
func (h *BuildHandler) signal(w http.ResponseWriter, r *http.Request) {
if err := h.doSignal(w, r); err != nil {
h.l.Warn("build signal failed", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "%v", err)
}
} }

View file

@ -1,56 +1,20 @@
// +build !solution
package api package api
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"go.uber.org/zap" "go.uber.org/zap"
) )
type HeartbeatClient struct { type HeartbeatClient struct {
l *zap.Logger
endpoint string
} }
func NewHeartbeatClient(l *zap.Logger, endpoint string) *HeartbeatClient { func NewHeartbeatClient(l *zap.Logger, endpoint string) *HeartbeatClient {
return &HeartbeatClient{l: l, endpoint: endpoint} panic("implement me")
} }
func (c *HeartbeatClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) { func (c *HeartbeatClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) {
reqJSON, err := json.Marshal(req) panic("implement me")
if err != nil {
return nil, err
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint+"/heartbeat", bytes.NewBuffer(reqJSON))
if err != nil {
return nil, err
}
httpRsp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return nil, err
}
defer httpRsp.Body.Close()
if httpRsp.StatusCode != http.StatusOK {
errorMsg, _ := ioutil.ReadAll(httpRsp.Body)
return nil, fmt.Errorf("heartbeat failed: %s", errorMsg)
}
rspJSON, err := ioutil.ReadAll(httpRsp.Body)
if err != nil {
return nil, err
}
rsp := &HeartbeatResponse{}
if err = json.Unmarshal(rspJSON, rsp); err != nil {
return nil, err
}
return rsp, nil
} }

View file

@ -1,60 +1,20 @@
// +build !solution
package api package api
import ( import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http" "net/http"
"go.uber.org/zap" "go.uber.org/zap"
) )
type HeartbeatHandler struct { type HeartbeatHandler struct {
l *zap.Logger
s HeartbeatService
} }
func NewHeartbeatHandler(l *zap.Logger, s HeartbeatService) *HeartbeatHandler { func NewHeartbeatHandler(l *zap.Logger, s HeartbeatService) *HeartbeatHandler {
return &HeartbeatHandler{l: l, s: s} panic("implement me")
} }
func (h *HeartbeatHandler) Register(mux *http.ServeMux) { func (h *HeartbeatHandler) Register(mux *http.ServeMux) {
mux.HandleFunc("/heartbeat", h.heartbeat) panic("implement me")
}
func (h *HeartbeatHandler) doHeartbeat(w http.ResponseWriter, r *http.Request) error {
reqJSON, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
var req HeartbeatRequest
if err = json.Unmarshal(reqJSON, &req); err != nil {
return err
}
h.l.Debug("heartbeat started", zap.Any("req", req))
rsp, err := h.s.Heartbeat(r.Context(), &req)
if err != nil {
return err
}
h.l.Debug("heartbeat finished", zap.Any("rsp", rsp))
rspJSON, err := json.Marshal(rsp)
if err != nil {
return err
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write(rspJSON)
return nil
}
func (h *HeartbeatHandler) heartbeat(w http.ResponseWriter, r *http.Request) {
if err := h.doHeartbeat(w, r); err != nil {
h.l.Warn("heartbeat error", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "%v", err)
}
} }

View file

@ -1,13 +1,9 @@
// +build !solution
package artifact package artifact
import ( import (
"encoding/hex"
"errors" "errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
) )
@ -20,172 +16,24 @@ var (
) )
type Cache struct { type Cache struct {
tmpDir string
cacheDir string
mu sync.Mutex
writeLocked map[build.ID]struct{}
readLocked map[build.ID]int
} }
func NewCache(root string) (*Cache, error) { func NewCache(root string) (*Cache, error) {
tmpDir := filepath.Join(root, "tmp") panic("implement me")
if err := os.RemoveAll(tmpDir); err != nil {
return nil, err
}
if err := os.MkdirAll(tmpDir, 0777); err != nil {
return nil, err
}
cacheDir := filepath.Join(root, "c")
if err := os.MkdirAll(cacheDir, 0777); err != nil {
return nil, err
}
for i := 0; i < 256; i++ {
d := hex.EncodeToString([]byte{uint8(i)})
if err := os.MkdirAll(filepath.Join(cacheDir, d), 0777); err != nil {
return nil, err
}
}
return &Cache{
tmpDir: tmpDir,
cacheDir: cacheDir,
writeLocked: make(map[build.ID]struct{}),
readLocked: make(map[build.ID]int),
}, nil
}
func (c *Cache) readLock(id build.ID) error {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.writeLocked[id]; ok {
return ErrWriteLocked
}
c.readLocked[id]++
return nil
}
func (c *Cache) readUnlock(id build.ID) {
c.mu.Lock()
defer c.mu.Unlock()
c.readLocked[id]--
if c.readLocked[id] == 0 {
delete(c.readLocked, id)
}
}
func (c *Cache) writeLock(id build.ID, remove bool) error {
c.mu.Lock()
defer c.mu.Unlock()
_, err := os.Stat(filepath.Join(c.cacheDir, id.Path()))
if !os.IsNotExist(err) && err != nil {
return err
} else if err == nil && !remove {
return ErrExists
}
if _, ok := c.writeLocked[id]; ok {
return ErrWriteLocked
}
if c.readLocked[id] > 0 {
return ErrReadLocked
}
c.writeLocked[id] = struct{}{}
return nil
}
func (c *Cache) writeUnlock(id build.ID) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.writeLocked, id)
} }
func (c *Cache) Range(artifactFn func(artifact build.ID) error) error { func (c *Cache) Range(artifactFn func(artifact build.ID) error) error {
shards, err := ioutil.ReadDir(c.cacheDir) panic("implement me")
if err != nil {
return err
}
for _, shard := range shards {
dirs, err := ioutil.ReadDir(filepath.Join(c.cacheDir, shard.Name()))
if err != nil {
return err
}
for _, d := range dirs {
var id build.ID
if err := id.UnmarshalText([]byte(d.Name())); err != nil {
return fmt.Errorf("invalid artifact name: %w", err)
}
if err := artifactFn(id); err != nil {
return err
}
}
}
return nil
} }
func (c *Cache) Remove(artifact build.ID) error { func (c *Cache) Remove(artifact build.ID) error {
if err := c.writeLock(artifact, true); err != nil { panic("implement me")
return err
}
defer c.writeUnlock(artifact)
return os.RemoveAll(filepath.Join(c.cacheDir, artifact.Path()))
} }
func (c *Cache) Create(artifact build.ID) (path string, commit, abort func() error, err error) { func (c *Cache) Create(artifact build.ID) (path string, commit, abort func() error, err error) {
if err = c.writeLock(artifact, false); err != nil { panic("implement me")
return
}
path = filepath.Join(c.tmpDir, artifact.String())
if err = os.MkdirAll(path, 0777); err != nil {
c.writeUnlock(artifact)
return
}
abort = func() error {
defer c.writeUnlock(artifact)
return os.RemoveAll(path)
}
commit = func() error {
defer c.writeUnlock(artifact)
return os.Rename(path, filepath.Join(c.cacheDir, artifact.Path()))
}
return
} }
func (c *Cache) Get(artifact build.ID) (path string, unlock func(), err error) { func (c *Cache) Get(artifact build.ID) (path string, unlock func(), err error) {
if err = c.readLock(artifact); err != nil { panic("implement me")
return
}
path = filepath.Join(c.cacheDir, artifact.Path())
if _, err = os.Stat(path); err != nil {
c.readUnlock(artifact)
if os.IsNotExist(err) {
err = ErrNotFound
}
return
}
unlock = func() {
c.readUnlock(artifact)
}
return
} }

View file

@ -1,42 +1,14 @@
// +build !solution
package artifact package artifact
import ( import (
"context" "context"
"fmt"
"io/ioutil"
"net/http"
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/tarstream"
) )
// Download artifact from remote cache into local cache. // Download artifact from remote cache into local cache.
func Download(ctx context.Context, endpoint string, c *Cache, artifactID build.ID) error { func Download(ctx context.Context, endpoint string, c *Cache, artifactID build.ID) error {
dir, commit, abort, err := c.Create(artifactID) panic("implement me")
if err != nil {
return err
}
defer func() { _ = abort() }()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint+"/artifact?id="+artifactID.String(), nil)
if err != nil {
return err
}
rsp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
errStr, _ := ioutil.ReadAll(rsp.Body)
return fmt.Errorf("download: %s", errStr)
}
if err := tarstream.Receive(dir, rsp.Body); err != nil {
return err
}
return commit()
} }

View file

@ -1,54 +1,20 @@
// +build !solution
package artifact package artifact
import ( import (
"fmt"
"net/http" "net/http"
"go.uber.org/zap" "go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/tarstream"
) )
type Handler struct { type Handler struct {
l *zap.Logger
c *Cache
} }
func NewHandler(l *zap.Logger, c *Cache) *Handler { func NewHandler(l *zap.Logger, c *Cache) *Handler {
return &Handler{l: l, c: c} panic("implement me")
} }
func (h *Handler) Register(mux *http.ServeMux) { func (h *Handler) Register(mux *http.ServeMux) {
mux.HandleFunc("/artifact", h.artifact) panic("implement me")
}
func (h *Handler) doArtifact(w http.ResponseWriter, r *http.Request) error {
idStr := r.URL.Query().Get("id")
var id build.ID
if err := id.UnmarshalText([]byte(idStr)); err != nil {
return err
}
h.l.Debug("streaming artifact", zap.String("artifact_id", id.String()))
artifactDir, unlock, err := h.c.Get(id)
if err != nil {
return err
}
defer unlock()
w.WriteHeader(http.StatusOK)
if err := tarstream.Send(artifactDir, w); err != nil {
h.l.Warn("error streaming artifact", zap.Error(err))
}
return nil
}
func (h *Handler) artifact(w http.ResponseWriter, r *http.Request) {
if err := h.doArtifact(w, r); err != nil {
h.l.Warn("artifact handler error", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "%v", err)
}
} }

View file

@ -1,23 +1,16 @@
// +build !solution
package client package client
import ( import (
"context" "context"
"fmt"
"io"
"path/filepath"
"go.uber.org/zap" "go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/api"
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache"
) )
type Client struct { type Client struct {
l *zap.Logger
client *api.BuildClient
cache *filecache.Client
sourceDir string
} }
func NewClient( func NewClient(
@ -25,12 +18,7 @@ func NewClient(
apiEndpoint string, apiEndpoint string,
sourceDir string, sourceDir string,
) *Client { ) *Client {
return &Client{ panic("implement me")
l: l,
client: api.NewBuildClient(l, apiEndpoint),
cache: filecache.NewClient(l, apiEndpoint),
sourceDir: sourceDir,
}
} }
type BuildListener interface { type BuildListener interface {
@ -41,84 +29,6 @@ type BuildListener interface {
OnJobFailed(jobID build.ID, code int, error string) error OnJobFailed(jobID build.ID, code int, error string) error
} }
func (c *Client) uploadSources(ctx context.Context, graph *build.Graph, started *api.BuildStarted) error {
for _, id := range started.MissingFiles {
c.l.Debug("uploading missing file to coordinator", zap.String("id", id.String()))
path, ok := graph.SourceFiles[id]
if !ok {
return fmt.Errorf("file is missing in build graph: id=%s", id)
}
absPath := filepath.Join(c.sourceDir, path)
if err := c.cache.Upload(ctx, id, absPath); err != nil {
return err
}
}
return nil
}
func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener) error { func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener) error {
started, r, err := c.client.StartBuild(ctx, &api.BuildRequest{Graph: graph}) panic("implement me")
if err != nil {
return err
}
c.l.Debug("build started", zap.String("build_id", started.ID.String()))
if err = c.uploadSources(ctx, &graph, started); err != nil {
return err
}
uploadDone := &api.SignalRequest{UploadDone: &api.UploadDone{}}
_, err = c.client.SignalBuild(ctx, started.ID, uploadDone)
if err != nil {
return err
}
for {
u, err := r.Next()
if err == io.EOF {
return fmt.Errorf("unexpected end of status stream")
} else if err != nil {
return err
}
c.l.Debug("received status update", zap.String("build_id", started.ID.String()), zap.Any("update", u))
switch {
case u.BuildFailed != nil:
return fmt.Errorf("build failed: %s", u.BuildFailed.Error)
case u.BuildFinished != nil:
return nil
case u.JobFinished != nil:
jf := u.JobFinished
if jf.Stdout != nil {
if err := lsn.OnJobStdout(jf.ID, jf.Stdout); err != nil {
return err
}
}
if jf.Stderr != nil {
if err := lsn.OnJobStderr(jf.ID, jf.Stderr); err != nil {
return err
}
}
if jf.Error != nil {
if err := lsn.OnJobFailed(jf.ID, jf.ExitCode, *jf.Error); err != nil {
return err
}
} else {
if err := lsn.OnJobFinished(jf.ID); err != nil {
return err
}
}
default:
return fmt.Errorf("build failed: unexpected status update")
}
}
} }

View file

@ -1,119 +0,0 @@
package dist
import (
"context"
"fmt"
"go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/api"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
type Build struct {
ID build.ID
Graph *build.Graph
reverseFiles map[string]build.ID
l *zap.Logger
c *Coordinator
uploadDone chan struct{}
}
func NewBuild(graph *build.Graph, c *Coordinator) *Build {
id := build.NewID()
return &Build{
ID: id,
Graph: graph,
reverseFiles: make(map[string]build.ID),
l: c.log.With(zap.String("build_id", id.String())),
c: c,
uploadDone: make(chan struct{}),
}
}
func (b *Build) missingFiles() []build.ID {
var files []build.ID
for id, path := range b.Graph.SourceFiles {
files = append(files, id)
b.reverseFiles[path] = id
}
return files
}
func (b *Build) Run(ctx context.Context, w api.StatusWriter) error {
if err := w.Started(&api.BuildStarted{ID: b.ID, MissingFiles: b.missingFiles()}); err != nil {
return err
}
b.l.Debug("waiting for file upload")
select {
case <-ctx.Done():
return ctx.Err()
case <-b.uploadDone:
}
b.l.Debug("file upload completed")
for _, job := range b.Graph.Jobs {
spec := api.JobSpec{
Job: job,
SourceFiles: make(map[build.ID]string),
Artifacts: make(map[build.ID]api.WorkerID),
}
for _, file := range job.Inputs {
spec.SourceFiles[b.reverseFiles[file]] = file
}
for _, id := range job.Deps {
workerID, ok := b.c.scheduler.LocateArtifact(id)
if !ok {
return fmt.Errorf("artifact %q is missing in cache", id)
}
spec.Artifacts[id] = workerID
}
s := b.c.scheduler.ScheduleJob(&spec)
select {
case <-ctx.Done():
return ctx.Err()
case <-s.Finished:
}
b.l.Debug("job finished", zap.String("job_id", job.ID.String()))
jobFinished := api.StatusUpdate{JobFinished: s.Result}
if err := w.Updated(&jobFinished); err != nil {
return err
}
}
finished := api.StatusUpdate{BuildFinished: &api.BuildFinished{}}
return w.Updated(&finished)
}
func (b *Build) Signal(ctx context.Context, req *api.SignalRequest) (*api.SignalResponse, error) {
switch {
case req.UploadDone != nil:
select {
case <-b.uploadDone:
return nil, fmt.Errorf("upload already done")
default:
close(b.uploadDone)
}
default:
return nil, fmt.Errorf("unexpected signal kind")
}
return &api.SignalResponse{}, nil
}

View file

@ -1,28 +1,18 @@
// +build !solution
package dist package dist
import ( import (
"context"
"fmt"
"net/http" "net/http"
"sync"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/api"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache" "gitlab.com/slon/shad-go/distbuild/pkg/filecache"
"gitlab.com/slon/shad-go/distbuild/pkg/scheduler" "gitlab.com/slon/shad-go/distbuild/pkg/scheduler"
) )
type Coordinator struct { type Coordinator struct {
log *zap.Logger
mux *http.ServeMux
fileCache *filecache.Cache
mu sync.Mutex
builds map[build.ID]*Build
scheduler *scheduler.Scheduler
} }
var defaultConfig = scheduler.Config{ var defaultConfig = scheduler.Config{
@ -34,87 +24,9 @@ func NewCoordinator(
log *zap.Logger, log *zap.Logger,
fileCache *filecache.Cache, fileCache *filecache.Cache,
) *Coordinator { ) *Coordinator {
c := &Coordinator{ panic("implement me")
log: log,
mux: http.NewServeMux(),
fileCache: fileCache,
builds: make(map[build.ID]*Build),
scheduler: scheduler.NewScheduler(log, defaultConfig),
}
apiHandler := api.NewBuildService(log, c)
apiHandler.Register(c.mux)
heartbeatHandler := api.NewHeartbeatHandler(log, c)
heartbeatHandler.Register(c.mux)
fileHandler := filecache.NewHandler(log, c.fileCache)
fileHandler.Register(c.mux)
return c
} }
func (c *Coordinator) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (c *Coordinator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c.mux.ServeHTTP(w, r) panic("implement me")
}
func (c *Coordinator) addBuild(b *Build) {
c.mu.Lock()
defer c.mu.Unlock()
c.builds[b.ID] = b
}
func (c *Coordinator) removeBuild(b *Build) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.builds, b.ID)
}
func (c *Coordinator) getBuild(id build.ID) *Build {
c.mu.Lock()
defer c.mu.Unlock()
return c.builds[id]
}
func (c *Coordinator) StartBuild(ctx context.Context, req *api.BuildRequest, w api.StatusWriter) error {
b := NewBuild(&req.Graph, c)
c.addBuild(b)
defer c.removeBuild(b)
return b.Run(ctx, w)
}
func (c *Coordinator) SignalBuild(ctx context.Context, buildID build.ID, signal *api.SignalRequest) (*api.SignalResponse, error) {
b := c.getBuild(buildID)
if b == nil {
return nil, fmt.Errorf("build %q not found", buildID)
}
return b.Signal(ctx, signal)
}
func (c *Coordinator) Heartbeat(ctx context.Context, req *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
c.scheduler.RegisterWorker(req.WorkerID)
for _, job := range req.FinishedJob {
job := job
c.scheduler.OnJobComplete(req.WorkerID, job.ID, &job)
}
rsp := &api.HeartbeatResponse{
JobsToRun: map[build.ID]api.JobSpec{},
}
job := c.scheduler.PickJob(ctx, req.WorkerID)
if job != nil {
rsp.JobsToRun[job.Job.ID] = *job.Job
}
return rsp, nil
} }

View file

@ -1,12 +1,9 @@
// +build !solution
package filecache package filecache
import ( import (
"context" "context"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"go.uber.org/zap" "go.uber.org/zap"
@ -14,70 +11,16 @@ import (
) )
type Client struct { type Client struct {
l *zap.Logger
endpoint string
} }
func NewClient(l *zap.Logger, endpoint string) *Client { func NewClient(l *zap.Logger, endpoint string) *Client {
return &Client{ panic("implement me")
l: l,
endpoint: endpoint,
}
} }
func (c *Client) Upload(ctx context.Context, id build.ID, localPath string) error { func (c *Client) Upload(ctx context.Context, id build.ID, localPath string) error {
f, err := os.Open(localPath) panic("implement me")
if err != nil {
return err
}
defer f.Close()
req, err := http.NewRequestWithContext(ctx, http.MethodPut, c.endpoint+"/file?id="+id.String(), f)
if err != nil {
return err
}
rsp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer rsp.Body.Close()
if rsp.StatusCode != 200 {
errStr, _ := ioutil.ReadAll(rsp.Body)
return fmt.Errorf("file upload: %s", errStr)
}
return nil
} }
func (c *Client) Download(ctx context.Context, localCache *Cache, id build.ID) error { func (c *Client) Download(ctx context.Context, localCache *Cache, id build.ID) error {
w, abort, err := localCache.Write(id) panic("implement me")
if err != nil {
return err
}
defer func() { _ = abort() }()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.endpoint+"/file?id="+id.String(), nil)
if err != nil {
return err
}
rsp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer rsp.Body.Close()
if rsp.StatusCode != 200 {
errStr, _ := ioutil.ReadAll(rsp.Body)
return fmt.Errorf("file upload: %s", errStr)
}
_, err = io.Copy(w, rsp.Body)
if err != nil {
return err
}
return w.Close()
} }

View file

@ -1,12 +1,11 @@
// +build !solution
package filecache package filecache
import ( import (
"errors" "errors"
"io" "io"
"os"
"path/filepath"
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
) )
@ -17,95 +16,25 @@ var (
ErrReadLocked = errors.New("file is locked for read") ErrReadLocked = errors.New("file is locked for read")
) )
const fileName = "file"
func convertErr(err error) error {
switch {
case errors.Is(err, artifact.ErrNotFound):
return ErrNotFound
case errors.Is(err, artifact.ErrExists):
return ErrExists
case errors.Is(err, artifact.ErrWriteLocked):
return ErrWriteLocked
case errors.Is(err, artifact.ErrReadLocked):
return ErrReadLocked
default:
return err
}
}
type Cache struct { type Cache struct {
cache *artifact.Cache
} }
func New(rootDir string) (*Cache, error) { func New(rootDir string) (*Cache, error) {
cache, err := artifact.NewCache(rootDir) panic("implement me")
if err != nil {
return nil, err
}
c := &Cache{cache: cache}
return c, nil
} }
func (c *Cache) Range(fileFn func(file build.ID) error) error { func (c *Cache) Range(fileFn func(file build.ID) error) error {
return c.cache.Range(fileFn) panic("implement me")
} }
func (c *Cache) Remove(file build.ID) error { func (c *Cache) Remove(file build.ID) error {
return convertErr(c.cache.Remove(file)) panic("implement me")
}
type fileWriter struct {
f *os.File
commit func() error
}
func (f *fileWriter) Write(p []byte) (int, error) {
return f.f.Write(p)
}
func (f *fileWriter) Close() error {
closeErr := f.f.Close()
commitErr := f.commit()
if closeErr != nil {
return closeErr
}
return commitErr
} }
func (c *Cache) Write(file build.ID) (w io.WriteCloser, abort func() error, err error) { func (c *Cache) Write(file build.ID) (w io.WriteCloser, abort func() error, err error) {
path, commit, abortDir, err := c.cache.Create(file) panic("implement me")
if err != nil {
err = convertErr(err)
return
}
f, err := os.Create(filepath.Join(path, fileName))
if err != nil {
_ = abort()
return
}
w = &fileWriter{f: f, commit: commit}
abort = func() error {
closeErr := f.Close()
abortErr := abortDir()
if closeErr != nil {
return closeErr
}
return abortErr
}
return
} }
func (c *Cache) Get(file build.ID) (path string, unlock func(), err error) { func (c *Cache) Get(file build.ID) (path string, unlock func(), err error) {
root, unlock, err := c.cache.Get(file) panic("implement me")
path = filepath.Join(root, fileName)
err = convertErr(err)
return
} }

View file

@ -1,99 +1,20 @@
// +build !solution
package filecache package filecache
import ( import (
"errors"
"fmt"
"io"
"net/http" "net/http"
"os"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/singleflight"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
) )
type Handler struct { type Handler struct {
l *zap.Logger
cache *Cache
single singleflight.Group
} }
func NewHandler(l *zap.Logger, cache *Cache) *Handler { func NewHandler(l *zap.Logger, cache *Cache) *Handler {
return &Handler{ panic("implement me")
l: l,
cache: cache,
}
} }
func (h *Handler) Register(mux *http.ServeMux) { func (h *Handler) Register(mux *http.ServeMux) {
mux.HandleFunc("/file", h.file) panic("implement me")
}
func (h *Handler) doGet(w http.ResponseWriter, r *http.Request, id build.ID) error {
path, unlock, err := h.cache.Get(id)
if err != nil {
return err
}
defer unlock()
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
if _, err = io.Copy(w, f); err != nil {
h.l.Warn("error streaming file", zap.Error(err))
}
h.l.Debug("file download complete", zap.String("id", id.String()))
return nil
}
func (h *Handler) doPut(w http.ResponseWriter, r *http.Request, id build.ID) error {
_, err, _ := h.single.Do(id.String(), func() (interface{}, error) {
w, abort, err := h.cache.Write(id)
if errors.Is(err, ErrExists) {
return nil, nil
} else if err != nil {
return nil, err
}
defer func() { _ = abort() }()
if _, err = io.Copy(w, r.Body); err != nil {
return nil, err
}
return nil, w.Close()
})
if err != nil {
return err
}
w.WriteHeader(http.StatusOK)
h.l.Debug("file upload complete", zap.String("id", id.String()))
return nil
}
func (h *Handler) file(w http.ResponseWriter, r *http.Request) {
var id build.ID
err := id.UnmarshalText([]byte(r.URL.Query().Get("id")))
if err == nil {
switch r.Method {
case http.MethodGet:
err = h.doGet(w, r, id)
case http.MethodPut:
err = h.doPut(w, r, id)
default:
err = fmt.Errorf("filehandler: unsupported method %s", r.Method)
}
}
if err != nil {
h.l.Warn("file error", zap.String("method", r.Method), zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "%v", err)
}
} }

View file

@ -1,8 +1,9 @@
// +build !solution
package scheduler package scheduler
import ( import (
"context" "context"
"sync"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
@ -11,43 +12,12 @@ import (
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
) )
var timeAfter = time.After
type PendingJob struct { type PendingJob struct {
Job *api.JobSpec Job *api.JobSpec
Finished chan struct{} Finished chan struct{}
Result *api.JobResult Result *api.JobResult
mu sync.Mutex
pickedUp chan struct{}
}
func (p *PendingJob) finish(res *api.JobResult) {
p.Result = res
close(p.Finished)
}
func (p *PendingJob) pickUp() bool {
p.mu.Lock()
defer p.mu.Unlock()
select {
case <-p.pickedUp:
return false
default:
close(p.pickedUp)
return true
}
}
func (p *PendingJob) enqueue(q chan *PendingJob) {
select {
case q <- p:
case <-p.pickedUp:
}
}
type workerQueue struct {
cacheQueue chan *PendingJob
depQueue chan *PendingJob
} }
type Config struct { type Config struct {
@ -56,233 +26,28 @@ type Config struct {
} }
type Scheduler struct { type Scheduler struct {
l *zap.Logger
config Config
mu sync.Mutex
cachedJobs map[build.ID]map[api.WorkerID]struct{}
pendingJobs map[build.ID]*PendingJob
pendingJobDeps map[build.ID]map[*PendingJob]struct{}
workerQueue map[api.WorkerID]*workerQueue
globalQueue chan *PendingJob
} }
func NewScheduler(l *zap.Logger, config Config) *Scheduler { func NewScheduler(l *zap.Logger, config Config) *Scheduler {
return &Scheduler{ panic("implement me")
l: l,
config: config,
cachedJobs: make(map[build.ID]map[api.WorkerID]struct{}),
pendingJobs: make(map[build.ID]*PendingJob),
pendingJobDeps: make(map[build.ID]map[*PendingJob]struct{}),
workerQueue: make(map[api.WorkerID]*workerQueue),
globalQueue: make(chan *PendingJob),
}
} }
func (c *Scheduler) LocateArtifact(id build.ID) (api.WorkerID, bool) { func (c *Scheduler) LocateArtifact(id build.ID) (api.WorkerID, bool) {
c.mu.Lock() panic("implement me")
defer c.mu.Unlock()
for id := range c.cachedJobs[id] {
return id, true
}
return "", false
} }
func (c *Scheduler) RegisterWorker(workerID api.WorkerID) { func (c *Scheduler) RegisterWorker(workerID api.WorkerID) {
c.mu.Lock() panic("implement me")
defer c.mu.Unlock()
_, ok := c.workerQueue[workerID]
if ok {
return
}
c.workerQueue[workerID] = &workerQueue{
cacheQueue: make(chan *PendingJob),
depQueue: make(chan *PendingJob),
}
} }
func (c *Scheduler) OnJobComplete(workerID api.WorkerID, jobID build.ID, res *api.JobResult) bool { func (c *Scheduler) OnJobComplete(workerID api.WorkerID, jobID build.ID, res *api.JobResult) bool {
c.l.Debug("job completed", zap.String("worker_id", workerID.String()), zap.String("job_id", jobID.String())) panic("implement me")
c.mu.Lock()
pendingJob, pendingFound := c.pendingJobs[jobID]
if pendingFound {
delete(c.pendingJobs, jobID)
}
job, ok := c.cachedJobs[jobID]
if !ok {
job = make(map[api.WorkerID]struct{})
c.cachedJobs[jobID] = job
}
job[workerID] = struct{}{}
workerQueue := c.workerQueue[workerID]
for waiter := range c.pendingJobDeps[jobID] {
go waiter.enqueue(workerQueue.depQueue)
}
c.mu.Unlock()
if !pendingFound {
return false
}
c.l.Debug("finishing pending job", zap.String("job_id", jobID.String()))
pendingJob.finish(res)
return true
}
func (c *Scheduler) enqueueCacheLocal(job *PendingJob) bool {
cached := false
for workerID := range c.cachedJobs[job.Job.ID] {
cached = true
go job.enqueue(c.workerQueue[workerID].cacheQueue)
}
return cached
}
var timeAfter = time.After
func (c *Scheduler) putDepQueue(job *PendingJob, dep build.ID) {
depJobs, ok := c.pendingJobDeps[dep]
if !ok {
depJobs = make(map[*PendingJob]struct{})
c.pendingJobDeps[dep] = depJobs
}
depJobs[job] = struct{}{}
}
func (c *Scheduler) deleteDepQueue(job *PendingJob, dep build.ID) {
depJobs := c.pendingJobDeps[dep]
delete(depJobs, job)
if len(depJobs) == 0 {
delete(c.pendingJobDeps, dep)
}
}
func (c *Scheduler) doScheduleJob(job *PendingJob, cached bool) {
if cached {
select {
case <-job.pickedUp:
c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String()))
return
case <-timeAfter(c.config.CacheTimeout):
}
}
c.mu.Lock()
workers := make(map[api.WorkerID]struct{})
for _, dep := range job.Job.Deps {
c.putDepQueue(job, dep)
for workerID := range c.cachedJobs[dep] {
if _, ok := workers[workerID]; ok {
return
}
go job.enqueue(c.workerQueue[workerID].depQueue)
workers[workerID] = struct{}{}
}
}
c.mu.Unlock()
defer func() {
c.mu.Lock()
defer c.mu.Unlock()
for _, dep := range job.Job.Deps {
c.deleteDepQueue(job, dep)
}
}()
c.l.Debug("job is put into dep-local queues", zap.String("job_id", job.Job.ID.String()))
select {
case <-job.pickedUp:
c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String()))
return
case <-timeAfter(c.config.DepsTimeout):
}
go job.enqueue(c.globalQueue)
c.l.Debug("job is put into global queue", zap.String("job_id", job.Job.ID.String()))
<-job.pickedUp
c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String()))
} }
func (c *Scheduler) ScheduleJob(job *api.JobSpec) *PendingJob { func (c *Scheduler) ScheduleJob(job *api.JobSpec) *PendingJob {
var cached bool panic("implement me")
c.mu.Lock()
pendingJob, running := c.pendingJobs[job.ID]
if !running {
pendingJob = &PendingJob{
Job: job,
Finished: make(chan struct{}),
pickedUp: make(chan struct{}),
}
c.pendingJobs[job.ID] = pendingJob
cached = c.enqueueCacheLocal(pendingJob)
}
c.mu.Unlock()
if !running {
c.l.Debug("job is scheduled", zap.String("job_id", job.ID.String()))
go c.doScheduleJob(pendingJob, cached)
} else {
c.l.Debug("job is pending", zap.String("job_id", job.ID.String()))
}
return pendingJob
} }
func (c *Scheduler) PickJob(ctx context.Context, workerID api.WorkerID) *PendingJob { func (c *Scheduler) PickJob(ctx context.Context, workerID api.WorkerID) *PendingJob {
c.l.Debug("picking next job", zap.String("worker_id", workerID.String())) panic("implement me")
c.mu.Lock()
local := c.workerQueue[workerID]
c.mu.Unlock()
var pg *PendingJob
var queue string
for {
select {
case pg = <-c.globalQueue:
queue = "global"
case pg = <-local.depQueue:
queue = "dep"
case pg = <-local.cacheQueue:
queue = "cache"
case <-ctx.Done():
return nil
}
if pg.pickUp() {
break
}
}
c.l.Debug("picked job",
zap.String("worker_id", workerID.String()),
zap.String("job_id", pg.Job.ID.String()),
zap.String("queue", queue))
return pg
} }

View file

@ -1,98 +1,15 @@
// +build !solution
package tarstream package tarstream
import ( import (
"archive/tar"
"io" "io"
"os"
"path/filepath"
) )
func Send(dir string, w io.Writer) error { func Send(dir string, w io.Writer) error {
tw := tar.NewWriter(w) panic("implement me")
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
rel, err := filepath.Rel(dir, path)
if err != nil {
return err
}
if rel == "." {
return nil
}
switch {
case info.IsDir():
return tw.WriteHeader(&tar.Header{
Name: rel,
Typeflag: tar.TypeDir,
})
default:
h := &tar.Header{
Typeflag: tar.TypeReg,
Name: rel,
Size: info.Size(),
Mode: int64(info.Mode()),
}
if err := tw.WriteHeader(h); err != nil {
return err
}
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(tw, f)
return err
}
})
if err != nil {
return err
}
return tw.Close()
} }
func Receive(dir string, r io.Reader) error { func Receive(dir string, r io.Reader) error {
tr := tar.NewReader(r) panic("implement me")
for {
h, err := tr.Next()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
absPath := filepath.Join(dir, h.Name)
if h.Typeflag == tar.TypeDir {
if err := os.Mkdir(absPath, 0777); err != nil {
return err
}
} else {
writeFile := func() error {
f, err := os.OpenFile(absPath, os.O_CREATE|os.O_WRONLY, os.FileMode(h.Mode))
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(f, tr)
return err
}
if err := writeFile(); err != nil {
return err
}
}
}
} }

View file

@ -1,45 +0,0 @@
package worker
import (
"context"
"errors"
"gitlab.com/slon/shad-go/distbuild/pkg/api"
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache"
)
func (w *Worker) downloadFiles(ctx context.Context, files map[build.ID]string) error {
for id := range files {
_, unlock, err := w.fileCache.Get(id)
if errors.Is(err, filecache.ErrNotFound) {
if err = w.fileClient.Download(ctx, w.fileCache, id); err != nil {
return err
}
} else if err != nil {
return err
} else {
unlock()
}
}
return nil
}
func (w *Worker) downloadArtifacts(ctx context.Context, artifacts map[build.ID]api.WorkerID) error {
for id, worker := range artifacts {
_, unlock, err := w.artifacts.Get(id)
if errors.Is(err, artifact.ErrNotFound) {
if err = artifact.Download(ctx, worker.String(), w.artifacts, id); err != nil {
return err
}
} else if err != nil {
return err
} else {
unlock()
}
}
return nil
}

View file

@ -1,281 +0,0 @@
package worker
import (
"bytes"
"context"
"errors"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/api"
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
const (
outputDirName = "output"
srcDirName = "src"
exitCodeFileName = "exit_code"
stdoutFileName = "stdout"
stderrFileName = "stderr"
)
func (w *Worker) getJobFromCache(jobID build.ID) (*api.JobResult, error) {
aRoot, unlock, err := w.artifacts.Get(jobID)
if err != nil {
return nil, err
}
defer unlock()
res := &api.JobResult{
ID: jobID,
}
exitCodeStr, err := ioutil.ReadFile(filepath.Join(aRoot, exitCodeFileName))
if err != nil {
return nil, err
}
res.ExitCode, err = strconv.Atoi(string(exitCodeStr))
if err != nil {
return nil, err
}
res.Stdout, err = ioutil.ReadFile(filepath.Join(aRoot, stdoutFileName))
if err != nil {
return nil, err
}
res.Stderr, err = ioutil.ReadFile(filepath.Join(aRoot, stderrFileName))
if err != nil {
return nil, err
}
return res, nil
}
func executeCmd(ctx context.Context, cmd *build.Cmd) (stdout, stderr []byte, exitCode int, err error) {
var stdoutBuf, stderrBuf bytes.Buffer
if cmd.CatOutput != "" {
err = ioutil.WriteFile(cmd.CatOutput, []byte(cmd.CatTemplate), 0666)
return
}
p := exec.CommandContext(ctx, cmd.Exec[0], cmd.Exec[1:]...)
p.Dir = cmd.WorkingDirectory
p.Env = cmd.Environ
p.Stdout = &stdoutBuf
p.Stderr = &stderrBuf
err = p.Run()
stdout = stdoutBuf.Bytes()
stderr = stderrBuf.Bytes()
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
exitCode = exitErr.ExitCode()
err = nil
}
}
return
}
func (w *Worker) prepareSourceDir(sourceDir string, sourceFiles map[build.ID]string) (unlockSources func(), err error) {
var unlocks []func()
doUnlock := func() {
for _, u := range unlocks {
u()
}
}
defer func() {
if doUnlock != nil {
doUnlock()
}
}()
for id, path := range sourceFiles {
dir, _ := filepath.Split(path)
if dir != "" {
if err := os.MkdirAll(filepath.Join(sourceDir, dir), 0777); err != nil {
return nil, err
}
}
cached, unlock, err := w.fileCache.Get(id)
if err != nil {
return nil, err
}
unlocks = append(unlocks, unlock)
if err := os.Link(cached, filepath.Join(sourceDir, path)); err != nil {
return nil, err
}
}
unlockSources = doUnlock
doUnlock = nil
return
}
func (w *Worker) lockDeps(deps []build.ID) (paths map[build.ID]string, unlockDeps func(), err error) {
var unlocks []func()
doUnlock := func() {
for _, u := range unlocks {
u()
}
}
defer func() {
if doUnlock != nil {
doUnlock()
}
}()
paths = make(map[build.ID]string)
for _, id := range deps {
path, unlock, err := w.artifacts.Get(id)
if err != nil {
return nil, nil, err
}
unlocks = append(unlocks, unlock)
paths[id] = filepath.Join(path, outputDirName)
}
unlockDeps = doUnlock
doUnlock = nil
return
}
func (w *Worker) runJob(ctx context.Context, spec *api.JobSpec) (*api.JobResult, error) {
res, err := w.getJobFromCache(spec.Job.ID)
if err != nil && !errors.Is(err, artifact.ErrNotFound) {
return nil, err
} else if err == nil {
return res, nil
}
if err = w.downloadFiles(ctx, spec.SourceFiles); err != nil {
return nil, err
}
if err = w.downloadArtifacts(ctx, spec.Artifacts); err != nil {
return nil, err
}
aRoot, commit, abort, err := w.artifacts.Create(spec.Job.ID)
if err != nil {
return nil, err
}
defer func() {
if abort == nil {
return
}
if err = abort(); err != nil {
w.log.Warn("error aborting job", zap.Any("job_id", spec.Job.ID), zap.Error(err))
}
}()
outputDir := filepath.Join(aRoot, outputDirName)
if err = os.Mkdir(outputDir, 0777); err != nil {
return nil, err
}
sourceDir := filepath.Join(aRoot, srcDirName)
if err = os.Mkdir(sourceDir, 0777); err != nil {
return nil, err
}
stdoutFile, err := os.Create(filepath.Join(aRoot, stdoutFileName))
if err != nil {
return nil, err
}
defer stdoutFile.Close()
stderrFile, err := os.Create(filepath.Join(aRoot, stderrFileName))
if err != nil {
return nil, err
}
defer stderrFile.Close()
jobContext := build.JobContext{
OutputDir: outputDir,
SourceDir: sourceDir,
}
var unlock []func()
defer func() {
for _, u := range unlock {
u()
}
}()
unlockSourceFiles, err := w.prepareSourceDir(sourceDir, spec.SourceFiles)
if err != nil {
return nil, err
}
unlock = append(unlock, unlockSourceFiles)
deps, unlockDeps, err := w.lockDeps(spec.Job.Deps)
if err != nil {
return nil, err
}
unlock = append(unlock, unlockDeps)
jobContext.Deps = deps
res = &api.JobResult{
ID: spec.Job.ID,
}
for _, cmd := range spec.Job.Cmds {
cmd, err := cmd.Render(jobContext)
if err != nil {
return nil, err
}
stdout, stderr, exitCode, err := executeCmd(ctx, cmd)
if err != nil {
return nil, err
}
res.Stdout = append(res.Stdout, stdout...)
_, err = stdoutFile.Write(stdout)
if err != nil {
return nil, err
}
res.Stderr = append(res.Stderr, stderr...)
_, err = stderrFile.Write(stderr)
if err != nil {
return nil, err
}
if exitCode != 0 {
res.ExitCode = exitCode
break
}
}
if err := ioutil.WriteFile(filepath.Join(aRoot, exitCodeFileName), []byte(strconv.Itoa(res.ExitCode)), 0666); err != nil {
return nil, err
}
abort = nil
if err := commit(); err != nil {
return nil, err
}
return res, nil
}

View file

@ -1,23 +0,0 @@
package worker
import "gitlab.com/slon/shad-go/distbuild/pkg/api"
func (w *Worker) buildHeartbeat() *api.HeartbeatRequest {
w.mu.Lock()
defer w.mu.Unlock()
req := &api.HeartbeatRequest{
WorkerID: w.id,
FinishedJob: w.finishedJobs,
}
w.finishedJobs = nil
return req
}
func (w *Worker) jobFinished(job *api.JobResult) {
w.mu.Lock()
defer w.mu.Unlock()
w.finishedJobs = append(w.finishedJobs, *job)
}

View file

@ -1,39 +1,19 @@
// +build !solution
package worker package worker
import ( import (
"context" "context"
"fmt"
"net/http" "net/http"
"sync"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/singleflight"
"gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/api"
"gitlab.com/slon/shad-go/distbuild/pkg/artifact" "gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache" "gitlab.com/slon/shad-go/distbuild/pkg/filecache"
) )
type Worker struct { type Worker struct {
id api.WorkerID
coordinatorEndpoint string
log *zap.Logger
fileCache *filecache.Cache
fileClient *filecache.Client
fileOnce singleflight.Group
artifacts *artifact.Cache
mux *http.ServeMux
heartbeat *api.HeartbeatClient
mu sync.Mutex
newArtifacts []build.ID
newSources []build.ID
finishedJobs []api.JobResult
} }
func New( func New(
@ -43,69 +23,13 @@ func New(
fileCache *filecache.Cache, fileCache *filecache.Cache,
artifacts *artifact.Cache, artifacts *artifact.Cache,
) *Worker { ) *Worker {
w := &Worker{ panic("implement me")
id: workerID,
coordinatorEndpoint: coordinatorEndpoint,
log: log,
fileCache: fileCache,
artifacts: artifacts,
fileClient: filecache.NewClient(log, coordinatorEndpoint),
heartbeat: api.NewHeartbeatClient(log, coordinatorEndpoint),
mux: http.NewServeMux(),
}
artifact.NewHandler(w.log, w.artifacts).Register(w.mux)
return w
} }
func (w *Worker) ServeHTTP(rw http.ResponseWriter, r *http.Request) { func (w *Worker) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
w.mux.ServeHTTP(rw, r) panic("implement me")
}
func (w *Worker) recover() error {
return w.artifacts.Range(func(file build.ID) error {
w.newArtifacts = append(w.newArtifacts, file)
return nil
})
} }
func (w *Worker) Run(ctx context.Context) error { func (w *Worker) Run(ctx context.Context) error {
if err := w.recover(); err != nil { panic("implement me")
return err
}
for {
w.log.Debug("sending heartbeat request")
rsp, err := w.heartbeat.Heartbeat(ctx, w.buildHeartbeat())
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
w.log.DPanic("heartbeat failed", zap.Error(err))
continue
}
w.log.Debug("received heartbeat response",
zap.Int("num_jobs", len(rsp.JobsToRun)))
for _, spec := range rsp.JobsToRun {
spec := spec
w.log.Debug("running job", zap.String("job_id", spec.Job.ID.String()))
result, err := w.runJob(ctx, &spec)
if err != nil {
errStr := fmt.Sprintf("job %s failed: %v", spec.Job.ID, err)
w.log.Debug("job failed", zap.String("job_id", spec.Job.ID.String()), zap.Error(err))
w.jobFinished(&api.JobResult{ID: spec.Job.ID, Error: &errStr})
continue
}
w.log.Debug("job finished", zap.String("job_id", spec.Job.ID.String()))
w.jobFinished(result)
}
}
} }