From df318e9a07209f8172392e4fa8dbfe604c5ddbd7 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 7 Jan 2025 02:03:42 +0530 Subject: [PATCH 1/5] Remove no-op transport Let's remove no-op transport as we will implement grpc transport in subsequent commits. --- internal/gitaly/storage/raftmgr/transport.go | 119 ------------- .../gitaly/storage/raftmgr/transport_test.go | 164 ------------------ 2 files changed, 283 deletions(-) delete mode 100644 internal/gitaly/storage/raftmgr/transport.go delete mode 100644 internal/gitaly/storage/raftmgr/transport_test.go diff --git a/internal/gitaly/storage/raftmgr/transport.go b/internal/gitaly/storage/raftmgr/transport.go deleted file mode 100644 index 279ffdd2405..00000000000 --- a/internal/gitaly/storage/raftmgr/transport.go +++ /dev/null @@ -1,119 +0,0 @@ -package raftmgr - -import ( - "bytes" - "context" - "fmt" - - "gitlab.com/gitlab-org/gitaly/v16/internal/archive" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" - "go.etcd.io/etcd/raft/v3/raftpb" - "google.golang.org/protobuf/proto" -) - -// Transport defines the interface for sending Raft protocol messages. -type Transport interface { - // Send dispatches a batch of Raft messages. It returns an error if the sending fails. This function receives a - // context, the list of messages to send and a function that returns the path of WAL directory of a particular - // log entry. The implementation must respect input context's cancellation. - Send(ctx context.Context, walDirForLSN func(storage.LSN) string, messages []raftpb.Message) error - - // GetRecordedMessages retrieves all recorded messages if recording is enabled. - // This is typically used in a testing environment to verify message transmission. - GetRecordedMessages() []raftpb.Message -} - -// NoopTransport is a transport implementation that logs messages and optionally records them. 
-// It is useful in testing environments where message delivery is non-functional but needs to be observed. -type NoopTransport struct { - logger log.Logger // Logger for outputting message information - recordTransport bool // Flag indicating whether message recording is enabled - recordedMessages []*raftpb.Message // Slice to store recorded messages -} - -// NewNoopTransport constructs a new NoopTransport instance. -// The logger is used for logging message information, and the recordTransport flag -// determines whether messages should be recorded. -func NewNoopTransport(logger log.Logger, recordTransport bool) Transport { - return &NoopTransport{ - logger: logger, - recordTransport: recordTransport, - } -} - -// Send logs each message being sent and records it if recording is enabled. -func (t *NoopTransport) Send(ctx context.Context, pathForLSN func(storage.LSN) string, messages []raftpb.Message) error { - for i := range messages { - for j := range messages[i].Entries { - if messages[i].Entries[j].Type != raftpb.EntryNormal { - continue - } - var msg gitalypb.RaftEntry - - if err := proto.Unmarshal(messages[i].Entries[j].Data, &msg); err != nil { - return fmt.Errorf("unmarshalling entry type: %w", err) - } - - // This is a very native implementation. Noop Transport is only used for testing - // purposes. All external messages are swallowed and stored in a recorder. It packages - // the whole log entry directory as a tar ball using an existing backup utility. The - // resulting binary data is stored inside a subfield of the message for examining - // purpose. A real implementation of Transaction will likely use an optimized method - // (such as sidechannel) to deliver the data. It does not necessarily store the data in - // the memory. 
- if len(msg.GetData().GetPacked()) == 0 { - lsn := storage.LSN(messages[i].Entries[j].Index) - path := pathForLSN(lsn) - if err := t.packLogData(ctx, lsn, &msg, path); err != nil { - return fmt.Errorf("packing log data: %w", err) - } - } - data, err := proto.Marshal(&msg) - if err != nil { - return fmt.Errorf("marshaling Raft entry: %w", err) - } - messages[i].Entries[j].Data = data - } - - t.logger.WithFields(log.Fields{ - "raft.type": messages[i].Type, - "raft.to": messages[i].To, - "raft.from": messages[i].From, - "raft.term": messages[i].Term, - "raft.num_entries": len(messages[i].Entries), - }).Info("sending message") - - // Record messages if recording is enabled. - if t.recordTransport { - t.recordedMessages = append(t.recordedMessages, &messages[i]) - } - } - return nil -} - -func (t *NoopTransport) packLogData(ctx context.Context, lsn storage.LSN, message *gitalypb.RaftEntry, logEntryPath string) error { - var logData bytes.Buffer - if err := archive.WriteTarball(ctx, t.logger.WithFields(log.Fields{ - "raft.component": "WAL archiver", - "raft.log_entry_lsn": lsn, - "raft.log_entry_path": logEntryPath, - }), &logData, logEntryPath, "."); err != nil { - return fmt.Errorf("archiving WAL log entry") - } - message.Data = &gitalypb.RaftEntry_LogData{ - LocalPath: message.GetData().GetLocalPath(), - Packed: logData.Bytes(), - } - return nil -} - -// GetRecordedMessages returns the list of recorded messages. 
-func (t *NoopTransport) GetRecordedMessages() []raftpb.Message { - messages := make([]raftpb.Message, 0, len(t.recordedMessages)) - for _, m := range t.recordedMessages { - messages = append(messages, *m) - } - return messages -} diff --git a/internal/gitaly/storage/raftmgr/transport_test.go b/internal/gitaly/storage/raftmgr/transport_test.go deleted file mode 100644 index 876814f9d58..00000000000 --- a/internal/gitaly/storage/raftmgr/transport_test.go +++ /dev/null @@ -1,164 +0,0 @@ -package raftmgr - -import ( - "bytes" - "fmt" - "io/fs" - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/archive" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" - "go.etcd.io/etcd/raft/v3/raftpb" - "google.golang.org/protobuf/proto" -) - -func TestNoopTransport_Send(t *testing.T) { - t.Parallel() - - mustMarshalProto := func(msg proto.Message) []byte { - data, err := proto.Marshal(msg) - if err != nil { - panic(fmt.Sprintf("failed to marshal proto: %v", err)) - } - return data - } - - tests := []struct { - name string - setupFunc func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) - }{ - { - name: "No messages", - setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) { - return []raftpb.Message{}, nil - }, - }, - { - name: "Empty Entries", - setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) { - return []raftpb.Message{ - { - Type: raftpb.MsgApp, - From: 2, - To: 1, - Term: 1, - Entries: []raftpb.Entry{}, // Empty Entries - }, - }, nil - }, - }, - { - name: "Messages with already packed data", - setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) { - initialMessage := gitalypb.RaftEntry{ - Id: 1, - Data: &gitalypb.RaftEntry_LogData{Packed: []byte("already packed data")}, - } - messages := 
[]raftpb.Message{ - { - Type: raftpb.MsgApp, - From: 2, - To: 1, - Term: 1, - Index: 1, - Entries: []raftpb.Entry{{Index: uint64(1), Type: raftpb.EntryNormal, Data: mustMarshalProto(&initialMessage)}}, - }, - } - return messages, nil - }, - }, - { - name: "Messages with referenced data", - setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) { - // Simulate a log entry dir with files - fileContents := testhelper.DirectoryState{ - ".": {Mode: archive.TarFileMode | archive.ExecuteMode | fs.ModeDir}, - "1": {Mode: archive.TarFileMode, Content: []byte("file1 content")}, - "2": {Mode: archive.TarFileMode, Content: []byte("file2 content")}, - "3": {Mode: archive.TarFileMode, Content: []byte("file3 content")}, - } - for name, file := range fileContents { - if file.Content != nil { - content := file.Content.([]byte) - require.NoError(t, os.WriteFile(filepath.Join(tempDir, name), content, 0o644)) - } - } - - initialMessage := gitalypb.RaftEntry{ - Id: 1, - Data: &gitalypb.RaftEntry_LogData{LocalPath: []byte(tempDir)}, - } - - messages := []raftpb.Message{ - { - Type: raftpb.MsgApp, - From: 2, - To: 1, - Term: 1, - Index: 1, - Entries: []raftpb.Entry{ - {Index: uint64(1), Type: raftpb.EntryNormal, Data: mustMarshalProto(&initialMessage)}, - }, - }, - } - return messages, fileContents - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - // Create a temporary directory - tempDir := testhelper.TempDir(t) - - // Execute setup function to prepare messages and any necessary file contents - messages, expectedContents := tc.setupFunc(tempDir) - - // Setup logger and transport - logger := testhelper.SharedLogger(t) - transport := NewNoopTransport(logger, true) - - // Execute the Send operation - require.NoError(t, transport.Send(testhelper.Context(t), func(storage.LSN) string { return tempDir }, messages)) - - // Fetch recorded messages for verification - recordedMessages := transport.GetRecordedMessages() - - 
require.Len(t, recordedMessages, len(messages)) - - // Messages must be sent in order. - for i := range messages { - require.Equal(t, messages[i].Type, recordedMessages[i].Type) - require.Equal(t, messages[i].From, recordedMessages[i].From) - require.Equal(t, messages[i].To, recordedMessages[i].To) - require.Equal(t, messages[i].Term, recordedMessages[i].Term) - require.Equal(t, messages[i].Index, recordedMessages[i].Index) - - if len(messages[i].Entries) == 0 { - require.Empty(t, recordedMessages[i].Entries) - } else { - var resultMessage gitalypb.RaftEntry - require.NoError(t, proto.Unmarshal(recordedMessages[i].Entries[0].Data, &resultMessage)) - - require.True(t, len(resultMessage.GetData().GetPacked()) > 0, "packed data must have packed type") - tarballData := resultMessage.GetData().GetPacked() - require.NotEmpty(t, tarballData) - - // Optionally verify packed data if expected - if expectedContents != nil { - // Verify tarball content matches expectations - reader := bytes.NewReader(tarballData) - testhelper.RequireTarState(t, reader, expectedContents) - } - } - } - }) - } -} -- GitLab From a923273259c1a89bc701d395a43578c08d63ce07 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 7 Jan 2025 02:10:01 +0530 Subject: [PATCH 2/5] raft: Add SendMessage RPC to NonTransactionalRPCs The SendMessage RPC doesn't need to be transactionalized so it can safely bypass the transaction middleware. 
--- internal/gitaly/storage/storagemgr/middleware.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/gitaly/storage/storagemgr/middleware.go b/internal/gitaly/storage/storagemgr/middleware.go index 60029409c01..5b41895807f 100644 --- a/internal/gitaly/storage/storagemgr/middleware.go +++ b/internal/gitaly/storage/storagemgr/middleware.go @@ -45,6 +45,8 @@ var NonTransactionalRPCs = map[string]struct{}{ gitalypb.ServerService_ServerInfo_FullMethodName: {}, gitalypb.ServerService_ReadinessCheck_FullMethodName: {}, gitalypb.ServerService_ServerSignature_FullMethodName: {}, + // This RPC does not need to be transactional as it acts as a forwarder. + gitalypb.RaftService_SendMessage_FullMethodName: {}, } // repositoryCreatingRPCs are all of the RPCs that may create a repository. -- GitLab From 9e8dfb40fb56e2cdb6edb0189e18a24ffc17d095 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 7 Jan 2025 02:26:04 +0530 Subject: [PATCH 3/5] raft: Implement gRPC transport interface This commit introduces a gRPC transport interface for handling Raft communication. It implements the logic for sending and receiving Raft messages, including packing and unpacking log data. Additionally, the RaftManagerRegistry facilitates Raft manager registration and partition lookup. For the address lookup, we introduce a static routing table for now. In future, the table can be dynamic so that the addresses are propagated through gossiping. 
--- internal/cli/gitaly/serve.go | 8 + internal/gitaly/service/dependencies.go | 7 + .../{registry.go => event_registry.go} | 0 ...egistry_test.go => event_registry_test.go} | 0 .../gitaly/storage/raftmgr/grpc_transport.go | 248 ++++++++++++++++++ internal/gitaly/storage/raftmgr/manager.go | 14 + .../storage/raftmgr/manager_registry.go | 50 ++++ internal/gitaly/storage/raftmgr/routing.go | 52 ++++ 8 files changed, 379 insertions(+) rename internal/gitaly/storage/raftmgr/{registry.go => event_registry.go} (100%) rename internal/gitaly/storage/raftmgr/{registry_test.go => event_registry_test.go} (100%) create mode 100644 internal/gitaly/storage/raftmgr/grpc_transport.go create mode 100644 internal/gitaly/storage/raftmgr/manager.go create mode 100644 internal/gitaly/storage/raftmgr/manager_registry.go create mode 100644 internal/gitaly/storage/raftmgr/routing.go diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index a99a7516ad9..7fa7e9416a5 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -37,6 +37,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mdfile" nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" @@ -539,6 +540,12 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { return fmt.Errorf("resolve backup locator: %w", err) } } + raftTransport := &raftmgr.GrpcTransport{} + if cfg.Raft.Enabled { + raftManagerRegistry := raftmgr.NewRaftManagerRegistry() + routingTable := raftmgr.NewStaticRaftRoutingTable() + raftTransport = raftmgr.NewGrpcTransport(logger, cfg, 
routingTable, raftManagerRegistry, conns) + } var bundleURISink *bundleuri.Sink var bundleURIManager *bundleuri.GenerationManager @@ -587,6 +594,7 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { Logger: logger, Cfg: cfg, GitalyHookManager: hookManager, + RaftGrpcTransport: raftTransport, TransactionManager: transactionManager, StorageLocator: locator, ClientPool: conns, diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index 1925e140e57..2ffa5af67fb 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" @@ -29,6 +30,7 @@ type Dependencies struct { Logger log.Logger Cfg config.Cfg GitalyHookManager gitalyhook.Manager + RaftGrpcTransport raftmgr.Transport TransactionManager transaction.Manager StorageLocator storage.Locator ClientPool *client.Pool @@ -72,6 +74,11 @@ func (dc *Dependencies) GetTxManager() transaction.Manager { return dc.TransactionManager } +// GetRaftGrpcTransport returns raft transport. +func (dc *Dependencies) GetRaftGrpcTransport() raftmgr.Transport { + return dc.RaftGrpcTransport +} + // GetLocator returns storage locator. 
func (dc *Dependencies) GetLocator() storage.Locator { return dc.StorageLocator diff --git a/internal/gitaly/storage/raftmgr/registry.go b/internal/gitaly/storage/raftmgr/event_registry.go similarity index 100% rename from internal/gitaly/storage/raftmgr/registry.go rename to internal/gitaly/storage/raftmgr/event_registry.go diff --git a/internal/gitaly/storage/raftmgr/registry_test.go b/internal/gitaly/storage/raftmgr/event_registry_test.go similarity index 100% rename from internal/gitaly/storage/raftmgr/registry_test.go rename to internal/gitaly/storage/raftmgr/event_registry_test.go diff --git a/internal/gitaly/storage/raftmgr/grpc_transport.go b/internal/gitaly/storage/raftmgr/grpc_transport.go new file mode 100644 index 00000000000..f0e153fb493 --- /dev/null +++ b/internal/gitaly/storage/raftmgr/grpc_transport.go @@ -0,0 +1,248 @@ +package raftmgr + +import ( + "archive/tar" + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + + "gitlab.com/gitlab-org/gitaly/v16/internal/archive" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "go.etcd.io/etcd/raft/v3/raftpb" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +// Transport defines the interface for sending Raft protocol messages. +type Transport interface { + // Send dispatches a batch of Raft messages. It returns an error if the sending fails. This function receives a + // context, the list of messages to send and a function that returns the path of WAL directory of a particular + // log entry. The implementation must respect input context's cancellation. 
+ Send(ctx context.Context, walDirForLSN func(storage.LSN) string, partitionID uint64, messages []raftpb.Message) error + // Receive receives a Raft message and processes it. + Receive(ctx context.Context, authorityName string, partitionID uint64, raftMsg raftpb.Message) error +} + +// GrpcTransport is a gRPC transport implementation for sending Raft messages across nodes. +type GrpcTransport struct { + logger log.Logger + cfg config.Cfg + routingTable RoutingTable + registry ManagerRegistry + connectionPool *client.Pool +} + +// NewGrpcTransport creates a new GrpcTransport instance. +func NewGrpcTransport(logger log.Logger, cfg config.Cfg, routingTable RoutingTable, registry ManagerRegistry, conns *client.Pool) *GrpcTransport { + return &GrpcTransport{ + logger: logger, + cfg: cfg, + routingTable: routingTable, + registry: registry, + connectionPool: conns, + } +} + +// Send sends Raft messages to the appropriate nodes. +func (t *GrpcTransport) Send(ctx context.Context, walDirForLSN func(storage.LSN) string, partitionID uint64, messages []raftpb.Message) error { + // Group by destination node + messagesByNode := make(map[uint64][]raftpb.Message) + for i := range messages { + nodeID := messages[i].To + messagesByNode[nodeID] = append(messagesByNode[nodeID], messages[i]) + } + + g := &errgroup.Group{} + + for nodeID, msgs := range messagesByNode { + g.Go(func() error { + err := t.sendToNode(ctx, walDirForLSN, nodeID, partitionID, msgs) + if err != nil { + return err + } + return nil + }) + } + + return g.Wait() +} + +func (t *GrpcTransport) sendToNode(ctx context.Context, walDirForLSN func(storage.LSN) string, nodeID uint64, partitionID uint64, msgs []raftpb.Message) error { + // For now, we are using a static routing table that contains mapping of nodeID to address. In future, the routing + // table can become dynamic so that storage addresses are propagated through gossiping. 
+ addr, err := t.routingTable.Translate(nodeID) + if err != nil { + return fmt.Errorf("translate nodeID %d: %w", nodeID, err) + } + + // Raft messages can contain entries intended for multiple storages. We need to get the storage name for the + // destination node. + storageName, err := t.routingTable.GetStorageName(nodeID) + if err != nil { + return fmt.Errorf("get storage name for nodeID %d: %w", nodeID, err) + } + + // get the connection to the node + conn, err := t.connectionPool.Dial(ctx, addr, t.cfg.Auth.Token) + if err != nil { + return fmt.Errorf("get connection to node %d: %w", nodeID, err) + } + + client := gitalypb.NewRaftServiceClient(conn) + stream, err := client.SendMessage(ctx) + if err != nil { + return fmt.Errorf("create stream to node %d: %w", nodeID, err) + } + + for _, msg := range msgs { + for i := range msg.Entries { + if msg.Entries[i].Type != raftpb.EntryNormal { + continue + } + + var raftMsg gitalypb.RaftEntry + if err := proto.Unmarshal(msg.Entries[i].Data, &raftMsg); err != nil { + return fmt.Errorf("unmarshalling entry type: %w", err) + } + + // Pack the log data if needed + if raftMsg.GetData().GetPacked() == nil { + lsn := storage.LSN(msg.Entries[i].Index) + path := walDirForLSN(lsn) + if err := t.packLogData(ctx, lsn, &raftMsg, path); err != nil { + return fmt.Errorf("packing log data: %w", err) + } + + // Marshal back into entry + data, err := proto.Marshal(&raftMsg) + if err != nil { + return fmt.Errorf("marshal entry: %w", err) + } + msg.Entries[i].Data = data + } + } + + if err := stream.Send(&gitalypb.RaftMessageRequest{ + ClusterId: t.cfg.Raft.ClusterID, + AuthorityName: storageName, + PartitionId: partitionID, + Message: &msg, + }); err != nil { + return fmt.Errorf("send batch to node %d: %w", nodeID, err) + } + } + + if _, err := stream.CloseAndRecv(); err != nil { + return fmt.Errorf("close stream to node %d: %w", nodeID, err) + } + + return nil +} + +func (t *GrpcTransport) packLogData(ctx context.Context, lsn storage.LSN, 
message *gitalypb.RaftEntry, logEntryPath string) error { + var logData bytes.Buffer + if err := archive.WriteTarball(ctx, t.logger.WithFields(log.Fields{ + "raft.component": "WAL archiver", + "raft.log_entry_lsn": lsn, + "raft.log_entry_path": logEntryPath, + }), &logData, logEntryPath, "."); err != nil { + return fmt.Errorf("archiving WAL log entry: %w", err) + } + message.Data = &gitalypb.RaftEntry_LogData{ + LocalPath: []byte(logEntryPath), + Packed: logData.Bytes(), + } + return nil +} + +// Receive receives a stream of Raft messages and processes them. +func (t *GrpcTransport) Receive(ctx context.Context, authorityName string, partitionID uint64, raftMsg raftpb.Message) error { + // Retrieve the raft manager from the registry, assumption is that all the messages are from the same partition key. + raftManager, err := t.registry.GetManager(PartitionKey{ + authorityName: authorityName, + partitionID: partitionID, + }) + if err != nil { + return status.Errorf(codes.NotFound, "raft manager not found for partition %d: %v", + partitionID, err) + } + + for _, entry := range raftMsg.Entries { + var msg gitalypb.RaftEntry + if err := proto.Unmarshal(entry.Data, &msg); err != nil { + return status.Errorf(codes.InvalidArgument, "failed to unmarshal message: %v", err) + } + + if msg.GetData().GetPacked() != nil { + if err := unpackLogData(&msg, raftManager.GetEntryPath(storage.LSN(entry.Index))); err != nil { + return status.Errorf(codes.Internal, "failed to unpack log data: %v", err) + } + } + } + + // Step messages per partition with their respective entries + if err := raftManager.Step(ctx, raftMsg); err != nil { + return status.Errorf(codes.Internal, "failed to step message: %v", err) + } + + return nil +} + +func unpackLogData(msg *gitalypb.RaftEntry, logEntryPath string) error { + logData := msg.GetData().GetPacked() + + if err := os.MkdirAll(filepath.Dir(logEntryPath), mode.Directory); err != nil { + return fmt.Errorf("creating WAL directory: %w", err) + } + + 
tarReader := tar.NewReader(bytes.NewReader(logData)) + for { + header, err := tarReader.Next() + if errors.Is(err, io.EOF) { + break + } + + actualName := header.Name + + switch header.Typeflag { + case tar.TypeDir: + // create the directory if not exists + if _, err := os.Stat(filepath.Join(logEntryPath, actualName)); os.IsNotExist(err) { + if err := os.Mkdir(filepath.Join(logEntryPath, actualName), mode.Directory); err != nil { + return fmt.Errorf("creating directory: %w", err) + } + } + case tar.TypeReg: + if err := func() error { + path := filepath.Join(logEntryPath, actualName) + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, mode.File) + if err != nil { + return fmt.Errorf("writing log entry file: %w", err) + } + defer f.Close() + + if _, err := io.Copy(f, tarReader); err != nil { + return fmt.Errorf("writing log entry file: %w", err) + } + + return nil + }(); err != nil { + return err + } + + } + } + + return nil +} diff --git a/internal/gitaly/storage/raftmgr/manager.go b/internal/gitaly/storage/raftmgr/manager.go new file mode 100644 index 00000000000..91b2a97213a --- /dev/null +++ b/internal/gitaly/storage/raftmgr/manager.go @@ -0,0 +1,14 @@ +package raftmgr + +import ( + "context" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "go.etcd.io/etcd/raft/v3/raftpb" +) + +// RaftManager is an interface that defines the methods to orchestrate the Raft consensus protocol. +type RaftManager interface { + GetEntryPath(storage.LSN) string + Step(ctx context.Context, msg raftpb.Message) error +} diff --git a/internal/gitaly/storage/raftmgr/manager_registry.go b/internal/gitaly/storage/raftmgr/manager_registry.go new file mode 100644 index 00000000000..a013c29120a --- /dev/null +++ b/internal/gitaly/storage/raftmgr/manager_registry.go @@ -0,0 +1,50 @@ +package raftmgr + +import ( + "fmt" + "sync" +) + +// PartitionKey is used to uniquely identify a partition. 
+type PartitionKey struct { + authorityName string + partitionID uint64 +} + +// ManagerRegistry is an interface that defines the methods to register and retrieve managers. +type ManagerRegistry interface { + // GetManager returns the manager for a given partition key. + GetManager(key PartitionKey) (RaftManager, error) + // RegisterManager registers a manager for a given partition key. + RegisterManager(key PartitionKey, manager RaftManager) error +} + +// RaftManagerRegistry is a concrete implementation of the ManagerRegistry interface. +type raftManagerRegistry struct { + managers *sync.Map +} + +// NewRaftManagerRegistry creates a new RaftManagerRegistry. +func NewRaftManagerRegistry() *raftManagerRegistry { + return &raftManagerRegistry{managers: &sync.Map{}} +} + +// GetManager returns the manager for a given partitionKey. +func (r *raftManagerRegistry) GetManager(key PartitionKey) (RaftManager, error) { + r.managers.Range(func(k, v any) bool { + fmt.Printf("key %+v value %+v\n", k, v) + return true + }) + if mgr, ok := r.managers.Load(key); ok { + return mgr.(RaftManager), nil + } + return nil, fmt.Errorf("no manager found for partition key %+v", key) +} + +// RegisterManager registers a manager for a given partitionKey. 
+func (r *raftManagerRegistry) RegisterManager(key PartitionKey, manager RaftManager) error { + if _, loaded := r.managers.LoadOrStore(key, manager); loaded { + return fmt.Errorf("manager already registered for partition key %+v", key) + } + return nil +} diff --git a/internal/gitaly/storage/raftmgr/routing.go b/internal/gitaly/storage/raftmgr/routing.go new file mode 100644 index 00000000000..ea46a415cda --- /dev/null +++ b/internal/gitaly/storage/raftmgr/routing.go @@ -0,0 +1,52 @@ +package raftmgr + +import ( + "fmt" + "sync" +) + +// RoutingTable handles translation between node IDs and addresses +type RoutingTable interface { + Translate(nodeID uint64) (string, error) + AddMember(nodeID uint64, address string, storageName string) error + GetStorageName(nodeID uint64) (string, error) +} + +// StaticRaftRoutingTable is an implementation of the RoutingTable interface. +// It maps node IDs to their corresponding addresses. +type staticRaftRoutingTable struct { + members sync.Map + storageNames sync.Map +} + +// NewStaticRaftRoutingTable creates a new staticRaftRoutingTable. +func NewStaticRaftRoutingTable() *staticRaftRoutingTable { + return &staticRaftRoutingTable{members: sync.Map{}, storageNames: sync.Map{}} +} + +// AddMember adds the mapping between nodeID, address, and storageName to the routing table. +func (r *staticRaftRoutingTable) AddMember(nodeID uint64, address string, storageName string) error { + if _, ok := r.members.Load(nodeID); !ok { + r.members.Store(nodeID, address) + r.storageNames.Store(nodeID, storageName) + } else { + return fmt.Errorf("node ID %d already exists in routing table", nodeID) + } + return nil +} + +// GetStorageName returns the storage name for a given node ID. 
+func (r *staticRaftRoutingTable) GetStorageName(nodeID uint64) (string, error) { + if name, ok := r.storageNames.Load(nodeID); ok { + return name.(string), nil + } + return "", fmt.Errorf("no storage name found for nodeID %d", nodeID) +} + +// Translate converts a node ID to its network address. +func (r *staticRaftRoutingTable) Translate(nodeID uint64) (string, error) { + if addr, ok := r.members.Load(nodeID); ok { + return addr.(string), nil + } + return "", fmt.Errorf("no address found for nodeID %d", nodeID) +} -- GitLab From bbd1ce3bbf49c70eac6cf6594ff134471ba2bf60 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 7 Jan 2025 02:38:29 +0530 Subject: [PATCH 4/5] raft: Implement SendMessage RPC server Add SendMessage RPC which leverages GrpcTransport.Receive to process incoming Raft messages. The Receive method is responsible for routing messages to the appropriate partition manager, unpacking log data and advancing the Raft state machine. --- internal/gitaly/service/raft/send_message.go | 49 +++++++ .../gitaly/service/raft/send_message_test.go | 136 ++++++++++++++++++ internal/gitaly/service/raft/server.go | 26 ++++ .../gitaly/service/raft/testhelper_test.go | 32 +++++ internal/gitaly/service/setup/register.go | 2 + 5 files changed, 245 insertions(+) create mode 100644 internal/gitaly/service/raft/send_message.go create mode 100644 internal/gitaly/service/raft/send_message_test.go create mode 100644 internal/gitaly/service/raft/server.go create mode 100644 internal/gitaly/service/raft/testhelper_test.go diff --git a/internal/gitaly/service/raft/send_message.go b/internal/gitaly/service/raft/send_message.go new file mode 100644 index 00000000000..fbc01c140b7 --- /dev/null +++ b/internal/gitaly/service/raft/send_message.go @@ -0,0 +1,49 @@ +package raft + +import ( + "errors" + "io" + + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" +) + +// SendMessage is a gRPC method for sending a Raft message 
across nodes. +func (s *Server) SendMessage(stream gitalypb.RaftService_SendMessageServer) error { + for { + req, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return structerr.NewInternal("receive error: %w", err) + } + + // The cluster ID protects Gitaly from cross-cluster interactions, which could potentially corrupt the clusters. + // This is particularly crucial after disaster recovery so that an identical cluster is restored from backup. + if req.GetClusterId() == "" { + return structerr.NewInvalidArgument("cluster_id is required") + } + + // Let's assume we have a single cluster per node for now. + if req.GetClusterId() != s.cfg.Raft.ClusterID { + return structerr.NewPermissionDenied("message from wrong cluster: got %q, want %q", + req.GetClusterId(), s.cfg.Raft.ClusterID) + } + + if req.GetAuthorityName() == "" { + return structerr.NewInvalidArgument("authority_name is required") + } + if req.GetPartitionId() == 0 { + return structerr.NewInvalidArgument("partition_id is required") + } + + raftMsg := req.GetMessage() + + if err := s.transport.Receive(stream.Context(), req.GetAuthorityName(), req.GetPartitionId(), *raftMsg); err != nil { + return structerr.NewInternal("receive error: %w", err) + } + } + + return stream.SendAndClose(&gitalypb.RaftMessageResponse{}) +} diff --git a/internal/gitaly/service/raft/send_message_test.go b/internal/gitaly/service/raft/send_message_test.go new file mode 100644 index 00000000000..35756627078 --- /dev/null +++ b/internal/gitaly/service/raft/send_message_test.go @@ -0,0 +1,136 @@ +package raft + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + 
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "go.etcd.io/etcd/raft/v3/raftpb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +func TestServer_SendMessage(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + cfg.Raft.ClusterID = "test-cluster" + + client := runRaftServer(t, ctx, cfg) + + testCases := []struct { + desc string + req *gitalypb.RaftMessageRequest + expectedGrpcErr codes.Code + expectedError string + }{ + { + desc: "successful message send", + req: &gitalypb.RaftMessageRequest{ + ClusterId: "test-cluster", + AuthorityName: "test-authority", + PartitionId: 1, + Message: &raftpb.Message{ + Type: raftpb.MsgApp, + To: 2, + }, + }, + }, + { + desc: "missing cluster ID", + req: &gitalypb.RaftMessageRequest{ + AuthorityName: "test-authority", + PartitionId: 1, + Message: &raftpb.Message{ + Type: raftpb.MsgApp, + To: 2, + }, + }, + expectedGrpcErr: codes.InvalidArgument, + expectedError: "rpc error: code = InvalidArgument desc = cluster_id is required", + }, + { + desc: "wrong cluster ID", + req: &gitalypb.RaftMessageRequest{ + ClusterId: "wrong-cluster", + AuthorityName: "test-authority", + PartitionId: 1, + Message: &raftpb.Message{ + Type: raftpb.MsgApp, + To: 2, + }, + }, + expectedGrpcErr: codes.PermissionDenied, + expectedError: `rpc error: code = PermissionDenied desc = message from wrong cluster: got "wrong-cluster", want "test-cluster"`, + }, + { + desc: "missing authority name", + req: &gitalypb.RaftMessageRequest{ + ClusterId: "test-cluster", + PartitionId: 1, + Message: &raftpb.Message{ + Type: raftpb.MsgApp, + To: 2, + }, + }, + expectedGrpcErr: codes.InvalidArgument, + expectedError: "rpc error: code = InvalidArgument desc = authority_name is required", + }, + { + desc: "missing partition ID", + req: &gitalypb.RaftMessageRequest{ + ClusterId: "test-cluster", + AuthorityName: "test-authority", + Message: 
&raftpb.Message{ + Type: raftpb.MsgApp, + To: 2, + }, + }, + expectedGrpcErr: codes.InvalidArgument, + expectedError: "rpc error: code = InvalidArgument desc = partition_id is required", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + stream, err := client.SendMessage(ctx) + require.NoError(t, err) + + require.NoError(t, stream.Send(tc.req)) + + _, err = stream.CloseAndRecv() + if tc.expectedGrpcErr == codes.OK { + require.NoError(t, err) + } else { + testhelper.RequireGrpcCode(t, err, tc.expectedGrpcErr) + require.Contains(t, err.Error(), tc.expectedError) + } + }) + } +} + +func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg) gitalypb.RaftServiceClient { + serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { + transport := newMockTransport(t) + deps.RaftGrpcTransport = transport + deps.Cfg = cfg + + gitalypb.RegisterRaftServiceServer(srv, NewServer(deps)) + }, testserver.WithDisablePraefect()) + + cfg.SocketPath = serverSocketPath + + conn := gittest.DialService(t, ctx, cfg) + + return gitalypb.NewRaftServiceClient(conn) +} diff --git a/internal/gitaly/service/raft/server.go b/internal/gitaly/service/raft/server.go new file mode 100644 index 00000000000..8763c735af5 --- /dev/null +++ b/internal/gitaly/service/raft/server.go @@ -0,0 +1,26 @@ +package raft + +import ( + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" +) + +// Server is a gRPC server for the Raft service. +type Server struct { + gitalypb.UnimplementedRaftServiceServer + logger log.Logger + transport raftmgr.Transport + cfg config.Cfg +} + +// NewServer creates a new Raft gRPC server. 
+func NewServer(deps *service.Dependencies) *Server { + return &Server{ + logger: deps.GetLogger(), + transport: deps.GetRaftGrpcTransport(), + cfg: deps.GetCfg(), + } +} diff --git a/internal/gitaly/service/raft/testhelper_test.go b/internal/gitaly/service/raft/testhelper_test.go new file mode 100644 index 00000000000..6de8c2d8d0d --- /dev/null +++ b/internal/gitaly/service/raft/testhelper_test.go @@ -0,0 +1,32 @@ +package raft + +import ( + "context" + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "go.etcd.io/etcd/raft/v3/raftpb" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} + +type mockTransport struct { + t *testing.T + receivedMessage *raftpb.Message +} + +func newMockTransport(t *testing.T) *mockTransport { + return &mockTransport{t: t} +} + +func (m *mockTransport) Receive(ctx context.Context, authorityName string, partitionID uint64, raftMsg raftpb.Message) error { + m.receivedMessage = &raftMsg + return nil +} + +func (m *mockTransport) Send(ctx context.Context, getPath func(lsn storage.LSN) string, partitionID uint64, msgs []raftpb.Message) error { + return nil +} diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go index dbffad507df..732cb31d8bb 100644 --- a/internal/gitaly/service/setup/register.go +++ b/internal/gitaly/service/setup/register.go @@ -17,6 +17,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/objectpool" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/operations" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/partition" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/raft" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/ref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/remote" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository" @@ -56,6 +57,7 @@ var ( func RegisterAll(srv *grpc.Server, deps 
*service.Dependencies) { gitalypb.RegisterAnalysisServiceServer(srv, analysis.NewServer(deps)) gitalypb.RegisterBlobServiceServer(srv, blob.NewServer(deps)) + gitalypb.RegisterRaftServiceServer(srv, raft.NewServer(deps)) gitalypb.RegisterCleanupServiceServer(srv, cleanup.NewServer(deps)) gitalypb.RegisterCommitServiceServer(srv, commit.NewServer(deps)) gitalypb.RegisterDiffServiceServer(srv, diff.NewServer(deps)) -- GitLab From 9ad7de61365b754d06a4389f105d43813874d8a1 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 7 Jan 2025 02:52:28 +0530 Subject: [PATCH 5/5] raft: Add tests for gRPC transport implementation This commit introduces a mock raft server and node for testing and adds tests to ensure the transport implementation is functioning correctly. --- .../storage/raftmgr/grpc_transport_test.go | 319 ++++++++++++++++++ .../gitaly/storage/raftmgr/testhelper_test.go | 20 ++ 2 files changed, 339 insertions(+) create mode 100644 internal/gitaly/storage/raftmgr/grpc_transport_test.go diff --git a/internal/gitaly/storage/raftmgr/grpc_transport_test.go b/internal/gitaly/storage/raftmgr/grpc_transport_test.go new file mode 100644 index 00000000000..774bf6b884e --- /dev/null +++ b/internal/gitaly/storage/raftmgr/grpc_transport_test.go @@ -0,0 +1,319 @@ +package raftmgr + +import ( + "errors" + "fmt" + "io" + "net" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + logger "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + 
"go.etcd.io/etcd/raft/v3/raftpb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +type testNode struct { + id uint64 + name string + transport *GrpcTransport + server *grpc.Server + managerRegistry ManagerRegistry +} + +type cluster struct { + leader *testNode + followers []*testNode +} + +type walEntry struct { + lsn storage.LSN + content string +} + +const ( + walFile = "wal-file" + clusterID = "44c58f50-0a8b-4849-bf8b-d5a56198ea7c" +) + +type mockRaftServer struct { + gitalypb.UnimplementedRaftServiceServer + transport *GrpcTransport +} + +func (s *mockRaftServer) SendMessage(stream gitalypb.RaftService_SendMessageServer) error { + for { + req, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return status.Errorf(codes.Internal, "receive error: %v", err) + } + + raftMsg := req.GetMessage() + + if err := s.transport.Receive(stream.Context(), req.GetAuthorityName(), req.GetPartitionId(), *raftMsg); err != nil { + return status.Errorf(codes.Internal, "receive error: %v", err) + } + } + + return stream.SendAndClose(&gitalypb.RaftMessageResponse{}) +} + +func TestGrpcTransport_SendAndReceive(t *testing.T) { + t.Parallel() + + type setup struct { + name string + numNodes int + removeConn bool + partitionID int + walEntries []walEntry + expectedError string + } + + tests := []setup{ + { + name: "raft messages sent to multiple followers", + numNodes: 3, + partitionID: 1, + walEntries: []walEntry{ + {lsn: storage.LSN(1), content: "content-1"}, + {lsn: storage.LSN(2), content: "content-2"}, + }, + }, + { + name: "raft messages sent to multiple followers with one follower not reachable", + numNodes: 3, + partitionID: 1, + removeConn: true, + walEntries: []walEntry{ + {lsn: storage.LSN(1), content: "random-content"}, + }, + expectedError: "create stream to node 2: rpc error: code = Unavailable desc = last connection error: connection error:", + }, + } + 
+ for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logger := testhelper.NewLogger(t) + + require.Greater(t, tc.numNodes, 1) + testCluster, routingTable := setupCluster(t, logger, tc.numNodes, tc.partitionID) + leader := testCluster.leader + + if tc.removeConn { + // Stop the server to make it unreachable, by default we remove the first follower + testCluster.followers[0].server.Stop() + } + + t.Cleanup(func() { + for _, follower := range testCluster.followers { + require.NoError(t, follower.transport.connectionPool.Close()) + } + require.NoError(t, leader.transport.connectionPool.Close()) + }) + + leaderStorageName, err := routingTable.GetStorageName(leader.id) + require.NoError(t, err) + + mgr, err := leader.managerRegistry.GetManager(PartitionKey{ + partitionID: uint64(tc.partitionID), + authorityName: leaderStorageName, + }) + require.NoError(t, err) + + // Create test messages + msgs := createTestMessages(t, testCluster, mgr.GetEntryPath, tc.walEntries) + + // Send Message from leader to all followers + err = leader.transport.Send(ctx, mgr.GetEntryPath, 1, msgs) + if tc.expectedError != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedError) + } else { + require.NoError(t, err) + } + + // Verify WAL replication + for i, follower := range testCluster.followers { + followerStorageName, err := routingTable.GetStorageName(follower.id) + require.NoError(t, err) + + mgr, err := follower.managerRegistry.GetManager(PartitionKey{ + partitionID: uint64(tc.partitionID), + authorityName: followerStorageName, + }) + require.NoError(t, err) + + for _, entry := range tc.walEntries { + walPath := mgr.GetEntryPath(entry.lsn) + + if i == 0 && tc.removeConn { + require.NoDirExists(t, walPath, "WAL should not exist on failed follower %s", follower.name) + continue + } + + require.DirExists(t, walPath, "WAL missing on follower %s", follower.name) + content, err := 
os.ReadFile(filepath.Join(walPath, walFile)) + require.NoError(t, err) + require.Equal(t, entry.content, string(content), "wrong content on follower %s", follower.name) + } + } + }) + } +} + +func setupCluster(t *testing.T, logger logger.LogrusLogger, numNodes int, partitionID int) (*cluster, *staticRaftRoutingTable) { + routingTable := NewStaticRaftRoutingTable() + var servers []*grpc.Server + var listeners []net.Listener + var addresses []string + + createTransport := func(cfg config.Cfg, srv *grpc.Server, listener net.Listener, addr string, registry ManagerRegistry) *GrpcTransport { + pool := client.NewPool(client.WithDialOptions( + client.UnaryInterceptor(), + client.StreamInterceptor(), + )) + + transport := NewGrpcTransport(logger, cfg, routingTable, registry, pool) + testRaftServer := &mockRaftServer{transport: transport} + gitalypb.RegisterRaftServiceServer(srv, testRaftServer) + + go testhelper.MustServe(t, srv, listener) + + t.Cleanup(func() { + srv.GracefulStop() + }) + transport.cfg.SocketPath = addr + return transport + } + + registries := []ManagerRegistry{} + storageNames := []string{} + for i := 0; i < numNodes; i++ { + registries = append(registries, NewRaftManagerRegistry()) + storageNames = append(storageNames, fmt.Sprintf("storage-%d", i+1)) + } + + cluster := &cluster{} + cluster.leader = &testNode{} + cluster.followers = []*testNode{} + + // First set up all servers and fill routing table + for i := range numNodes { + srv, listener, addr := runServer(t) + require.NoError(t, routingTable.AddMember(uint64(i+1), addr, storageNames[i])) + servers = append(servers, srv) + listeners = append(listeners, listener) + addresses = append(addresses, addr) + } + + // create transport interfaces for each registry and setup nodes + for i := range numNodes { + config := testcfg.Build(t) + config.Raft.ClusterID = clusterID + transport := createTransport(config, servers[i], listeners[i], addresses[i], registries[i]) + node := &testNode{ + transport: transport, 
+ server: servers[i], + managerRegistry: registries[i], + name: fmt.Sprintf("gitaly-%d", i+1), + id: uint64(i + 1), + } + + // Create and set up manager + manager := newManager(logger, transport, config) + + // Register the manager with the registry + require.NoError(t, registries[i].RegisterManager(PartitionKey{ + partitionID: uint64(partitionID), + authorityName: storageNames[i], + }, manager)) + + if i == 0 { + cluster.leader = node + } else { + cluster.followers = append(cluster.followers, node) + } + } + + return cluster, routingTable +} + +func newManager(logger logger.LogrusLogger, transport Transport, cfg config.Cfg) RaftManager { + walManager := log.NewManager("default", 1, cfg.Storages[0].Path, cfg.Storages[0].Path, nil, nil) + + return &mockRaftManager{ + logger: logger, + wal: walManager, + transport: transport, + } +} + +func runServer(t *testing.T) (*grpc.Server, net.Listener, string) { + socketPath := testhelper.GetTemporaryGitalySocketFileName(t) + listener, err := net.Listen("unix", socketPath) + require.NoError(t, err) + + srv := grpc.NewServer() + + return srv, listener, "unix://" + socketPath +} + +func createTestMessages(t *testing.T, cluster *cluster, getEntryPath func(storage.LSN) string, entries []walEntry) []raftpb.Message { + var raftEntries []raftpb.Entry + for _, entry := range entries { + // Create WAL directory and file + walDir := getEntryPath(entry.lsn) + require.NoError(t, os.MkdirAll(walDir, mode.Directory)) + walPath := filepath.Join(walDir, walFile) + require.NoError(t, os.WriteFile(walPath, []byte(entry.content), mode.File)) + + // Create Raft entry + entryData, err := proto.Marshal(&gitalypb.RaftEntry{ + Data: &gitalypb.RaftEntry_LogData{ + LocalPath: []byte(walPath), + }, + }) + require.NoError(t, err) + + raftEntries = append(raftEntries, raftpb.Entry{ + Index: uint64(entry.lsn), + Type: raftpb.EntryNormal, + Data: entryData, + }) + } + + // Create messages for all followers + var messages []raftpb.Message + for _, follower 
:= range cluster.followers { + messages = append(messages, raftpb.Message{ + Type: raftpb.MsgApp, + From: cluster.leader.id, + To: follower.id, + Term: 1, + Index: 1, + Entries: raftEntries, + }) + } + + return messages +} diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index c80ed0e3813..b42ec1cade2 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -1,11 +1,31 @@ package raftmgr import ( + "context" "testing" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + logger "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "go.etcd.io/etcd/raft/v3/raftpb" ) func TestMain(m *testing.M) { testhelper.Run(m) } + +type mockRaftManager struct { + logger logger.LogrusLogger + transport Transport + wal storage.LogManager +} + +// GetEntryPath returns an absolute path to a given log entry's WAL files. +func (m *mockRaftManager) GetEntryPath(lsn storage.LSN) string { + return m.wal.GetEntryPath(lsn) +} + +// Step is a mock implementation of the raft.Node.Step method. +func (m *mockRaftManager) Step(ctx context.Context, msg raftpb.Message) error { + return nil +} -- GitLab