Add gossip task

This commit is contained in:
Fedor Korotkiy 2023-04-22 12:58:14 +04:00
parent 5c8dd14d7e
commit e41a5d63f0
7 changed files with 522 additions and 0 deletions

7
gossip/Makefile Normal file
View file

@ -0,0 +1,7 @@
default:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative meshpb/protocol.proto
solution:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative meshpb/solution.proto
.PHONY: default solution

57
gossip/README.md Normal file
View file

@ -0,0 +1,57 @@
# gossip
В этой задаче вам нужно придумать и реализовать протокол gossip.
Gossip-ом называют протоколы, которые применяют в распределённых системах без
центральной конфигурации, чтобы обнаруживать участников протокола. Участники
протокола периодически рассказывают друг другу про своих соседей, так что
каждый "пир" (peer) знает всё про всех. Новому "пир"-у достаточно подключиться
к одному из членов группы, чтобы узнать про всех остальных.
Протокол вам нужно придумать самим, но он должен соответствовать интерфейсу и
иметь нужные свойства.
* Каждый участник протокола имеет уникальный идентификатор, совпадающий с его адресом.
Этот адрес/идентификатор передаётся вам в конструкторе.
* Каждый участник знает метаданные про себя. Метаданные описываются типом `*meshpb.PeerMeta`.
Метаданные могут обновляться в процессе работы.
* Каждый участник eventually должен узнать идентификаторы и метаданные всех остальных участников.
Снепшот этой информации должен возвращать метод `GetMembers`.
* Метод `AddSeed` добавляет адрес `seed`-а. По этому адресу может находиться другой участник
(а может и не находиться). Адрес нужно использовать, чтобы пытаться подключиться к группе.
Нельзя считать, что `seed` существует (возвращать его из GetMembers), пока к нему не удалось
подключиться.
* Если какой-то из участников становится недоступным (перестаёт отвечать на RPC вызовы), то он должен
быть удалён из группы (метод `GetMembers` должен перестать возвращать его).
* Сам `Peer` должен реализовывать `GossipService` из `.proto` файла. Конструктор `NewPeer`
не должен запускать горутины, это должен делать метод `Run`. Метод `Stop` должен
завершать все горутины.
Ваш протокол должен работать поверх GRPC, поэтому в начале ознакомьтесь
с [tutorial](https://grpc.io/docs/languages/go/basics/) по GRPC.
## Установка protoc
В этом задании вам нужно будет генерировать код для работы с GRPC.
Для того, чтобы заработал генератор, вам нужно установить несколько утилит.
Установите их по [инструкции](https://grpc.io/docs/languages/go/quickstart/#prerequisites).
После установки, у вас в PATH должны появиться 3 команды. Проверьте это:
```
protoc --help
protoc-gen-go --help
protoc-gen-go-grpc --help
```
## Генерация кода
Описание protobuf сообщений и сервисов находится в файле `meshpb/protocol.proto`
Команда для генерации записана в `Makefile`. Чтобы запустить её, перейдите в директорию `servicemesh` и выполните команду `make`.
```
prime@fedora ~/C/shad-go (master)> cd servicemesh/
prime@fedora ~/C/s/servicemesh (master)> make
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative meshpb/protocol.proto
```

47
gossip/gossip.go Normal file
View file

@ -0,0 +1,47 @@
//go:build !solution
package gossip
import (
"time"
"gitlab.com/slon/shad-go/gossip/meshpb"
"google.golang.org/grpc"
)
type PeerConfig struct {
SelfEndpoint string
PingPeriod time.Duration
}
type Peer struct {
config PeerConfig
}
func (p *Peer) UpdateMeta(meta *meshpb.PeerMeta) {
panic("implement me")
}
func (p *Peer) AddSeed(seed string) {
panic("implement me")
}
func (p *Peer) Addr() string {
return p.config.SelfEndpoint
}
func (p *Peer) GetMembers() map[string]*meshpb.PeerMeta {
panic("implement me")
}
func (p *Peer) Run() {
panic("implement me")
}
func (p *Peer) Stop() {
panic("implement me")
}
func NewPeer(config PeerConfig) *Peer {
panic("implement me")
}

182
gossip/gossip_test.go Normal file
View file

@ -0,0 +1,182 @@
package gossip_test
import (
"fmt"
"math/rand"
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
"gitlab.com/slon/shad-go/gossip"
"gitlab.com/slon/shad-go/gossip/meshpb"
"go.uber.org/goleak"
"google.golang.org/grpc"
)
const (
pingPeriod = time.Millisecond * 10
waitPeriod = pingPeriod * 10
)
type env struct {
newPeer func() (*gossip.Peer, func())
}
func newEnv(t *testing.T) *env {
t.Cleanup(func() {
goleak.VerifyNone(t)
})
return &env{
newPeer: func() (*gossip.Peer, func()) {
lsn, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
peer := gossip.NewPeer(gossip.PeerConfig{
SelfEndpoint: lsn.Addr().String(),
PingPeriod: pingPeriod,
})
server := grpc.NewServer()
meshpb.RegisterGossipServiceServer(server, peer)
go func() { _ = server.Serve(lsn) }()
go peer.Run()
stop := func() {
server.Stop()
peer.Stop()
}
t.Cleanup(stop)
return peer, stop
},
}
}
func TestGossip_SinglePeer(t *testing.T) {
env := newEnv(t)
peer0, _ := env.newPeer()
members := peer0.GetMembers()
require.Contains(t, members, peer0.Addr())
peer0.UpdateMeta(&meshpb.PeerMeta{Name: "peer0"})
members = peer0.GetMembers()
require.Contains(t, members, peer0.Addr())
require.Equal(t, "peer0", members[peer0.Addr()].Name)
deadPeer, stopDeadPeer := env.newPeer()
stopDeadPeer()
peer0.AddSeed(deadPeer.Addr())
time.Sleep(waitPeriod)
require.Len(t, peer0.GetMembers(), 1)
}
func TestGossip_TwoPeers(t *testing.T) {
env := newEnv(t)
peer0, _ := env.newPeer()
peer1, stop1 := env.newPeer()
peer0.AddSeed(peer1.Addr())
time.Sleep(waitPeriod)
members0 := peer0.GetMembers()
require.Contains(t, members0, peer0.Addr())
require.Contains(t, members0, peer1.Addr())
members1 := peer1.GetMembers()
require.Contains(t, members1, peer0.Addr())
require.Contains(t, members1, peer1.Addr())
peer1.UpdateMeta(&meshpb.PeerMeta{Name: "bob"})
time.Sleep(waitPeriod)
require.Equal(t, "bob", peer0.GetMembers()[peer1.Addr()].Name)
peer1.UpdateMeta(&meshpb.PeerMeta{Name: "sam"})
time.Sleep(waitPeriod)
require.Equal(t, "sam", peer0.GetMembers()[peer1.Addr()].Name)
stop1()
time.Sleep(waitPeriod)
members0 = peer0.GetMembers()
require.NotContains(t, members0, peer1.Addr())
}
func TestGossip_ManyPeers(t *testing.T) {
env := newEnv(t)
seed, stopSeed := env.newPeer()
var peers []*gossip.Peer
names := map[string]string{}
for i := 0; i < 10; i++ {
peer, _ := env.newPeer()
peer.AddSeed(seed.Addr())
peer.UpdateMeta(&meshpb.PeerMeta{Name: fmt.Sprint(i)})
names[peer.Addr()] = fmt.Sprint(i)
peers = append(peers, peer)
}
time.Sleep(waitPeriod)
stopSeed()
time.Sleep(waitPeriod)
for _, peer := range peers {
members := peer.GetMembers()
require.NotContains(t, members, seed.Addr())
for addr, name := range names {
require.Contains(t, members, addr)
require.Equal(t, members[addr].Name, name)
}
}
peers[0].UpdateMeta(&meshpb.PeerMeta{Name: "leader"})
time.Sleep(waitPeriod)
for _, peer := range peers {
members := peer.GetMembers()
require.Contains(t, members, peers[0].Addr())
require.Equal(t, members[peers[0].Addr()].Name, "leader")
}
}
func TestGossip_Groups(t *testing.T) {
env := newEnv(t)
aSize, bSize := 1, 1
seedA, _ := env.newPeer()
seedB, _ := env.newPeer()
for i := 0; i < 10; i++ {
peer, _ := env.newPeer()
if rand.Int()%2 == 0 {
peer.AddSeed(seedA.Addr())
aSize++
} else {
peer.AddSeed(seedB.Addr())
bSize++
}
}
time.Sleep(waitPeriod)
require.Len(t, seedA.GetMembers(), aSize)
require.Len(t, seedB.GetMembers(), bSize)
seedA.AddSeed(seedB.Addr())
time.Sleep(waitPeriod)
require.Len(t, seedA.GetMembers(), aSize+bSize)
require.Len(t, seedB.GetMembers(), aSize+bSize)
}

View file

@ -0,0 +1,147 @@
//go:build !solution
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.19.6
// source: meshpb/protocol.proto
package meshpb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// PeerMeta is arbitary message that is propageted with peer gossip.
type PeerMeta struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}
func (x *PeerMeta) Reset() {
*x = PeerMeta{}
if protoimpl.UnsafeEnabled {
mi := &file_meshpb_protocol_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PeerMeta) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PeerMeta) ProtoMessage() {}
func (x *PeerMeta) ProtoReflect() protoreflect.Message {
mi := &file_meshpb_protocol_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PeerMeta.ProtoReflect.Descriptor instead.
func (*PeerMeta) Descriptor() ([]byte, []int) {
return file_meshpb_protocol_proto_rawDescGZIP(), []int{0}
}
func (x *PeerMeta) GetName() string {
if x != nil {
return x.Name
}
return ""
}
var File_meshpb_protocol_proto protoreflect.FileDescriptor
var file_meshpb_protocol_proto_rawDesc = []byte{
0x0a, 0x15, 0x6d, 0x65, 0x73, 0x68, 0x70, 0x62, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f,
0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1e, 0x0a, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4d,
0x65, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x32, 0x0f, 0x0a, 0x0d, 0x47, 0x6f, 0x73, 0x73, 0x69,
0x70, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x6c,
0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6c, 0x6f, 0x6e, 0x2f, 0x73, 0x68, 0x61, 0x64,
0x2d, 0x67, 0x6f, 0x2f, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2f, 0x6d, 0x65, 0x73, 0x68, 0x70,
0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_meshpb_protocol_proto_rawDescOnce sync.Once
file_meshpb_protocol_proto_rawDescData = file_meshpb_protocol_proto_rawDesc
)
func file_meshpb_protocol_proto_rawDescGZIP() []byte {
file_meshpb_protocol_proto_rawDescOnce.Do(func() {
file_meshpb_protocol_proto_rawDescData = protoimpl.X.CompressGZIP(file_meshpb_protocol_proto_rawDescData)
})
return file_meshpb_protocol_proto_rawDescData
}
var file_meshpb_protocol_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_meshpb_protocol_proto_goTypes = []interface{}{
(*PeerMeta)(nil), // 0: PeerMeta
}
var file_meshpb_protocol_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_meshpb_protocol_proto_init() }
func file_meshpb_protocol_proto_init() {
if File_meshpb_protocol_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_meshpb_protocol_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PeerMeta); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_meshpb_protocol_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_meshpb_protocol_proto_goTypes,
DependencyIndexes: file_meshpb_protocol_proto_depIdxs,
MessageInfos: file_meshpb_protocol_proto_msgTypes,
}.Build()
File_meshpb_protocol_proto = out.File
file_meshpb_protocol_proto_rawDesc = nil
file_meshpb_protocol_proto_goTypes = nil
file_meshpb_protocol_proto_depIdxs = nil
}

View file

@ -0,0 +1,13 @@
//go:build !solution
syntax = "proto3";
option go_package = "gitlab.com/slon/shad-go/gossip/meshpb";
// PeerMeta is arbitary message that is propageted with peer gossip.
message PeerMeta {
string name = 1;
}
service GossipService {
}

View file

@ -0,0 +1,69 @@
//go:build !solution
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v3.19.6
// source: meshpb/protocol.proto
package meshpb
import (
grpc "google.golang.org/grpc"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
const ()
// GossipServiceClient is the client API for GossipService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type GossipServiceClient interface {
}
type gossipServiceClient struct {
cc grpc.ClientConnInterface
}
func NewGossipServiceClient(cc grpc.ClientConnInterface) GossipServiceClient {
return &gossipServiceClient{cc}
}
// GossipServiceServer is the server API for GossipService service.
// All implementations must embed UnimplementedGossipServiceServer
// for forward compatibility
type GossipServiceServer interface {
mustEmbedUnimplementedGossipServiceServer()
}
// UnimplementedGossipServiceServer must be embedded to have forward compatible implementations.
type UnimplementedGossipServiceServer struct {
}
func (UnimplementedGossipServiceServer) mustEmbedUnimplementedGossipServiceServer() {}
// UnsafeGossipServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to GossipServiceServer will
// result in compilation errors.
type UnsafeGossipServiceServer interface {
mustEmbedUnimplementedGossipServiceServer()
}
func RegisterGossipServiceServer(s grpc.ServiceRegistrar, srv GossipServiceServer) {
s.RegisterService(&GossipService_ServiceDesc, srv)
}
// GossipService_ServiceDesc is the grpc.ServiceDesc for GossipService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var GossipService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "GossipService",
HandlerType: (*GossipServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{},
Metadata: "meshpb/protocol.proto",
}