Finish distbuild/api

This commit is contained in:
Fedor Korotkiy 2020-04-04 19:19:01 +03:00
parent 3f5dd7b6cc
commit f221388f5c
10 changed files with 269 additions and 29 deletions

View file

@ -0,0 +1,32 @@
# api
Пакет api реализует протокол, по которому общаются компоненты системы.
Этот пакет не занимается передачей файлов и артефактов, соответствующие функции находятся в
пакетах `filecache` и `artifact`.
## Worker <-> Coordinator
- Worker и Сoordinator общаются через один запрос `POST /heartbeat`.
- Worker посылает `HeartbeatRequest` и получает в ответ `HeartbeatResponse`.
- Запрос и ответ передаются в формате json.
- Ошибка обработки heartbeat передаётся как текстовая строка.
## Client <-> Coordinator
Client и Coordinator общаются через два вызова.
- `POST /build` - стартует новый билд.
* Client посылает в Body запроса json c описанием сборки.
* Coordinator стримит в body ответа json сообщения описывающие прогресс сборки.
* Первым сообщением в ответе Coordinator присылает `buildID`.
* _Тут можно было бы использовать websocket, но нас устраивает более простое решение._
- `POST /signal?build_id=12345` - посылает сигнал бегущему билду.
* Запрос и ответ передаются в формате json.
# Замечания
- Конструкторы клиентов и хендлеров принимают первым параметром `*zap.Logger`. Запишите в лог события
получения/отправки запроса и все ошибки. Это поможет вам отлаживать интеграционные тесты
в следующей части задания.

View file

@ -9,11 +9,21 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
) )
type Client struct { type Client struct {
Endpoint string l *zap.Logger
endpoint string
}
func NewClient(l *zap.Logger, endpoint string) *Client {
return &Client{
l: l,
endpoint: endpoint,
}
} }
type statusReader struct { type statusReader struct {
@ -39,7 +49,7 @@ func (c *Client) StartBuild(ctx context.Context, request *BuildRequest) (*BuildS
return nil, nil, err return nil, nil, err
} }
req, err := http.NewRequest("POST", c.Endpoint+"/build", bytes.NewBuffer(reqJSON)) req, err := http.NewRequest("POST", c.endpoint+"/build", bytes.NewBuffer(reqJSON))
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -81,7 +91,7 @@ func (c *Client) SignalBuild(ctx context.Context, buildID build.ID, signal *Sign
return nil, err return nil, err
} }
req, err := http.NewRequest("POST", c.Endpoint+"/signal?build_id="+buildID.String(), bytes.NewBuffer(signalJSON)) req, err := http.NewRequest("POST", c.endpoint+"/signal?build_id="+buildID.String(), bytes.NewBuffer(signalJSON))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -45,7 +45,7 @@ func newEnv(t *testing.T) (*env, func()) {
env.server = httptest.NewServer(mux) env.server = httptest.NewServer(mux)
env.client = &api.Client{Endpoint: env.server.URL} env.client = api.NewClient(log, env.server.URL)
return env, env.stop return env, env.stop
} }

View file

@ -2,9 +2,6 @@ package api
import ( import (
"context" "context"
"net/http"
"go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
) )
@ -73,24 +70,3 @@ type HeartbeatResponse struct {
type HeartbeatService interface { type HeartbeatService interface {
Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error)
} }
type HeartbeatClient struct {
Endpoint string
}
func (c *HeartbeatClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) {
panic("implement me")
}
type HeartbeatHandler struct {
l *zap.Logger
s HeartbeatService
}
func (h *HeartbeatHandler) Register(mux *http.ServeMux) {
mux.HandleFunc("/heartbeat", h.heartbeat)
}
func (h *HeartbeatHandler) heartbeat(w http.ResponseWriter, r *http.Request) {
panic("implement me")
}

View file

@ -0,0 +1,56 @@
package api
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"go.uber.org/zap"
)
type HeartbeatClient struct {
l *zap.Logger
endpoint string
}
func NewHeartbeatClient(l *zap.Logger, endpoint string) *HeartbeatClient {
return &HeartbeatClient{l: l, endpoint: endpoint}
}
func (c *HeartbeatClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) {
reqJSON, err := json.Marshal(req)
if err != nil {
return nil, err
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint+"/heartbeat", bytes.NewBuffer(reqJSON))
if err != nil {
return nil, err
}
httpRsp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return nil, err
}
defer httpRsp.Body.Close()
if httpRsp.StatusCode != http.StatusOK {
errorMsg, _ := ioutil.ReadAll(httpRsp.Body)
return nil, fmt.Errorf("heartbeat failed: %s", errorMsg)
}
rspJSON, err := ioutil.ReadAll(httpRsp.Body)
if err != nil {
return nil, err
}
rsp := &HeartbeatResponse{}
if err = json.Unmarshal(rspJSON, rsp); err != nil {
return nil, err
}
return rsp, nil
}

View file

@ -0,0 +1,60 @@
package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"go.uber.org/zap"
)
type HeartbeatHandler struct {
l *zap.Logger
s HeartbeatService
}
func NewHeartbeatHandler(l *zap.Logger, s HeartbeatService) *HeartbeatHandler {
return &HeartbeatHandler{l: l, s: s}
}
func (h *HeartbeatHandler) Register(mux *http.ServeMux) {
mux.HandleFunc("/heartbeat", h.heartbeat)
}
func (h *HeartbeatHandler) doHeartbeat(w http.ResponseWriter, r *http.Request) error {
reqJSON, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
var req HeartbeatRequest
if err := json.Unmarshal(reqJSON, &req); err != nil {
return err
}
h.l.Debug("heartbeat started", zap.Any("req", req))
rsp, err := h.s.Heartbeat(r.Context(), &req)
if err != nil {
return err
}
h.l.Debug("heartbeat finished", zap.Any("rsp", rsp))
rspJSON, err := json.Marshal(rsp)
if err != nil {
return err
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write(rspJSON)
return nil
}
func (h *HeartbeatHandler) heartbeat(w http.ResponseWriter, r *http.Request) {
if err := h.doHeartbeat(w, r); err != nil {
h.l.Warn("heartbeat error", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "%v", err)
}
}

View file

@ -0,0 +1,56 @@
package api_test
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"gitlab.com/slon/shad-go/distbuild/pkg/api"
"gitlab.com/slon/shad-go/distbuild/pkg/api/mock"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
//go:generate mockgen -package mock -destination mock/heartbeat.go . HeartbeatService
func TestHeartbeat(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
l := zaptest.NewLogger(t)
m := mock.NewMockHeartbeatService(ctrl)
mux := http.NewServeMux()
api.NewHeartbeatHandler(l, m).Register(mux)
server := httptest.NewServer(mux)
defer server.Close()
client := api.NewHeartbeatClient(l, server.URL)
req := &api.HeartbeatRequest{
WorkerID: "worker0",
}
rsp := &api.HeartbeatResponse{
JobsToRun: map[build.ID]api.JobSpec{
{0x01}: {Job: build.Job{Name: "cc a.c"}},
},
}
gomock.InOrder(
m.EXPECT().Heartbeat(gomock.Any(), gomock.Eq(req)).Times(1).Return(rsp, nil),
m.EXPECT().Heartbeat(gomock.Any(), gomock.Eq(req)).Times(1).Return(nil, fmt.Errorf("build error: foo bar")),
)
clientRsp, err := client.Heartbeat(context.Background(), req)
require.NoError(t, err)
require.Equal(t, rsp, clientRsp)
_, err = client.Heartbeat(context.Background(), req)
require.Error(t, err)
require.Contains(t, err.Error(), "build error: foo bar")
}

View file

@ -0,0 +1,50 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: gitlab.com/slon/shad-go/distbuild/pkg/api (interfaces: HeartbeatService)
// Package mock is a generated GoMock package.
package mock
import (
context "context"
gomock "github.com/golang/mock/gomock"
api "gitlab.com/slon/shad-go/distbuild/pkg/api"
reflect "reflect"
)
// MockHeartbeatService is a mock of HeartbeatService interface
type MockHeartbeatService struct {
ctrl *gomock.Controller
recorder *MockHeartbeatServiceMockRecorder
}
// MockHeartbeatServiceMockRecorder is the mock recorder for MockHeartbeatService
type MockHeartbeatServiceMockRecorder struct {
mock *MockHeartbeatService
}
// NewMockHeartbeatService creates a new mock instance
func NewMockHeartbeatService(ctrl *gomock.Controller) *MockHeartbeatService {
mock := &MockHeartbeatService{ctrl: ctrl}
mock.recorder = &MockHeartbeatServiceMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockHeartbeatService) EXPECT() *MockHeartbeatServiceMockRecorder {
return m.recorder
}
// Heartbeat mocks base method
func (m *MockHeartbeatService) Heartbeat(arg0 context.Context, arg1 *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Heartbeat", arg0, arg1)
ret0, _ := ret[0].(*api.HeartbeatResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Heartbeat indicates an expected call of Heartbeat
func (mr *MockHeartbeatServiceMockRecorder) Heartbeat(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Heartbeat", reflect.TypeOf((*MockHeartbeatService)(nil).Heartbeat), arg0, arg1)
}

View file

@ -24,7 +24,7 @@ func NewClient(
) *Client { ) *Client {
return &Client{ return &Client{
l: l, l: l,
client: &api.Client{Endpoint: apiEndpoint}, client: &api.Client{endpoint: apiEndpoint},
sourceDir: sourceDir, sourceDir: sourceDir,
} }
} }