diff --git a/distbuild/README.md b/distbuild/README.md index 51468b8..5098e8e 100644 --- a/distbuild/README.md +++ b/distbuild/README.md @@ -87,7 +87,8 @@ type Job struct { - [`distbuild/pkg/build`](./pkg/build) - определение графа сборки. В этом пакете ничего писать не нужно, нужно ознакомиться с существующим кодом. -- [`distbuild/pkg/tarstream`](./pkg/tarstream) - передача директории через сокет. +- [`distbuild/pkg/tarstream`](./pkg/tarstream) - передача директории через сокет. В этом пакете ничего + писать не нужно, нужно ознакомиться с существующим кодом. - [`distbuild/pkg/api`](./pkg/api) - протокол общения между компонентами. - [`distbuild/pkg/artifact`](./pkg/artifact) - кеш артефактов и протокол передачи артефактов между воркерами. - [`distbuild/pkg/filecache`](./pkg/filecache) - кеш файлов и протокол передачи файлов между компонентами. diff --git a/distbuild/pkg/artifact/README.md b/distbuild/pkg/artifact/README.md index 3861a9a..ba836dd 100644 --- a/distbuild/pkg/artifact/README.md +++ b/distbuild/pkg/artifact/README.md @@ -6,19 +6,14 @@ набора файлов и директорий. Основной тип `artifact.Cache` занимается хранением артефактов на диске и контролем одновременного доступа. -Все методы `artifact.Cache` должны быть *concurrency safe*. +Все методы `artifact.Cache` *concurrency safe*. Одна горутина может начать писать артефакт. Начало записи берёт лок на запись. Никто другой не может работать с артефактом, на который взят лок на запись. Горутина должна позвать `commit` или `abort` после того, как она закончила работать с артефактом. `commit` помещает артефакт в кеш. `abort` отменяет запись артефакта, удаляя все данные. -После первого вызова `commit` все последующие вызовы `commit` и `abort` должны ничего не делать. - -Точно так же, после вызова `abort` все последующие вызовы `commit` и `abort` должны ничего не делать. - -Горутина может начать читать артефакт, позвав метод `Get`. Много горутин могут читать артефакт одновременно. -Горутина должна позвать `unlock`, после того как она закончила работать с артефактом. +Реализация `artifact.Cache` вам дана. ## Скачивание артефакта diff --git a/distbuild/pkg/artifact/cache.go b/distbuild/pkg/artifact/cache.go index 0d69c9d..5ad9a1a 100644 --- a/distbuild/pkg/artifact/cache.go +++ b/distbuild/pkg/artifact/cache.go @@ -1,9 +1,13 @@ -//go:build !solution - package artifact import ( + "encoding/hex" "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sync" "gitlab.com/slon/shad-go/distbuild/pkg/build" ) @@ -16,24 +20,172 @@ var ( ) type Cache struct { + tmpDir string + cacheDir string + + mu sync.Mutex + writeLocked map[build.ID]struct{} + readLocked map[build.ID]int } func NewCache(root string) (*Cache, error) { - panic("implement me") + tmpDir := filepath.Join(root, "tmp") + + if err := os.RemoveAll(tmpDir); err != nil { + return nil, err + } + if err := os.MkdirAll(tmpDir, 0777); err != nil { + return nil, err + } + + cacheDir := filepath.Join(root, "c") + if err := os.MkdirAll(cacheDir, 0777); err != nil { + return nil, err + } + + for i := 0; i < 256; i++ { + d := hex.EncodeToString([]byte{uint8(i)}) + if err := os.MkdirAll(filepath.Join(cacheDir, d), 0777); err != nil { + return nil, err + } + } + + return &Cache{ + tmpDir: tmpDir, + cacheDir: cacheDir, + writeLocked: make(map[build.ID]struct{}), + readLocked: make(map[build.ID]int), + }, nil +} + +func (c *Cache) readLock(id build.ID) error { + c.mu.Lock() + defer c.mu.Unlock() + + if _, ok := c.writeLocked[id]; ok { + return ErrWriteLocked + } + + c.readLocked[id]++ + return nil +} + +func (c *Cache) readUnlock(id build.ID) { + c.mu.Lock() + defer c.mu.Unlock() + + c.readLocked[id]-- + if c.readLocked[id] == 0 { + delete(c.readLocked, id) + } +} + +func (c *Cache) writeLock(id build.ID, remove bool) error { + c.mu.Lock() + defer c.mu.Unlock() + + _, err := os.Stat(filepath.Join(c.cacheDir, id.Path())) + if !os.IsNotExist(err) && err != nil { + return err + } else if err == nil && !remove { + return ErrExists + } + + if _, ok := c.writeLocked[id]; ok { + return ErrWriteLocked + } + if c.readLocked[id] > 0 { + return ErrReadLocked + } + + c.writeLocked[id] = struct{}{} + return nil +} + +func (c *Cache) writeUnlock(id build.ID) { + c.mu.Lock() + defer c.mu.Unlock() + + delete(c.writeLocked, id) } func (c *Cache) Range(artifactFn func(artifact build.ID) error) error { - panic("implement me") + shards, err := ioutil.ReadDir(c.cacheDir) + if err != nil { + return err + } + + for _, shard := range shards { + dirs, err := ioutil.ReadDir(filepath.Join(c.cacheDir, shard.Name())) + if err != nil { + return err + } + + for _, d := range dirs { + var id build.ID + if err := id.UnmarshalText([]byte(d.Name())); err != nil { + return fmt.Errorf("invalid artifact name: %w", err) + } + + if err := artifactFn(id); err != nil { + return err + } + } + } + + return nil } func (c *Cache) Remove(artifact build.ID) error { - panic("implement me") + if err := c.writeLock(artifact, true); err != nil { + return err + } + defer c.writeUnlock(artifact) + + return os.RemoveAll(filepath.Join(c.cacheDir, artifact.Path())) } func (c *Cache) Create(artifact build.ID) (path string, commit, abort func() error, err error) { - panic("implement me") + if err = c.writeLock(artifact, false); err != nil { + return + } + + path = filepath.Join(c.tmpDir, artifact.String()) + if err = os.MkdirAll(path, 0777); err != nil { + c.writeUnlock(artifact) + return + } + + abort = func() error { + defer c.writeUnlock(artifact) + return os.RemoveAll(path) + } + + commit = func() error { + defer c.writeUnlock(artifact) + return os.Rename(path, filepath.Join(c.cacheDir, artifact.Path())) + } + + return } func (c *Cache) Get(artifact build.ID) (path string, unlock func(), err error) { - panic("implement me") + if err = c.readLock(artifact); err != nil { + return + } + + path = filepath.Join(c.cacheDir, artifact.Path()) + if _, err = os.Stat(path); err != nil { + c.readUnlock(artifact) + + if os.IsNotExist(err) { + err = ErrNotFound + } + return + } + + unlock = func() { + c.readUnlock(artifact) + } + return } diff --git a/distbuild/pkg/filecache/README.md b/distbuild/pkg/filecache/README.md index dc8f108..bd3cfa0 100644 --- a/distbuild/pkg/filecache/README.md +++ b/distbuild/pkg/filecache/README.md @@ -2,8 +2,7 @@ Пакет `filecache` занимается хранением кеша файлов и определяет протокол передачи файлов между частями системы. -`filecache.Cache` управляет файлами и занимается контролем одновременного доступа. Вы можете реализовать этот -тип поверх `*artifact.Cache`, поведение требуется точно такое же. +`filecache.Cache` управляет файлами и занимается контролем одновременного доступа. Реализация этого типа вам уже дана. ## Передача файлов diff --git a/distbuild/pkg/filecache/filecache.go b/distbuild/pkg/filecache/filecache.go index 768f17f..8f31d50 100644 --- a/distbuild/pkg/filecache/filecache.go +++ b/distbuild/pkg/filecache/filecache.go @@ -1,11 +1,12 @@ -//go:build !solution - package filecache import ( "errors" "io" + "os" + "path/filepath" + "gitlab.com/slon/shad-go/distbuild/pkg/artifact" "gitlab.com/slon/shad-go/distbuild/pkg/build" ) @@ -16,25 +17,95 @@ var ( ErrReadLocked = errors.New("file is locked for read") ) +const fileName = "file" + +func convertErr(err error) error { + switch { + case errors.Is(err, artifact.ErrNotFound): + return ErrNotFound + case errors.Is(err, artifact.ErrExists): + return ErrExists + case errors.Is(err, artifact.ErrWriteLocked): + return ErrWriteLocked + case errors.Is(err, artifact.ErrReadLocked): + return ErrReadLocked + default: + return err + } +} + type Cache struct { + cache *artifact.Cache } func New(rootDir string) (*Cache, error) { - panic("implement me") + cache, err := artifact.NewCache(rootDir) + if err != nil { + return nil, err + } + + c := &Cache{cache: cache} + return c, nil } func (c *Cache) Range(fileFn func(file build.ID) error) error { - panic("implement me") + return c.cache.Range(fileFn) } func (c *Cache) Remove(file build.ID) error { - panic("implement me") + return convertErr(c.cache.Remove(file)) +} + +type fileWriter struct { + f *os.File + commit func() error +} + +func (f *fileWriter) Write(p []byte) (int, error) { + return f.f.Write(p) +} + +func (f *fileWriter) Close() error { + closeErr := f.f.Close() + commitErr := f.commit() + + if closeErr != nil { + return closeErr + } + + return commitErr } func (c *Cache) Write(file build.ID) (w io.WriteCloser, abort func() error, err error) { - panic("implement me") + path, commit, abortDir, err := c.cache.Create(file) + if err != nil { + err = convertErr(err) + return + } + + f, err := os.Create(filepath.Join(path, fileName)) + if err != nil { + _ = abort() + return + } + + w = &fileWriter{f: f, commit: commit} + abort = func() error { + closeErr := f.Close() + abortErr := abortDir() + + if closeErr != nil { + return closeErr + } + + return abortErr + } + return } func (c *Cache) Get(file build.ID) (path string, unlock func(), err error) { - panic("implement me") + root, unlock, err := c.cache.Get(file) + path = filepath.Join(root, fileName) + err = convertErr(err) + return } diff --git a/distbuild/pkg/tarstream/README.md b/distbuild/pkg/tarstream/README.md deleted file mode 100644 index 8357a12..0000000 --- a/distbuild/pkg/tarstream/README.md +++ /dev/null @@ -1,21 +0,0 @@ -# tarstream - -Вам нужно уметь передавать директорию с артефактами между воркерами. Для этого вам нужно -реализовать две операции: - -```go -package tarstream - -import "io" - -// Send рекурсивно обходит директорию и сериализует её содержимое в поток w. -func Send(dir string, w io.Writer) error - -// Receive читает поток r и материализует содержимое потока внутри dir. -func Receive(dir string, r io.Reader) error -``` - -- Функции должны корректно обрабатывать директории и обычные файлы. -- executable бит на файлах должен сохраняться. -- Используйте формат [tar](https://golang.org/pkg/archive/tar/) -- Используйте [filepath.Walk](https://golang.org/pkg/path/filepath/) для рекурсивного обхода. diff --git a/distbuild/pkg/tarstream/stream.go b/distbuild/pkg/tarstream/stream.go index b4d760b..4390a2c 100644 --- a/distbuild/pkg/tarstream/stream.go +++ b/distbuild/pkg/tarstream/stream.go @@ -1,15 +1,100 @@ -//go:build !solution - package tarstream import ( + "archive/tar" "io" + "os" + "path/filepath" ) +// Send рекурсивно обходит директорию и сериализует её содержимое в поток w. func Send(dir string, w io.Writer) error { - panic("implement me") + tw := tar.NewWriter(w) + + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + rel, err := filepath.Rel(dir, path) + if err != nil { + return err + } + + if rel == "." { + return nil + } + + switch { + case info.IsDir(): + return tw.WriteHeader(&tar.Header{ + Name: rel, + Typeflag: tar.TypeDir, + }) + + default: + h := &tar.Header{ + Typeflag: tar.TypeReg, + Name: rel, + Size: info.Size(), + Mode: int64(info.Mode()), + } + + if err := tw.WriteHeader(h); err != nil { + return err + } + + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + + _, err = io.Copy(tw, f) + return err + } + }) + + if err != nil { + return err + } + + return tw.Close() } +// Receive читает поток r и материализует содержимое потока внутри dir. func Receive(dir string, r io.Reader) error { - panic("implement me") + tr := tar.NewReader(r) + + for { + h, err := tr.Next() + if err == io.EOF { + return nil + } else if err != nil { + return err + } + + absPath := filepath.Join(dir, h.Name) + + if h.Typeflag == tar.TypeDir { + if err := os.Mkdir(absPath, 0777); err != nil { + return err + } + } else { + writeFile := func() error { + f, err := os.OpenFile(absPath, os.O_CREATE|os.O_WRONLY, os.FileMode(h.Mode)) + if err != nil { + return err + } + defer f.Close() + + _, err = io.Copy(f, tr) + return err + } + + if err := writeFile(); err != nil { + return err + } + } + } }