Wait for source upload

This commit is contained in:
Fedor Korotkiy 2020-04-05 00:13:45 +03:00
parent 6900c33441
commit d4e3705be3
11 changed files with 204 additions and 60 deletions

View file

@ -63,9 +63,11 @@ func TestJobCaching(t *testing.T) {
// Second build must get results from cache.
require.NoError(t, env.Client.Build(env.Ctx, graph, NewRecorder()))
require.NoError(t, ioutil.WriteFile(tmpFile.Name(), []byte("NOTOK\n"), 0666))
output, err := ioutil.ReadAll(tmpFile)
require.NoError(t, err)
require.Equal(t, []byte("OK\n"), output)
require.Equal(t, []byte("NOTOK\n"), output)
}
var sourceFilesGraph = build.Graph{

View file

@ -14,13 +14,13 @@ import (
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
type Client struct {
type BuildClient struct {
l *zap.Logger
endpoint string
}
func NewClient(l *zap.Logger, endpoint string) *Client {
return &Client{
func NewBuildClient(l *zap.Logger, endpoint string) *BuildClient {
return &BuildClient{
l: l,
endpoint: endpoint,
}
@ -43,7 +43,7 @@ func (r *statusReader) Next() (*StatusUpdate, error) {
return &u, nil
}
func (c *Client) StartBuild(ctx context.Context, request *BuildRequest) (*BuildStarted, StatusReader, error) {
func (c *BuildClient) StartBuild(ctx context.Context, request *BuildRequest) (*BuildStarted, StatusReader, error) {
reqJSON, err := json.Marshal(request)
if err != nil {
return nil, nil, err
@ -85,7 +85,7 @@ func (c *Client) StartBuild(ctx context.Context, request *BuildRequest) (*BuildS
return &started, r, nil
}
func (c *Client) SignalBuild(ctx context.Context, buildID build.ID, signal *SignalRequest) (*SignalResponse, error) {
func (c *BuildClient) SignalBuild(ctx context.Context, buildID build.ID, signal *SignalRequest) (*SignalResponse, error) {
signalJSON, err := json.Marshal(signal)
if err != nil {
return nil, err

View file

@ -11,41 +11,53 @@ import (
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
func NewServiceHandler(l *zap.Logger, s Service) *ServiceHandler {
return &ServiceHandler{
func NewBuildService(l *zap.Logger, s Service) *BuildHandler {
return &BuildHandler{
l: l,
s: s,
}
}
type ServiceHandler struct {
type BuildHandler struct {
l *zap.Logger
s Service
}
func (s *ServiceHandler) Register(mux *http.ServeMux) {
mux.HandleFunc("/build", s.build)
mux.HandleFunc("/signal", s.signal)
func (h *BuildHandler) Register(mux *http.ServeMux) {
mux.HandleFunc("/build", h.build)
mux.HandleFunc("/signal", h.signal)
}
type statusWriter struct {
id build.ID
h *BuildHandler
written bool
w http.ResponseWriter
flush http.Flusher
enc *json.Encoder
}
func (w *statusWriter) Started(rsp *BuildStarted) error {
w.id = rsp.ID
w.written = true
w.h.l.Debug("build started", zap.String("build_id", w.id.String()), zap.Any("started", rsp))
w.w.Header().Set("content-type", "application/json")
w.w.WriteHeader(http.StatusOK)
defer w.flush.Flush()
return w.enc.Encode(rsp)
}
func (w *statusWriter) Updated(update *StatusUpdate) error {
w.h.l.Debug("build updated", zap.String("build_id", w.id.String()), zap.Any("update", update))
defer w.flush.Flush()
return w.enc.Encode(update)
}
func (s *ServiceHandler) doBuild(w http.ResponseWriter, r *http.Request) error {
func (h *BuildHandler) doBuild(w http.ResponseWriter, r *http.Request) error {
reqJSON, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
@ -56,8 +68,13 @@ func (s *ServiceHandler) doBuild(w http.ResponseWriter, r *http.Request) error {
return err
}
sw := &statusWriter{w: w, enc: json.NewEncoder(w)}
err = s.s.StartBuild(r.Context(), &req, sw)
flush, ok := w.(http.Flusher)
if !ok {
return fmt.Errorf("response writer does not implement http.Flusher")
}
sw := &statusWriter{h: h, w: w, enc: json.NewEncoder(w), flush: flush}
err = h.s.StartBuild(r.Context(), &req, sw)
if err != nil {
if sw.written {
@ -71,14 +88,14 @@ func (s *ServiceHandler) doBuild(w http.ResponseWriter, r *http.Request) error {
return nil
}
func (s *ServiceHandler) build(w http.ResponseWriter, r *http.Request) {
if err := s.doBuild(w, r); err != nil {
func (h *BuildHandler) build(w http.ResponseWriter, r *http.Request) {
if err := h.doBuild(w, r); err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "%v", err)
}
}
func (s *ServiceHandler) doSignal(w http.ResponseWriter, r *http.Request) error {
func (h *BuildHandler) doSignal(w http.ResponseWriter, r *http.Request) error {
buildIDParam := r.URL.Query().Get("build_id")
if buildIDParam == "" {
return fmt.Errorf(`"build_id" parameter is missing`)
@ -99,7 +116,7 @@ func (s *ServiceHandler) doSignal(w http.ResponseWriter, r *http.Request) error
return err
}
rsp, err := s.s.SignalBuild(r.Context(), buildID, &req)
rsp, err := h.s.SignalBuild(r.Context(), buildID, &req)
if err != nil {
return err
}
@ -115,8 +132,10 @@ func (s *ServiceHandler) doSignal(w http.ResponseWriter, r *http.Request) error
return nil
}
func (s *ServiceHandler) signal(w http.ResponseWriter, r *http.Request) {
if err := s.doSignal(w, r); err != nil {
func (h *BuildHandler) signal(w http.ResponseWriter, r *http.Request) {
if err := h.doSignal(w, r); err != nil {
h.l.Warn("build signal failed", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "%v", err)
}

View file

@ -23,7 +23,7 @@ type env struct {
ctrl *gomock.Controller
mock *mock.MockService
server *httptest.Server
client *api.Client
client *api.BuildClient
}
func (e *env) stop() {
@ -40,12 +40,12 @@ func newEnv(t *testing.T) (*env, func()) {
mux := http.NewServeMux()
handler := api.NewServiceHandler(log, env.mock)
handler := api.NewBuildService(log, env.mock)
handler.Register(mux)
env.server = httptest.NewServer(mux)
env.client = api.NewClient(log, env.server.URL)
env.client = api.NewBuildClient(log, env.server.URL)
return env, env.stop
}
@ -128,3 +128,34 @@ func TestBuildRunning(t *testing.T) {
_, err = r.Next()
require.Equal(t, err, io.EOF)
}
func TestBuildResultsStreaming(t *testing.T) {
// Test is hanging?
// See https://golang.org/pkg/net/http/#Flusher
env, stop := newEnv(t)
defer stop()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
buildID := build.ID{02}
req := &api.BuildRequest{}
started := &api.BuildStarted{ID: buildID}
env.mock.EXPECT().StartBuild(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, req *api.BuildRequest, w api.StatusWriter) error {
if err := w.Started(started); err != nil {
return err
}
<-ctx.Done()
return ctx.Err()
})
rsp, _, err := env.client.StartBuild(ctx, req)
require.NoError(t, err)
require.Equal(t, started, rsp)
}

View file

@ -58,8 +58,12 @@ type HeartbeatRequest struct {
// JobSpec описывает джоб, который нужно запустить.
type JobSpec struct {
// SourceFiles задаёт список файлов, который должны присутсововать в директории с исходным кодом при запуске этого джоба.
SourceFiles map[build.ID]string
// Artifacts задаёт воркеров, с которых можно скачать артефакты необходимые этом джобу.
Artifacts map[build.ID]WorkerID
Job build.Job
}

View file

@ -1,2 +1,12 @@
# client
Пакет `client` реализует клиента системы распределённой сборки. Клиент запускается локально, и имеет доступ к
директории с исходным кодом.
Клиент получает на вход `build.Graph` и запускает сборку на координаторе.
После того, как координатор создал новую сборку, клиент заливает недостающие файлы и посылает сигнал о завершении стадии заливки.
После этого, клиент следит за прогрессом сборки, дожидается завершения и выходит.
Клиент тестируется интеграционными тестами из пакета `disttest`.

View file

@ -14,7 +14,7 @@ import (
type Client struct {
l *zap.Logger
client *api.Client
client *api.BuildClient
cache *filecache.Client
sourceDir string
}
@ -26,7 +26,7 @@ func NewClient(
) *Client {
return &Client{
l: l,
client: api.NewClient(l, apiEndpoint),
client: api.NewBuildClient(l, apiEndpoint),
cache: filecache.NewClient(l, apiEndpoint),
sourceDir: sourceDir,
}
@ -68,6 +68,12 @@ func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener
return err
}
uploadDone := &api.SignalRequest{UploadDone: &api.UploadDone{}}
_, err = c.client.SignalBuild(ctx, started.ID, uploadDone)
if err != nil {
return err
}
for {
u, err := r.Next()
if err == io.EOF {

5
distbuild/pkg/dist/README.md vendored Normal file
View file

@ -0,0 +1,5 @@
# dist
Пакет `dist` реализует координатора системы распределённой сборки.
Основная функциональность воркера тестируется интеграционными тестами из пакета `disttest`.

View file

@ -2,6 +2,9 @@ package dist
import (
"context"
"fmt"
"go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/api"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
@ -11,26 +14,74 @@ type Build struct {
ID build.ID
Graph *build.Graph
coordinator *Coordinator
uploadComplete chan struct{}
l *zap.Logger
c *Coordinator
uploadDone chan struct{}
}
func NewBuild(graph *build.Graph, coordinator *Coordinator) *Build {
func NewBuild(graph *build.Graph, c *Coordinator) *Build {
id := build.NewID()
return &Build{
ID: id,
Graph: graph,
coordinator: coordinator,
uploadComplete: make(chan struct{}),
l: c.log.With(zap.String("build_id", id.String())),
c: c,
uploadDone: make(chan struct{}),
}
}
func (b *Build) Run(ctx context.Context, onStatusUpdate func(update api.StatusUpdate) error) error {
panic("implement me")
func (b *Build) Run(ctx context.Context, w api.StatusWriter) error {
if err := w.Started(&api.BuildStarted{ID: b.ID}); err != nil {
return err
}
b.l.Debug("waiting for file upload")
select {
case <-ctx.Done():
return ctx.Err()
case <-b.uploadDone:
}
b.l.Debug("file upload completed")
for _, job := range b.Graph.Jobs {
job := job
s := b.c.scheduler.ScheduleJob(&job)
select {
case <-ctx.Done():
return ctx.Err()
case <-s.Finished:
}
b.l.Debug("job finished", zap.String("job_id", job.ID.String()))
jobFinished := api.StatusUpdate{JobFinished: s.Result}
if err := w.Updated(&jobFinished); err != nil {
return err
}
}
finished := api.StatusUpdate{BuildFinished: &api.BuildFinished{}}
return w.Updated(&finished)
}
func (b *Build) UploadComplete() {
close(b.uploadComplete)
func (b *Build) Signal(ctx context.Context, req *api.SignalRequest) (*api.SignalResponse, error) {
switch {
case req.UploadDone != nil:
select {
case <-b.uploadDone:
return nil, fmt.Errorf("upload already done")
default:
close(b.uploadDone)
}
default:
return nil, fmt.Errorf("unexpected signal kind")
}
return &api.SignalResponse{}, nil
}

View file

@ -43,12 +43,15 @@ func NewCoordinator(
scheduler: scheduler.NewScheduler(log, defaultConfig),
}
apiHandler := api.NewServiceHandler(log, c)
apiHandler := api.NewBuildService(log, c)
apiHandler.Register(c.mux)
heartbeatHandler := api.NewHeartbeatHandler(log, c)
heartbeatHandler.Register(c.mux)
fileHandler := filecache.NewHandler(log, c.fileCache)
fileHandler.Register(c.mux)
return c
}
@ -56,36 +59,43 @@ func (c *Coordinator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c.mux.ServeHTTP(w, r)
}
func (c *Coordinator) addBuild(b *Build) {
c.mu.Lock()
defer c.mu.Unlock()
c.builds[b.ID] = b
}
func (c *Coordinator) removeBuild(b *Build) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.builds, b.ID)
}
func (c *Coordinator) getBuild(id build.ID) *Build {
c.mu.Lock()
defer c.mu.Unlock()
return c.builds[id]
}
func (c *Coordinator) StartBuild(ctx context.Context, req *api.BuildRequest, w api.StatusWriter) error {
if err := w.Started(&api.BuildStarted{}); err != nil {
return err
}
b := NewBuild(&req.Graph, c)
for _, job := range req.Graph.Jobs {
job := job
c.addBuild(b)
defer c.removeBuild(b)
s := c.scheduler.ScheduleJob(&job)
select {
case <-ctx.Done():
return ctx.Err()
case <-s.Finished:
}
c.log.Debug("job finished", zap.String("job_id", job.ID.String()))
jobFinished := api.StatusUpdate{JobFinished: s.Result}
if err := w.Updated(&jobFinished); err != nil {
return err
}
}
finished := api.StatusUpdate{BuildFinished: &api.BuildFinished{}}
return w.Updated(&finished)
return b.Run(ctx, w)
}
func (c *Coordinator) SignalBuild(ctx context.Context, buildID build.ID, signal *api.SignalRequest) (*api.SignalResponse, error) {
return nil, fmt.Errorf("signal build: not implemented")
b := c.getBuild(buildID)
if b == nil {
return nil, fmt.Errorf("build %q not found", buildID)
}
return b.Signal(ctx, signal)
}
func (c *Coordinator) Heartbeat(ctx context.Context, req *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {

View file

@ -0,0 +1,6 @@
# worker
Пакет `worker` реализует воркера в системе распределённой сборки. Воркер ходит с heartbeat-ами
к координатору, получает с него джобы, выполняет их и посылает результаты назад на координатор.
Основная функциональность воркера тестируется интеграционными тестами из пакета `disttest`.