Release filecache, artifactcache and tarstream source code

This commit is contained in:
Fedor Korotkiy 2023-04-26 16:06:56 +04:00
parent e8d1af24b3
commit 730654cac8
7 changed files with 331 additions and 49 deletions

View file

@ -87,7 +87,8 @@ type Job struct {
- [`distbuild/pkg/build`](./pkg/build) - определение графа сборки. В этом пакете ничего писать не нужно, - [`distbuild/pkg/build`](./pkg/build) - определение графа сборки. В этом пакете ничего писать не нужно,
нужно ознакомиться с существующим кодом. нужно ознакомиться с существующим кодом.
- [`distbuild/pkg/tarstream`](./pkg/tarstream) - передача директории через сокет. - [`distbuild/pkg/tarstream`](./pkg/tarstream) - передача директории через сокет. В этом пакете ничего
писать не нужно, нужно ознакомиться с существующим кодом.
- [`distbuild/pkg/api`](./pkg/api) - протокол общения между компонентами. - [`distbuild/pkg/api`](./pkg/api) - протокол общения между компонентами.
- [`distbuild/pkg/artifact`](./pkg/artifact) - кеш артефактов и протокол передачи артефактов между воркерами. - [`distbuild/pkg/artifact`](./pkg/artifact) - кеш артефактов и протокол передачи артефактов между воркерами.
- [`distbuild/pkg/filecache`](./pkg/filecache) - кеш файлов и протокол передачи файлов между компонентами. - [`distbuild/pkg/filecache`](./pkg/filecache) - кеш файлов и протокол передачи файлов между компонентами.

View file

@ -6,19 +6,14 @@
набора файлов и директорий. набора файлов и директорий.
Основной тип `artifact.Cache` занимается хранением артефактов на диске и контролем одновременного доступа. Основной тип `artifact.Cache` занимается хранением артефактов на диске и контролем одновременного доступа.
Все методы `artifact.Cache` должны быть *concurrency safe*. Все методы `artifact.Cache` *concurrency safe*.
Одна горутина может начать писать артефакт. Начало записи берёт лок на запись. Никто другой не может работать с артефактом, Одна горутина может начать писать артефакт. Начало записи берёт лок на запись. Никто другой не может работать с артефактом,
на который взят лок на запись. Горутина должна позвать `commit` или `abort` после того, как она закончила работать с артефактом. на который взят лок на запись. Горутина должна позвать `commit` или `abort` после того, как она закончила работать с артефактом.
`commit` помещает артефакт в кеш. `abort` отменяет запись артефакта, удаляя все данные. `commit` помещает артефакт в кеш. `abort` отменяет запись артефакта, удаляя все данные.
После первого вызова `commit` все последующие вызовы `commit` и `abort` должны ничего не делать. Реализация `artifact.Cache` вам дана.
Точно так же, после вызова `abort` все последующие вызовы `commit` и `abort` должны ничего не делать.
Горутина может начать читать артефакт, позвав метод `Get`. Много горутин могут читать артефакт одновременно.
Горутина должна позвать `unlock`, после того как она закончила работать с артефактом.
## Скачивание артефакта ## Скачивание артефакта

View file

@ -1,9 +1,13 @@
//go:build !solution
package artifact package artifact
import ( import (
"encoding/hex"
"errors" "errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
) )
@ -16,24 +20,172 @@ var (
) )
type Cache struct { type Cache struct {
tmpDir string
cacheDir string
mu sync.Mutex
writeLocked map[build.ID]struct{}
readLocked map[build.ID]int
} }
func NewCache(root string) (*Cache, error) { func NewCache(root string) (*Cache, error) {
panic("implement me") tmpDir := filepath.Join(root, "tmp")
if err := os.RemoveAll(tmpDir); err != nil {
return nil, err
}
if err := os.MkdirAll(tmpDir, 0777); err != nil {
return nil, err
}
cacheDir := filepath.Join(root, "c")
if err := os.MkdirAll(cacheDir, 0777); err != nil {
return nil, err
}
for i := 0; i < 256; i++ {
d := hex.EncodeToString([]byte{uint8(i)})
if err := os.MkdirAll(filepath.Join(cacheDir, d), 0777); err != nil {
return nil, err
}
}
return &Cache{
tmpDir: tmpDir,
cacheDir: cacheDir,
writeLocked: make(map[build.ID]struct{}),
readLocked: make(map[build.ID]int),
}, nil
}
func (c *Cache) readLock(id build.ID) error {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.writeLocked[id]; ok {
return ErrWriteLocked
}
c.readLocked[id]++
return nil
}
func (c *Cache) readUnlock(id build.ID) {
c.mu.Lock()
defer c.mu.Unlock()
c.readLocked[id]--
if c.readLocked[id] == 0 {
delete(c.readLocked, id)
}
}
func (c *Cache) writeLock(id build.ID, remove bool) error {
c.mu.Lock()
defer c.mu.Unlock()
_, err := os.Stat(filepath.Join(c.cacheDir, id.Path()))
if !os.IsNotExist(err) && err != nil {
return err
} else if err == nil && !remove {
return ErrExists
}
if _, ok := c.writeLocked[id]; ok {
return ErrWriteLocked
}
if c.readLocked[id] > 0 {
return ErrReadLocked
}
c.writeLocked[id] = struct{}{}
return nil
}
func (c *Cache) writeUnlock(id build.ID) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.writeLocked, id)
} }
func (c *Cache) Range(artifactFn func(artifact build.ID) error) error { func (c *Cache) Range(artifactFn func(artifact build.ID) error) error {
panic("implement me") shards, err := ioutil.ReadDir(c.cacheDir)
if err != nil {
return err
}
for _, shard := range shards {
dirs, err := ioutil.ReadDir(filepath.Join(c.cacheDir, shard.Name()))
if err != nil {
return err
}
for _, d := range dirs {
var id build.ID
if err := id.UnmarshalText([]byte(d.Name())); err != nil {
return fmt.Errorf("invalid artifact name: %w", err)
}
if err := artifactFn(id); err != nil {
return err
}
}
}
return nil
} }
func (c *Cache) Remove(artifact build.ID) error { func (c *Cache) Remove(artifact build.ID) error {
panic("implement me") if err := c.writeLock(artifact, true); err != nil {
return err
}
defer c.writeUnlock(artifact)
return os.RemoveAll(filepath.Join(c.cacheDir, artifact.Path()))
} }
func (c *Cache) Create(artifact build.ID) (path string, commit, abort func() error, err error) { func (c *Cache) Create(artifact build.ID) (path string, commit, abort func() error, err error) {
panic("implement me") if err = c.writeLock(artifact, false); err != nil {
return
}
path = filepath.Join(c.tmpDir, artifact.String())
if err = os.MkdirAll(path, 0777); err != nil {
c.writeUnlock(artifact)
return
}
abort = func() error {
defer c.writeUnlock(artifact)
return os.RemoveAll(path)
}
commit = func() error {
defer c.writeUnlock(artifact)
return os.Rename(path, filepath.Join(c.cacheDir, artifact.Path()))
}
return
} }
func (c *Cache) Get(artifact build.ID) (path string, unlock func(), err error) { func (c *Cache) Get(artifact build.ID) (path string, unlock func(), err error) {
panic("implement me") if err = c.readLock(artifact); err != nil {
return
}
path = filepath.Join(c.cacheDir, artifact.Path())
if _, err = os.Stat(path); err != nil {
c.readUnlock(artifact)
if os.IsNotExist(err) {
err = ErrNotFound
}
return
}
unlock = func() {
c.readUnlock(artifact)
}
return
} }

View file

@ -2,8 +2,7 @@
Пакет `filecache` занимается хранением кеша файлов и определяет протокол передачи файлов между частями системы. Пакет `filecache` занимается хранением кеша файлов и определяет протокол передачи файлов между частями системы.
`filecache.Cache` управляет файлами и занимается контролем одновременного доступа. Вы можете реализовать этот `filecache.Cache` управляет файлами и занимается контролем одновременного доступа. Реализация этого типа вам уже дана.
тип поверх `*artifact.Cache`, поведение требуется точно такое же.
## Передача файлов ## Передача файлов

View file

@ -1,11 +1,12 @@
//go:build !solution
package filecache package filecache
import ( import (
"errors" "errors"
"io" "io"
"os"
"path/filepath"
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/build" "gitlab.com/slon/shad-go/distbuild/pkg/build"
) )
@ -16,25 +17,95 @@ var (
ErrReadLocked = errors.New("file is locked for read") ErrReadLocked = errors.New("file is locked for read")
) )
const fileName = "file"
func convertErr(err error) error {
switch {
case errors.Is(err, artifact.ErrNotFound):
return ErrNotFound
case errors.Is(err, artifact.ErrExists):
return ErrExists
case errors.Is(err, artifact.ErrWriteLocked):
return ErrWriteLocked
case errors.Is(err, artifact.ErrReadLocked):
return ErrReadLocked
default:
return err
}
}
type Cache struct { type Cache struct {
cache *artifact.Cache
} }
func New(rootDir string) (*Cache, error) { func New(rootDir string) (*Cache, error) {
panic("implement me") cache, err := artifact.NewCache(rootDir)
if err != nil {
return nil, err
}
c := &Cache{cache: cache}
return c, nil
} }
func (c *Cache) Range(fileFn func(file build.ID) error) error { func (c *Cache) Range(fileFn func(file build.ID) error) error {
panic("implement me") return c.cache.Range(fileFn)
} }
func (c *Cache) Remove(file build.ID) error { func (c *Cache) Remove(file build.ID) error {
panic("implement me") return convertErr(c.cache.Remove(file))
}
type fileWriter struct {
f *os.File
commit func() error
}
func (f *fileWriter) Write(p []byte) (int, error) {
return f.f.Write(p)
}
func (f *fileWriter) Close() error {
closeErr := f.f.Close()
commitErr := f.commit()
if closeErr != nil {
return closeErr
}
return commitErr
} }
func (c *Cache) Write(file build.ID) (w io.WriteCloser, abort func() error, err error) { func (c *Cache) Write(file build.ID) (w io.WriteCloser, abort func() error, err error) {
panic("implement me") path, commit, abortDir, err := c.cache.Create(file)
if err != nil {
err = convertErr(err)
return
}
f, err := os.Create(filepath.Join(path, fileName))
if err != nil {
_ = abort()
return
}
w = &fileWriter{f: f, commit: commit}
abort = func() error {
closeErr := f.Close()
abortErr := abortDir()
if closeErr != nil {
return closeErr
}
return abortErr
}
return
} }
func (c *Cache) Get(file build.ID) (path string, unlock func(), err error) { func (c *Cache) Get(file build.ID) (path string, unlock func(), err error) {
panic("implement me") root, unlock, err := c.cache.Get(file)
path = filepath.Join(root, fileName)
err = convertErr(err)
return
} }

View file

@ -1,21 +0,0 @@
# tarstream
Вам нужно уметь передавать директорию с артефактами между воркерами. Для этого вам нужно
реализовать две операции:
```go
package tarstream
import "io"
// Send рекурсивно обходит директорию и сериализует её содержимое в поток w.
func Send(dir string, w io.Writer) error
// Receive читает поток r и материализует содержимое потока внутри dir.
func Receive(dir string, r io.Reader) error
```
- Функции должны корректно обрабатывать директории и обычные файлы.
- executable бит на файлах должен сохраняться.
- Используйте формат [tar](https://golang.org/pkg/archive/tar/)
- Используйте [filepath.Walk](https://golang.org/pkg/path/filepath/) для рекурсивного обхода.

View file

@ -1,15 +1,100 @@
//go:build !solution
package tarstream package tarstream
import ( import (
"archive/tar"
"io" "io"
"os"
"path/filepath"
) )
// Send рекурсивно обходит директорию и сериализует её содержимое в поток w.
func Send(dir string, w io.Writer) error { func Send(dir string, w io.Writer) error {
panic("implement me") tw := tar.NewWriter(w)
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
} }
func Receive(dir string, r io.Reader) error { rel, err := filepath.Rel(dir, path)
panic("implement me") if err != nil {
return err
}
if rel == "." {
return nil
}
switch {
case info.IsDir():
return tw.WriteHeader(&tar.Header{
Name: rel,
Typeflag: tar.TypeDir,
})
default:
h := &tar.Header{
Typeflag: tar.TypeReg,
Name: rel,
Size: info.Size(),
Mode: int64(info.Mode()),
}
if err := tw.WriteHeader(h); err != nil {
return err
}
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(tw, f)
return err
}
})
if err != nil {
return err
}
return tw.Close()
}
// Receive читает поток r и материализует содержимое потока внутри dir.
func Receive(dir string, r io.Reader) error {
tr := tar.NewReader(r)
for {
h, err := tr.Next()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
absPath := filepath.Join(dir, h.Name)
if h.Typeflag == tar.TypeDir {
if err := os.Mkdir(absPath, 0777); err != nil {
return err
}
} else {
writeFile := func() error {
f, err := os.OpenFile(absPath, os.O_CREATE|os.O_WRONLY, os.FileMode(h.Mode))
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(f, tr)
return err
}
if err := writeFile(); err != nil {
return err
}
}
}
} }