Switch to heartbeat client

This commit is contained in:
Fedor Korotkiy 2020-04-05 00:36:53 +03:00
parent 106ac182ad
commit 6224193cac
2 changed files with 18 additions and 36 deletions

View file

@ -14,6 +14,8 @@ type Build struct {
ID build.ID ID build.ID
Graph *build.Graph Graph *build.Graph
reverseFiles map[string]build.ID
l *zap.Logger l *zap.Logger
c *Coordinator c *Coordinator
uploadDone chan struct{} uploadDone chan struct{}
@ -26,6 +28,8 @@ func NewBuild(graph *build.Graph, c *Coordinator) *Build {
ID: id, ID: id,
Graph: graph, Graph: graph,
reverseFiles: make(map[string]build.ID),
l: c.log.With(zap.String("build_id", id.String())), l: c.log.With(zap.String("build_id", id.String())),
c: c, c: c,
uploadDone: make(chan struct{}), uploadDone: make(chan struct{}),
@ -34,9 +38,12 @@ func NewBuild(graph *build.Graph, c *Coordinator) *Build {
func (b *Build) missingFiles() []build.ID { func (b *Build) missingFiles() []build.ID {
var files []build.ID var files []build.ID
for id := range b.Graph.SourceFiles {
for id, path := range b.Graph.SourceFiles {
files = append(files, id) files = append(files, id)
b.reverseFiles[path] = id
} }
return files return files
} }

View file

@ -1,11 +1,8 @@
package worker package worker
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"sync" "sync"
@ -28,6 +25,9 @@ type Worker struct {
mux *http.ServeMux mux *http.ServeMux
fileClient *filecache.Client
heartbeat *api.HeartbeatClient
mu sync.Mutex mu sync.Mutex
newArtifacts []build.ID newArtifacts []build.ID
newSources []build.ID newSources []build.ID
@ -45,8 +45,12 @@ func New(
id: workerID, id: workerID,
coordinatorEndpoint: coordinatorEndpoint, coordinatorEndpoint: coordinatorEndpoint,
log: log, log: log,
fileCache: fileCache,
artifacts: artifacts, fileCache: fileCache,
artifacts: artifacts,
fileClient: filecache.NewClient(log, coordinatorEndpoint),
heartbeat: api.NewHeartbeatClient(log, coordinatorEndpoint),
mux: http.NewServeMux(), mux: http.NewServeMux(),
} }
@ -63,35 +67,6 @@ func (w *Worker) recover() error {
}) })
} }
func (w *Worker) sendHeartbeat(ctx context.Context, req *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
reqJS, err := json.Marshal(req)
if err != nil {
return nil, err
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", w.coordinatorEndpoint+"/heartbeat", bytes.NewBuffer(reqJS))
if err != nil {
return nil, err
}
httpRsp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return nil, err
}
if httpRsp.StatusCode != http.StatusOK {
errorString, _ := ioutil.ReadAll(httpRsp.Body)
return nil, fmt.Errorf("heartbeat failed: %s", errorString)
}
var rsp api.HeartbeatResponse
if err := json.NewDecoder(httpRsp.Body).Decode(&rsp); err != nil {
return nil, err
}
return &rsp, nil
}
func (w *Worker) Run(ctx context.Context) error { func (w *Worker) Run(ctx context.Context) error {
if err := w.recover(); err != nil { if err := w.recover(); err != nil {
return err return err
@ -99,7 +74,7 @@ func (w *Worker) Run(ctx context.Context) error {
for { for {
w.log.Debug("sending heartbeat request") w.log.Debug("sending heartbeat request")
rsp, err := w.sendHeartbeat(ctx, w.buildHeartbeat()) rsp, err := w.heartbeat.Heartbeat(ctx, w.buildHeartbeat())
if err != nil { if err != nil {
if ctx.Err() != nil { if ctx.Err() != nil {
return ctx.Err() return ctx.Err()