From 6900c33441f8f295ca5b88efd5651f42d8ffee07 Mon Sep 17 00:00:00 2001 From: Fedor Korotkiy Date: Sat, 4 Apr 2020 23:11:21 +0300 Subject: [PATCH] Improve tests --- distbuild/disttest/fixture.go | 10 ++++++- distbuild/disttest/single_worker_test.go | 2 +- distbuild/pkg/api/build.go | 3 ++ distbuild/pkg/build/README.md | 4 +++ distbuild/pkg/client/README.md | 2 ++ distbuild/pkg/client/build.go | 22 +++++++++++++-- distbuild/pkg/dist/coordinator.go | 36 ++++++------------------ distbuild/pkg/filecache/client_test.go | 11 +++++--- 8 files changed, 53 insertions(+), 37 deletions(-) create mode 100644 distbuild/pkg/build/README.md create mode 100644 distbuild/pkg/client/README.md diff --git a/distbuild/disttest/fixture.go b/distbuild/disttest/fixture.go index 40542fc..1783c47 100644 --- a/distbuild/disttest/fixture.go +++ b/distbuild/disttest/fixture.go @@ -35,7 +35,10 @@ type env struct { HTTP *http.Server } -const nWorkers = 1 +const ( + logToStderr = true + nWorkers = 1 +) func newEnv(t *testing.T) (e *env, cancel func()) { cwd, err := os.Getwd() @@ -53,6 +56,11 @@ func newEnv(t *testing.T) (e *env, cancel func()) { cfg := zap.NewDevelopmentConfig() cfg.OutputPaths = []string{filepath.Join(env.RootDir, "test.log")} + + if logToStderr { + cfg.OutputPaths = append(cfg.OutputPaths, "stderr") + } + env.Logger, err = cfg.Build() require.NoError(t, err) diff --git a/distbuild/disttest/single_worker_test.go b/distbuild/disttest/single_worker_test.go index 9fe5ae5..3d808a8 100644 --- a/distbuild/disttest/single_worker_test.go +++ b/distbuild/disttest/single_worker_test.go @@ -120,7 +120,7 @@ var artifactTransferGraph = build.Graph{ }, } -func TestArtifactTransfer(t *testing.T) { +func TestArtifactTransferBetweenJobs(t *testing.T) { env, cancel := newEnv(t) defer cancel() diff --git a/distbuild/pkg/api/build.go b/distbuild/pkg/api/build.go index 9b38a78..56e4924 100644 --- a/distbuild/pkg/api/build.go +++ b/distbuild/pkg/api/build.go @@ -28,7 +28,10 @@ type BuildFailed struct { type BuildFinished struct { } +type UploadDone struct{} + type SignalRequest struct { + UploadDone *UploadDone } type SignalResponse struct { diff --git a/distbuild/pkg/build/README.md b/distbuild/pkg/build/README.md new file mode 100644 index 0000000..9877a27 --- /dev/null +++ b/distbuild/pkg/build/README.md @@ -0,0 +1,4 @@ +# build + +Пакет `build` содержит описание графа сборки и набор хелпер-функций для работы с графом. Вам не нужно +писать новый код в этом пакете, но нужно научиться пользоваться тем кодом который вам дан. diff --git a/distbuild/pkg/client/README.md b/distbuild/pkg/client/README.md new file mode 100644 index 0000000..164a39d --- /dev/null +++ b/distbuild/pkg/client/README.md @@ -0,0 +1,2 @@ +# client + diff --git a/distbuild/pkg/client/build.go b/distbuild/pkg/client/build.go index 4814987..54c0cc5 100644 --- a/distbuild/pkg/client/build.go +++ b/distbuild/pkg/client/build.go @@ -9,11 +9,13 @@ import ( "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/build" + "gitlab.com/slon/shad-go/distbuild/pkg/filecache" ) type Client struct { l *zap.Logger client *api.Client + cache *filecache.Client sourceDir string } @@ -24,7 +26,8 @@ func NewClient( ) *Client { return &Client{ l: l, - client: &api.Client{endpoint: apiEndpoint}, + client: api.NewClient(l, apiEndpoint), + cache: filecache.NewClient(l, apiEndpoint), sourceDir: sourceDir, } } @@ -37,7 +40,20 @@ type BuildListener interface { OnJobFailed(jobID build.ID, code int, error string) error } -func (c *Client) uploadSources(ctx context.Context, started *api.BuildStarted) error { +func (c *Client) uploadSources(ctx context.Context, graph *build.Graph, started *api.BuildStarted) error { + for _, id := range started.MissingFiles { + c.l.Debug("uploading missing file to coordinator", zap.String("id", id.String())) + + path, ok := graph.SourceFiles[id] + if !ok { + return fmt.Errorf("file is missing in build graph: id=%s", id) + } + + if err := c.cache.Upload(ctx, id, path); err != nil { + return err + } + } + return nil } @@ -48,7 +64,7 @@ func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener } c.l.Debug("build started", zap.String("build_id", started.ID.String())) - if err := c.uploadSources(ctx, started); err != nil { + if err := c.uploadSources(ctx, &graph, started); err != nil { return err } diff --git a/distbuild/pkg/dist/coordinator.go b/distbuild/pkg/dist/coordinator.go index 4ec0a2d..3b0f0ca 100644 --- a/distbuild/pkg/dist/coordinator.go +++ b/distbuild/pkg/dist/coordinator.go @@ -2,7 +2,6 @@ package dist import ( "context" - "encoding/json" "fmt" "net/http" "sync" @@ -47,7 +46,9 @@ func NewCoordinator( apiHandler := api.NewServiceHandler(log, c) apiHandler.Register(c.mux) - c.mux.HandleFunc("/heartbeat", c.Heartbeat) + heartbeatHandler := api.NewHeartbeatHandler(log, c) + heartbeatHandler.Register(c.mux) + return c } @@ -84,15 +85,10 @@ func (c *Coordinator) StartBuild(ctx context.Context, req *api.BuildRequest, w a } func (c *Coordinator) SignalBuild(ctx context.Context, buildID build.ID, signal *api.SignalRequest) (*api.SignalResponse, error) { - panic("implement me") + return nil, fmt.Errorf("signal build: not implemented") } -func (c *Coordinator) doHeartbeat(w http.ResponseWriter, r *http.Request) error { - var req api.HeartbeatRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - return fmt.Errorf("invalid request: %w", err) - } - +func (c *Coordinator) Heartbeat(ctx context.Context, req *api.HeartbeatRequest) (*api.HeartbeatResponse, error) { c.scheduler.RegisterWorker(req.WorkerID) for _, job := range req.FinishedJob { @@ -101,30 +97,14 @@ func (c *Coordinator) doHeartbeat(w http.ResponseWriter, r *http.Request) error c.scheduler.OnJobComplete(req.WorkerID, job.ID, &job) } - rsp := api.HeartbeatResponse{ + rsp := &api.HeartbeatResponse{ JobsToRun: map[build.ID]api.JobSpec{}, } - job := c.scheduler.PickJob(req.WorkerID, r.Context().Done()) + job := c.scheduler.PickJob(req.WorkerID, ctx.Done()) if job != nil { rsp.JobsToRun[job.Job.ID] = api.JobSpec{Job: *job.Job} } - if err := json.NewEncoder(w).Encode(rsp); err != nil { - return err - } - - return nil -} - -func (c *Coordinator) Heartbeat(w http.ResponseWriter, r *http.Request) { - c.log.Debug("heartbeat started") - if err := c.doHeartbeat(w, r); err != nil { - c.log.Error("heartbeat failed", zap.Error(err)) - - w.WriteHeader(http.StatusBadRequest) - _, _ = w.Write([]byte(err.Error())) - return - } - c.log.Debug("heartbeat finished") + return rsp, nil } diff --git a/distbuild/pkg/filecache/client_test.go b/distbuild/pkg/filecache/client_test.go index 6656410..a0d852a 100644 --- a/distbuild/pkg/filecache/client_test.go +++ b/distbuild/pkg/filecache/client_test.go @@ -1,6 +1,7 @@ package filecache_test import ( + "bytes" "context" "io/ioutil" "net/http" @@ -60,8 +61,10 @@ func TestFileUpload(t *testing.T) { env := newEnv(t) defer env.stop() + content := bytes.Repeat([]byte("foobar"), 1024*1024) + tmpFilePath := filepath.Join(env.cache.tmpDir, "foo.txt") - require.NoError(t, ioutil.WriteFile(tmpFilePath, []byte("foobar"), 0666)) + require.NoError(t, ioutil.WriteFile(tmpFilePath, content, 0666)) ctx := context.Background() @@ -76,7 +79,7 @@ func TestFileUpload(t *testing.T) { content, err := ioutil.ReadFile(path) require.NoError(t, err) - require.Equal(t, []byte("foobar"), content) + require.Equal(t, content, content) }) t.Run("RepeatedUpload", func(t *testing.T) { @@ -88,8 +91,8 @@ func TestFileUpload(t *testing.T) { t.Run("ConcurrentUpload", func(t *testing.T) { const ( - N = 100 - G = 100 + N = 10 + G = 10 ) for i := 0; i < N; i++ {