shad-go/disttest/fixture.go

189 lines
4.2 KiB
Go
Raw Normal View History

2020-03-11 22:46:45 +00:00
package disttest
import (
"context"
2020-03-14 10:24:44 +00:00
"errors"
2020-03-11 22:46:45 +00:00
"fmt"
2020-04-20 21:08:13 +00:00
"io/ioutil"
2020-03-11 22:46:45 +00:00
"net"
"net/http"
"net/url"
2020-03-11 22:46:45 +00:00
"os"
"path/filepath"
"runtime"
2020-03-11 22:46:45 +00:00
"testing"
"github.com/stretchr/testify/require"
2020-04-05 12:47:58 +00:00
"go.uber.org/goleak"
2020-03-29 16:03:07 +00:00
"gitlab.com/slon/shad-go/distbuild/pkg/api"
2020-03-11 22:46:45 +00:00
"gitlab.com/slon/shad-go/distbuild/pkg/artifact"
"gitlab.com/slon/shad-go/distbuild/pkg/client"
"gitlab.com/slon/shad-go/distbuild/pkg/dist"
"gitlab.com/slon/shad-go/distbuild/pkg/filecache"
"gitlab.com/slon/shad-go/distbuild/pkg/worker"
"gitlab.com/slon/shad-go/tools/testtool"
"go.uber.org/zap"
)
type env struct {
RootDir string
Logger *zap.Logger
Ctx context.Context
Client *client.Client
Coordinator *dist.Coordinator
Workers []*worker.Worker
HTTP *http.Server
}
2020-04-04 20:11:21 +00:00
const (
logToStderr = true
)
2020-03-11 22:46:45 +00:00
2020-04-05 12:00:33 +00:00
type Config struct {
WorkerCount int
}
func newEnv(t *testing.T, config *Config) (e *env, cancel func()) {
2020-03-11 22:46:45 +00:00
cwd, err := os.Getwd()
require.NoError(t, err)
absCWD, err := filepath.Abs(cwd)
require.NoError(t, err)
2020-04-20 21:08:13 +00:00
rootDir := filepath.Join(absCWD, "workdir", t.Name())
require.NoError(t, os.RemoveAll(rootDir))
if err = os.MkdirAll(rootDir, 0777); err != nil {
if errors.Is(err, os.ErrPermission) {
rootDir, err = ioutil.TempDir("", "")
require.NoError(t, err)
} else {
require.NoError(t, err)
}
2020-03-11 22:46:45 +00:00
}
2020-04-20 21:08:13 +00:00
env := &env{
RootDir: rootDir,
}
2020-03-11 22:46:45 +00:00
cfg := zap.NewDevelopmentConfig()
2020-04-29 11:27:44 +00:00
if runtime.GOOS == "windows" {
cfg.OutputPaths = []string{filepath.Join("winfile://", env.RootDir, "test.log")}
err = zap.RegisterSink("winfile", newWinFileSink)
require.NoError(t, err)
} else {
cfg.OutputPaths = []string{filepath.Join(env.RootDir, "test.log")}
}
2020-04-04 20:11:21 +00:00
if logToStderr {
cfg.OutputPaths = append(cfg.OutputPaths, "stderr")
}
2020-03-11 22:46:45 +00:00
env.Logger, err = cfg.Build()
require.NoError(t, err)
t.Helper()
t.Logf("test is running inside %s; see test.log file for more info", filepath.Join("workdir", t.Name()))
port, err := testtool.GetFreePort()
require.NoError(t, err)
addr := "127.0.0.1:" + port
coordinatorEndpoint := "http://" + addr + "/coordinator"
var cancelRootContext func()
env.Ctx, cancelRootContext = context.WithCancel(context.Background())
2020-03-29 16:24:18 +00:00
env.Client = client.NewClient(
env.Logger.Named("client"),
coordinatorEndpoint,
2020-04-04 21:21:55 +00:00
filepath.Join(absCWD, "testdata", t.Name()))
2020-03-11 22:46:45 +00:00
coordinatorCache, err := filecache.New(filepath.Join(env.RootDir, "coordinator", "filecache"))
require.NoError(t, err)
env.Coordinator = dist.NewCoordinator(
env.Logger.Named("coordinator"),
coordinatorCache,
)
2020-03-28 21:34:09 +00:00
router := http.NewServeMux()
router.Handle("/coordinator/", http.StripPrefix("/coordinator", env.Coordinator))
2020-04-05 12:00:33 +00:00
for i := 0; i < config.WorkerCount; i++ {
2020-03-11 22:46:45 +00:00
workerName := fmt.Sprintf("worker%d", i)
workerDir := filepath.Join(env.RootDir, workerName)
2020-03-28 14:35:01 +00:00
var fileCache *filecache.Cache
fileCache, err = filecache.New(filepath.Join(workerDir, "filecache"))
2020-03-11 22:46:45 +00:00
require.NoError(t, err)
2020-03-28 14:35:01 +00:00
var artifacts *artifact.Cache
artifacts, err = artifact.NewCache(filepath.Join(workerDir, "artifacts"))
2020-03-11 22:46:45 +00:00
require.NoError(t, err)
2020-03-28 21:34:09 +00:00
workerPrefix := fmt.Sprintf("/worker/%d", i)
2020-03-29 16:03:07 +00:00
workerID := api.WorkerID("http://" + addr + workerPrefix)
2020-03-28 21:34:09 +00:00
2020-03-11 22:46:45 +00:00
w := worker.New(
2020-03-28 21:34:09 +00:00
workerID,
2020-03-11 22:46:45 +00:00
coordinatorEndpoint,
env.Logger.Named(workerName),
fileCache,
artifacts,
)
env.Workers = append(env.Workers, w)
2020-03-28 21:34:09 +00:00
router.Handle(workerPrefix+"/", http.StripPrefix(workerPrefix, w))
2020-03-11 22:46:45 +00:00
}
env.HTTP = &http.Server{
Addr: addr,
2020-03-28 21:34:09 +00:00
Handler: router,
2020-03-11 22:46:45 +00:00
}
lsn, err := net.Listen("tcp", env.HTTP.Addr)
require.NoError(t, err)
go func() {
2020-03-14 10:24:44 +00:00
err := env.HTTP.Serve(lsn)
if err != http.ErrServerClosed {
env.Logger.Fatal("http server stopped", zap.Error(err))
}
2020-03-11 22:46:45 +00:00
}()
2020-03-14 10:24:44 +00:00
for _, w := range env.Workers {
go func(w *worker.Worker) {
err := w.Run(env.Ctx)
if errors.Is(err, context.Canceled) {
return
}
env.Logger.Fatal("worker stopped", zap.Error(err))
}(w)
}
2020-03-11 22:46:45 +00:00
return env, func() {
cancelRootContext()
_ = env.HTTP.Shutdown(context.Background())
_ = env.Logger.Sync()
2020-04-05 12:47:58 +00:00
goleak.VerifyNone(t)
2020-03-11 22:46:45 +00:00
}
}
func newWinFileSink(u *url.URL) (zap.Sink, error) {
if len(u.Opaque) > 0 {
// Remove leading slash left by url.Parse()
return os.OpenFile(u.Opaque[1:], os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
}
// if url.URL is empty, don't panic slice index error
return os.OpenFile(u.Opaque, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
}