diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index a99a7516ad9864a15a2eef1a50f321ffaa2dda37..7fa7e9416a570e16a0f53c3aad324560800f558e 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -37,6 +37,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mdfile"
 	nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration"
@@ -539,6 +540,12 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error {
 			return fmt.Errorf("resolve backup locator: %w", err)
 		}
 	}
+	raftTransport := &raftmgr.GrpcTransport{}
+	if cfg.Raft.Enabled {
+		raftManagerRegistry := raftmgr.NewRaftManagerRegistry()
+		routingTable := raftmgr.NewStaticRaftRoutingTable()
+		raftTransport = raftmgr.NewGrpcTransport(logger, cfg, routingTable, raftManagerRegistry, conns)
+	}
 
 	var bundleURISink *bundleuri.Sink
 	var bundleURIManager *bundleuri.GenerationManager
@@ -587,6 +594,7 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error {
 			Logger:             logger,
 			Cfg:                cfg,
 			GitalyHookManager:  hookManager,
+			RaftGrpcTransport:  raftTransport,
 			TransactionManager: transactionManager,
 			StorageLocator:     locator,
 			ClientPool:         conns,
diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go
index 1925e140e5772525b61a58dc38e9401b30d0825a..2ffa5af67fb22ea97128b3fcb399116fc0812f47 100644
--- a/internal/gitaly/service/dependencies.go
+++ b/internal/gitaly/service/dependencies.go
@@ -13,6 +13,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitlab"
@@ -29,6 +30,7 @@ type Dependencies struct {
 	Logger             log.Logger
 	Cfg                config.Cfg
 	GitalyHookManager  gitalyhook.Manager
+	RaftGrpcTransport  raftmgr.Transport
 	TransactionManager transaction.Manager
 	StorageLocator     storage.Locator
 	ClientPool         *client.Pool
@@ -72,6 +74,11 @@ func (dc *Dependencies) GetTxManager() transaction.Manager {
 	return dc.TransactionManager
 }
 
+// GetRaftGrpcTransport returns raft transport.
+func (dc *Dependencies) GetRaftGrpcTransport() raftmgr.Transport {
+	return dc.RaftGrpcTransport
+}
+
 // GetLocator returns storage locator.
 func (dc *Dependencies) GetLocator() storage.Locator {
 	return dc.StorageLocator
diff --git a/internal/gitaly/service/raft/send_message.go b/internal/gitaly/service/raft/send_message.go
new file mode 100644
index 0000000000000000000000000000000000000000..fbc01c140b70ff707228fba5ef269ea95a5a187a
--- /dev/null
+++ b/internal/gitaly/service/raft/send_message.go
@@ -0,0 +1,55 @@
+package raft
+
+import (
+	"errors"
+	"io"
+
+	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+// SendMessage is a gRPC method for sending a Raft message across nodes. Every request received on the stream is
+// validated and handed over to the transport. The first invalid request or transport failure ends the stream with
+// an error; otherwise the response is sent once the client has closed its side of the stream.
+func (s *Server) SendMessage(stream gitalypb.RaftService_SendMessageServer) error {
+	for {
+		req, err := stream.Recv()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			return structerr.NewInternal("receive error: %w", err)
+		}
+
+		// The cluster ID protects Gitaly from cross-cluster interactions, which could potentially corrupt the clusters.
+		// This is particularly crucial after disaster recovery so that an identical cluster is restored from backup.
+		if req.GetClusterId() == "" {
+			return structerr.NewInvalidArgument("cluster_id is required")
+		}
+
+		// Let's assume we have a single cluster per node for now.
+		if req.GetClusterId() != s.cfg.Raft.ClusterID {
+			return structerr.NewPermissionDenied("message from wrong cluster: got %q, want %q",
+				req.GetClusterId(), s.cfg.Raft.ClusterID)
+		}
+
+		if req.GetAuthorityName() == "" {
+			return structerr.NewInvalidArgument("authority_name is required")
+		}
+		if req.GetPartitionId() == 0 {
+			return structerr.NewInvalidArgument("partition_id is required")
+		}
+
+		// The message is dereferenced below, so a request without one is rejected instead of causing a panic.
+		raftMsg := req.GetMessage()
+		if raftMsg == nil {
+			return structerr.NewInvalidArgument("message is required")
+		}
+
+		if err := s.transport.Receive(stream.Context(), req.GetAuthorityName(), req.GetPartitionId(), *raftMsg); err != nil {
+			return structerr.NewInternal("receive error: %w", err)
+		}
+	}
+
+	return stream.SendAndClose(&gitalypb.RaftMessageResponse{})
+}
diff --git a/internal/gitaly/service/raft/send_message_test.go b/internal/gitaly/service/raft/send_message_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..3575662707829a6c24768684199c966b59caa092
--- /dev/null
+++ b/internal/gitaly/service/raft/send_message_test.go
@@ -0,0 +1,146 @@
+package raft
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+	"go.etcd.io/etcd/raft/v3/raftpb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+)
+
+func TestServer_SendMessage(t *testing.T) {
+	t.Parallel()
+
+	ctx := testhelper.Context(t)
+	cfg := testcfg.Build(t)
+	cfg.Raft.ClusterID = "test-cluster"
+
+	client := runRaftServer(t, ctx, cfg)
+
+	testCases := []struct {
+		desc            string
+		req             *gitalypb.RaftMessageRequest
+		expectedGrpcErr codes.Code
+		expectedError   string
+	}{
+		{
+			desc: "successful message send",
+			req: &gitalypb.RaftMessageRequest{
+				ClusterId:     "test-cluster",
+				AuthorityName: "test-authority",
+				PartitionId:   1,
+				Message: &raftpb.Message{
+					Type: raftpb.MsgApp,
+					To:   2,
+				},
+			},
+		},
+		{
+			desc: "missing cluster ID",
+			req: &gitalypb.RaftMessageRequest{
+				AuthorityName: "test-authority",
+				PartitionId:   1,
+				Message: &raftpb.Message{
+					Type: raftpb.MsgApp,
+					To:   2,
+				},
+			},
+			expectedGrpcErr: codes.InvalidArgument,
+			expectedError:   "rpc error: code = InvalidArgument desc = cluster_id is required",
+		},
+		{
+			desc: "wrong cluster ID",
+			req: &gitalypb.RaftMessageRequest{
+				ClusterId:     "wrong-cluster",
+				AuthorityName: "test-authority",
+				PartitionId:   1,
+				Message: &raftpb.Message{
+					Type: raftpb.MsgApp,
+					To:   2,
+				},
+			},
+			expectedGrpcErr: codes.PermissionDenied,
+			expectedError:   `rpc error: code = PermissionDenied desc = message from wrong cluster: got "wrong-cluster", want "test-cluster"`,
+		},
+		{
+			desc: "missing authority name",
+			req: &gitalypb.RaftMessageRequest{
+				ClusterId:   "test-cluster",
+				PartitionId: 1,
+				Message: &raftpb.Message{
+					Type: raftpb.MsgApp,
+					To:   2,
+				},
+			},
+			expectedGrpcErr: codes.InvalidArgument,
+			expectedError:   "rpc error: code = InvalidArgument desc = authority_name is required",
+		},
+		{
+			desc: "missing partition ID",
+			req: &gitalypb.RaftMessageRequest{
+				ClusterId:     "test-cluster",
+				AuthorityName: "test-authority",
+				Message: &raftpb.Message{
+					Type: raftpb.MsgApp,
+					To:   2,
+				},
+			},
+			expectedGrpcErr: codes.InvalidArgument,
+			expectedError:   "rpc error: code = InvalidArgument desc = partition_id is required",
+		},
+		{
+			desc: "missing message",
+			req: &gitalypb.RaftMessageRequest{
+				ClusterId:     "test-cluster",
+				AuthorityName: "test-authority",
+				PartitionId:   1,
+			},
+			expectedGrpcErr: codes.InvalidArgument,
+			expectedError:   "rpc error: code = InvalidArgument desc = message is required",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			t.Parallel()
+
+			stream, err := client.SendMessage(ctx)
+			require.NoError(t, err)
+
+			require.NoError(t, stream.Send(tc.req))
+
+			_, err = stream.CloseAndRecv()
+			if tc.expectedGrpcErr == codes.OK {
+				require.NoError(t, err)
+			} else {
+				testhelper.RequireGrpcCode(t, err, tc.expectedGrpcErr)
+				require.Contains(t, err.Error(), tc.expectedError)
+			}
+		})
+	}
+}
+
+func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg) gitalypb.RaftServiceClient {
+	serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) {
+		transport := newMockTransport(t)
+		deps.RaftGrpcTransport = transport
+		deps.Cfg = cfg
+
+		gitalypb.RegisterRaftServiceServer(srv, NewServer(deps))
+	}, testserver.WithDisablePraefect())
+
+	cfg.SocketPath = serverSocketPath
+
+	conn := gittest.DialService(t, ctx, cfg)
+
+	return gitalypb.NewRaftServiceClient(conn)
+}
diff --git a/internal/gitaly/service/raft/server.go b/internal/gitaly/service/raft/server.go
new file mode 100644
index 0000000000000000000000000000000000000000..8763c735af54562ab9e15021329d86fe327ac359
--- /dev/null
+++ b/internal/gitaly/service/raft/server.go
@@ -0,0 +1,26 @@
+package raft
+
+import (
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+// Server is a gRPC server for the Raft service.
+type Server struct {
+	gitalypb.UnimplementedRaftServiceServer
+	logger    log.Logger
+	transport raftmgr.Transport
+	cfg       config.Cfg
+}
+
+// NewServer creates a new Raft gRPC server.
+func NewServer(deps *service.Dependencies) *Server {
+	return &Server{
+		logger:    deps.GetLogger(),
+		transport: deps.GetRaftGrpcTransport(),
+		cfg:       deps.GetCfg(),
+	}
+}
diff --git a/internal/gitaly/service/raft/testhelper_test.go b/internal/gitaly/service/raft/testhelper_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..6de8c2d8d0d33f4dbc0738b6f251bf6703bd3f25
--- /dev/null
+++ b/internal/gitaly/service/raft/testhelper_test.go
@@ -0,0 +1,32 @@
+package raft
+
+import (
+	"context"
+	"testing"
+
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+	"go.etcd.io/etcd/raft/v3/raftpb"
+)
+
+func TestMain(m *testing.M) {
+	testhelper.Run(m)
+}
+
+type mockTransport struct {
+	t               *testing.T
+	receivedMessage *raftpb.Message
+}
+
+func newMockTransport(t *testing.T) *mockTransport {
+	return &mockTransport{t: t}
+}
+
+func (m *mockTransport) Receive(ctx context.Context, authorityName string, partitionID uint64, raftMsg raftpb.Message) error {
+	m.receivedMessage = &raftMsg
+	return nil
+}
+
+func (m *mockTransport) Send(ctx context.Context, getPath func(lsn storage.LSN) string, partitionID uint64, msgs []raftpb.Message) error {
+	return nil
+}
diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go
index dbffad507df98d77616c63dbf01ca1079ac695c5..732cb31d8bb6dd5c23730ef0dba47bb43bd0c5da 100644
--- a/internal/gitaly/service/setup/register.go
+++ b/internal/gitaly/service/setup/register.go
@@ -17,6 +17,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/objectpool"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/operations"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/partition"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/raft"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/ref"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/remote"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository"
@@ -56,6 +57,7 @@ var (
 func RegisterAll(srv *grpc.Server, deps *service.Dependencies) {
 	gitalypb.RegisterAnalysisServiceServer(srv, analysis.NewServer(deps))
 	gitalypb.RegisterBlobServiceServer(srv, blob.NewServer(deps))
+	gitalypb.RegisterRaftServiceServer(srv, raft.NewServer(deps))
 	gitalypb.RegisterCleanupServiceServer(srv, cleanup.NewServer(deps))
 	gitalypb.RegisterCommitServiceServer(srv, commit.NewServer(deps))
 	gitalypb.RegisterDiffServiceServer(srv, diff.NewServer(deps))
diff --git a/internal/gitaly/storage/raftmgr/registry.go b/internal/gitaly/storage/raftmgr/event_registry.go
similarity index 100%
rename from internal/gitaly/storage/raftmgr/registry.go
rename to internal/gitaly/storage/raftmgr/event_registry.go
diff --git a/internal/gitaly/storage/raftmgr/registry_test.go b/internal/gitaly/storage/raftmgr/event_registry_test.go
similarity index 100%
rename from internal/gitaly/storage/raftmgr/registry_test.go
rename to internal/gitaly/storage/raftmgr/event_registry_test.go
diff --git a/internal/gitaly/storage/raftmgr/grpc_transport.go b/internal/gitaly/storage/raftmgr/grpc_transport.go
new file mode 100644
index 0000000000000000000000000000000000000000..f0e153fb493a6f884f6be39ef83ada51a3bd5aed
--- /dev/null
+++ b/internal/gitaly/storage/raftmgr/grpc_transport.go
@@ -0,0 +1,280 @@
+package raftmgr
+
+import (
+	"archive/tar"
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+
+	"gitlab.com/gitlab-org/gitaly/v16/internal/archive"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+	"go.etcd.io/etcd/raft/v3/raftpb"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
+)
+
+// Transport defines the interface for sending Raft protocol messages.
+type Transport interface {
+	// Send dispatches a batch of Raft messages. It returns an error if the sending fails. This function receives a
+	// context, the list of messages to send and a function that returns the path of WAL directory of a particular
+	// log entry. The implementation must respect input context's cancellation.
+	Send(ctx context.Context, walDirForLSN func(storage.LSN) string, partitionID uint64, messages []raftpb.Message) error
+	// Receive receives a Raft message and processes it.
+	Receive(ctx context.Context, authorityName string, partitionID uint64, raftMsg raftpb.Message) error
+}
+
+// GrpcTransport is a gRPC transport implementation for sending Raft messages across nodes.
+type GrpcTransport struct {
+	logger         log.Logger
+	cfg            config.Cfg
+	routingTable   RoutingTable
+	registry       ManagerRegistry
+	connectionPool *client.Pool
+}
+
+// NewGrpcTransport creates a new GrpcTransport instance.
+func NewGrpcTransport(logger log.Logger, cfg config.Cfg, routingTable RoutingTable, registry ManagerRegistry, conns *client.Pool) *GrpcTransport {
+	return &GrpcTransport{
+		logger:         logger,
+		cfg:            cfg,
+		routingTable:   routingTable,
+		registry:       registry,
+		connectionPool: conns,
+	}
+}
+
+// Send sends Raft messages to the appropriate nodes. The messages are grouped by destination node and each group is
+// delivered concurrently over its own stream. Send waits for all deliveries and returns the first error, if any.
+func (t *GrpcTransport) Send(ctx context.Context, walDirForLSN func(storage.LSN) string, partitionID uint64, messages []raftpb.Message) error {
+	// Group by destination node
+	messagesByNode := make(map[uint64][]raftpb.Message)
+	for i := range messages {
+		nodeID := messages[i].To
+		messagesByNode[nodeID] = append(messagesByNode[nodeID], messages[i])
+	}
+
+	g := &errgroup.Group{}
+
+	for nodeID, msgs := range messagesByNode {
+		g.Go(func() error {
+			return t.sendToNode(ctx, walDirForLSN, nodeID, partitionID, msgs)
+		})
+	}
+
+	return g.Wait()
+}
+
+// sendToNode streams msgs to the node identified by nodeID. Normal entries that do not carry packed log data yet
+// get the content of their WAL directory packed in before the message is put on the stream.
+func (t *GrpcTransport) sendToNode(ctx context.Context, walDirForLSN func(storage.LSN) string, nodeID uint64, partitionID uint64, msgs []raftpb.Message) error {
+	// For now, we are using a static routing table that contains mapping of nodeID to address. In future, the routing
+	// table can become dynamic so that storage addresses are propagated through gossiping.
+	addr, err := t.routingTable.Translate(nodeID)
+	if err != nil {
+		return fmt.Errorf("translate nodeID %d: %w", nodeID, err)
+	}
+
+	// Raft messages can contain entries intended for multiple storages. We need to get the storage name for the
+	// destination node.
+	storageName, err := t.routingTable.GetStorageName(nodeID)
+	if err != nil {
+		return fmt.Errorf("get storage name for nodeID %d: %w", nodeID, err)
+	}
+
+	// get the connection to the node
+	conn, err := t.connectionPool.Dial(ctx, addr, t.cfg.Auth.Token)
+	if err != nil {
+		return fmt.Errorf("get connection to node %d: %w", nodeID, err)
+	}
+
+	// Cancel the stream's context on return so that the stream is torn down when we bail out early, for example
+	// because packing an entry failed before the stream was closed.
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	client := gitalypb.NewRaftServiceClient(conn)
+	stream, err := client.SendMessage(ctx)
+	if err != nil {
+		return fmt.Errorf("create stream to node %d: %w", nodeID, err)
+	}
+
+	for _, msg := range msgs {
+		// The entries may share their backing array with messages that are sent to other nodes concurrently.
+		// Work on a copy so that packing neither races with those sends nor modifies the caller's entries.
+		msg.Entries = append([]raftpb.Entry(nil), msg.Entries...)
+
+		for i := range msg.Entries {
+			if msg.Entries[i].Type != raftpb.EntryNormal {
+				continue
+			}
+
+			var raftMsg gitalypb.RaftEntry
+			if err := proto.Unmarshal(msg.Entries[i].Data, &raftMsg); err != nil {
+				return fmt.Errorf("unmarshalling entry type: %w", err)
+			}
+
+			// Pack the log data if needed
+			if raftMsg.GetData().GetPacked() == nil {
+				lsn := storage.LSN(msg.Entries[i].Index)
+				path := walDirForLSN(lsn)
+				if err := t.packLogData(ctx, lsn, &raftMsg, path); err != nil {
+					return fmt.Errorf("packing log data: %w", err)
+				}
+
+				// Marshal back into entry
+				data, err := proto.Marshal(&raftMsg)
+				if err != nil {
+					return fmt.Errorf("marshal entry: %w", err)
+				}
+				msg.Entries[i].Data = data
+			}
+		}
+
+		if err := stream.Send(&gitalypb.RaftMessageRequest{
+			ClusterId:     t.cfg.Raft.ClusterID,
+			AuthorityName: storageName,
+			PartitionId:   partitionID,
+			Message:       &msg,
+		}); err != nil {
+			return fmt.Errorf("send batch to node %d: %w", nodeID, err)
+		}
+	}
+
+	if _, err := stream.CloseAndRecv(); err != nil {
+		return fmt.Errorf("close stream to node %d: %w", nodeID, err)
+	}
+
+	return nil
+}
+
+// packLogData writes the content of logEntryPath into an in-memory tarball and stores it in message, together
+// with the local path it was read from.
+func (t *GrpcTransport) packLogData(ctx context.Context, lsn storage.LSN, message *gitalypb.RaftEntry, logEntryPath string) error {
+	var logData bytes.Buffer
+	if err := archive.WriteTarball(ctx, t.logger.WithFields(log.Fields{
+		"raft.component":      "WAL archiver",
+		"raft.log_entry_lsn":  lsn,
+		"raft.log_entry_path": logEntryPath,
+	}), &logData, logEntryPath, "."); err != nil {
+		return fmt.Errorf("archiving WAL log entry: %w", err)
+	}
+	message.Data = &gitalypb.RaftEntry_LogData{
+		LocalPath: []byte(logEntryPath),
+		Packed:    logData.Bytes(),
+	}
+	return nil
+}
+
+// Receive processes a single Raft message addressed to the given partition. Log data packed into the message's
+// entries is unpacked into the entry path reported by the Raft manager before the message is stepped into it.
+func (t *GrpcTransport) Receive(ctx context.Context, authorityName string, partitionID uint64, raftMsg raftpb.Message) error {
+	// Retrieve the raft manager from the registry, assumption is that all the messages are from the same partition key.
+	raftManager, err := t.registry.GetManager(PartitionKey{
+		authorityName: authorityName,
+		partitionID:   partitionID,
+	})
+	if err != nil {
+		return status.Errorf(codes.NotFound, "raft manager not found for partition %d: %v",
+			partitionID, err)
+	}
+
+	for _, entry := range raftMsg.Entries {
+		// Mirror the sending side, which only treats normal entries as RaftEntry payloads. Entries of other
+		// types are handed to the manager untouched.
+		if entry.Type != raftpb.EntryNormal {
+			continue
+		}
+
+		var msg gitalypb.RaftEntry
+		if err := proto.Unmarshal(entry.Data, &msg); err != nil {
+			return status.Errorf(codes.InvalidArgument, "failed to unmarshal message: %v", err)
+		}
+
+		if msg.GetData().GetPacked() != nil {
+			if err := unpackLogData(&msg, raftManager.GetEntryPath(storage.LSN(entry.Index))); err != nil {
+				return status.Errorf(codes.Internal, "failed to unpack log data: %v", err)
+			}
+		}
+	}
+
+	// Step messages per partition with their respective entries
+	if err := raftManager.Step(ctx, raftMsg); err != nil {
+		return status.Errorf(codes.Internal, "failed to step message: %v", err)
+	}
+
+	return nil
+}
+
+// unpackLogData extracts the tarball carried by msg into logEntryPath. Only directories and regular files are
+// extracted, and archive members whose name would place them outside of logEntryPath are rejected.
+func unpackLogData(msg *gitalypb.RaftEntry, logEntryPath string) error {
+	logData := msg.GetData().GetPacked()
+
+	if err := os.MkdirAll(filepath.Dir(logEntryPath), mode.Directory); err != nil {
+		return fmt.Errorf("creating WAL directory: %w", err)
+	}
+
+	tarReader := tar.NewReader(bytes.NewReader(logData))
+	for {
+		header, err := tarReader.Next()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			// The header is nil when Next fails, so a corrupt archive must not reach the code below.
+			return fmt.Errorf("reading log entry archive: %w", err)
+		}
+
+		// The archive was received over the network. Refuse names that are absolute or that contain ".."
+		// elements leading out of the log entry directory.
+		actualName := header.Name
+		if !filepath.IsLocal(actualName) {
+			return fmt.Errorf("log entry archive contains non-local path %q", actualName)
+		}
+		path := filepath.Join(logEntryPath, actualName)
+
+		switch header.Typeflag {
+		case tar.TypeDir:
+			// create the directory if not exists
+			if _, err := os.Stat(path); os.IsNotExist(err) {
+				if err := os.Mkdir(path, mode.Directory); err != nil {
+					return fmt.Errorf("creating directory: %w", err)
+				}
+			}
+		case tar.TypeReg:
+			if err := func() (returnedErr error) {
+				f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, mode.File)
+				if err != nil {
+					return fmt.Errorf("writing log entry file: %w", err)
+				}
+				defer func() {
+					// A failed close can mean the content was not fully written, so report it.
+					if err := f.Close(); err != nil && returnedErr == nil {
+						returnedErr = fmt.Errorf("closing log entry file: %w", err)
+					}
+				}()
+
+				if _, err := io.Copy(f, tarReader); err != nil {
+					return fmt.Errorf("writing log entry file: %w", err)
+				}
+
+				return nil
+			}(); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/internal/gitaly/storage/raftmgr/grpc_transport_test.go b/internal/gitaly/storage/raftmgr/grpc_transport_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..774bf6b884ef696e7abf87470a3604ff318408b6
--- /dev/null
+++ b/internal/gitaly/storage/raftmgr/grpc_transport_test.go
@@ -0,0 +1,319 @@
+package raftmgr
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+	logger "gitlab.com/gitlab-org/gitaly/v16/internal/log"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+	"go.etcd.io/etcd/raft/v3/raftpb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
+)
+
+type testNode struct {
+	id              uint64
+	name            string
+	transport       *GrpcTransport
+	server          *grpc.Server
+	managerRegistry ManagerRegistry
+}
+
+type cluster struct {
+	leader    *testNode
+	followers []*testNode
+}
+
+type walEntry struct {
+	lsn     storage.LSN
+	content string
+}
+
+const (
+	walFile   = "wal-file"
+	clusterID = "44c58f50-0a8b-4849-bf8b-d5a56198ea7c"
+)
+
+type mockRaftServer struct {
+	gitalypb.UnimplementedRaftServiceServer
+	transport *GrpcTransport
+}
+
+func (s *mockRaftServer) SendMessage(stream gitalypb.RaftService_SendMessageServer) error {
+	for {
+		req, err := stream.Recv()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return status.Errorf(codes.Internal, "receive error: %v", err)
+		}
+
+		raftMsg := req.GetMessage()
+
+		if err := s.transport.Receive(stream.Context(), req.GetAuthorityName(), req.GetPartitionId(), *raftMsg); err != nil {
+			return status.Errorf(codes.Internal, "receive error: %v", err)
+		}
+	}
+
+	return stream.SendAndClose(&gitalypb.RaftMessageResponse{})
+}
+
+func TestGrpcTransport_SendAndReceive(t *testing.T) {
+	t.Parallel()
+
+	type setup struct {
+		name          string
+		numNodes      int
+		removeConn    bool
+		partitionID   int
+		walEntries    []walEntry
+		expectedError string
+	}
+
+	tests := []setup{
+		{
+			name:        "raft messages sent to multiple followers",
+			numNodes:    3,
+			partitionID: 1,
+			walEntries: []walEntry{
+				{lsn: storage.LSN(1), content: "content-1"},
+				{lsn: storage.LSN(2), content: "content-2"},
+			},
+		},
+		{
+			name:        "raft messages sent to multiple followers with one follower not reachable",
+			numNodes:    3,
+			partitionID: 1,
+			removeConn:  true,
+			walEntries: []walEntry{
+				{lsn: storage.LSN(1), content: "random-content"},
+			},
+			expectedError: "create stream to node 2: rpc error: code = Unavailable desc = last connection error: connection error:",
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			ctx := testhelper.Context(t)
+			logger := testhelper.NewLogger(t)
+
+			require.Greater(t, tc.numNodes, 1)
+			testCluster, routingTable := setupCluster(t, logger, tc.numNodes, tc.partitionID)
+			leader := testCluster.leader
+
+			if tc.removeConn {
+				// Stop the server to make it unreachable, by default we remove the first follower
+				testCluster.followers[0].server.Stop()
+			}
+
+			t.Cleanup(func() {
+				for _, follower := range testCluster.followers {
+					require.NoError(t, follower.transport.connectionPool.Close())
+				}
+				require.NoError(t, leader.transport.connectionPool.Close())
+			})
+
+			leaderStorageName, err := routingTable.GetStorageName(leader.id)
+			require.NoError(t, err)
+
+			mgr, err := leader.managerRegistry.GetManager(PartitionKey{
+				partitionID:   uint64(tc.partitionID),
+				authorityName: leaderStorageName,
+			})
+			require.NoError(t, err)
+
+			// Create test messages
+			msgs := createTestMessages(t, testCluster, mgr.GetEntryPath, tc.walEntries)
+
+			// Send Message from leader to all followers
+			err = leader.transport.Send(ctx, mgr.GetEntryPath, uint64(tc.partitionID), msgs)
+			if tc.expectedError != "" {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), tc.expectedError)
+			} else {
+				require.NoError(t, err)
+			}
+
+			// Verify WAL replication
+			for i, follower := range testCluster.followers {
+				followerStorageName, err := routingTable.GetStorageName(follower.id)
+				require.NoError(t, err)
+
+				mgr, err := follower.managerRegistry.GetManager(PartitionKey{
+					partitionID:   uint64(tc.partitionID),
+					authorityName: followerStorageName,
+				})
+				require.NoError(t, err)
+
+				for _, entry := range tc.walEntries {
+					walPath := mgr.GetEntryPath(entry.lsn)
+
+					if i == 0 && tc.removeConn {
+						require.NoDirExists(t, walPath, "WAL should not exist on failed follower %s", follower.name)
+						continue
+					}
+
+					require.DirExists(t, walPath, "WAL missing on follower %s", follower.name)
+					content, err := os.ReadFile(filepath.Join(walPath, walFile))
+					require.NoError(t, err)
+					require.Equal(t, entry.content, string(content), "wrong content on follower %s", follower.name)
+				}
+			}
+		})
+	}
+}
+
+func setupCluster(t *testing.T, logger logger.LogrusLogger, numNodes int, partitionID int) (*cluster, *staticRaftRoutingTable) {
+	routingTable := NewStaticRaftRoutingTable()
+	var servers []*grpc.Server
+	var listeners []net.Listener
+	var addresses []string
+
+	createTransport := func(cfg config.Cfg, srv *grpc.Server, listener net.Listener, addr string, registry ManagerRegistry) *GrpcTransport {
+		pool := client.NewPool(client.WithDialOptions(
+			client.UnaryInterceptor(),
+			client.StreamInterceptor(),
+		))
+
+		transport := NewGrpcTransport(logger, cfg, routingTable, registry, pool)
+		testRaftServer := &mockRaftServer{transport: transport}
+		gitalypb.RegisterRaftServiceServer(srv, testRaftServer)
+
+		go testhelper.MustServe(t, srv, listener)
+
+		t.Cleanup(func() {
+			srv.GracefulStop()
+		})
+		transport.cfg.SocketPath = addr
+		return transport
+	}
+
+	registries := []ManagerRegistry{}
+	storageNames := []string{}
+	for i := 0; i < numNodes; i++ {
+		registries = append(registries, NewRaftManagerRegistry())
+		storageNames = append(storageNames, fmt.Sprintf("storage-%d", i+1))
+	}
+
+	cluster := &cluster{}
+	cluster.leader = &testNode{}
+	cluster.followers = []*testNode{}
+
+	// First set up all servers and fill routing table
+	for i := range numNodes {
+		srv, listener, addr := runServer(t)
+		require.NoError(t, routingTable.AddMember(uint64(i+1), addr, storageNames[i]))
+		servers = append(servers, srv)
+		listeners = append(listeners, listener)
+		addresses = append(addresses, addr)
+	}
+
+	// create transport interfaces for each registry and setup nodes
+	for i := range numNodes {
+		config := testcfg.Build(t)
+		config.Raft.ClusterID = clusterID
+		transport := createTransport(config, servers[i], listeners[i], addresses[i], registries[i])
+		node := &testNode{
+			transport:       transport,
+			server:          servers[i],
+			managerRegistry: registries[i],
+			name:            fmt.Sprintf("gitaly-%d", i+1),
+			id:              uint64(i + 1),
+		}
+
+		// Create and set up manager
+		manager := newManager(logger, transport, config)
+
+		// Register the manager with the registry
+		require.NoError(t, registries[i].RegisterManager(PartitionKey{
+			partitionID:   uint64(partitionID),
+			authorityName: storageNames[i],
+		}, manager))
+
+		if i == 0 {
+			cluster.leader = node
+		} else {
+			cluster.followers = append(cluster.followers, node)
+		}
+	}
+
+	return cluster, routingTable
+}
+
+func newManager(logger logger.LogrusLogger, transport Transport, cfg config.Cfg) RaftManager {
+	walManager := log.NewManager("default", 1, cfg.Storages[0].Path, cfg.Storages[0].Path, nil, nil)
+
+	return &mockRaftManager{
+		logger:    logger,
+		wal:       walManager,
+		transport: transport,
+	}
+}
+
+func runServer(t *testing.T) (*grpc.Server, net.Listener, string) {
+	socketPath := testhelper.GetTemporaryGitalySocketFileName(t)
+	listener, err := net.Listen("unix", socketPath)
+	require.NoError(t, err)
+
+	srv := grpc.NewServer()
+
+	return srv, listener, "unix://" + socketPath
+}
+
+func createTestMessages(t *testing.T, cluster *cluster, getEntryPath func(storage.LSN) string, entries []walEntry) []raftpb.Message {
+	var raftEntries []raftpb.Entry
+	for _, entry := range entries {
+		// Create WAL directory and file
+		walDir := getEntryPath(entry.lsn)
+		require.NoError(t, os.MkdirAll(walDir, mode.Directory))
+		walPath := filepath.Join(walDir, walFile)
+		require.NoError(t, os.WriteFile(walPath, []byte(entry.content), mode.File))
+
+		// Create Raft entry
+		entryData, err := proto.Marshal(&gitalypb.RaftEntry{
+			Data: &gitalypb.RaftEntry_LogData{
+				LocalPath: []byte(walPath),
+			},
+		})
+		require.NoError(t, err)
+
+		raftEntries = append(raftEntries, raftpb.Entry{
+			Index: uint64(entry.lsn),
+			Type:  raftpb.EntryNormal,
+			Data:  entryData,
+		})
+	}
+
+	// Create messages for all followers
+	var messages []raftpb.Message
+	for _, follower := range cluster.followers {
+		messages = append(messages, raftpb.Message{
+			Type:    raftpb.MsgApp,
+			From:    cluster.leader.id,
+			To:      follower.id,
+			Term:    1,
+			Index:   1,
+			Entries: raftEntries,
+		})
+	}
+
+	return messages
+}
diff --git a/internal/gitaly/storage/raftmgr/manager.go b/internal/gitaly/storage/raftmgr/manager.go
new file mode 100644
index 0000000000000000000000000000000000000000..91b2a97213a2c1d44410e0c82802d334ad3f8f79
--- /dev/null
+++ b/internal/gitaly/storage/raftmgr/manager.go
@@ -0,0 +1,14 @@
+package raftmgr
+
+import (
+	"context"
+
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"go.etcd.io/etcd/raft/v3/raftpb"
+)
+
+// RaftManager is an interface that defines the methods to orchestrate the Raft consensus protocol.
+type RaftManager interface {
+	GetEntryPath(storage.LSN) string
+	Step(ctx context.Context, msg raftpb.Message) error
+}
diff --git a/internal/gitaly/storage/raftmgr/manager_registry.go b/internal/gitaly/storage/raftmgr/manager_registry.go
new file mode 100644
index 0000000000000000000000000000000000000000..a013c29120a58d5762d97ffe54df2ed6f0a97a8a
--- /dev/null
+++ b/internal/gitaly/storage/raftmgr/manager_registry.go
@@ -0,0 +1,50 @@
+package raftmgr
+
+import (
+	"fmt"
+	"sync"
+)
+
+// PartitionKey is used to uniquely identify a partition.
+type PartitionKey struct {
+	authorityName string
+	partitionID   uint64
+}
+
+// ManagerRegistry is an interface that defines the methods to register and retrieve managers.
+type ManagerRegistry interface {
+	// GetManager returns the manager for a given partition key.
+	GetManager(key PartitionKey) (RaftManager, error)
+	// RegisterManager registers a manager for a given partition key.
+	RegisterManager(key PartitionKey, manager RaftManager) error
+}
+
+// raftManagerRegistry is a concrete implementation of the ManagerRegistry interface.
+type raftManagerRegistry struct {
+	managers *sync.Map
+}
+
+// NewRaftManagerRegistry creates a new RaftManagerRegistry.
+func NewRaftManagerRegistry() *raftManagerRegistry {
+	return &raftManagerRegistry{managers: &sync.Map{}}
+}
+
+// GetManager returns the manager for a given partitionKey.
+func (r *raftManagerRegistry) GetManager(key PartitionKey) (RaftManager, error) {
+	// A single Load is all a lookup needs. The registry is consulted for every received Raft message, so it
+	// must not iterate over the other managers or write anything to stdout.
+	mgr, ok := r.managers.Load(key)
+	if !ok {
+		return nil, fmt.Errorf("no manager found for partition key %+v", key)
+	}
+
+	return mgr.(RaftManager), nil
+}
+
+// RegisterManager registers a manager for a given partitionKey.
+func (r *raftManagerRegistry) RegisterManager(key PartitionKey, manager RaftManager) error {
+	if _, loaded := r.managers.LoadOrStore(key, manager); loaded {
+		return fmt.Errorf("manager already registered for partition key %+v", key)
+	}
+	return nil
+}
diff --git a/internal/gitaly/storage/raftmgr/routing.go b/internal/gitaly/storage/raftmgr/routing.go
new file mode 100644
index 0000000000000000000000000000000000000000..ea46a415cdae67a5abb411c8254b4230d4d3a654
--- /dev/null
+++ b/internal/gitaly/storage/raftmgr/routing.go
@@ -0,0 +1,55 @@
+package raftmgr
+
+import (
+	"fmt"
+	"sync"
+)
+
+// RoutingTable handles translation between node IDs and addresses
+type RoutingTable interface {
+	// Translate returns the address registered for the given node ID.
+	Translate(nodeID uint64) (string, error)
+	// AddMember registers the address and storage name of the given node ID.
+	AddMember(nodeID uint64, address string, storageName string) error
+	// GetStorageName returns the storage name registered for the given node ID.
+	GetStorageName(nodeID uint64) (string, error)
+}
+
+// staticRaftRoutingTable is an implementation of the RoutingTable interface.
+// It maps node IDs to their corresponding addresses.
+type staticRaftRoutingTable struct {
+	members      sync.Map
+	storageNames sync.Map
+}
+
+// NewStaticRaftRoutingTable creates a new staticRaftRoutingTable.
+func NewStaticRaftRoutingTable() *staticRaftRoutingTable {
+	return &staticRaftRoutingTable{members: sync.Map{}, storageNames: sync.Map{}}
+}
+
+// AddMember adds the mapping between nodeID, address, and storageName to the routing table.
+func (r *staticRaftRoutingTable) AddMember(nodeID uint64, address string, storageName string) error { + // LoadOrStore checks for and claims the node ID in one atomic step, so concurrent callers + // cannot both pass the duplicate check; storageNames is still written as a separate step. + if _, loaded := r.members.LoadOrStore(nodeID, address); loaded { + return fmt.Errorf("node ID %d already exists in routing table", nodeID) + } + r.storageNames.Store(nodeID, storageName) + return nil +} + +// GetStorageName returns the storage name for a given node ID. +func (r *staticRaftRoutingTable) GetStorageName(nodeID uint64) (string, error) { + if name, ok := r.storageNames.Load(nodeID); ok { + return name.(string), nil + } + return "", fmt.Errorf("no storage name found for nodeID %d", nodeID) +} + +// Translate converts a node ID to its network address. +func (r *staticRaftRoutingTable) Translate(nodeID uint64) (string, error) { + if addr, ok := r.members.Load(nodeID); ok { + return addr.(string), nil + } + return "", fmt.Errorf("no address found for nodeID %d", nodeID) +} diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index c80ed0e3813c41269f68102742adce1594ed4520..b42ec1cade20ee61fca8cdac5528faa33375be4f 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -1,11 +1,31 @@ package raftmgr import ( + "context" "testing" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + logger "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "go.etcd.io/etcd/raft/v3/raftpb" ) func TestMain(m *testing.M) { testhelper.Run(m) } + +type mockRaftManager struct { + logger logger.LogrusLogger + transport Transport + wal storage.LogManager +} + +// GetEntryPath delegates to the wrapped log manager to resolve the path of a log entry. +func (m *mockRaftManager) GetEntryPath(lsn storage.LSN) string { + return m.wal.GetEntryPath(lsn) +} + +// Step is a mock implementation of the raft.Node.Step method.
+func (m *mockRaftManager) Step(ctx context.Context, msg raftpb.Message) error { + return nil +} diff --git a/internal/gitaly/storage/raftmgr/transport.go b/internal/gitaly/storage/raftmgr/transport.go deleted file mode 100644 index 279ffdd2405c63cf56b1a3e831f6f633a70f470e..0000000000000000000000000000000000000000 --- a/internal/gitaly/storage/raftmgr/transport.go +++ /dev/null @@ -1,119 +0,0 @@ -package raftmgr - -import ( - "bytes" - "context" - "fmt" - - "gitlab.com/gitlab-org/gitaly/v16/internal/archive" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" - "go.etcd.io/etcd/raft/v3/raftpb" - "google.golang.org/protobuf/proto" -) - -// Transport defines the interface for sending Raft protocol messages. -type Transport interface { - // Send dispatches a batch of Raft messages. It returns an error if the sending fails. This function receives a - // context, the list of messages to send and a function that returns the path of WAL directory of a particular - // log entry. The implementation must respect input context's cancellation. - Send(ctx context.Context, walDirForLSN func(storage.LSN) string, messages []raftpb.Message) error - - // GetRecordedMessages retrieves all recorded messages if recording is enabled. - // This is typically used in a testing environment to verify message transmission. - GetRecordedMessages() []raftpb.Message -} - -// NoopTransport is a transport implementation that logs messages and optionally records them. -// It is useful in testing environments where message delivery is non-functional but needs to be observed. -type NoopTransport struct { - logger log.Logger // Logger for outputting message information - recordTransport bool // Flag indicating whether message recording is enabled - recordedMessages []*raftpb.Message // Slice to store recorded messages -} - -// NewNoopTransport constructs a new NoopTransport instance. 
-// The logger is used for logging message information, and the recordTransport flag -// determines whether messages should be recorded. -func NewNoopTransport(logger log.Logger, recordTransport bool) Transport { - return &NoopTransport{ - logger: logger, - recordTransport: recordTransport, - } -} - -// Send logs each message being sent and records it if recording is enabled. -func (t *NoopTransport) Send(ctx context.Context, pathForLSN func(storage.LSN) string, messages []raftpb.Message) error { - for i := range messages { - for j := range messages[i].Entries { - if messages[i].Entries[j].Type != raftpb.EntryNormal { - continue - } - var msg gitalypb.RaftEntry - - if err := proto.Unmarshal(messages[i].Entries[j].Data, &msg); err != nil { - return fmt.Errorf("unmarshalling entry type: %w", err) - } - - // This is a very native implementation. Noop Transport is only used for testing - // purposes. All external messages are swallowed and stored in a recorder. It packages - // the whole log entry directory as a tar ball using an existing backup utility. The - // resulting binary data is stored inside a subfield of the message for examining - // purpose. A real implementation of Transaction will likely use an optimized method - // (such as sidechannel) to deliver the data. It does not necessarily store the data in - // the memory. 
- if len(msg.GetData().GetPacked()) == 0 { - lsn := storage.LSN(messages[i].Entries[j].Index) - path := pathForLSN(lsn) - if err := t.packLogData(ctx, lsn, &msg, path); err != nil { - return fmt.Errorf("packing log data: %w", err) - } - } - data, err := proto.Marshal(&msg) - if err != nil { - return fmt.Errorf("marshaling Raft entry: %w", err) - } - messages[i].Entries[j].Data = data - } - - t.logger.WithFields(log.Fields{ - "raft.type": messages[i].Type, - "raft.to": messages[i].To, - "raft.from": messages[i].From, - "raft.term": messages[i].Term, - "raft.num_entries": len(messages[i].Entries), - }).Info("sending message") - - // Record messages if recording is enabled. - if t.recordTransport { - t.recordedMessages = append(t.recordedMessages, &messages[i]) - } - } - return nil -} - -func (t *NoopTransport) packLogData(ctx context.Context, lsn storage.LSN, message *gitalypb.RaftEntry, logEntryPath string) error { - var logData bytes.Buffer - if err := archive.WriteTarball(ctx, t.logger.WithFields(log.Fields{ - "raft.component": "WAL archiver", - "raft.log_entry_lsn": lsn, - "raft.log_entry_path": logEntryPath, - }), &logData, logEntryPath, "."); err != nil { - return fmt.Errorf("archiving WAL log entry") - } - message.Data = &gitalypb.RaftEntry_LogData{ - LocalPath: message.GetData().GetLocalPath(), - Packed: logData.Bytes(), - } - return nil -} - -// GetRecordedMessages returns the list of recorded messages. 
-func (t *NoopTransport) GetRecordedMessages() []raftpb.Message { - messages := make([]raftpb.Message, 0, len(t.recordedMessages)) - for _, m := range t.recordedMessages { - messages = append(messages, *m) - } - return messages -} diff --git a/internal/gitaly/storage/raftmgr/transport_test.go b/internal/gitaly/storage/raftmgr/transport_test.go deleted file mode 100644 index 876814f9d58147bf82cbf3388b3e5f9402746b35..0000000000000000000000000000000000000000 --- a/internal/gitaly/storage/raftmgr/transport_test.go +++ /dev/null @@ -1,164 +0,0 @@ -package raftmgr - -import ( - "bytes" - "fmt" - "io/fs" - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/archive" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" - "go.etcd.io/etcd/raft/v3/raftpb" - "google.golang.org/protobuf/proto" -) - -func TestNoopTransport_Send(t *testing.T) { - t.Parallel() - - mustMarshalProto := func(msg proto.Message) []byte { - data, err := proto.Marshal(msg) - if err != nil { - panic(fmt.Sprintf("failed to marshal proto: %v", err)) - } - return data - } - - tests := []struct { - name string - setupFunc func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) - }{ - { - name: "No messages", - setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) { - return []raftpb.Message{}, nil - }, - }, - { - name: "Empty Entries", - setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) { - return []raftpb.Message{ - { - Type: raftpb.MsgApp, - From: 2, - To: 1, - Term: 1, - Entries: []raftpb.Entry{}, // Empty Entries - }, - }, nil - }, - }, - { - name: "Messages with already packed data", - setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) { - initialMessage := gitalypb.RaftEntry{ - Id: 1, - Data: &gitalypb.RaftEntry_LogData{Packed: 
[]byte("already packed data")}, - } - messages := []raftpb.Message{ - { - Type: raftpb.MsgApp, - From: 2, - To: 1, - Term: 1, - Index: 1, - Entries: []raftpb.Entry{{Index: uint64(1), Type: raftpb.EntryNormal, Data: mustMarshalProto(&initialMessage)}}, - }, - } - return messages, nil - }, - }, - { - name: "Messages with referenced data", - setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) { - // Simulate a log entry dir with files - fileContents := testhelper.DirectoryState{ - ".": {Mode: archive.TarFileMode | archive.ExecuteMode | fs.ModeDir}, - "1": {Mode: archive.TarFileMode, Content: []byte("file1 content")}, - "2": {Mode: archive.TarFileMode, Content: []byte("file2 content")}, - "3": {Mode: archive.TarFileMode, Content: []byte("file3 content")}, - } - for name, file := range fileContents { - if file.Content != nil { - content := file.Content.([]byte) - require.NoError(t, os.WriteFile(filepath.Join(tempDir, name), content, 0o644)) - } - } - - initialMessage := gitalypb.RaftEntry{ - Id: 1, - Data: &gitalypb.RaftEntry_LogData{LocalPath: []byte(tempDir)}, - } - - messages := []raftpb.Message{ - { - Type: raftpb.MsgApp, - From: 2, - To: 1, - Term: 1, - Index: 1, - Entries: []raftpb.Entry{ - {Index: uint64(1), Type: raftpb.EntryNormal, Data: mustMarshalProto(&initialMessage)}, - }, - }, - } - return messages, fileContents - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - // Create a temporary directory - tempDir := testhelper.TempDir(t) - - // Execute setup function to prepare messages and any necessary file contents - messages, expectedContents := tc.setupFunc(tempDir) - - // Setup logger and transport - logger := testhelper.SharedLogger(t) - transport := NewNoopTransport(logger, true) - - // Execute the Send operation - require.NoError(t, transport.Send(testhelper.Context(t), func(storage.LSN) string { return tempDir }, messages)) - - // Fetch recorded messages for verification - 
recordedMessages := transport.GetRecordedMessages() - - require.Len(t, recordedMessages, len(messages)) - - // Messages must be sent in order. - for i := range messages { - require.Equal(t, messages[i].Type, recordedMessages[i].Type) - require.Equal(t, messages[i].From, recordedMessages[i].From) - require.Equal(t, messages[i].To, recordedMessages[i].To) - require.Equal(t, messages[i].Term, recordedMessages[i].Term) - require.Equal(t, messages[i].Index, recordedMessages[i].Index) - - if len(messages[i].Entries) == 0 { - require.Empty(t, recordedMessages[i].Entries) - } else { - var resultMessage gitalypb.RaftEntry - require.NoError(t, proto.Unmarshal(recordedMessages[i].Entries[0].Data, &resultMessage)) - - require.True(t, len(resultMessage.GetData().GetPacked()) > 0, "packed data must have packed type") - tarballData := resultMessage.GetData().GetPacked() - require.NotEmpty(t, tarballData) - - // Optionally verify packed data if expected - if expectedContents != nil { - // Verify tarball content matches expectations - reader := bytes.NewReader(tarballData) - testhelper.RequireTarState(t, reader, expectedContents) - } - } - } - }) - } -} diff --git a/internal/gitaly/storage/storagemgr/middleware.go b/internal/gitaly/storage/storagemgr/middleware.go index 60029409c01c48749ae0df85230f9f0098e7d997..5b41895807f50a4fddc55b81485361f832feb204 100644 --- a/internal/gitaly/storage/storagemgr/middleware.go +++ b/internal/gitaly/storage/storagemgr/middleware.go @@ -45,6 +45,8 @@ var NonTransactionalRPCs = map[string]struct{}{ gitalypb.ServerService_ServerInfo_FullMethodName: {}, gitalypb.ServerService_ReadinessCheck_FullMethodName: {}, gitalypb.ServerService_ServerSignature_FullMethodName: {}, + // This RPC does not need to be transactional as it acts as a forwarder. + gitalypb.RaftService_SendMessage_FullMethodName: {}, } // repositoryCreatingRPCs are all of the RPCs that may create a repository.