Finish distbuild/pkg/filecache

This commit is contained in:
Fedor Korotkiy 2020-04-04 21:45:29 +03:00
parent 03e63c2646
commit 21db0f4d0b
10 changed files with 388 additions and 12 deletions

View file

@ -6,6 +6,7 @@
набора файлов и директорий. набора файлов и директорий.
Основной тип `artifact.Cache` занимается хранением артефактов на диске и контролем одновременного доступа. Основной тип `artifact.Cache` занимается хранением артефактов на диске и контролем одновременного доступа.
Все методы `artifact.Cache` должны быть *concurrency safe*.
Одна горутина может начать писать артефакт. Начало записи берёт лок на запись. Никто другой не может работать с артефактом, Одна горутина может начать писать артефакт. Начало записи берёт лок на запись. Никто другой не может работать с артефактом,
на который взят лок на запись. Горутина должна позвать `commit` или `abort` после того, как она закончила работать с артефактом. на который взят лок на запись. Горутина должна позвать `commit` или `abort` после того, как она закончила работать с артефактом.

View file

@ -14,6 +14,7 @@ import (
var ( var (
ErrNotFound = errors.New("artifact not found") ErrNotFound = errors.New("artifact not found")
ErrExists = errors.New("artifact exists")
ErrWriteLocked = errors.New("artifact is locked for write") ErrWriteLocked = errors.New("artifact is locked for write")
ErrReadLocked = errors.New("artifact is locked for read") ErrReadLocked = errors.New("artifact is locked for read")
) )
@ -79,10 +80,17 @@ func (c *Cache) readUnlock(id build.ID) {
} }
} }
func (c *Cache) writeLock(id build.ID) error { func (c *Cache) writeLock(id build.ID, remove bool) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
_, err := os.Stat(filepath.Join(c.cacheDir, id.Path()))
if !os.IsNotExist(err) && err != nil {
return err
} else if err == nil && !remove {
return ErrExists
}
if _, ok := c.writeLocked[id]; ok { if _, ok := c.writeLocked[id]; ok {
return ErrWriteLocked return ErrWriteLocked
} }
@ -129,7 +137,7 @@ func (c *Cache) Range(artifactFn func(artifact build.ID) error) error {
} }
func (c *Cache) Remove(artifact build.ID) error { func (c *Cache) Remove(artifact build.ID) error {
if err := c.writeLock(artifact); err != nil { if err := c.writeLock(artifact, true); err != nil {
return err return err
} }
defer c.writeUnlock(artifact) defer c.writeUnlock(artifact)
@ -138,7 +146,7 @@ func (c *Cache) Remove(artifact build.ID) error {
} }
func (c *Cache) Create(artifact build.ID) (path string, commit, abort func() error, err error) { func (c *Cache) Create(artifact build.ID) (path string, commit, abort func() error, err error) {
if err = c.writeLock(artifact); err != nil { if err = c.writeLock(artifact, false); err != nil {
return return
} }

View file

@ -84,3 +84,17 @@ func TestAbortWrite(t *testing.T) {
_, _, err = c.Get(idA) _, _, err = c.Get(idA)
require.Truef(t, errors.Is(err, artifact.ErrNotFound), "%v", err) require.Truef(t, errors.Is(err, artifact.ErrNotFound), "%v", err)
} }
func TestArtifactExists(t *testing.T) {
c := newTestCache(t)
defer c.cleanup()
idA := build.ID{'a'}
_, commit, _, err := c.Create(idA)
require.NoError(t, err)
require.NoError(t, commit())
_, _, _, err = c.Create(idA)
require.Truef(t, errors.Is(err, artifact.ErrExists), "%v", err)
}

View file

@ -0,0 +1,18 @@
# filecache
Пакет `filecache` занимается хранением кеша файлов и определяет протокол передачи файлов между частями системы.
`filecache.Cache` управляет файлами и занимается контролем одновременного доступа. Вы можете реализовать этот
тип поверх `*artifact.Cache`, поведение требуется точно такое же.
## Передача файлов
Тип `filecache.Handler` реализует handler, позволяющий заливать и скачивать файлы из кеша.
- Вызов `GET /file?id=123` должен возвращать содержимое файла с `id=123`.
- Вызов `PUT /file?id=123` должен заливать содержимое файла с `id=123`.
**Обратите внимание:** Несколько клиентов могут начать заливать в кеш один и тот же набор файлов. В наивной реализации,
первый клиент залочит файл на запись, а следующие упадут с ошибкой. Ваш код должен обрабатывать эту ситуацию корректно,
тоесть последующие запросы должны дожидаться, пока первый запрос завершится. Для реализации этой логики
поведения вам поможет пакет [singleflight](https://godoc.org/golang.org/x/sync/singleflight).

View file

@ -0,0 +1,83 @@
package filecache
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"go.uber.org/zap"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
type Client struct {
l *zap.Logger
endpoint string
}
func NewClient(l *zap.Logger, endpoint string) *Client {
return &Client{
l: l,
endpoint: endpoint,
}
}
func (c *Client) Upload(ctx context.Context, id build.ID, localPath string) error {
f, err := os.Open(localPath)
if err != nil {
return err
}
defer f.Close()
req, err := http.NewRequestWithContext(ctx, http.MethodPut, c.endpoint+"/file?id="+id.String(), f)
if err != nil {
return err
}
rsp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer rsp.Body.Close()
if rsp.StatusCode != 200 {
errStr, _ := ioutil.ReadAll(rsp.Body)
return fmt.Errorf("file upload: %s", errStr)
}
return nil
}
func (c *Client) Download(ctx context.Context, localCache *Cache, id build.ID) error {
w, abort, err := localCache.Write(id)
if err != nil {
return err
}
defer abort()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.endpoint+"/file?id="+id.String(), nil)
if err != nil {
return err
}
rsp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer rsp.Body.Close()
if rsp.StatusCode != 200 {
errStr, _ := ioutil.ReadAll(rsp.Body)
return fmt.Errorf("file upload: %s", errStr)
}
_, err = io.Copy(w, rsp.Body)
if err != nil {
return err
}
return w.Close()
}

View file

@ -0,0 +1,140 @@
package filecache_test
import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"path/filepath"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache"
)
type env struct {
cache *testCache
server *httptest.Server
client *filecache.Client
}
func newEnv(t *testing.T) *env {
l := zaptest.NewLogger(t)
mux := http.NewServeMux()
cache := newCache(t)
defer func() {
if cache != nil {
_ = cache.cleanup()
}
}()
handler := filecache.NewHandler(l, cache.Cache)
handler.Register(mux)
server := httptest.NewServer(mux)
client := filecache.NewClient(l, server.URL)
env := &env{
cache: cache,
server: server,
client: client,
}
cache = nil
return env
}
func (e *env) stop() {
e.server.Close()
_ = e.cache.cleanup()
}
func TestFileUpload(t *testing.T) {
env := newEnv(t)
defer env.stop()
tmpFilePath := filepath.Join(env.cache.tmpDir, "foo.txt")
require.NoError(t, ioutil.WriteFile(tmpFilePath, []byte("foobar"), 0666))
ctx := context.Background()
t.Run("UploadSingleFile", func(t *testing.T) {
id := build.ID{0x01}
require.NoError(t, env.client.Upload(ctx, id, tmpFilePath))
path, unlock, err := env.cache.Get(id)
require.NoError(t, err)
defer unlock()
content, err := ioutil.ReadFile(path)
require.NoError(t, err)
require.Equal(t, []byte("foobar"), content)
})
t.Run("RepeatedUpload", func(t *testing.T) {
id := build.ID{0x02}
require.NoError(t, env.client.Upload(ctx, id, tmpFilePath))
require.NoError(t, env.client.Upload(ctx, id, tmpFilePath))
})
t.Run("ConcurrentUpload", func(t *testing.T) {
const (
N = 100
G = 100
)
for i := 0; i < N; i++ {
var wg sync.WaitGroup
wg.Add(G)
for j := 0; j < G; j++ {
go func() {
defer wg.Done()
id := build.ID{0x03, byte(j)}
assert.NoError(t, env.client.Upload(ctx, id, tmpFilePath))
}()
}
wg.Wait()
}
})
}
func TestFileDownload(t *testing.T) {
env := newEnv(t)
defer env.stop()
localCache := newCache(t)
defer localCache.cleanup()
id := build.ID{0x01}
w, abort, err := env.cache.Write(id)
require.NoError(t, err)
defer abort()
_, err = w.Write([]byte("foobar"))
require.NoError(t, err)
require.NoError(t, w.Close())
ctx := context.Background()
require.NoError(t, env.client.Download(ctx, localCache.Cache, id))
path, unlock, err := localCache.Get(id)
require.NoError(t, err)
defer unlock()
content, err := ioutil.ReadFile(path)
require.NoError(t, err)
require.Equal(t, []byte("foobar"), content)
}

View file

@ -3,7 +3,6 @@ package filecache
import ( import (
"errors" "errors"
"io" "io"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
@ -13,6 +12,7 @@ import (
var ( var (
ErrNotFound = errors.New("file not found") ErrNotFound = errors.New("file not found")
ErrExists = errors.New("file exists")
ErrWriteLocked = errors.New("file is locked for write") ErrWriteLocked = errors.New("file is locked for write")
ErrReadLocked = errors.New("file is locked for read") ErrReadLocked = errors.New("file is locked for read")
) )
@ -23,6 +23,8 @@ func convertErr(err error) error {
switch { switch {
case errors.Is(err, artifact.ErrNotFound): case errors.Is(err, artifact.ErrNotFound):
return ErrNotFound return ErrNotFound
case errors.Is(err, artifact.ErrExists):
return ErrExists
case errors.Is(err, artifact.ErrWriteLocked): case errors.Is(err, artifact.ErrWriteLocked):
return ErrWriteLocked return ErrWriteLocked
case errors.Is(err, artifact.ErrReadLocked): case errors.Is(err, artifact.ErrReadLocked):
@ -107,7 +109,3 @@ func (c *Cache) Get(file build.ID) (path string, unlock func(), err error) {
err = convertErr(err) err = convertErr(err)
return return
} }
func NewHandler(c *Cache) http.Handler {
panic("implement me")
}

View file

@ -1,28 +1,45 @@
package filecache package filecache_test
import ( import (
"errors" "errors"
"io/ioutil" "io/ioutil"
"os"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache"
) )
func TestFileCache(t *testing.T) { type testCache struct {
*filecache.Cache
tmpDir string
}
func newCache(t *testing.T) *testCache {
tmpDir, err := ioutil.TempDir("", "filecache") tmpDir, err := ioutil.TempDir("", "filecache")
require.NoError(t, err) require.NoError(t, err)
cache, err := New(tmpDir) c, err := filecache.New(tmpDir)
require.NoError(t, err) require.NoError(t, err)
return &testCache{Cache: c, tmpDir: tmpDir}
}
func (c *testCache) cleanup() error {
return os.Remove(c.tmpDir)
}
func TestFileCache(t *testing.T) {
cache := newCache(t)
_, abort, err := cache.Write(build.ID{01}) _, abort, err := cache.Write(build.ID{01})
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, abort()) require.NoError(t, abort())
_, _, err = cache.Get(build.ID{01}) _, _, err = cache.Get(build.ID{01})
require.Truef(t, errors.Is(err, ErrNotFound), "real error: %v", err) require.Truef(t, errors.Is(err, filecache.ErrNotFound), "%v", err)
f, _, err := cache.Write(build.ID{02}) f, _, err := cache.Write(build.ID{02})
require.NoError(t, err) require.NoError(t, err)

View file

@ -0,0 +1,96 @@
package filecache
import (
"errors"
"fmt"
"io"
"net/http"
"os"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
"gitlab.com/slon/shad-go/distbuild/pkg/build"
)
type Handler struct {
l *zap.Logger
cache *Cache
single singleflight.Group
}
func NewHandler(l *zap.Logger, cache *Cache) *Handler {
return &Handler{
l: l,
cache: cache,
}
}
func (h *Handler) Register(mux *http.ServeMux) {
mux.HandleFunc("/file", h.file)
}
func (h *Handler) doGet(w http.ResponseWriter, r *http.Request, id build.ID) error {
path, unlock, err := h.cache.Get(id)
if err != nil {
return err
}
defer unlock()
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
if _, err = io.Copy(w, f); err != nil {
h.l.Warn("error streaming file", zap.Error(err))
}
return nil
}
func (h *Handler) doPut(w http.ResponseWriter, r *http.Request, id build.ID) error {
_, err, _ := h.single.Do(id.String(), func() (interface{}, error) {
w, abort, err := h.cache.Write(id)
if errors.Is(err, ErrExists) {
return nil, nil
} else if err != nil {
return nil, err
}
defer abort()
if _, err = io.Copy(w, r.Body); err != nil {
return nil, err
}
return nil, w.Close()
})
if err != nil {
return err
}
w.WriteHeader(http.StatusOK)
return nil
}
func (h *Handler) file(w http.ResponseWriter, r *http.Request) {
var id build.ID
err := id.UnmarshalText([]byte(r.URL.Query().Get("id")))
if err == nil {
switch r.Method {
case http.MethodGet:
err = h.doGet(w, r, id)
case http.MethodPut:
err = h.doPut(w, r, id)
default:
err = fmt.Errorf("filehandler: unsupported method %s", r.Method)
}
}
if err != nil {
h.l.Warn("file error", zap.String("method", r.Method), zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "%v", err)
}
}

1
go.mod
View file

@ -15,6 +15,7 @@ require (
go.uber.org/zap v1.14.0 go.uber.org/zap v1.14.0
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 golang.org/x/net v0.0.0-20190628185345-da137c7871d7
golang.org/x/perf v0.0.0-20191209155426-36b577b0eb03 golang.org/x/perf v0.0.0-20191209155426-36b577b0eb03
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/tools v0.0.0-20200125223703-d33eef8e6825 golang.org/x/tools v0.0.0-20200125223703-d33eef8e6825
gopkg.in/yaml.v2 v2.2.8 gopkg.in/yaml.v2 v2.2.8
) )