diff --git a/distbuild/pkg/api/build_client.go b/distbuild/pkg/api/build_client.go index 7638bdc..399b6ea 100644 --- a/distbuild/pkg/api/build_client.go +++ b/distbuild/pkg/api/build_client.go @@ -1,13 +1,9 @@ +// +build !solution + package api import ( - "bytes" "context" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net/http" "go.uber.org/zap" @@ -15,107 +11,16 @@ import ( ) type BuildClient struct { - l *zap.Logger - endpoint string } func NewBuildClient(l *zap.Logger, endpoint string) *BuildClient { - return &BuildClient{ - l: l, - endpoint: endpoint, - } -} - -type statusReader struct { - r io.ReadCloser - dec *json.Decoder -} - -func (r *statusReader) Close() error { - return r.r.Close() -} - -func (r *statusReader) Next() (*StatusUpdate, error) { - var u StatusUpdate - if err := r.dec.Decode(&u); err != nil { - return nil, err - } - return &u, nil + panic("implement me") } func (c *BuildClient) StartBuild(ctx context.Context, request *BuildRequest) (*BuildStarted, StatusReader, error) { - reqJSON, err := json.Marshal(request) - if err != nil { - return nil, nil, err - } - - req, err := http.NewRequest("POST", c.endpoint+"/build", bytes.NewBuffer(reqJSON)) - if err != nil { - return nil, nil, err - } - req.Header.Set("content-type", "application/json") - - rsp, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return nil, nil, err - } - defer func() { - if rsp.Body != nil { - _ = rsp.Body.Close() - } - }() - - if rsp.StatusCode != 200 { - bodyStr, err := ioutil.ReadAll(rsp.Body) - if err != nil { - return nil, nil, fmt.Errorf("build request failed: %v", err) - } - - return nil, nil, fmt.Errorf("build failed: %s", bodyStr) - } - - dec := json.NewDecoder(rsp.Body) - var started BuildStarted - if err := dec.Decode(&started); err != nil { - return nil, nil, err - } - - r := &statusReader{r: rsp.Body, dec: dec} - rsp.Body = nil - return &started, r, nil + panic("implement me") } func (c *BuildClient) SignalBuild(ctx context.Context, buildID build.ID, signal *SignalRequest) (*SignalResponse, error) { - signalJSON, err := json.Marshal(signal) - if err != nil { - return nil, err - } - - req, err := http.NewRequest("POST", c.endpoint+"/signal?build_id="+buildID.String(), bytes.NewBuffer(signalJSON)) - if err != nil { - return nil, err - } - req.Header.Set("content-type", "application/json") - - rsp, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return nil, err - } - defer rsp.Body.Close() - - rspBody, err := ioutil.ReadAll(rsp.Body) - if err != nil { - return nil, fmt.Errorf("signal request failed: %v", err) - } - - if rsp.StatusCode != 200 { - return nil, fmt.Errorf("signal failed: %s", rspBody) - } - - var signalRsp SignalResponse - if err = json.Unmarshal(rspBody, &rsp); err != nil { - return nil, err - } - - return &signalRsp, err + panic("implement me") } diff --git a/distbuild/pkg/api/build_handler.go b/distbuild/pkg/api/build_handler.go index ad4b071..fd08441 100644 --- a/distbuild/pkg/api/build_handler.go +++ b/distbuild/pkg/api/build_handler.go @@ -1,142 +1,20 @@ +// +build !solution + package api import ( - "encoding/json" - "fmt" - "io/ioutil" "net/http" "go.uber.org/zap" - - "gitlab.com/slon/shad-go/distbuild/pkg/build" ) func NewBuildService(l *zap.Logger, s Service) *BuildHandler { - return &BuildHandler{ - l: l, - s: s, - } + panic("implement me") } type BuildHandler struct { - l *zap.Logger - s Service } func (h *BuildHandler) Register(mux *http.ServeMux) { - mux.HandleFunc("/build", h.build) - mux.HandleFunc("/signal", h.signal) -} - -type statusWriter struct { - id build.ID - h *BuildHandler - written bool - w http.ResponseWriter - flush http.Flusher - enc *json.Encoder -} - -func (w *statusWriter) Started(rsp *BuildStarted) error { - w.id = rsp.ID - w.written = true - - w.h.l.Debug("build started", zap.String("build_id", w.id.String()), zap.Any("started", rsp)) - - w.w.Header().Set("content-type", "application/json") - w.w.WriteHeader(http.StatusOK) - - defer w.flush.Flush() - return w.enc.Encode(rsp) -} - -func (w *statusWriter) Updated(update *StatusUpdate) error { - w.h.l.Debug("build updated", zap.String("build_id", w.id.String()), zap.Any("update", update)) - - defer w.flush.Flush() - return w.enc.Encode(update) -} - -func (h *BuildHandler) doBuild(w http.ResponseWriter, r *http.Request) error { - reqJSON, err := ioutil.ReadAll(r.Body) - if err != nil { - return err - } - - var req BuildRequest - if err = json.Unmarshal(reqJSON, &req); err != nil { - return err - } - - flush, ok := w.(http.Flusher) - if !ok { - return fmt.Errorf("response writer does not implement http.Flusher") - } - - sw := &statusWriter{h: h, w: w, enc: json.NewEncoder(w), flush: flush} - err = h.s.StartBuild(r.Context(), &req, sw) - - if err != nil { - if sw.written { - _ = sw.Updated(&StatusUpdate{BuildFailed: &BuildFailed{Error: err.Error()}}) - return nil - } - - return err - } - - return nil -} - -func (h *BuildHandler) build(w http.ResponseWriter, r *http.Request) { - if err := h.doBuild(w, r); err != nil { - w.WriteHeader(http.StatusBadRequest) - _, _ = fmt.Fprintf(w, "%v", err) - } -} - -func (h *BuildHandler) doSignal(w http.ResponseWriter, r *http.Request) error { - buildIDParam := r.URL.Query().Get("build_id") - if buildIDParam == "" { - return fmt.Errorf(`"build_id" parameter is missing`) - } - - var buildID build.ID - if err := buildID.UnmarshalText([]byte(buildIDParam)); err != nil { - return err - } - - reqJSON, err := ioutil.ReadAll(r.Body) - if err != nil { - return err - } - - var req SignalRequest - if err = json.Unmarshal(reqJSON, &req); err != nil { - return err - } - - rsp, err := h.s.SignalBuild(r.Context(), buildID, &req) - if err != nil { - return err - } - - rspJSON, err := json.Marshal(rsp) - if err != nil { - return err - } - - w.Header().Set("content-type", "application/json") - w.WriteHeader(http.StatusOK) - _, _ = w.Write(rspJSON) - return nil -} - -func (h *BuildHandler) signal(w http.ResponseWriter, r *http.Request) { - if err := h.doSignal(w, r); err != nil { - h.l.Warn("build signal failed", zap.Error(err)) - - w.WriteHeader(http.StatusBadRequest) - _, _ = fmt.Fprintf(w, "%v", err) - } + panic("implement me") } diff --git a/distbuild/pkg/api/heartbeat_client.go b/distbuild/pkg/api/heartbeat_client.go index b48f398..53f154e 100644 --- a/distbuild/pkg/api/heartbeat_client.go +++ b/distbuild/pkg/api/heartbeat_client.go @@ -1,56 +1,20 @@ +// +build !solution + package api import ( - "bytes" "context" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" "go.uber.org/zap" ) type HeartbeatClient struct { - l *zap.Logger - endpoint string } func NewHeartbeatClient(l *zap.Logger, endpoint string) *HeartbeatClient { - return &HeartbeatClient{l: l, endpoint: endpoint} + panic("implement me") } func (c *HeartbeatClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) { - reqJSON, err := json.Marshal(req) - if err != nil { - return nil, err - } - - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint+"/heartbeat", bytes.NewBuffer(reqJSON)) - if err != nil { - return nil, err - } - - httpRsp, err := http.DefaultClient.Do(httpReq) - if err != nil { - return nil, err - } - defer httpRsp.Body.Close() - - if httpRsp.StatusCode != http.StatusOK { - errorMsg, _ := ioutil.ReadAll(httpRsp.Body) - return nil, fmt.Errorf("heartbeat failed: %s", errorMsg) - } - - rspJSON, err := ioutil.ReadAll(httpRsp.Body) - if err != nil { - return nil, err - } - - rsp := &HeartbeatResponse{} - if err = json.Unmarshal(rspJSON, rsp); err != nil { - return nil, err - } - - return rsp, nil + panic("implement me") } diff --git a/distbuild/pkg/api/heartbeat_handler.go b/distbuild/pkg/api/heartbeat_handler.go index c335a54..8f4a4b7 100644 --- a/distbuild/pkg/api/heartbeat_handler.go +++ b/distbuild/pkg/api/heartbeat_handler.go @@ -1,60 +1,20 @@ +// +build !solution + package api import ( - "encoding/json" - "fmt" - "io/ioutil" "net/http" "go.uber.org/zap" ) type HeartbeatHandler struct { - l *zap.Logger - s HeartbeatService } func NewHeartbeatHandler(l *zap.Logger, s HeartbeatService) *HeartbeatHandler { - return &HeartbeatHandler{l: l, s: s} + panic("implement me") } func (h *HeartbeatHandler) Register(mux *http.ServeMux) { - mux.HandleFunc("/heartbeat", h.heartbeat) -} - -func (h *HeartbeatHandler) doHeartbeat(w http.ResponseWriter, r *http.Request) error { - reqJSON, err := ioutil.ReadAll(r.Body) - if err != nil { - return err - } - - var req HeartbeatRequest - if err = json.Unmarshal(reqJSON, &req); err != nil { - return err - } - - h.l.Debug("heartbeat started", zap.Any("req", req)) - rsp, err := h.s.Heartbeat(r.Context(), &req) - if err != nil { - return err - } - h.l.Debug("heartbeat finished", zap.Any("rsp", rsp)) - - rspJSON, err := json.Marshal(rsp) - if err != nil { - return err - } - - w.WriteHeader(http.StatusOK) - _, _ = w.Write(rspJSON) - return nil -} - -func (h *HeartbeatHandler) heartbeat(w http.ResponseWriter, r *http.Request) { - if err := h.doHeartbeat(w, r); err != nil { - h.l.Warn("heartbeat error", zap.Error(err)) - - w.WriteHeader(http.StatusBadRequest) - _, _ = fmt.Fprintf(w, "%v", err) - } + panic("implement me") } diff --git a/distbuild/pkg/artifact/cache.go b/distbuild/pkg/artifact/cache.go index 5ad9a1a..eaa3b8f 100644 --- a/distbuild/pkg/artifact/cache.go +++ b/distbuild/pkg/artifact/cache.go @@ -1,13 +1,9 @@ +// +build !solution + package artifact import ( - "encoding/hex" "errors" - "fmt" - "io/ioutil" - "os" - "path/filepath" - "sync" "gitlab.com/slon/shad-go/distbuild/pkg/build" ) @@ -20,172 +16,24 @@ var ( ) type Cache struct { - tmpDir string - cacheDir string - - mu sync.Mutex - writeLocked map[build.ID]struct{} - readLocked map[build.ID]int } func NewCache(root string) (*Cache, error) { - tmpDir := filepath.Join(root, "tmp") - - if err := os.RemoveAll(tmpDir); err != nil { - return nil, err - } - if err := os.MkdirAll(tmpDir, 0777); err != nil { - return nil, err - } - - cacheDir := filepath.Join(root, "c") - if err := os.MkdirAll(cacheDir, 0777); err != nil { - return nil, err - } - - for i := 0; i < 256; i++ { - d := hex.EncodeToString([]byte{uint8(i)}) - if err := os.MkdirAll(filepath.Join(cacheDir, d), 0777); err != nil { - return nil, err - } - } - - return &Cache{ - tmpDir: tmpDir, - cacheDir: cacheDir, - writeLocked: make(map[build.ID]struct{}), - readLocked: make(map[build.ID]int), - }, nil -} - -func (c *Cache) readLock(id build.ID) error { - c.mu.Lock() - defer c.mu.Unlock() - - if _, ok := c.writeLocked[id]; ok { - return ErrWriteLocked - } - - c.readLocked[id]++ - return nil -} - -func (c *Cache) readUnlock(id build.ID) { - c.mu.Lock() - defer c.mu.Unlock() - - c.readLocked[id]-- - if c.readLocked[id] == 0 { - delete(c.readLocked, id) - } -} - -func (c *Cache) writeLock(id build.ID, remove bool) error { - c.mu.Lock() - defer c.mu.Unlock() - - _, err := os.Stat(filepath.Join(c.cacheDir, id.Path())) - if !os.IsNotExist(err) && err != nil { - return err - } else if err == nil && !remove { - return ErrExists - } - - if _, ok := c.writeLocked[id]; ok { - return ErrWriteLocked - } - if c.readLocked[id] > 0 { - return ErrReadLocked - } - - c.writeLocked[id] = struct{}{} - return nil -} - -func (c *Cache) writeUnlock(id build.ID) { - c.mu.Lock() - defer c.mu.Unlock() - - delete(c.writeLocked, id) + panic("implement me") } func (c *Cache) Range(artifactFn func(artifact build.ID) error) error { - shards, err := ioutil.ReadDir(c.cacheDir) - if err != nil { - return err - } - - for _, shard := range shards { - dirs, err := ioutil.ReadDir(filepath.Join(c.cacheDir, shard.Name())) - if err != nil { - return err - } - - for _, d := range dirs { - var id build.ID - if err := id.UnmarshalText([]byte(d.Name())); err != nil { - return fmt.Errorf("invalid artifact name: %w", err) - } - - if err := artifactFn(id); err != nil { - return err - } - } - } - - return nil + panic("implement me") } func (c *Cache) Remove(artifact build.ID) error { - if err := c.writeLock(artifact, true); err != nil { - return err - } - defer c.writeUnlock(artifact) - - return os.RemoveAll(filepath.Join(c.cacheDir, artifact.Path())) + panic("implement me") } func (c *Cache) Create(artifact build.ID) (path string, commit, abort func() error, err error) { - if err = c.writeLock(artifact, false); err != nil { - return - } - - path = filepath.Join(c.tmpDir, artifact.String()) - if err = os.MkdirAll(path, 0777); err != nil { - c.writeUnlock(artifact) - return - } - - abort = func() error { - defer c.writeUnlock(artifact) - return os.RemoveAll(path) - } - - commit = func() error { - defer c.writeUnlock(artifact) - return os.Rename(path, filepath.Join(c.cacheDir, artifact.Path())) - } - - return + panic("implement me") } func (c *Cache) Get(artifact build.ID) (path string, unlock func(), err error) { - if err = c.readLock(artifact); err != nil { - return - } - - path = filepath.Join(c.cacheDir, artifact.Path()) - if _, err = os.Stat(path); err != nil { - c.readUnlock(artifact) - - if os.IsNotExist(err) { - err = ErrNotFound - } - return - } - - unlock = func() { - c.readUnlock(artifact) - } - return + panic("implement me") } diff --git a/distbuild/pkg/artifact/client.go b/distbuild/pkg/artifact/client.go index 4ad79e7..7b77298 100644 --- a/distbuild/pkg/artifact/client.go +++ b/distbuild/pkg/artifact/client.go @@ -1,42 +1,14 @@ +// +build !solution + package artifact import ( "context" - "fmt" - "io/ioutil" - "net/http" "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/tarstream" ) // Download artifact from remote cache into local cache. func Download(ctx context.Context, endpoint string, c *Cache, artifactID build.ID) error { - dir, commit, abort, err := c.Create(artifactID) - if err != nil { - return err - } - defer func() { _ = abort() }() - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint+"/artifact?id="+artifactID.String(), nil) - if err != nil { - return err - } - - rsp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer rsp.Body.Close() - - if rsp.StatusCode != http.StatusOK { - errStr, _ := ioutil.ReadAll(rsp.Body) - return fmt.Errorf("download: %s", errStr) - } - - if err := tarstream.Receive(dir, rsp.Body); err != nil { - return err - } - - return commit() + panic("implement me") } diff --git a/distbuild/pkg/artifact/handler.go b/distbuild/pkg/artifact/handler.go index 3b411ac..8997520 100644 --- a/distbuild/pkg/artifact/handler.go +++ b/distbuild/pkg/artifact/handler.go @@ -1,54 +1,20 @@ +// +build !solution + package artifact import ( - "fmt" "net/http" "go.uber.org/zap" - - "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/tarstream" ) type Handler struct { - l *zap.Logger - c *Cache } func NewHandler(l *zap.Logger, c *Cache) *Handler { - return &Handler{l: l, c: c} + panic("implement me") } func (h *Handler) Register(mux *http.ServeMux) { - mux.HandleFunc("/artifact", h.artifact) -} - -func (h *Handler) doArtifact(w http.ResponseWriter, r *http.Request) error { - idStr := r.URL.Query().Get("id") - - var id build.ID - if err := id.UnmarshalText([]byte(idStr)); err != nil { - return err - } - - h.l.Debug("streaming artifact", zap.String("artifact_id", id.String())) - artifactDir, unlock, err := h.c.Get(id) - if err != nil { - return err - } - defer unlock() - - w.WriteHeader(http.StatusOK) - if err := tarstream.Send(artifactDir, w); err != nil { - h.l.Warn("error streaming artifact", zap.Error(err)) - } - return nil -} - -func (h *Handler) artifact(w http.ResponseWriter, r *http.Request) { - if err := h.doArtifact(w, r); err != nil { - h.l.Warn("artifact handler error", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - _, _ = fmt.Fprintf(w, "%v", err) - } + panic("implement me") } diff --git a/distbuild/pkg/client/build.go b/distbuild/pkg/client/build.go index 92943a7..0f77a32 100644 --- a/distbuild/pkg/client/build.go +++ b/distbuild/pkg/client/build.go @@ -1,23 +1,16 @@ +// +build !solution + package client import ( "context" - "fmt" - "io" - "path/filepath" "go.uber.org/zap" - "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/filecache" ) type Client struct { - l *zap.Logger - client *api.BuildClient - cache *filecache.Client - sourceDir string } func NewClient( @@ -25,12 +18,7 @@ func NewClient( apiEndpoint string, sourceDir string, ) *Client { - return &Client{ - l: l, - client: api.NewBuildClient(l, apiEndpoint), - cache: filecache.NewClient(l, apiEndpoint), - sourceDir: sourceDir, - } + panic("implement me") } type BuildListener interface { @@ -41,84 +29,6 @@ type BuildListener interface { OnJobFailed(jobID build.ID, code int, error string) error } -func (c *Client) uploadSources(ctx context.Context, graph *build.Graph, started *api.BuildStarted) error { - for _, id := range started.MissingFiles { - c.l.Debug("uploading missing file to coordinator", zap.String("id", id.String())) - - path, ok := graph.SourceFiles[id] - if !ok { - return fmt.Errorf("file is missing in build graph: id=%s", id) - } - - absPath := filepath.Join(c.sourceDir, path) - if err := c.cache.Upload(ctx, id, absPath); err != nil { - return err - } - } - - return nil -} - func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener) error { - started, r, err := c.client.StartBuild(ctx, &api.BuildRequest{Graph: graph}) - if err != nil { - return err - } - - c.l.Debug("build started", zap.String("build_id", started.ID.String())) - if err = c.uploadSources(ctx, &graph, started); err != nil { - return err - } - - uploadDone := &api.SignalRequest{UploadDone: &api.UploadDone{}} - _, err = c.client.SignalBuild(ctx, started.ID, uploadDone) - if err != nil { - return err - } - - for { - u, err := r.Next() - if err == io.EOF { - return fmt.Errorf("unexpected end of status stream") - } else if err != nil { - return err - } - - c.l.Debug("received status update", zap.String("build_id", started.ID.String()), zap.Any("update", u)) - switch { - case u.BuildFailed != nil: - return fmt.Errorf("build failed: %s", u.BuildFailed.Error) - - case u.BuildFinished != nil: - return nil - - case u.JobFinished != nil: - jf := u.JobFinished - - if jf.Stdout != nil { - if err := lsn.OnJobStdout(jf.ID, jf.Stdout); err != nil { - return err - } - } - - if jf.Stderr != nil { - if err := lsn.OnJobStderr(jf.ID, jf.Stderr); err != nil { - return err - } - } - - if jf.Error != nil { - if err := lsn.OnJobFailed(jf.ID, jf.ExitCode, *jf.Error); err != nil { - return err - } - } else { - if err := lsn.OnJobFinished(jf.ID); err != nil { - return err - } - } - - default: - return fmt.Errorf("build failed: unexpected status update") - } - } + panic("implement me") } diff --git a/distbuild/pkg/dist/build.go b/distbuild/pkg/dist/build.go deleted file mode 100644 index 9468f12..0000000 --- a/distbuild/pkg/dist/build.go +++ /dev/null @@ -1,119 +0,0 @@ -package dist - -import ( - "context" - "fmt" - - "go.uber.org/zap" - - "gitlab.com/slon/shad-go/distbuild/pkg/api" - "gitlab.com/slon/shad-go/distbuild/pkg/build" -) - -type Build struct { - ID build.ID - Graph *build.Graph - - reverseFiles map[string]build.ID - - l *zap.Logger - c *Coordinator - uploadDone chan struct{} -} - -func NewBuild(graph *build.Graph, c *Coordinator) *Build { - id := build.NewID() - - return &Build{ - ID: id, - Graph: graph, - - reverseFiles: make(map[string]build.ID), - - l: c.log.With(zap.String("build_id", id.String())), - c: c, - uploadDone: make(chan struct{}), - } -} - -func (b *Build) missingFiles() []build.ID { - var files []build.ID - - for id, path := range b.Graph.SourceFiles { - files = append(files, id) - b.reverseFiles[path] = id - } - - return files -} - -func (b *Build) Run(ctx context.Context, w api.StatusWriter) error { - if err := w.Started(&api.BuildStarted{ID: b.ID, MissingFiles: b.missingFiles()}); err != nil { - return err - } - - b.l.Debug("waiting for file upload") - select { - case <-ctx.Done(): - return ctx.Err() - - case <-b.uploadDone: - } - b.l.Debug("file upload completed") - - for _, job := range b.Graph.Jobs { - spec := api.JobSpec{ - Job: job, - SourceFiles: make(map[build.ID]string), - Artifacts: make(map[build.ID]api.WorkerID), - } - - for _, file := range job.Inputs { - spec.SourceFiles[b.reverseFiles[file]] = file - } - - for _, id := range job.Deps { - workerID, ok := b.c.scheduler.LocateArtifact(id) - if !ok { - return fmt.Errorf("artifact %q is missing in cache", id) - } - - spec.Artifacts[id] = workerID - } - - s := b.c.scheduler.ScheduleJob(&spec) - - select { - case <-ctx.Done(): - return ctx.Err() - case <-s.Finished: - } - - b.l.Debug("job finished", zap.String("job_id", job.ID.String())) - - jobFinished := api.StatusUpdate{JobFinished: s.Result} - if err := w.Updated(&jobFinished); err != nil { - return err - } - } - - finished := api.StatusUpdate{BuildFinished: &api.BuildFinished{}} - return w.Updated(&finished) -} - -func (b *Build) Signal(ctx context.Context, req *api.SignalRequest) (*api.SignalResponse, error) { - switch { - case req.UploadDone != nil: - select { - case <-b.uploadDone: - return nil, fmt.Errorf("upload already done") - default: - close(b.uploadDone) - } - - default: - return nil, fmt.Errorf("unexpected signal kind") - } - - return &api.SignalResponse{}, nil -} diff --git a/distbuild/pkg/dist/coordinator.go b/distbuild/pkg/dist/coordinator.go index 46e69c0..a49b807 100644 --- a/distbuild/pkg/dist/coordinator.go +++ b/distbuild/pkg/dist/coordinator.go @@ -1,28 +1,18 @@ +// +build !solution + package dist import ( - "context" - "fmt" "net/http" - "sync" "time" "go.uber.org/zap" - "gitlab.com/slon/shad-go/distbuild/pkg/api" - "gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/filecache" "gitlab.com/slon/shad-go/distbuild/pkg/scheduler" ) type Coordinator struct { - log *zap.Logger - mux *http.ServeMux - fileCache *filecache.Cache - - mu sync.Mutex - builds map[build.ID]*Build - scheduler *scheduler.Scheduler } var defaultConfig = scheduler.Config{ @@ -34,87 +24,9 @@ func NewCoordinator( log *zap.Logger, fileCache *filecache.Cache, ) *Coordinator { - c := &Coordinator{ - log: log, - mux: http.NewServeMux(), - fileCache: fileCache, - - builds: make(map[build.ID]*Build), - scheduler: scheduler.NewScheduler(log, defaultConfig), - } - - apiHandler := api.NewBuildService(log, c) - apiHandler.Register(c.mux) - - heartbeatHandler := api.NewHeartbeatHandler(log, c) - heartbeatHandler.Register(c.mux) - - fileHandler := filecache.NewHandler(log, c.fileCache) - fileHandler.Register(c.mux) - - return c + panic("implement me") } func (c *Coordinator) ServeHTTP(w http.ResponseWriter, r *http.Request) { - c.mux.ServeHTTP(w, r) -} - -func (c *Coordinator) addBuild(b *Build) { - c.mu.Lock() - defer c.mu.Unlock() - - c.builds[b.ID] = b -} - -func (c *Coordinator) removeBuild(b *Build) { - c.mu.Lock() - defer c.mu.Unlock() - - delete(c.builds, b.ID) -} - -func (c *Coordinator) getBuild(id build.ID) *Build { - c.mu.Lock() - defer c.mu.Unlock() - - return c.builds[id] -} - -func (c *Coordinator) StartBuild(ctx context.Context, req *api.BuildRequest, w api.StatusWriter) error { - b := NewBuild(&req.Graph, c) - - c.addBuild(b) - defer c.removeBuild(b) - - return b.Run(ctx, w) -} - -func (c *Coordinator) SignalBuild(ctx context.Context, buildID build.ID, signal *api.SignalRequest) (*api.SignalResponse, error) { - b := c.getBuild(buildID) - if b == nil { - return nil, fmt.Errorf("build %q not found", buildID) - } - - return b.Signal(ctx, signal) -} - -func (c *Coordinator) Heartbeat(ctx context.Context, req *api.HeartbeatRequest) (*api.HeartbeatResponse, error) { - c.scheduler.RegisterWorker(req.WorkerID) - - for _, job := range req.FinishedJob { - job := job - - c.scheduler.OnJobComplete(req.WorkerID, job.ID, &job) - } - - rsp := &api.HeartbeatResponse{ - JobsToRun: map[build.ID]api.JobSpec{}, - } - - job := c.scheduler.PickJob(ctx, req.WorkerID) - if job != nil { - rsp.JobsToRun[job.Job.ID] = *job.Job - } - - return rsp, nil + panic("implement me") } diff --git a/distbuild/pkg/filecache/client.go b/distbuild/pkg/filecache/client.go index 3e82e97..497c894 100644 --- a/distbuild/pkg/filecache/client.go +++ b/distbuild/pkg/filecache/client.go @@ -1,12 +1,9 @@ +// +build !solution + package filecache import ( "context" - "fmt" - "io" - "io/ioutil" - "net/http" - "os" "go.uber.org/zap" @@ -14,70 +11,16 @@ import ( ) type Client struct { - l *zap.Logger - endpoint string } func NewClient(l *zap.Logger, endpoint string) *Client { - return &Client{ - l: l, - endpoint: endpoint, - } + panic("implement me") } func (c *Client) Upload(ctx context.Context, id build.ID, localPath string) error { - f, err := os.Open(localPath) - if err != nil { - return err - } - defer f.Close() - - req, err := http.NewRequestWithContext(ctx, http.MethodPut, c.endpoint+"/file?id="+id.String(), f) - if err != nil { - return err - } - - rsp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer rsp.Body.Close() - - if rsp.StatusCode != 200 { - errStr, _ := ioutil.ReadAll(rsp.Body) - return fmt.Errorf("file upload: %s", errStr) - } - - return nil + panic("implement me") } func (c *Client) Download(ctx context.Context, localCache *Cache, id build.ID) error { - w, abort, err := localCache.Write(id) - if err != nil { - return err - } - defer func() { _ = abort() }() - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.endpoint+"/file?id="+id.String(), nil) - if err != nil { - return err - } - - rsp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer rsp.Body.Close() - - if rsp.StatusCode != 200 { - errStr, _ := ioutil.ReadAll(rsp.Body) - return fmt.Errorf("file upload: %s", errStr) - } - - _, err = io.Copy(w, rsp.Body) - if err != nil { - return err - } - - return w.Close() + panic("implement me") } diff --git a/distbuild/pkg/filecache/filecache.go b/distbuild/pkg/filecache/filecache.go index 8f31d50..e5876fc 100644 --- a/distbuild/pkg/filecache/filecache.go +++ b/distbuild/pkg/filecache/filecache.go @@ -1,12 +1,11 @@ +// +build !solution + package filecache import ( "errors" "io" - "os" - "path/filepath" - "gitlab.com/slon/shad-go/distbuild/pkg/artifact" "gitlab.com/slon/shad-go/distbuild/pkg/build" ) @@ -17,95 +16,25 @@ var ( ErrReadLocked = errors.New("file is locked for read") ) -const fileName = "file" - -func convertErr(err error) error { - switch { - case errors.Is(err, artifact.ErrNotFound): - return ErrNotFound - case errors.Is(err, artifact.ErrExists): - return ErrExists - case errors.Is(err, artifact.ErrWriteLocked): - return ErrWriteLocked - case errors.Is(err, artifact.ErrReadLocked): - return ErrReadLocked - default: - return err - } -} - type Cache struct { - cache *artifact.Cache } func New(rootDir string) (*Cache, error) { - cache, err := artifact.NewCache(rootDir) - if err != nil { - return nil, err - } - - c := &Cache{cache: cache} - return c, nil + panic("implement me") } func (c *Cache) Range(fileFn func(file build.ID) error) error { - return c.cache.Range(fileFn) + panic("implement me") } func (c *Cache) Remove(file build.ID) error { - return convertErr(c.cache.Remove(file)) -} - -type fileWriter struct { - f *os.File - commit func() error -} - -func (f *fileWriter) Write(p []byte) (int, error) { - return f.f.Write(p) -} - -func (f *fileWriter) Close() error { - closeErr := f.f.Close() - commitErr := f.commit() - - if closeErr != nil { - return closeErr - } - - return commitErr + panic("implement me") } func (c *Cache) Write(file build.ID) (w io.WriteCloser, abort func() error, err error) { - path, commit, abortDir, err := c.cache.Create(file) - if err != nil { - err = convertErr(err) - return - } - - f, err := os.Create(filepath.Join(path, fileName)) - if err != nil { - _ = abort() - return - } - - w = &fileWriter{f: f, commit: commit} - abort = func() error { - closeErr := f.Close() - abortErr := abortDir() - - if closeErr != nil { - return closeErr - } - - return abortErr - } - return + panic("implement me") } func (c *Cache) Get(file build.ID) (path string, unlock func(), err error) { - root, unlock, err := c.cache.Get(file) - path = filepath.Join(root, fileName) - err = convertErr(err) - return + panic("implement me") } diff --git a/distbuild/pkg/filecache/handler.go b/distbuild/pkg/filecache/handler.go index ddd21c2..514fdea 100644 --- a/distbuild/pkg/filecache/handler.go +++ b/distbuild/pkg/filecache/handler.go @@ -1,99 +1,20 @@ +// +build !solution + package filecache import ( - "errors" - "fmt" - "io" "net/http" - "os" "go.uber.org/zap" - "golang.org/x/sync/singleflight" - - "gitlab.com/slon/shad-go/distbuild/pkg/build" ) type Handler struct { - l *zap.Logger - cache *Cache - single singleflight.Group } func NewHandler(l *zap.Logger, cache *Cache) *Handler { - return &Handler{ - l: l, - cache: cache, - } + panic("implement me") } func (h *Handler) Register(mux *http.ServeMux) { - mux.HandleFunc("/file", h.file) -} - -func (h *Handler) doGet(w http.ResponseWriter, r *http.Request, id build.ID) error { - path, unlock, err := h.cache.Get(id) - if err != nil { - return err - } - defer unlock() - - f, err := os.Open(path) - if err != nil { - return err - } - defer f.Close() - - if _, err = io.Copy(w, f); err != nil { - h.l.Warn("error streaming file", zap.Error(err)) - } - - h.l.Debug("file download complete", zap.String("id", id.String())) - return nil -} - -func (h *Handler) doPut(w http.ResponseWriter, r *http.Request, id build.ID) error { - _, err, _ := h.single.Do(id.String(), func() (interface{}, error) { - w, abort, err := h.cache.Write(id) - if errors.Is(err, ErrExists) { - return nil, nil - } else if err != nil { - return nil, err - } - defer func() { _ = abort() }() - - if _, err = io.Copy(w, r.Body); err != nil { - return nil, err - } - return nil, w.Close() - }) - - if err != nil { - return err - } - - w.WriteHeader(http.StatusOK) - h.l.Debug("file upload complete", zap.String("id", id.String())) - return nil -} - -func (h *Handler) file(w http.ResponseWriter, r *http.Request) { - var id build.ID - err := id.UnmarshalText([]byte(r.URL.Query().Get("id"))) - - if err == nil { - switch r.Method { - case http.MethodGet: - err = h.doGet(w, r, id) - case http.MethodPut: - err = h.doPut(w, r, id) - default: - err = fmt.Errorf("filehandler: unsupported method %s", r.Method) - } - } - - if err != nil { - h.l.Warn("file error", zap.String("method", r.Method), zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - _, _ = fmt.Fprintf(w, "%v", err) - } + panic("implement me") } diff --git a/distbuild/pkg/scheduler/scheduler.go b/distbuild/pkg/scheduler/scheduler.go index 53cb95e..9c4444d 100644 --- a/distbuild/pkg/scheduler/scheduler.go +++ b/distbuild/pkg/scheduler/scheduler.go @@ -1,8 +1,9 @@ +// +build !solution + package scheduler import ( "context" - "sync" "time" "go.uber.org/zap" @@ -11,43 +12,12 @@ import ( "gitlab.com/slon/shad-go/distbuild/pkg/build" ) +var timeAfter = time.After + type PendingJob struct { Job *api.JobSpec Finished chan struct{} Result *api.JobResult - - mu sync.Mutex - pickedUp chan struct{} -} - -func (p *PendingJob) finish(res *api.JobResult) { - p.Result = res - close(p.Finished) -} - -func (p *PendingJob) pickUp() bool { - p.mu.Lock() - defer p.mu.Unlock() - - select { - case <-p.pickedUp: - return false - default: - close(p.pickedUp) - return true - } -} - -func (p *PendingJob) enqueue(q chan *PendingJob) { - select { - case q <- p: - case <-p.pickedUp: - } -} - -type workerQueue struct { - cacheQueue chan *PendingJob - depQueue chan *PendingJob } type Config struct { @@ -56,233 +26,28 @@ type Config struct { } type Scheduler struct { - l *zap.Logger - config Config - - mu sync.Mutex - - cachedJobs map[build.ID]map[api.WorkerID]struct{} - - pendingJobs map[build.ID]*PendingJob - pendingJobDeps map[build.ID]map[*PendingJob]struct{} - - workerQueue map[api.WorkerID]*workerQueue - globalQueue chan *PendingJob } func NewScheduler(l *zap.Logger, config Config) *Scheduler { - return &Scheduler{ - l: l, - config: config, - - cachedJobs: make(map[build.ID]map[api.WorkerID]struct{}), - pendingJobs: make(map[build.ID]*PendingJob), - pendingJobDeps: make(map[build.ID]map[*PendingJob]struct{}), - - workerQueue: make(map[api.WorkerID]*workerQueue), - globalQueue: make(chan *PendingJob), - } + panic("implement me") } func (c *Scheduler) LocateArtifact(id build.ID) (api.WorkerID, bool) { - c.mu.Lock() - defer c.mu.Unlock() - - for id := range c.cachedJobs[id] { - return id, true - } - - return "", false + panic("implement me") } func (c *Scheduler) RegisterWorker(workerID api.WorkerID) { - c.mu.Lock() - defer c.mu.Unlock() - - _, ok := c.workerQueue[workerID] - if ok { - return - } - - c.workerQueue[workerID] = &workerQueue{ - cacheQueue: make(chan *PendingJob), - depQueue: make(chan *PendingJob), - } + panic("implement me") } func (c *Scheduler) OnJobComplete(workerID api.WorkerID, jobID build.ID, res *api.JobResult) bool { - c.l.Debug("job completed", zap.String("worker_id", workerID.String()), zap.String("job_id", jobID.String())) - - c.mu.Lock() - pendingJob, pendingFound := c.pendingJobs[jobID] - if pendingFound { - delete(c.pendingJobs, jobID) - } - - job, ok := c.cachedJobs[jobID] - if !ok { - job = make(map[api.WorkerID]struct{}) - c.cachedJobs[jobID] = job - } - job[workerID] = struct{}{} - - workerQueue := c.workerQueue[workerID] - for waiter := range c.pendingJobDeps[jobID] { - go waiter.enqueue(workerQueue.depQueue) - } - - c.mu.Unlock() - - if !pendingFound { - return false - } - - c.l.Debug("finishing pending job", zap.String("job_id", jobID.String())) - pendingJob.finish(res) - return true -} - -func (c *Scheduler) enqueueCacheLocal(job *PendingJob) bool { - cached := false - - for workerID := range c.cachedJobs[job.Job.ID] { - cached = true - go job.enqueue(c.workerQueue[workerID].cacheQueue) - } - - return cached -} - -var timeAfter = time.After - -func (c *Scheduler) putDepQueue(job *PendingJob, dep build.ID) { - depJobs, ok := c.pendingJobDeps[dep] - if !ok { - depJobs = make(map[*PendingJob]struct{}) - c.pendingJobDeps[dep] = depJobs - } - depJobs[job] = struct{}{} -} - -func (c *Scheduler) deleteDepQueue(job *PendingJob, dep build.ID) { - depJobs := c.pendingJobDeps[dep] - delete(depJobs, job) - if len(depJobs) == 0 { - delete(c.pendingJobDeps, dep) - } -} - -func (c *Scheduler) doScheduleJob(job *PendingJob, cached bool) { - if cached { - select { - case <-job.pickedUp: - c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) - return - case <-timeAfter(c.config.CacheTimeout): - } - } - - c.mu.Lock() - workers := make(map[api.WorkerID]struct{}) - - for _, dep := range job.Job.Deps { - c.putDepQueue(job, dep) - - for workerID := range c.cachedJobs[dep] { - if _, ok := workers[workerID]; ok { - return - } - - go job.enqueue(c.workerQueue[workerID].depQueue) - workers[workerID] = struct{}{} - } - } - c.mu.Unlock() - - defer func() { - c.mu.Lock() - defer c.mu.Unlock() - - for _, dep := range job.Job.Deps { - c.deleteDepQueue(job, dep) - } - }() - - c.l.Debug("job is put into dep-local queues", zap.String("job_id", job.Job.ID.String())) - - select { - case <-job.pickedUp: - c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) - return - case <-timeAfter(c.config.DepsTimeout): - } - - go job.enqueue(c.globalQueue) - c.l.Debug("job is put into global queue", zap.String("job_id", job.Job.ID.String())) - - <-job.pickedUp - c.l.Debug("job picked", zap.String("job_id", job.Job.ID.String())) + panic("implement me") } func (c *Scheduler) ScheduleJob(job *api.JobSpec) *PendingJob { - var cached bool - - c.mu.Lock() - pendingJob, running := c.pendingJobs[job.ID] - if !running { - pendingJob = &PendingJob{ - Job: job, - Finished: make(chan struct{}), - - pickedUp: make(chan struct{}), - } - - c.pendingJobs[job.ID] = pendingJob - cached = c.enqueueCacheLocal(pendingJob) - } - c.mu.Unlock() - - if !running { - c.l.Debug("job is scheduled", zap.String("job_id", job.ID.String())) - go c.doScheduleJob(pendingJob, cached) - } else { - c.l.Debug("job is pending", zap.String("job_id", job.ID.String())) - } - - return pendingJob + panic("implement me") } func (c *Scheduler) PickJob(ctx context.Context, workerID api.WorkerID) *PendingJob { - c.l.Debug("picking next job", zap.String("worker_id", workerID.String())) - - c.mu.Lock() - local := c.workerQueue[workerID] - c.mu.Unlock() - - var pg *PendingJob - var queue string - - for { - select { - case pg = <-c.globalQueue: - queue = "global" - case pg = <-local.depQueue: - queue = "dep" - case pg = <-local.cacheQueue: - queue = "cache" - case <-ctx.Done(): - return nil - } - - if pg.pickUp() { - break - } - } - - c.l.Debug("picked job", - zap.String("worker_id", workerID.String()), - zap.String("job_id", pg.Job.ID.String()), - zap.String("queue", queue)) - - return pg + panic("implement me") } diff --git a/distbuild/pkg/tarstream/stream.go b/distbuild/pkg/tarstream/stream.go index 85c3b59..2e7be8e 100644 --- a/distbuild/pkg/tarstream/stream.go +++ b/distbuild/pkg/tarstream/stream.go @@ -1,98 +1,15 @@ +// +build !solution + package tarstream import ( - "archive/tar" "io" - "os" - "path/filepath" ) func Send(dir string, w io.Writer) error { - tw := tar.NewWriter(w) - - err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - rel, err := filepath.Rel(dir, path) - if err != nil { - return err - } - - if rel == "." { - return nil - } - - switch { - case info.IsDir(): - return tw.WriteHeader(&tar.Header{ - Name: rel, - Typeflag: tar.TypeDir, - }) - - default: - h := &tar.Header{ - Typeflag: tar.TypeReg, - Name: rel, - Size: info.Size(), - Mode: int64(info.Mode()), - } - - if err := tw.WriteHeader(h); err != nil { - return err - } - - f, err := os.Open(path) - if err != nil { - return err - } - defer f.Close() - - _, err = io.Copy(tw, f) - return err - } - }) - - if err != nil { - return err - } - - return tw.Close() + panic("implement me") } func Receive(dir string, r io.Reader) error { - tr := tar.NewReader(r) - - for { - h, err := tr.Next() - if err == io.EOF { - return nil - } else if err != nil { - return err - } - - absPath := filepath.Join(dir, h.Name) - - if h.Typeflag == tar.TypeDir { - if err := os.Mkdir(absPath, 0777); err != nil { - return err - } - } else { - writeFile := func() error { - f, err := os.OpenFile(absPath, os.O_CREATE|os.O_WRONLY, os.FileMode(h.Mode)) - if err != nil { - return err - } - defer f.Close() - - _, err = io.Copy(f, tr) - return err - } - - if err := writeFile(); err != nil { - return err - } - } - } + panic("implement me") } diff --git a/distbuild/pkg/worker/download.go b/distbuild/pkg/worker/download.go deleted file mode 100644 index bd27b99..0000000 --- a/distbuild/pkg/worker/download.go +++ /dev/null @@ -1,45 +0,0 @@ -package worker - -import ( - "context" - "errors" - - "gitlab.com/slon/shad-go/distbuild/pkg/api" - "gitlab.com/slon/shad-go/distbuild/pkg/artifact" - "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/filecache" -) - -func (w *Worker) downloadFiles(ctx context.Context, files map[build.ID]string) error { - for id := range files { - _, unlock, err := w.fileCache.Get(id) - if errors.Is(err, filecache.ErrNotFound) { - if err = w.fileClient.Download(ctx, w.fileCache, id); err != nil { - return err - } - } else if err != nil { - return err - } else { - unlock() - } - } - - return nil -} - -func (w *Worker) downloadArtifacts(ctx context.Context, artifacts map[build.ID]api.WorkerID) error { - for id, worker := range artifacts { - _, unlock, err := w.artifacts.Get(id) - if errors.Is(err, artifact.ErrNotFound) { - if err = artifact.Download(ctx, worker.String(), w.artifacts, id); err != nil { - return err - } - } else if err != nil { - return err - } else { - unlock() - } - } - - return nil -} diff --git a/distbuild/pkg/worker/job.go b/distbuild/pkg/worker/job.go deleted file mode 100644 index 12ba144..0000000 --- a/distbuild/pkg/worker/job.go +++ /dev/null @@ -1,281 +0,0 @@ -package worker - -import ( - "bytes" - "context" - "errors" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "strconv" - - "go.uber.org/zap" - - "gitlab.com/slon/shad-go/distbuild/pkg/api" - "gitlab.com/slon/shad-go/distbuild/pkg/artifact" - "gitlab.com/slon/shad-go/distbuild/pkg/build" -) - -const ( - outputDirName = "output" - srcDirName = "src" - exitCodeFileName = "exit_code" - stdoutFileName = "stdout" - stderrFileName = "stderr" -) - -func (w *Worker) getJobFromCache(jobID build.ID) (*api.JobResult, error) { - aRoot, unlock, err := w.artifacts.Get(jobID) - if err != nil { - return nil, err - } - defer unlock() - - res := &api.JobResult{ - ID: jobID, - } - - exitCodeStr, err := ioutil.ReadFile(filepath.Join(aRoot, exitCodeFileName)) - if err != nil { - return nil, err - } - - res.ExitCode, err = strconv.Atoi(string(exitCodeStr)) - if err != nil { - return nil, err - } - - res.Stdout, err = ioutil.ReadFile(filepath.Join(aRoot, stdoutFileName)) - if err != nil { - return nil, err - } - - res.Stderr, err = ioutil.ReadFile(filepath.Join(aRoot, stderrFileName)) - if err != nil { - return nil, err - } - - return res, nil -} - -func executeCmd(ctx context.Context, cmd *build.Cmd) (stdout, stderr []byte, exitCode int, err error) { - var stdoutBuf, stderrBuf bytes.Buffer - - if cmd.CatOutput != "" { - err = ioutil.WriteFile(cmd.CatOutput, []byte(cmd.CatTemplate), 0666) - return - } - - p := exec.CommandContext(ctx, cmd.Exec[0], cmd.Exec[1:]...) - p.Dir = cmd.WorkingDirectory - p.Env = cmd.Environ - p.Stdout = &stdoutBuf - p.Stderr = &stderrBuf - - err = p.Run() - - stdout = stdoutBuf.Bytes() - stderr = stderrBuf.Bytes() - - if err != nil { - var exitErr *exec.ExitError - if errors.As(err, &exitErr) { - exitCode = exitErr.ExitCode() - err = nil - } - } - return -} - -func (w *Worker) prepareSourceDir(sourceDir string, sourceFiles map[build.ID]string) (unlockSources func(), err error) { - var unlocks []func() - doUnlock := func() { - for _, u := range unlocks { - u() - } - } - - defer func() { - if doUnlock != nil { - doUnlock() - } - }() - - for id, path := range sourceFiles { - dir, _ := filepath.Split(path) - if dir != "" { - if err := os.MkdirAll(filepath.Join(sourceDir, dir), 0777); err != nil { - return nil, err - } - } - - cached, unlock, err := w.fileCache.Get(id) - if err != nil { - return nil, err - } - unlocks = append(unlocks, unlock) - - if err := os.Link(cached, filepath.Join(sourceDir, path)); err != nil { - return nil, err - } - } - - unlockSources = doUnlock - doUnlock = nil - return -} - -func (w *Worker) lockDeps(deps []build.ID) (paths map[build.ID]string, unlockDeps func(), err error) { - var unlocks []func() - doUnlock := func() { - for _, u := range unlocks { - u() - } - } - - defer func() { - if doUnlock != nil { - doUnlock() - } - }() - - paths = make(map[build.ID]string) - - for _, id := range deps { - path, unlock, err := w.artifacts.Get(id) - if err != nil { - return nil, nil, err - } - unlocks = append(unlocks, unlock) - - paths[id] = filepath.Join(path, outputDirName) - } - - unlockDeps = doUnlock - doUnlock = nil - return -} - -func (w *Worker) runJob(ctx context.Context, spec *api.JobSpec) (*api.JobResult, error) { - res, err := w.getJobFromCache(spec.Job.ID) - if err != nil && !errors.Is(err, artifact.ErrNotFound) { - return nil, err - } else if err == nil { - return res, nil - } - - if err = w.downloadFiles(ctx, spec.SourceFiles); err != nil { - return nil, err - } - - if err = w.downloadArtifacts(ctx, spec.Artifacts); err != nil { - return nil, err - } - - aRoot, commit, abort, err := w.artifacts.Create(spec.Job.ID) - if err != nil { - return nil, err - } - - defer func() { - if abort == nil { - return - } - - if err = abort(); err != nil { - w.log.Warn("error aborting job", zap.Any("job_id", spec.Job.ID), zap.Error(err)) - } - }() - - outputDir := filepath.Join(aRoot, outputDirName) - if err = os.Mkdir(outputDir, 0777); err != nil { - return nil, err - } - - sourceDir := filepath.Join(aRoot, srcDirName) - if err = os.Mkdir(sourceDir, 0777); err != nil { - return nil, err - } - - stdoutFile, err := os.Create(filepath.Join(aRoot, stdoutFileName)) - if err != nil { - return nil, err - } - defer stdoutFile.Close() - - stderrFile, err := os.Create(filepath.Join(aRoot, stderrFileName)) - if err != nil { - return nil, err - } - defer stderrFile.Close() - - jobContext := build.JobContext{ - OutputDir: outputDir, - SourceDir: sourceDir, - } - - var unlock []func() - defer func() { - for _, u := range unlock { - u() - } - }() - - unlockSourceFiles, err := w.prepareSourceDir(sourceDir, spec.SourceFiles) - if err != nil { - return nil, err - } - unlock = append(unlock, unlockSourceFiles) - - deps, unlockDeps, err := w.lockDeps(spec.Job.Deps) - if err != nil { - return nil, err - } - unlock = append(unlock, unlockDeps) - jobContext.Deps = deps - - res = &api.JobResult{ - ID: spec.Job.ID, - } - - for _, cmd := range spec.Job.Cmds { - cmd, err := cmd.Render(jobContext) - if err != nil { - return nil, err - } - - stdout, stderr, exitCode, err := executeCmd(ctx, cmd) - if err != nil { - return nil, err - } - - res.Stdout = append(res.Stdout, stdout...) - _, err = stdoutFile.Write(stdout) - if err != nil { - return nil, err - } - - res.Stderr = append(res.Stderr, stderr...) - _, err = stderrFile.Write(stderr) - if err != nil { - return nil, err - } - - if exitCode != 0 { - res.ExitCode = exitCode - break - } - } - - if err := ioutil.WriteFile(filepath.Join(aRoot, exitCodeFileName), []byte(strconv.Itoa(res.ExitCode)), 0666); err != nil { - return nil, err - } - - abort = nil - if err := commit(); err != nil { - return nil, err - } - - return res, nil -} diff --git a/distbuild/pkg/worker/state.go b/distbuild/pkg/worker/state.go deleted file mode 100644 index b5a5e83..0000000 --- a/distbuild/pkg/worker/state.go +++ /dev/null @@ -1,23 +0,0 @@ -package worker - -import "gitlab.com/slon/shad-go/distbuild/pkg/api" - -func (w *Worker) buildHeartbeat() *api.HeartbeatRequest { - w.mu.Lock() - defer w.mu.Unlock() - - req := &api.HeartbeatRequest{ - WorkerID: w.id, - FinishedJob: w.finishedJobs, - } - - w.finishedJobs = nil - return req -} - -func (w *Worker) jobFinished(job *api.JobResult) { - w.mu.Lock() - defer w.mu.Unlock() - - w.finishedJobs = append(w.finishedJobs, *job) -} diff --git a/distbuild/pkg/worker/worker.go b/distbuild/pkg/worker/worker.go index 4b990f2..a76a458 100644 --- a/distbuild/pkg/worker/worker.go +++ b/distbuild/pkg/worker/worker.go @@ -1,39 +1,19 @@ +// +build !solution + package worker import ( "context" - "fmt" "net/http" - "sync" "go.uber.org/zap" - "golang.org/x/sync/singleflight" "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/artifact" - "gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/filecache" ) type Worker struct { - id api.WorkerID - coordinatorEndpoint string - - log *zap.Logger - - fileCache *filecache.Cache - fileClient *filecache.Client - fileOnce singleflight.Group - - artifacts *artifact.Cache - - mux *http.ServeMux - heartbeat *api.HeartbeatClient - - mu sync.Mutex - newArtifacts []build.ID - newSources []build.ID - finishedJobs []api.JobResult } func New( @@ -43,69 +23,13 @@ func New( fileCache *filecache.Cache, artifacts *artifact.Cache, ) *Worker { - w := &Worker{ - id: workerID, - coordinatorEndpoint: coordinatorEndpoint, - log: log, - - fileCache: fileCache, - artifacts: artifacts, - - fileClient: filecache.NewClient(log, coordinatorEndpoint), - heartbeat: api.NewHeartbeatClient(log, coordinatorEndpoint), - - mux: http.NewServeMux(), - } - - artifact.NewHandler(w.log, w.artifacts).Register(w.mux) - return w + panic("implement me") } func (w *Worker) ServeHTTP(rw http.ResponseWriter, r *http.Request) { - w.mux.ServeHTTP(rw, r) -} - -func (w *Worker) recover() error { - return w.artifacts.Range(func(file build.ID) error { - w.newArtifacts = append(w.newArtifacts, file) - return nil - }) + panic("implement me") } func (w *Worker) Run(ctx context.Context) error { - if err := w.recover(); err != nil { - return err - } - - for { - w.log.Debug("sending heartbeat request") - rsp, err := w.heartbeat.Heartbeat(ctx, w.buildHeartbeat()) - if err != nil { - if ctx.Err() != nil { - return ctx.Err() - } - - w.log.DPanic("heartbeat failed", zap.Error(err)) - continue - } - w.log.Debug("received heartbeat response", - zap.Int("num_jobs", len(rsp.JobsToRun))) - - for _, spec := range rsp.JobsToRun { - spec := spec - - w.log.Debug("running job", zap.String("job_id", spec.Job.ID.String())) - result, err := w.runJob(ctx, &spec) - if err != nil { - errStr := fmt.Sprintf("job %s failed: %v", spec.Job.ID, err) - - w.log.Debug("job failed", zap.String("job_id", spec.Job.ID.String()), zap.Error(err)) - w.jobFinished(&api.JobResult{ID: spec.Job.ID, Error: &errStr}) - continue - } - - w.log.Debug("job finished", zap.String("job_id", spec.Job.ID.String())) - w.jobFinished(result) - } - } + panic("implement me") }