Use api for communicating with client.
This commit is contained in:
parent
b97b6e9a0f
commit
6e5c478920
3 changed files with 50 additions and 94 deletions
|
@ -67,11 +67,10 @@ func newEnv(t *testing.T) (e *env, cancel func()) {
|
||||||
var cancelRootContext func()
|
var cancelRootContext func()
|
||||||
env.Ctx, cancelRootContext = context.WithCancel(context.Background())
|
env.Ctx, cancelRootContext = context.WithCancel(context.Background())
|
||||||
|
|
||||||
env.Client = &client.Client{
|
env.Client = client.NewClient(
|
||||||
CoordinatorEndpoint: coordinatorEndpoint,
|
env.Logger.Named("client"),
|
||||||
SourceDir: filepath.Join(absCWD, "testdata/src"),
|
coordinatorEndpoint,
|
||||||
Log: env.Logger.Named("client"),
|
filepath.Join(absCWD, "testdata/src"))
|
||||||
}
|
|
||||||
|
|
||||||
coordinatorCache, err := filecache.New(filepath.Join(env.RootDir, "coordinator", "filecache"))
|
coordinatorCache, err := filecache.New(filepath.Join(env.RootDir, "coordinator", "filecache"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -1,12 +1,9 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io"
|
||||||
"net/http"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
@ -15,9 +12,21 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
CoordinatorEndpoint string
|
l *zap.Logger
|
||||||
SourceDir string
|
client *api.Client
|
||||||
Log *zap.Logger
|
sourceDir string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClient(
|
||||||
|
l *zap.Logger,
|
||||||
|
apiEndpoint string,
|
||||||
|
sourceDir string,
|
||||||
|
) *Client {
|
||||||
|
return &Client{
|
||||||
|
l: l,
|
||||||
|
client: &api.Client{Endpoint: apiEndpoint},
|
||||||
|
sourceDir: sourceDir,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type BuildListener interface {
|
type BuildListener interface {
|
||||||
|
@ -28,62 +37,39 @@ type BuildListener interface {
|
||||||
OnJobFailed(jobID build.ID, code int, error string) error
|
OnJobFailed(jobID build.ID, code int, error string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) uploadSources(ctx context.Context, src api.BuildStarted) error {
|
func (c *Client) uploadSources(ctx context.Context, started *api.BuildStarted) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener) error {
|
func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener) error {
|
||||||
graphJS, err := json.Marshal(graph)
|
started, r, err := c.client.StartBuild(ctx, &api.BuildRequest{Graph: graph})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("POST", c.CoordinatorEndpoint+"/build", bytes.NewBuffer(graphJS))
|
c.l.Debug("build started", zap.String("build_id", started.ID.String()))
|
||||||
if err != nil {
|
if err := c.uploadSources(ctx, started); err != nil {
|
||||||
return err
|
|
||||||
}
|
|
||||||
req.Header.Add("Content-Type", "application/json")
|
|
||||||
req = req.WithContext(ctx)
|
|
||||||
|
|
||||||
c.Log.Debug("sending build request", zap.String("url", req.URL.String()))
|
|
||||||
|
|
||||||
rsp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("build failed: %w", err)
|
|
||||||
}
|
|
||||||
defer rsp.Body.Close()
|
|
||||||
|
|
||||||
if rsp.StatusCode != 200 {
|
|
||||||
errorMsg, _ := ioutil.ReadAll(rsp.Body)
|
|
||||||
return fmt.Errorf("build failed: %s", errorMsg)
|
|
||||||
}
|
|
||||||
|
|
||||||
d := json.NewDecoder(rsp.Body)
|
|
||||||
|
|
||||||
var missing api.BuildStarted
|
|
||||||
if err := d.Decode(&missing); err != nil {
|
|
||||||
return fmt.Errorf("error receiving source list: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.uploadSources(ctx, missing); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
var update api.StatusUpdate
|
u, err := r.Next()
|
||||||
if err := d.Decode(&update); err != nil {
|
if err == io.EOF {
|
||||||
return fmt.Errorf("error receiving status update: %w", err)
|
return fmt.Errorf("unexpected end of status stream")
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.l.Debug("received status update", zap.String("build_id", started.ID.String()), zap.Any("update", u))
|
||||||
switch {
|
switch {
|
||||||
case update.BuildFailed != nil:
|
case u.BuildFailed != nil:
|
||||||
return fmt.Errorf("build failed: %s", update.BuildFailed.Error)
|
return fmt.Errorf("build failed: %s", u.BuildFailed.Error)
|
||||||
|
|
||||||
case update.BuildFinished != nil:
|
case u.BuildFinished != nil:
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
case update.JobFinished != nil:
|
case u.JobFinished != nil:
|
||||||
jf := update.JobFinished
|
jf := u.JobFinished
|
||||||
|
|
||||||
if jf.Stdout != nil {
|
if jf.Stdout != nil {
|
||||||
if err := lsn.OnJobStdout(jf.ID, jf.Stdout); err != nil {
|
if err := lsn.OnJobStdout(jf.ID, jf.Stdout); err != nil {
|
||||||
|
|
59
distbuild/pkg/dist/coordinator.go
vendored
59
distbuild/pkg/dist/coordinator.go
vendored
|
@ -1,9 +1,9 @@
|
||||||
package dist
|
package dist
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -44,8 +44,9 @@ func NewCoordinator(
|
||||||
scheduler: scheduler.NewScheduler(log, defaultConfig),
|
scheduler: scheduler.NewScheduler(log, defaultConfig),
|
||||||
}
|
}
|
||||||
|
|
||||||
c.mux.HandleFunc("/build", c.Build)
|
apiHandler := api.NewServiceHandler(log, c)
|
||||||
c.mux.HandleFunc("/signal", c.Signal)
|
apiHandler.Register(c.mux)
|
||||||
|
|
||||||
c.mux.HandleFunc("/heartbeat", c.Heartbeat)
|
c.mux.HandleFunc("/heartbeat", c.Heartbeat)
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
@ -54,66 +55,36 @@ func (c *Coordinator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
c.mux.ServeHTTP(w, r)
|
c.mux.ServeHTTP(w, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Coordinator) doBuild(w http.ResponseWriter, r *http.Request) error {
|
func (c *Coordinator) StartBuild(ctx context.Context, req *api.BuildRequest, w api.StatusWriter) error {
|
||||||
graphJS, err := ioutil.ReadAll(r.Body)
|
if err := w.Started(&api.BuildStarted{}); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var g build.Graph
|
for _, job := range req.Graph.Jobs {
|
||||||
if err := json.Unmarshal(graphJS, &g); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
enc := json.NewEncoder(w)
|
|
||||||
if err := enc.Encode(api.BuildStarted{}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, job := range g.Jobs {
|
|
||||||
job := job
|
job := job
|
||||||
|
|
||||||
s := c.scheduler.ScheduleJob(&job)
|
s := c.scheduler.ScheduleJob(&job)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-r.Context().Done():
|
case <-ctx.Done():
|
||||||
return r.Context().Err()
|
return ctx.Err()
|
||||||
case <-s.Finished:
|
case <-s.Finished:
|
||||||
}
|
}
|
||||||
|
|
||||||
c.log.Debug("job finished", zap.String("job_id", job.ID.String()))
|
c.log.Debug("job finished", zap.String("job_id", job.ID.String()))
|
||||||
|
|
||||||
update := api.StatusUpdate{JobFinished: s.Result}
|
jobFinished := api.StatusUpdate{JobFinished: s.Result}
|
||||||
if err := enc.Encode(update); err != nil {
|
if err := w.Updated(&jobFinished); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
update := api.StatusUpdate{BuildFinished: &api.BuildFinished{}}
|
finished := api.StatusUpdate{BuildFinished: &api.BuildFinished{}}
|
||||||
return enc.Encode(update)
|
return w.Updated(&finished)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Coordinator) Signal(w http.ResponseWriter, r *http.Request) {
|
func (c *Coordinator) SignalBuild(ctx context.Context, buildID build.ID, signal *api.SignalRequest) (*api.SignalResponse, error) {
|
||||||
c.log.Debug("build signal started")
|
panic("implement me")
|
||||||
if err := c.doHeartbeat(w, r); err != nil {
|
|
||||||
c.log.Error("build signal failed", zap.Error(err))
|
|
||||||
|
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
|
||||||
_, _ = w.Write([]byte(err.Error()))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.log.Debug("build signal finished")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Coordinator) Build(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if err := c.doBuild(w, r); err != nil {
|
|
||||||
c.log.Error("build failed", zap.Error(err))
|
|
||||||
|
|
||||||
errorUpdate := api.StatusUpdate{BuildFailed: &api.BuildFailed{Error: err.Error()}}
|
|
||||||
errorJS, _ := json.Marshal(errorUpdate)
|
|
||||||
_, _ = w.Write(errorJS)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Coordinator) doHeartbeat(w http.ResponseWriter, r *http.Request) error {
|
func (c *Coordinator) doHeartbeat(w http.ResponseWriter, r *http.Request) error {
|
||||||
|
|
Loading…
Reference in a new issue