Finish distbuild/pkg/artifact

This commit is contained in:
Fedor Korotkiy 2020-04-04 20:16:36 +03:00
parent f221388f5c
commit 03e63c2646
6 changed files with 205 additions and 19 deletions

View file

@ -0,0 +1,26 @@
# artifact
Пакет `artifact` реализует кеш хранения артефактов и протокол для передачи артефактов между воркерами.
Артефакт - это директория, содержащая в себе результат работы джоба. Артефакт может состоять из произвольного
набора файлов и директорий.
Основной тип `artifact.Cache` занимается хранением артефактов на диске и контролем одновременного доступа.
Одна горутина может начать писать артефакт. Начало записи берёт лок на запись. Никто другой не может работать с артефактом,
на который взят лок на запись. Горутина должна позвать `commit` или `abort` после того, как она закончила работать с артефактом.
`commit` помечает артефакт в кеш. `abort` отменяет запись артефакта, удаляя все данные.
Горутина может начать читать артефакт, позвав метод `Get`. Много горутин могут читать артефакт одновременно.
Горутина должна позвать `unlock`, после того как она закончила работать с артефактом.
## Скачивание артефакта
`*artifact.Handler` должен реализовывать один метод `GET /artifact?id=1234`. Хендлер отвечает на
запрос содержимым артефакта в формате `tarstream`.
Функция `Download` должна скачивать артефакт из удалённого кеша в локальный.
Обратите внимание, что конструктор хендлера принимает `*zap.Logger`. Запишите в этот логгер интересные события,
это поможет при отладке в следующих частях задачи.

View file

@ -5,7 +5,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
@ -182,7 +181,3 @@ func (c *Cache) Get(artifact build.ID) (path string, unlock func(), err error) {
} }
return return
} }
func NewHandler(c *Cache) http.Handler {
panic("implement me")
}

View file

@ -1,6 +1,7 @@
package artifact package artifact_test
import ( import (
"errors"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -8,24 +9,43 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
) )
func TestCache(t *testing.T) { type testCache struct {
*artifact.Cache
tmpDir string
}
func (c *testCache) cleanup() error {
return os.RemoveAll(c.tmpDir)
}
func newTestCache(t *testing.T) *testCache {
tmpDir, err := ioutil.TempDir("", "") tmpDir, err := ioutil.TempDir("", "")
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(tmpDir)
c, err := NewCache(tmpDir) cache, err := artifact.NewCache(tmpDir)
if err != nil {
_ = os.RemoveAll(tmpDir)
}
require.NoError(t, err) require.NoError(t, err)
return &testCache{Cache: cache, tmpDir: tmpDir}
}
func TestCache(t *testing.T) {
c := newTestCache(t)
defer c.cleanup()
idA := build.ID{'a'} idA := build.ID{'a'}
path, commit, _, err := c.Create(idA) path, commit, _, err := c.Create(idA)
require.NoError(t, err) require.NoError(t, err)
_, _, _, err = c.Create(idA) _, _, _, err = c.Create(idA)
require.Equal(t, ErrWriteLocked, err) require.Truef(t, errors.Is(err, artifact.ErrWriteLocked), "%v", err)
_, err = os.Create(filepath.Join(path, "a.txt")) _, err = os.Create(filepath.Join(path, "a.txt"))
require.NoError(t, err) require.NoError(t, err)
@ -39,11 +59,11 @@ func TestCache(t *testing.T) {
_, err = os.Stat(filepath.Join(path, "a.txt")) _, err = os.Stat(filepath.Join(path, "a.txt"))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, ErrReadLocked, c.Remove(idA)) require.Truef(t, errors.Is(c.Remove(idA), artifact.ErrReadLocked), "%v", err)
idB := build.ID{'b'} idB := build.ID{'b'}
_, _, err = c.Get(idB) _, _, err = c.Get(idB)
require.Equal(t, ErrNotFound, err) require.Truef(t, errors.Is(err, artifact.ErrNotFound), "%v", err)
require.NoError(t, c.Range(func(artifact build.ID) error { require.NoError(t, c.Range(func(artifact build.ID) error {
require.Equal(t, idA, artifact) require.Equal(t, idA, artifact)
@ -52,12 +72,8 @@ func TestCache(t *testing.T) {
} }
func TestAbortWrite(t *testing.T) { func TestAbortWrite(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "") c := newTestCache(t)
require.NoError(t, err) defer c.cleanup()
defer os.RemoveAll(tmpDir)
c, err := NewCache(tmpDir)
require.NoError(t, err)
idA := build.ID{'a'} idA := build.ID{'a'}
@ -66,5 +82,5 @@ func TestAbortWrite(t *testing.T) {
require.NoError(t, abort()) require.NoError(t, abort())
_, _, err = c.Get(idA) _, _, err = c.Get(idA)
require.Equal(t, ErrNotFound, err) require.Truef(t, errors.Is(err, artifact.ErrNotFound), "%v", err)
} }

View file

@ -0,0 +1,42 @@
package artifact
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/tarstream"
)
// Download artifact from remote cache into local cache.
func Download(ctx context.Context, endpoint string, c *Cache, artifactID build.ID) error {
dir, commit, abort, err := c.Create(artifactID)
if err != nil {
return err
}
defer abort()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint+"/artifact?id="+artifactID.String(), nil)
if err != nil {
return err
}
rsp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
errStr, _ := ioutil.ReadAll(rsp.Body)
return fmt.Errorf("download: %s", errStr)
}
if err := tarstream.Receive(dir, rsp.Body); err != nil {
return err
}
return commit()
}

View file

@ -0,0 +1,53 @@
package artifact_test
import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
func TestArtifactTransfer(t *testing.T) {
remoteCache := newTestCache(t)
defer remoteCache.cleanup()
localCache := newTestCache(t)
defer localCache.cleanup()
id := build.ID{0x01}
dir, commit, _, err := remoteCache.Create(id)
require.NoError(t, err)
require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "a.txt"), []byte("foobar"), 0777))
require.NoError(t, commit())
l := zaptest.NewLogger(t)
h := artifact.NewHandler(l, remoteCache.Cache)
mux := http.NewServeMux()
h.Register(mux)
server := httptest.NewServer(mux)
defer server.Close()
ctx := context.Background()
require.NoError(t, artifact.Download(ctx, server.URL, localCache.Cache, id))
dir, unlock, err := localCache.Get(id)
require.NoError(t, err)
defer unlock()
content, err := ioutil.ReadFile(filepath.Join(dir, "a.txt"))
require.NoError(t, err)
require.Equal(t, []byte("foobar"), content)
err = artifact.Download(ctx, server.URL, localCache.Cache, build.ID{0x02})
require.Error(t, err)
}

View file

@ -0,0 +1,54 @@
package artifact
import (
"fmt"
"net/http"
"go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/tarstream"
)
type Handler struct {
l *zap.Logger
c *Cache
}
func NewHandler(l *zap.Logger, c *Cache) *Handler {
return &Handler{l: l, c: c}
}
func (h *Handler) Register(mux *http.ServeMux) {
mux.HandleFunc("/artifact", h.artifact)
}
func (h *Handler) doArtifact(w http.ResponseWriter, r *http.Request) error {
idStr := r.URL.Query().Get("id")
var id build.ID
if err := id.UnmarshalText([]byte(idStr)); err != nil {
return err
}
h.l.Debug("streaming artifact", zap.String("artifact_id", id.String()))
artifactDir, unlock, err := h.c.Get(id)
if err != nil {
return err
}
defer unlock()
w.WriteHeader(http.StatusOK)
if err := tarstream.Send(artifactDir, w); err != nil {
h.l.Warn("error streaming artifact", zap.Error(err))
}
return nil
}
func (h *Handler) artifact(w http.ResponseWriter, r *http.Request) {
if err := h.doArtifact(w, r); err != nil {
h.l.Warn("artifact handler error", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "%v", err)
}
}