From b97b6e9a0f1e3a951757958a43831f512c3a117e Mon Sep 17 00:00:00 2001 From: Fedor Korotkiy Date: Sun, 29 Mar 2020 19:03:07 +0300 Subject: [PATCH] Client protocol --- distbuild/disttest/fixture.go | 4 +- distbuild/pkg/api/build.go | 50 +++++++++ distbuild/pkg/api/build_test.go | 130 ++++++++++++++++++++++ distbuild/pkg/api/client.go | 111 ++++++++++++++++++ distbuild/pkg/api/handler.go | 123 ++++++++++++++++++++ distbuild/pkg/{proto => api}/heartbeat.go | 2 +- distbuild/pkg/api/mock/mock.go | 65 +++++++++++ distbuild/pkg/client/build.go | 8 +- distbuild/pkg/dist/build.go | 4 +- distbuild/pkg/dist/coordinator.go | 18 +-- distbuild/pkg/proto/build.go | 23 ---- distbuild/pkg/scheduler/scheduler.go | 30 ++--- distbuild/pkg/scheduler/scheduler_test.go | 10 +- distbuild/pkg/worker/job.go | 10 +- distbuild/pkg/worker/state.go | 10 +- distbuild/pkg/worker/worker.go | 14 +-- lrucache/lru.go | 78 ++++++++++++- 17 files changed, 609 insertions(+), 81 deletions(-) create mode 100644 distbuild/pkg/api/build.go create mode 100644 distbuild/pkg/api/build_test.go create mode 100644 distbuild/pkg/api/client.go create mode 100644 distbuild/pkg/api/handler.go rename distbuild/pkg/{proto => api}/heartbeat.go (99%) create mode 100644 distbuild/pkg/api/mock/mock.go delete mode 100644 distbuild/pkg/proto/build.go diff --git a/distbuild/disttest/fixture.go b/distbuild/disttest/fixture.go index 46a2e32..6149e4c 100644 --- a/distbuild/disttest/fixture.go +++ b/distbuild/disttest/fixture.go @@ -11,11 +11,11 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/artifact" "gitlab.com/slon/shad-go/distbuild/pkg/client" "gitlab.com/slon/shad-go/distbuild/pkg/dist" "gitlab.com/slon/shad-go/distbuild/pkg/filecache" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" "gitlab.com/slon/shad-go/distbuild/pkg/worker" "gitlab.com/slon/shad-go/tools/testtool" @@ -97,7 +97,7 @@ func newEnv(t *testing.T) (e *env, cancel func()) { require.NoError(t, err) workerPrefix := fmt.Sprintf("/worker/%d", i) - workerID := proto.WorkerID("http://" + addr + workerPrefix) + workerID := api.WorkerID("http://" + addr + workerPrefix) w := worker.New( workerID, diff --git a/distbuild/pkg/api/build.go b/distbuild/pkg/api/build.go new file mode 100644 index 0000000..9b38a78 --- /dev/null +++ b/distbuild/pkg/api/build.go @@ -0,0 +1,50 @@ +package api + +import ( + "context" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" +) + +type BuildRequest struct { + Graph build.Graph +} + +type BuildStarted struct { + ID build.ID + MissingFiles []build.ID +} + +type StatusUpdate struct { + JobFinished *JobResult + BuildFailed *BuildFailed + BuildFinished *BuildFinished +} + +type BuildFailed struct { + Error string +} + +type BuildFinished struct { +} + +type SignalRequest struct { +} + +type SignalResponse struct { +} + +type StatusWriter interface { + Started(rsp *BuildStarted) error + Updated(update *StatusUpdate) error +} + +type Service interface { + StartBuild(ctx context.Context, request *BuildRequest, w StatusWriter) error + SignalBuild(ctx context.Context, buildID build.ID, signal *SignalRequest) (*SignalResponse, error) +} + +type StatusReader interface { + Close() error + Next() (*StatusUpdate, error) +} diff --git a/distbuild/pkg/api/build_test.go b/distbuild/pkg/api/build_test.go new file mode 100644 index 0000000..3e18cf0 --- /dev/null +++ b/distbuild/pkg/api/build_test.go @@ -0,0 +1,130 @@ +package api_test + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + + "gitlab.com/slon/shad-go/distbuild/pkg/api" + mock "gitlab.com/slon/shad-go/distbuild/pkg/api/mock" + "gitlab.com/slon/shad-go/distbuild/pkg/build" +) + +//go:generate mockgen -package mock -destination mock/mock.go . Service + +type env struct { + ctrl *gomock.Controller + mock *mock.MockService + server *httptest.Server + client *api.Client +} + +func (e *env) stop() { + e.server.Close() + e.ctrl.Finish() +} + +func newEnv(t *testing.T) (*env, func()) { + env := &env{} + env.ctrl = gomock.NewController(t) + env.mock = mock.NewMockService(env.ctrl) + + log := zaptest.NewLogger(t) + + mux := http.NewServeMux() + + handler := api.NewServiceHandler(log, env.mock) + handler.Register(mux) + + env.server = httptest.NewServer(mux) + + env.client = &api.Client{Endpoint: env.server.URL} + + return env, env.stop +} + +func TestBuildSignal(t *testing.T) { + env, stop := newEnv(t) + defer stop() + + ctx := context.Background() + + buildIDa := build.ID{01} + buildIDb := build.ID{02} + req := &api.SignalRequest{} + rsp := &api.SignalResponse{} + + env.mock.EXPECT().SignalBuild(gomock.Any(), buildIDa, req).Return(rsp, nil) + env.mock.EXPECT().SignalBuild(gomock.Any(), buildIDb, req).Return(nil, fmt.Errorf("foo bar error")) + + _, err := env.client.SignalBuild(ctx, buildIDa, req) + require.NoError(t, err) + + _, err = env.client.SignalBuild(ctx, buildIDb, req) + require.Error(t, err) + require.Contains(t, err.Error(), "foo bar error") +} + +func TestBuildStartError(t *testing.T) { + env, stop := newEnv(t) + defer stop() + + ctx := context.Background() + + env.mock.EXPECT().StartBuild(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("foo bar error")) + + _, _, err := env.client.StartBuild(ctx, &api.BuildRequest{}) + require.Contains(t, err.Error(), "foo bar error") +} + +func TestBuildRunning(t *testing.T) { + env, stop := newEnv(t) + defer stop() + + ctx := context.Background() + + buildID := build.ID{02} + + req := &api.BuildRequest{ + Graph: build.Graph{SourceFiles: map[build.ID]string{{01}: "a.txt"}}, + } + + started := &api.BuildStarted{ID: buildID} + finished := &api.StatusUpdate{BuildFinished: &api.BuildFinished{}} + + env.mock.EXPECT().StartBuild(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, req *api.BuildRequest, w api.StatusWriter) error { + if err := w.Started(started); err != nil { + return err + } + + if err := w.Updated(finished); err != nil { + return err + } + + return fmt.Errorf("foo bar error") + }) + + rsp, r, err := env.client.StartBuild(ctx, req) + require.NoError(t, err) + + require.Equal(t, started, rsp) + + u, err := r.Next() + require.NoError(t, err) + require.Equal(t, finished, u) + + u, err = r.Next() + require.NoError(t, err) + require.Contains(t, u.BuildFailed.Error, "foo bar error") + + _, err = r.Next() + require.Equal(t, err, io.EOF) +} diff --git a/distbuild/pkg/api/client.go b/distbuild/pkg/api/client.go new file mode 100644 index 0000000..00302d6 --- /dev/null +++ b/distbuild/pkg/api/client.go @@ -0,0 +1,111 @@ +package api + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" +) + +type Client struct { + Endpoint string +} + +type statusReader struct { + r io.ReadCloser + dec *json.Decoder +} + +func (r *statusReader) Close() error { + return r.r.Close() +} + +func (r *statusReader) Next() (*StatusUpdate, error) { + var u StatusUpdate + if err := r.dec.Decode(&u); err != nil { + return nil, err + } + return &u, nil +} + +func (c *Client) StartBuild(ctx context.Context, request *BuildRequest) (*BuildStarted, StatusReader, error) { + reqJSON, err := json.Marshal(request) + if err != nil { + return nil, nil, err + } + + req, err := http.NewRequest("POST", c.Endpoint+"/build", bytes.NewBuffer(reqJSON)) + if err != nil { + return nil, nil, err + } + req.Header.Set("content-type", "application/json") + + rsp, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, nil, err + } + defer func() { + if rsp.Body != nil { + _ = rsp.Body.Close() + } + }() + + if rsp.StatusCode != 200 { + bodyStr, err := ioutil.ReadAll(rsp.Body) + if err != nil { + return nil, nil, fmt.Errorf("build request failed: %v", err) + } + + return nil, nil, fmt.Errorf("build failed: %s", bodyStr) + } + + dec := json.NewDecoder(rsp.Body) + var started BuildStarted + if err := dec.Decode(&started); err != nil { + return nil, nil, err + } + + r := &statusReader{r: rsp.Body, dec: dec} + rsp.Body = nil + return &started, r, nil +} + +func (c *Client) SignalBuild(ctx context.Context, buildID build.ID, signal *SignalRequest) (*SignalResponse, error) { + signalJSON, err := json.Marshal(signal) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", c.Endpoint+"/signal?build_id="+buildID.String(), bytes.NewBuffer(signalJSON)) + if err != nil { + return nil, err + } + req.Header.Set("content-type", "application/json") + + rsp, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + defer rsp.Body.Close() + + rspBody, err := ioutil.ReadAll(rsp.Body) + if err != nil { + return nil, fmt.Errorf("signal request failed: %v", err) + } + + if rsp.StatusCode != 200 { + return nil, fmt.Errorf("signal failed: %s", rspBody) + } + + var signalRsp SignalResponse + if err := json.Unmarshal(rspBody, &rsp); err != nil { + return nil, err + } + + return &signalRsp, err +} diff --git a/distbuild/pkg/api/handler.go b/distbuild/pkg/api/handler.go new file mode 100644 index 0000000..61677f0 --- /dev/null +++ b/distbuild/pkg/api/handler.go @@ -0,0 +1,123 @@ +package api + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + "go.uber.org/zap" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" +) + +func NewServiceHandler(l *zap.Logger, s Service) *ServiceHandler { + return &ServiceHandler{ + l: l, + s: s, + } +} + +type ServiceHandler struct { + l *zap.Logger + s Service +} + +func (s *ServiceHandler) Register(mux *http.ServeMux) { + mux.HandleFunc("/build", s.build) + mux.HandleFunc("/signal", s.signal) +} + +type statusWriter struct { + written bool + w http.ResponseWriter + enc *json.Encoder +} + +func (w *statusWriter) Started(rsp *BuildStarted) error { + w.written = true + w.w.Header().Set("content-type", "application/json") + w.w.WriteHeader(http.StatusOK) + return w.enc.Encode(rsp) +} + +func (w *statusWriter) Updated(update *StatusUpdate) error { + return w.enc.Encode(update) +} + +func (s *ServiceHandler) doBuild(w http.ResponseWriter, r *http.Request) error { + reqJSON, err := ioutil.ReadAll(r.Body) + if err != nil { + return err + } + + var req BuildRequest + if err := json.Unmarshal(reqJSON, &req); err != nil { + return err + } + + sw := &statusWriter{w: w, enc: json.NewEncoder(w)} + err = s.s.StartBuild(r.Context(), &req, sw) + + if err != nil { + if sw.written { + _ = sw.Updated(&StatusUpdate{BuildFailed: &BuildFailed{Error: err.Error()}}) + return nil + } + + return err + } + + return nil +} + +func (s *ServiceHandler) build(w http.ResponseWriter, r *http.Request) { + if err := s.doBuild(w, r); err != nil { + w.WriteHeader(http.StatusBadRequest) + _, _ = fmt.Fprintf(w, "%v", err) + } +} + +func (s *ServiceHandler) doSignal(w http.ResponseWriter, r *http.Request) error { + buildIDParam := r.URL.Query().Get("build_id") + if buildIDParam == "" { + return fmt.Errorf(`"build_id" parameter is missing`) + } + + var buildID build.ID + if err := buildID.UnmarshalText([]byte(buildIDParam)); err != nil { + return err + } + + reqJSON, err := ioutil.ReadAll(r.Body) + if err != nil { + return err + } + + var req SignalRequest + if err := json.Unmarshal(reqJSON, &req); err != nil { + return err + } + + rsp, err := s.s.SignalBuild(r.Context(), buildID, &req) + if err != nil { + return err + } + + rspJSON, err := json.Marshal(rsp) + if err != nil { + return err + } + + w.Header().Set("content-type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(rspJSON) + return nil +} + +func (s *ServiceHandler) signal(w http.ResponseWriter, r *http.Request) { + if err := s.doSignal(w, r); err != nil { + w.WriteHeader(http.StatusBadRequest) + _, _ = fmt.Fprintf(w, "%v", err) + } +} diff --git a/distbuild/pkg/proto/heartbeat.go b/distbuild/pkg/api/heartbeat.go similarity index 99% rename from distbuild/pkg/proto/heartbeat.go rename to distbuild/pkg/api/heartbeat.go index fb1c7c2..606ea6d 100644 --- a/distbuild/pkg/proto/heartbeat.go +++ b/distbuild/pkg/api/heartbeat.go @@ -1,4 +1,4 @@ -package proto +package api import ( "gitlab.com/slon/shad-go/distbuild/pkg/build" diff --git a/distbuild/pkg/api/mock/mock.go b/distbuild/pkg/api/mock/mock.go new file mode 100644 index 0000000..05c31d8 --- /dev/null +++ b/distbuild/pkg/api/mock/mock.go @@ -0,0 +1,65 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: gitlab.com/slon/shad-go/distbuild/pkg/api (interfaces: Service) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + gomock "github.com/golang/mock/gomock" + api "gitlab.com/slon/shad-go/distbuild/pkg/api" + build "gitlab.com/slon/shad-go/distbuild/pkg/build" + reflect "reflect" +) + +// MockService is a mock of Service interface +type MockService struct { + ctrl *gomock.Controller + recorder *MockServiceMockRecorder +} + +// MockServiceMockRecorder is the mock recorder for MockService +type MockServiceMockRecorder struct { + mock *MockService +} + +// NewMockService creates a new mock instance +func NewMockService(ctrl *gomock.Controller) *MockService { + mock := &MockService{ctrl: ctrl} + mock.recorder = &MockServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockService) EXPECT() *MockServiceMockRecorder { + return m.recorder +} + +// SignalBuild mocks base method +func (m *MockService) SignalBuild(arg0 context.Context, arg1 build.ID, arg2 *api.SignalRequest) (*api.SignalResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SignalBuild", arg0, arg1, arg2) + ret0, _ := ret[0].(*api.SignalResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SignalBuild indicates an expected call of SignalBuild +func (mr *MockServiceMockRecorder) SignalBuild(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SignalBuild", reflect.TypeOf((*MockService)(nil).SignalBuild), arg0, arg1, arg2) +} + +// StartBuild mocks base method +func (m *MockService) StartBuild(arg0 context.Context, arg1 *api.BuildRequest, arg2 api.StatusWriter) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StartBuild", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// StartBuild indicates an expected call of StartBuild +func (mr *MockServiceMockRecorder) StartBuild(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartBuild", reflect.TypeOf((*MockService)(nil).StartBuild), arg0, arg1, arg2) +} diff --git a/distbuild/pkg/client/build.go b/distbuild/pkg/client/build.go index f13c985..b1754fd 100644 --- a/distbuild/pkg/client/build.go +++ b/distbuild/pkg/client/build.go @@ -10,8 +10,8 @@ import ( "go.uber.org/zap" + "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" ) type Client struct { @@ -28,7 +28,7 @@ type BuildListener interface { OnJobFailed(jobID build.ID, code int, error string) error } -func (c *Client) uploadSources(ctx context.Context, src proto.MissingSources) error { +func (c *Client) uploadSources(ctx context.Context, src api.BuildStarted) error { return nil } @@ -60,7 +60,7 @@ func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener d := json.NewDecoder(rsp.Body) - var missing proto.MissingSources + var missing api.BuildStarted if err := d.Decode(&missing); err != nil { return fmt.Errorf("error receiving source list: %w", err) } @@ -70,7 +70,7 @@ func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener } for { - var update proto.StatusUpdate + var update api.StatusUpdate if err := d.Decode(&update); err != nil { return fmt.Errorf("error receiving status update: %w", err) } diff --git a/distbuild/pkg/dist/build.go b/distbuild/pkg/dist/build.go index 23c3fbc..588bc8e 100644 --- a/distbuild/pkg/dist/build.go +++ b/distbuild/pkg/dist/build.go @@ -3,8 +3,8 @@ package dist import ( "context" + "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" ) type Build struct { @@ -27,7 +27,7 @@ func NewBuild(graph *build.Graph, coordinator *Coordinator) *Build { } } -func (b *Build) Run(ctx context.Context, onStatusUpdate func(update proto.StatusUpdate) error) error { +func (b *Build) Run(ctx context.Context, onStatusUpdate func(update api.StatusUpdate) error) error { panic("implement me") } diff --git a/distbuild/pkg/dist/coordinator.go b/distbuild/pkg/dist/coordinator.go index 8c7850c..483f2a8 100644 --- a/distbuild/pkg/dist/coordinator.go +++ b/distbuild/pkg/dist/coordinator.go @@ -10,9 +10,9 @@ import ( "go.uber.org/zap" + "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/filecache" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" "gitlab.com/slon/shad-go/distbuild/pkg/scheduler" ) @@ -67,7 +67,7 @@ func (c *Coordinator) doBuild(w http.ResponseWriter, r *http.Request) error { w.WriteHeader(http.StatusOK) enc := json.NewEncoder(w) - if err := enc.Encode(proto.MissingSources{}); err != nil { + if err := enc.Encode(api.BuildStarted{}); err != nil { return err } @@ -84,13 +84,13 @@ func (c *Coordinator) doBuild(w http.ResponseWriter, r *http.Request) error { c.log.Debug("job finished", zap.String("job_id", job.ID.String())) - update := proto.StatusUpdate{JobFinished: s.Result} + update := api.StatusUpdate{JobFinished: s.Result} if err := enc.Encode(update); err != nil { return err } } - update := proto.StatusUpdate{BuildFinished: &proto.BuildFinished{}} + update := api.StatusUpdate{BuildFinished: &api.BuildFinished{}} return enc.Encode(update) } @@ -110,14 +110,14 @@ func (c *Coordinator) Build(w http.ResponseWriter, r *http.Request) { if err := c.doBuild(w, r); err != nil { c.log.Error("build failed", zap.Error(err)) - errorUpdate := proto.StatusUpdate{BuildFailed: &proto.BuildFailed{Error: err.Error()}} + errorUpdate := api.StatusUpdate{BuildFailed: &api.BuildFailed{Error: err.Error()}} errorJS, _ := json.Marshal(errorUpdate) _, _ = w.Write(errorJS) } } func (c *Coordinator) doHeartbeat(w http.ResponseWriter, r *http.Request) error { - var req proto.HeartbeatRequest + var req api.HeartbeatRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { return fmt.Errorf("invalid request: %w", err) } @@ -130,13 +130,13 @@ func (c *Coordinator) doHeartbeat(w http.ResponseWriter, r *http.Request) error c.scheduler.OnJobComplete(req.WorkerID, job.ID, &job) } - rsp := proto.HeartbeatResponse{ - JobsToRun: map[build.ID]proto.JobSpec{}, + rsp := api.HeartbeatResponse{ + JobsToRun: map[build.ID]api.JobSpec{}, } job := c.scheduler.PickJob(req.WorkerID, r.Context().Done()) if job != nil { - rsp.JobsToRun[job.Job.ID] = proto.JobSpec{Job: *job.Job} + rsp.JobsToRun[job.Job.ID] = api.JobSpec{Job: *job.Job} } if err := json.NewEncoder(w).Encode(rsp); err != nil { diff --git a/distbuild/pkg/proto/build.go b/distbuild/pkg/proto/build.go deleted file mode 100644 index 918914a..0000000 --- a/distbuild/pkg/proto/build.go +++ /dev/null @@ -1,23 +0,0 @@ -package proto - -import ( - "gitlab.com/slon/shad-go/distbuild/pkg/build" -) - -type MissingSources struct { - MissingFiles []build.ID -} - -type StatusUpdate struct { - SourcesMissing *MissingSources - JobFinished *JobResult - BuildFailed *BuildFailed - BuildFinished *BuildFinished -} - -type BuildFailed struct { - Error string -} - -type BuildFinished struct { -} diff --git a/distbuild/pkg/scheduler/scheduler.go b/distbuild/pkg/scheduler/scheduler.go index be817c4..6026a97 100644 --- a/distbuild/pkg/scheduler/scheduler.go +++ b/distbuild/pkg/scheduler/scheduler.go @@ -6,20 +6,20 @@ import ( "go.uber.org/zap" + "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" ) type PendingJob struct { Job *build.Job - Result *proto.JobResult + Result *api.JobResult Finished chan struct{} mu sync.Mutex pickedUp chan struct{} } -func (p *PendingJob) finish(res *proto.JobResult) { +func (p *PendingJob) finish(res *api.JobResult) { p.Result = res close(p.Finished) } @@ -73,11 +73,11 @@ type Scheduler struct { mu sync.Mutex - cachedJobs map[build.ID]map[proto.WorkerID]struct{} + cachedJobs map[build.ID]map[api.WorkerID]struct{} pendingJobs map[build.ID]*PendingJob - cacheLocalQueue map[proto.WorkerID]*jobQueue - depLocalQueue map[proto.WorkerID]*jobQueue + cacheLocalQueue map[api.WorkerID]*jobQueue + depLocalQueue map[api.WorkerID]*jobQueue globalQueue chan *PendingJob } @@ -86,16 +86,16 @@ func NewScheduler(l *zap.Logger, config Config) *Scheduler { l: l, config: config, - cachedJobs: make(map[build.ID]map[proto.WorkerID]struct{}), + cachedJobs: make(map[build.ID]map[api.WorkerID]struct{}), pendingJobs: make(map[build.ID]*PendingJob), - cacheLocalQueue: make(map[proto.WorkerID]*jobQueue), - depLocalQueue: make(map[proto.WorkerID]*jobQueue), + cacheLocalQueue: make(map[api.WorkerID]*jobQueue), + depLocalQueue: make(map[api.WorkerID]*jobQueue), globalQueue: make(chan *PendingJob), } } -func (c *Scheduler) RegisterWorker(workerID proto.WorkerID) { +func (c *Scheduler) RegisterWorker(workerID api.WorkerID) { c.mu.Lock() defer c.mu.Unlock() @@ -108,7 +108,7 @@ func (c *Scheduler) RegisterWorker(workerID proto.WorkerID) { c.depLocalQueue[workerID] = new(jobQueue) } -func (c *Scheduler) OnJobComplete(workerID proto.WorkerID, jobID build.ID, res *proto.JobResult) bool { +func (c *Scheduler) OnJobComplete(workerID api.WorkerID, jobID build.ID, res *api.JobResult) bool { c.l.Debug("job completed", zap.String("worker_id", workerID.String()), zap.String("job_id", jobID.String())) c.mu.Lock() @@ -119,7 +119,7 @@ func (c *Scheduler) OnJobComplete(workerID proto.WorkerID, jobID build.ID, res * job, ok := c.cachedJobs[jobID] if !ok { - job = make(map[proto.WorkerID]struct{}) + job = make(map[api.WorkerID]struct{}) c.cachedJobs[jobID] = job } job[workerID] = struct{}{} @@ -135,8 +135,8 @@ func (c *Scheduler) OnJobComplete(workerID proto.WorkerID, jobID build.ID, res * return true } -func (c *Scheduler) findOptimalWorkers(jobID build.ID, deps []build.ID) (cacheLocal, depLocal []proto.WorkerID) { - depLocalSet := map[proto.WorkerID]struct{}{} +func (c *Scheduler) findOptimalWorkers(jobID build.ID, deps []build.ID) (cacheLocal, depLocal []api.WorkerID) { + depLocalSet := map[api.WorkerID]struct{}{} c.mu.Lock() defer c.mu.Unlock() @@ -227,7 +227,7 @@ func (c *Scheduler) ScheduleJob(job *build.Job) *PendingJob { return pendingJob } -func (c *Scheduler) PickJob(workerID proto.WorkerID, canceled <-chan struct{}) *PendingJob { +func (c *Scheduler) PickJob(workerID api.WorkerID, canceled <-chan struct{}) *PendingJob { c.l.Debug("picking next job", zap.String("worker_id", workerID.String())) var cacheLocal, depLocal *jobQueue diff --git a/distbuild/pkg/scheduler/scheduler_test.go b/distbuild/pkg/scheduler/scheduler_test.go index 11bcc37..ead8492 100644 --- a/distbuild/pkg/scheduler/scheduler_test.go +++ b/distbuild/pkg/scheduler/scheduler_test.go @@ -9,12 +9,12 @@ import ( "go.uber.org/goleak" "go.uber.org/zap/zaptest" + "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" ) const ( - workerID0 proto.WorkerID = "w0" + workerID0 api.WorkerID = "w0" ) func TestScheduler(t *testing.T) { @@ -40,7 +40,7 @@ func TestScheduler(t *testing.T) { require.Equal(t, pendingJob0, pickerJob) - result := &proto.JobResult{ID: job0.ID, ExitCode: 0} + result := &api.JobResult{ID: job0.ID, ExitCode: 0} s.OnJobComplete(workerID0, job0.ID, result) select { @@ -69,7 +69,7 @@ func TestScheduler(t *testing.T) { job1 := &build.Job{ID: build.NewID()} s.RegisterWorker(workerID0) - s.OnJobComplete(workerID0, job0.ID, &proto.JobResult{}) + s.OnJobComplete(workerID0, job0.ID, &api.JobResult{}) pendingJob1 := s.ScheduleJob(job1) pendingJob0 := s.ScheduleJob(job0) @@ -94,7 +94,7 @@ func TestScheduler(t *testing.T) { job2 := &build.Job{ID: build.NewID()} s.RegisterWorker(workerID0) - s.OnJobComplete(workerID0, job0.ID, &proto.JobResult{}) + s.OnJobComplete(workerID0, job0.ID, &api.JobResult{}) pendingJob2 := s.ScheduleJob(job2) pendingJob1 := s.ScheduleJob(job1) diff --git a/distbuild/pkg/worker/job.go b/distbuild/pkg/worker/job.go index f12b126..85b87bf 100644 --- a/distbuild/pkg/worker/job.go +++ b/distbuild/pkg/worker/job.go @@ -12,9 +12,9 @@ import ( "go.uber.org/zap" + "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/artifact" "gitlab.com/slon/shad-go/distbuild/pkg/build" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" ) const ( @@ -25,14 +25,14 @@ const ( stderrFileName = "stderr" ) -func (w *Worker) getJobFromCache(jobID build.ID) (*proto.JobResult, error) { +func (w *Worker) getJobFromCache(jobID build.ID) (*api.JobResult, error) { aRoot, unlock, err := w.artifacts.Get(jobID) if err != nil { return nil, err } defer unlock() - res := &proto.JobResult{ + res := &api.JobResult{ ID: jobID, } @@ -157,7 +157,7 @@ func (w *Worker) lockDeps(deps []build.ID) (paths map[build.ID]string, unlockDep return } -func (w *Worker) runJob(ctx context.Context, spec *proto.JobSpec) (*proto.JobResult, error) { +func (w *Worker) runJob(ctx context.Context, spec *api.JobSpec) (*api.JobResult, error) { res, err := w.getJobFromCache(spec.Job.ID) if err != nil && !errors.Is(err, artifact.ErrNotFound) { return nil, err @@ -227,7 +227,7 @@ func (w *Worker) runJob(ctx context.Context, spec *proto.JobSpec) (*proto.JobRes unlock = append(unlock, unlockDeps) jobContext.Deps = deps - res = &proto.JobResult{ + res = &api.JobResult{ ID: spec.Job.ID, } diff --git a/distbuild/pkg/worker/state.go b/distbuild/pkg/worker/state.go index 5722c7d..b5a5e83 100644 --- a/distbuild/pkg/worker/state.go +++ b/distbuild/pkg/worker/state.go @@ -1,14 +1,12 @@ package worker -import ( - "gitlab.com/slon/shad-go/distbuild/pkg/proto" -) +import "gitlab.com/slon/shad-go/distbuild/pkg/api" -func (w *Worker) buildHeartbeat() *proto.HeartbeatRequest { +func (w *Worker) buildHeartbeat() *api.HeartbeatRequest { w.mu.Lock() defer w.mu.Unlock() - req := &proto.HeartbeatRequest{ + req := &api.HeartbeatRequest{ WorkerID: w.id, FinishedJob: w.finishedJobs, } @@ -17,7 +15,7 @@ func (w *Worker) buildHeartbeat() *proto.HeartbeatRequest { return req } -func (w *Worker) jobFinished(job *proto.JobResult) { +func (w *Worker) jobFinished(job *api.JobResult) { w.mu.Lock() defer w.mu.Unlock() diff --git a/distbuild/pkg/worker/worker.go b/distbuild/pkg/worker/worker.go index 635162e..82dfb10 100644 --- a/distbuild/pkg/worker/worker.go +++ b/distbuild/pkg/worker/worker.go @@ -11,14 +11,14 @@ import ( "go.uber.org/zap" + "gitlab.com/slon/shad-go/distbuild/pkg/api" "gitlab.com/slon/shad-go/distbuild/pkg/artifact" "gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/filecache" - "gitlab.com/slon/shad-go/distbuild/pkg/proto" ) type Worker struct { - id proto.WorkerID + id api.WorkerID coordinatorEndpoint string log *zap.Logger @@ -31,11 +31,11 @@ type Worker struct { mu sync.Mutex newArtifacts []build.ID newSources []build.ID - finishedJobs []proto.JobResult + finishedJobs []api.JobResult } func New( - workerID proto.WorkerID, + workerID api.WorkerID, coordinatorEndpoint string, log *zap.Logger, fileCache *filecache.Cache, @@ -63,7 +63,7 @@ func (w *Worker) recover() error { }) } -func (w *Worker) sendHeartbeat(ctx context.Context, req *proto.HeartbeatRequest) (*proto.HeartbeatResponse, error) { +func (w *Worker) sendHeartbeat(ctx context.Context, req *api.HeartbeatRequest) (*api.HeartbeatResponse, error) { reqJS, err := json.Marshal(req) if err != nil { return nil, err @@ -84,7 +84,7 @@ func (w *Worker) sendHeartbeat(ctx context.Context, req *proto.HeartbeatRequest) return nil, fmt.Errorf("heartbeat failed: %s", errorString) } - var rsp proto.HeartbeatResponse + var rsp api.HeartbeatResponse if err := json.NewDecoder(httpRsp.Body).Decode(&rsp); err != nil { return nil, err } @@ -120,7 +120,7 @@ func (w *Worker) Run(ctx context.Context) error { errStr := fmt.Sprintf("job %s failed: %v", spec.Job.ID, err) w.log.Debug("job failed", zap.String("job_id", spec.Job.ID.String()), zap.Error(err)) - w.jobFinished(&proto.JobResult{ID: spec.Job.ID, Error: &errStr}) + w.jobFinished(&api.JobResult{ID: spec.Job.ID, Error: &errStr}) continue } diff --git a/lrucache/lru.go b/lrucache/lru.go index 570c32b..4a8c074 100644 --- a/lrucache/lru.go +++ b/lrucache/lru.go @@ -2,6 +2,80 @@ package lrucache -func New(cap int) Cache { - panic("implement me") +import ( + "container/list" +) + +type Var struct { + key int + value int +} + +type LRUCache struct { + data map[int]*list.Element + queue *list.List + capacity int + size int +} + +func (cache *LRUCache) Set(key, value int) { + if cache.capacity == 0 { + return + } + + if v, ok := cache.data[key]; !ok { + + if cache.capacity == cache.size { + oldest := cache.queue.Back().Value.(*Var) + delete(cache.data, oldest.key) + + cache.queue.Remove(cache.queue.Back()) + cache.queue.PushFront(&Var{key, value}) + cache.data[key] = cache.queue.Front() + } else { + cache.queue.PushFront(&Var{key, value}) + cache.data[key] = cache.queue.Front() + cache.size++ + } + } else { + cache.queue.MoveToFront(v) + cache.queue.Front().Value.(*Var).value = value + } +} + +func (cache *LRUCache) Get(key int) (value int, has bool) { + val, has := cache.data[key] + if !has { + return + } + + cache.queue.MoveToFront(val) + return val.Value.(*Var).value, has +} + +func (cache *LRUCache) Clear() { + cache.size = 0 + cache.queue = list.New() + cache.data = make(map[int]*list.Element, cache.capacity) +} + +func (cache *LRUCache) Range(f func(key, value int) bool) { + for e := cache.queue.Back(); e != nil; e = e.Prev() { + elem := e.Value.(*Var) + if !f(elem.key, elem.value) { + return + } + } +} + +func (cache *LRUCache) Init(cap int) *LRUCache { + cache.data = make(map[int]*list.Element, cache.capacity) + cache.queue = list.New() + cache.capacity = cap + cache.size = 0 + return cache +} + +func New(cap int) Cache { + return new(LRUCache).Init(cap) }