This commit is contained in:
Fedor Korotkiy 2020-03-10 15:08:59 +03:00
parent 4be9a5a982
commit bb446505a8
9 changed files with 526 additions and 2 deletions

View file

@ -108,9 +108,119 @@ type Cmd struct {
3. Воркеры начинают выполнять вершины графа, пересылая друг другу выходные директории джобов.
4. Результаты работы джобов скачиваются на клиента.
## Протокол: Клиент <-> Координатор
## Протоколы
## Протокол: Координатор <-> Воркер
Общение между компонентами будет происходить поверх HTTP и json. В реальной системе мы бы
взяли более продвинутый протокол и более эффективный формат сериализации, но в этой учебной
задаче нам важнее уменьшить сложность системы.
### Протокол: Клиент <-> Координатор
При общении клиента и кординатора, клиент всегда выступает инициатором запроса.
* `POST /build` - стартует новый билд. Клиент посылает в Body запроса json c описанием сборки. Сервер
стримит в body ответа json сообщения описывающие прогресс сборки (тут правильнее было бы использовать
websocket, но нас устраивает более простое решение).
* `POST /source/{sha1}` - загружает файл с исходным кодом на координатор. Клиент посылает
содержимое файла в body запроса. `{sha1}` - равен `{sha1}` хешу от содержимого файла.
### Протокол: Координатор <-> Воркер
При общении воркера и координатора, воркер всегда выступает инициатором запроса.
* `GET /source/{sha1}` - скачивает файл с исходным кодом с координатора.
* `POST /heartbeat` - синхронизирует состояние воркера и координатора. Воркер посылает
в теле запроса json описывающий изменение в своём состоянии. Коордитора отвечает json-ом со
списком задач, которые должен выполнить воркер.
```go
package proto
// CompleteJob описывает результат работы джоба.
type CompletedJob struct {
ID graph.ID
Stdout, Stderr []byte
// Error описывает сообщение об ошибке, из-за которого джоб не удалось выполнить.
//
// Если Error == nil, значит джоб завершился успешно.
Error *string
}
type HeartbeatRequest struct {
// WorkerID задаёт персистентный идентификатор данного воркера.
//
// WorkerID так же выступает в качестве endpoint-а, к которому можно подключиться по HTTP.
//
// В наших тестов, идентификатор будет иметь вид "localhost:%d".
WorkerID string
// ProcessID задаёт эфемерный идентификатор текущего процесса воркера.
//
// Координатор запоминает ProcessID для каждого воркера.
//
// Измение ProcessID значит, что воркер перезапустился.
ProcessID string
// RunningJobs перечисляет список джобов, которые выполняются на этом воркере
// в данный момент.
RunningJobs []graph.ID
DownloadingSources []graph.ID
DownloadingArtifacts []graph.ID
// FreeSlots сообщаяет, сколько еще процессов можно запустить на этом воркере.
FreeSlots int
// CompletedJobs сообщает координатору, какие джобы завершили исполнение на этом воркере
// на этой итерации цикла.
CompletedJobs []CompletedJob
// AddedArtifacts говорит, какие артефакты появились в кеше на этой итерации цикла.
AddedArtifacts []graph.ID
// AddedSourceFiles говорит, какие файлы появились в кеше на этой итерации цикла.
AddedSourceFiles []graph.ID
}
// JobSpec описывает джоб, который нужно запустить.
type JobSpec struct {
}
// ArtifactSpec описывает артефакт, который нужно скачать с другого воркера.
type ArtifactSpec struct {
}
// SourceFileSpec описывает файл с исходным кодом, который нужно скачать с координатора.
type SourceFileSpec struct {
}
type HeartbeatResponse struct {
JobsToRun map[graph.ID]JobSpec
ArtifactsToDownload map[graph.ID]ArtifactSpec
ArtifactsToRemove []graph.ID
SourceFilesToDownload map[graph.ID]SourceFileSpec
SourceFilesToRemove []graph.ID
}
```
### Протокол: Воркер <-> Воркер
Общение между воркерам происходит тогда, когда системе нужно передать артефакты сборки с
одного воркера на другой.
* `GET /artifact/{sha1}` - возвращает директорию с выходными данными джоба в формате `tar`.
## Кеширование

View file

@ -0,0 +1,40 @@
package artifact
import (
"errors"
"net/http"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
var (
ErrNotFound = errors.New("file not found")
ErrWriteLocked = errors.New("file is locked for write")
ErrReadLocked = errors.New("file is locked for read")
)
type Cache struct{}
func NewCache(root string) (*Cache, error) {
panic("implement me")
}
func (c *Cache) Range(artifactFn func(file build.ID) error) error {
panic("implement me")
}
func (c *Cache) Remove(artifact build.ID) error {
panic("implement me")
}
func (c *Cache) Create(artifact build.ID) (path string, abort, commit func(), err error) {
panic("implement me")
}
func (c *Cache) Get(file build.ID) (path string, unlock func(), err error) {
panic("implement me")
}
func NewHandler(c *Cache) http.Handler {
panic("implement me")
}

View file

@ -0,0 +1,74 @@
package build
import "crypto/sha1"
type ID [sha1.Size]byte
// Job описывает одну вершину графа сборки.
type Job struct {
// ID задаёт уникальный идентификатор джоба.
//
// ID вычисляется как хеш от всех входных файлов, команд запуска и хешей зависимых джобов.
//
// Выход джоба целиком опеределяется его ID. Это важное свойство позволяет кешировать
// результаты сборки.
ID ID
// Name задаёт человекочитаемое имя джоба.
//
// Например:
// build gitlab.com/slon/disbuild/pkg/b
// vet gitlab.com/slon/disbuild/pkg/a
// test gitlab.com/slon/disbuild/pkg/test
Name string
// Inputs задаёт список файлов из директории с исходным кодом,
// которые нужны для работы этого джоба.
//
// В типичном случае, тут будут перечислены все .go файлы одного пакета.
Inputs []string
// Deps задаёт список джобов, выходы которых нужны для работы этого джоба.
Deps []ID
// Cmds описывает список команд, которые нужно выполнить в рамках этого джоба.
Cmds []Cmd
}
// Cmd описывает одну команду сборки.
//
// Есть несколько видов команд. Все виды команд описываются одной структурой.
// Реальный тип определяется тем, какие поля структуры заполнены.
//
// exec - выполняет произвольную команду
// cat - записывает строку в файл
//
// Все строки в описании команды могут содержать в себе на переменные. Перед выполнением
// реальной команды, переменные заменяются на их реальные значения.
//
// {{OUTPUT_DIR}} - абсолютный путь до выходной директории джоба.
// {{SOURCE_DIR}} - абсолютный путь до директории с исходными файлами.
// {{DEP:f374b81d81f641c8c3d5d5468081ef83b2c7dae9}} - абсолютный путь до директории,
// содержащей выход джоба с id f374b81d81f641c8c3d5d5468081ef83b2c7dae9.
type Cmd struct {
// Exec описывает команду, которую нужно выполнить.
Exec []string
// Environ описывает переменные окружения, которые необходимы для работы команды из Exec.
Environ []string
// WorkingDirectory задаёт рабочую директорию для команды из Exec.
WorkingDirectory string
// CatTemplate задаёт шаблон строки, которую нужно записать в файл.
CatTemplate string
// CatOutput задаёт выходной файл для команды типа cat.
CatOutput string
}
type Graph struct {
SourceFiles map[ID]string
Jobs []Job
}

View file

@ -0,0 +1,107 @@
package client
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/proto"
)
type Client struct {
CoordinatorEndpoint string
SourceDir string
}
type BuildListener interface {
OnJobStdout(jobID build.ID, stdout []byte) error
OnJobStderr(jobID build.ID, stdout []byte) error
OnJobFinished(jobID build.ID) error
OnJobFailed(jobID build.ID, code int, error string) error
}
func (c *Client) uploadSources(ctx context.Context, src proto.MissingSources) error {
}
func (c *Client) Build(ctx context.Context, graph build.Graph, lsn BuildListener) error {
graphJS, err := json.Marshal(graph)
if err != nil {
return err
}
req, err := http.NewRequest("POST", c.CoordinatorEndpoint+"/build", bytes.NewBuffer(graphJS))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/json")
req = req.WithContext(ctx)
rsp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("build failed: %w", err)
}
defer rsp.Body.Close()
if rsp.StatusCode != 200 {
errorMsg, _ := ioutil.ReadAll(rsp.Body)
return fmt.Errorf("build failed: %s", errorMsg)
}
d := json.NewDecoder(rsp.Body)
var missing proto.MissingSources
if err := d.Decode(&missing); err != nil {
return err
}
if err := c.uploadSources(ctx, missing); err != nil {
return err
}
for {
var update proto.StatusUpdate
if err := d.Decode(&update); err != nil {
return err
}
switch {
case update.BuildFailed != nil:
return fmt.Errorf("build failed: %s", update.BuildFailed.Error)
case update.JobFinished != nil:
jf := update.JobFinished
if jf.Stdout != nil {
if err := lsn.OnJobStdout(jf.ID, jf.Stdout); err != nil {
return err
}
}
if jf.Stderr != nil {
if err := lsn.OnJobStderr(jf.ID, jf.Stderr); err != nil {
return err
}
}
if jf.Error != nil {
if err := lsn.OnJobFailed(jf.ID, jf.ExitCode, *jf.Error); err != nil {
return err
}
} else {
if err := lsn.OnJobFinished(jf.ID); err != nil {
return err
}
}
default:
return fmt.Errorf("build failed: unexpected status update")
}
}
}

4
distbuild/pkg/dist/build.go vendored Normal file
View file

@ -0,0 +1,4 @@
package dist
type Build struct {
}

View file

@ -0,0 +1,42 @@
package filecache
import (
"errors"
"io"
"net/http"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
var (
ErrNotFound = errors.New("file not found")
ErrWriteLocked = errors.New("file is locked for write")
ErrReadLocked = errors.New("file is locked for read")
)
type Cache struct {
}
func New(rootDir string) (*Cache, error) {
panic("implement me")
}
func (c *Cache) Range(fileFn func(file build.ID) error) error {
panic("implement me")
}
func (c *Cache) Remove(file build.ID) error {
panic("implement me")
}
func (c *Cache) Write(file build.ID) (w io.WriteCloser, abort func(), err error) {
panic("implement me")
}
func (c *Cache) Get(file build.ID) (path string, unlock func(), err error) {
panic("implement me")
}
func NewHandler(c *Cache) http.Handler {
panic("implement me")
}

View file

@ -0,0 +1,18 @@
package proto
import (
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
type MissingSources struct {
MissingFiles []build.ID
}
type StatusUpdate struct {
JobFinished *FinishedJob
BuildFailed *BuildFailed
}
type BuildFailed struct {
Error string
}

View file

@ -0,0 +1,83 @@
package proto
import (
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
// CompleteJob описывает результат работы джоба.
type FinishedJob struct {
ID build.ID
Stdout, Stderr []byte
ExitCode int
// Error описывает сообщение об ошибке, из-за которого джоб не удалось выполнить.
//
// Если Error == nil, значит джоб завершился успешно.
Error *string
}
type HeartbeatRequest struct {
// WorkerID задаёт персистентный идентификатор данного воркера.
//
// WorkerID так же выступает в качестве endpoint-а, к которому можно подключиться по HTTP.
//
// В наших тестов, идентификатор будет иметь вид "localhost:%d".
WorkerID string
// ProcessID задаёт эфемерный идентификатор текущего процесса воркера.
//
// Координатор запоминает ProcessID для каждого воркера.
//
// Измение ProcessID значит, что воркер перезапустился.
ProcessID string
// RunningJobs перечисляет список джобов, которые выполняются на этом воркере
// в данный момент.
RunningJobs []build.ID
DownloadingSources []build.ID
DownloadingArtifacts []build.ID
// FreeSlots сообщаяет, сколько еще процессов можно запустить на этом воркере.
FreeSlots int
// FinishedJob сообщает координатору, какие джобы завершили исполнение на этом воркере
// на этой итерации цикла.
FinishedJob []FinishedJob
// AddedArtifacts говорит, какие артефакты появились в кеше на этой итерации цикла.
AddedArtifacts []build.ID
// AddedSourceFiles говорит, какие файлы появились в кеше на этой итерации цикла.
AddedSourceFiles []build.ID
}
// JobSpec описывает джоб, который нужно запустить.
type JobSpec struct {
SourceFiles map[build.ID]string
Job build.Job
}
// ArtifactSpec описывает артефакт, который нужно скачать с другого воркера.
type ArtifactSpec struct {
}
// SourceFileSpec описывает файл с исходным кодом, который нужно скачать с координатора.
type SourceFileSpec struct {
}
type HeartbeatResponse struct {
JobsToRun map[build.ID]JobSpec
ArtifactsToDownload map[build.ID]ArtifactSpec
ArtifactsToRemove []build.ID
SourceFilesToDownload map[build.ID]SourceFileSpec
SourceFilesToRemove []build.ID
}

View file

@ -0,0 +1,46 @@
package worker
import (
"context"
"sync"
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache"
)
type Worker struct {
CoordinatorEndpoint string
SourceFiles *filecache.Cache
Artifacts *artifact.Cache
mu sync.Mutex
newArtifacts []build.ID
newSources []build.ID
}
func (w *Worker) recover() error {
err := w.SourceFiles.Range(func(file build.ID) error {
w.newSources = append(w.newSources, file)
return nil
})
if err != nil {
return err
}
return w.Artifacts.Range(func(file build.ID) error {
w.newArtifacts = append(w.newArtifacts, file)
return nil
})
}
func (w *Worker) Run(ctx context.Context) error {
if err := w.recover(); err != nil {
return err
}
for {
}
}