diff --git a/gossip/Makefile b/gossip/Makefile new file mode 100644 index 0000000..a2510aa --- /dev/null +++ b/gossip/Makefile @@ -0,0 +1,7 @@ +default: + protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative meshpb/protocol.proto + +solution: + protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative meshpb/solution.proto + +.PHONY: default solution \ No newline at end of file diff --git a/gossip/README.md b/gossip/README.md new file mode 100644 index 0000000..31e5b02 --- /dev/null +++ b/gossip/README.md @@ -0,0 +1,57 @@ +# gossip + +В этой задаче вам нужно придумать и реализовать протокол gossip. + +Gossip-ом называют протоколы, которые применяют в распределённых системах без +центральной конфигурации, чтобы обнаруживать участников протокола. Участники +протокола периодически рассказывают друг другу про своих соседей, так что +каждый "пир" (peer) знает всё про всех. Новому "пир"-у достаточно подключиться +к одному из членов группы, чтобы узнать про всех остальных. + +Протокол вам нужно придумать самим, но он должен соответствовать интерфейсу и +иметь нужные свойства. + + * Каждый участник протокола имеет уникальный идентификатор, совпадающий с его адресом. + Этот адрес/идентификатор передаётся вам в конструкторе. + * Каждый участник знает метаданные про себя. Метаданные описываются типом `*meshpb.PeerMeta`. + Метаданные могут обновляться в процессе работы. + * Каждый участник eventually должен узнать идентификаторы и метаданные всех остальных участников. + Снепшот этой информации должен возвращать метод `GetMembers`. + * Метод `AddSeed` добавляет адрес `seed`-а. По этому адресу может находиться другой участник + (а может и не находиться). Адрес нужно использовать, чтобы пытаться подключиться к группе. + Нельзя считать, что `seed` существует (возвращать его из GetMembers), пока к нему не удалось + подключиться. + * Если какой-то из участников становится недоступным (перестаёт отвечать на RPC вызовы), то он должен + быть удалён из группы (метод `GetMembers` должен перестать возвращать его). + * Сам `Peer` должен реализовывать `GossipService` из `.proto` файла. Конструктор `NewPeer` + не должен запускать горутины, это должен делать метод `Run`. Метод `Stop` должен + завершать все горутины. + +Ваш протокол должен работать поверх GRPC, поэтому в начале ознакомьтесь +с [tutorial](https://grpc.io/docs/languages/go/basics/) по GRPC. + +## Установка protoc + +В этом задании вам нужно будет генерировать код для работы с GRPC. + +Для того, чтобы заработал генератор, вам нужно установить несколько утилит. +Установите их по [инструкции](https://grpc.io/docs/languages/go/quickstart/#prerequisites). +После установки, у вас в PATH должны появиться 3 команды. Проверьте это: + +``` +protoc --help +protoc-gen-go --help +protoc-gen-go-grpc --help +``` + +## Генерация кода + +Описание protobuf сообщений и сервисов находится в файле `meshpb/protocol.proto` + +Команда для генерации записана в `Makefile`. Чтобы запустить её, перейдите в директорию `servicemesh` и выполните команду `make`. + +``` +prime@fedora ~/C/shad-go (master)> cd servicemesh/ +prime@fedora ~/C/s/servicemesh (master)> make +protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative meshpb/protocol.proto +``` diff --git a/gossip/gossip.go b/gossip/gossip.go new file mode 100644 index 0000000..01268e9 --- /dev/null +++ b/gossip/gossip.go @@ -0,0 +1,47 @@ +//go:build !solution + +package gossip + +import ( + "time" + + "gitlab.com/slon/shad-go/gossip/meshpb" + "google.golang.org/grpc" +) + +type PeerConfig struct { + SelfEndpoint string + PingPeriod time.Duration +} + +type Peer struct { + config PeerConfig +} + +func (p *Peer) UpdateMeta(meta *meshpb.PeerMeta) { + panic("implement me") +} + +func (p *Peer) AddSeed(seed string) { + panic("implement me") +} + +func (p *Peer) Addr() string { + return p.config.SelfEndpoint +} + +func (p *Peer) GetMembers() map[string]*meshpb.PeerMeta { + panic("implement me") +} + +func (p *Peer) Run() { + panic("implement me") +} + +func (p *Peer) Stop() { + panic("implement me") +} + +func NewPeer(config PeerConfig) *Peer { + panic("implement me") +} diff --git a/gossip/gossip_test.go b/gossip/gossip_test.go new file mode 100644 index 0000000..7cae31a --- /dev/null +++ b/gossip/gossip_test.go @@ -0,0 +1,182 @@ +package gossip_test + +import ( + "fmt" + "math/rand" + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/slon/shad-go/gossip" + "gitlab.com/slon/shad-go/gossip/meshpb" + "go.uber.org/goleak" + "google.golang.org/grpc" +) + +const ( + pingPeriod = time.Millisecond * 10 + waitPeriod = pingPeriod * 10 +) + +type env struct { + newPeer func() (*gossip.Peer, func()) +} + +func newEnv(t *testing.T) *env { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + + return &env{ + newPeer: func() (*gossip.Peer, func()) { + lsn, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + peer := gossip.NewPeer(gossip.PeerConfig{ + SelfEndpoint: lsn.Addr().String(), + PingPeriod: pingPeriod, + }) + + server := grpc.NewServer() + meshpb.RegisterGossipServiceServer(server, peer) + + go func() { _ = server.Serve(lsn) }() + go peer.Run() + + stop := func() { + server.Stop() + peer.Stop() + } + t.Cleanup(stop) + + return peer, stop + }, + } +} + +func TestGossip_SinglePeer(t *testing.T) { + env := newEnv(t) + + peer0, _ := env.newPeer() + + members := peer0.GetMembers() + require.Contains(t, members, peer0.Addr()) + + peer0.UpdateMeta(&meshpb.PeerMeta{Name: "peer0"}) + + members = peer0.GetMembers() + require.Contains(t, members, peer0.Addr()) + require.Equal(t, "peer0", members[peer0.Addr()].Name) + + deadPeer, stopDeadPeer := env.newPeer() + stopDeadPeer() + + peer0.AddSeed(deadPeer.Addr()) + time.Sleep(waitPeriod) + + require.Len(t, peer0.GetMembers(), 1) +} + +func TestGossip_TwoPeers(t *testing.T) { + env := newEnv(t) + + peer0, _ := env.newPeer() + peer1, stop1 := env.newPeer() + + peer0.AddSeed(peer1.Addr()) + time.Sleep(waitPeriod) + + members0 := peer0.GetMembers() + require.Contains(t, members0, peer0.Addr()) + require.Contains(t, members0, peer1.Addr()) + + members1 := peer1.GetMembers() + require.Contains(t, members1, peer0.Addr()) + require.Contains(t, members1, peer1.Addr()) + + peer1.UpdateMeta(&meshpb.PeerMeta{Name: "bob"}) + time.Sleep(waitPeriod) + require.Equal(t, "bob", peer0.GetMembers()[peer1.Addr()].Name) + + peer1.UpdateMeta(&meshpb.PeerMeta{Name: "sam"}) + time.Sleep(waitPeriod) + require.Equal(t, "sam", peer0.GetMembers()[peer1.Addr()].Name) + + stop1() + time.Sleep(waitPeriod) + + members0 = peer0.GetMembers() + require.NotContains(t, members0, peer1.Addr()) +} + +func TestGossip_ManyPeers(t *testing.T) { + env := newEnv(t) + + seed, stopSeed := env.newPeer() + + var peers []*gossip.Peer + names := map[string]string{} + + for i := 0; i < 10; i++ { + peer, _ := env.newPeer() + peer.AddSeed(seed.Addr()) + peer.UpdateMeta(&meshpb.PeerMeta{Name: fmt.Sprint(i)}) + names[peer.Addr()] = fmt.Sprint(i) + peers = append(peers, peer) + } + + time.Sleep(waitPeriod) + stopSeed() + time.Sleep(waitPeriod) + + for _, peer := range peers { + members := peer.GetMembers() + require.NotContains(t, members, seed.Addr()) + for addr, name := range names { + require.Contains(t, members, addr) + require.Equal(t, members[addr].Name, name) + } + } + + peers[0].UpdateMeta(&meshpb.PeerMeta{Name: "leader"}) + time.Sleep(waitPeriod) + + for _, peer := range peers { + members := peer.GetMembers() + + require.Contains(t, members, peers[0].Addr()) + require.Equal(t, members[peers[0].Addr()].Name, "leader") + } +} + +func TestGossip_Groups(t *testing.T) { + env := newEnv(t) + + aSize, bSize := 1, 1 + seedA, _ := env.newPeer() + seedB, _ := env.newPeer() + + for i := 0; i < 10; i++ { + peer, _ := env.newPeer() + + if rand.Int()%2 == 0 { + peer.AddSeed(seedA.Addr()) + aSize++ + } else { + peer.AddSeed(seedB.Addr()) + bSize++ + } + } + + time.Sleep(waitPeriod) + + require.Len(t, seedA.GetMembers(), aSize) + require.Len(t, seedB.GetMembers(), bSize) + + seedA.AddSeed(seedB.Addr()) + time.Sleep(waitPeriod) + + require.Len(t, seedA.GetMembers(), aSize+bSize) + require.Len(t, seedB.GetMembers(), aSize+bSize) +} diff --git a/gossip/meshpb/protocol.pb.go b/gossip/meshpb/protocol.pb.go new file mode 100644 index 0000000..f1ce329 --- /dev/null +++ b/gossip/meshpb/protocol.pb.go @@ -0,0 +1,147 @@ +//go:build !solution + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.19.6 +// source: meshpb/protocol.proto + +package meshpb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// PeerMeta is arbitary message that is propageted with peer gossip. +type PeerMeta struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` +} + +func (x *PeerMeta) Reset() { + *x = PeerMeta{} + if protoimpl.UnsafeEnabled { + mi := &file_meshpb_protocol_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PeerMeta) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PeerMeta) ProtoMessage() {} + +func (x *PeerMeta) ProtoReflect() protoreflect.Message { + mi := &file_meshpb_protocol_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PeerMeta.ProtoReflect.Descriptor instead. +func (*PeerMeta) Descriptor() ([]byte, []int) { + return file_meshpb_protocol_proto_rawDescGZIP(), []int{0} +} + +func (x *PeerMeta) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +var File_meshpb_protocol_proto protoreflect.FileDescriptor + +var file_meshpb_protocol_proto_rawDesc = []byte{ + 0x0a, 0x15, 0x6d, 0x65, 0x73, 0x68, 0x70, 0x62, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, + 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1e, 0x0a, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4d, + 0x65, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x32, 0x0f, 0x0a, 0x0d, 0x47, 0x6f, 0x73, 0x73, 0x69, + 0x70, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x6c, + 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6c, 0x6f, 0x6e, 0x2f, 0x73, 0x68, 0x61, 0x64, + 0x2d, 0x67, 0x6f, 0x2f, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2f, 0x6d, 0x65, 0x73, 0x68, 0x70, + 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_meshpb_protocol_proto_rawDescOnce sync.Once + file_meshpb_protocol_proto_rawDescData = file_meshpb_protocol_proto_rawDesc +) + +func file_meshpb_protocol_proto_rawDescGZIP() []byte { + file_meshpb_protocol_proto_rawDescOnce.Do(func() { + file_meshpb_protocol_proto_rawDescData = protoimpl.X.CompressGZIP(file_meshpb_protocol_proto_rawDescData) + }) + return file_meshpb_protocol_proto_rawDescData +} + +var file_meshpb_protocol_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_meshpb_protocol_proto_goTypes = []interface{}{ + (*PeerMeta)(nil), // 0: PeerMeta +} +var file_meshpb_protocol_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_meshpb_protocol_proto_init() } +func file_meshpb_protocol_proto_init() { + if File_meshpb_protocol_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_meshpb_protocol_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PeerMeta); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_meshpb_protocol_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_meshpb_protocol_proto_goTypes, + DependencyIndexes: file_meshpb_protocol_proto_depIdxs, + MessageInfos: file_meshpb_protocol_proto_msgTypes, + }.Build() + File_meshpb_protocol_proto = out.File + file_meshpb_protocol_proto_rawDesc = nil + file_meshpb_protocol_proto_goTypes = nil + file_meshpb_protocol_proto_depIdxs = nil +} diff --git a/gossip/meshpb/protocol.proto b/gossip/meshpb/protocol.proto new file mode 100644 index 0000000..d418f0a --- /dev/null +++ b/gossip/meshpb/protocol.proto @@ -0,0 +1,13 @@ +//go:build !solution + +syntax = "proto3"; + +option go_package = "gitlab.com/slon/shad-go/gossip/meshpb"; + +// PeerMeta is arbitary message that is propageted with peer gossip. +message PeerMeta { + string name = 1; +} + +service GossipService { +} \ No newline at end of file diff --git a/gossip/meshpb/protocol_grpc.pb.go b/gossip/meshpb/protocol_grpc.pb.go new file mode 100644 index 0000000..9e79d81 --- /dev/null +++ b/gossip/meshpb/protocol_grpc.pb.go @@ -0,0 +1,69 @@ +//go:build !solution + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v3.19.6 +// source: meshpb/protocol.proto + +package meshpb + +import ( + grpc "google.golang.org/grpc" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const () + +// GossipServiceClient is the client API for GossipService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type GossipServiceClient interface { +} + +type gossipServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewGossipServiceClient(cc grpc.ClientConnInterface) GossipServiceClient { + return &gossipServiceClient{cc} +} + +// GossipServiceServer is the server API for GossipService service. +// All implementations must embed UnimplementedGossipServiceServer +// for forward compatibility +type GossipServiceServer interface { + mustEmbedUnimplementedGossipServiceServer() +} + +// UnimplementedGossipServiceServer must be embedded to have forward compatible implementations. +type UnimplementedGossipServiceServer struct { +} + +func (UnimplementedGossipServiceServer) mustEmbedUnimplementedGossipServiceServer() {} + +// UnsafeGossipServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to GossipServiceServer will +// result in compilation errors. +type UnsafeGossipServiceServer interface { + mustEmbedUnimplementedGossipServiceServer() +} + +func RegisterGossipServiceServer(s grpc.ServiceRegistrar, srv GossipServiceServer) { + s.RegisterService(&GossipService_ServiceDesc, srv) +} + +// GossipService_ServiceDesc is the grpc.ServiceDesc for GossipService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var GossipService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "GossipService", + HandlerType: (*GossipServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{}, + Metadata: "meshpb/protocol.proto", +}