From 21db0f4d0b1a812fa957b063e9b97dad09b84f7e Mon Sep 17 00:00:00 2001 From: Fedor Korotkiy Date: Sat, 4 Apr 2020 21:45:29 +0300 Subject: [PATCH] Finish distbuild/pkg/filecache --- distbuild/pkg/artifact/README.md | 1 + distbuild/pkg/artifact/cache.go | 14 ++- distbuild/pkg/artifact/cache_test.go | 14 +++ distbuild/pkg/filecache/README.md | 18 +++ distbuild/pkg/filecache/client.go | 83 +++++++++++++ distbuild/pkg/filecache/client_test.go | 140 ++++++++++++++++++++++ distbuild/pkg/filecache/filecache.go | 8 +- distbuild/pkg/filecache/filecache_test.go | 25 +++- distbuild/pkg/filecache/handler.go | 96 +++++++++++++++ go.mod | 1 + 10 files changed, 388 insertions(+), 12 deletions(-) create mode 100644 distbuild/pkg/filecache/README.md create mode 100644 distbuild/pkg/filecache/client.go create mode 100644 distbuild/pkg/filecache/client_test.go create mode 100644 distbuild/pkg/filecache/handler.go diff --git a/distbuild/pkg/artifact/README.md b/distbuild/pkg/artifact/README.md index 8c32e5e..567a4f3 100644 --- a/distbuild/pkg/artifact/README.md +++ b/distbuild/pkg/artifact/README.md @@ -6,6 +6,7 @@ набора файлов и директорий. Основной тип `artifact.Cache` занимается хранением артефактов на диске и контролем одновременного доступа. +Все методы `artifact.Cache` должны быть *concurrency safe*. Одна горутина может начать писать артефакт. Начало записи берёт лок на запись. Никто другой не может работать с артефактом, на который взят лок на запись. Горутина должна позвать `commit` или `abort` после того, как она закончила работать с артефактом. diff --git a/distbuild/pkg/artifact/cache.go b/distbuild/pkg/artifact/cache.go index b12b017..5ad9a1a 100644 --- a/distbuild/pkg/artifact/cache.go +++ b/distbuild/pkg/artifact/cache.go @@ -14,6 +14,7 @@ import ( var ( ErrNotFound = errors.New("artifact not found") + ErrExists = errors.New("artifact exists") ErrWriteLocked = errors.New("artifact is locked for write") ErrReadLocked = errors.New("artifact is locked for read") ) @@ -79,10 +80,17 @@ func (c *Cache) readUnlock(id build.ID) { } } -func (c *Cache) writeLock(id build.ID) error { +func (c *Cache) writeLock(id build.ID, remove bool) error { c.mu.Lock() defer c.mu.Unlock() + _, err := os.Stat(filepath.Join(c.cacheDir, id.Path())) + if !os.IsNotExist(err) && err != nil { + return err + } else if err == nil && !remove { + return ErrExists + } + if _, ok := c.writeLocked[id]; ok { return ErrWriteLocked } @@ -129,7 +137,7 @@ func (c *Cache) Range(artifactFn func(artifact build.ID) error) error { } func (c *Cache) Remove(artifact build.ID) error { - if err := c.writeLock(artifact); err != nil { + if err := c.writeLock(artifact, true); err != nil { return err } defer c.writeUnlock(artifact) @@ -138,7 +146,7 @@ func (c *Cache) Remove(artifact build.ID) error { } func (c *Cache) Create(artifact build.ID) (path string, commit, abort func() error, err error) { - if err = c.writeLock(artifact); err != nil { + if err = c.writeLock(artifact, false); err != nil { return } diff --git a/distbuild/pkg/artifact/cache_test.go b/distbuild/pkg/artifact/cache_test.go index 4712f29..faf88c4 100644 --- a/distbuild/pkg/artifact/cache_test.go +++ b/distbuild/pkg/artifact/cache_test.go @@ -84,3 +84,17 @@ func TestAbortWrite(t *testing.T) { _, _, err = c.Get(idA) require.Truef(t, errors.Is(err, artifact.ErrNotFound), "%v", err) } + +func TestArtifactExists(t *testing.T) { + c := newTestCache(t) + defer c.cleanup() + + idA := build.ID{'a'} + + _, commit, _, err := c.Create(idA) + require.NoError(t, err) + require.NoError(t, commit()) + + _, _, _, err = c.Create(idA) + require.Truef(t, errors.Is(err, artifact.ErrExists), "%v", err) +} diff --git a/distbuild/pkg/filecache/README.md b/distbuild/pkg/filecache/README.md new file mode 100644 index 0000000..d912560 --- /dev/null +++ b/distbuild/pkg/filecache/README.md @@ -0,0 +1,18 @@ +# filecache + +Пакет `filecache` занимается хранением кеша файлов и определяет протокол передачи файлов между частями системы. + +`filecache.Cache` управляет файлами и занимается контролем одновременного доступа. Вы можете реализовать этот +тип поверх `*artifact.Cache`, поведение требуется точно такое же. + +## Передача файлов + +Тип `filecache.Handler` реализует handler, позволяющий заливать и скачивать файлы из кеша. + +- Вызов `GET /file?id=123` должен возвращать содержимое файла с `id=123`. +- Вызов `PUT /file?id=123` должен заливать содержимое файла с `id=123`. + +**Обратите внимание:** Несколько клиентов могут начать заливать в кеш один и тот же набор файлов. В наивной реализации, +первый клиент залочит файл на запись, а следующие упадут с ошибкой. Ваш код должен обрабатывать эту ситуацию корректно, +тоесть последующие запросы должны дожидаться, пока первый запрос завершится. Для реализации этой логики +поведения вам поможет пакет [singleflight](https://godoc.org/golang.org/x/sync/singleflight). diff --git a/distbuild/pkg/filecache/client.go b/distbuild/pkg/filecache/client.go new file mode 100644 index 0000000..f422919 --- /dev/null +++ b/distbuild/pkg/filecache/client.go @@ -0,0 +1,83 @@ +package filecache + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + + "go.uber.org/zap" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" +) + +type Client struct { + l *zap.Logger + endpoint string +} + +func NewClient(l *zap.Logger, endpoint string) *Client { + return &Client{ + l: l, + endpoint: endpoint, + } +} + +func (c *Client) Upload(ctx context.Context, id build.ID, localPath string) error { + f, err := os.Open(localPath) + if err != nil { + return err + } + defer f.Close() + + req, err := http.NewRequestWithContext(ctx, http.MethodPut, c.endpoint+"/file?id="+id.String(), f) + if err != nil { + return err + } + + rsp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer rsp.Body.Close() + + if rsp.StatusCode != 200 { + errStr, _ := ioutil.ReadAll(rsp.Body) + return fmt.Errorf("file upload: %s", errStr) + } + + return nil +} + +func (c *Client) Download(ctx context.Context, localCache *Cache, id build.ID) error { + w, abort, err := localCache.Write(id) + if err != nil { + return err + } + defer abort() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.endpoint+"/file?id="+id.String(), nil) + if err != nil { + return err + } + + rsp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer rsp.Body.Close() + + if rsp.StatusCode != 200 { + errStr, _ := ioutil.ReadAll(rsp.Body) + return fmt.Errorf("file upload: %s", errStr) + } + + _, err = io.Copy(w, rsp.Body) + if err != nil { + return err + } + + return w.Close() +} diff --git a/distbuild/pkg/filecache/client_test.go b/distbuild/pkg/filecache/client_test.go new file mode 100644 index 0000000..6656410 --- /dev/null +++ b/distbuild/pkg/filecache/client_test.go @@ -0,0 +1,140 @@ +package filecache_test + +import ( + "context" + "io/ioutil" + "net/http" + "net/http/httptest" + "path/filepath" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" + "gitlab.com/slon/shad-go/distbuild/pkg/filecache" +) + +type env struct { + cache *testCache + server *httptest.Server + client *filecache.Client +} + +func newEnv(t *testing.T) *env { + l := zaptest.NewLogger(t) + mux := http.NewServeMux() + + cache := newCache(t) + defer func() { + if cache != nil { + _ = cache.cleanup() + } + }() + + handler := filecache.NewHandler(l, cache.Cache) + handler.Register(mux) + + server := httptest.NewServer(mux) + + client := filecache.NewClient(l, server.URL) + + env := &env{ + cache: cache, + server: server, + client: client, + } + + cache = nil + return env +} + +func (e *env) stop() { + e.server.Close() + _ = e.cache.cleanup() +} + +func TestFileUpload(t *testing.T) { + env := newEnv(t) + defer env.stop() + + tmpFilePath := filepath.Join(env.cache.tmpDir, "foo.txt") + require.NoError(t, ioutil.WriteFile(tmpFilePath, []byte("foobar"), 0666)) + + ctx := context.Background() + + t.Run("UploadSingleFile", func(t *testing.T) { + id := build.ID{0x01} + + require.NoError(t, env.client.Upload(ctx, id, tmpFilePath)) + + path, unlock, err := env.cache.Get(id) + require.NoError(t, err) + defer unlock() + + content, err := ioutil.ReadFile(path) + require.NoError(t, err) + require.Equal(t, []byte("foobar"), content) + }) + + t.Run("RepeatedUpload", func(t *testing.T) { + id := build.ID{0x02} + + require.NoError(t, env.client.Upload(ctx, id, tmpFilePath)) + require.NoError(t, env.client.Upload(ctx, id, tmpFilePath)) + }) + + t.Run("ConcurrentUpload", func(t *testing.T) { + const ( + N = 100 + G = 100 + ) + + for i := 0; i < N; i++ { + var wg sync.WaitGroup + wg.Add(G) + + for j := 0; j < G; j++ { + go func() { + defer wg.Done() + + id := build.ID{0x03, byte(j)} + assert.NoError(t, env.client.Upload(ctx, id, tmpFilePath)) + }() + } + + wg.Wait() + } + }) +} + +func TestFileDownload(t *testing.T) { + env := newEnv(t) + defer env.stop() + + localCache := newCache(t) + defer localCache.cleanup() + + id := build.ID{0x01} + + w, abort, err := env.cache.Write(id) + require.NoError(t, err) + defer abort() + + _, err = w.Write([]byte("foobar")) + require.NoError(t, err) + require.NoError(t, w.Close()) + + ctx := context.Background() + require.NoError(t, env.client.Download(ctx, localCache.Cache, id)) + + path, unlock, err := localCache.Get(id) + require.NoError(t, err) + defer unlock() + + content, err := ioutil.ReadFile(path) + require.NoError(t, err) + require.Equal(t, []byte("foobar"), content) +} diff --git a/distbuild/pkg/filecache/filecache.go b/distbuild/pkg/filecache/filecache.go index 4ee9b63..8f31d50 100644 --- a/distbuild/pkg/filecache/filecache.go +++ b/distbuild/pkg/filecache/filecache.go @@ -3,7 +3,6 @@ package filecache import ( "errors" "io" - "net/http" "os" "path/filepath" @@ -13,6 +12,7 @@ import ( var ( ErrNotFound = errors.New("file not found") + ErrExists = errors.New("file exists") ErrWriteLocked = errors.New("file is locked for write") ErrReadLocked = errors.New("file is locked for read") ) @@ -23,6 +23,8 @@ func convertErr(err error) error { switch { case errors.Is(err, artifact.ErrNotFound): return ErrNotFound + case errors.Is(err, artifact.ErrExists): + return ErrExists case errors.Is(err, artifact.ErrWriteLocked): return ErrWriteLocked case errors.Is(err, artifact.ErrReadLocked): @@ -107,7 +109,3 @@ func (c *Cache) Get(file build.ID) (path string, unlock func(), err error) { err = convertErr(err) return } - -func NewHandler(c *Cache) http.Handler { - panic("implement me") -} diff --git a/distbuild/pkg/filecache/filecache_test.go b/distbuild/pkg/filecache/filecache_test.go index 346fc1b..6575b04 100644 --- a/distbuild/pkg/filecache/filecache_test.go +++ b/distbuild/pkg/filecache/filecache_test.go @@ -1,28 +1,45 @@ -package filecache +package filecache_test import ( "errors" "io/ioutil" + "os" "testing" "github.com/stretchr/testify/require" "gitlab.com/slon/shad-go/distbuild/pkg/build" + "gitlab.com/slon/shad-go/distbuild/pkg/filecache" ) -func TestFileCache(t *testing.T) { +type testCache struct { + *filecache.Cache + tmpDir string +} + +func newCache(t *testing.T) *testCache { tmpDir, err := ioutil.TempDir("", "filecache") require.NoError(t, err) - cache, err := New(tmpDir) + c, err := filecache.New(tmpDir) require.NoError(t, err) + return &testCache{Cache: c, tmpDir: tmpDir} +} + +func (c *testCache) cleanup() error { + return os.Remove(c.tmpDir) +} + +func TestFileCache(t *testing.T) { + cache := newCache(t) + _, abort, err := cache.Write(build.ID{01}) require.NoError(t, err) require.NoError(t, abort()) _, _, err = cache.Get(build.ID{01}) - require.Truef(t, errors.Is(err, ErrNotFound), "real error: %v", err) + require.Truef(t, errors.Is(err, filecache.ErrNotFound), "%v", err) f, _, err := cache.Write(build.ID{02}) require.NoError(t, err) diff --git a/distbuild/pkg/filecache/handler.go b/distbuild/pkg/filecache/handler.go new file mode 100644 index 0000000..920776b --- /dev/null +++ b/distbuild/pkg/filecache/handler.go @@ -0,0 +1,96 @@ +package filecache + +import ( + "errors" + "fmt" + "io" + "net/http" + "os" + + "go.uber.org/zap" + "golang.org/x/sync/singleflight" + + "gitlab.com/slon/shad-go/distbuild/pkg/build" +) + +type Handler struct { + l *zap.Logger + cache *Cache + single singleflight.Group +} + +func NewHandler(l *zap.Logger, cache *Cache) *Handler { + return &Handler{ + l: l, + cache: cache, + } +} + +func (h *Handler) Register(mux *http.ServeMux) { + mux.HandleFunc("/file", h.file) +} + +func (h *Handler) doGet(w http.ResponseWriter, r *http.Request, id build.ID) error { + path, unlock, err := h.cache.Get(id) + if err != nil { + return err + } + defer unlock() + + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + + if _, err = io.Copy(w, f); err != nil { + h.l.Warn("error streaming file", zap.Error(err)) + } + return nil +} + +func (h *Handler) doPut(w http.ResponseWriter, r *http.Request, id build.ID) error { + _, err, _ := h.single.Do(id.String(), func() (interface{}, error) { + w, abort, err := h.cache.Write(id) + if errors.Is(err, ErrExists) { + return nil, nil + } else if err != nil { + return nil, err + } + defer abort() + + if _, err = io.Copy(w, r.Body); err != nil { + return nil, err + } + return nil, w.Close() + }) + + if err != nil { + return err + } + + w.WriteHeader(http.StatusOK) + return nil +} + +func (h *Handler) file(w http.ResponseWriter, r *http.Request) { + var id build.ID + err := id.UnmarshalText([]byte(r.URL.Query().Get("id"))) + + if err == nil { + switch r.Method { + case http.MethodGet: + err = h.doGet(w, r, id) + case http.MethodPut: + err = h.doPut(w, r, id) + default: + err = fmt.Errorf("filehandler: unsupported method %s", r.Method) + } + } + + if err != nil { + h.l.Warn("file error", zap.String("method", r.Method), zap.Error(err)) + w.WriteHeader(http.StatusBadRequest) + _, _ = fmt.Fprintf(w, "%v", err) + } +} diff --git a/go.mod b/go.mod index 6a78e46..8ebf696 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( go.uber.org/zap v1.14.0 golang.org/x/net v0.0.0-20190628185345-da137c7871d7 golang.org/x/perf v0.0.0-20191209155426-36b577b0eb03 + golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/tools v0.0.0-20200125223703-d33eef8e6825 gopkg.in/yaml.v2 v2.2.8 )