diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 512eb2c3f044ceda03d963b9e4f901b3f730fdbe..11e2f17bb5cd500de6a10185516331aa8f9f885b 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -440,7 +440,15 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error {
 			return fmt.Errorf("new node: %w", err)
 		}
 		defer nodeMgr.Close()
-		node = nodeMgr
+
+		if cfg.Raft.Enabled {
+			node, err = raftmgr.NewNode(cfg, nodeMgr, logger, dbMgr, conns)
+			if err != nil {
+				return fmt.Errorf("new raft node: %w", err)
+			}
+		} else {
+			node = nodeMgr
+		}
 
 		txMiddleware = server.TransactionMiddleware{
 			UnaryInterceptor: storagemgr.NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, txRegistry, node, locator),
@@ -554,13 +562,6 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error {
 		}
 	}
 
-	var raftTransport *raftmgr.GrpcTransport
-	if cfg.Raft.Enabled {
-		raftManagerRegistry := raftmgr.NewRaftManagerRegistry()
-		routingTable := raftmgr.NewStaticRaftRoutingTable()
-		raftTransport = raftmgr.NewGrpcTransport(logger, cfg, routingTable, raftManagerRegistry, conns)
-	}
-
 	var bundleURIManager *bundleuri.GenerationManager
 	if cfg.BundleURI.GoCloudURL != "" {
 		bundleURISink, err := bundleuri.NewSink(ctx, cfg.BundleURI.GoCloudURL)
@@ -622,7 +623,6 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error {
 		Logger:             logger,
 		Cfg:                cfg,
 		GitalyHookManager:  hookManager,
-		RaftGrpcTransport:  raftTransport,
 		TransactionManager: transactionManager,
 		StorageLocator:     locator,
 		ClientPool:         conns,
diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go
index 800f902d5aed74c83d6f21e4fe80daafd15d9188..de8741f06d131531d5dde2da279d948afc26b0ca 100644
--- a/internal/gitaly/service/dependencies.go
+++ b/internal/gitaly/service/dependencies.go
@@ -13,7 +13,6 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter"
-	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
@@ -31,7 +30,6 @@ type Dependencies struct {
 	Logger             log.Logger
 	Cfg                config.Cfg
 	GitalyHookManager  gitalyhook.Manager
-	RaftGrpcTransport  raftmgr.Transport
 	TransactionManager transaction.Manager
 	StorageLocator     storage.Locator
 	ClientPool         *client.Pool
@@ -76,11 +74,6 @@ func (dc *Dependencies) GetTxManager() transaction.Manager {
 	return dc.TransactionManager
}
 
-// GetRaftGrpcTransport returns raft transport.
-func (dc *Dependencies) GetRaftGrpcTransport() raftmgr.Transport {
-	return dc.RaftGrpcTransport
-}
-
 // GetLocator returns storage locator.
 func (dc *Dependencies) GetLocator() storage.Locator {
 	return dc.StorageLocator
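The `GetRaftGrpcTransport` accessor removed above is replaced by a per-storage lookup through the node. A minimal sketch of that lookup path, mirroring the type assertions the Raft service performs later in this diff; `transportFor` is a hypothetical helper, not part of the change:

```go
// transportFor resolves the per-storage Raft transport from a storage.Node.
// Hypothetical helper for illustration; the error handling mirrors SendMessage.
func transportFor(n storage.Node, storageName string) (raftmgr.Transport, error) {
	raftNode, ok := n.(*raftmgr.Node)
	if !ok {
		return nil, errors.New("node is not Raft-enabled")
	}

	s, err := raftNode.GetStorage(storageName)
	if err != nil {
		return nil, fmt.Errorf("get storage %q: %w", storageName, err)
	}

	wrapper, ok := s.(*raftmgr.RaftStorageWrapper)
	if !ok {
		return nil, errors.New("storage is not Raft-enabled")
	}

	return wrapper.GetTransport(), nil
}
```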
diff --git a/internal/gitaly/service/raft/send_message.go b/internal/gitaly/service/raft/send_message.go
index 4dd561dec77bc48c4794fca08ed8a145983797d9..1d54711554e074f6291c9612daaeb789f124cde9 100644
--- a/internal/gitaly/service/raft/send_message.go
+++ b/internal/gitaly/service/raft/send_message.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"io"
 
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
 	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
 )
@@ -19,6 +20,11 @@ func (s *Server) SendMessage(stream gitalypb.RaftService_SendMessageServer) erro
 			return structerr.NewInternal("receive error: %w", err)
 		}
 
+		replicaID := req.GetReplicaId()
+		partitionKey := replicaID.GetPartitionKey()
+		authorityName := partitionKey.GetAuthorityName()
+		partitionID := partitionKey.GetPartitionId()
+
 		// The cluster ID protects Gitaly from cross-cluster interactions, which could potentially corrupt the clusters.
 		// This is particularly crucial after disaster recovery so that an identical cluster is restored from backup.
 		if req.GetClusterId() == "" {
@@ -31,16 +37,35 @@ func (s *Server) SendMessage(stream gitalypb.RaftService_SendMessageServer) erro
 				req.GetClusterId(), s.cfg.Raft.ClusterID)
 		}
 
-		if req.GetAuthorityName() == "" {
+		if authorityName == "" {
 			return structerr.NewInvalidArgument("authority_name is required")
 		}
 
-		if req.GetPartitionId() == 0 {
+		if partitionID == 0 {
 			return structerr.NewInvalidArgument("partition_id is required")
 		}
 
-		raftMsg := req.GetMessage()
+		storageName := replicaID.GetStorageName()
+		node, ok := s.node.(*raftmgr.Node)
+		if !ok {
+			return structerr.NewInternal("node is not Raft-enabled")
+		}
+
+		storageManager, err := node.GetStorage(storageName)
+		if err != nil {
+			return structerr.NewInternal("get storage manager: %w", err)
+		}
+
+		raftStorage, ok := storageManager.(*raftmgr.RaftStorageWrapper)
+		if !ok {
+			return structerr.NewInternal("storage is not Raft-enabled")
+		}
+
+		transport := raftStorage.GetTransport()
+		if transport == nil {
+			return structerr.NewInternal("transport not available")
+		}
 
-		if err := s.transport.Receive(stream.Context(), req.GetPartitionId(), req.GetAuthorityName(), *raftMsg); err != nil {
+		if err := transport.Receive(stream.Context(), partitionKey, *req.GetMessage()); err != nil {
 			return structerr.NewInternal("receive error: %w", err)
 		}
 	}
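For reference, a hedged sketch of what a client of this RPC now sends with the reshaped request; connection setup is elided and all values are placeholders:

```go
client := gitalypb.NewRaftServiceClient(conn)
stream, err := client.SendMessage(ctx)
if err != nil {
	return err
}
if err := stream.Send(&gitalypb.RaftMessageRequest{
	ClusterId: cfg.Raft.ClusterID,
	ReplicaId: &gitalypb.ReplicaID{
		StorageName: "default",
		PartitionKey: &gitalypb.PartitionKey{
			AuthorityName: "default",
			PartitionId:   1,
		},
		NodeId: 2,
	},
	Message: &msg, // a raftpb.Message emitted by etcd/raft
}); err != nil {
	return err
}
_, err = stream.CloseAndRecv()
return err
```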
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" @@ -17,14 +25,67 @@ import ( "google.golang.org/grpc/codes" ) +const ( + clusterID = "test-cluster" + authorityName = "test-authority" + storageNameOne = "default" + storageNameTwo = "default-two" +) + func TestServer_SendMessage(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) - cfg := testcfg.Build(t) - cfg.Raft.ClusterID = "test-cluster" + cfg := testcfg.Build(t, testcfg.WithStorages(storageNameOne, storageNameTwo)) + cfg.Raft.ClusterID = clusterID + logger := testhelper.SharedLogger(t) + + // Create unique directory for database + dbPath := testhelper.TempDir(t) + dbMgr, err := databasemgr.NewDBManager( + ctx, + cfg.Storages, + func(logger log.Logger, path string) (keyvalue.Store, error) { + return keyvalue.NewBadgerStore(logger, filepath.Join(dbPath, path)) + }, + helper.NewNullTickerFactory(), + logger, + ) + require.NoError(t, err) + t.Cleanup(dbMgr.Close) + + metrics := storagemgr.NewMetrics(cfg.Prometheus) + nodeMgr, err := node.NewManager(cfg.Storages, storagemgr.NewFactory(logger, dbMgr, nil, 2, metrics)) + require.NoError(t, err) + t.Cleanup(nodeMgr.Close) + + mockNode, err := raftmgr.NewNode(cfg, nodeMgr, log.LogrusLogger{}, dbMgr, nil) + require.NoError(t, err) + + // Register storage one + storage, err := mockNode.GetStorage(storageNameOne) + require.NoError(t, err) - client := runRaftServer(t, ctx, cfg) + registry := storage.(*raftmgr.RaftStorageWrapper).GetManagerRegistry() + raftMgr := &mockRaftManager{} + + partitionKey := &gitalypb.PartitionKey{ + AuthorityName: authorityName, + PartitionId: 1, + } + err = registry.RegisterManager(partitionKey, raftMgr) + require.NoError(t, err) + + // Register storage two + storageTwo, err := mockNode.GetStorage(storageNameTwo) + require.NoError(t, err) + + registryTwo := storageTwo.(*raftmgr.RaftStorageWrapper).GetManagerRegistry() + raftMgrTwo := &mockRaftManager{} + err = registryTwo.RegisterManager(partitionKey, raftMgrTwo) + require.NoError(t, err) + + client := runRaftServer(t, ctx, cfg, mockNode) testCases := []struct { desc string @@ -33,11 +94,33 @@ func TestServer_SendMessage(t *testing.T) { expectedError string }{ { - desc: "successful message send", + desc: "successful message send to storage one", req: &gitalypb.RaftMessageRequest{ - ClusterId: "test-cluster", - AuthorityName: "test-authority", - PartitionId: 1, + ClusterId: "test-cluster", + ReplicaId: &gitalypb.ReplicaID{ + StorageName: storageNameOne, + PartitionKey: &gitalypb.PartitionKey{ + AuthorityName: authorityName, + PartitionId: 1, + }, + }, + Message: &raftpb.Message{ + Type: raftpb.MsgApp, + To: 2, + }, + }, + }, + { + desc: "successful message send to storage two", + req: &gitalypb.RaftMessageRequest{ + ClusterId: "test-cluster", + ReplicaId: &gitalypb.ReplicaID{ + StorageName: storageNameTwo, + PartitionKey: &gitalypb.PartitionKey{ + AuthorityName: authorityName, + PartitionId: 1, + }, + }, Message: &raftpb.Message{ Type: raftpb.MsgApp, To: 2, @@ -47,8 +130,13 @@ func TestServer_SendMessage(t *testing.T) { { desc: "missing cluster ID", req: &gitalypb.RaftMessageRequest{ - AuthorityName: "test-authority", - PartitionId: 1, + ReplicaId: &gitalypb.ReplicaID{ + StorageName: "storage-name", + PartitionKey: &gitalypb.PartitionKey{ + AuthorityName: authorityName, + PartitionId: 1, + }, + }, Message: &raftpb.Message{ Type: raftpb.MsgApp, To: 2, @@ -60,9 +148,14 @@ func 
TestServer_SendMessage(t *testing.T) { { desc: "wrong cluster ID", req: &gitalypb.RaftMessageRequest{ - ClusterId: "wrong-cluster", - AuthorityName: "test-authority", - PartitionId: 1, + ClusterId: "wrong-cluster", + ReplicaId: &gitalypb.ReplicaID{ + StorageName: "storage-name", + PartitionKey: &gitalypb.PartitionKey{ + AuthorityName: authorityName, + PartitionId: 1, + }, + }, Message: &raftpb.Message{ Type: raftpb.MsgApp, To: 2, @@ -74,8 +167,13 @@ func TestServer_SendMessage(t *testing.T) { { desc: "missing authority name", req: &gitalypb.RaftMessageRequest{ - ClusterId: "test-cluster", - PartitionId: 1, + ClusterId: "test-cluster", + ReplicaId: &gitalypb.ReplicaID{ + StorageName: storageNameOne, + PartitionKey: &gitalypb.PartitionKey{ + PartitionId: 1, + }, + }, Message: &raftpb.Message{ Type: raftpb.MsgApp, To: 2, @@ -87,8 +185,13 @@ func TestServer_SendMessage(t *testing.T) { { desc: "missing partition ID", req: &gitalypb.RaftMessageRequest{ - ClusterId: "test-cluster", - AuthorityName: "test-authority", + ClusterId: "test-cluster", + ReplicaId: &gitalypb.ReplicaID{ + StorageName: storageNameOne, + PartitionKey: &gitalypb.PartitionKey{ + AuthorityName: authorityName, + }, + }, Message: &raftpb.Message{ Type: raftpb.MsgApp, To: 2, @@ -119,12 +222,10 @@ func TestServer_SendMessage(t *testing.T) { } } -func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg) gitalypb.RaftServiceClient { +func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg, node *raftmgr.Node) gitalypb.RaftServiceClient { serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { - transport := newMockTransport(t) - deps.RaftGrpcTransport = transport deps.Cfg = cfg - + deps.Node = node gitalypb.RegisterRaftServiceServer(srv, NewServer(deps)) }, testserver.WithDisablePraefect()) diff --git a/internal/gitaly/service/raft/server.go b/internal/gitaly/service/raft/server.go index 8763c735af54562ab9e15021329d86fe327ac359..f92f54c0c2a90664b9c4e5130de5e277eacb666f 100644 --- a/internal/gitaly/service/raft/server.go +++ b/internal/gitaly/service/raft/server.go @@ -3,7 +3,7 @@ package raft import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -11,16 +11,16 @@ import ( // Server is a gRPC server for the Raft service. type Server struct { gitalypb.UnimplementedRaftServiceServer - logger log.Logger - transport raftmgr.Transport - cfg config.Cfg + logger log.Logger + node storage.Node + cfg config.Cfg } // NewServer creates a new Raft gRPC server. 
func NewServer(deps *service.Dependencies) *Server { return &Server{ - logger: deps.GetLogger(), - transport: deps.GetRaftGrpcTransport(), - cfg: deps.GetCfg(), + logger: deps.GetLogger(), + node: deps.GetNode(), + cfg: deps.GetCfg(), } } diff --git a/internal/gitaly/service/raft/testhelper_test.go b/internal/gitaly/service/raft/testhelper_test.go index d19eb4461ec14445a2fa7fc60c12a36f84f652b2..1ee45bc2dfb2c7718be6244a82a4cb04d8f2a70a 100644 --- a/internal/gitaly/service/raft/testhelper_test.go +++ b/internal/gitaly/service/raft/testhelper_test.go @@ -4,7 +4,7 @@ import ( "context" "testing" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "go.etcd.io/etcd/raft/v3/raftpb" ) @@ -13,20 +13,11 @@ func TestMain(m *testing.M) { testhelper.Run(m) } -type mockTransport struct { - t *testing.T - receivedMessage *raftpb.Message +type mockRaftManager struct { + raftmgr.RaftManager } -func newMockTransport(t *testing.T) *mockTransport { - return &mockTransport{t: t} -} - -func (m *mockTransport) Receive(ctx context.Context, partitionID uint64, authorityName string, raftMsg raftpb.Message) error { - m.receivedMessage = &raftMsg - return nil -} - -func (m *mockTransport) Send(ctx context.Context, logReader storage.LogReader, partitionID uint64, authorityName string, msgs []raftpb.Message) error { +// Step is a mock implementation of the raft.Node.Step method. +func (m *mockRaftManager) Step(ctx context.Context, msg raftpb.Message) error { return nil } diff --git a/internal/gitaly/storage/raftmgr/grpc_transport.go b/internal/gitaly/storage/raftmgr/grpc_transport.go index 1ef949464a55706f38cefdaec8882d9975937939..e01c1708c16934f3eefb7dd44ad7479b21776188 100644 --- a/internal/gitaly/storage/raftmgr/grpc_transport.go +++ b/internal/gitaly/storage/raftmgr/grpc_transport.go @@ -30,9 +30,9 @@ type Transport interface { // Send dispatches a batch of Raft messages. It returns an error if the sending fails. This function receives a // context, the list of messages to send and a function that returns the path of WAL directory of a particular // log entry. The implementation must respect input context's cancellation. - Send(ctx context.Context, logReader storage.LogReader, partitionID uint64, authorityName string, messages []raftpb.Message) error + Send(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.PartitionKey, messages []raftpb.Message) error // Receive receives a Raft message and processes it. - Receive(ctx context.Context, partitionID uint64, authorityName string, raftMsg raftpb.Message) error + Receive(ctx context.Context, partitionKey *gitalypb.PartitionKey, raftMsg raftpb.Message) error } // GrpcTransport is a gRPC transport implementation for sending Raft messages across nodes. @@ -57,8 +57,8 @@ func NewGrpcTransport(logger log.Logger, cfg config.Cfg, routingTable RoutingTab } // Send sends Raft messages to the appropriate nodes. 
-func (t *GrpcTransport) Send(ctx context.Context, logReader storage.LogReader, partitionID uint64, authorityName string, messages []raftpb.Message) error { - messagesByNode, err := t.prepareRaftMessageRequests(ctx, logReader, partitionID, authorityName, messages) +func (t *GrpcTransport) Send(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.PartitionKey, messages []raftpb.Message) error { + messagesByNode, err := t.prepareRaftMessageRequests(ctx, logReader, partitionKey, messages) if err != nil { return fmt.Errorf("preparing raft messages: %w", err) } @@ -66,9 +66,10 @@ func (t *GrpcTransport) Send(ctx context.Context, logReader storage.LogReader, p g := &errgroup.Group{} errCh := make(chan error, len(messagesByNode)) - for nodeID, reqs := range messagesByNode { + for addr, reqs := range messagesByNode { g.Go(func() error { - if err := t.sendToNode(ctx, nodeID, reqs); err != nil { + nodeID := reqs[0].GetReplicaId().GetNodeId() + if err := t.sendToNode(ctx, addr, reqs); err != nil { errCh <- fmt.Errorf("node %d: %w", nodeID, err) return err } @@ -91,11 +92,12 @@ func (t *GrpcTransport) Send(ctx context.Context, logReader storage.LogReader, p return nil } -func (t *GrpcTransport) prepareRaftMessageRequests(ctx context.Context, logReader storage.LogReader, partitionID uint64, authorityName string, msgs []raftpb.Message) (map[uint64][]*gitalypb.RaftMessageRequest, error) { - requests := make([]*gitalypb.RaftMessageRequest, len(msgs)) +func (t *GrpcTransport) prepareRaftMessageRequests(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.PartitionKey, msgs []raftpb.Message) (map[string][]*gitalypb.RaftMessageRequest, error) { + messagesByAddress := make(map[string][]*gitalypb.RaftMessageRequest) + messagesByAddressMutex := sync.Mutex{} g := &errgroup.Group{} - for i, msg := range msgs { + for _, msg := range msgs { g.Go(func() error { for j := range msg.Entries { if msg.Entries[j].Type != raftpb.EntryNormal { @@ -127,13 +129,26 @@ func (t *GrpcTransport) prepareRaftMessageRequests(ctx context.Context, logReade t.mutex.Unlock() } - requests[i] = &gitalypb.RaftMessageRequest{ - ClusterId: t.cfg.Raft.ClusterID, - AuthorityName: authorityName, - PartitionId: partitionID, - Message: &msg, + replica, err := t.routingTable.Translate(partitionKey, msg.To) + if err != nil { + return fmt.Errorf("translate nodeID %d: %w", msg.To, err) } + addr := replica.GetMetadata().GetAddress() + + messagesByAddressMutex.Lock() + // We are not adding address in the request because it will increase the payload size, and + // is not needed on the receiver end. + messagesByAddress[addr] = append(messagesByAddress[addr], &gitalypb.RaftMessageRequest{ + ClusterId: t.cfg.Raft.ClusterID, + ReplicaId: &gitalypb.ReplicaID{ + PartitionKey: partitionKey, + NodeId: msg.To, + StorageName: replica.GetStorageName(), + }, + Message: &msg, + }) + messagesByAddressMutex.Unlock() return nil }) } @@ -143,50 +158,30 @@ func (t *GrpcTransport) prepareRaftMessageRequests(ctx context.Context, logReade return nil, err } - messagesByNode := make(map[uint64][]*gitalypb.RaftMessageRequest) - for _, req := range requests { - nodeID := req.GetMessage().To - messagesByNode[nodeID] = append(messagesByNode[nodeID], req) - } - - return messagesByNode, nil + return messagesByAddress, nil } -func (t *GrpcTransport) sendToNode(ctx context.Context, nodeID uint64, reqs []*gitalypb.RaftMessageRequest) error { - // For now, we are using a static routing table that contains mapping of nodeID to address. 
In future, the routing - // table can become dynamic so that storage addresses are propagated through gossiping. - authorityName, partitionID := reqs[0].GetAuthorityName(), reqs[0].GetPartitionId() - addr, err := t.routingTable.Translate(RoutingKey{ - partitionKey: PartitionKey{ - authorityName: authorityName, - partitionID: partitionID, - }, - nodeID: nodeID, - }) - if err != nil { - return fmt.Errorf("translate nodeID %d: %w", nodeID, err) - } - +func (t *GrpcTransport) sendToNode(ctx context.Context, addr string, reqs []*gitalypb.RaftMessageRequest) error { // get the connection to the node conn, err := t.connectionPool.Dial(ctx, addr, t.cfg.Auth.Token) if err != nil { - return fmt.Errorf("get connection to node %d: %w", nodeID, err) + return fmt.Errorf("get connection to address %s: %w", addr, err) } client := gitalypb.NewRaftServiceClient(conn) stream, err := client.SendMessage(ctx) if err != nil { - return fmt.Errorf("create stream to node %d: %w", nodeID, err) + return fmt.Errorf("create stream to address %s: %w", addr, err) } for _, req := range reqs { if err := stream.Send(req); err != nil { - return fmt.Errorf("send request to node %d: %w", nodeID, err) + return fmt.Errorf("send request to address %s: %w", addr, err) } } if _, err := stream.CloseAndRecv(); err != nil { - return fmt.Errorf("close stream to node %d: %w", nodeID, err) + return fmt.Errorf("close stream to address %s: %w", addr, err) } return nil @@ -209,15 +204,12 @@ func (t *GrpcTransport) packLogData(ctx context.Context, lsn storage.LSN, messag } // Receive receives a stream of Raft messages and processes them. -func (t *GrpcTransport) Receive(ctx context.Context, partitionID uint64, authorityName string, raftMsg raftpb.Message) error { +func (t *GrpcTransport) Receive(ctx context.Context, partitionKey *gitalypb.PartitionKey, raftMsg raftpb.Message) error { // Retrieve the raft manager from the registry, assumption is that all the messages are from the same partition key. 
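The transport change above batches all requests destined for the same address into one stream, instead of one stream per node ID. A self-contained sketch of that pattern with hypothetical data; it relies on Go 1.22+ per-iteration loop variables, as the diff itself does:

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Destination node IDs of outbound messages, and a toy routing table.
	msgs := []uint64{1, 2, 3}
	route := map[uint64]string{1: "node-a:1234", 2: "node-b:1234", 3: "node-a:1234"}

	var (
		mu     sync.Mutex
		byAddr = map[string][]uint64{}
	)
	g := &errgroup.Group{}
	for _, to := range msgs {
		g.Go(func() error {
			addr, ok := route[to]
			if !ok {
				return fmt.Errorf("no route for node %d", to)
			}
			// The map is shared across goroutines, so appends are serialized.
			mu.Lock()
			defer mu.Unlock()
			byAddr[addr] = append(byAddr[addr], to)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	fmt.Println(byAddr) // e.g. map[node-a:1234:[1 3] node-b:1234:[2]]
}
```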
diff --git a/internal/gitaly/storage/raftmgr/grpc_transport_test.go b/internal/gitaly/storage/raftmgr/grpc_transport_test.go
index 2b2db087905348603962a64481d119d1ec4d0386..4697d8eaaab1127a9ad7e67849028bc3b6b4dfa4 100644
--- a/internal/gitaly/storage/raftmgr/grpc_transport_test.go
+++ b/internal/gitaly/storage/raftmgr/grpc_transport_test.go
@@ -8,11 +8,11 @@ import (
 	"os"
 	"path/filepath"
 	"testing"
-	"time"
 
 	"github.com/stretchr/testify/require"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
@@ -23,24 +23,13 @@ import (
 	"go.etcd.io/etcd/raft/v3/raftpb"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/credentials/insecure"
-	"google.golang.org/grpc/health"
-	"google.golang.org/grpc/health/grpc_health_v1"
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
 )
 
-type testNode struct {
-	id              uint64
-	name            string
-	transport       *GrpcTransport
-	server          *grpc.Server
-	managerRegistry ManagerRegistry
-}
-
 type cluster struct {
-	leader    *testNode
-	followers []*testNode
+	leader    *mockStorageNode
+	followers []*mockStorageNode
 }
 
 type walEntry struct {
@@ -54,9 +43,21 @@ const (
 	clusterID = "44c58f50-0a8b-4849-bf8b-d5a56198ea7c"
 )
 
+type mockStorageNode struct {
+	id              uint64
+	name            string
+	transport       *GrpcTransport
+	server          *grpc.Server
+	managerRegistry ManagerRegistry
+}
+
+func (m mockStorageNode) GetTransport() Transport {
+	return m.transport
+}
+
 type mockRaftServer struct {
 	gitalypb.UnimplementedRaftServiceServer
-	transport *GrpcTransport
+	node mockStorageNode
 }
 
 func (s *mockRaftServer) SendMessage(stream gitalypb.RaftService_SendMessageServer) error {
@@ -71,7 +72,9 @@ func (s *mockRaftServer) SendMessage(stream gitalypb.RaftService_SendMessageServ
 
 		raftMsg := req.GetMessage()
 
-		if err := s.transport.Receive(stream.Context(), req.GetPartitionId(), req.GetAuthorityName(), *raftMsg); err != nil {
+		partitionKey := req.GetReplicaId().GetPartitionKey()
+
+		if err := s.node.GetTransport().Receive(stream.Context(), partitionKey, *raftMsg); err != nil {
 			return status.Errorf(codes.Internal, "receive error: %v", err)
 		}
 	}
@@ -111,7 +114,7 @@ func TestGrpcTransport_SendAndReceive(t *testing.T) {
 			walEntries: []walEntry{
 				{lsn: storage.LSN(1), content: "random-content"},
 			},
-			expectedError: "create stream to node 2: rpc error: code = Unavailable desc = last connection error: connection error:",
+			expectedError: "connect: no such file or directory",
 		},
 	}
 
@@ -138,17 +141,19 @@ func TestGrpcTransport_SendAndReceive(t *testing.T) {
 				require.NoError(t, leader.transport.connectionPool.Close())
 			})
 
-			mgr, err := leader.managerRegistry.GetManager(PartitionKey{
-				partitionID:   uint64(tc.partitionID),
-				authorityName: storageName,
-			})
+			partitionKey := &gitalypb.PartitionKey{
+				PartitionId:   uint64(tc.partitionID),
+				AuthorityName: storageName,
+			}
+
+			mgr, err := leader.managerRegistry.GetManager(partitionKey)
 			require.NoError(t, err)
 
 			// Create test messages
 			msgs := createTestMessages(t, testCluster, mgr, tc.walEntries)
 
 			// Send Message from leader to all followers
-			err = leader.transport.Send(ctx, mgr, 1, storageName, msgs)
+			err = leader.transport.Send(ctx, mgr, partitionKey, msgs)
 			if tc.expectedError != "" {
 				require.Error(t, err)
 				require.Contains(t, err.Error(), tc.expectedError)
@@ -158,10 +163,7 @@ func TestGrpcTransport_SendAndReceive(t *testing.T) {
 
 			// Verify WAL replication
 			for i, follower := range testCluster.followers {
-				mgr, err := follower.managerRegistry.GetManager(PartitionKey{
-					partitionID:   uint64(tc.partitionID),
-					authorityName: storageName,
-				})
+				mgr, err := follower.managerRegistry.GetManager(partitionKey)
 				require.NoError(t, err)
 
 				for _, entry := range tc.walEntries {
@@ -183,11 +185,19 @@ func TestGrpcTransport_SendAndReceive(t *testing.T) {
 }
 
 func setupCluster(t *testing.T, logger logger.LogrusLogger, numNodes int, partitionID int) *cluster {
-	routingTable := NewStaticRaftRoutingTable()
+	dir := t.TempDir()
+	kvStore, err := keyvalue.NewBadgerStore(logger, dir)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		require.NoError(t, kvStore.Close())
+	})
+
 	var servers []*grpc.Server
 	var listeners []net.Listener
 	var addresses []string
 
+	routingTable := NewKVRoutingTable(kvStore)
+
 	createTransport := func(cfg config.Cfg, srv *grpc.Server, listener net.Listener, addr string, registry ManagerRegistry) *GrpcTransport {
 		pool := client.NewPool(client.WithDialOptions(
 			client.UnaryInterceptor(),
@@ -195,38 +205,18 @@ func setupCluster(t *testing.T, logger logger.LogrusLogger, numNodes int, partit
 		))
 		transport := NewGrpcTransport(logger, cfg, routingTable, registry, pool)
 
-		testRaftServer := &mockRaftServer{transport: transport}
+		testRaftServer := &mockRaftServer{
+			node: mockStorageNode{
+				transport: transport,
+			},
+		}
 		gitalypb.RegisterRaftServiceServer(srv, testRaftServer)
 
-		healthServer := health.NewServer()
-		healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
-		grpc_health_v1.RegisterHealthServer(srv, healthServer)
-
-		ready := make(chan struct{})
 		go testhelper.MustServe(t, srv, listener)
-		go func() {
-			ctx := testhelper.Context(t)
-			require.Eventually(t, func() bool {
-				// Try to connect to the server
-				if conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())); err == nil {
-					defer conn.Close()
-
-					// Check health service
-					healthClient := grpc_health_v1.NewHealthClient(conn)
-					resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
-
-					if err == nil && resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING {
-						return true
-					}
-				}
-				return false
-			}, 5*time.Second, 10*time.Millisecond)
-			close(ready)
-		}()
-		<-ready
 
-		t.Cleanup(func() { srv.GracefulStop() })
+		t.Cleanup(func() {
+			srv.GracefulStop()
+		})
 		transport.cfg.SocketPath = addr
 		return transport
 	}
@@ -237,19 +227,12 @@ func setupCluster(t *testing.T, logger logger.LogrusLogger, numNodes int, partit
 	}
 
 	cluster := &cluster{}
-	cluster.leader = &testNode{}
-	cluster.followers = []*testNode{}
+	cluster.leader = &mockStorageNode{}
+	cluster.followers = []*mockStorageNode{}
 
 	// First set up all servers and fill routing table
-	for i := range numNodes {
+	for range numNodes {
 		srv, listener, addr := runServer(t)
-		require.NoError(t, routingTable.AddMember(RoutingKey{
-			partitionKey: PartitionKey{
-				authorityName: storageName,
-				partitionID:   uint64(partitionID),
-			},
-			nodeID: uint64(i + 1),
-		}, addr))
 		servers = append(servers, srv)
 		listeners = append(listeners, listener)
 		addresses = append(addresses, addr)
@@ -260,7 +243,7 @@ func setupCluster(t *testing.T, logger logger.LogrusLogger, numNodes int, partit
 		config := testcfg.Build(t)
 		config.Raft.ClusterID = clusterID
 		transport := createTransport(config, servers[i], listeners[i], addresses[i], registries[i])
-		node := &testNode{
+		node := &mockStorageNode{
 			transport:       transport,
 			server:          servers[i],
 			managerRegistry: registries[i],
@@ -269,31 +252,68 @@ func setupCluster(t *testing.T, logger logger.LogrusLogger, numNodes int, partit
 		}
 
 		// Create and set up manager
-		manager := newManager(logger, transport, config)
+		manager := newManager(config)
+
+		partitionKey := &gitalypb.PartitionKey{
+			AuthorityName: storageName,
+			PartitionId:   uint64(partitionID),
+		}
 
 		// Register the manager with the registry
-		require.NoError(t, registries[i].RegisterManager(PartitionKey{
-			partitionID:   uint64(partitionID),
-			authorityName: storageName,
-		}, manager))
+		require.NoError(t, registries[i].RegisterManager(partitionKey, manager))
 
+		nodeID := uint64(i + 1)
 		if i == 0 {
 			cluster.leader = node
+			raftMgr, err := node.managerRegistry.GetManager(partitionKey)
+			require.Equal(t, raftMgr, manager)
+			require.NoError(t, err)
+
+			entry := RoutingTableEntry{
+				Replicas: []*gitalypb.ReplicaID{
+					{
+						PartitionKey: partitionKey,
+						NodeId:       nodeID,
+						Metadata: &gitalypb.ReplicaID_Metadata{
+							Address: addresses[i],
+						},
+					},
+				},
+				Term:  1,
+				Index: 1,
+			}
+
+			require.NoError(t, routingTable.UpsertEntry(entry))
 		} else {
 			cluster.followers = append(cluster.followers, node)
+
+			// Get existing entry and add the new follower to replicas
+			existingEntry, err := routingTable.GetEntry(partitionKey)
+			require.NoError(t, err)
+
+			existingEntry.Index = uint64(i + 1)
+			existingEntry.Replicas = append(existingEntry.Replicas, &gitalypb.ReplicaID{
+				PartitionKey: partitionKey,
+				NodeId:       nodeID,
+				Metadata: &gitalypb.ReplicaID_Metadata{
+					Address: addresses[i],
+				},
+			})
+
+			require.NoError(t, routingTable.UpsertEntry(*existingEntry))
 		}
 	}
 
 	return cluster
 }
 
-func newManager(logger logger.LogrusLogger, transport Transport, cfg config.Cfg) RaftManager {
+func newManager(cfg config.Cfg) RaftManager {
 	walManager := log.NewManager("default", 1, cfg.Storages[0].Path, cfg.Storages[0].Path, nil, nil)
 	return &mockRaftManager{
-		logger:     logger,
 		logManager: walManager,
-		transport:  transport,
 	}
 }
diff --git a/internal/gitaly/storage/raftmgr/manager_registry.go b/internal/gitaly/storage/raftmgr/manager_registry.go
index 686ca12b47c377a375affed135053288ec76afec..11b24557a01eea7325e2c5e1545883dac0f4f1b4 100644
--- a/internal/gitaly/storage/raftmgr/manager_registry.go
+++ b/internal/gitaly/storage/raftmgr/manager_registry.go
@@ -3,20 +3,20 @@ package raftmgr
 import (
 	"fmt"
 	"sync"
+
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
 )
 
-// PartitionKey is used to uniquely identify a partition.
-type PartitionKey struct {
-	authorityName string
-	partitionID   uint64
+func partitionKeyToString(pk *gitalypb.PartitionKey) string {
+	return fmt.Sprintf("%d:%s", pk.GetPartitionId(), pk.GetAuthorityName())
 }
 
 // ManagerRegistry is an interface that defines the methods to register and retrieve managers.
 type ManagerRegistry interface {
 	// GetManager returns the manager for a given partition key.
-	GetManager(key PartitionKey) (RaftManager, error)
+	GetManager(key *gitalypb.PartitionKey) (RaftManager, error)
 	// RegisterManager registers a manager for a given partition key.
-	RegisterManager(key PartitionKey, manager RaftManager) error
+	RegisterManager(key *gitalypb.PartitionKey, manager RaftManager) error
 }
 
 // RaftManagerRegistry is a concrete implementation of the ManagerRegistry interface.
@@ -30,16 +30,16 @@ func NewRaftManagerRegistry() *raftManagerRegistry {
 }
 
 // GetManager returns the manager for a given partitionKey.
-func (r *raftManagerRegistry) GetManager(key PartitionKey) (RaftManager, error) {
-	if mgr, ok := r.managers.Load(key); ok {
+func (r *raftManagerRegistry) GetManager(key *gitalypb.PartitionKey) (RaftManager, error) {
+	if mgr, ok := r.managers.Load(partitionKeyToString(key)); ok {
 		return mgr.(RaftManager), nil
 	}
 	return nil, fmt.Errorf("no manager found for partition key %+v", key)
 }
 
 // RegisterManager registers a manager for a given partitionKey.
-func (r *raftManagerRegistry) RegisterManager(key PartitionKey, manager RaftManager) error {
-	if _, loaded := r.managers.LoadOrStore(key, manager); loaded {
+func (r *raftManagerRegistry) RegisterManager(key *gitalypb.PartitionKey, manager RaftManager) error {
+	if _, loaded := r.managers.LoadOrStore(partitionKeyToString(key), manager); loaded {
 		return fmt.Errorf("manager already registered for partition key %+v", key)
 	}
 	return nil
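The switch from a struct key to `partitionKeyToString` is needed because the registry key is now a proto pointer: two `*gitalypb.PartitionKey` values with identical fields are distinct map keys, so pointer-keyed lookups would always miss. A short illustration, written as if inside package `raftmgr` (assumed behavior of the generated types, not code from this diff):

```go
a := &gitalypb.PartitionKey{AuthorityName: "default", PartitionId: 1}
b := &gitalypb.PartitionKey{AuthorityName: "default", PartitionId: 1}

m := map[*gitalypb.PartitionKey]bool{a: true}
fmt.Println(m[b]) // false: distinct pointers never collide

// The registry therefore keys by the encoded string instead:
fmt.Println(partitionKeyToString(a))                            // "1:default"
fmt.Println(partitionKeyToString(a) == partitionKeyToString(b)) // true
```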
diff --git a/internal/gitaly/storage/raftmgr/raft_storage_wrapper.go b/internal/gitaly/storage/raftmgr/raft_storage_wrapper.go
new file mode 100644
index 0000000000000000000000000000000000000000..18e5b04aae1182bcb9f50d1ebb02e66e656c11a5
--- /dev/null
+++ b/internal/gitaly/storage/raftmgr/raft_storage_wrapper.go
@@ -0,0 +1,85 @@
+package raftmgr
+
+import (
+	"fmt"
+
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
+)
+
+// RaftStorageWrapper wraps a storage.Storage instance with Raft functionality.
+type RaftStorageWrapper struct {
+	storage.Storage
+	transport       Transport
+	routingTable    RoutingTable
+	managerRegistry ManagerRegistry
+}
+
+// GetTransport returns the Raft transport for this storage.
+func (s *RaftStorageWrapper) GetTransport() Transport {
+	return s.transport
+}
+
+// GetRoutingTable returns the Raft routing table for this storage.
+func (s *RaftStorageWrapper) GetRoutingTable() RoutingTable {
+	return s.routingTable
+}
+
+// GetManagerRegistry returns the Raft manager registry for this storage.
+func (s *RaftStorageWrapper) GetManagerRegistry() ManagerRegistry {
+	return s.managerRegistry
+}
+
+// Node wraps a storage.Node instance and adds Raft functionality to each storage.
+type Node struct {
+	node     storage.Node
+	storages map[string]*RaftStorageWrapper
+}
+
+// NewNode creates a new Node that wraps the provided storage.Node with Raft functionality.
+func NewNode(cfg config.Cfg, baseNode storage.Node, logger log.Logger, dbMgr *databasemgr.DBManager, connsPool *client.Pool) (*Node, error) {
+	n := &Node{
+		node:     baseNode,
+		storages: make(map[string]*RaftStorageWrapper),
+	}
+
+	for _, cfgStorage := range cfg.Storages {
+		baseStorage, err := baseNode.GetStorage(cfgStorage.Name)
+		if err != nil {
+			return nil, fmt.Errorf("get base storage %q: %w", cfgStorage.Name, err)
+		}
+
+		// Get the storage's KV store for the routing table
+		kvStore, err := dbMgr.GetDB(cfgStorage.Name)
+		if err != nil {
+			return nil, fmt.Errorf("get KV store for storage %q: %w", cfgStorage.Name, err)
+		}
+
+		// Create per-storage Raft components
+		routingTable := NewKVRoutingTable(kvStore)
+		managerRegistry := NewRaftManagerRegistry()
+		transport := NewGrpcTransport(logger, cfg, routingTable, managerRegistry, connsPool)
+
+		n.storages[cfgStorage.Name] = &RaftStorageWrapper{
+			Storage:         baseStorage,
+			transport:       transport,
+			routingTable:    routingTable,
+			managerRegistry: managerRegistry,
+		}
+	}
+
+	return n, nil
+}
+
+// GetStorage implements the storage.Node interface.
+func (n *Node) GetStorage(storageName string) (storage.Storage, error) {
+	wrapper, ok := n.storages[storageName]
+	if !ok {
+		return nil, storage.NewStorageNotFoundError(storageName)
+	}
+
+	return wrapper, nil
+}
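Because `RaftStorageWrapper` embeds `storage.Storage`, it keeps satisfying that interface while only adding accessors, and callers such as `SendMessage` type-assert back to the wrapper to reach them. A compile-time check of that assumption could look like the following; whether `*Node` satisfies the full `storage.Node` interface depends on that interface's other methods, so this is illustrative only:

```go
// Illustrative compile-time assertions; not part of this diff.
var (
	_ storage.Storage = (*raftmgr.RaftStorageWrapper)(nil)
	_ storage.Node    = (*raftmgr.Node)(nil) // assumes GetStorage is the interface's only method
)
```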
diff --git a/internal/gitaly/storage/raftmgr/routing.go b/internal/gitaly/storage/raftmgr/routing.go
index 14b5a69199586b62bd7d6ba31d84d623a29ce897..c8144ba908dc725c65e49ddd19c557254f716f96 100644
--- a/internal/gitaly/storage/raftmgr/routing.go
+++ b/internal/gitaly/storage/raftmgr/routing.go
@@ -1,47 +1,140 @@
 package raftmgr
 
 import (
+	"encoding/json"
+	"errors"
 	"fmt"
 	"sync"
+
+	"github.com/dgraph-io/badger/v4"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
 )
 
+func routingKey(partitionKey *gitalypb.PartitionKey) []byte {
+	return []byte(fmt.Sprintf("/raft/%s/%d", partitionKey.GetAuthorityName(), partitionKey.GetPartitionId()))
+}
+
+// RoutingTableEntry represents a Raft cluster's routing state for a partition.
+// It includes the current leader, all replicas, and Raft consensus metadata.
+type RoutingTableEntry struct {
+	RelativePath string // For backward compatibility
+	Replicas     []*gitalypb.ReplicaID
+	LeaderID     uint64
+	Term         uint64
+	Index        uint64
+}
+
+// ReplicaMetadata contains additional information about a replica
+// that is needed for routing messages.
+type ReplicaMetadata struct {
+	Address string
+}
+
 // RoutingTable handles translation between node IDs and addresses
 type RoutingTable interface {
-	Translate(key RoutingKey) (string, error)
-	AddMember(key RoutingKey, address string) error
+	Translate(partitionKey *gitalypb.PartitionKey, nodeID uint64) (*gitalypb.ReplicaID, error)
+	GetEntry(partitionKey *gitalypb.PartitionKey) (*RoutingTableEntry, error)
+	UpsertEntry(entry RoutingTableEntry) error
 }
 
-// StaticRaftRoutingTable is an implementation of the RoutingTable interface.
-// It maps node IDs to their corresponding addresses.
-type staticRaftRoutingTable struct {
-	members sync.Map
+// kvRoutingTable implements the RoutingTable interface on top of KV storage.
+type kvRoutingTable struct {
+	kvStore keyvalue.Transactioner
+	mutex   sync.RWMutex
 }
 
-// RoutingKey is used to identify destination raft node in the routing table.
-type RoutingKey struct {
-	partitionKey PartitionKey
-	nodeID       uint64
+// NewKVRoutingTable creates a new key-value based routing table implementation
+// that persists routing information using BadgerDB.
+func NewKVRoutingTable(kvStore keyvalue.Store) *kvRoutingTable {
+	prefix := []byte(fmt.Sprintf("p/%d", storagemgr.MetadataPartitionID))
+	prefixedStore := keyvalue.NewPrefixedTransactioner(kvStore, prefix)
+	return &kvRoutingTable{
+		kvStore: prefixedStore,
+	}
 }
 
-// NewStaticRaftRoutingTable creates a new staticRaftRoutingTable.
-func NewStaticRaftRoutingTable() *staticRaftRoutingTable {
-	return &staticRaftRoutingTable{members: sync.Map{}}
+// UpsertEntry updates or creates a routing table entry.
+func (r *kvRoutingTable) UpsertEntry(entry RoutingTableEntry) error {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	return r.kvStore.Update(func(txn keyvalue.ReadWriter) error {
+		partitionKey := entry.Replicas[0].GetPartitionKey()
+		key := routingKey(partitionKey)
+
+		item, err := txn.Get(key)
+		if err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
+			return fmt.Errorf("get existing entry: %w", err)
+		}
+
+		var existing *RoutingTableEntry
+		if item != nil {
+			existing = &RoutingTableEntry{}
+			if err := item.Value(func(val []byte) error {
+				return json.Unmarshal(val, existing)
+			}); err != nil {
+				return fmt.Errorf("unmarshal existing entry: %w", err)
+			}
+		}
+
+		// Only update if the new entry has a higher term or index.
+		if existing != nil {
+			if entry.Term < existing.Term ||
+				(entry.Term == existing.Term && entry.Index <= existing.Index) {
+				return fmt.Errorf("stale entry: current term=%d,index=%d, new term=%d,index=%d",
+					existing.Term, existing.Index, entry.Term, entry.Index)
+			}
+		}
+
+		data, err := json.Marshal(entry)
+		if err != nil {
+			return fmt.Errorf("marshal entry: %w", err)
+		}
+
+		if err := txn.Set(key, data); err != nil {
+			return fmt.Errorf("set entry: %w", err)
+		}
+
+		return nil
+	})
 }
 
-// AddMember adds the mapping between nodeID, address, and storageName to the routing table.
-func (r *staticRaftRoutingTable) AddMember(key RoutingKey, address string) error {
-	if _, ok := r.members.Load(key); !ok {
-		r.members.Store(key, address)
-	} else {
-		return fmt.Errorf("node ID %d already exists in routing table", key.nodeID)
+// GetEntry retrieves a routing table entry.
+func (r *kvRoutingTable) GetEntry(partitionKey *gitalypb.PartitionKey) (*RoutingTableEntry, error) {
+	key := routingKey(partitionKey)
+
+	var entry RoutingTableEntry
+	if err := r.kvStore.View(func(txn keyvalue.ReadWriter) error {
+		item, err := txn.Get(key)
+		if err != nil {
+			return err
+		}
+
+		return item.Value(func(value []byte) error {
+			return json.Unmarshal(value, &entry)
+		})
+	}); err != nil {
+		return nil, fmt.Errorf("view: %w", err)
 	}
-	return nil
+
+	return &entry, nil
 }
 
-// Translate converts a node ID to its network address.
-func (r *staticRaftRoutingTable) Translate(key RoutingKey) (string, error) {
-	if addr, ok := r.members.Load(key); ok {
-		return addr.(string), nil
+// Translate returns the replica, including its storage name and address, for a
+// given partition key and node ID.
+func (r *kvRoutingTable) Translate(partitionKey *gitalypb.PartitionKey, nodeID uint64) (*gitalypb.ReplicaID, error) {
+	entry, err := r.GetEntry(partitionKey)
+	if err != nil {
+		return nil, fmt.Errorf("get entry: %w", err)
 	}
-	return "", fmt.Errorf("no address found for nodeID %d", key.nodeID)
+
+	// Look for the node in replicas
+	for _, replica := range entry.Replicas {
+		if replica.GetNodeId() == nodeID {
+			return replica, nil
+		}
+	}
+
+	return nil, fmt.Errorf("no address found for nodeID %d", nodeID)
 }
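Routing entries live under the metadata partition at keys of the form `/raft/<authority_name>/<partition_id>`, and `UpsertEntry` only accepts strictly newer `(Term, Index)` pairs. A condensed sketch of that comparison with a couple of worked cases; `isNewer` is a hypothetical helper that matches the staleness check above:

```go
// isNewer reports whether (newTerm, newIndex) supersedes (curTerm, curIndex),
// mirroring UpsertEntry's staleness check.
func isNewer(newTerm, newIndex, curTerm, curIndex uint64) bool {
	if newTerm != curTerm {
		return newTerm > curTerm
	}
	return newIndex > curIndex
}

// isNewer(2, 1, 1, 9) == true  // higher term wins regardless of index
// isNewer(1, 6, 1, 5) == true  // same term, higher index wins
// isNewer(1, 5, 1, 5) == false // equal (term, index) is rejected as stale
```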
diff --git a/internal/gitaly/storage/raftmgr/routing_test.go b/internal/gitaly/storage/raftmgr/routing_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..4c10e806cc722e0ab838d7adb532b04f560e31dc
--- /dev/null
+++ b/internal/gitaly/storage/raftmgr/routing_test.go
@@ -0,0 +1,97 @@
+package raftmgr
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+func TestPersistentRoutingTable(t *testing.T) {
+	t.Parallel()
+
+	dir := t.TempDir()
+	kvStore, err := keyvalue.NewBadgerStore(testhelper.NewLogger(t), dir)
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, kvStore.Close())
+	}()
+
+	rt := NewKVRoutingTable(kvStore)
+
+	t.Run("add and translate member", func(t *testing.T) {
+		nodeID := 1
+		address := "localhost:1234"
+		partitionKey := &gitalypb.PartitionKey{
+			AuthorityName: "test-authority",
+			PartitionId:   1,
+		}
+
+		entry := RoutingTableEntry{
+			Replicas: []*gitalypb.ReplicaID{
+				{
+					PartitionKey: partitionKey,
+					NodeId:       uint64(nodeID),
+					StorageName:  "test-storage",
+					Metadata: &gitalypb.ReplicaID_Metadata{
+						Address: address,
+					},
+				},
+			},
+			Term:  1,
+			Index: 1,
+		}
+
+		err := rt.UpsertEntry(entry)
+		require.NoError(t, err)
+
+		replica, err := rt.Translate(partitionKey, uint64(nodeID))
+		require.NoError(t, err)
+		require.Equal(t, address, replica.GetMetadata().GetAddress())
+		require.Equal(t, "test-storage", replica.GetStorageName())
+	})
+
+	t.Run("stale entry rejected", func(t *testing.T) {
+		key := &gitalypb.PartitionKey{
+			AuthorityName: "test-authority",
+			PartitionId:   2,
+		}
+
+		entry1 := RoutingTableEntry{
+			Replicas: []*gitalypb.ReplicaID{
+				{
+					PartitionKey: key,
+					NodeId:       1,
+					Metadata: &gitalypb.ReplicaID_Metadata{
+						Address: "addr1",
+					},
+				},
+			},
+			Term:  2,
+			Index: 3,
+		}
+
+		require.NoError(t, rt.UpsertEntry(entry1))
+
+		entry2 := entry1
+		entry2.Term = 1 // Lower term
+		err := rt.UpsertEntry(entry2)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "stale entry")
+	})
+
+	t.Run("node not found", func(t *testing.T) {
+		partitionKey := &gitalypb.PartitionKey{
+			AuthorityName: "test-authority",
+			PartitionId:   3,
+		}
+
+		nodeID := 999 // Non-existent node
+
+		_, err := rt.Translate(partitionKey, uint64(nodeID))
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "Key not found")
+	})
+}
diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go
index 6c24a02451165d641eda834dc438b7de29a371f8..6231d9f5603bf02c13eba9ee1e5cf3ec61e92dd5 100644
--- a/internal/gitaly/storage/raftmgr/testhelper_test.go
+++ b/internal/gitaly/storage/raftmgr/testhelper_test.go
@@ -22,8 +22,6 @@ func TestMain(m *testing.M) {
 
 type mockRaftManager struct {
 	RaftManager
-	logger     logger.LogrusLogger
-	transport  Transport
 	logManager storage.LogManager
 }
 
@@ -32,10 +30,6 @@ func (m *mockRaftManager) GetEntryPath(lsn storage.LSN) string {
 	return m.logManager.GetEntryPath(lsn)
 }
 
-func (m *mockRaftManager) GetLogReader() storage.LogReader {
-	return m.logManager
-}
-
 // Step is a mock implementation of the raft.Node.Step method.
 func (m *mockRaftManager) Step(ctx context.Context, msg raftpb.Message) error {
 	return nil
diff --git a/proto/cluster.proto b/proto/cluster.proto
index 1a6289edb04f6b534757767d7070ba424df4a8b4..4bc271950ed18ec91ac2b0aca6f25d84e2eb1fea 100644
--- a/proto/cluster.proto
+++ b/proto/cluster.proto
@@ -36,6 +36,37 @@ message RaftEntry {
   LogData data = 2;
 }
 
+// PartitionKey identifies which partition this replica belongs to.
+message PartitionKey {
+  // authority_name represents the storage that created the partition.
+  string authority_name = 1 [(gitaly.storage) = true];
+  // partition_id is the local incrementing ID of a specific partition within a
+  // storage. Together with `authority_name`, this forms a unique identifier for
+  // a partition across the cluster. A partition belongs to a Raft group.
+  uint64 partition_id = 2;
+}
+
+
+// ReplicaID uniquely identifies a replica in the Raft cluster.
+// It combines partition information with node-specific details.
+message ReplicaID {
+  // partition_key identifies which partition this replica belongs to.
+  PartitionKey partition_key = 1;
+  // node_id is the unique identifier assigned by etcd/raft.
+  uint64 node_id = 2;
+  // storage_name is the name of the storage where this replica is hosted.
+  string storage_name = 3;
+  // Metadata contains routing information for the replica.
+  message Metadata {
+    // address is the network address of the replica.
+    string address = 1;
+  }
+  // metadata contains replica information.
+  Metadata metadata = 4;
+}
+
 // RaftMessageRequest is a request for the SendMessage RPC. It serves as a
 // wrapper for raftpb.Message. etcd/raft's state machines on each member emit
 // this message. Since Gitaly employs multi-raft, routing metadata is attached
@@ -45,16 +76,11 @@ message RaftMessageRequest {
   // cluster_id is the identifier of the Raft cluster to which this message belongs.
   string cluster_id = 1;
 
-  // authority_name is the storage name of the authority that creates a partition.
-  string authority_name = 2 [(gitaly.storage) = true];
-
-  // partition_id is the local incrementing ID of a specific partition within a
-  // storage. Together with `authority_name`, this forms a unique identifier for
-  // a partition across the cluster. A partition belongs to a Raft group.
-  uint64 partition_id = 3;
+  // replica_id uniquely identifies a replica in the Raft cluster.
+  ReplicaID replica_id = 2;
 
   // message is the Raft message to be delivered.
-  raftpb.Message message = 4;
+  raftpb.Message message = 3;
 }
 
 // RaftMessageResponse represents a response to the SendMessage RPC.
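Two things are worth noting about this reshaping. First, reusing tag 2 with a different type (`string` → message `ReplicaID`) and moving `message` from tag 4 to 3 is wire-incompatible with the old schema, which seems acceptable only while this RPC has no deployed users. Second, generated getters in protobuf-go are nil-safe, which is why the server-side validation can chain through the nested shape without explicit nil checks; a hedged illustration:

```go
// Assumed protobuf-go getter behavior: generated getters handle nil receivers.
var req *gitalypb.RaftMessageRequest // deliberately nil

// No panic: each getter returns its zero value when the receiver is nil, so a
// missing replica_id surfaces as empty fields that the required-field checks reject.
name := req.GetReplicaId().GetPartitionKey().GetAuthorityName()
fmt.Println(name == "") // true
```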
+type PartitionKey struct { + state protoimpl.MessageState `protogen:"open.v1"` + // authority_name represents the storage that created the partition. + AuthorityName string `protobuf:"bytes,1,opt,name=authority_name,json=authorityName,proto3" json:"authority_name,omitempty"` + // partition_id is the local incrementing ID of a specific partition within a + // storage. Together with `authority_name`, this forms a unique identifier for + // a partition across the cluster. A partition belongs to a Raft group. + PartitionId uint64 `protobuf:"varint,2,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PartitionKey) Reset() { + *x = PartitionKey{} + mi := &file_cluster_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PartitionKey) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PartitionKey) ProtoMessage() {} + +func (x *PartitionKey) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PartitionKey.ProtoReflect.Descriptor instead. +func (*PartitionKey) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{1} +} + +func (x *PartitionKey) GetAuthorityName() string { + if x != nil { + return x.AuthorityName + } + return "" +} + +func (x *PartitionKey) GetPartitionId() uint64 { + if x != nil { + return x.PartitionId + } + return 0 +} + +// ReplicaID uniquely identifies a replica in the Raft cluster. +// It combines partition information with node-specific details. +type ReplicaID struct { + state protoimpl.MessageState `protogen:"open.v1"` + // partition_key identifies which partition this replica belongs to. + PartitionKey *PartitionKey `protobuf:"bytes,1,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"` + // node_id is the unique identifier assigned by etcd/raft. + NodeId uint64 `protobuf:"varint,2,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"` + // storage_name is the name of the storage where this replica is hosted. + StorageName string `protobuf:"bytes,3,opt,name=storage_name,json=storageName,proto3" json:"storage_name,omitempty"` + // metadata contains replica information. + Metadata *ReplicaID_Metadata `protobuf:"bytes,4,opt,name=metadata,proto3" json:"metadata,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReplicaID) Reset() { + *x = ReplicaID{} + mi := &file_cluster_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReplicaID) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReplicaID) ProtoMessage() {} + +func (x *ReplicaID) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReplicaID.ProtoReflect.Descriptor instead. 
+func (*ReplicaID) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{2} +} + +func (x *ReplicaID) GetPartitionKey() *PartitionKey { + if x != nil { + return x.PartitionKey + } + return nil +} + +func (x *ReplicaID) GetNodeId() uint64 { + if x != nil { + return x.NodeId + } + return 0 +} + +func (x *ReplicaID) GetStorageName() string { + if x != nil { + return x.StorageName + } + return "" +} + +func (x *ReplicaID) GetMetadata() *ReplicaID_Metadata { + if x != nil { + return x.Metadata + } + return nil +} + // RaftMessageRequest is a request for the SendMessage RPC. It serves as a // wrapper for raftpb.Message. etcd/raft's state machines on each member emit // this message. Since Gitaly employs multi-raft, routing metadata is attached @@ -90,21 +221,17 @@ type RaftMessageRequest struct { state protoimpl.MessageState `protogen:"open.v1"` // cluster_id is the identifier of the Raft cluster to which this message belongs. ClusterId string `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"` - // authority_name is the storage name of the authority that creates a partition. - AuthorityName string `protobuf:"bytes,2,opt,name=authority_name,json=authorityName,proto3" json:"authority_name,omitempty"` - // partition_id is the local incrementing ID of a specific partition within a - // storage. Together with `authority_name`, this forms a unique identifier for - // a partition across the cluster. A partition belongs to a Raft group. - PartitionId uint64 `protobuf:"varint,3,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` + // replica_id uniquely identifies a replica in the Raft cluster. + ReplicaId *ReplicaID `protobuf:"bytes,2,opt,name=replica_id,json=replicaId,proto3" json:"replica_id,omitempty"` // message is the Raft message to be delivered. - Message *raftpb.Message `protobuf:"bytes,4,opt,name=message,proto3" json:"message,omitempty"` + Message *raftpb.Message `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *RaftMessageRequest) Reset() { *x = RaftMessageRequest{} - mi := &file_cluster_proto_msgTypes[1] + mi := &file_cluster_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -116,7 +243,7 @@ func (x *RaftMessageRequest) String() string { func (*RaftMessageRequest) ProtoMessage() {} func (x *RaftMessageRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[1] + mi := &file_cluster_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -129,7 +256,7 @@ func (x *RaftMessageRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RaftMessageRequest.ProtoReflect.Descriptor instead. 
func (*RaftMessageRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{1} + return file_cluster_proto_rawDescGZIP(), []int{3} } func (x *RaftMessageRequest) GetClusterId() string { @@ -139,18 +266,11 @@ func (x *RaftMessageRequest) GetClusterId() string { return "" } -func (x *RaftMessageRequest) GetAuthorityName() string { +func (x *RaftMessageRequest) GetReplicaId() *ReplicaID { if x != nil { - return x.AuthorityName + return x.ReplicaId } - return "" -} - -func (x *RaftMessageRequest) GetPartitionId() uint64 { - if x != nil { - return x.PartitionId - } - return 0 + return nil } func (x *RaftMessageRequest) GetMessage() *raftpb.Message { @@ -169,7 +289,7 @@ type RaftMessageResponse struct { func (x *RaftMessageResponse) Reset() { *x = RaftMessageResponse{} - mi := &file_cluster_proto_msgTypes[2] + mi := &file_cluster_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -181,7 +301,7 @@ func (x *RaftMessageResponse) String() string { func (*RaftMessageResponse) ProtoMessage() {} func (x *RaftMessageResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[2] + mi := &file_cluster_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -194,7 +314,7 @@ func (x *RaftMessageResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RaftMessageResponse.ProtoReflect.Descriptor instead. func (*RaftMessageResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{2} + return file_cluster_proto_rawDescGZIP(), []int{4} } // LogData contains serialized log data, including the log entry itself @@ -218,7 +338,7 @@ type RaftEntry_LogData struct { func (x *RaftEntry_LogData) Reset() { *x = RaftEntry_LogData{} - mi := &file_cluster_proto_msgTypes[3] + mi := &file_cluster_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -230,7 +350,7 @@ func (x *RaftEntry_LogData) String() string { func (*RaftEntry_LogData) ProtoMessage() {} func (x *RaftEntry_LogData) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[3] + mi := &file_cluster_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -260,6 +380,52 @@ func (x *RaftEntry_LogData) GetPacked() []byte { return nil } +// Metadata contains routing information for the replica. +type ReplicaID_Metadata struct { + state protoimpl.MessageState `protogen:"open.v1"` + // address is the network address of the replica. + Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReplicaID_Metadata) Reset() { + *x = ReplicaID_Metadata{} + mi := &file_cluster_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReplicaID_Metadata) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReplicaID_Metadata) ProtoMessage() {} + +func (x *ReplicaID_Metadata) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReplicaID_Metadata.ProtoReflect.Descriptor instead. 
+func (*ReplicaID_Metadata) Descriptor() ([]byte, []int) {
+	return file_cluster_proto_rawDescGZIP(), []int{2, 0}
+}
+
+func (x *ReplicaID_Metadata) GetAddress() string {
+	if x != nil {
+		return x.Address
+	}
+	return ""
+}
+
 var File_cluster_proto protoreflect.FileDescriptor
 
 var file_cluster_proto_rawDesc = string([]byte{
@@ -275,29 +441,48 @@ var file_cluster_proto_rawDesc = string([]byte{
 	0x0a, 0x0a, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01,
 	0x28, 0x0c, 0x52, 0x09, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a,
 	0x06, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x70,
-	0x61, 0x63, 0x6b, 0x65, 0x64, 0x22, 0xae, 0x01, 0x0a, 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65,
-	0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a,
-	0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
-	0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x2b, 0x0a, 0x0e, 0x61,
-	0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20,
-	0x01, 0x28, 0x09, 0x42, 0x04, 0x88, 0xc6, 0x2c, 0x01, 0x52, 0x0d, 0x61, 0x75, 0x74, 0x68, 0x6f,
-	0x72, 0x69, 0x74, 0x79, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74,
-	0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b,
-	0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x6d,
-	0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x72,
-	0x61, 0x66, 0x74, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, 0x6d,
-	0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65,
-	0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x61, 0x0a,
-	0x0b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, 0x0b,
-	0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, 0x69,
-	0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
-	0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79,
-	0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70,
-	0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01,
-	0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67,
-	0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79,
-	0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69,
-	0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x61, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x5e, 0x0a, 0x0c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
+	0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x2b, 0x0a, 0x0e, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69,
+	0x74, 0x79, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x04, 0x88,
+	0xc6, 0x2c, 0x01, 0x52, 0x0d, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x4e, 0x61,
+	0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f,
+	0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74,
+	0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0xe0, 0x01, 0x0a, 0x09, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63,
+	0x61, 0x49, 0x44, 0x12, 0x39, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e,
+	0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x69, 0x74,
+	0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79,
+	0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x17,
+	0x0a, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52,
+	0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61,
+	0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73,
+	0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65,
+	0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67,
+	0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e,
+	0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61,
+	0x74, 0x61, 0x1a, 0x24, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x18,
+	0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
+	0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x90, 0x01, 0x0a, 0x12, 0x52, 0x61, 0x66,
+	0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
+	0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20,
+	0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x30,
+	0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01,
+	0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c,
+	0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64,
+	0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28,
+	0x0b, 0x32, 0x0f, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61,
+	0x67, 0x65, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x52,
+	0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+	0x73, 0x65, 0x32, 0x61, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
+	0x65, 0x12, 0x52, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
+	0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65,
+	0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67,
+	0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
+	0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08,
+	0x01, 0x10, 0x02, 0x28, 0x01, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e,
+	0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67,
+	0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f,
+	0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x33,
 })
 
 var (
@@ -312,24 +497,30 @@ func file_cluster_proto_rawDescGZIP() []byte {
 	return file_cluster_proto_rawDescData
 }
 
-var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
+var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
 var file_cluster_proto_goTypes = []any{
 	(*RaftEntry)(nil),           // 0: gitaly.RaftEntry
-	(*RaftMessageRequest)(nil),  // 1: gitaly.RaftMessageRequest
-	(*RaftMessageResponse)(nil), // 2: gitaly.RaftMessageResponse
-	(*RaftEntry_LogData)(nil),   // 3: gitaly.RaftEntry.LogData
-	(*raftpb.Message)(nil),      // 4: raftpb.Message
+	(*PartitionKey)(nil),        // 1: gitaly.PartitionKey
+	(*ReplicaID)(nil),           // 2: gitaly.ReplicaID
+	(*RaftMessageRequest)(nil),  // 3: gitaly.RaftMessageRequest
+	(*RaftMessageResponse)(nil), // 4: gitaly.RaftMessageResponse
+	(*RaftEntry_LogData)(nil),   // 5: gitaly.RaftEntry.LogData
+	(*ReplicaID_Metadata)(nil),  // 6: gitaly.ReplicaID.Metadata
+	(*raftpb.Message)(nil),      // 7: raftpb.Message
}
 var file_cluster_proto_depIdxs = []int32{
-	3, // 0: gitaly.RaftEntry.data:type_name -> gitaly.RaftEntry.LogData
-	4, // 1: gitaly.RaftMessageRequest.message:type_name -> raftpb.Message
-	1, // 2: gitaly.RaftService.SendMessage:input_type -> gitaly.RaftMessageRequest
-	2, // 3: gitaly.RaftService.SendMessage:output_type -> gitaly.RaftMessageResponse
-	3, // [3:4] is the sub-list for method output_type
-	2, // [2:3] is the sub-list for method input_type
-	2, // [2:2] is the sub-list for extension type_name
-	2, // [2:2] is the sub-list for extension extendee
-	0, // [0:2] is the sub-list for field type_name
+	5, // 0: gitaly.RaftEntry.data:type_name -> gitaly.RaftEntry.LogData
+	1, // 1: gitaly.ReplicaID.partition_key:type_name -> gitaly.PartitionKey
+	6, // 2: gitaly.ReplicaID.metadata:type_name -> gitaly.ReplicaID.Metadata
+	2, // 3: gitaly.RaftMessageRequest.replica_id:type_name -> gitaly.ReplicaID
+	7, // 4: gitaly.RaftMessageRequest.message:type_name -> raftpb.Message
+	3, // 5: gitaly.RaftService.SendMessage:input_type -> gitaly.RaftMessageRequest
+	4, // 6: gitaly.RaftService.SendMessage:output_type -> gitaly.RaftMessageResponse
+	6, // [6:7] is the sub-list for method output_type
+	5, // [5:6] is the sub-list for method input_type
+	5, // [5:5] is the sub-list for extension type_name
+	5, // [5:5] is the sub-list for extension extendee
+	0, // [0:5] is the sub-list for field type_name
 }
 
 func init() { file_cluster_proto_init() }
@@ -344,7 +535,7 @@ func file_cluster_proto_init() {
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: unsafe.Slice(unsafe.StringData(file_cluster_proto_rawDesc), len(file_cluster_proto_rawDesc)),
 			NumEnums:      0,
-			NumMessages:   4,
+			NumMessages:   7,
 			NumExtensions: 0,
 			NumServices:   1,
 		},
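
The regenerated code above reshapes the request wire format: PartitionKey carries authority_name (field 1) and partition_id (field 2); ReplicaID carries partition_key (1), node_id (2), storage_name (3), and metadata (4); and RaftMessageRequest now holds cluster_id (1), replica_id (2), and message (3) in place of the old flat authority_name/partition_id fields. Because field numbers and types change, the old and new request encodings are not wire-compatible. Below is a minimal sketch of how a sender might populate the new shape; the message and field names mirror the generated types above, while the package and helper names, the placeholder values, and the raftpb import path are illustrative assumptions, not part of this change.

package raftexample

import (
	"go.etcd.io/raft/v3/raftpb"

	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)

// buildRaftMessageRequest is a hypothetical helper showing how the routing
// metadata that used to live in the flat authority_name/partition_id fields
// is now grouped under ReplicaID. The nested PartitionKey still uniquely
// identifies a partition across the cluster.
func buildRaftMessageRequest(msg raftpb.Message) *gitalypb.RaftMessageRequest {
	return &gitalypb.RaftMessageRequest{
		// Must match cfg.Raft.ClusterID on the receiver, which rejects empty
		// or mismatched cluster IDs in SendMessage.
		ClusterId: "cluster-1",
		ReplicaId: &gitalypb.ReplicaID{
			PartitionKey: &gitalypb.PartitionKey{
				AuthorityName: "storage-1", // storage that created the partition
				PartitionId:   1,           // incrementing ID local to that storage
			},
			NodeId: 1, // Raft node ID of the sending replica
			// storage_name tells the receiver which Raft-enabled storage (and
			// therefore which transport) should handle this message.
			StorageName: "storage-1",
			Metadata: &gitalypb.ReplicaID_Metadata{
				Address: "localhost:8075", // network address used for routing
			},
		},
		Message: &msg, // the raftpb.Message emitted by the etcd/raft state machine
	}
}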